diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 150 | ||||
-rw-r--r-- | src/im-sandbox.h | 7 | ||||
-rw-r--r-- | src/im-zmq.c | 491 | ||||
-rw-r--r-- | src/im-zmq.h | 77 | ||||
-rw-r--r-- | src/indexamajig.c | 29 | ||||
-rw-r--r-- | src/process_image.c | 121 | ||||
-rw-r--r-- | src/process_image.h | 15 |
7 files changed, 786 insertions, 104 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index e8fbe763..0ec20ea2 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -3,17 +3,18 @@ * * Sandbox for indexing * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2018 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon + * 2017 Stijn de Graaf * * This file is part of CrystFEL. * @@ -64,6 +65,7 @@ #include "im-sandbox.h" #include "process_image.h" #include "time-accounts.h" +#include "im-zmq.h" struct sandbox @@ -92,6 +94,10 @@ struct sandbox const char *tmpdir; + /* ZMQ mode */ + int zmq; + const char *zmq_address; + /* Final output */ Stream *stream; }; @@ -360,11 +366,23 @@ void set_last_task(char *lt, const char *task) } -static void run_work(const struct index_args *iargs, Stream *st, - int cookie, const char *tmpdir, struct sandbox *sb) +static int run_work(const struct index_args *iargs, Stream *st, + int cookie, const char *tmpdir, struct sandbox *sb) { int allDone = 0; - TimeAccounts *taccs = time_accounts_init(); + TimeAccounts *taccs; + struct im_zmq *zmqstuff = NULL; + + /* Connect via ZMQ */ + if ( sb->zmq ) { + zmqstuff = im_zmq_connect(sb->zmq_address); + if ( zmqstuff == NULL ) { + ERROR("ZMQ setup failed.\n"); + return 1; + } + } + + taccs = time_accounts_init(); while ( !allDone ) { @@ -375,58 +393,71 @@ static void run_work(const struct index_args *iargs, Stream *st, struct event *ev; int r; - /* Wait until an event is ready */ - time_accounts_set(taccs, TACC_EVENTWAIT); - set_last_task(sb->shared->last_task[cookie], "wait_event"); - if ( sem_wait(sb->queue_sem) != 0 ) { - ERROR("Failed to wait on queue semaphore: %s\n", - strerror(errno)); - } + if ( !sb->zmq ) { - /* Get the event from the queue */ - set_last_task(sb->shared->last_task[cookie], "read_queue"); - pthread_mutex_lock(&sb->shared->queue_lock); - if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) { - /* Queue is empty and no more coming, so exit */ - pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } - if ( sb->shared->n_events == 0 ) { - ERROR("Got the semaphore, but no events in queue!\n"); - ERROR("no_more = %i\n", sb->shared->no_more); + /* Wait until an event is ready */ + time_accounts_set(taccs, TACC_EVENTWAIT); + set_last_task(sb->shared->last_task[cookie], "wait_event"); + if ( sem_wait(sb->queue_sem) != 0 ) { + ERROR("Failed to wait on queue semaphore: %s\n", + strerror(errno)); + } + + /* Get the event from the queue */ + set_last_task(sb->shared->last_task[cookie], "read_queue"); + pthread_mutex_lock(&sb->shared->queue_lock); + if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) { + /* Queue is empty and no more coming, so exit */ + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + if ( sb->shared->n_events == 0 ) { + ERROR("Got the semaphore, but no events in queue!\n"); + ERROR("no_more = %i\n", sb->shared->no_more); + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + r = sscanf(sb->shared->queue[0], "%s %s %i", + filename, event_str, &ser); + if ( r != 3 ) { + STATUS("Invalid event string '%s'\n", + sb->shared->queue[0]); + } + memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], + MAX_EV_LEN); + shuffle_events(sb->shared); pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } - r = sscanf(sb->shared->queue[0], "%s %s %i", - filename, event_str, &ser); - if ( r != 3 ) { - STATUS("Invalid event string '%s'\n", - sb->shared->queue[0]); - } - memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], - MAX_EV_LEN); - shuffle_events(sb->shared); - pthread_mutex_unlock(&sb->shared->queue_lock); - if ( r != 3 ) continue; + if ( r != 3 ) continue; - pargs.filename_p_e = initialize_filename_plus_event(); - pargs.filename_p_e->filename = strdup(filename); + pargs.filename_p_e = initialize_filename_plus_event(); + pargs.filename_p_e->filename = strdup(filename); - if ( strcmp(event_str, "(none)") != 0 ) { + if ( strcmp(event_str, "(none)") != 0 ) { + + ev = get_event_from_event_string(event_str); + if ( ev == NULL ) { + ERROR("Bad event string '%s'\n", event_str); + continue; + } + pargs.filename_p_e->ev = ev; + + } else { + + pargs.filename_p_e->ev = NULL; - ev = get_event_from_event_string(event_str); - if ( ev == NULL ) { - ERROR("Bad event string '%s'\n", event_str); - continue; } - pargs.filename_p_e->ev = ev; + pargs.msgpack_obj = NULL; } else { + pargs.msgpack_obj = im_zmq_fetch(zmqstuff); + pargs.filename_p_e = initialize_filename_plus_event(); + pargs.filename_p_e->filename = strdup("(from ZMQ)"); pargs.filename_p_e->ev = NULL; + ser = 0; /* FIXME */ } @@ -434,10 +465,16 @@ static void run_work(const struct index_args *iargs, Stream *st, process_image(iargs, &pargs, st, cookie, tmpdir, ser, sb->shared, taccs, sb->shared->last_task[cookie]); - free_filename_plus_event(pargs.filename_p_e); + if ( !sb->zmq ) { + free_filename_plus_event(pargs.filename_p_e); + } else { + im_zmq_clean(zmqstuff); + } } + im_zmq_shutdown(zmqstuff); + time_accounts_set(taccs, TACC_FINALCLEANUP); cleanup_indexing(iargs->ipriv); free_detector_geometry(iargs->det); @@ -446,6 +483,7 @@ static void run_work(const struct index_args *iargs, Stream *st, cell_free(iargs->cell); if ( iargs->profile ) time_accounts_print(taccs); time_accounts_free(taccs); + return 0; } @@ -689,7 +727,7 @@ static void start_worker_process(struct sandbox *sb, int slot) */ st = open_stream_fd_for_write(stream_pipe[1]); - run_work(sb->iargs, st, slot, tmp, sb); + r = run_work(sb->iargs, st, slot, tmp, sb); close_stream(st); free(tmp); @@ -699,7 +737,7 @@ static void start_worker_process(struct sandbox *sb, int slot) free(sb); - exit(0); + exit(r); } @@ -804,6 +842,11 @@ static int setup_shm(struct sandbox *sb) /* Assumes the caller is already holding queue_lock! */ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) { + if ( sb->zmq ) { + /* Do nothing */ + return 0; + } + while ( sb->shared->n_events < QUEUE_SIZE ) { struct filename_plus_event *ne; @@ -998,7 +1041,8 @@ char *create_tempdir(const char *temp_location) * If the return value is zero, something is probably wrong. */ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, - Stream *stream, const char *tmpdir, int serial_start) + Stream *stream, const char *tmpdir, int serial_start, + const char *zmq_address) { int i; struct sandbox *sb; @@ -1027,6 +1071,12 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->iargs = iargs; sb->serial = serial_start; sb->tmpdir = tmpdir; + if ( zmq_address != NULL ) { + sb->zmq = 1; + sb->zmq_address = zmq_address; + } else { + sb->zmq = 0; + } sb->fds = NULL; sb->fhs = NULL; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index ee2de993..9da11526 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -3,13 +3,13 @@ * * Sandbox for indexing * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2018 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -83,6 +83,7 @@ extern void set_last_task(char *lt, const char *task); extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, - const char *tempdir, int serial_start); + const char *tempdir, int serial_start, + const char *zmq_address); #endif /* IM_SANDBOX_H */ diff --git a/src/im-zmq.c b/src/im-zmq.c new file mode 100644 index 00000000..7e8bd9ac --- /dev/null +++ b/src/im-zmq.c @@ -0,0 +1,491 @@ +/* + * zmq.c + * + * ZMQ data interface + * + * Copyright © 2017-2018 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2018 Thomas White <taw@physics.org> + * 2014 Valerio Mariani + * 2017 Stijn de Graaf + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdlib.h> +#include <stdio.h> +#include <stdint.h> +#include <hdf5.h> +#include <assert.h> +#include <unistd.h> +#include <zmq.h> +#include <msgpack.h> + +#include "events.h" +#include "image.h" +#include "hdf5-file.h" +#include "utils.h" + + +struct im_zmq +{ + void *ctx; + void *socket; + zmq_msg_t msg; + msgpack_unpacked unpacked; + int unpacked_set; +}; + + +struct im_zmq *im_zmq_connect(const char *zmq_address) +{ + struct im_zmq *z; + + z = malloc(sizeof(struct im_zmq)); + if ( z == NULL ) return NULL; + + z->unpacked_set = 0; + + z->ctx = zmq_ctx_new(); + if ( z->ctx == NULL ) return NULL; + + z->socket = zmq_socket(z->ctx, ZMQ_REQ); + if ( z->socket == NULL ) return NULL; + + STATUS("Connecting to ZMQ at '%s'\n", zmq_address); + if ( zmq_connect(z->socket, zmq_address) == -1 ) { + ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); + return NULL; + } + STATUS("ZMQ connected.\n"); + + return z; +} + + +msgpack_object *im_zmq_fetch(struct im_zmq *z) +{ + int msg_size; + int r; + + if ( zmq_send(z->socket, "m", 1, 0) == -1 ) { + ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno)); + return NULL; + } + + zmq_msg_init(&z->msg); + msg_size = zmq_msg_recv(&z->msg, z->socket, 0); + if ( msg_size == -1 ) { + ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno)); + zmq_msg_close(&z->msg); + return NULL; + } + + msgpack_unpacked_init(&z->unpacked); + r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg), + msg_size, NULL); + if ( r != MSGPACK_UNPACK_SUCCESS ) { + ERROR("Msgpack unpack failed: %i\n", r); + zmq_msg_close(&z->msg); + return NULL; + } + z->unpacked_set = 1; + + return &z->unpacked.data; +} + + +/* Clean structures ready for next frame */ +void im_zmq_clean(struct im_zmq *z) +{ + if ( z->unpacked_set ) { + msgpack_unpacked_destroy(&z->unpacked); + zmq_msg_close(&z->msg); + z->unpacked_set = 0; + } +} + + +void im_zmq_shutdown(struct im_zmq *z) +{ + if ( z == NULL ) return; + + zmq_msg_close(&z->msg); + zmq_close(z->socket); + zmq_ctx_destroy(z->ctx); +} + + +static msgpack_object *find_msgpack_kv(msgpack_object *obj, const char *key) +{ + int i; + + if ( obj == NULL ) return NULL; + if ( obj->type != MSGPACK_OBJECT_MAP ) return NULL; + + for ( i=0; i<obj->via.map.size; i++ ) { + const char *kstr; + size_t klen; + assert(obj->via.map.ptr[i].key.type == MSGPACK_OBJECT_STR); + kstr = obj->via.map.ptr[i].key.via.str.ptr; + klen = obj->via.map.ptr[i].key.via.str.size; + if ( strncmp(kstr, key, klen) == 0 ) { + return &obj->via.map.ptr[i].val; + } + } + return NULL; +} + + +/** + * get_peaks_msgpack: + * @obj: A %msgpack_object containing data in OnDA format + * @image: An %image structure + * @half_pixel_shift: Non-zero if 0.5 should be added to all peak coordinates + * + * Get peaks from msgpack_object. The data should be in a map, with the value + * given by "peak_list" as an array of arrays. The first of these should contain + * the list of fs positions of the peaks, the second the ss positions, and the + * third the intensities of the peaks. + * + * http://c.msgpack.org/c/ provides documentation on msgpack objects + * + * CrystFEL considers all peak locations to be distances from the corner of the + * detector panel, in pixel units, consistent with its description of detector + * geometry (see 'man crystfel_geometry'). The software which generates the + * CXI files, including Cheetah, may instead consider the peak locations to be + * pixel indices in the data array. In this case, the peak coordinates should + * have 0.5 added to them. This will be done if @half_pixel_shift is non-zero. + * + * Returns: non-zero on error, zero otherwise. + * + */ +int get_peaks_msgpack(msgpack_object *obj, struct image *image, + int half_pixel_shift) +{ + + int num_peaks; + int pk; + msgpack_object *peak_list; + msgpack_object *peak_x; + msgpack_object *peak_y; + msgpack_object *peak_i; + double peak_offset = half_pixel_shift ? 0.5 : 0.0; + + if ( obj == NULL ) { + ERROR("No MessagePack object to get peaks from.\n"); + return 1; + } + + /* Object has structure: + * { + * "peak_list": [[peak_x], [peak_y], [peak_i]] + * "key2":val2, + * ... + * } + */ + peak_list = find_msgpack_kv(obj, "peak_list"); + peak_x = &peak_list->via.array.ptr[0]; + peak_y = &peak_list->via.array.ptr[1]; + peak_i = &peak_list->via.array.ptr[2]; + + /* Length of peak_x array gives number of peaks */ + num_peaks = peak_x->via.array.size; + + if ( image->features != NULL ) { + image_feature_list_free(image->features); + } + image->features = image_feature_list_new(); + image->num_peaks = num_peaks; + + for ( pk=0; pk<num_peaks; pk++ ) { + + float fs, ss, val; + struct panel *p; + + /* Retrieve data from peak_list and apply half_pixel_shift, + * if appropriate */ + fs = peak_x->via.array.ptr[pk].via.f64 + peak_offset; + ss = peak_y->via.array.ptr[pk].via.f64 + peak_offset; + val = peak_i->via.array.ptr[pk].via.f64; + + p = find_orig_panel(image->det, fs, ss); + if ( p == NULL ) continue; + if ( p->no_index ) continue; + + /* Convert coordinates to panel-relative */ + fs = fs - p->orig_min_fs; + ss = ss - p->orig_min_ss; + + image_add_feature(image->features, fs, ss, p, image, val, NULL); + } + + return 0; +} + + +static void im_zmq_fill_in_clen(struct detector *det) +{ + int i = 0; + for ( i=0; i<det->n_panels; i++) { + struct panel *p = &det->panels[i]; + if ( p->clen_from != NULL ) { + ERROR("Can't get clen over ZMQ yet.\n"); + } + adjust_centering_for_rail(p); + } +} + + +static void im_zmq_fill_in_beam_parameters(struct beam_params *beam, + struct image *image) +{ + double eV; + if ( beam->photon_energy_from == NULL ) { + /* Explicit value given */ + eV = beam->photon_energy; + } else { + ERROR("Can't get photon energy over ZMQ yet.\n"); + eV = 0.0; + } + image->lambda = ph_en_to_lambda(eV_to_J(eV))*beam->photon_energy_scale; +} + + +static int unpack_slab(struct image *image, double *data, + int data_width, int data_height) +{ + uint16_t *flags = NULL; + float *sat = NULL; + int pi; + + image->dp = malloc(image->det->n_panels*sizeof(float *)); + image->bad = malloc(image->det->n_panels*sizeof(int *)); + image->sat = malloc(image->det->n_panels*sizeof(float *)); + if ( (image->dp == NULL) || (image->bad == NULL) || (image->sat == NULL) ) { + ERROR("Failed to allocate data arrays.\n"); + return 1; + } + + for ( pi=0; pi<image->det->n_panels; pi++ ) { + + struct panel *p; + int fs, ss; + + p = &image->det->panels[pi]; + image->dp[pi] = malloc(p->w*p->h*sizeof(float)); + image->bad[pi] = malloc(p->w*p->h*sizeof(int)); + image->sat[pi] = malloc(p->w*p->h*sizeof(float)); + if ( (image->dp[pi] == NULL) || (image->bad[pi] == NULL) + || (image->sat[pi] == NULL) ) + { + ERROR("Failed to allocate panel\n"); + return 1; + } + + if ( (p->orig_min_fs + p->w > data_width) + || (p->orig_min_ss + p->h > data_height) ) + { + ERROR("Panel %s is outside range of data provided\n", + p->name); + return 1; + } + + for ( ss=0; ss<p->h; ss++) { + for ( fs=0; fs<p->w; fs++) { + + int idx; + int cfs, css; + int bad = 0; + + cfs = fs+p->orig_min_fs; + css = ss+p->orig_min_ss; + idx = cfs + css*data_width; + + image->dp[pi][fs+p->w*ss] = data[idx]; + + if ( sat != NULL ) { + image->sat[pi][fs+p->w*ss] = sat[idx]; + } else { + image->sat[pi][fs+p->w*ss] = INFINITY; + } + + if ( p->no_index ) bad = 1; + + if ( in_bad_region(image->det, p, cfs, css) ) { + bad = 1; + } + + if ( isnan(data[idx]) || isinf(data[idx]) ) bad = 1; + + if ( flags != NULL ) { + + int f; + + f = flags[idx]; + + if ( (f & image->det->mask_good) + != image->det->mask_good ) bad = 1; + + if ( f & image->det->mask_bad ) bad = 1; + + } + image->bad[pi][fs+p->w*ss] = bad; + } + } + + } + + return 0; +} + + +static double *find_msgpack_data(msgpack_object *obj, int *width, int *height) +{ + msgpack_object *corr_data_obj; + msgpack_object *data_obj; + msgpack_object *shape_obj; + double *data; + + corr_data_obj = find_msgpack_kv(obj, "corr_data"); + if ( corr_data_obj == NULL ) { + ERROR("No corr_data MessagePack object found.\n"); + return NULL; + } + + data_obj = find_msgpack_kv(corr_data_obj, "data"); + if ( data_obj == NULL ) { + ERROR("No data MessagePack object found inside corr_data.\n"); + return NULL; + } + if ( data_obj->type != MSGPACK_OBJECT_STR ) { + ERROR("corr_data.data isn't a binary object.\n"); + return NULL; + } + data = (double *)data_obj->via.str.ptr; + + shape_obj = find_msgpack_kv(corr_data_obj, "shape"); + if ( shape_obj == NULL ) { + ERROR("No shape MessagePack object found inside corr_data.\n"); + return NULL; + } + if ( shape_obj->type != MSGPACK_OBJECT_ARRAY ) { + ERROR("corr_data.shape isn't an array object.\n"); + return NULL; + } + if ( shape_obj->via.array.size != 2 ) { + ERROR("corr_data.shape is wrong size (%i, should be 2)\n", + shape_obj->via.array.size); + return NULL; + } + if ( shape_obj->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER ) { + ERROR("corr_data.shape contains wrong type of element.\n"); + return NULL; + } + *height = shape_obj->via.array.ptr[0].via.i64; + *width = shape_obj->via.array.ptr[1].via.i64; + return data; +} + + +static double *zero_array(struct detector *det, int *dw, int *dh) +{ + int max_fs = 0; + int max_ss = 0; + int pi; + double *data; + + for ( pi=0; pi<det->n_panels; pi++ ) { + if ( det->panels[pi].orig_max_fs > max_fs ) { + max_fs = det->panels[pi].orig_max_fs; + } + if ( det->panels[pi].orig_max_ss > max_ss ) { + max_ss = det->panels[pi].orig_max_ss; + } + } + + data = calloc((max_fs+1)*(max_ss+1), sizeof(double)); + *dw = max_fs+1; + *dh = max_ss+1; + return data; +} + + +/* Unpacks the raw panel data from a msgpack_object, applies panel geometry, + * and stores the resulting data in an image struct. Object has structure + * { + * "corr_data": + * { + * "data": binary_data, + * "shape": [data_height, data_width], + * ... + * ... + * }, + * "key2": val2, + * ... + * ... + * } + */ +int unpack_msgpack_data(msgpack_object *obj, struct image *image, + int no_image_data) +{ + int data_width, data_height; + double *data; + + if ( image->det == NULL ) { + ERROR("Geometry not available.\n"); + return 1; + } + + if ( obj == NULL ) { + ERROR("No MessagePack object!\n"); + return 1; + } + + if ( !no_image_data ) { + data = find_msgpack_data(obj, &data_width, &data_height); + if ( data == NULL ) { + ERROR("No image data in MessagePack object.\n"); + return 1; + } + } else { + data = zero_array(image->det, &data_width, &data_height); + } + + if ( unpack_slab(image, data, data_width, data_height) ) { + ERROR("Failed to unpack data slab.\n"); + return 1; + } + + if ( image->beam != NULL ) { + im_zmq_fill_in_beam_parameters(image->beam, image); + if ( image->lambda > 1000 ) { + ERROR("Warning: Missing or nonsensical wavelength " + "(%e m).\n", image->lambda); + } + } + im_zmq_fill_in_clen(image->det); + fill_in_adu(image); + + return 0; +} diff --git a/src/im-zmq.h b/src/im-zmq.h new file mode 100644 index 00000000..a7cbbfe9 --- /dev/null +++ b/src/im-zmq.h @@ -0,0 +1,77 @@ +/* + * zmq.h + * + * ZMQ data interface + * + * Copyright © 2017-2019 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2018-2019 Thomas White <taw@physics.org> + * 2014 Valerio Mariani + * 2017 Stijn de Graaf + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * + */ + + +#ifndef CRYSTFEL_ZMQ_H +#define CRYSTFEL_ZMQ_H + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include "image.h" + +#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) + +#include <msgpack.h> + +extern struct im_zmq *im_zmq_connect(const char *zmq_address); + +extern void im_zmq_clean(struct im_zmq *z); + +extern void im_zmq_shutdown(struct im_zmq *z); + +extern msgpack_object *im_zmq_fetch(struct im_zmq *z); + +extern int get_peaks_msgpack(msgpack_object *obj, struct image *image, + int half_pixel_shift); + +extern int unpack_msgpack_data(msgpack_object *obj, struct image *image, + int no_image_data); + +#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */ + +static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; } + +static UNUSED void im_zmq_clean(struct im_zmq *z) { return; } + +static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; } + +static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; } + +static UNUSED int get_peaks_msgpack(void *obj, struct image *image, + int half_pixel_shift) { return 0; } + +static UNUSED int unpack_msgpack_data(void *obj, struct image *image, + int no_image_data) { return 1; } + +#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */ + +#endif /* CRYSTFEL_ZMQ_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index e4568966..739b4969 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -86,9 +86,11 @@ static void show_help(const char *s) " --profile Show timing data for performance monitoring\n" " --temp-dir=<path> Put the temporary folder under <path>\n" " --wait-for-file=<n> Time to wait for each file before processing\n" +" --zmq-msgpack Receive data in MessagePack format over ZMQ\n" +" --no-image-data Do not load image data (from ZMQ)\n" "\nPeak search options:\n\n" -" --peaks=<method> Peak search method (zaef,peakfinder8,peakfinder9,hdf5,cxi)\n" -" Default: zaef\n" +" --peaks=<method> Peak search method. Default: zaef\n" +" (zaef,peakfinder8,peakfinder9,hdf5,cxi,msgpack)\n" " --peak-radius=<r> Integration radii for peak search\n" " --min-peaks=<n> Minimum number of peaks for indexing\n" " --hdf5-peaks=<p> Find peaks table in HDF5 file here\n" @@ -287,6 +289,8 @@ int main(int argc, char *argv[]) int if_retry = 1; int serial_start = 1; char *spectrum_fn = NULL; + int zmq = 0; + char *zmq_address = NULL; /* Defaults */ iargs.cell = NULL; @@ -344,6 +348,7 @@ int main(int argc, char *argv[]) iargs.fix_bandwidth = -1.0; iargs.fix_divergence = -1.0; iargs.profile = 0; + iargs.no_image_data = 0; iargs.taketwo_opts.member_thresh = -1; iargs.taketwo_opts.len_tol = -1.0; iargs.taketwo_opts.angle_tol = -1.0; @@ -405,6 +410,8 @@ int main(int argc, char *argv[]) {"no-multi", 0, &if_multi, 0}, {"multi", 0, &if_multi, 1}, {"overpredict", 0, &iargs.overpredict, 1}, + {"zmq-msgpack", 0, &zmq, 1}, + {"no-image-data", 0, &iargs.no_image_data, 1}, /* Long-only options which don't actually do anything */ {"no-sat-corr", 0, &iargs.satcorr, 0}, @@ -975,6 +982,8 @@ int main(int argc, char *argv[]) iargs.peaks = PEAK_CXI; } else if ( strcmp(speaks, "peakfinder9") == 0 ) { iargs.peaks = PEAK_PEAKFINDER9; + } else if ( strcmp(speaks, "msgpack") == 0 ) { + iargs.peaks = PEAK_MSGPACK; } else { ERROR("Unrecognised peak detection method '%s'\n", speaks); return 1; @@ -1269,8 +1278,22 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); + if ( zmq ) { + char line[1024]; + char *rval; + rval = fgets(line, 1024, fh); + if ( rval == NULL ) { + ERROR("Failed to read ZMQ server/port from input.\n"); + return 1; + } + chomp(line); + zmq_address = strdup(line); + /* In future, read multiple addresses and hand them out + * evenly to workers */ + } + r = create_sandbox(&iargs, n_proc, prefix, config_basename, fh, - st, tmpdir, serial_start); + st, tmpdir, serial_start, zmq_address); free_imagefile_field_list(iargs.copyme); cell_free(iargs.cell); diff --git a/src/process_image.c b/src/process_image.c index 2f2eb698..a9bf1cdb 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -3,12 +3,13 @@ * * The processing pipeline for one image * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2010-2017 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2014-2017 Valerio Mariani <valerio.mariani@desy.de> + * 2017 Stijn de Graaf * * This file is part of CrystFEL. * @@ -54,6 +55,7 @@ #include "predict-refine.h" #include "im-sandbox.h" #include "time-accounts.h" +#include "im-zmq.h" static float **backup_image_data(float **dp, struct detector *det) @@ -98,34 +100,16 @@ static void restore_image_data(float **dp, struct detector *det, float **bu) } -void process_image(const struct index_args *iargs, struct pattern_args *pargs, - Stream *st, int cookie, const char *tmpdir, - int serial, struct sb_shm *sb_shared, TimeAccounts *taccs, - char *last_task) +static int file_wait_open_read(struct sb_shm *sb_shared, struct image *image, + TimeAccounts *taccs, char *last_task, + signed int wait_for_file, int cookie, + struct imagefile **pimfile) { - struct imagefile *imfile; - struct image image; - int i; - int r; - int ret; - char *rn; - float **prefilter; - int any_crystals; + signed int file_wait_time = wait_for_file; int wait_message_done = 0; int read_retry_done = 0; - signed int file_wait_time = iargs->wait_for_file; - - image.features = NULL; - image.copyme = iargs->copyme; - image.id = cookie; - image.filename = pargs->filename_p_e->filename; - image.event = pargs->filename_p_e->ev; - image.beam = iargs->beam; - image.det = copy_geom(iargs->det); - image.crystals = NULL; - image.n_crystals = 0; - image.serial = serial; - image.indexed_by = INDEXING_NONE; + int r; + struct imagefile *imfile; time_accounts_set(taccs, TACC_WAITFILE); set_last_task(last_task, "wait for file"); @@ -135,29 +119,27 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, struct stat statbuf; sb_shared->pings[cookie]++; - r = stat(image.filename, &statbuf); + r = stat(image->filename, &statbuf); if ( r ) { - if ( (iargs->wait_for_file != 0) - && (file_wait_time != 0) ) - { + if ( (wait_for_file != 0) && (file_wait_time != 0) ) { if ( !wait_message_done ) { STATUS("Waiting for '%s'\n", - image.filename); + image->filename); wait_message_done = 1; } sleep(1); - if ( iargs->wait_for_file != -1 ) { + if ( wait_for_file != -1 ) { file_wait_time--; } continue; } - ERROR("File %s not found\n", image.filename); - return; + ERROR("File %s not found\n", image->filename); + return 1; } } while ( r ); @@ -167,42 +149,83 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, sb_shared->pings[cookie]++; do { - imfile = imagefile_open(image.filename); + imfile = imagefile_open(image->filename); if ( imfile == NULL ) { - if ( iargs->wait_for_file && !read_retry_done ) { + if ( wait_for_file && !read_retry_done ) { read_retry_done = 1; r = 1; STATUS("File '%s' exists but could not be opened." " Trying again after 10 seconds.\n", - image.filename); + image->filename); sleep(10); continue; } - ERROR("Couldn't open file: %s\n", image.filename); - return; + ERROR("Couldn't open file: %s\n", image->filename); + return 1; } time_accounts_set(taccs, TACC_HDF5READ); set_last_task(last_task, "read file"); sb_shared->pings[cookie]++; - r = imagefile_read(imfile, &image, image.event); + r = imagefile_read(imfile, image, image->event); if ( r ) { - if ( iargs->wait_for_file && !read_retry_done ) { + if ( wait_for_file && !read_retry_done ) { read_retry_done = 1; imagefile_close(imfile); STATUS("File '%s' exists but could not be read." " Trying again after 10 seconds.\n", - image.filename); + image->filename); sleep(10); continue; } - ERROR("Couldn't open file: %s\n", image.filename); - return; + ERROR("Couldn't open file: %s\n", image->filename); + return 1; } } while ( r ); + *pimfile = imfile; + return 0; +} + + +void process_image(const struct index_args *iargs, struct pattern_args *pargs, + Stream *st, int cookie, const char *tmpdir, + int serial, struct sb_shm *sb_shared, TimeAccounts *taccs, + char *last_task) +{ + struct imagefile *imfile = NULL; + struct image image; + int i; + int r; + int ret; + char *rn; + float **prefilter; + int any_crystals; + + image.features = NULL; + image.copyme = iargs->copyme; + image.id = cookie; + image.beam = iargs->beam; + image.det = copy_geom(iargs->det); + image.crystals = NULL; + image.n_crystals = 0; + image.serial = serial; + image.indexed_by = INDEXING_NONE; + + image.filename = pargs->filename_p_e->filename; + image.event = pargs->filename_p_e->ev; + if ( pargs->msgpack_obj != NULL ) { + set_last_task(last_task, "unpacking messagepack object"); + if ( unpack_msgpack_data(pargs->msgpack_obj, &image, + iargs->no_image_data) ) return; + } else { + if ( file_wait_open_read(sb_shared, &image, taccs, last_task, + iargs->wait_for_file, cookie, + &imfile) ) return; + } + /* Take snapshot of image before applying horrible noise filters */ time_accounts_set(taccs, TACC_FILTER); set_last_task(last_task, "image filter"); @@ -315,6 +338,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, } break; + case PEAK_MSGPACK: + get_peaks_msgpack(pargs->msgpack_obj, &image, + iargs->half_pixel_shift); + break; + } image.peak_resolution = estimate_peak_resolution(image.features, @@ -473,5 +501,6 @@ out: image_feature_list_free(image.features); free_detector_geometry(image.det); - imagefile_close(imfile); + if ( imfile != NULL ) imagefile_close(imfile); + set_last_task(last_task, "sandbox"); } diff --git a/src/process_image.h b/src/process_image.h index 5939ae4d..8d6682a6 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -3,11 +3,11 @@ * * The processing pipeline for one image * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2010-2016 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2014-2017 Valerio Mariani <valerio.mariani@desy.de> * 2017-2018 Yaroslav Gevorkov <yaroslav.gevorkov@desy.de> * @@ -37,6 +37,10 @@ struct index_args; +#ifdef HAVE_MSGPACK +#include <msgpack.h> +#endif + #include "integration.h" #include "im-sandbox.h" #include "time-accounts.h" @@ -51,6 +55,7 @@ enum { PEAK_ZAEF, PEAK_HDF5, PEAK_CXI, + PEAK_MSGPACK, }; @@ -114,6 +119,7 @@ struct index_args struct felix_options felix_opts; Spectrum *spectrum; signed int wait_for_file; /* -1 means wait forever */ + int no_image_data; }; @@ -122,6 +128,11 @@ struct pattern_args { /* "Input" */ struct filename_plus_event *filename_p_e; +#ifdef HAVE_MSGPACK + msgpack_object *msgpack_obj; +#else + void *msgpack_obj; +#endif }; |