aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2019-07-17 14:54:00 +0200
committerThomas White <taw@physics.org>2019-07-17 14:54:00 +0200
commitf25b274603861b2eaecbf995493b79530b69d6a9 (patch)
tree7e7a48b77c2c356395b3464323f96503cb008053 /src
parent2b2d41cd933bb1015fbf95b63dd8ec3c990b0767 (diff)
parent8b3f748a90421152b364be5170415ee4e0a3b329 (diff)
Merge branch 'tom/zmq'
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c150
-rw-r--r--src/im-sandbox.h7
-rw-r--r--src/im-zmq.c491
-rw-r--r--src/im-zmq.h77
-rw-r--r--src/indexamajig.c29
-rw-r--r--src/process_image.c121
-rw-r--r--src/process_image.h15
7 files changed, 786 insertions, 104 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index e8fbe763..0ec20ea2 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -3,17 +3,18 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2018 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -64,6 +65,7 @@
#include "im-sandbox.h"
#include "process_image.h"
#include "time-accounts.h"
+#include "im-zmq.h"
struct sandbox
@@ -92,6 +94,10 @@ struct sandbox
const char *tmpdir;
+ /* ZMQ mode */
+ int zmq;
+ const char *zmq_address;
+
/* Final output */
Stream *stream;
};
@@ -360,11 +366,23 @@ void set_last_task(char *lt, const char *task)
}
-static void run_work(const struct index_args *iargs, Stream *st,
- int cookie, const char *tmpdir, struct sandbox *sb)
+static int run_work(const struct index_args *iargs, Stream *st,
+ int cookie, const char *tmpdir, struct sandbox *sb)
{
int allDone = 0;
- TimeAccounts *taccs = time_accounts_init();
+ TimeAccounts *taccs;
+ struct im_zmq *zmqstuff = NULL;
+
+ /* Connect via ZMQ */
+ if ( sb->zmq ) {
+ zmqstuff = im_zmq_connect(sb->zmq_address);
+ if ( zmqstuff == NULL ) {
+ ERROR("ZMQ setup failed.\n");
+ return 1;
+ }
+ }
+
+ taccs = time_accounts_init();
while ( !allDone ) {
@@ -375,58 +393,71 @@ static void run_work(const struct index_args *iargs, Stream *st,
struct event *ev;
int r;
- /* Wait until an event is ready */
- time_accounts_set(taccs, TACC_EVENTWAIT);
- set_last_task(sb->shared->last_task[cookie], "wait_event");
- if ( sem_wait(sb->queue_sem) != 0 ) {
- ERROR("Failed to wait on queue semaphore: %s\n",
- strerror(errno));
- }
+ if ( !sb->zmq ) {
- /* Get the event from the queue */
- set_last_task(sb->shared->last_task[cookie], "read_queue");
- pthread_mutex_lock(&sb->shared->queue_lock);
- if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
- /* Queue is empty and no more coming, so exit */
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- if ( sb->shared->n_events == 0 ) {
- ERROR("Got the semaphore, but no events in queue!\n");
- ERROR("no_more = %i\n", sb->shared->no_more);
+ /* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
+ set_last_task(sb->shared->last_task[cookie], "wait_event");
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
+
+ /* Get the event from the queue */
+ set_last_task(sb->shared->last_task[cookie], "read_queue");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
+ /* Queue is empty and no more coming, so exit */
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ r = sscanf(sb->shared->queue[0], "%s %s %i",
+ filename, event_str, &ser);
+ if ( r != 3 ) {
+ STATUS("Invalid event string '%s'\n",
+ sb->shared->queue[0]);
+ }
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb->shared);
pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- r = sscanf(sb->shared->queue[0], "%s %s %i",
- filename, event_str, &ser);
- if ( r != 3 ) {
- STATUS("Invalid event string '%s'\n",
- sb->shared->queue[0]);
- }
- memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
- MAX_EV_LEN);
- shuffle_events(sb->shared);
- pthread_mutex_unlock(&sb->shared->queue_lock);
- if ( r != 3 ) continue;
+ if ( r != 3 ) continue;
- pargs.filename_p_e = initialize_filename_plus_event();
- pargs.filename_p_e->filename = strdup(filename);
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup(filename);
- if ( strcmp(event_str, "(none)") != 0 ) {
+ if ( strcmp(event_str, "(none)") != 0 ) {
+
+ ev = get_event_from_event_string(event_str);
+ if ( ev == NULL ) {
+ ERROR("Bad event string '%s'\n", event_str);
+ continue;
+ }
+ pargs.filename_p_e->ev = ev;
+
+ } else {
+
+ pargs.filename_p_e->ev = NULL;
- ev = get_event_from_event_string(event_str);
- if ( ev == NULL ) {
- ERROR("Bad event string '%s'\n", event_str);
- continue;
}
- pargs.filename_p_e->ev = ev;
+ pargs.msgpack_obj = NULL;
} else {
+ pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
+ pargs.filename_p_e = initialize_filename_plus_event();
+ pargs.filename_p_e->filename = strdup("(from ZMQ)");
pargs.filename_p_e->ev = NULL;
+ ser = 0; /* FIXME */
}
@@ -434,10 +465,16 @@ static void run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- free_filename_plus_event(pargs.filename_p_e);
+ if ( !sb->zmq ) {
+ free_filename_plus_event(pargs.filename_p_e);
+ } else {
+ im_zmq_clean(zmqstuff);
+ }
}
+ im_zmq_shutdown(zmqstuff);
+
time_accounts_set(taccs, TACC_FINALCLEANUP);
cleanup_indexing(iargs->ipriv);
free_detector_geometry(iargs->det);
@@ -446,6 +483,7 @@ static void run_work(const struct index_args *iargs, Stream *st,
cell_free(iargs->cell);
if ( iargs->profile ) time_accounts_print(taccs);
time_accounts_free(taccs);
+ return 0;
}
@@ -689,7 +727,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
*/
st = open_stream_fd_for_write(stream_pipe[1]);
- run_work(sb->iargs, st, slot, tmp, sb);
+ r = run_work(sb->iargs, st, slot, tmp, sb);
close_stream(st);
free(tmp);
@@ -699,7 +737,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
free(sb);
- exit(0);
+ exit(r);
}
@@ -804,6 +842,11 @@ static int setup_shm(struct sandbox *sb)
/* Assumes the caller is already holding queue_lock! */
static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
{
+ if ( sb->zmq ) {
+ /* Do nothing */
+ return 0;
+ }
+
while ( sb->shared->n_events < QUEUE_SIZE ) {
struct filename_plus_event *ne;
@@ -998,7 +1041,8 @@ char *create_tempdir(const char *temp_location)
* If the return value is zero, something is probably wrong. */
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
- Stream *stream, const char *tmpdir, int serial_start)
+ Stream *stream, const char *tmpdir, int serial_start,
+ const char *zmq_address)
{
int i;
struct sandbox *sb;
@@ -1027,6 +1071,12 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->iargs = iargs;
sb->serial = serial_start;
sb->tmpdir = tmpdir;
+ if ( zmq_address != NULL ) {
+ sb->zmq = 1;
+ sb->zmq_address = zmq_address;
+ } else {
+ sb->zmq = 0;
+ }
sb->fds = NULL;
sb->fhs = NULL;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index ee2de993..9da11526 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2018 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -83,6 +83,7 @@ extern void set_last_task(char *lt, const char *task);
extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
- const char *tempdir, int serial_start);
+ const char *tempdir, int serial_start,
+ const char *zmq_address);
#endif /* IM_SANDBOX_H */
diff --git a/src/im-zmq.c b/src/im-zmq.c
new file mode 100644
index 00000000..7e8bd9ac
--- /dev/null
+++ b/src/im-zmq.c
@@ -0,0 +1,491 @@
+/*
+ * zmq.c
+ *
+ * ZMQ data interface
+ *
+ * Copyright © 2017-2018 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2018 Thomas White <taw@physics.org>
+ * 2014 Valerio Mariani
+ * 2017 Stijn de Graaf
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <hdf5.h>
+#include <assert.h>
+#include <unistd.h>
+#include <zmq.h>
+#include <msgpack.h>
+
+#include "events.h"
+#include "image.h"
+#include "hdf5-file.h"
+#include "utils.h"
+
+
+struct im_zmq
+{
+ void *ctx;
+ void *socket;
+ zmq_msg_t msg;
+ msgpack_unpacked unpacked;
+ int unpacked_set;
+};
+
+
+struct im_zmq *im_zmq_connect(const char *zmq_address)
+{
+ struct im_zmq *z;
+
+ z = malloc(sizeof(struct im_zmq));
+ if ( z == NULL ) return NULL;
+
+ z->unpacked_set = 0;
+
+ z->ctx = zmq_ctx_new();
+ if ( z->ctx == NULL ) return NULL;
+
+ z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ if ( z->socket == NULL ) return NULL;
+
+ STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
+ if ( zmq_connect(z->socket, zmq_address) == -1 ) {
+ ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
+ STATUS("ZMQ connected.\n");
+
+ return z;
+}
+
+
+msgpack_object *im_zmq_fetch(struct im_zmq *z)
+{
+ int msg_size;
+ int r;
+
+ if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
+ ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
+
+ zmq_msg_init(&z->msg);
+ msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ if ( msg_size == -1 ) {
+ ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
+ zmq_msg_close(&z->msg);
+ return NULL;
+ }
+
+ msgpack_unpacked_init(&z->unpacked);
+ r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
+ msg_size, NULL);
+ if ( r != MSGPACK_UNPACK_SUCCESS ) {
+ ERROR("Msgpack unpack failed: %i\n", r);
+ zmq_msg_close(&z->msg);
+ return NULL;
+ }
+ z->unpacked_set = 1;
+
+ return &z->unpacked.data;
+}
+
+
+/* Clean structures ready for next frame */
+void im_zmq_clean(struct im_zmq *z)
+{
+ if ( z->unpacked_set ) {
+ msgpack_unpacked_destroy(&z->unpacked);
+ zmq_msg_close(&z->msg);
+ z->unpacked_set = 0;
+ }
+}
+
+
+void im_zmq_shutdown(struct im_zmq *z)
+{
+ if ( z == NULL ) return;
+
+ zmq_msg_close(&z->msg);
+ zmq_close(z->socket);
+ zmq_ctx_destroy(z->ctx);
+}
+
+
+static msgpack_object *find_msgpack_kv(msgpack_object *obj, const char *key)
+{
+ int i;
+
+ if ( obj == NULL ) return NULL;
+ if ( obj->type != MSGPACK_OBJECT_MAP ) return NULL;
+
+ for ( i=0; i<obj->via.map.size; i++ ) {
+ const char *kstr;
+ size_t klen;
+ assert(obj->via.map.ptr[i].key.type == MSGPACK_OBJECT_STR);
+ kstr = obj->via.map.ptr[i].key.via.str.ptr;
+ klen = obj->via.map.ptr[i].key.via.str.size;
+ if ( strncmp(kstr, key, klen) == 0 ) {
+ return &obj->via.map.ptr[i].val;
+ }
+ }
+ return NULL;
+}
+
+
+/**
+ * get_peaks_msgpack:
+ * @obj: A %msgpack_object containing data in OnDA format
+ * @image: An %image structure
+ * @half_pixel_shift: Non-zero if 0.5 should be added to all peak coordinates
+ *
+ * Get peaks from msgpack_object. The data should be in a map, with the value
+ * given by "peak_list" as an array of arrays. The first of these should contain
+ * the list of fs positions of the peaks, the second the ss positions, and the
+ * third the intensities of the peaks.
+ *
+ * http://c.msgpack.org/c/ provides documentation on msgpack objects
+ *
+ * CrystFEL considers all peak locations to be distances from the corner of the
+ * detector panel, in pixel units, consistent with its description of detector
+ * geometry (see 'man crystfel_geometry'). The software which generates the
+ * CXI files, including Cheetah, may instead consider the peak locations to be
+ * pixel indices in the data array. In this case, the peak coordinates should
+ * have 0.5 added to them. This will be done if @half_pixel_shift is non-zero.
+ *
+ * Returns: non-zero on error, zero otherwise.
+ *
+ */
+int get_peaks_msgpack(msgpack_object *obj, struct image *image,
+ int half_pixel_shift)
+{
+
+ int num_peaks;
+ int pk;
+ msgpack_object *peak_list;
+ msgpack_object *peak_x;
+ msgpack_object *peak_y;
+ msgpack_object *peak_i;
+ double peak_offset = half_pixel_shift ? 0.5 : 0.0;
+
+ if ( obj == NULL ) {
+ ERROR("No MessagePack object to get peaks from.\n");
+ return 1;
+ }
+
+ /* Object has structure:
+ * {
+ * "peak_list": [[peak_x], [peak_y], [peak_i]]
+ * "key2":val2,
+ * ...
+ * }
+ */
+ peak_list = find_msgpack_kv(obj, "peak_list");
+ peak_x = &peak_list->via.array.ptr[0];
+ peak_y = &peak_list->via.array.ptr[1];
+ peak_i = &peak_list->via.array.ptr[2];
+
+ /* Length of peak_x array gives number of peaks */
+ num_peaks = peak_x->via.array.size;
+
+ if ( image->features != NULL ) {
+ image_feature_list_free(image->features);
+ }
+ image->features = image_feature_list_new();
+ image->num_peaks = num_peaks;
+
+ for ( pk=0; pk<num_peaks; pk++ ) {
+
+ float fs, ss, val;
+ struct panel *p;
+
+ /* Retrieve data from peak_list and apply half_pixel_shift,
+ * if appropriate */
+ fs = peak_x->via.array.ptr[pk].via.f64 + peak_offset;
+ ss = peak_y->via.array.ptr[pk].via.f64 + peak_offset;
+ val = peak_i->via.array.ptr[pk].via.f64;
+
+ p = find_orig_panel(image->det, fs, ss);
+ if ( p == NULL ) continue;
+ if ( p->no_index ) continue;
+
+ /* Convert coordinates to panel-relative */
+ fs = fs - p->orig_min_fs;
+ ss = ss - p->orig_min_ss;
+
+ image_add_feature(image->features, fs, ss, p, image, val, NULL);
+ }
+
+ return 0;
+}
+
+
+static void im_zmq_fill_in_clen(struct detector *det)
+{
+ int i = 0;
+ for ( i=0; i<det->n_panels; i++) {
+ struct panel *p = &det->panels[i];
+ if ( p->clen_from != NULL ) {
+ ERROR("Can't get clen over ZMQ yet.\n");
+ }
+ adjust_centering_for_rail(p);
+ }
+}
+
+
+static void im_zmq_fill_in_beam_parameters(struct beam_params *beam,
+ struct image *image)
+{
+ double eV;
+ if ( beam->photon_energy_from == NULL ) {
+ /* Explicit value given */
+ eV = beam->photon_energy;
+ } else {
+ ERROR("Can't get photon energy over ZMQ yet.\n");
+ eV = 0.0;
+ }
+ image->lambda = ph_en_to_lambda(eV_to_J(eV))*beam->photon_energy_scale;
+}
+
+
+static int unpack_slab(struct image *image, double *data,
+ int data_width, int data_height)
+{
+ uint16_t *flags = NULL;
+ float *sat = NULL;
+ int pi;
+
+ image->dp = malloc(image->det->n_panels*sizeof(float *));
+ image->bad = malloc(image->det->n_panels*sizeof(int *));
+ image->sat = malloc(image->det->n_panels*sizeof(float *));
+ if ( (image->dp == NULL) || (image->bad == NULL) || (image->sat == NULL) ) {
+ ERROR("Failed to allocate data arrays.\n");
+ return 1;
+ }
+
+ for ( pi=0; pi<image->det->n_panels; pi++ ) {
+
+ struct panel *p;
+ int fs, ss;
+
+ p = &image->det->panels[pi];
+ image->dp[pi] = malloc(p->w*p->h*sizeof(float));
+ image->bad[pi] = malloc(p->w*p->h*sizeof(int));
+ image->sat[pi] = malloc(p->w*p->h*sizeof(float));
+ if ( (image->dp[pi] == NULL) || (image->bad[pi] == NULL)
+ || (image->sat[pi] == NULL) )
+ {
+ ERROR("Failed to allocate panel\n");
+ return 1;
+ }
+
+ if ( (p->orig_min_fs + p->w > data_width)
+ || (p->orig_min_ss + p->h > data_height) )
+ {
+ ERROR("Panel %s is outside range of data provided\n",
+ p->name);
+ return 1;
+ }
+
+ for ( ss=0; ss<p->h; ss++) {
+ for ( fs=0; fs<p->w; fs++) {
+
+ int idx;
+ int cfs, css;
+ int bad = 0;
+
+ cfs = fs+p->orig_min_fs;
+ css = ss+p->orig_min_ss;
+ idx = cfs + css*data_width;
+
+ image->dp[pi][fs+p->w*ss] = data[idx];
+
+ if ( sat != NULL ) {
+ image->sat[pi][fs+p->w*ss] = sat[idx];
+ } else {
+ image->sat[pi][fs+p->w*ss] = INFINITY;
+ }
+
+ if ( p->no_index ) bad = 1;
+
+ if ( in_bad_region(image->det, p, cfs, css) ) {
+ bad = 1;
+ }
+
+ if ( isnan(data[idx]) || isinf(data[idx]) ) bad = 1;
+
+ if ( flags != NULL ) {
+
+ int f;
+
+ f = flags[idx];
+
+ if ( (f & image->det->mask_good)
+ != image->det->mask_good ) bad = 1;
+
+ if ( f & image->det->mask_bad ) bad = 1;
+
+ }
+ image->bad[pi][fs+p->w*ss] = bad;
+ }
+ }
+
+ }
+
+ return 0;
+}
+
+
+static double *find_msgpack_data(msgpack_object *obj, int *width, int *height)
+{
+ msgpack_object *corr_data_obj;
+ msgpack_object *data_obj;
+ msgpack_object *shape_obj;
+ double *data;
+
+ corr_data_obj = find_msgpack_kv(obj, "corr_data");
+ if ( corr_data_obj == NULL ) {
+ ERROR("No corr_data MessagePack object found.\n");
+ return NULL;
+ }
+
+ data_obj = find_msgpack_kv(corr_data_obj, "data");
+ if ( data_obj == NULL ) {
+ ERROR("No data MessagePack object found inside corr_data.\n");
+ return NULL;
+ }
+ if ( data_obj->type != MSGPACK_OBJECT_STR ) {
+ ERROR("corr_data.data isn't a binary object.\n");
+ return NULL;
+ }
+ data = (double *)data_obj->via.str.ptr;
+
+ shape_obj = find_msgpack_kv(corr_data_obj, "shape");
+ if ( shape_obj == NULL ) {
+ ERROR("No shape MessagePack object found inside corr_data.\n");
+ return NULL;
+ }
+ if ( shape_obj->type != MSGPACK_OBJECT_ARRAY ) {
+ ERROR("corr_data.shape isn't an array object.\n");
+ return NULL;
+ }
+ if ( shape_obj->via.array.size != 2 ) {
+ ERROR("corr_data.shape is wrong size (%i, should be 2)\n",
+ shape_obj->via.array.size);
+ return NULL;
+ }
+ if ( shape_obj->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER ) {
+ ERROR("corr_data.shape contains wrong type of element.\n");
+ return NULL;
+ }
+ *height = shape_obj->via.array.ptr[0].via.i64;
+ *width = shape_obj->via.array.ptr[1].via.i64;
+ return data;
+}
+
+
+static double *zero_array(struct detector *det, int *dw, int *dh)
+{
+ int max_fs = 0;
+ int max_ss = 0;
+ int pi;
+ double *data;
+
+ for ( pi=0; pi<det->n_panels; pi++ ) {
+ if ( det->panels[pi].orig_max_fs > max_fs ) {
+ max_fs = det->panels[pi].orig_max_fs;
+ }
+ if ( det->panels[pi].orig_max_ss > max_ss ) {
+ max_ss = det->panels[pi].orig_max_ss;
+ }
+ }
+
+ data = calloc((max_fs+1)*(max_ss+1), sizeof(double));
+ *dw = max_fs+1;
+ *dh = max_ss+1;
+ return data;
+}
+
+
+/* Unpacks the raw panel data from a msgpack_object, applies panel geometry,
+ * and stores the resulting data in an image struct. Object has structure
+ * {
+ * "corr_data":
+ * {
+ * "data": binary_data,
+ * "shape": [data_height, data_width],
+ * ...
+ * ...
+ * },
+ * "key2": val2,
+ * ...
+ * ...
+ * }
+ */
+int unpack_msgpack_data(msgpack_object *obj, struct image *image,
+ int no_image_data)
+{
+ int data_width, data_height;
+ double *data;
+
+ if ( image->det == NULL ) {
+ ERROR("Geometry not available.\n");
+ return 1;
+ }
+
+ if ( obj == NULL ) {
+ ERROR("No MessagePack object!\n");
+ return 1;
+ }
+
+ if ( !no_image_data ) {
+ data = find_msgpack_data(obj, &data_width, &data_height);
+ if ( data == NULL ) {
+ ERROR("No image data in MessagePack object.\n");
+ return 1;
+ }
+ } else {
+ data = zero_array(image->det, &data_width, &data_height);
+ }
+
+ if ( unpack_slab(image, data, data_width, data_height) ) {
+ ERROR("Failed to unpack data slab.\n");
+ return 1;
+ }
+
+ if ( image->beam != NULL ) {
+ im_zmq_fill_in_beam_parameters(image->beam, image);
+ if ( image->lambda > 1000 ) {
+ ERROR("Warning: Missing or nonsensical wavelength "
+ "(%e m).\n", image->lambda);
+ }
+ }
+ im_zmq_fill_in_clen(image->det);
+ fill_in_adu(image);
+
+ return 0;
+}
diff --git a/src/im-zmq.h b/src/im-zmq.h
new file mode 100644
index 00000000..a7cbbfe9
--- /dev/null
+++ b/src/im-zmq.h
@@ -0,0 +1,77 @@
+/*
+ * zmq.h
+ *
+ * ZMQ data interface
+ *
+ * Copyright © 2017-2019 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2018-2019 Thomas White <taw@physics.org>
+ * 2014 Valerio Mariani
+ * 2017 Stijn de Graaf
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+
+#ifndef CRYSTFEL_ZMQ_H
+#define CRYSTFEL_ZMQ_H
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "image.h"
+
+#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ)
+
+#include <msgpack.h>
+
+extern struct im_zmq *im_zmq_connect(const char *zmq_address);
+
+extern void im_zmq_clean(struct im_zmq *z);
+
+extern void im_zmq_shutdown(struct im_zmq *z);
+
+extern msgpack_object *im_zmq_fetch(struct im_zmq *z);
+
+extern int get_peaks_msgpack(msgpack_object *obj, struct image *image,
+ int half_pixel_shift);
+
+extern int unpack_msgpack_data(msgpack_object *obj, struct image *image,
+ int no_image_data);
+
+#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+
+static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+
+static UNUSED void im_zmq_clean(struct im_zmq *z) { return; }
+
+static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; }
+
+static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; }
+
+static UNUSED int get_peaks_msgpack(void *obj, struct image *image,
+ int half_pixel_shift) { return 0; }
+
+static UNUSED int unpack_msgpack_data(void *obj, struct image *image,
+ int no_image_data) { return 1; }
+
+#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+
+#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index e4568966..739b4969 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -86,9 +86,11 @@ static void show_help(const char *s)
" --profile Show timing data for performance monitoring\n"
" --temp-dir=<path> Put the temporary folder under <path>\n"
" --wait-for-file=<n> Time to wait for each file before processing\n"
+" --zmq-msgpack Receive data in MessagePack format over ZMQ\n"
+" --no-image-data Do not load image data (from ZMQ)\n"
"\nPeak search options:\n\n"
-" --peaks=<method> Peak search method (zaef,peakfinder8,peakfinder9,hdf5,cxi)\n"
-" Default: zaef\n"
+" --peaks=<method> Peak search method. Default: zaef\n"
+" (zaef,peakfinder8,peakfinder9,hdf5,cxi,msgpack)\n"
" --peak-radius=<r> Integration radii for peak search\n"
" --min-peaks=<n> Minimum number of peaks for indexing\n"
" --hdf5-peaks=<p> Find peaks table in HDF5 file here\n"
@@ -287,6 +289,8 @@ int main(int argc, char *argv[])
int if_retry = 1;
int serial_start = 1;
char *spectrum_fn = NULL;
+ int zmq = 0;
+ char *zmq_address = NULL;
/* Defaults */
iargs.cell = NULL;
@@ -344,6 +348,7 @@ int main(int argc, char *argv[])
iargs.fix_bandwidth = -1.0;
iargs.fix_divergence = -1.0;
iargs.profile = 0;
+ iargs.no_image_data = 0;
iargs.taketwo_opts.member_thresh = -1;
iargs.taketwo_opts.len_tol = -1.0;
iargs.taketwo_opts.angle_tol = -1.0;
@@ -405,6 +410,8 @@ int main(int argc, char *argv[])
{"no-multi", 0, &if_multi, 0},
{"multi", 0, &if_multi, 1},
{"overpredict", 0, &iargs.overpredict, 1},
+ {"zmq-msgpack", 0, &zmq, 1},
+ {"no-image-data", 0, &iargs.no_image_data, 1},
/* Long-only options which don't actually do anything */
{"no-sat-corr", 0, &iargs.satcorr, 0},
@@ -975,6 +982,8 @@ int main(int argc, char *argv[])
iargs.peaks = PEAK_CXI;
} else if ( strcmp(speaks, "peakfinder9") == 0 ) {
iargs.peaks = PEAK_PEAKFINDER9;
+ } else if ( strcmp(speaks, "msgpack") == 0 ) {
+ iargs.peaks = PEAK_MSGPACK;
} else {
ERROR("Unrecognised peak detection method '%s'\n", speaks);
return 1;
@@ -1269,8 +1278,22 @@ int main(int argc, char *argv[])
gsl_set_error_handler_off();
+ if ( zmq ) {
+ char line[1024];
+ char *rval;
+ rval = fgets(line, 1024, fh);
+ if ( rval == NULL ) {
+ ERROR("Failed to read ZMQ server/port from input.\n");
+ return 1;
+ }
+ chomp(line);
+ zmq_address = strdup(line);
+ /* In future, read multiple addresses and hand them out
+ * evenly to workers */
+ }
+
r = create_sandbox(&iargs, n_proc, prefix, config_basename, fh,
- st, tmpdir, serial_start);
+ st, tmpdir, serial_start, zmq_address);
free_imagefile_field_list(iargs.copyme);
cell_free(iargs.cell);
diff --git a/src/process_image.c b/src/process_image.c
index 2f2eb698..a9bf1cdb 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -3,12 +3,13 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2017 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014-2017 Valerio Mariani <valerio.mariani@desy.de>
+ * 2017 Stijn de Graaf
*
* This file is part of CrystFEL.
*
@@ -54,6 +55,7 @@
#include "predict-refine.h"
#include "im-sandbox.h"
#include "time-accounts.h"
+#include "im-zmq.h"
static float **backup_image_data(float **dp, struct detector *det)
@@ -98,34 +100,16 @@ static void restore_image_data(float **dp, struct detector *det, float **bu)
}
-void process_image(const struct index_args *iargs, struct pattern_args *pargs,
- Stream *st, int cookie, const char *tmpdir,
- int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
- char *last_task)
+static int file_wait_open_read(struct sb_shm *sb_shared, struct image *image,
+ TimeAccounts *taccs, char *last_task,
+ signed int wait_for_file, int cookie,
+ struct imagefile **pimfile)
{
- struct imagefile *imfile;
- struct image image;
- int i;
- int r;
- int ret;
- char *rn;
- float **prefilter;
- int any_crystals;
+ signed int file_wait_time = wait_for_file;
int wait_message_done = 0;
int read_retry_done = 0;
- signed int file_wait_time = iargs->wait_for_file;
-
- image.features = NULL;
- image.copyme = iargs->copyme;
- image.id = cookie;
- image.filename = pargs->filename_p_e->filename;
- image.event = pargs->filename_p_e->ev;
- image.beam = iargs->beam;
- image.det = copy_geom(iargs->det);
- image.crystals = NULL;
- image.n_crystals = 0;
- image.serial = serial;
- image.indexed_by = INDEXING_NONE;
+ int r;
+ struct imagefile *imfile;
time_accounts_set(taccs, TACC_WAITFILE);
set_last_task(last_task, "wait for file");
@@ -135,29 +119,27 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
struct stat statbuf;
sb_shared->pings[cookie]++;
- r = stat(image.filename, &statbuf);
+ r = stat(image->filename, &statbuf);
if ( r ) {
- if ( (iargs->wait_for_file != 0)
- && (file_wait_time != 0) )
- {
+ if ( (wait_for_file != 0) && (file_wait_time != 0) ) {
if ( !wait_message_done ) {
STATUS("Waiting for '%s'\n",
- image.filename);
+ image->filename);
wait_message_done = 1;
}
sleep(1);
- if ( iargs->wait_for_file != -1 ) {
+ if ( wait_for_file != -1 ) {
file_wait_time--;
}
continue;
}
- ERROR("File %s not found\n", image.filename);
- return;
+ ERROR("File %s not found\n", image->filename);
+ return 1;
}
} while ( r );
@@ -167,42 +149,83 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
sb_shared->pings[cookie]++;
do {
- imfile = imagefile_open(image.filename);
+ imfile = imagefile_open(image->filename);
if ( imfile == NULL ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
r = 1;
STATUS("File '%s' exists but could not be opened."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
time_accounts_set(taccs, TACC_HDF5READ);
set_last_task(last_task, "read file");
sb_shared->pings[cookie]++;
- r = imagefile_read(imfile, &image, image.event);
+ r = imagefile_read(imfile, image, image->event);
if ( r ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
imagefile_close(imfile);
STATUS("File '%s' exists but could not be read."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
} while ( r );
+ *pimfile = imfile;
+ return 0;
+}
+
+
+void process_image(const struct index_args *iargs, struct pattern_args *pargs,
+ Stream *st, int cookie, const char *tmpdir,
+ int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
+ char *last_task)
+{
+ struct imagefile *imfile = NULL;
+ struct image image;
+ int i;
+ int r;
+ int ret;
+ char *rn;
+ float **prefilter;
+ int any_crystals;
+
+ image.features = NULL;
+ image.copyme = iargs->copyme;
+ image.id = cookie;
+ image.beam = iargs->beam;
+ image.det = copy_geom(iargs->det);
+ image.crystals = NULL;
+ image.n_crystals = 0;
+ image.serial = serial;
+ image.indexed_by = INDEXING_NONE;
+
+ image.filename = pargs->filename_p_e->filename;
+ image.event = pargs->filename_p_e->ev;
+ if ( pargs->msgpack_obj != NULL ) {
+ set_last_task(last_task, "unpacking messagepack object");
+ if ( unpack_msgpack_data(pargs->msgpack_obj, &image,
+ iargs->no_image_data) ) return;
+ } else {
+ if ( file_wait_open_read(sb_shared, &image, taccs, last_task,
+ iargs->wait_for_file, cookie,
+ &imfile) ) return;
+ }
+
/* Take snapshot of image before applying horrible noise filters */
time_accounts_set(taccs, TACC_FILTER);
set_last_task(last_task, "image filter");
@@ -315,6 +338,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
}
break;
+ case PEAK_MSGPACK:
+ get_peaks_msgpack(pargs->msgpack_obj, &image,
+ iargs->half_pixel_shift);
+ break;
+
}
image.peak_resolution = estimate_peak_resolution(image.features,
@@ -473,5 +501,6 @@ out:
image_feature_list_free(image.features);
free_detector_geometry(image.det);
- imagefile_close(imfile);
+ if ( imfile != NULL ) imagefile_close(imfile);
+ set_last_task(last_task, "sandbox");
}
diff --git a/src/process_image.h b/src/process_image.h
index 5939ae4d..8d6682a6 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -3,11 +3,11 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2016 Thomas White <taw@physics.org>
+ * 2010-2019 Thomas White <taw@physics.org>
* 2014-2017 Valerio Mariani <valerio.mariani@desy.de>
* 2017-2018 Yaroslav Gevorkov <yaroslav.gevorkov@desy.de>
*
@@ -37,6 +37,10 @@
struct index_args;
+#ifdef HAVE_MSGPACK
+#include <msgpack.h>
+#endif
+
#include "integration.h"
#include "im-sandbox.h"
#include "time-accounts.h"
@@ -51,6 +55,7 @@ enum {
PEAK_ZAEF,
PEAK_HDF5,
PEAK_CXI,
+ PEAK_MSGPACK,
};
@@ -114,6 +119,7 @@ struct index_args
struct felix_options felix_opts;
Spectrum *spectrum;
signed int wait_for_file; /* -1 means wait forever */
+ int no_image_data;
};
@@ -122,6 +128,11 @@ struct pattern_args
{
/* "Input" */
struct filename_plus_event *filename_p_e;
+#ifdef HAVE_MSGPACK
+ msgpack_object *msgpack_obj;
+#else
+ void *msgpack_obj;
+#endif
};