diff options
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r-- | src/im-asapo.c | 165 |
1 files changed, 160 insertions, 5 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c index 82934d50..c34954e2 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -3,11 +3,11 @@ * * ASAP::O data interface * - * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, - * a research centre of the Helmholtz Association. + * Copyright © 2021-2023 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. * * Authors: - * 2021 Thomas White <taw@physics.org> + * 2021-2023 Thomas White <taw@physics.org> * * This file is part of CrystFEL. * @@ -35,7 +35,9 @@ #include <stdint.h> #include <assert.h> #include <unistd.h> +#include <sys/time.h> #include <asapo/consumer_c.h> +#include <asapo/producer_c.h> #include <image.h> #include <utils.h> @@ -50,6 +52,7 @@ struct im_asapo { char *stream; AsapoConsumerHandle consumer; + AsapoProducerHandle producer; AsapoStringHandle group_id; int wait_for_stream; }; @@ -70,6 +73,54 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } +static int create_producer(struct im_asapo *a, struct im_asapo_params *params) +{ + char *source; + AsapoSourceCredentialsHandle cred; + AsapoErrorHandle err = asapo_new_handle(); + + if ( !params->write_output_stream ) { + a->producer = NULL; + return 0; + } + + source = malloc(strlen(params->source)+6); + if ( source == NULL ) return 1; + + strcpy(source, params->source); + strcat(source, "_hits"); + + cred = asapo_create_source_credentials(kProcessed, + "auto", /* instance ID */ + "indexamajig", /* pipeline step */ + params->beamtime, + "", /* beamline */ + source, + params->token); + STATUS("Writing hits-only output stream as data source '%s'\n", source); + free(source); + + a->producer = asapo_create_producer(params->endpoint, + 8, /* Number of sender threads */ + kTcp, + cred, + 30000, /* Timeout */ + &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O producer", err); + asapo_free_handle(&cred); + asapo_free_handle(&err); + return 1; + } + + asapo_producer_set_log_level(a->producer, Debug); + + asapo_free_handle(&err); + return 0; +} + + struct im_asapo *im_asapo_connect(struct im_asapo_params *params) { struct im_asapo *a; @@ -108,18 +159,22 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) params->source, params->token); a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err); - asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); + asapo_free_handle(&cred); free(a); return NULL; } + if ( create_producer(a, params) ) return NULL; + a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); a->wait_for_stream = params->wait_for_stream; + asapo_free_handle(&cred); + return a; } @@ -144,7 +199,7 @@ static int stream_empty(struct im_asapo *a) void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, - int *pfinished) + int *pfinished, int *pmessageid) { void *data_copy; AsapoMessageMetaHandle meta; @@ -204,6 +259,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, *pmeta = strdup(asapo_message_meta_get_metadata(meta)); *pfilename = strdup(asapo_message_meta_get_name(meta)); *pevent = strdup("//"); + *pmessageid = asapo_message_meta_get_id(meta); profile_end("copy-meta"); asapo_free_handle(&err); @@ -215,6 +271,105 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } +static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload, + AsapoErrorHandle err) +{ + if ( asapo_is_error(err) ) { + show_asapo_error("ASAP::O send error", err); + } +} + + +static void send_real(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + char filename[1024]; + + snprintf(filename, 1024, "processed/%s_hits/%s-%i.data", + a->stream, a->stream, image->serial); + + header = asapo_create_message_header(image->serial, + image->data_block_size, + filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + struct timeval tv; + gettimeofday(&tv,NULL); + STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec); + + err = asapo_new_handle(); + asapo_producer_send(a->producer, header, image->data_block, + kDefaultIngestMode, a->stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't send ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +static void send_placeholder(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + char filename[1024]; + + snprintf(filename, 1024, "processed/%s_hits/%s-%i.placeholder", + a->stream, a->stream, image->serial); + + struct timeval tv; + gettimeofday(&tv,NULL); + STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec); + + header = asapo_create_message_header(image->serial, + 8, /* strlen("SKIPPED"+\0) */ + filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + err = asapo_new_handle(); + asapo_producer_send(a->producer, header, "SKIPPED", + kDefaultIngestMode, a->stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't send ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise, + * send a placeholder */ +void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ + if ( a == NULL ) return; + if ( a->producer == NULL ) return; + profile_start("asapo-send"); + if ( hit ) { + send_real(a, image); + } else { + send_placeholder(a, image); + } + profile_end("asapo-send"); +} + + void im_asapo_shutdown(struct im_asapo *a) { if ( a == NULL ) return; |