diff options
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r-- | src/im-sandbox.c | 146 |
1 files changed, 127 insertions, 19 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index bd3f47e7..83d2f043 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -62,6 +62,7 @@ #include "process_image.h" #include "im-zmq.h" #include "profile.h" +#include "im-asapo.h" struct sandbox @@ -104,6 +105,15 @@ struct sandbox int n_zmq_subscriptions; const char *zmq_request; + /* ASAP::O mode */ + int asapo; + const char *asapo_endpoint; + const char *asapo_token; + const char *asapo_beamtime; + const char *asapo_group_id; + const char *asapo_source; + const char *asapo_stream; + /* Final output */ Stream *stream; }; @@ -330,6 +340,7 @@ static int run_work(const struct index_args *iargs, Stream *st, { int allDone = 0; struct im_zmq *zmqstuff = NULL; + struct im_asapo *asapostuff = NULL; if ( sb->profile ) { profile_init(); @@ -347,6 +358,19 @@ static int run_work(const struct index_args *iargs, Stream *st, } } + if ( sb->asapo ) { + asapostuff = im_asapo_connect(sb->asapo_endpoint, + sb->asapo_token, + sb->asapo_beamtime, + sb->asapo_group_id, + sb->asapo_source, + sb->asapo_stream); + if ( asapostuff == NULL ) { + ERROR("ASAP::O setup failed.\n"); + return 1; + } + } + while ( !allDone ) { struct pattern_args pargs; @@ -432,42 +456,84 @@ static int run_work(const struct index_args *iargs, Stream *st, pargs.event = safe_strdup(event_str); free(line); + ok = 0; - if ( !sb->zmq ) { - - pargs.zmq_data = NULL; - pargs.zmq_data_size = 0; + /* Default values */ + pargs.zmq_data = NULL; + pargs.zmq_data_size = 0; + pargs.asapo_data = NULL; + pargs.asapo_data_size = 0; + pargs.asapo_meta = NULL; - } else { + if ( sb->zmq ) { do { pargs.zmq_data = im_zmq_fetch(zmqstuff, &pargs.zmq_data_size); } while ( pargs.zmq_data_size < 15 ); + ok = 1; /* The filename/event, which will be 'fake' values in * this case, still came via the event queue. More * importantly, the event queue gave us a unique * serial number for this image. */ + } else if ( sb->asapo ) { + + char *filename; + char *event; + int finished = 0; + + profile_start("asapo-fetch"); + set_last_task(sb->shared->last_task[cookie], "ASAPO fetch"); + pargs.asapo_data = im_asapo_fetch(asapostuff, + &pargs.asapo_data_size, + &pargs.asapo_meta, + &filename, + &event, + &finished); + profile_end("asapo-fetch"); + if ( pargs.asapo_data != NULL ) { + ok = 1; + + /* ASAP::O provides a meaningful filename, which + * replaces the placeholder. */ + free(pargs.filename); + free(pargs.event); + pargs.filename = filename; + pargs.event = event; + } else { + if ( finished ) { + sb->shared->should_shutdown = 1; + allDone = 1; + } + } + + } else { + ok = 1; } - sb->shared->time_last_start[cookie] = get_monotonic_seconds(); - profile_start("process-image"); - process_image(iargs, &pargs, st, cookie, tmpdir, ser, - sb->shared, sb->shared->last_task[cookie]); - profile_end("process-image"); + if ( ok ) { + sb->shared->time_last_start[cookie] = get_monotonic_seconds(); + profile_start("process-image"); + process_image(iargs, &pargs, st, cookie, tmpdir, ser, + sb->shared, sb->shared->last_task[cookie]); + profile_end("process-image"); + } - /* pargs.zmq_data will be copied into the image structure, so - * that it can be queried for "header" values etc. It will - * eventually be freed by image_free() under process_image() */ + /* NB pargs.zmq_data, pargs.asapo_data and pargs.asapo_meta + * will be copied into the image structure, so + * that it can be queried for "header" values etc. They will + * eventually be freed by image_free() under process_image(). */ if ( sb->profile ) { - profile_print_and_reset(); + profile_print_and_reset(cookie); } } + /* These are both no-ops if argument is NULL */ im_zmq_shutdown(zmqstuff); + im_asapo_shutdown(asapostuff); cleanup_indexing(iargs->ipriv); cell_free(iargs->cell); @@ -843,12 +909,19 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) char *evstr; if ( sb->zmq ) { - /* These values will be passed down to process_image, - * but ignored. The 'real' filename, which is still a - * 'fake' filename - only for accounting purposes - will - * be generated by image_read_data_block(). */ + /* These are just semi-meaningful placeholder values to + * be put into the queue, instead of "(null)". + * A unique filename is needed so that the GUI can + * tell the frames apart from one another. + * ASAP::O, for one, will replace this with a filename + * that corresponds to something real. */ filename = "ZMQdata"; - evstr = strdup("//"); + evstr = malloc(64); + snprintf(evstr, 64, "//%i", sb->serial); + } else if ( sb->asapo ) { + filename = "ASAPOdata"; + evstr = malloc(64); + snprintf(evstr, 64, "//%i", sb->serial); } else { if ( !get_pattern(gpctx, &filename, &evstr) ) return 1; } @@ -1056,6 +1129,9 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, Stream *stream, const char *tmpdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, + const char *asapo_endpoint, const char *asapo_token, + const char *asapo_beamtime, const char *asapo_group_id, + const char *asapo_source, const char *asapo_stream, int timeout, int profile) { int i; @@ -1096,6 +1172,38 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->zmq = 0; } + if ( asapo_endpoint != NULL ) { + sb->asapo = 1; + sb->asapo_endpoint = asapo_endpoint; + sb->asapo_token = asapo_token; + sb->asapo_beamtime = asapo_beamtime; + sb->asapo_source = asapo_source; + sb->asapo_stream = asapo_stream; + } else { + sb->asapo = 0; + } + + if ( sb->zmq && sb->asapo ) { + ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n"); + free(sb); + return 0; + } + + if ( sb->asapo ) { + if ( asapo_group_id != NULL ) { + sb->asapo_group_id = strdup(asapo_group_id); + } else { + sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint, + asapo_token); + } + if ( sb->asapo_group_id == NULL ) { + ERROR("Failed to create ASAP::O group ID.\n"); + return 0; + } else { + STATUS("The unique ID is %s\n", sb->asapo_group_id); + } + } + sb->fds = NULL; sb->fhs = NULL; sb->stream = stream; |