aboutsummaryrefslogtreecommitdiff
path: root/src/im-sandbox.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2019-07-17 14:54:00 +0200
committerThomas White <taw@physics.org>2019-07-17 14:54:00 +0200
commitf25b274603861b2eaecbf995493b79530b69d6a9 (patch)
tree7e7a48b77c2c356395b3464323f96503cb008053 /src/im-sandbox.c
parent2b2d41cd933bb1015fbf95b63dd8ec3c990b0767 (diff)
parent8b3f748a90421152b364be5170415ee4e0a3b329 (diff)
Merge branch 'tom/zmq'
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r--src/im-sandbox.c150
1 files changed, 100 insertions, 50 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index e8fbe763..0ec20ea2 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -3,17 +3,18 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2018 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -64,6 +65,7 @@
#include "im-sandbox.h"
#include "process_image.h"
#include "time-accounts.h"
+#include "im-zmq.h"
struct sandbox
@@ -92,6 +94,10 @@ struct sandbox
const char *tmpdir;
+ /* ZMQ mode */
+ int zmq;
+ const char *zmq_address;
+
/* Final output */
Stream *stream;
};
@@ -360,11 +366,23 @@ void set_last_task(char *lt, const char *task)
}
-static void run_work(const struct index_args *iargs, Stream *st,
- int cookie, const char *tmpdir, struct sandbox *sb)
+static int run_work(const struct index_args *iargs, Stream *st,
+ int cookie, const char *tmpdir, struct sandbox *sb)
{
int allDone = 0;
- TimeAccounts *taccs = time_accounts_init();
+ TimeAccounts *taccs;
+ struct im_zmq *zmqstuff = NULL;
+
+ /* Connect via ZMQ */
+ if ( sb->zmq ) {
+ zmqstuff = im_zmq_connect(sb->zmq_address);
+ if ( zmqstuff == NULL ) {
+ ERROR("ZMQ setup failed.\n");
+ return 1;
+ }
+ }
+
+ taccs = time_accounts_init();
while ( !allDone ) {
@@ -375,58 +393,71 @@ static void run_work(const struct index_args *iargs, Stream *st,
struct event *ev;
int r;
- /* Wait until an event is ready */
- time_accounts_set(taccs, TACC_EVENTWAIT);
- set_last_task(sb->shared->last_task[cookie], "wait_event");
- if ( sem_wait(sb->queue_sem) != 0 ) {
- ERROR("Failed to wait on queue semaphore: %s\n",
- strerror(errno));
- }
+ if ( !sb->zmq ) {
- /* Get the event from the queue */
- set_last_task(sb->shared->last_task[cookie], "read_queue");
- pthread_mutex_lock(&sb->shared->queue_lock);
- if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
- /* Queue is empty and no more coming, so exit */
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- if ( sb->shared->n_events == 0 ) {
- ERROR("Got the semaphore, but no events in queue!\n");
- ERROR("no_more = %i\n", sb->shared->no_more);
+ /* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
+ set_last_task(sb->shared->last_task[cookie], "wait_event");
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
+
+ /* Get the event from the queue */
+ set_last_task(sb->shared->last_task[cookie], "read_queue");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
+ /* Queue is empty and no more coming, so exit */
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ r = sscanf(sb->shared->queue[0], "%s %s %i",
+ filename, event_str, &ser);
+ if ( r != 3 ) {
+ STATUS("Invalid event string '%s'\n",
+ sb->shared->queue[0]);
+ }
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb->shared);
pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- r = sscanf(sb->shared->queue[0], "%s %s %i",
- filename, event_str, &ser);
- if ( r != 3 ) {
- STATUS("Invalid event string '%s'\n",
- sb->shared->queue[0]);
- }
- memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
- MAX_EV_LEN);
- shuffle_events(sb->shared);
- pthread_mutex_unlock(&sb->shared->queue_lock);
- if ( r != 3 ) continue;
+ if ( r != 3 ) continue;
- pargs.filename_p_e = initialize_filename_plus_event();
- pargs.filename_p_e->filename = strdup(filename);
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup(filename);
- if ( strcmp(event_str, "(none)") != 0 ) {
+ if ( strcmp(event_str, "(none)") != 0 ) {
+
+ ev = get_event_from_event_string(event_str);
+ if ( ev == NULL ) {
+ ERROR("Bad event string '%s'\n", event_str);
+ continue;
+ }
+ pargs.filename_p_e->ev = ev;
+
+ } else {
+
+ pargs.filename_p_e->ev = NULL;
- ev = get_event_from_event_string(event_str);
- if ( ev == NULL ) {
- ERROR("Bad event string '%s'\n", event_str);
- continue;
}
- pargs.filename_p_e->ev = ev;
+ pargs.msgpack_obj = NULL;
} else {
+ pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup("(from ZMQ)");
pargs.filename_p_e->ev = NULL;
+ ser = 0; /* FIXME */
}
@@ -434,10 +465,16 @@ static void run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- free_filename_plus_event(pargs.filename_p_e);
+ if ( !sb->zmq ) {
+ free_filename_plus_event(pargs.filename_p_e);
+ } else {
+ im_zmq_clean(zmqstuff);
+ }
}
+ im_zmq_shutdown(zmqstuff);
+
time_accounts_set(taccs, TACC_FINALCLEANUP);
cleanup_indexing(iargs->ipriv);
free_detector_geometry(iargs->det);
@@ -446,6 +483,7 @@ static void run_work(const struct index_args *iargs, Stream *st,
cell_free(iargs->cell);
if ( iargs->profile ) time_accounts_print(taccs);
time_accounts_free(taccs);
+ return 0;
}
@@ -689,7 +727,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
*/
st = open_stream_fd_for_write(stream_pipe[1]);
- run_work(sb->iargs, st, slot, tmp, sb);
+ r = run_work(sb->iargs, st, slot, tmp, sb);
close_stream(st);
free(tmp);
@@ -699,7 +737,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
free(sb);
- exit(0);
+ exit(r);
}
@@ -804,6 +842,11 @@ static int setup_shm(struct sandbox *sb)
/* Assumes the caller is already holding queue_lock! */
static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
{
+ if ( sb->zmq ) {
+ /* Do nothing */
+ return 0;
+ }
+
while ( sb->shared->n_events < QUEUE_SIZE ) {
struct filename_plus_event *ne;
@@ -998,7 +1041,8 @@ char *create_tempdir(const char *temp_location)
* If the return value is zero, something is probably wrong. */
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
- Stream *stream, const char *tmpdir, int serial_start)
+ Stream *stream, const char *tmpdir, int serial_start,
+ const char *zmq_address)
{
int i;
struct sandbox *sb;
@@ -1027,6 +1071,12 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->iargs = iargs;
sb->serial = serial_start;
sb->tmpdir = tmpdir;
+ if ( zmq_address != NULL ) {
+ sb->zmq = 1;
+ sb->zmq_address = zmq_address;
+ } else {
+ sb->zmq = 0;
+ }
sb->fds = NULL;
sb->fhs = NULL;