aboutsummaryrefslogtreecommitdiff
path: root/src/im-sandbox.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r--src/im-sandbox.c146
1 files changed, 127 insertions, 19 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index bd3f47e7..83d2f043 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -62,6 +62,7 @@
#include "process_image.h"
#include "im-zmq.h"
#include "profile.h"
+#include "im-asapo.h"
struct sandbox
@@ -104,6 +105,15 @@ struct sandbox
int n_zmq_subscriptions;
const char *zmq_request;
+ /* ASAP::O mode */
+ int asapo;
+ const char *asapo_endpoint;
+ const char *asapo_token;
+ const char *asapo_beamtime;
+ const char *asapo_group_id;
+ const char *asapo_source;
+ const char *asapo_stream;
+
/* Final output */
Stream *stream;
};
@@ -330,6 +340,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
{
int allDone = 0;
struct im_zmq *zmqstuff = NULL;
+ struct im_asapo *asapostuff = NULL;
if ( sb->profile ) {
profile_init();
@@ -347,6 +358,19 @@ static int run_work(const struct index_args *iargs, Stream *st,
}
}
+ if ( sb->asapo ) {
+ asapostuff = im_asapo_connect(sb->asapo_endpoint,
+ sb->asapo_token,
+ sb->asapo_beamtime,
+ sb->asapo_group_id,
+ sb->asapo_source,
+ sb->asapo_stream);
+ if ( asapostuff == NULL ) {
+ ERROR("ASAP::O setup failed.\n");
+ return 1;
+ }
+ }
+
while ( !allDone ) {
struct pattern_args pargs;
@@ -432,42 +456,84 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.event = safe_strdup(event_str);
free(line);
+ ok = 0;
- if ( !sb->zmq ) {
-
- pargs.zmq_data = NULL;
- pargs.zmq_data_size = 0;
+ /* Default values */
+ pargs.zmq_data = NULL;
+ pargs.zmq_data_size = 0;
+ pargs.asapo_data = NULL;
+ pargs.asapo_data_size = 0;
+ pargs.asapo_meta = NULL;
- } else {
+ if ( sb->zmq ) {
do {
pargs.zmq_data = im_zmq_fetch(zmqstuff,
&pargs.zmq_data_size);
} while ( pargs.zmq_data_size < 15 );
+ ok = 1;
/* The filename/event, which will be 'fake' values in
* this case, still came via the event queue. More
* importantly, the event queue gave us a unique
* serial number for this image. */
+ } else if ( sb->asapo ) {
+
+ char *filename;
+ char *event;
+ int finished = 0;
+
+ profile_start("asapo-fetch");
+ set_last_task(sb->shared->last_task[cookie], "ASAPO fetch");
+ pargs.asapo_data = im_asapo_fetch(asapostuff,
+ &pargs.asapo_data_size,
+ &pargs.asapo_meta,
+ &filename,
+ &event,
+ &finished);
+ profile_end("asapo-fetch");
+ if ( pargs.asapo_data != NULL ) {
+ ok = 1;
+
+ /* ASAP::O provides a meaningful filename, which
+ * replaces the placeholder. */
+ free(pargs.filename);
+ free(pargs.event);
+ pargs.filename = filename;
+ pargs.event = event;
+ } else {
+ if ( finished ) {
+ sb->shared->should_shutdown = 1;
+ allDone = 1;
+ }
+ }
+
+ } else {
+ ok = 1;
}
- sb->shared->time_last_start[cookie] = get_monotonic_seconds();
- profile_start("process-image");
- process_image(iargs, &pargs, st, cookie, tmpdir, ser,
- sb->shared, sb->shared->last_task[cookie]);
- profile_end("process-image");
+ if ( ok ) {
+ sb->shared->time_last_start[cookie] = get_monotonic_seconds();
+ profile_start("process-image");
+ process_image(iargs, &pargs, st, cookie, tmpdir, ser,
+ sb->shared, sb->shared->last_task[cookie]);
+ profile_end("process-image");
+ }
- /* pargs.zmq_data will be copied into the image structure, so
- * that it can be queried for "header" values etc. It will
- * eventually be freed by image_free() under process_image() */
+ /* NB pargs.zmq_data, pargs.asapo_data and pargs.asapo_meta
+ * will be copied into the image structure, so
+ * that it can be queried for "header" values etc. They will
+ * eventually be freed by image_free() under process_image(). */
if ( sb->profile ) {
- profile_print_and_reset();
+ profile_print_and_reset(cookie);
}
}
+ /* These are both no-ops if argument is NULL */
im_zmq_shutdown(zmqstuff);
+ im_asapo_shutdown(asapostuff);
cleanup_indexing(iargs->ipriv);
cell_free(iargs->cell);
@@ -843,12 +909,19 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
char *evstr;
if ( sb->zmq ) {
- /* These values will be passed down to process_image,
- * but ignored. The 'real' filename, which is still a
- * 'fake' filename - only for accounting purposes - will
- * be generated by image_read_data_block(). */
+ /* These are just semi-meaningful placeholder values to
+ * be put into the queue, instead of "(null)".
+ * A unique filename is needed so that the GUI can
+ * tell the frames apart from one another.
+ * ASAP::O, for one, will replace this with a filename
+ * that corresponds to something real. */
filename = "ZMQdata";
- evstr = strdup("//");
+ evstr = malloc(64);
+ snprintf(evstr, 64, "//%i", sb->serial);
+ } else if ( sb->asapo ) {
+ filename = "ASAPOdata";
+ evstr = malloc(64);
+ snprintf(evstr, 64, "//%i", sb->serial);
} else {
if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
}
@@ -1056,6 +1129,9 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
Stream *stream, const char *tmpdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
+ const char *asapo_endpoint, const char *asapo_token,
+ const char *asapo_beamtime, const char *asapo_group_id,
+ const char *asapo_source, const char *asapo_stream,
int timeout, int profile)
{
int i;
@@ -1096,6 +1172,38 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->zmq = 0;
}
+ if ( asapo_endpoint != NULL ) {
+ sb->asapo = 1;
+ sb->asapo_endpoint = asapo_endpoint;
+ sb->asapo_token = asapo_token;
+ sb->asapo_beamtime = asapo_beamtime;
+ sb->asapo_source = asapo_source;
+ sb->asapo_stream = asapo_stream;
+ } else {
+ sb->asapo = 0;
+ }
+
+ if ( sb->zmq && sb->asapo ) {
+ ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n");
+ free(sb);
+ return 0;
+ }
+
+ if ( sb->asapo ) {
+ if ( asapo_group_id != NULL ) {
+ sb->asapo_group_id = strdup(asapo_group_id);
+ } else {
+ sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint,
+ asapo_token);
+ }
+ if ( sb->asapo_group_id == NULL ) {
+ ERROR("Failed to create ASAP::O group ID.\n");
+ return 0;
+ } else {
+ STATUS("The unique ID is %s\n", sb->asapo_group_id);
+ }
+ }
+
sb->fds = NULL;
sb->fhs = NULL;
sb->stream = stream;