diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/im-asapo.c | 165 | ||||
-rw-r--r-- | src/im-asapo.h | 14 | ||||
-rw-r--r-- | src/im-sandbox.c | 11 | ||||
-rw-r--r-- | src/indexamajig.c | 8 | ||||
-rw-r--r-- | src/process_image.c | 5 | ||||
-rw-r--r-- | src/process_image.h | 4 |
6 files changed, 195 insertions, 12 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c index 82934d50..c34954e2 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -3,11 +3,11 @@ * * ASAP::O data interface * - * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, - * a research centre of the Helmholtz Association. + * Copyright © 2021-2023 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. * * Authors: - * 2021 Thomas White <taw@physics.org> + * 2021-2023 Thomas White <taw@physics.org> * * This file is part of CrystFEL. * @@ -35,7 +35,9 @@ #include <stdint.h> #include <assert.h> #include <unistd.h> +#include <sys/time.h> #include <asapo/consumer_c.h> +#include <asapo/producer_c.h> #include <image.h> #include <utils.h> @@ -50,6 +52,7 @@ struct im_asapo { char *stream; AsapoConsumerHandle consumer; + AsapoProducerHandle producer; AsapoStringHandle group_id; int wait_for_stream; }; @@ -70,6 +73,54 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } +static int create_producer(struct im_asapo *a, struct im_asapo_params *params) +{ + char *source; + AsapoSourceCredentialsHandle cred; + AsapoErrorHandle err = asapo_new_handle(); + + if ( !params->write_output_stream ) { + a->producer = NULL; + return 0; + } + + source = malloc(strlen(params->source)+6); + if ( source == NULL ) return 1; + + strcpy(source, params->source); + strcat(source, "_hits"); + + cred = asapo_create_source_credentials(kProcessed, + "auto", /* instance ID */ + "indexamajig", /* pipeline step */ + params->beamtime, + "", /* beamline */ + source, + params->token); + STATUS("Writing hits-only output stream as data source '%s'\n", source); + free(source); + + a->producer = asapo_create_producer(params->endpoint, + 8, /* Number of sender threads */ + kTcp, + cred, + 30000, /* Timeout */ + &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O producer", err); + asapo_free_handle(&cred); + asapo_free_handle(&err); + return 1; + } + + asapo_producer_set_log_level(a->producer, Debug); + + asapo_free_handle(&err); + return 0; +} + + struct im_asapo *im_asapo_connect(struct im_asapo_params *params) { struct im_asapo *a; @@ -108,18 +159,22 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) params->source, params->token); a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err); - asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); + asapo_free_handle(&cred); free(a); return NULL; } + if ( create_producer(a, params) ) return NULL; + a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); a->wait_for_stream = params->wait_for_stream; + asapo_free_handle(&cred); + return a; } @@ -144,7 +199,7 @@ static int stream_empty(struct im_asapo *a) void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, - int *pfinished) + int *pfinished, int *pmessageid) { void *data_copy; AsapoMessageMetaHandle meta; @@ -204,6 +259,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, *pmeta = strdup(asapo_message_meta_get_metadata(meta)); *pfilename = strdup(asapo_message_meta_get_name(meta)); *pevent = strdup("//"); + *pmessageid = asapo_message_meta_get_id(meta); profile_end("copy-meta"); asapo_free_handle(&err); @@ -215,6 +271,105 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } +static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload, + AsapoErrorHandle err) +{ + if ( asapo_is_error(err) ) { + show_asapo_error("ASAP::O send error", err); + } +} + + +static void send_real(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + char filename[1024]; + + snprintf(filename, 1024, "processed/%s_hits/%s-%i.data", + a->stream, a->stream, image->serial); + + header = asapo_create_message_header(image->serial, + image->data_block_size, + filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + struct timeval tv; + gettimeofday(&tv,NULL); + STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec); + + err = asapo_new_handle(); + asapo_producer_send(a->producer, header, image->data_block, + kDefaultIngestMode, a->stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't send ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +static void send_placeholder(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + char filename[1024]; + + snprintf(filename, 1024, "processed/%s_hits/%s-%i.placeholder", + a->stream, a->stream, image->serial); + + struct timeval tv; + gettimeofday(&tv,NULL); + STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec); + + header = asapo_create_message_header(image->serial, + 8, /* strlen("SKIPPED"+\0) */ + filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + err = asapo_new_handle(); + asapo_producer_send(a->producer, header, "SKIPPED", + kDefaultIngestMode, a->stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't send ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise, + * send a placeholder */ +void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ + if ( a == NULL ) return; + if ( a->producer == NULL ) return; + profile_start("asapo-send"); + if ( hit ) { + send_real(a, image); + } else { + send_placeholder(a, image); + } + profile_end("asapo-send"); +} + + void im_asapo_shutdown(struct im_asapo *a) { if ( a == NULL ) return; diff --git a/src/im-asapo.h b/src/im-asapo.h index cda9fbd9..43691700 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -43,8 +43,11 @@ struct im_asapo_params char *source; char *stream; int wait_for_stream; + int write_output_stream; }; +struct im_asapo; + #if defined(HAVE_ASAPO) extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params); @@ -53,7 +56,9 @@ extern void im_asapo_shutdown(struct im_asapo *a); extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, - int *pfinished); + int *pfinished, int *pmessageid); + +extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit); #else /* defined(HAVE_ASAPO) */ @@ -69,16 +74,21 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a) static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, char **pmeta, char **pfilename, char **pevent, - int *pfinished) + int *pfinished, int *pmessageid) { *psize = 0; *pmeta = NULL; *pfilename = NULL; *pevent = NULL; *pfinished = 1; + *pmessageid = 0; return NULL; } +static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ +} + #endif /* defined(HAVE_ASAPO) */ #endif /* CRYSTFEL_ASAPO_H */ diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 51d9db99..7f02b1cf 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -476,6 +476,7 @@ static int run_work(const struct index_args *iargs, Stream *st, char *filename; char *event; int finished = 0; + int message_id; profile_start("asapo-fetch"); set_last_task(sb->shared->last_task[cookie], "ASAPO fetch"); @@ -484,7 +485,8 @@ static int run_work(const struct index_args *iargs, Stream *st, &pargs.asapo_meta, &filename, &event, - &finished); + &finished, + &message_id); profile_end("asapo-fetch"); if ( pargs.asapo_data != NULL ) { ok = 1; @@ -496,6 +498,10 @@ static int run_work(const struct index_args *iargs, Stream *st, pargs.filename = filename; pargs.event = event; sb->shared->end_of_stream[cookie] = 0; + + /* We will also use ASAP::O's serial number + * instead of our own. */ + ser = message_id; } else { if ( finished ) { sb->shared->end_of_stream[cookie] = 1; @@ -510,7 +516,8 @@ static int run_work(const struct index_args *iargs, Stream *st, sb->shared->time_last_start[cookie] = get_monotonic_seconds(); profile_start("process-image"); process_image(iargs, &pargs, st, cookie, tmpdir, ser, - sb->shared, sb->shared->last_task[cookie]); + sb->shared, sb->shared->last_task[cookie], + asapostuff); profile_end("process-image"); } diff --git a/src/indexamajig.c b/src/indexamajig.c index 427843bc..a7b5ec9d 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -449,6 +449,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 222 : + args->asapo_params.write_output_stream = 1; + break; + + case 223 : args->cpu_pin = 1; break; @@ -887,6 +891,7 @@ int main(int argc, char *argv[]) args.asapo_params.source = NULL; args.asapo_params.stream = NULL; args.asapo_params.wait_for_stream = 0; + args.asapo_params.write_output_stream = 0; args.cpu_pin = 0; args.serial_start = 1; args.if_peaks = 1; @@ -1009,7 +1014,8 @@ int main(int argc, char *argv[]) {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, {"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE, "Wait for ASAP::O stream to appear"}, - {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, + {"asapo-output-stream", 222, NULL, OPTION_NO_USAGE, "Create an ASAP::O hits-only stream"}, + {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, diff --git a/src/process_image.c b/src/process_image.c index 5bc527a1..8acd3e86 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -56,6 +56,7 @@ #include "predict-refine.h" #include "im-sandbox.h" #include "im-zmq.h" +#include "im-asapo.h" static float **backup_image_data(float **dp, struct detgeom *det) { @@ -177,7 +178,7 @@ static struct image *file_wait_open_read(const char *filename, void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int serial, struct sb_shm *sb_shared, - char *last_task) + char *last_task, struct im_asapo *asapostuff) { struct image *image; int i; @@ -480,6 +481,8 @@ streamwrite: "%s\n", n, n>1?"s":"", image->filename, image->ev); } + im_asapo_send(asapostuff, image, image->hit); + out: /* Count crystals which are still good */ set_last_task(last_task, "process_image finalisation"); diff --git a/src/process_image.h b/src/process_image.h index bb68ce19..cca2f7d2 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -45,6 +45,7 @@ struct index_args; #include "im-sandbox.h" #include "peaks.h" #include "image.h" +#include "im-asapo.h" /* Information about the indexing process which is common to all patterns */ @@ -113,7 +114,8 @@ struct pattern_args extern void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int serial, - struct sb_shm *sb_shared, char *last_task); + struct sb_shm *sb_shared, char *last_task, + struct im_asapo *asapostuff); #endif /* PROCESS_IMAGE_H */ |