diff options
author | Thomas White <taw@physics.org> | 2019-07-17 14:54:00 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2019-07-17 14:54:00 +0200 |
commit | f25b274603861b2eaecbf995493b79530b69d6a9 (patch) | |
tree | 7e7a48b77c2c356395b3464323f96503cb008053 /src/im-sandbox.c | |
parent | 2b2d41cd933bb1015fbf95b63dd8ec3c990b0767 (diff) | |
parent | 8b3f748a90421152b364be5170415ee4e0a3b329 (diff) |
Merge branch 'tom/zmq'
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r-- | src/im-sandbox.c | 150 |
1 files changed, 100 insertions, 50 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index e8fbe763..0ec20ea2 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -3,17 +3,18 @@ * * Sandbox for indexing * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2018 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon + * 2017 Stijn de Graaf * * This file is part of CrystFEL. * @@ -64,6 +65,7 @@ #include "im-sandbox.h" #include "process_image.h" #include "time-accounts.h" +#include "im-zmq.h" struct sandbox @@ -92,6 +94,10 @@ struct sandbox const char *tmpdir; + /* ZMQ mode */ + int zmq; + const char *zmq_address; + /* Final output */ Stream *stream; }; @@ -360,11 +366,23 @@ void set_last_task(char *lt, const char *task) } -static void run_work(const struct index_args *iargs, Stream *st, - int cookie, const char *tmpdir, struct sandbox *sb) +static int run_work(const struct index_args *iargs, Stream *st, + int cookie, const char *tmpdir, struct sandbox *sb) { int allDone = 0; - TimeAccounts *taccs = time_accounts_init(); + TimeAccounts *taccs; + struct im_zmq *zmqstuff = NULL; + + /* Connect via ZMQ */ + if ( sb->zmq ) { + zmqstuff = im_zmq_connect(sb->zmq_address); + if ( zmqstuff == NULL ) { + ERROR("ZMQ setup failed.\n"); + return 1; + } + } + + taccs = time_accounts_init(); while ( !allDone ) { @@ -375,58 +393,71 @@ static void run_work(const struct index_args *iargs, Stream *st, struct event *ev; int r; - /* Wait until an event is ready */ - time_accounts_set(taccs, TACC_EVENTWAIT); - set_last_task(sb->shared->last_task[cookie], "wait_event"); - if ( sem_wait(sb->queue_sem) != 0 ) { - ERROR("Failed to wait on queue semaphore: %s\n", - strerror(errno)); - } + if ( !sb->zmq ) { - /* Get the event from the queue */ - set_last_task(sb->shared->last_task[cookie], "read_queue"); - pthread_mutex_lock(&sb->shared->queue_lock); - if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) { - /* Queue is empty and no more coming, so exit */ - pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } - if ( sb->shared->n_events == 0 ) { - ERROR("Got the semaphore, but no events in queue!\n"); - ERROR("no_more = %i\n", sb->shared->no_more); + /* Wait until an event is ready */ + time_accounts_set(taccs, TACC_EVENTWAIT); + set_last_task(sb->shared->last_task[cookie], "wait_event"); + if ( sem_wait(sb->queue_sem) != 0 ) { + ERROR("Failed to wait on queue semaphore: %s\n", + strerror(errno)); + } + + /* Get the event from the queue */ + set_last_task(sb->shared->last_task[cookie], "read_queue"); + pthread_mutex_lock(&sb->shared->queue_lock); + if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) { + /* Queue is empty and no more coming, so exit */ + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + if ( sb->shared->n_events == 0 ) { + ERROR("Got the semaphore, but no events in queue!\n"); + ERROR("no_more = %i\n", sb->shared->no_more); + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + r = sscanf(sb->shared->queue[0], "%s %s %i", + filename, event_str, &ser); + if ( r != 3 ) { + STATUS("Invalid event string '%s'\n", + sb->shared->queue[0]); + } + memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], + MAX_EV_LEN); + shuffle_events(sb->shared); pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } - r = sscanf(sb->shared->queue[0], "%s %s %i", - filename, event_str, &ser); - if ( r != 3 ) { - STATUS("Invalid event string '%s'\n", - sb->shared->queue[0]); - } - memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], - MAX_EV_LEN); - shuffle_events(sb->shared); - pthread_mutex_unlock(&sb->shared->queue_lock); - if ( r != 3 ) continue; + if ( r != 3 ) continue; - pargs.filename_p_e = initialize_filename_plus_event(); - pargs.filename_p_e->filename = strdup(filename); + pargs.filename_p_e = initialize_filename_plus_event(); + pargs.filename_p_e->filename = strdup(filename); - if ( strcmp(event_str, "(none)") != 0 ) { + if ( strcmp(event_str, "(none)") != 0 ) { + + ev = get_event_from_event_string(event_str); + if ( ev == NULL ) { + ERROR("Bad event string '%s'\n", event_str); + continue; + } + pargs.filename_p_e->ev = ev; + + } else { + + pargs.filename_p_e->ev = NULL; - ev = get_event_from_event_string(event_str); - if ( ev == NULL ) { - ERROR("Bad event string '%s'\n", event_str); - continue; } - pargs.filename_p_e->ev = ev; + pargs.msgpack_obj = NULL; } else { + pargs.msgpack_obj = im_zmq_fetch(zmqstuff); + pargs.filename_p_e = initialize_filename_plus_event(); + pargs.filename_p_e->filename = strdup("(from ZMQ)"); pargs.filename_p_e->ev = NULL; + ser = 0; /* FIXME */ } @@ -434,10 +465,16 @@ static void run_work(const struct index_args *iargs, Stream *st, process_image(iargs, &pargs, st, cookie, tmpdir, ser, sb->shared, taccs, sb->shared->last_task[cookie]); - free_filename_plus_event(pargs.filename_p_e); + if ( !sb->zmq ) { + free_filename_plus_event(pargs.filename_p_e); + } else { + im_zmq_clean(zmqstuff); + } } + im_zmq_shutdown(zmqstuff); + time_accounts_set(taccs, TACC_FINALCLEANUP); cleanup_indexing(iargs->ipriv); free_detector_geometry(iargs->det); @@ -446,6 +483,7 @@ static void run_work(const struct index_args *iargs, Stream *st, cell_free(iargs->cell); if ( iargs->profile ) time_accounts_print(taccs); time_accounts_free(taccs); + return 0; } @@ -689,7 +727,7 @@ static void start_worker_process(struct sandbox *sb, int slot) */ st = open_stream_fd_for_write(stream_pipe[1]); - run_work(sb->iargs, st, slot, tmp, sb); + r = run_work(sb->iargs, st, slot, tmp, sb); close_stream(st); free(tmp); @@ -699,7 +737,7 @@ static void start_worker_process(struct sandbox *sb, int slot) free(sb); - exit(0); + exit(r); } @@ -804,6 +842,11 @@ static int setup_shm(struct sandbox *sb) /* Assumes the caller is already holding queue_lock! */ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) { + if ( sb->zmq ) { + /* Do nothing */ + return 0; + } + while ( sb->shared->n_events < QUEUE_SIZE ) { struct filename_plus_event *ne; @@ -998,7 +1041,8 @@ char *create_tempdir(const char *temp_location) * If the return value is zero, something is probably wrong. */ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, - Stream *stream, const char *tmpdir, int serial_start) + Stream *stream, const char *tmpdir, int serial_start, + const char *zmq_address) { int i; struct sandbox *sb; @@ -1027,6 +1071,12 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->iargs = iargs; sb->serial = serial_start; sb->tmpdir = tmpdir; + if ( zmq_address != NULL ) { + sb->zmq = 1; + sb->zmq_address = zmq_address; + } else { + sb->zmq = 0; + } sb->fds = NULL; sb->fhs = NULL; |