aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt16
-rw-r--r--libcrystfel/CMakeLists.txt5
-rw-r--r--libcrystfel/src/image-msgpack.c91
-rw-r--r--libcrystfel/src/image-msgpack.h20
-rw-r--r--src/im-sandbox.c13
-rw-r--r--src/im-zmq.c56
-rw-r--r--src/im-zmq.h24
-rw-r--r--src/process_image.c8
-rw-r--r--src/process_image.h7
9 files changed, 102 insertions, 138 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6cfaab14..62637a78 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -31,13 +31,6 @@ link_directories (${GLIB_LIBRARY_DIRS})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall")
-pkg_search_module(MSGPACK msgpack)
-if (MSGPACK_FOUND)
- message(STATUS "Found Messagepack")
-else ()
- message(STATUS "MessagePack not found.")
-endif ()
-
pkg_search_module(ZMQ libzmq)
if (ZMQ_FOUND)
message(STATUS "Found ZMQ")
@@ -117,7 +110,6 @@ set(HAVE_GTK ${GTK_FOUND})
set(HAVE_OPENCL ${OpenCL_FOUND})
set(HAVE_GDKPIXBUF ${GDKPIXBUF_FOUND})
set(HAVE_GDK ${GDK_FOUND})
-set(HAVE_MSGPACK ${MSGPACK_FOUND})
set(HAVE_ZMQ ${ZMQ_FOUND})
set(PACKAGE_VERSION ${PROJECT_VERSION})
@@ -255,7 +247,7 @@ list(APPEND CRYSTFEL_EXECUTABLES list_events)
set(INDEXAMAJIG_SOURCES src/indexamajig.c src/im-sandbox.c src/process_image.c
src/time-accounts.c)
-if ( ZMQ_FOUND AND MSGPACK_FOUND )
+if ( ZMQ_FOUND )
list(APPEND INDEXAMAJIG_SOURCES src/im-zmq.c)
endif ()
@@ -265,9 +257,9 @@ target_include_directories(indexamajig PRIVATE ${COMMON_INCLUDES})
target_link_libraries(indexamajig ${COMMON_LIBRARIES})
list(APPEND CRYSTFEL_EXECUTABLES indexamajig)
-if ( ZMQ_FOUND AND MSGPACK_FOUND )
- target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR} ${MSGPACK_INCLUDE_DIR})
- target_link_libraries(indexamajig ${ZMQ_LIBRARIES} ${MSGPACK_LIBRARIES})
+if ( ZMQ_FOUND )
+ target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR})
+ target_link_libraries(indexamajig ${ZMQ_LIBRARIES})
endif ()
diff --git a/libcrystfel/CMakeLists.txt b/libcrystfel/CMakeLists.txt
index fa963c22..7bf301f0 100644
--- a/libcrystfel/CMakeLists.txt
+++ b/libcrystfel/CMakeLists.txt
@@ -177,6 +177,11 @@ if (LIBCCP4_FOUND)
target_link_libraries(${PROJECT_NAME} PRIVATE ${LIBCCP4_LIBRARIES})
endif (LIBCCP4_FOUND)
+if (MSGPACK_FOUND)
+ target_include_directories(${PROJECT_NAME} PRIVATE ${MSGPACK_INCLUDES})
+ target_link_libraries(${PROJECT_NAME} PRIVATE ${MSGPACK_LIBRARIES})
+endif (MSGPACK_FOUND)
+
target_compile_options(${PROJECT_NAME} PRIVATE -Wall)
set_target_properties(${PROJECT_NAME} PROPERTIES PUBLIC_HEADER "${LIBCRYSTFEL_HEADERS}")
diff --git a/libcrystfel/src/image-msgpack.c b/libcrystfel/src/image-msgpack.c
index 420ecfb4..2c9947a9 100644
--- a/libcrystfel/src/image-msgpack.c
+++ b/libcrystfel/src/image-msgpack.c
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2020 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -210,49 +210,36 @@ static int unpack_slab(struct image *image,
static double *find_msgpack_data(msgpack_object *obj, int *width, int *height)
{
- msgpack_object *corr_data_obj;
- msgpack_object *data_obj;
- msgpack_object *shape_obj;
- double *data;
-
- corr_data_obj = find_msgpack_kv(obj, "corr_data");
- if ( corr_data_obj == NULL ) {
- ERROR("No corr_data MessagePack object found.\n");
- return NULL;
+ FILE *fh = fopen("msgpack.data", "a");
+ fprintf(fh, "object %p:\n", obj);
+ msgpack_object_print(fh, *obj);
+ fprintf(fh, "\n\n\n");
+ fclose(fh);
+
+ #if 0
+ printf("Data type: %i\n", obj->type);
+ if ( obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER ) {
+ printf("got an integer: %li\n", obj->via.i64);
}
- data_obj = find_msgpack_kv(corr_data_obj, "data");
- if ( data_obj == NULL ) {
- ERROR("No data MessagePack object found inside corr_data.\n");
- return NULL;
- }
- if ( data_obj->type != MSGPACK_OBJECT_STR ) {
- ERROR("corr_data.data isn't a binary object.\n");
- return NULL;
- }
- data = (double *)data_obj->via.str.ptr;
+ if ( obj->type == MSGPACK_OBJECT_ARRAY ) {
- shape_obj = find_msgpack_kv(corr_data_obj, "shape");
- if ( shape_obj == NULL ) {
- ERROR("No shape MessagePack object found inside corr_data.\n");
- return NULL;
- }
- if ( shape_obj->type != MSGPACK_OBJECT_ARRAY ) {
- ERROR("corr_data.shape isn't an array object.\n");
- return NULL;
- }
- if ( shape_obj->via.array.size != 2 ) {
- ERROR("corr_data.shape is wrong size (%i, should be 2)\n",
- shape_obj->via.array.size);
- return NULL;
- }
- if ( shape_obj->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER ) {
- ERROR("corr_data.shape contains wrong type of element.\n");
- return NULL;
+ int i;
+ printf("Array %i items\n", obj->via.array.size);
+
+ for ( i=0; i<obj->via.array.size; i++ ) {
+ msgpack_object *obj2 = obj->via.array.ptr[i];
+ printf("Item %i: type %i\n", i, obj2->type);
+ if ( obj2->type == MSGPACK_OBJECT_MAP ) {
+ printf("Map: '%s' -> ");
+ }
+ }
}
- *height = shape_obj->via.array.ptr[0].via.i64;
- *width = shape_obj->via.array.ptr[1].via.i64;
- return data;
+ #endif
+
+ *width = 2068;
+ *height = 2162;
+ return NULL;
}
@@ -272,15 +259,18 @@ static double *find_msgpack_data(msgpack_object *obj, int *width, int *height)
* }
*/
struct image *image_msgpack_read(DataTemplate *dtempl,
- msgpack_object *obj,
+ void *data,
+ size_t data_size,
int no_image_data,
int no_mask_data)
{
struct image *image;
int data_width, data_height;
- double *data;
+ double *image_data;
+ msgpack_unpacked unpacked;
+ int r;
- if ( obj == NULL ) {
+ if ( data == NULL ) {
ERROR("No MessagePack object!\n");
return NULL;
}
@@ -290,6 +280,13 @@ struct image *image_msgpack_read(DataTemplate *dtempl,
return NULL;
}
+ msgpack_unpacked_init(&unpacked);
+ r = msgpack_unpack_next(&unpacked, data, data_size, NULL);
+ if ( r != MSGPACK_UNPACK_SUCCESS ) {
+ ERROR("Msgpack unpack failed: %i\n", r);
+ return NULL;
+ }
+
image = image_new();
if ( image == NULL ) {
ERROR("Couldn't allocate image structure.\n");
@@ -297,13 +294,13 @@ struct image *image_msgpack_read(DataTemplate *dtempl,
}
if ( !no_image_data ) {
- data = find_msgpack_data(obj,
- &data_width, &data_height);
- if ( data == NULL ) {
+ image_data = find_msgpack_data(&unpacked.data,
+ &data_width, &data_height);
+ if ( image_data == NULL ) {
ERROR("No image data in MessagePack object.\n");
return NULL;
}
- unpack_slab(image, dtempl, data,
+ unpack_slab(image, dtempl, image_data,
data_width, data_height);
} else {
image_set_zero_data(image, dtempl);
diff --git a/libcrystfel/src/image-msgpack.h b/libcrystfel/src/image-msgpack.h
index a8e5af34..06fcffec 100644
--- a/libcrystfel/src/image-msgpack.h
+++ b/libcrystfel/src/image-msgpack.h
@@ -3,11 +3,11 @@
*
* Image loading, MessagePack parts
*
- * Copyright © 2012-2020 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2021 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2020 Thomas White <taw@physics.org>
+ * 2020-2021 Thomas White <taw@physics.org>
*
* This file is part of CrystFEL.
*
@@ -33,27 +33,29 @@
#if defined(HAVE_MSGPACK)
-#include <msgpack.h>
-
extern struct image *image_msgpack_read(DataTemplate *dtempl,
- msgpack_object *obj,
+ void *data,
+ size_t data_size,
int no_image_data);
extern ImageFeatureList *image_msgpack_read_peaks(const DataTemplate *dtempl,
- msgpack_object *obj,
+ void *data,
+ size_t data_size,
int half_pixel_shift);
#else /* defined(HAVE_MSGPACK) */
static UNUSED struct image *image_msgpack_read(DataTemplate *dtempl,
- void *obj,
- int no_image_data)
+ void *data,
+ size_t data_size,
+ int no_image_data)
{
return NULL;
}
static UNUSED ImageFeatureList *image_msgpack_read_peaks(const DataTemplate *dtempl,
- void *obj,
+ void *data,
+ size_t data_size,
int half_pixel_shift)
{
return NULL;
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 6615c418..eb0b942f 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -427,14 +427,16 @@ static int run_work(const struct index_args *iargs, Stream *st,
free(line);
- pargs.msgpack_obj = NULL;
+ pargs.zmq_data = NULL;
+ pargs.zmq_data_size = 0;
} else {
- pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
+ pargs.zmq_data = im_zmq_fetch(zmqstuff,
+ &pargs.zmq_data_size);
pargs.filename = strdup("(from ZMQ)");
pargs.event = NULL;
- ser = 0; /* FIXME */
+ ser = 0; /* FIXME: Serial numbers from ZMQ? */
}
@@ -442,10 +444,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- if ( sb->zmq ) {
- im_zmq_clean(zmqstuff);
- }
-
+ free(pargs.zmq_data);
}
im_zmq_shutdown(zmqstuff);
diff --git a/src/im-zmq.c b/src/im-zmq.c
index dea8515b..5c9e90bc 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2020 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -38,7 +38,6 @@
#include <assert.h>
#include <unistd.h>
#include <zmq.h>
-#include <msgpack.h>
#include <image.h>
#include <utils.h>
@@ -53,8 +52,6 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
- msgpack_unpacked unpacked;
- int unpacked_set;
};
@@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
z = malloc(sizeof(struct im_zmq));
if ( z == NULL ) return NULL;
- z->unpacked_set = 0;
-
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
if ( z->socket == NULL ) return NULL;
STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
@@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- STATUS("ZMQ connected.\n");
+ STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n");
+
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) {
+ ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
return z;
}
-msgpack_object *im_zmq_fetch(struct im_zmq *z)
+void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
{
int msg_size;
- int r;
-
- if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
- ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
- return NULL;
- }
+ void *data_copy;
zmq_msg_init(&z->msg);
+ STATUS("requesting data...\n");
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ STATUS("done (got %i bytes)\n", msg_size);
if ( msg_size == -1 ) {
ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
zmq_msg_close(&z->msg);
return NULL;
}
- msgpack_unpacked_init(&z->unpacked);
- r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
- msg_size, NULL);
- if ( r != MSGPACK_UNPACK_SUCCESS ) {
- ERROR("Msgpack unpack failed: %i\n", r);
- zmq_msg_close(&z->msg);
- return NULL;
- }
- z->unpacked_set = 1;
+ data_copy = malloc(msg_size);
+ if ( data_copy == NULL ) return NULL;
+ memcpy(data_copy, zmq_msg_data(&z->msg), msg_size);
- return &z->unpacked.data;
-}
-
-
-/* Clean structures ready for next frame */
-void im_zmq_clean(struct im_zmq *z)
-{
- if ( z->unpacked_set ) {
- msgpack_unpacked_destroy(&z->unpacked);
- zmq_msg_close(&z->msg);
- z->unpacked_set = 0;
- }
+ zmq_msg_close(&z->msg);
+ *pdata_size = msg_size;
+ return data_copy;
}
void im_zmq_shutdown(struct im_zmq *z)
{
if ( z == NULL ) return;
-
- zmq_msg_close(&z->msg);
zmq_close(z->socket);
zmq_ctx_destroy(z->ctx);
}
diff --git a/src/im-zmq.h b/src/im-zmq.h
index 87128895..c6bad6cc 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2019 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -36,28 +36,18 @@
#include <config.h>
#endif
-#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ)
-
-#include <msgpack.h>
+#if defined(HAVE_ZMQ)
extern struct im_zmq *im_zmq_connect(const char *zmq_address);
-
-extern void im_zmq_clean(struct im_zmq *z);
-
extern void im_zmq_shutdown(struct im_zmq *z);
+extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
-extern msgpack_object *im_zmq_fetch(struct im_zmq *z);
-
-#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#else /* defined(HAVE_ZMQ) */
static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
+static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
-static UNUSED void im_zmq_clean(struct im_zmq *z) { return; }
-
-static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; }
-
-static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; }
-
-#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#endif /* defined(HAVE_ZMQ) */
#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/process_image.c b/src/process_image.c
index 825e57d9..52e0a9e0 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -193,10 +193,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
float **prefilter;
int any_crystals;
- if ( pargs->msgpack_obj != NULL ) {
+ if ( pargs->zmq_data != NULL ) {
set_last_task(last_task, "unpacking messagepack object");
image = image_msgpack_read(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->no_image_data);
if ( image == NULL ) return;
} else {
@@ -303,7 +304,8 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
case PEAK_MSGPACK:
image->features = image_msgpack_read_peaks(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->half_pixel_shift);
break;
diff --git a/src/process_image.h b/src/process_image.h
index e8be0c29..ec7e22e2 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -119,11 +119,8 @@ struct pattern_args
/* "Input" */
char *filename;
char *event;
-#ifdef HAVE_MSGPACK
- msgpack_object *msgpack_obj;
-#else
- void *msgpack_obj;
-#endif
+ void *zmq_data;
+ size_t zmq_data_size;
};