From 6d11d4f13eb46d67f5a619dec93f4b6c4c0d9718 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 12 Apr 2023 17:10:24 +0200 Subject: Meson: Add ASAP::O producer dependency --- meson.build | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/meson.build b/meson.build index 8f6567fa..08247ba3 100644 --- a/meson.build +++ b/meson.build @@ -53,9 +53,16 @@ if zmqdep.found() conf_data.set10('HAVE_ZMQ', true) endif -asapodep = dependency('libasapo-consumer', required: false) -if asapodep.found() - conf_data.set10('HAVE_ASAPO', true) +# There are two separate dependencies for ASAP::O (consumer and producer) +# We need both of them. +asapoproddep = dependency('libasapo-producer', required: false) +if asapoproddep.found() + asapodep = dependency('libasapo-consumer', required: true) + if asapodep.found() + conf_data.set10('HAVE_ASAPO', true) + endif +else + asapodep = dependency('', required: false) endif if cc.has_function('clock_gettime', prefix: '#include ') @@ -161,7 +168,7 @@ endif indexamajig = executable('indexamajig', indexamajig_sources, dependencies: [mdep, libcrystfeldep, gsldep, - pthreaddep, zmqdep, asapodep], + pthreaddep, zmqdep, asapodep, asapoproddep], install: true, install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') -- cgit v1.2.3 From e922cc649959d2f05dc4df44b0d5889b0fcdb32c Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 12 Apr 2023 17:12:10 +0200 Subject: ASAP::O: Use message ID as serial number --- src/im-asapo.c | 9 +++++---- src/im-asapo.h | 5 +++-- src/im-sandbox.c | 8 +++++++- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 82934d50..43455298 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -3,11 +3,11 @@ * * ASAP::O data interface * - * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, - * a research centre of the Helmholtz Association. + * Copyright © 2021-2023 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. * * Authors: - * 2021 Thomas White + * 2021-2023 Thomas White * * This file is part of CrystFEL. * @@ -144,7 +144,7 @@ static int stream_empty(struct im_asapo *a) void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, - int *pfinished) + int *pfinished, int *pmessageid) { void *data_copy; AsapoMessageMetaHandle meta; @@ -204,6 +204,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, *pmeta = strdup(asapo_message_meta_get_metadata(meta)); *pfilename = strdup(asapo_message_meta_get_name(meta)); *pevent = strdup("//"); + *pmessageid = asapo_message_meta_get_id(meta); profile_end("copy-meta"); asapo_free_handle(&err); diff --git a/src/im-asapo.h b/src/im-asapo.h index cda9fbd9..cf7edfe4 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -53,7 +53,7 @@ extern void im_asapo_shutdown(struct im_asapo *a); extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, - int *pfinished); + int *pfinished, int *pmessageid); #else /* defined(HAVE_ASAPO) */ @@ -69,13 +69,14 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a) static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, char **pmeta, char **pfilename, char **pevent, - int *pfinished) + int *pfinished, int *pmessageid) { *psize = 0; *pmeta = NULL; *pfilename = NULL; *pevent = NULL; *pfinished = 1; + *pmessageid = 0; return NULL; } diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 51d9db99..63ebc537 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -476,6 +476,7 @@ static int run_work(const struct index_args *iargs, Stream *st, char *filename; char *event; int finished = 0; + int message_id; profile_start("asapo-fetch"); set_last_task(sb->shared->last_task[cookie], "ASAPO fetch"); @@ -484,7 +485,8 @@ static int run_work(const struct index_args *iargs, Stream *st, &pargs.asapo_meta, &filename, &event, - &finished); + &finished, + &message_id); profile_end("asapo-fetch"); if ( pargs.asapo_data != NULL ) { ok = 1; @@ -496,6 +498,10 @@ static int run_work(const struct index_args *iargs, Stream *st, pargs.filename = filename; pargs.event = event; sb->shared->end_of_stream[cookie] = 0; + + /* We will also use ASAP::O's serial number + * instead of our own. */ + ser = message_id; } else { if ( finished ) { sb->shared->end_of_stream[cookie] = 1; -- cgit v1.2.3 From 6730fa1c86654ccf84b4ad694f74cc1564cd2cee Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 12 Apr 2023 17:12:48 +0200 Subject: indexamajig: Add --asapo-output-stream (hits only) --- doc/man/indexamajig.1 | 5 +++ src/im-asapo.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++++- src/im-asapo.h | 9 ++++ src/im-sandbox.c | 3 +- src/indexamajig.c | 8 +++- src/process_image.c | 5 ++- src/process_image.h | 4 +- 7 files changed, 142 insertions(+), 5 deletions(-) diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1 index 8f4dc6a2..0f35d4d8 100644 --- a/doc/man/indexamajig.1 +++ b/doc/man/indexamajig.1 @@ -198,6 +198,11 @@ Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input .PD Authentication token, beamtime, data source, consumer group and stream, respectively, for ASAP::O data. +.PD 0 +.IP \fB--asapo-output-stream=\fIstream\fR +.PD +Send an output stream via ASAP::O. For non-hits, a small placeholder will be sent. + .PD 0 .IP \fB--asapo-wait-for-stream .PD diff --git a/src/im-asapo.c b/src/im-asapo.c index 43455298..37784bf3 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -50,8 +51,10 @@ struct im_asapo { char *stream; AsapoConsumerHandle consumer; + AsapoProducerHandle producer; AsapoStringHandle group_id; int wait_for_stream; + char *output_stream; }; @@ -96,6 +99,10 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) ERROR("ASAP::O stream not specified.\n"); return NULL; } + if ( strcmp(params->stream, params->output_stream) == 0 ) { + ERROR("ASAP::O input and output streams cannot be the same.\n"); + return NULL; + } a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; @@ -108,18 +115,41 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) params->source, params->token); a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err); - asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); + asapo_free_handle(&cred); free(a); return NULL; } + if ( params->output_stream != NULL ) { + a->producer = asapo_create_producer(params->endpoint, + 1, /* Number of sender threads */ + kTcp, + cred, + 60000, /* Timeout */ + &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O producer", err); + asapo_free_handle(&a->consumer); + asapo_free_handle(&a->group_id); + asapo_free_handle(&cred); + free(a); + return NULL; + } + a->output_stream = strdup(params->output_stream); + } else { + a->producer = NULL; + a->output_stream = NULL; + } + a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); a->wait_for_stream = params->wait_for_stream; + asapo_free_handle(&cred); + return a; } @@ -216,6 +246,87 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } +static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload, + AsapoErrorHandle err) +{ + if ( asapo_is_error(err) ) { + show_asapo_error("ASAP::O send error", err); + } else if ( err != NULL ) { + show_asapo_error("ASAP::O send warning", err); + } +} + + +static void send_real(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + + header = asapo_create_message_header(image->serial, + image->data_block_size, + image->filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + asapo_producer_send(a->producer, header, image->data_block, + kDefaultIngestMode, a->output_stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +static void send_placeholder(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + + header = asapo_create_message_header(image->serial, + 8, /* strlen("SKIPPED"+\0) */ + image->filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + asapo_producer_send(a->producer, header, "SKIPPED", + kDefaultIngestMode, a->output_stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise, + * send a placeholder */ +void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ + profile_start("asapo-send"); + if ( hit ) { + send_real(a, image); + } else { + send_placeholder(a, image); + } + profile_end("asapo-send"); +} + + void im_asapo_shutdown(struct im_asapo *a) { if ( a == NULL ) return; diff --git a/src/im-asapo.h b/src/im-asapo.h index cf7edfe4..85fffc8a 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -43,8 +43,11 @@ struct im_asapo_params char *source; char *stream; int wait_for_stream; + char *output_stream; }; +struct im_asapo; + #if defined(HAVE_ASAPO) extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params); @@ -55,6 +58,8 @@ extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished, int *pmessageid); +extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit); + #else /* defined(HAVE_ASAPO) */ static UNUSED struct im_asapo *im_asapo_connect(struct im_asapo_params *params) @@ -80,6 +85,10 @@ static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, return NULL; } +static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ +} + #endif /* defined(HAVE_ASAPO) */ #endif /* CRYSTFEL_ASAPO_H */ diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 63ebc537..7f02b1cf 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -516,7 +516,8 @@ static int run_work(const struct index_args *iargs, Stream *st, sb->shared->time_last_start[cookie] = get_monotonic_seconds(); profile_start("process-image"); process_image(iargs, &pargs, st, cookie, tmpdir, ser, - sb->shared, sb->shared->last_task[cookie]); + sb->shared, sb->shared->last_task[cookie], + asapostuff); profile_end("process-image"); } diff --git a/src/indexamajig.c b/src/indexamajig.c index 427843bc..3bcc06b9 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -449,6 +449,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 222 : + args->asapo_params.output_stream = strdup(arg); + break; + + case 223 : args->cpu_pin = 1; break; @@ -887,6 +891,7 @@ int main(int argc, char *argv[]) args.asapo_params.source = NULL; args.asapo_params.stream = NULL; args.asapo_params.wait_for_stream = 0; + args.asapo_params.output_stream = NULL; args.cpu_pin = 0; args.serial_start = 1; args.if_peaks = 1; @@ -1009,7 +1014,8 @@ int main(int argc, char *argv[]) {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, {"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE, "Wait for ASAP::O stream to appear"}, - {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, + {"asapo-output-stream", 222, "str", OPTION_NO_USAGE, "ASAP::O output stream name"}, + {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, diff --git a/src/process_image.c b/src/process_image.c index 5bc527a1..8acd3e86 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -56,6 +56,7 @@ #include "predict-refine.h" #include "im-sandbox.h" #include "im-zmq.h" +#include "im-asapo.h" static float **backup_image_data(float **dp, struct detgeom *det) { @@ -177,7 +178,7 @@ static struct image *file_wait_open_read(const char *filename, void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int serial, struct sb_shm *sb_shared, - char *last_task) + char *last_task, struct im_asapo *asapostuff) { struct image *image; int i; @@ -480,6 +481,8 @@ streamwrite: "%s\n", n, n>1?"s":"", image->filename, image->ev); } + im_asapo_send(asapostuff, image, image->hit); + out: /* Count crystals which are still good */ set_last_task(last_task, "process_image finalisation"); diff --git a/src/process_image.h b/src/process_image.h index bb68ce19..cca2f7d2 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -45,6 +45,7 @@ struct index_args; #include "im-sandbox.h" #include "peaks.h" #include "image.h" +#include "im-asapo.h" /* Information about the indexing process which is common to all patterns */ @@ -113,7 +114,8 @@ struct pattern_args extern void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int serial, - struct sb_shm *sb_shared, char *last_task); + struct sb_shm *sb_shared, char *last_task, + struct im_asapo *asapostuff); #endif /* PROCESS_IMAGE_H */ -- cgit v1.2.3 From 364ddd32e7e53540d7c41420df9fe877784a0bac Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 1 Jun 2023 11:00:33 +0200 Subject: Meson: Factorise RPATH and include ASAP::O libraries folder --- meson.build | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/meson.build b/meson.build index 08247ba3..c07a326a 100644 --- a/meson.build +++ b/meson.build @@ -6,6 +6,7 @@ project('crystfel', 'c', default_options: ['buildtype=debugoptimized']) libcrystfel_api_version = 16 +crystfel_rpath = '$ORIGIN/../lib64/:$ORIGIN/../lib:$ORIGIN/../../asapo-libraries/lib:$ORIGIN/../../asapo-libraries/lib64' add_project_arguments('-DHAVE_CONFIG_H', language: 'c') @@ -119,7 +120,7 @@ executable('compare_hkl', ['src/compare_hkl.c', versionc], dependencies: [mdep, libcrystfeldep, gsldep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) # check_hkl executable('check_hkl', @@ -138,21 +139,21 @@ partialator = executable('partialator', versionc], dependencies: [mdep, libcrystfeldep, gsldep, pthreaddep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) # ambigator executable('ambigator', ['src/ambigator.c', versionc], dependencies: [mdep, libcrystfeldep, gsldep, hdf5dep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) # whirligig executable('whirligig', ['src/whirligig.c', versionc], dependencies: [mdep, libcrystfeldep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) # indexamajig indexamajig_sources = ['src/indexamajig.c', 'src/im-sandbox.c', @@ -170,7 +171,7 @@ indexamajig = executable('indexamajig', indexamajig_sources, dependencies: [mdep, libcrystfeldep, gsldep, pthreaddep, zmqdep, asapodep, asapoproddep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) # make_pixelmap if hdf5dep.found() @@ -178,7 +179,7 @@ if hdf5dep.found() ['src/make_pixelmap.c', versionc], dependencies: [mdep, libcrystfeldep, hdf5dep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) endif @@ -213,7 +214,7 @@ if gtkdep.found() executable('crystfel', gui_sources, gresources, dependencies: [mdep, libcrystfeldep, gtkdep, gsldep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) install_data(['data/crystfel.desktop'], install_dir: get_option('datadir') / 'applications') @@ -225,7 +226,7 @@ executable('render_hkl', ['src/render_hkl.c', versionc], dependencies: [mdep, libcrystfeldep, cairodep, gsldep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) # cell_explorer if gtkdep.found() @@ -233,7 +234,7 @@ if gtkdep.found() ['src/cell_explorer.c', 'src/multihistogram.c', versionc], dependencies: [mdep, libcrystfeldep, gtkdep, gsldep], install: true, - install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') + install_rpath: crystfel_rpath) install_data(['data/cell_explorer.desktop'], install_dir: get_option('datadir') / 'applications') -- cgit v1.2.3 From 3057bd3ed568fa0bdaa7c0ddf0ea0dbbff532bf3 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 2 Jun 2023 13:21:19 +0200 Subject: Avoid strcmp(..., NULL) --- src/im-asapo.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 37784bf3..f7851b7b 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -99,7 +99,9 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) ERROR("ASAP::O stream not specified.\n"); return NULL; } - if ( strcmp(params->stream, params->output_stream) == 0 ) { + if ( (params->output_stream != NULL) + && (strcmp(params->stream, params->output_stream) == 0) ) + { ERROR("ASAP::O input and output streams cannot be the same.\n"); return NULL; } -- cgit v1.2.3 From 1d1a2a700d229852d9ba2e34b382a2f7b3c147f3 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 2 Jun 2023 14:00:03 +0200 Subject: ASAP::O: Use XX_hits for producer data source --- src/im-asapo.c | 68 +++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index f7851b7b..4ae4d291 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -73,6 +73,53 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } +static int create_producer(struct im_asapo *a, struct im_asapo_params *params) +{ + char *source; + AsapoSourceCredentialsHandle cred; + AsapoErrorHandle err = asapo_new_handle(); + + if ( params->output_stream == NULL ) { + a->output_stream = NULL; + a->producer = NULL; + return 0; + } + + source = malloc(strlen(params->source)+6); + if ( source == NULL ) return 1; + + strcpy(source, params->source); + strcat(source, "_hits"); + + cred = asapo_create_source_credentials(kProcessed, + "auto", /* instance ID */ + "indexamajig", /* pipeline step */ + params->beamtime, + "", /* beamline */ + source, + params->token); + free(source); + + a->producer = asapo_create_producer(params->endpoint, + 1, /* Number of sender threads */ + kTcp, + cred, + 60000, /* Timeout */ + &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O producer", err); + asapo_free_handle(&cred); + asapo_free_handle(&err); + return 1; + } + + asapo_free_handle(&err); + a->output_stream = strdup(params->output_stream); + return 0; +} + + struct im_asapo *im_asapo_connect(struct im_asapo_params *params) { struct im_asapo *a; @@ -124,26 +171,7 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) return NULL; } - if ( params->output_stream != NULL ) { - a->producer = asapo_create_producer(params->endpoint, - 1, /* Number of sender threads */ - kTcp, - cred, - 60000, /* Timeout */ - &err); - if ( asapo_is_error(err) ) { - show_asapo_error("Cannot create ASAP::O producer", err); - asapo_free_handle(&a->consumer); - asapo_free_handle(&a->group_id); - asapo_free_handle(&cred); - free(a); - return NULL; - } - a->output_stream = strdup(params->output_stream); - } else { - a->producer = NULL; - a->output_stream = NULL; - } + if ( create_producer(a, params) ) return NULL; a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); -- cgit v1.2.3 From e03b82dd89295305e655bea301b321b7f8e15844 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 2 Jun 2023 14:30:53 +0200 Subject: ASAP::O: Use input stream name as output (add _hits to data source) This means we don't need a stream name any more, so --asapo-output-stream became a simple flag parameter. --- src/im-asapo.c | 16 ++++------------ src/im-asapo.h | 2 +- src/indexamajig.c | 6 +++--- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 4ae4d291..6bc43b11 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -54,7 +54,6 @@ struct im_asapo AsapoProducerHandle producer; AsapoStringHandle group_id; int wait_for_stream; - char *output_stream; }; @@ -79,8 +78,7 @@ static int create_producer(struct im_asapo *a, struct im_asapo_params *params) AsapoSourceCredentialsHandle cred; AsapoErrorHandle err = asapo_new_handle(); - if ( params->output_stream == NULL ) { - a->output_stream = NULL; + if ( !params->write_output_stream ) { a->producer = NULL; return 0; } @@ -98,6 +96,7 @@ static int create_producer(struct im_asapo *a, struct im_asapo_params *params) "", /* beamline */ source, params->token); + STATUS("Writing hits-only output stream as data source '%s'\n", source); free(source); a->producer = asapo_create_producer(params->endpoint, @@ -115,7 +114,6 @@ static int create_producer(struct im_asapo *a, struct im_asapo_params *params) } asapo_free_handle(&err); - a->output_stream = strdup(params->output_stream); return 0; } @@ -146,12 +144,6 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) ERROR("ASAP::O stream not specified.\n"); return NULL; } - if ( (params->output_stream != NULL) - && (strcmp(params->stream, params->output_stream) == 0) ) - { - ERROR("ASAP::O input and output streams cannot be the same.\n"); - return NULL; - } a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; @@ -301,7 +293,7 @@ static void send_real(struct im_asapo *a, struct image *image) 0); /* Auto ID */ asapo_producer_send(a->producer, header, image->data_block, - kDefaultIngestMode, a->output_stream, + kDefaultIngestMode, a->stream, send_callback, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't ASAP::O message", err); @@ -329,7 +321,7 @@ static void send_placeholder(struct im_asapo *a, struct image *image) 0); /* Auto ID */ asapo_producer_send(a->producer, header, "SKIPPED", - kDefaultIngestMode, a->output_stream, + kDefaultIngestMode, a->stream, send_callback, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't ASAP::O message", err); diff --git a/src/im-asapo.h b/src/im-asapo.h index 85fffc8a..43691700 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -43,7 +43,7 @@ struct im_asapo_params char *source; char *stream; int wait_for_stream; - char *output_stream; + int write_output_stream; }; struct im_asapo; diff --git a/src/indexamajig.c b/src/indexamajig.c index 3bcc06b9..a7b5ec9d 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -449,7 +449,7 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 222 : - args->asapo_params.output_stream = strdup(arg); + args->asapo_params.write_output_stream = 1; break; case 223 : @@ -891,7 +891,7 @@ int main(int argc, char *argv[]) args.asapo_params.source = NULL; args.asapo_params.stream = NULL; args.asapo_params.wait_for_stream = 0; - args.asapo_params.output_stream = NULL; + args.asapo_params.write_output_stream = 0; args.cpu_pin = 0; args.serial_start = 1; args.if_peaks = 1; @@ -1014,7 +1014,7 @@ int main(int argc, char *argv[]) {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, {"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE, "Wait for ASAP::O stream to appear"}, - {"asapo-output-stream", 222, "str", OPTION_NO_USAGE, "ASAP::O output stream name"}, + {"asapo-output-stream", 222, NULL, OPTION_NO_USAGE, "Create an ASAP::O hits-only stream"}, {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, -- cgit v1.2.3 From 447a5a8129295ec8bce1c9a6292595e3e96f3840 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 2 Jun 2023 15:26:18 +0200 Subject: Seedee: Handle u4< data type --- libcrystfel/src/image-seedee.c | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/libcrystfel/src/image-seedee.c b/libcrystfel/src/image-seedee.c index 60153591..ae52c3f9 100644 --- a/libcrystfel/src/image-seedee.c +++ b/libcrystfel/src/image-seedee.c @@ -84,6 +84,20 @@ static int load_seedee_data(struct panel_template *p, } } + } else if ( (array->datatype == 'u') + && (array->itemsize == 4) + && (array->byteorder == '<') ) + { + int fs, ss; + uint32_t *in_data = (uint32_t *)array->data; + + for ( ss=0; ssorig_min_fs + (ss+p->orig_min_ss)*data_size_fs; + data[fs+ss*PANEL_WIDTH(p)] = in_data[idx]; + } + } + } else { ERROR("Unrecognised data type %c%i%c\n", array->datatype, array->itemsize, array->byteorder); -- cgit v1.2.3 From 187600a9e768cea530147d4e16a62c6665502070 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 2 Jun 2023 15:26:27 +0200 Subject: ASAP::O: Make output stream write conditional --- src/im-asapo.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/im-asapo.c b/src/im-asapo.c index 6bc43b11..e0a23546 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -339,6 +339,7 @@ static void send_placeholder(struct im_asapo *a, struct image *image) * send a placeholder */ void im_asapo_send(struct im_asapo *a, struct image *image, int hit) { + if ( a->producer == NULL ) return; profile_start("asapo-send"); if ( hit ) { send_real(a, image); -- cgit v1.2.3 From ba872e9e528c5a77b1ab1cb5417866648b11179b Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 17:20:15 +0200 Subject: ASAP::O: Fix some uninitialised stuff --- src/im-asapo.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/im-asapo.c b/src/im-asapo.c index e0a23546..4c0533f4 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -284,6 +284,8 @@ static void send_real(struct im_asapo *a, struct image *image) AsapoMessageHeaderHandle header; AsapoErrorHandle err; + err = asapo_new_handle(); + header = asapo_create_message_header(image->serial, image->data_block_size, image->filename, @@ -312,6 +314,8 @@ static void send_placeholder(struct im_asapo *a, struct image *image) AsapoMessageHeaderHandle header; AsapoErrorHandle err; + err = asapo_new_handle(); + header = asapo_create_message_header(image->serial, 8, /* strlen("SKIPPED"+\0) */ image->filename, @@ -339,6 +343,7 @@ static void send_placeholder(struct im_asapo *a, struct image *image) * send a placeholder */ void im_asapo_send(struct im_asapo *a, struct image *image, int hit) { + if ( a == NULL ) return; if ( a->producer == NULL ) return; profile_start("asapo-send"); if ( hit ) { -- cgit v1.2.3 From 357b023f8045ad39afb4d1b4c7ca08126b1248fb Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 17:58:04 +0200 Subject: ASAP::O: Reduce producer timeout --- src/im-asapo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 4c0533f4..16bd328f 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -103,7 +103,7 @@ static int create_producer(struct im_asapo *a, struct im_asapo_params *params) 1, /* Number of sender threads */ kTcp, cred, - 60000, /* Timeout */ + 30000, /* Timeout */ &err); if ( asapo_is_error(err) ) { -- cgit v1.2.3 From ca77de46b42713993b1cfb18e9a30de522d8a69e Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 17:58:41 +0200 Subject: ASAP::O: Use unique filename for hits --- src/im-asapo.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 16bd328f..4d483669 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -283,17 +283,20 @@ static void send_real(struct im_asapo *a, struct image *image) { AsapoMessageHeaderHandle header; AsapoErrorHandle err; + char filename[1024]; - err = asapo_new_handle(); + snprintf(filename, 1024, "processed/%s_hits/%s-%i.data", + a->stream, a->stream, image->serial); header = asapo_create_message_header(image->serial, image->data_block_size, - image->filename, + filename, image->meta_data, 0, /* Dataset substream */ 0, 0); /* Auto ID */ + err = asapo_new_handle(); asapo_producer_send(a->producer, header, image->data_block, kDefaultIngestMode, a->stream, send_callback, &err); @@ -313,17 +316,20 @@ static void send_placeholder(struct im_asapo *a, struct image *image) { AsapoMessageHeaderHandle header; AsapoErrorHandle err; + char filename[1024]; - err = asapo_new_handle(); + snprintf(filename, 1024, "processed/%s_hits/%s-%i.placeholder", + a->stream, a->stream, image->serial); header = asapo_create_message_header(image->serial, 8, /* strlen("SKIPPED"+\0) */ - image->filename, + filename, image->meta_data, 0, /* Dataset substream */ 0, 0); /* Auto ID */ + err = asapo_new_handle(); asapo_producer_send(a->producer, header, "SKIPPED", kDefaultIngestMode, a->stream, send_callback, &err); -- cgit v1.2.3 From 4d1983463587a9375cb073992fca1614854e2429 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 17:58:51 +0200 Subject: Fix missing error text --- src/im-asapo.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 4d483669..87c3f0c1 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -301,7 +301,7 @@ static void send_real(struct im_asapo *a, struct image *image) kDefaultIngestMode, a->stream, send_callback, &err); if ( asapo_is_error(err) ) { - show_asapo_error("Couldn't ASAP::O message", err); + show_asapo_error("Couldn't send ASAP::O message", err); asapo_free_handle(&header); asapo_free_handle(&err); return; @@ -334,7 +334,7 @@ static void send_placeholder(struct im_asapo *a, struct image *image) kDefaultIngestMode, a->stream, send_callback, &err); if ( asapo_is_error(err) ) { - show_asapo_error("Couldn't ASAP::O message", err); + show_asapo_error("Couldn't send ASAP::O message", err); asapo_free_handle(&header); asapo_free_handle(&err); return; -- cgit v1.2.3 From 50bb229a0b23f89c3a1fa265f42bdabbf2f452fc Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 18:04:41 +0200 Subject: ASAP::O: Reduce verbosity --- src/im-asapo.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 87c3f0c1..9162946b 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -273,8 +273,6 @@ static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload, { if ( asapo_is_error(err) ) { show_asapo_error("ASAP::O send error", err); - } else if ( err != NULL ) { - show_asapo_error("ASAP::O send warning", err); } } -- cgit v1.2.3 From c4271d8171f2f1f0a58ae47ba77dc96136450687 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 20:27:07 +0200 Subject: Support Seedee format f8< --- libcrystfel/src/image-seedee.c | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/libcrystfel/src/image-seedee.c b/libcrystfel/src/image-seedee.c index ae52c3f9..b4ae5179 100644 --- a/libcrystfel/src/image-seedee.c +++ b/libcrystfel/src/image-seedee.c @@ -98,6 +98,20 @@ static int load_seedee_data(struct panel_template *p, } } + } else if ( (array->datatype == 'f') + && (array->itemsize == 8) + && (array->byteorder == '<') ) + { + int fs, ss; + double *in_data = (double *)array->data; + + for ( ss=0; ssorig_min_fs + (ss+p->orig_min_ss)*data_size_fs; + data[fs+ss*PANEL_WIDTH(p)] = in_data[idx]; + } + } + } else { ERROR("Unrecognised data type %c%i%c\n", array->datatype, array->itemsize, array->byteorder); -- cgit v1.2.3 From 98fb6da2715390f11d0b99a872aa842cd6503d60 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 20:27:23 +0200 Subject: ASAP::O: Increase number of sender threads --- src/im-asapo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 9162946b..7cc155bc 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -100,7 +100,7 @@ static int create_producer(struct im_asapo *a, struct im_asapo_params *params) free(source); a->producer = asapo_create_producer(params->endpoint, - 1, /* Number of sender threads */ + 8, /* Number of sender threads */ kTcp, cred, 30000, /* Timeout */ -- cgit v1.2.3 From b5550e4399415fe4863057d6cbd406faf3f2df31 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sat, 3 Jun 2023 20:27:49 +0200 Subject: ASAP::O: Add verbose logging for hits-only stream Caution: extreme console spam --- src/im-asapo.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/im-asapo.c b/src/im-asapo.c index 7cc155bc..c34954e2 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -113,6 +114,8 @@ static int create_producer(struct im_asapo *a, struct im_asapo_params *params) return 1; } + asapo_producer_set_log_level(a->producer, Debug); + asapo_free_handle(&err); return 0; } @@ -294,6 +297,10 @@ static void send_real(struct im_asapo *a, struct image *image) 0, 0); /* Auto ID */ + struct timeval tv; + gettimeofday(&tv,NULL); + STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec); + err = asapo_new_handle(); asapo_producer_send(a->producer, header, image->data_block, kDefaultIngestMode, a->stream, @@ -319,6 +326,10 @@ static void send_placeholder(struct im_asapo *a, struct image *image) snprintf(filename, 1024, "processed/%s_hits/%s-%i.placeholder", a->stream, a->stream, image->serial); + struct timeval tv; + gettimeofday(&tv,NULL); + STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec); + header = asapo_create_message_header(image->serial, 8, /* strlen("SKIPPED"+\0) */ filename, -- cgit v1.2.3