aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2019-07-17 14:54:00 +0200
committerThomas White <taw@physics.org>2019-07-17 14:54:00 +0200
commitf25b274603861b2eaecbf995493b79530b69d6a9 (patch)
tree7e7a48b77c2c356395b3464323f96503cb008053
parent2b2d41cd933bb1015fbf95b63dd8ec3c990b0767 (diff)
parent8b3f748a90421152b364be5170415ee4e0a3b329 (diff)
Merge branch 'tom/zmq'
-rw-r--r--CMakeLists.txt26
-rw-r--r--config.h.cmake.in2
-rw-r--r--doc/man/indexamajig.110
-rw-r--r--libcrystfel/src/image.c2
-rw-r--r--src/im-sandbox.c150
-rw-r--r--src/im-sandbox.h7
-rw-r--r--src/im-zmq.c491
-rw-r--r--src/im-zmq.h77
-rw-r--r--src/indexamajig.c29
-rw-r--r--src/process_image.c121
-rw-r--r--src/process_image.h15
11 files changed, 826 insertions, 104 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 38b1723b..43a61e4f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -24,6 +24,20 @@ link_directories (${GLIB_LIBRARY_DIRS})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall")
+pkg_search_module(MSGPACK msgpack)
+if (MSGPACK_FOUND)
+ message(STATUS "Found Messagepack")
+else ()
+ message(STATUS "MessagePack not found.")
+endif ()
+
+pkg_search_module(ZMQ libzmq)
+if (ZMQ_FOUND)
+ message(STATUS "Found ZMQ")
+else ()
+ message(STATUS "ZMQ not found.")
+endif ()
+
pkg_search_module(GTK gtk+-3.0)
if (NOT GTK_FOUND)
@@ -102,6 +116,8 @@ set(HAVE_GTK ${GTK_FOUND})
set(HAVE_OPENCL ${OpenCL_FOUND})
set(HAVE_GDKPIXBUF ${GDKPIXBUF_FOUND})
set(HAVE_GDK ${GDK_FOUND})
+set(HAVE_MSGPACK ${MSGPACK_FOUND})
+set(HAVE_ZMQ ${ZMQ_FOUND})
set(PACKAGE_VERSION ${PROJECT_VERSION})
@@ -259,11 +275,21 @@ list(APPEND CRYSTFEL_EXECUTABLES list_events)
set(INDEXAMAJIG_SOURCES src/indexamajig.c src/im-sandbox.c src/process_image.c
src/time-accounts.c)
+if ( ZMQ_FOUND AND MSGPACK_FOUND )
+ list(APPEND INDEXAMAJIG_SOURCES src/im-zmq.c)
+endif ()
+
add_executable(indexamajig ${INDEXAMAJIG_SOURCES})
target_include_directories(indexamajig PRIVATE ${COMMON_INCLUDES})
target_link_libraries(indexamajig ${COMMON_LIBRARIES})
list(APPEND CRYSTFEL_EXECUTABLES indexamajig)
+if ( ZMQ_FOUND AND MSGPACK_FOUND )
+ target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR} ${MSGPACK_INCLUDE_DIR})
+ target_link_libraries(indexamajig ${ZMQ_LIBRARIES} ${MSGPACK_LIBRARIES})
+endif ()
+
+
# ----------------------------------------------------------------------
# get_hkl
diff --git a/config.h.cmake.in b/config.h.cmake.in
index 2dd5d5dd..6d630f9e 100644
--- a/config.h.cmake.in
+++ b/config.h.cmake.in
@@ -8,6 +8,8 @@
#cmakedefine HAVE_OPENCL
#cmakedefine HAVE_CL_CL_H
#cmakedefine HAVE_CLOCK_GETTIME
+#cmakedefine HAVE_MSGPACK
+#cmakedefine HAVE_ZMQ
#define PACKAGE_VERSION "${CRYSTFEL_VERSION}"
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index 7be0ee6c..1866231a 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -220,6 +220,16 @@ Put the temporary folder under \fIpath\fR.
.PD
Wait at most \fIn\fR seconds for each image file in the input list to be created before trying to process it. This is useful for some automated processing pipelines. It obviously only really works for single-frame files. If a file exists but is not readable when this option is set non-zero, a second attempt will be made after ten seconds. This is to allow for incompletely written files. A value of -1 means to wait forever. The default value is \fB--wait-for-file=0\fR.
+.PD 0
+.IP \fB--zmq-msgpack\fR
+.PD
+Receive data as MessagePack objects over ZeroMQ. The input "file list", given with \fB--input\fR or \fB-i\fR, should contain a socket URL suitable for passing to zmq_connect(), such as "tcp://127.0.0.1:12322". At the moment, only one URL can be given, but this may change in future.
+
+.IP \fB--no-image-data\fR
+.PD
+Do not load the actual image data (or bad pixel masks), only the metadata. This allows you to check if patterns can be indexed, without high data bandwidth requirements. Obviously, any feature requiring the image data, especially peak search procedures and integration, cannot be used in this case. At the moment, this option only works when \fB--zmq-msgpack\fR is also used. You will probably want to use \fB--peaks=msgpack\fR.
+
+
.SH PEAK SEARCH OPTIONS
.PD 0
.IP \fB--peaks=\fR\fImethod\fR
diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c
index c7069faa..44ef2774 100644
--- a/libcrystfel/src/image.c
+++ b/libcrystfel/src/image.c
@@ -1178,6 +1178,8 @@ enum imagefile_type imagefile_get_type(struct imagefile *f)
struct hdfile *imagefile_get_hdfile(struct imagefile *f)
{
+ if ( f == NULL ) return NULL;
+
if ( f->type != IMAGEFILE_HDF5 ) {
ERROR("Not an HDF5 file!\n");
return NULL;
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index e8fbe763..0ec20ea2 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -3,17 +3,18 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2018 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -64,6 +65,7 @@
#include "im-sandbox.h"
#include "process_image.h"
#include "time-accounts.h"
+#include "im-zmq.h"
struct sandbox
@@ -92,6 +94,10 @@ struct sandbox
const char *tmpdir;
+ /* ZMQ mode */
+ int zmq;
+ const char *zmq_address;
+
/* Final output */
Stream *stream;
};
@@ -360,11 +366,23 @@ void set_last_task(char *lt, const char *task)
}
-static void run_work(const struct index_args *iargs, Stream *st,
- int cookie, const char *tmpdir, struct sandbox *sb)
+static int run_work(const struct index_args *iargs, Stream *st,
+ int cookie, const char *tmpdir, struct sandbox *sb)
{
int allDone = 0;
- TimeAccounts *taccs = time_accounts_init();
+ TimeAccounts *taccs;
+ struct im_zmq *zmqstuff = NULL;
+
+ /* Connect via ZMQ */
+ if ( sb->zmq ) {
+ zmqstuff = im_zmq_connect(sb->zmq_address);
+ if ( zmqstuff == NULL ) {
+ ERROR("ZMQ setup failed.\n");
+ return 1;
+ }
+ }
+
+ taccs = time_accounts_init();
while ( !allDone ) {
@@ -375,58 +393,71 @@ static void run_work(const struct index_args *iargs, Stream *st,
struct event *ev;
int r;
- /* Wait until an event is ready */
- time_accounts_set(taccs, TACC_EVENTWAIT);
- set_last_task(sb->shared->last_task[cookie], "wait_event");
- if ( sem_wait(sb->queue_sem) != 0 ) {
- ERROR("Failed to wait on queue semaphore: %s\n",
- strerror(errno));
- }
+ if ( !sb->zmq ) {
- /* Get the event from the queue */
- set_last_task(sb->shared->last_task[cookie], "read_queue");
- pthread_mutex_lock(&sb->shared->queue_lock);
- if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
- /* Queue is empty and no more coming, so exit */
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- if ( sb->shared->n_events == 0 ) {
- ERROR("Got the semaphore, but no events in queue!\n");
- ERROR("no_more = %i\n", sb->shared->no_more);
+ /* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
+ set_last_task(sb->shared->last_task[cookie], "wait_event");
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
+
+ /* Get the event from the queue */
+ set_last_task(sb->shared->last_task[cookie], "read_queue");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
+ /* Queue is empty and no more coming, so exit */
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ r = sscanf(sb->shared->queue[0], "%s %s %i",
+ filename, event_str, &ser);
+ if ( r != 3 ) {
+ STATUS("Invalid event string '%s'\n",
+ sb->shared->queue[0]);
+ }
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb->shared);
pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- r = sscanf(sb->shared->queue[0], "%s %s %i",
- filename, event_str, &ser);
- if ( r != 3 ) {
- STATUS("Invalid event string '%s'\n",
- sb->shared->queue[0]);
- }
- memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
- MAX_EV_LEN);
- shuffle_events(sb->shared);
- pthread_mutex_unlock(&sb->shared->queue_lock);
- if ( r != 3 ) continue;
+ if ( r != 3 ) continue;
- pargs.filename_p_e = initialize_filename_plus_event();
- pargs.filename_p_e->filename = strdup(filename);
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup(filename);
- if ( strcmp(event_str, "(none)") != 0 ) {
+ if ( strcmp(event_str, "(none)") != 0 ) {
+
+ ev = get_event_from_event_string(event_str);
+ if ( ev == NULL ) {
+ ERROR("Bad event string '%s'\n", event_str);
+ continue;
+ }
+ pargs.filename_p_e->ev = ev;
+
+ } else {
+
+ pargs.filename_p_e->ev = NULL;
- ev = get_event_from_event_string(event_str);
- if ( ev == NULL ) {
- ERROR("Bad event string '%s'\n", event_str);
- continue;
}
- pargs.filename_p_e->ev = ev;
+ pargs.msgpack_obj = NULL;
} else {
+ pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup("(from ZMQ)");
pargs.filename_p_e->ev = NULL;
+ ser = 0; /* FIXME */
}
@@ -434,10 +465,16 @@ static void run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- free_filename_plus_event(pargs.filename_p_e);
+ if ( !sb->zmq ) {
+ free_filename_plus_event(pargs.filename_p_e);
+ } else {
+ im_zmq_clean(zmqstuff);
+ }
}
+ im_zmq_shutdown(zmqstuff);
+
time_accounts_set(taccs, TACC_FINALCLEANUP);
cleanup_indexing(iargs->ipriv);
free_detector_geometry(iargs->det);
@@ -446,6 +483,7 @@ static void run_work(const struct index_args *iargs, Stream *st,
cell_free(iargs->cell);
if ( iargs->profile ) time_accounts_print(taccs);
time_accounts_free(taccs);
+ return 0;
}
@@ -689,7 +727,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
*/
st = open_stream_fd_for_write(stream_pipe[1]);
- run_work(sb->iargs, st, slot, tmp, sb);
+ r = run_work(sb->iargs, st, slot, tmp, sb);
close_stream(st);
free(tmp);
@@ -699,7 +737,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
free(sb);
- exit(0);
+ exit(r);
}
@@ -804,6 +842,11 @@ static int setup_shm(struct sandbox *sb)
/* Assumes the caller is already holding queue_lock! */
static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
{
+ if ( sb->zmq ) {
+ /* Do nothing */
+ return 0;
+ }
+
while ( sb->shared->n_events < QUEUE_SIZE ) {
struct filename_plus_event *ne;
@@ -998,7 +1041,8 @@ char *create_tempdir(const char *temp_location)
* If the return value is zero, something is probably wrong. */
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
- Stream *stream, const char *tmpdir, int serial_start)
+ Stream *stream, const char *tmpdir, int serial_start,
+ const char *zmq_address)
{
int i;
struct sandbox *sb;
@@ -1027,6 +1071,12 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->iargs = iargs;
sb->serial = serial_start;
sb->tmpdir = tmpdir;
+ if ( zmq_address != NULL ) {
+ sb->zmq = 1;
+ sb->zmq_address = zmq_address;
+ } else {
+ sb->zmq = 0;
+ }
sb->fds = NULL;
sb->fhs = NULL;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index ee2de993..9da11526 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2018 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -83,6 +83,7 @@ extern void set_last_task(char *lt, const char *task);
extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
- const char *tempdir, int serial_start);
+ const char *tempdir, int serial_start,
+ const char *zmq_address);
#endif /* IM_SANDBOX_H */
diff --git a/src/im-zmq.c b/src/im-zmq.c
new file mode 100644
index 00000000..7e8bd9ac
--- /dev/null
+++ b/src/im-zmq.c
@@ -0,0 +1,491 @@
+/*
+ * zmq.c
+ *
+ * ZMQ data interface
+ *
+ * Copyright © 2017-2018 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2018 Thomas White <taw@physics.org>
+ * 2014 Valerio Mariani
+ * 2017 Stijn de Graaf
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <hdf5.h>
+#include <assert.h>
+#include <unistd.h>
+#include <zmq.h>
+#include <msgpack.h>
+
+#include "events.h"
+#include "image.h"
+#include "hdf5-file.h"
+#include "utils.h"
+
+
+struct im_zmq
+{
+ void *ctx;
+ void *socket;
+ zmq_msg_t msg;
+ msgpack_unpacked unpacked;
+ int unpacked_set;
+};
+
+
+struct im_zmq *im_zmq_connect(const char *zmq_address)
+{
+ struct im_zmq *z;
+
+ z = malloc(sizeof(struct im_zmq));
+ if ( z == NULL ) return NULL;
+
+ z->unpacked_set = 0;
+
+ z->ctx = zmq_ctx_new();
+ if ( z->ctx == NULL ) return NULL;
+
+ z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ if ( z->socket == NULL ) return NULL;
+
+ STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
+ if ( zmq_connect(z->socket, zmq_address) == -1 ) {
+ ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
+ STATUS("ZMQ connected.\n");
+
+ return z;
+}
+
+
+msgpack_object *im_zmq_fetch(struct im_zmq *z)
+{
+ int msg_size;
+ int r;
+
+ if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
+ ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
+
+ zmq_msg_init(&z->msg);
+ msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ if ( msg_size == -1 ) {
+ ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
+ zmq_msg_close(&z->msg);
+ return NULL;
+ }
+
+ msgpack_unpacked_init(&z->unpacked);
+ r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
+ msg_size, NULL);
+ if ( r != MSGPACK_UNPACK_SUCCESS ) {
+ ERROR("Msgpack unpack failed: %i\n", r);
+ zmq_msg_close(&z->msg);
+ return NULL;
+ }
+ z->unpacked_set = 1;
+
+ return &z->unpacked.data;
+}
+
+
+/* Clean structures ready for next frame */
+void im_zmq_clean(struct im_zmq *z)
+{
+ if ( z->unpacked_set ) {
+ msgpack_unpacked_destroy(&z->unpacked);
+ zmq_msg_close(&z->msg);
+ z->unpacked_set = 0;
+ }
+}
+
+
+void im_zmq_shutdown(struct im_zmq *z)
+{
+ if ( z == NULL ) return;
+
+ zmq_msg_close(&z->msg);
+ zmq_close(z->socket);
+ zmq_ctx_destroy(z->ctx);
+}
+
+
+static msgpack_object *find_msgpack_kv(msgpack_object *obj, const char *key)
+{
+ int i;
+
+ if ( obj == NULL ) return NULL;
+ if ( obj->type != MSGPACK_OBJECT_MAP ) return NULL;
+
+ for ( i=0; i<obj->via.map.size; i++ ) {
+ const char *kstr;
+ size_t klen;
+ assert(obj->via.map.ptr[i].key.type == MSGPACK_OBJECT_STR);
+ kstr = obj->via.map.ptr[i].key.via.str.ptr;
+ klen = obj->via.map.ptr[i].key.via.str.size;
+ if ( strncmp(kstr, key, klen) == 0 ) {
+ return &obj->via.map.ptr[i].val;
+ }
+ }
+ return NULL;
+}
+
+
+/**
+ * get_peaks_msgpack:
+ * @obj: A %msgpack_object containing data in OnDA format
+ * @image: An %image structure
+ * @half_pixel_shift: Non-zero if 0.5 should be added to all peak coordinates
+ *
+ * Get peaks from msgpack_object. The data should be in a map, with the value
+ * given by "peak_list" as an array of arrays. The first of these should contain
+ * the list of fs positions of the peaks, the second the ss positions, and the
+ * third the intensities of the peaks.
+ *
+ * http://c.msgpack.org/c/ provides documentation on msgpack objects
+ *
+ * CrystFEL considers all peak locations to be distances from the corner of the
+ * detector panel, in pixel units, consistent with its description of detector
+ * geometry (see 'man crystfel_geometry'). The software which generates the
+ * CXI files, including Cheetah, may instead consider the peak locations to be
+ * pixel indices in the data array. In this case, the peak coordinates should
+ * have 0.5 added to them. This will be done if @half_pixel_shift is non-zero.
+ *
+ * Returns: non-zero on error, zero otherwise.
+ *
+ */
+int get_peaks_msgpack(msgpack_object *obj, struct image *image,
+ int half_pixel_shift)
+{
+
+ int num_peaks;
+ int pk;
+ msgpack_object *peak_list;
+ msgpack_object *peak_x;
+ msgpack_object *peak_y;
+ msgpack_object *peak_i;
+ double peak_offset = half_pixel_shift ? 0.5 : 0.0;
+
+ if ( obj == NULL ) {
+ ERROR("No MessagePack object to get peaks from.\n");
+ return 1;
+ }
+
+ /* Object has structure:
+ * {
+ * "peak_list": [[peak_x], [peak_y], [peak_i]]
+ * "key2":val2,
+ * ...
+ * }
+ */
+ peak_list = find_msgpack_kv(obj, "peak_list");
+ peak_x = &peak_list->via.array.ptr[0];
+ peak_y = &peak_list->via.array.ptr[1];
+ peak_i = &peak_list->via.array.ptr[2];
+
+ /* Length of peak_x array gives number of peaks */
+ num_peaks = peak_x->via.array.size;
+
+ if ( image->features != NULL ) {
+ image_feature_list_free(image->features);
+ }
+ image->features = image_feature_list_new();
+ image->num_peaks = num_peaks;
+
+ for ( pk=0; pk<num_peaks; pk++ ) {
+
+ float fs, ss, val;
+ struct panel *p;
+
+ /* Retrieve data from peak_list and apply half_pixel_shift,
+ * if appropriate */
+ fs = peak_x->via.array.ptr[pk].via.f64 + peak_offset;
+ ss = peak_y->via.array.ptr[pk].via.f64 + peak_offset;
+ val = peak_i->via.array.ptr[pk].via.f64;
+
+ p = find_orig_panel(image->det, fs, ss);
+ if ( p == NULL ) continue;
+ if ( p->no_index ) continue;
+
+ /* Convert coordinates to panel-relative */
+ fs = fs - p->orig_min_fs;
+ ss = ss - p->orig_min_ss;
+
+ image_add_feature(image->features, fs, ss, p, image, val, NULL);
+ }
+
+ return 0;
+}
+
+
+static void im_zmq_fill_in_clen(struct detector *det)
+{
+ int i = 0;
+ for ( i=0; i<det->n_panels; i++) {
+ struct panel *p = &det->panels[i];
+ if ( p->clen_from != NULL ) {
+ ERROR("Can't get clen over ZMQ yet.\n");
+ }
+ adjust_centering_for_rail(p);
+ }
+}
+
+
+static void im_zmq_fill_in_beam_parameters(struct beam_params *beam,
+ struct image *image)
+{
+ double eV;
+ if ( beam->photon_energy_from == NULL ) {
+ /* Explicit value given */
+ eV = beam->photon_energy;
+ } else {
+ ERROR("Can't get photon energy over ZMQ yet.\n");
+ eV = 0.0;
+ }
+ image->lambda = ph_en_to_lambda(eV_to_J(eV))*beam->photon_energy_scale;
+}
+
+
+static int unpack_slab(struct image *image, double *data,
+ int data_width, int data_height)
+{
+ uint16_t *flags = NULL;
+ float *sat = NULL;
+ int pi;
+
+ image->dp = malloc(image->det->n_panels*sizeof(float *));
+ image->bad = malloc(image->det->n_panels*sizeof(int *));
+ image->sat = malloc(image->det->n_panels*sizeof(float *));
+ if ( (image->dp == NULL) || (image->bad == NULL) || (image->sat == NULL) ) {
+ ERROR("Failed to allocate data arrays.\n");
+ return 1;
+ }
+
+ for ( pi=0; pi<image->det->n_panels; pi++ ) {
+
+ struct panel *p;
+ int fs, ss;
+
+ p = &image->det->panels[pi];
+ image->dp[pi] = malloc(p->w*p->h*sizeof(float));
+ image->bad[pi] = malloc(p->w*p->h*sizeof(int));
+ image->sat[pi] = malloc(p->w*p->h*sizeof(float));
+ if ( (image->dp[pi] == NULL) || (image->bad[pi] == NULL)
+ || (image->sat[pi] == NULL) )
+ {
+ ERROR("Failed to allocate panel\n");
+ return 1;
+ }
+
+ if ( (p->orig_min_fs + p->w > data_width)
+ || (p->orig_min_ss + p->h > data_height) )
+ {
+ ERROR("Panel %s is outside range of data provided\n",
+ p->name);
+ return 1;
+ }
+
+ for ( ss=0; ss<p->h; ss++) {
+ for ( fs=0; fs<p->w; fs++) {
+
+ int idx;
+ int cfs, css;
+ int bad = 0;
+
+ cfs = fs+p->orig_min_fs;
+ css = ss+p->orig_min_ss;
+ idx = cfs + css*data_width;
+
+ image->dp[pi][fs+p->w*ss] = data[idx];
+
+ if ( sat != NULL ) {
+ image->sat[pi][fs+p->w*ss] = sat[idx];
+ } else {
+ image->sat[pi][fs+p->w*ss] = INFINITY;
+ }
+
+ if ( p->no_index ) bad = 1;
+
+ if ( in_bad_region(image->det, p, cfs, css) ) {
+ bad = 1;
+ }
+
+ if ( isnan(data[idx]) || isinf(data[idx]) ) bad = 1;
+
+ if ( flags != NULL ) {
+
+ int f;
+
+ f = flags[idx];
+
+ if ( (f & image->det->mask_good)
+ != image->det->mask_good ) bad = 1;
+
+ if ( f & image->det->mask_bad ) bad = 1;
+
+ }
+ image->bad[pi][fs+p->w*ss] = bad;
+ }
+ }
+
+ }
+
+ return 0;
+}
+
+
+static double *find_msgpack_data(msgpack_object *obj, int *width, int *height)
+{
+ msgpack_object *corr_data_obj;
+ msgpack_object *data_obj;
+ msgpack_object *shape_obj;
+ double *data;
+
+ corr_data_obj = find_msgpack_kv(obj, "corr_data");
+ if ( corr_data_obj == NULL ) {
+ ERROR("No corr_data MessagePack object found.\n");
+ return NULL;
+ }
+
+ data_obj = find_msgpack_kv(corr_data_obj, "data");
+ if ( data_obj == NULL ) {
+ ERROR("No data MessagePack object found inside corr_data.\n");
+ return NULL;
+ }
+ if ( data_obj->type != MSGPACK_OBJECT_STR ) {
+ ERROR("corr_data.data isn't a binary object.\n");
+ return NULL;
+ }
+ data = (double *)data_obj->via.str.ptr;
+
+ shape_obj = find_msgpack_kv(corr_data_obj, "shape");
+ if ( shape_obj == NULL ) {
+ ERROR("No shape MessagePack object found inside corr_data.\n");
+ return NULL;
+ }
+ if ( shape_obj->type != MSGPACK_OBJECT_ARRAY ) {
+ ERROR("corr_data.shape isn't an array object.\n");
+ return NULL;
+ }
+ if ( shape_obj->via.array.size != 2 ) {
+ ERROR("corr_data.shape is wrong size (%i, should be 2)\n",
+ shape_obj->via.array.size);
+ return NULL;
+ }
+ if ( shape_obj->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER ) {
+ ERROR("corr_data.shape contains wrong type of element.\n");
+ return NULL;
+ }
+ *height = shape_obj->via.array.ptr[0].via.i64;
+ *width = shape_obj->via.array.ptr[1].via.i64;
+ return data;
+}
+
+
+static double *zero_array(struct detector *det, int *dw, int *dh)
+{
+ int max_fs = 0;
+ int max_ss = 0;
+ int pi;
+ double *data;
+
+ for ( pi=0; pi<det->n_panels; pi++ ) {
+ if ( det->panels[pi].orig_max_fs > max_fs ) {
+ max_fs = det->panels[pi].orig_max_fs;
+ }
+ if ( det->panels[pi].orig_max_ss > max_ss ) {
+ max_ss = det->panels[pi].orig_max_ss;
+ }
+ }
+
+ data = calloc((max_fs+1)*(max_ss+1), sizeof(double));
+ *dw = max_fs+1;
+ *dh = max_ss+1;
+ return data;
+}
+
+
+/* Unpacks the raw panel data from a msgpack_object, applies panel geometry,
+ * and stores the resulting data in an image struct. Object has structure
+ * {
+ * "corr_data":
+ * {
+ * "data": binary_data,
+ * "shape": [data_height, data_width],
+ * ...
+ * ...
+ * },
+ * "key2": val2,
+ * ...
+ * ...
+ * }
+ */
+int unpack_msgpack_data(msgpack_object *obj, struct image *image,
+ int no_image_data)
+{
+ int data_width, data_height;
+ double *data;
+
+ if ( image->det == NULL ) {
+ ERROR("Geometry not available.\n");
+ return 1;
+ }
+
+ if ( obj == NULL ) {
+ ERROR("No MessagePack object!\n");
+ return 1;
+ }
+
+ if ( !no_image_data ) {
+ data = find_msgpack_data(obj, &data_width, &data_height);
+ if ( data == NULL ) {
+ ERROR("No image data in MessagePack object.\n");
+ return 1;
+ }
+ } else {
+ data = zero_array(image->det, &data_width, &data_height);
+ }
+
+ if ( unpack_slab(image, data, data_width, data_height) ) {
+ ERROR("Failed to unpack data slab.\n");
+ return 1;
+ }
+
+ if ( image->beam != NULL ) {
+ im_zmq_fill_in_beam_parameters(image->beam, image);
+ if ( image->lambda > 1000 ) {
+ ERROR("Warning: Missing or nonsensical wavelength "
+ "(%e m).\n", image->lambda);
+ }
+ }
+ im_zmq_fill_in_clen(image->det);
+ fill_in_adu(image);
+
+ return 0;
+}
diff --git a/src/im-zmq.h b/src/im-zmq.h
new file mode 100644
index 00000000..a7cbbfe9
--- /dev/null
+++ b/src/im-zmq.h
@@ -0,0 +1,77 @@
+/*
+ * zmq.h
+ *
+ * ZMQ data interface
+ *
+ * Copyright © 2017-2019 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2018-2019 Thomas White <taw@physics.org>
+ * 2014 Valerio Mariani
+ * 2017 Stijn de Graaf
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+
+#ifndef CRYSTFEL_ZMQ_H
+#define CRYSTFEL_ZMQ_H
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "image.h"
+
+#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ)
+
+#include <msgpack.h>
+
+extern struct im_zmq *im_zmq_connect(const char *zmq_address);
+
+extern void im_zmq_clean(struct im_zmq *z);
+
+extern void im_zmq_shutdown(struct im_zmq *z);
+
+extern msgpack_object *im_zmq_fetch(struct im_zmq *z);
+
+extern int get_peaks_msgpack(msgpack_object *obj, struct image *image,
+ int half_pixel_shift);
+
+extern int unpack_msgpack_data(msgpack_object *obj, struct image *image,
+ int no_image_data);
+
+#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+
+static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+
+static UNUSED void im_zmq_clean(struct im_zmq *z) { return; }
+
+static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; }
+
+static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; }
+
+static UNUSED int get_peaks_msgpack(void *obj, struct image *image,
+ int half_pixel_shift) { return 0; }
+
+static UNUSED int unpack_msgpack_data(void *obj, struct image *image,
+ int no_image_data) { return 1; }
+
+#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+
+#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index e4568966..739b4969 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -86,9 +86,11 @@ static void show_help(const char *s)
" --profile Show timing data for performance monitoring\n"
" --temp-dir=<path> Put the temporary folder under <path>\n"
" --wait-for-file=<n> Time to wait for each file before processing\n"
+" --zmq-msgpack Receive data in MessagePack format over ZMQ\n"
+" --no-image-data Do not load image data (from ZMQ)\n"
"\nPeak search options:\n\n"
-" --peaks=<method> Peak search method (zaef,peakfinder8,peakfinder9,hdf5,cxi)\n"
-" Default: zaef\n"
+" --peaks=<method> Peak search method. Default: zaef\n"
+" (zaef,peakfinder8,peakfinder9,hdf5,cxi,msgpack)\n"
" --peak-radius=<r> Integration radii for peak search\n"
" --min-peaks=<n> Minimum number of peaks for indexing\n"
" --hdf5-peaks=<p> Find peaks table in HDF5 file here\n"
@@ -287,6 +289,8 @@ int main(int argc, char *argv[])
int if_retry = 1;
int serial_start = 1;
char *spectrum_fn = NULL;
+ int zmq = 0;
+ char *zmq_address = NULL;
/* Defaults */
iargs.cell = NULL;
@@ -344,6 +348,7 @@ int main(int argc, char *argv[])
iargs.fix_bandwidth = -1.0;
iargs.fix_divergence = -1.0;
iargs.profile = 0;
+ iargs.no_image_data = 0;
iargs.taketwo_opts.member_thresh = -1;
iargs.taketwo_opts.len_tol = -1.0;
iargs.taketwo_opts.angle_tol = -1.0;
@@ -405,6 +410,8 @@ int main(int argc, char *argv[])
{"no-multi", 0, &if_multi, 0},
{"multi", 0, &if_multi, 1},
{"overpredict", 0, &iargs.overpredict, 1},
+ {"zmq-msgpack", 0, &zmq, 1},
+ {"no-image-data", 0, &iargs.no_image_data, 1},
/* Long-only options which don't actually do anything */
{"no-sat-corr", 0, &iargs.satcorr, 0},
@@ -975,6 +982,8 @@ int main(int argc, char *argv[])
iargs.peaks = PEAK_CXI;
} else if ( strcmp(speaks, "peakfinder9") == 0 ) {
iargs.peaks = PEAK_PEAKFINDER9;
+ } else if ( strcmp(speaks, "msgpack") == 0 ) {
+ iargs.peaks = PEAK_MSGPACK;
} else {
ERROR("Unrecognised peak detection method '%s'\n", speaks);
return 1;
@@ -1269,8 +1278,22 @@ int main(int argc, char *argv[])
gsl_set_error_handler_off();
+ if ( zmq ) {
+ char line[1024];
+ char *rval;
+ rval = fgets(line, 1024, fh);
+ if ( rval == NULL ) {
+ ERROR("Failed to read ZMQ server/port from input.\n");
+ return 1;
+ }
+ chomp(line);
+ zmq_address = strdup(line);
+ /* In future, read multiple addresses and hand them out
+ * evenly to workers */
+ }
+
r = create_sandbox(&iargs, n_proc, prefix, config_basename, fh,
- st, tmpdir, serial_start);
+ st, tmpdir, serial_start, zmq_address);
free_imagefile_field_list(iargs.copyme);
cell_free(iargs.cell);
diff --git a/src/process_image.c b/src/process_image.c
index 2f2eb698..a9bf1cdb 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -3,12 +3,13 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2017 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014-2017 Valerio Mariani <valerio.mariani@desy.de>
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -54,6 +55,7 @@
#include "predict-refine.h"
#include "im-sandbox.h"
#include "time-accounts.h"
+#include "im-zmq.h"
static float **backup_image_data(float **dp, struct detector *det)
@@ -98,34 +100,16 @@ static void restore_image_data(float **dp, struct detector *det, float **bu)
}
-void process_image(const struct index_args *iargs, struct pattern_args *pargs,
- Stream *st, int cookie, const char *tmpdir,
- int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
- char *last_task)
+static int file_wait_open_read(struct sb_shm *sb_shared, struct image *image,
+ TimeAccounts *taccs, char *last_task,
+ signed int wait_for_file, int cookie,
+ struct imagefile **pimfile)
{
- struct imagefile *imfile;
- struct image image;
- int i;
- int r;
- int ret;
- char *rn;
- float **prefilter;
- int any_crystals;
+ signed int file_wait_time = wait_for_file;
int wait_message_done = 0;
int read_retry_done = 0;
- signed int file_wait_time = iargs->wait_for_file;
-
- image.features = NULL;
- image.copyme = iargs->copyme;
- image.id = cookie;
- image.filename = pargs->filename_p_e->filename;
- image.event = pargs->filename_p_e->ev;
- image.beam = iargs->beam;
- image.det = copy_geom(iargs->det);
- image.crystals = NULL;
- image.n_crystals = 0;
- image.serial = serial;
- image.indexed_by = INDEXING_NONE;
+ int r;
+ struct imagefile *imfile;
time_accounts_set(taccs, TACC_WAITFILE);
set_last_task(last_task, "wait for file");
@@ -135,29 +119,27 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
struct stat statbuf;
sb_shared->pings[cookie]++;
- r = stat(image.filename, &statbuf);
+ r = stat(image->filename, &statbuf);
if ( r ) {
- if ( (iargs->wait_for_file != 0)
- && (file_wait_time != 0) )
- {
+ if ( (wait_for_file != 0) && (file_wait_time != 0) ) {
if ( !wait_message_done ) {
STATUS("Waiting for '%s'\n",
- image.filename);
+ image->filename);
wait_message_done = 1;
}
sleep(1);
- if ( iargs->wait_for_file != -1 ) {
+ if ( wait_for_file != -1 ) {
file_wait_time--;
}
continue;
}
- ERROR("File %s not found\n", image.filename);
- return;
+ ERROR("File %s not found\n", image->filename);
+ return 1;
}
} while ( r );
@@ -167,42 +149,83 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
sb_shared->pings[cookie]++;
do {
- imfile = imagefile_open(image.filename);
+ imfile = imagefile_open(image->filename);
if ( imfile == NULL ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
r = 1;
STATUS("File '%s' exists but could not be opened."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
time_accounts_set(taccs, TACC_HDF5READ);
set_last_task(last_task, "read file");
sb_shared->pings[cookie]++;
- r = imagefile_read(imfile, &image, image.event);
+ r = imagefile_read(imfile, image, image->event);
if ( r ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
imagefile_close(imfile);
STATUS("File '%s' exists but could not be read."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
} while ( r );
+ *pimfile = imfile;
+ return 0;
+}
+
+
+void process_image(const struct index_args *iargs, struct pattern_args *pargs,
+ Stream *st, int cookie, const char *tmpdir,
+ int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
+ char *last_task)
+{
+ struct imagefile *imfile = NULL;
+ struct image image;
+ int i;
+ int r;
+ int ret;
+ char *rn;
+ float **prefilter;
+ int any_crystals;
+
+ image.features = NULL;
+ image.copyme = iargs->copyme;
+ image.id = cookie;
+ image.beam = iargs->beam;
+ image.det = copy_geom(iargs->det);
+ image.crystals = NULL;
+ image.n_crystals = 0;
+ image.serial = serial;
+ image.indexed_by = INDEXING_NONE;
+
+ image.filename = pargs->filename_p_e->filename;
+ image.event = pargs->filename_p_e->ev;
+ if ( pargs->msgpack_obj != NULL ) {
+ set_last_task(last_task, "unpacking messagepack object");
+ if ( unpack_msgpack_data(pargs->msgpack_obj, &image,
+ iargs->no_image_data) ) return;
+ } else {
+ if ( file_wait_open_read(sb_shared, &image, taccs, last_task,
+ iargs->wait_for_file, cookie,
+ &imfile) ) return;
+ }
+
/* Take snapshot of image before applying horrible noise filters */
time_accounts_set(taccs, TACC_FILTER);
set_last_task(last_task, "image filter");
@@ -315,6 +338,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
}
break;
+ case PEAK_MSGPACK:
+ get_peaks_msgpack(pargs->msgpack_obj, &image,
+ iargs->half_pixel_shift);
+ break;
+
}
image.peak_resolution = estimate_peak_resolution(image.features,
@@ -473,5 +501,6 @@ out:
image_feature_list_free(image.features);
free_detector_geometry(image.det);
- imagefile_close(imfile);
+ if ( imfile != NULL ) imagefile_close(imfile);
+ set_last_task(last_task, "sandbox");
}
diff --git a/src/process_image.h b/src/process_image.h
index 5939ae4d..8d6682a6 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -3,11 +3,11 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2016 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014-2017 Valerio Mariani <valerio.mariani@desy.de>
* 2017-2018 Yaroslav Gevorkov <yaroslav.gevorkov@desy.de>
*
@@ -37,6 +37,10 @@
struct index_args;
+#ifdef HAVE_MSGPACK
+#include <msgpack.h>
+#endif
+
#include "integration.h"
#include "im-sandbox.h"
#include "time-accounts.h"
@@ -51,6 +55,7 @@ enum {
PEAK_ZAEF,
PEAK_HDF5,
PEAK_CXI,
+ PEAK_MSGPACK,
};
@@ -114,6 +119,7 @@ struct index_args
struct felix_options felix_opts;
Spectrum *spectrum;
signed int wait_for_file; /* -1 means wait forever */
+ int no_image_data;
};
@@ -122,6 +128,11 @@ struct pattern_args
{
/* "Input" */
struct filename_plus_event *filename_p_e;
+#ifdef HAVE_MSGPACK
+ msgpack_object *msgpack_obj;
+#else
+ void *msgpack_obj;
+#endif
};