From 4d62f31f90b76bce8b66fe2be6ccccb7b1542209 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 10 Jan 2019 17:02:15 +0100 Subject: Connect up hooks for unpacking MsgPack data --- src/im-zmq.c | 8 ++-- src/im-zmq.h | 6 +-- src/indexamajig.c | 2 + src/process_image.c | 118 ++++++++++++++++++++++++++++++---------------------- src/process_image.h | 1 + 5 files changed, 79 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/im-zmq.c b/src/im-zmq.c index c299b980..f501095a 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -136,7 +136,7 @@ void im_zmq_shutdown(struct im_zmq *z) } /** - * get_peaks_onda: + * get_peaks_msgpack: * @obj: A %msgpack_object containing data in OnDA format * @image: An %image structure * @half_pixel_shift: Non-zero if 0.5 should be added to all peak coordinates @@ -158,8 +158,8 @@ void im_zmq_shutdown(struct im_zmq *z) * Returns: non-zero on error, zero otherwise. * */ -int get_peaks_onda(msgpack_object *obj, struct image *image, - int half_pixel_shift) +int get_peaks_msgpack(msgpack_object *obj, struct image *image, + int half_pixel_shift) { int num_peaks; @@ -273,7 +273,7 @@ static void onda_fill_in_beam_parameters(struct beam_params *beam, * ... * } */ -int obj_read(msgpack_object *obj, struct image *image) +int unpack_msgpack_data(msgpack_object *obj, struct image *image) { uint16_t *flags = NULL; diff --git a/src/im-zmq.h b/src/im-zmq.h index e6abe562..eb2c6068 100644 --- a/src/im-zmq.h +++ b/src/im-zmq.h @@ -48,10 +48,10 @@ extern void im_zmq_clean(struct im_zmq *z); extern void im_zmq_shutdown(struct im_zmq *z); -extern int get_peaks_onda(msgpack_object *obj, struct image *image, - int half_pixel_shift); +extern int get_peaks_msgpack(msgpack_object *obj, struct image *image, + int half_pixel_shift); -extern int obj_read(msgpack_object *obj, struct image *image); +extern int unpack_msgpack_data(msgpack_object *obj, struct image *image); #endif /* CRYSTFEL_ZMQ_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index 356d0c94..37cec295 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -1006,6 +1006,8 @@ int main(int argc, char *argv[]) iargs.peaks = PEAK_CXI; } else if ( strcmp(speaks, "peakfinder9") == 0 ) { iargs.peaks = PEAK_PEAKFINDER9; + } else if ( strcmp(speaks, "msgpack") == 0 ) { + iargs.peaks = PEAK_MSGPACK; } else { ERROR("Unrecognised peak detection method '%s'\n", speaks); return 1; diff --git a/src/process_image.c b/src/process_image.c index 7667a05c..afd43d3b 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -39,7 +39,6 @@ #include #include #include -#include #include "utils.h" #include "hdf5-file.h" @@ -56,6 +55,7 @@ #include "predict-refine.h" #include "im-sandbox.h" #include "time-accounts.h" +#include "im-zmq.h" static float **backup_image_data(float **dp, struct detector *det) @@ -100,39 +100,16 @@ static void restore_image_data(float **dp, struct detector *det, float **bu) } -void process_image(const struct index_args *iargs, struct pattern_args *pargs, - Stream *st, int cookie, const char *tmpdir, - int serial, struct sb_shm *sb_shared, TimeAccounts *taccs, - char *last_task) +static int file_wait_open_read(struct sb_shm *sb_shared, struct image *image, + TimeAccounts *taccs, char *last_task, + signed int wait_for_file, int cookie, + struct imagefile **pimfile) { - struct imagefile *imfile; - struct image image; - int i; - int r; - int ret; - char *rn; - float **prefilter; - int any_crystals; + signed int file_wait_time = wait_for_file; int wait_message_done = 0; int read_retry_done = 0; - signed int file_wait_time = iargs->wait_for_file; - - image.features = NULL; - image.copyme = iargs->copyme; - image.id = cookie; - image.beam = iargs->beam; - image.det = copy_geom(iargs->det); - image.crystals = NULL; - image.n_crystals = 0; - image.serial = serial; - image.indexed_by = INDEXING_NONE; - - if ( pargs->filename_p_e != NULL ) { - image.filename = pargs->filename_p_e->filename; - image.event = pargs->filename_p_e->ev; - } else if ( pargs->msgpack_obj != NULL ) { - STATUS("Msgpack!\n"); - } + int r; + struct imagefile *imfile; time_accounts_set(taccs, TACC_WAITFILE); set_last_task(last_task, "wait for file"); @@ -142,29 +119,27 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, struct stat statbuf; sb_shared->pings[cookie]++; - r = stat(image.filename, &statbuf); + r = stat(image->filename, &statbuf); if ( r ) { - if ( (iargs->wait_for_file != 0) - && (file_wait_time != 0) ) - { + if ( (wait_for_file != 0) && (file_wait_time != 0) ) { if ( !wait_message_done ) { STATUS("Waiting for '%s'\n", - image.filename); + image->filename); wait_message_done = 1; } sleep(1); - if ( iargs->wait_for_file != -1 ) { + if ( wait_for_file != -1 ) { file_wait_time--; } continue; } - ERROR("File %s not found\n", image.filename); - return; + ERROR("File %s not found\n", image->filename); + return 1; } } while ( r ); @@ -174,42 +149,82 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, sb_shared->pings[cookie]++; do { - imfile = imagefile_open(image.filename); + imfile = imagefile_open(image->filename); if ( imfile == NULL ) { - if ( iargs->wait_for_file && !read_retry_done ) { + if ( wait_for_file && !read_retry_done ) { read_retry_done = 1; r = 1; STATUS("File '%s' exists but could not be opened." " Trying again after 10 seconds.\n", - image.filename); + image->filename); sleep(10); continue; } - ERROR("Couldn't open file: %s\n", image.filename); - return; + ERROR("Couldn't open file: %s\n", image->filename); + return 1; } time_accounts_set(taccs, TACC_HDF5READ); set_last_task(last_task, "read file"); sb_shared->pings[cookie]++; - r = imagefile_read(imfile, &image, image.event); + r = imagefile_read(imfile, image, image->event); if ( r ) { - if ( iargs->wait_for_file && !read_retry_done ) { + if ( wait_for_file && !read_retry_done ) { read_retry_done = 1; imagefile_close(imfile); STATUS("File '%s' exists but could not be read." " Trying again after 10 seconds.\n", - image.filename); + image->filename); sleep(10); continue; } - ERROR("Couldn't open file: %s\n", image.filename); - return; + ERROR("Couldn't open file: %s\n", image->filename); + return 1; } } while ( r ); + *pimfile = imfile; + return 0; +} + + +void process_image(const struct index_args *iargs, struct pattern_args *pargs, + Stream *st, int cookie, const char *tmpdir, + int serial, struct sb_shm *sb_shared, TimeAccounts *taccs, + char *last_task) +{ + struct imagefile *imfile; + struct image image; + int i; + int r; + int ret; + char *rn; + float **prefilter; + int any_crystals; + + image.features = NULL; + image.copyme = iargs->copyme; + image.id = cookie; + image.beam = iargs->beam; + image.det = copy_geom(iargs->det); + image.crystals = NULL; + image.n_crystals = 0; + image.serial = serial; + image.indexed_by = INDEXING_NONE; + + if ( pargs->filename_p_e != NULL ) { + image.filename = pargs->filename_p_e->filename; + image.event = pargs->filename_p_e->ev; + if ( file_wait_open_read(sb_shared, &image, taccs, last_task, + iargs->wait_for_file, cookie, + &imfile) ) return; + } else if ( pargs->msgpack_obj != NULL ) { + STATUS("Msgpack!\n"); + if ( unpack_msgpack_data(pargs->msgpack_obj, &image) ) return; + } + /* Take snapshot of image before applying horrible noise filters */ time_accounts_set(taccs, TACC_FILTER); set_last_task(last_task, "image filter"); @@ -322,6 +337,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, } break; + case PEAK_MSGPACK: + get_peaks_msgpack(pargs->msgpack_obj, &image, + iargs->half_pixel_shift); + break; + } image.peak_resolution = estimate_peak_resolution(image.features, diff --git a/src/process_image.h b/src/process_image.h index b61fe83f..54d97f77 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -53,6 +53,7 @@ enum { PEAK_ZAEF, PEAK_HDF5, PEAK_CXI, + PEAK_MSGPACK, }; -- cgit v1.2.3