aboutsummaryrefslogtreecommitdiff
path: root/src/im-asapo.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r--src/im-asapo.c165
1 files changed, 160 insertions, 5 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 82934d50..c34954e2 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -3,11 +3,11 @@
*
* ASAP::O data interface
*
- * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY,
- * a research centre of the Helmholtz Association.
+ * Copyright © 2021-2023 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
*
* Authors:
- * 2021 Thomas White <taw@physics.org>
+ * 2021-2023 Thomas White <taw@physics.org>
*
* This file is part of CrystFEL.
*
@@ -35,7 +35,9 @@
#include <stdint.h>
#include <assert.h>
#include <unistd.h>
+#include <sys/time.h>
#include <asapo/consumer_c.h>
+#include <asapo/producer_c.h>
#include <image.h>
#include <utils.h>
@@ -50,6 +52,7 @@ struct im_asapo
{
char *stream;
AsapoConsumerHandle consumer;
+ AsapoProducerHandle producer;
AsapoStringHandle group_id;
int wait_for_stream;
};
@@ -70,6 +73,54 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err)
}
+static int create_producer(struct im_asapo *a, struct im_asapo_params *params)
+{
+ char *source;
+ AsapoSourceCredentialsHandle cred;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ if ( !params->write_output_stream ) {
+ a->producer = NULL;
+ return 0;
+ }
+
+ source = malloc(strlen(params->source)+6);
+ if ( source == NULL ) return 1;
+
+ strcpy(source, params->source);
+ strcat(source, "_hits");
+
+ cred = asapo_create_source_credentials(kProcessed,
+ "auto", /* instance ID */
+ "indexamajig", /* pipeline step */
+ params->beamtime,
+ "", /* beamline */
+ source,
+ params->token);
+ STATUS("Writing hits-only output stream as data source '%s'\n", source);
+ free(source);
+
+ a->producer = asapo_create_producer(params->endpoint,
+ 8, /* Number of sender threads */
+ kTcp,
+ cred,
+ 30000, /* Timeout */
+ &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create ASAP::O producer", err);
+ asapo_free_handle(&cred);
+ asapo_free_handle(&err);
+ return 1;
+ }
+
+ asapo_producer_set_log_level(a->producer, Debug);
+
+ asapo_free_handle(&err);
+ return 0;
+}
+
+
struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
{
struct im_asapo *a;
@@ -108,18 +159,22 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
params->source,
params->token);
a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err);
- asapo_free_handle(&cred);
if ( asapo_is_error(err) ) {
show_asapo_error("Cannot create ASAP::O consumer", err);
+ asapo_free_handle(&cred);
free(a);
return NULL;
}
+ if ( create_producer(a, params) ) return NULL;
+
a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(params->group_id);
a->wait_for_stream = params->wait_for_stream;
+ asapo_free_handle(&cred);
+
return a;
}
@@ -144,7 +199,7 @@ static int stream_empty(struct im_asapo *a)
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
- int *pfinished)
+ int *pfinished, int *pmessageid)
{
void *data_copy;
AsapoMessageMetaHandle meta;
@@ -204,6 +259,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
*pmeta = strdup(asapo_message_meta_get_metadata(meta));
*pfilename = strdup(asapo_message_meta_get_name(meta));
*pevent = strdup("//");
+ *pmessageid = asapo_message_meta_get_id(meta);
profile_end("copy-meta");
asapo_free_handle(&err);
@@ -215,6 +271,105 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
}
+static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload,
+ AsapoErrorHandle err)
+{
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("ASAP::O send error", err);
+ }
+}
+
+
+static void send_real(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+ char filename[1024];
+
+ snprintf(filename, 1024, "processed/%s_hits/%s-%i.data",
+ a->stream, a->stream, image->serial);
+
+ header = asapo_create_message_header(image->serial,
+ image->data_block_size,
+ filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec);
+
+ err = asapo_new_handle();
+ asapo_producer_send(a->producer, header, image->data_block,
+ kDefaultIngestMode, a->stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't send ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+static void send_placeholder(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+ char filename[1024];
+
+ snprintf(filename, 1024, "processed/%s_hits/%s-%i.placeholder",
+ a->stream, a->stream, image->serial);
+
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec);
+
+ header = asapo_create_message_header(image->serial,
+ 8, /* strlen("SKIPPED"+\0) */
+ filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ err = asapo_new_handle();
+ asapo_producer_send(a->producer, header, "SKIPPED",
+ kDefaultIngestMode, a->stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't send ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise,
+ * send a placeholder */
+void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+ if ( a == NULL ) return;
+ if ( a->producer == NULL ) return;
+ profile_start("asapo-send");
+ if ( hit ) {
+ send_real(a, image);
+ } else {
+ send_placeholder(a, image);
+ }
+ profile_end("asapo-send");
+}
+
+
void im_asapo_shutdown(struct im_asapo *a)
{
if ( a == NULL ) return;