aboutsummaryrefslogtreecommitdiff
path: root/src/process_image.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2019-01-10 17:02:15 +0100
committerThomas White <taw@physics.org>2019-03-14 11:36:56 +0100
commit4d62f31f90b76bce8b66fe2be6ccccb7b1542209 (patch)
tree14bf58ae1f03e85ffb803ecab4bfb95864cb38e5 /src/process_image.c
parent002d6cfab105095ddc6b1cdfde9eb939c12ca0f8 (diff)
Connect up hooks for unpacking MsgPack data
Diffstat (limited to 'src/process_image.c')
-rw-r--r--src/process_image.c118
1 files changed, 69 insertions, 49 deletions
diff --git a/src/process_image.c b/src/process_image.c
index 7667a05c..afd43d3b 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -39,7 +39,6 @@
#include <gsl/gsl_sort.h>
#include <unistd.h>
#include <sys/stat.h>
-#include <msgpack.h>
#include "utils.h"
#include "hdf5-file.h"
@@ -56,6 +55,7 @@
#include "predict-refine.h"
#include "im-sandbox.h"
#include "time-accounts.h"
+#include "im-zmq.h"
static float **backup_image_data(float **dp, struct detector *det)
@@ -100,39 +100,16 @@ static void restore_image_data(float **dp, struct detector *det, float **bu)
}
-void process_image(const struct index_args *iargs, struct pattern_args *pargs,
- Stream *st, int cookie, const char *tmpdir,
- int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
- char *last_task)
+static int file_wait_open_read(struct sb_shm *sb_shared, struct image *image,
+ TimeAccounts *taccs, char *last_task,
+ signed int wait_for_file, int cookie,
+ struct imagefile **pimfile)
{
- struct imagefile *imfile;
- struct image image;
- int i;
- int r;
- int ret;
- char *rn;
- float **prefilter;
- int any_crystals;
+ signed int file_wait_time = wait_for_file;
int wait_message_done = 0;
int read_retry_done = 0;
- signed int file_wait_time = iargs->wait_for_file;
-
- image.features = NULL;
- image.copyme = iargs->copyme;
- image.id = cookie;
- image.beam = iargs->beam;
- image.det = copy_geom(iargs->det);
- image.crystals = NULL;
- image.n_crystals = 0;
- image.serial = serial;
- image.indexed_by = INDEXING_NONE;
-
- if ( pargs->filename_p_e != NULL ) {
- image.filename = pargs->filename_p_e->filename;
- image.event = pargs->filename_p_e->ev;
- } else if ( pargs->msgpack_obj != NULL ) {
- STATUS("Msgpack!\n");
- }
+ int r;
+ struct imagefile *imfile;
time_accounts_set(taccs, TACC_WAITFILE);
set_last_task(last_task, "wait for file");
@@ -142,29 +119,27 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
struct stat statbuf;
sb_shared->pings[cookie]++;
- r = stat(image.filename, &statbuf);
+ r = stat(image->filename, &statbuf);
if ( r ) {
- if ( (iargs->wait_for_file != 0)
- && (file_wait_time != 0) )
- {
+ if ( (wait_for_file != 0) && (file_wait_time != 0) ) {
if ( !wait_message_done ) {
STATUS("Waiting for '%s'\n",
- image.filename);
+ image->filename);
wait_message_done = 1;
}
sleep(1);
- if ( iargs->wait_for_file != -1 ) {
+ if ( wait_for_file != -1 ) {
file_wait_time--;
}
continue;
}
- ERROR("File %s not found\n", image.filename);
- return;
+ ERROR("File %s not found\n", image->filename);
+ return 1;
}
} while ( r );
@@ -174,42 +149,82 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
sb_shared->pings[cookie]++;
do {
- imfile = imagefile_open(image.filename);
+ imfile = imagefile_open(image->filename);
if ( imfile == NULL ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
r = 1;
STATUS("File '%s' exists but could not be opened."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
time_accounts_set(taccs, TACC_HDF5READ);
set_last_task(last_task, "read file");
sb_shared->pings[cookie]++;
- r = imagefile_read(imfile, &image, image.event);
+ r = imagefile_read(imfile, image, image->event);
if ( r ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
imagefile_close(imfile);
STATUS("File '%s' exists but could not be read."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
} while ( r );
+ *pimfile = imfile;
+ return 0;
+}
+
+
+void process_image(const struct index_args *iargs, struct pattern_args *pargs,
+ Stream *st, int cookie, const char *tmpdir,
+ int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
+ char *last_task)
+{
+ struct imagefile *imfile;
+ struct image image;
+ int i;
+ int r;
+ int ret;
+ char *rn;
+ float **prefilter;
+ int any_crystals;
+
+ image.features = NULL;
+ image.copyme = iargs->copyme;
+ image.id = cookie;
+ image.beam = iargs->beam;
+ image.det = copy_geom(iargs->det);
+ image.crystals = NULL;
+ image.n_crystals = 0;
+ image.serial = serial;
+ image.indexed_by = INDEXING_NONE;
+
+ if ( pargs->filename_p_e != NULL ) {
+ image.filename = pargs->filename_p_e->filename;
+ image.event = pargs->filename_p_e->ev;
+ if ( file_wait_open_read(sb_shared, &image, taccs, last_task,
+ iargs->wait_for_file, cookie,
+ &imfile) ) return;
+ } else if ( pargs->msgpack_obj != NULL ) {
+ STATUS("Msgpack!\n");
+ if ( unpack_msgpack_data(pargs->msgpack_obj, &image) ) return;
+ }
+
/* Take snapshot of image before applying horrible noise filters */
time_accounts_set(taccs, TACC_FILTER);
set_last_task(last_task, "image filter");
@@ -322,6 +337,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
}
break;
+ case PEAK_MSGPACK:
+ get_peaks_msgpack(pargs->msgpack_obj, &image,
+ iargs->half_pixel_shift);
+ break;
+
}
image.peak_resolution = estimate_peak_resolution(image.features,