aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt26
-rw-r--r--src/im-sandbox.c138
-rw-r--r--src/im-sandbox.h7
-rw-r--r--src/im-zmq.c (renamed from src/zmq.c)89
-rw-r--r--src/im-zmq.h (renamed from src/zmq.h)17
-rw-r--r--src/indexamajig.c20
-rw-r--r--src/process_image.c6
-rw-r--r--src/process_image.h7
8 files changed, 250 insertions, 60 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 30b34d36..3c01b267 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -24,6 +24,20 @@ link_directories (${GLIB_LIBRARY_DIRS})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall")
+pkg_search_module(MSGPACK msgpack)
+if (MSGPACK_FOUND)
+ message(STATUS "Found Messagepack")
+else ()
+ message(STATUS "MessagePack not found.")
+endif ()
+
+pkg_search_module(ZMQ libzmq)
+if (ZMQ_FOUND)
+ message(STATUS "Found ZMQ")
+else ()
+ message(STATUS "ZMQ not found.")
+endif ()
+
pkg_search_module(GTK gtk+-3.0)
if (NOT GTK_FOUND)
@@ -257,13 +271,23 @@ list(APPEND CRYSTFEL_EXECUTABLES list_events)
# indexamajig
set(INDEXAMAJIG_SOURCES src/indexamajig.c src/im-sandbox.c src/process_image.c
- src/time-accounts.c src/zmq.c)
+ src/time-accounts.c)
+
+if ( ZMQ_FOUND AND MSGPACK_FOUND )
+ list(APPEND INDEXAMAJIG_SOURCES src/im-zmq.c)
+endif ()
add_executable(indexamajig ${INDEXAMAJIG_SOURCES})
target_include_directories(indexamajig PRIVATE ${COMMON_INCLUDES})
target_link_libraries(indexamajig ${COMMON_LIBRARIES})
list(APPEND CRYSTFEL_EXECUTABLES indexamajig)
+if ( ZMQ_FOUND AND MSGPACK_FOUND )
+ target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR} ${MSGPACK_INCLUDE_DIR})
+ target_link_libraries(indexamajig ${ZMQ_LIBRARIES} ${MSGPACK_LIBRARIES})
+endif ()
+
+
# ----------------------------------------------------------------------
# get_hkl
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index e8fbe763..37574310 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -3,17 +3,18 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2018 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -64,6 +65,7 @@
#include "im-sandbox.h"
#include "process_image.h"
#include "time-accounts.h"
+#include "im-zmq.h"
struct sandbox
@@ -92,6 +94,10 @@ struct sandbox
const char *tmpdir;
+ /* ZMQ mode */
+ int zmq;
+ const char *zmq_address;
+
/* Final output */
Stream *stream;
};
@@ -364,7 +370,19 @@ static void run_work(const struct index_args *iargs, Stream *st,
int cookie, const char *tmpdir, struct sandbox *sb)
{
int allDone = 0;
- TimeAccounts *taccs = time_accounts_init();
+ TimeAccounts *taccs;
+ struct im_zmq *zmqstuff;
+
+ /* Connect via ZMQ */
+ if ( sb->zmq ) {
+ zmqstuff = im_zmq_connect(sb->zmq_address);
+ if ( zmqstuff == NULL ) {
+ ERROR("ZMQ setup failed.\n");
+ return;
+ }
+ }
+
+ taccs = time_accounts_init();
while ( !allDone ) {
@@ -375,58 +393,66 @@ static void run_work(const struct index_args *iargs, Stream *st,
struct event *ev;
int r;
- /* Wait until an event is ready */
- time_accounts_set(taccs, TACC_EVENTWAIT);
- set_last_task(sb->shared->last_task[cookie], "wait_event");
- if ( sem_wait(sb->queue_sem) != 0 ) {
- ERROR("Failed to wait on queue semaphore: %s\n",
- strerror(errno));
- }
+ if ( !sb->zmq ) {
- /* Get the event from the queue */
- set_last_task(sb->shared->last_task[cookie], "read_queue");
- pthread_mutex_lock(&sb->shared->queue_lock);
- if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
- /* Queue is empty and no more coming, so exit */
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- if ( sb->shared->n_events == 0 ) {
- ERROR("Got the semaphore, but no events in queue!\n");
- ERROR("no_more = %i\n", sb->shared->no_more);
+ /* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
+ set_last_task(sb->shared->last_task[cookie], "wait_event");
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
+
+ /* Get the event from the queue */
+ set_last_task(sb->shared->last_task[cookie], "read_queue");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
+ /* Queue is empty and no more coming, so exit */
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ r = sscanf(sb->shared->queue[0], "%s %s %i",
+ filename, event_str, &ser);
+ if ( r != 3 ) {
+ STATUS("Invalid event string '%s'\n",
+ sb->shared->queue[0]);
+ }
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb->shared);
pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- r = sscanf(sb->shared->queue[0], "%s %s %i",
- filename, event_str, &ser);
- if ( r != 3 ) {
- STATUS("Invalid event string '%s'\n",
- sb->shared->queue[0]);
- }
- memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
- MAX_EV_LEN);
- shuffle_events(sb->shared);
- pthread_mutex_unlock(&sb->shared->queue_lock);
- if ( r != 3 ) continue;
+ if ( r != 3 ) continue;
- pargs.filename_p_e = initialize_filename_plus_event();
- pargs.filename_p_e->filename = strdup(filename);
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup(filename);
- if ( strcmp(event_str, "(none)") != 0 ) {
+ if ( strcmp(event_str, "(none)") != 0 ) {
+
+ ev = get_event_from_event_string(event_str);
+ if ( ev == NULL ) {
+ ERROR("Bad event string '%s'\n", event_str);
+ continue;
+ }
+ pargs.filename_p_e->ev = ev;
+
+ } else {
+
+ pargs.filename_p_e->ev = NULL;
- ev = get_event_from_event_string(event_str);
- if ( ev == NULL ) {
- ERROR("Bad event string '%s'\n", event_str);
- continue;
}
- pargs.filename_p_e->ev = ev;
} else {
- pargs.filename_p_e->ev = NULL;
+ pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
}
@@ -434,10 +460,16 @@ static void run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- free_filename_plus_event(pargs.filename_p_e);
+ if ( !sb->zmq ) {
+ free_filename_plus_event(pargs.filename_p_e);
+ } else {
+ im_zmq_clean(zmqstuff);
+ }
}
+ im_zmq_shutdown(zmqstuff);
+
time_accounts_set(taccs, TACC_FINALCLEANUP);
cleanup_indexing(iargs->ipriv);
free_detector_geometry(iargs->det);
@@ -804,6 +836,11 @@ static int setup_shm(struct sandbox *sb)
/* Assumes the caller is already holding queue_lock! */
static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
{
+ if ( sb->zmq ) {
+ /* Do nothing */
+ return 0;
+ }
+
while ( sb->shared->n_events < QUEUE_SIZE ) {
struct filename_plus_event *ne;
@@ -998,7 +1035,8 @@ char *create_tempdir(const char *temp_location)
* If the return value is zero, something is probably wrong. */
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
- Stream *stream, const char *tmpdir, int serial_start)
+ Stream *stream, const char *tmpdir, int serial_start,
+ const char *zmq_address)
{
int i;
struct sandbox *sb;
@@ -1027,6 +1065,12 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->iargs = iargs;
sb->serial = serial_start;
sb->tmpdir = tmpdir;
+ if ( zmq_address != NULL ) {
+ sb->zmq = 1;
+ sb->zmq_address = zmq_address;
+ } else {
+ sb->zmq = 0;
+ }
sb->fds = NULL;
sb->fhs = NULL;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index ee2de993..9da11526 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2018 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -83,6 +83,7 @@ extern void set_last_task(char *lt, const char *task);
extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
- const char *tempdir, int serial_start);
+ const char *tempdir, int serial_start,
+ const char *zmq_address);
#endif /* IM_SANDBOX_H */
diff --git a/src/zmq.c b/src/im-zmq.c
index 41bd2eec..c299b980 100644
--- a/src/zmq.c
+++ b/src/im-zmq.c
@@ -38,6 +38,7 @@
#include <hdf5.h>
#include <assert.h>
#include <unistd.h>
+#include <zmq.h>
#include <msgpack.h>
#include "events.h"
@@ -46,6 +47,94 @@
#include "utils.h"
+struct im_zmq
+{
+ void *ctx;
+ void *socket;
+ zmq_msg_t msg;
+ msgpack_unpacked unpacked;
+ int unpacked_set;
+};
+
+
+struct im_zmq *im_zmq_connect(const char *zmq_address)
+{
+ struct im_zmq *z;
+
+ z = malloc(sizeof(struct im_zmq));
+ if ( z == NULL ) return NULL;
+
+ z->unpacked_set = 0;
+
+ z->ctx = zmq_ctx_new();
+ if ( z->ctx == NULL ) return NULL;
+
+ z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ if ( z->socket == NULL ) return NULL;
+
+ STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
+ if ( zmq_connect(z->socket, zmq_address) == -1 ) {
+ ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
+ STATUS("ZMQ connected.\n");
+
+ return z;
+}
+
+
+msgpack_object *im_zmq_fetch(struct im_zmq *z)
+{
+ int msg_size;
+ int r;
+
+ if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
+ ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
+
+ zmq_msg_init(&z->msg);
+ msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ if ( msg_size == -1 ) {
+ ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
+ zmq_msg_close(&z->msg);
+ return NULL;
+ }
+
+ msgpack_unpacked_init(&z->unpacked);
+ r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
+ msg_size, NULL);
+ if ( r != MSGPACK_UNPACK_SUCCESS ) {
+ ERROR("Msgpack unpack failed: %i\n", r);
+ zmq_msg_close(&z->msg);
+ return NULL;
+ }
+ z->unpacked_set = 1;
+
+ return &z->unpacked.data;
+}
+
+
+/* Clean structures ready for next frame */
+void im_zmq_clean(struct im_zmq *z)
+{
+ if ( z->unpacked_set ) {
+ msgpack_unpacked_destroy(&z->unpacked);
+ zmq_msg_close(&z->msg);
+ z->unpacked_set = 0;
+ }
+}
+
+
+void im_zmq_shutdown(struct im_zmq *z)
+{
+ if ( z == NULL ) return;
+
+ zmq_msg_close(&z->msg);
+ zmq_close(z->socket);
+ zmq_ctx_destroy(z->ctx);
+}
+
/**
* get_peaks_onda:
* @obj: A %msgpack_object containing data in OnDA format
diff --git a/src/zmq.h b/src/im-zmq.h
index ee04f437..e6abe562 100644
--- a/src/zmq.h
+++ b/src/im-zmq.h
@@ -3,13 +3,13 @@
*
* ZMQ data interface
*
- * Copyright © 2017-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2017-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018 Thomas White <taw@physics.org>
- * 2014 Valerio Mariani
- * 2017 Stijna de Graaf
+ * 2018-2019 Thomas White <taw@physics.org>
+ * 2014 Valerio Mariani
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -40,9 +40,18 @@
#include "image.h"
+extern struct im_zmq *im_zmq_connect(const char *zmq_address);
+
+extern msgpack_object *im_zmq_fetch(struct im_zmq *z);
+
+extern void im_zmq_clean(struct im_zmq *z);
+
+extern void im_zmq_shutdown(struct im_zmq *z);
+
extern int get_peaks_onda(msgpack_object *obj, struct image *image,
int half_pixel_shift);
extern int obj_read(msgpack_object *obj, struct image *image);
+
#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 2d584726..356d0c94 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -86,6 +86,7 @@ static void show_help(const char *s)
" --profile Show timing data for performance monitoring\n"
" --temp-dir=<path> Put the temporary folder under <path>\n"
" --wait-for-file=<n> Time to wait for each file before processing\n"
+" --zmq-msgpack Receive data in MessagePack format over ZMQ\n"
"\nPeak search options:\n\n"
" --peaks=<method> Peak search method (zaef,peakfinder8,peakfinder9,hdf5,cxi)\n"
" Default: zaef\n"
@@ -340,6 +341,8 @@ int main(int argc, char *argv[])
int if_retry = 1;
int serial_start = 1;
char *spectrum_fn = NULL;
+ int zmq = 0;
+ char *zmq_address = NULL;
/* Defaults */
iargs.cell = NULL;
@@ -457,6 +460,7 @@ int main(int argc, char *argv[])
{"no-multi", 0, &if_multi, 0},
{"multi", 0, &if_multi, 1},
{"overpredict", 0, &iargs.overpredict, 1},
+ {"zmq-msgpack", 0, &zmq, 1},
/* Long-only options which don't actually do anything */
{"no-sat-corr", 0, &iargs.satcorr, 0},
@@ -1297,8 +1301,22 @@ int main(int argc, char *argv[])
gsl_set_error_handler_off();
+ if ( zmq ) {
+ char line[1024];
+ char *rval;
+ rval = fgets(line, 1024, fh);
+ if ( rval == NULL ) {
+ ERROR("Failed to read ZMQ server/port from input.\n");
+ return 1;
+ }
+ chomp(line);
+ zmq_address = strdup(line);
+ /* In future, read multiple addresses and hand them out
+ * evenly to workers */
+ }
+
r = create_sandbox(&iargs, n_proc, prefix, config_basename, fh,
- st, tmpdir, serial_start);
+ st, tmpdir, serial_start, zmq_address);
free_imagefile_field_list(iargs.copyme);
cell_free(iargs.cell);
diff --git a/src/process_image.c b/src/process_image.c
index 2f2eb698..b3b3b1da 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -3,12 +3,13 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2017 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014-2017 Valerio Mariani <valerio.mariani@desy.de>
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -38,6 +39,7 @@
#include <gsl/gsl_sort.h>
#include <unistd.h>
#include <sys/stat.h>
+#include <msgpack.h>
#include "utils.h"
#include "hdf5-file.h"
diff --git a/src/process_image.h b/src/process_image.h
index 2a43d11d..b61fe83f 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -3,11 +3,11 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2016 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014-2017 Valerio Mariani <valerio.mariani@desy.de>
* 2017-2018 Yaroslav Gevorkov <yaroslav.gevorkov@desy.de>
*
@@ -37,6 +37,8 @@
struct index_args;
+#include <msgpack.h>
+
#include "integration.h"
#include "im-sandbox.h"
#include "time-accounts.h"
@@ -122,6 +124,7 @@ struct pattern_args
{
/* "Input" */
struct filename_plus_event *filename_p_e;
+ msgpack_object *msgpack_obj;
};