aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-04-15 15:30:25 +0200
committerThomas White <taw@physics.org>2021-04-15 16:43:40 +0200
commitafcb7b568947c20fb3477a178be5aefe3203b603 (patch)
tree25d3d8a475bd4353083ce00f893e766525e1e625
parent0dcd6c7e2fbfe78e4d2f26e01de0d4ea032d8fd6 (diff)
Separate ZMQ from MessagePack, switch to pub/sub socket
Indexamajig uses only ZMQ, to receive streaming data, while libcrystfel uses only msgpack to implement another type of image format. The two of these are eventually tied together in process_image, which does this: if ( have_zmq_data ) interpret_zmq_data_as_msgpack; - however, they would be easy to split up if we wanted to do something else (CBF data over ZMQ, anyone?). This commit also switches the ZMQ connector to use a pub/sub socket instead of a request/reply one. This matches changes in OnDA. At the moment, the MessagePack image reader simply dumps the object to disk.
-rw-r--r--CMakeLists.txt16
-rw-r--r--libcrystfel/CMakeLists.txt5
-rw-r--r--libcrystfel/src/image-msgpack.c91
-rw-r--r--libcrystfel/src/image-msgpack.h20
-rw-r--r--src/im-sandbox.c13
-rw-r--r--src/im-zmq.c56
-rw-r--r--src/im-zmq.h24
-rw-r--r--src/process_image.c8
-rw-r--r--src/process_image.h7
9 files changed, 102 insertions, 138 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6cfaab14..62637a78 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -31,13 +31,6 @@ link_directories (${GLIB_LIBRARY_DIRS})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall")
-pkg_search_module(MSGPACK msgpack)
-if (MSGPACK_FOUND)
- message(STATUS "Found Messagepack")
-else ()
- message(STATUS "MessagePack not found.")
-endif ()
-
pkg_search_module(ZMQ libzmq)
if (ZMQ_FOUND)
message(STATUS "Found ZMQ")
@@ -117,7 +110,6 @@ set(HAVE_GTK ${GTK_FOUND})
set(HAVE_OPENCL ${OpenCL_FOUND})
set(HAVE_GDKPIXBUF ${GDKPIXBUF_FOUND})
set(HAVE_GDK ${GDK_FOUND})
-set(HAVE_MSGPACK ${MSGPACK_FOUND})
set(HAVE_ZMQ ${ZMQ_FOUND})
set(PACKAGE_VERSION ${PROJECT_VERSION})
@@ -255,7 +247,7 @@ list(APPEND CRYSTFEL_EXECUTABLES list_events)
set(INDEXAMAJIG_SOURCES src/indexamajig.c src/im-sandbox.c src/process_image.c
src/time-accounts.c)
-if ( ZMQ_FOUND AND MSGPACK_FOUND )
+if ( ZMQ_FOUND )
list(APPEND INDEXAMAJIG_SOURCES src/im-zmq.c)
endif ()
@@ -265,9 +257,9 @@ target_include_directories(indexamajig PRIVATE ${COMMON_INCLUDES})
target_link_libraries(indexamajig ${COMMON_LIBRARIES})
list(APPEND CRYSTFEL_EXECUTABLES indexamajig)
-if ( ZMQ_FOUND AND MSGPACK_FOUND )
- target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR} ${MSGPACK_INCLUDE_DIR})
- target_link_libraries(indexamajig ${ZMQ_LIBRARIES} ${MSGPACK_LIBRARIES})
+if ( ZMQ_FOUND )
+ target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR})
+ target_link_libraries(indexamajig ${ZMQ_LIBRARIES})
endif ()
diff --git a/libcrystfel/CMakeLists.txt b/libcrystfel/CMakeLists.txt
index fa963c22..7bf301f0 100644
--- a/libcrystfel/CMakeLists.txt
+++ b/libcrystfel/CMakeLists.txt
@@ -177,6 +177,11 @@ if (LIBCCP4_FOUND)
target_link_libraries(${PROJECT_NAME} PRIVATE ${LIBCCP4_LIBRARIES})
endif (LIBCCP4_FOUND)
+if (MSGPACK_FOUND)
+ target_include_directories(${PROJECT_NAME} PRIVATE ${MSGPACK_INCLUDES})
+ target_link_libraries(${PROJECT_NAME} PRIVATE ${MSGPACK_LIBRARIES})
+endif (MSGPACK_FOUND)
+
target_compile_options(${PROJECT_NAME} PRIVATE -Wall)
set_target_properties(${PROJECT_NAME} PROPERTIES PUBLIC_HEADER "${LIBCRYSTFEL_HEADERS}")
diff --git a/libcrystfel/src/image-msgpack.c b/libcrystfel/src/image-msgpack.c
index 420ecfb4..2c9947a9 100644
--- a/libcrystfel/src/image-msgpack.c
+++ b/libcrystfel/src/image-msgpack.c
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2020 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -210,49 +210,36 @@ static int unpack_slab(struct image *image,
static double *find_msgpack_data(msgpack_object *obj, int *width, int *height)
{
- msgpack_object *corr_data_obj;
- msgpack_object *data_obj;
- msgpack_object *shape_obj;
- double *data;
-
- corr_data_obj = find_msgpack_kv(obj, "corr_data");
- if ( corr_data_obj == NULL ) {
- ERROR("No corr_data MessagePack object found.\n");
- return NULL;
+ FILE *fh = fopen("msgpack.data", "a");
+ fprintf(fh, "object %p:\n", obj);
+ msgpack_object_print(fh, *obj);
+ fprintf(fh, "\n\n\n");
+ fclose(fh);
+
+ #if 0
+ printf("Data type: %i\n", obj->type);
+ if ( obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER ) {
+ printf("got an integer: %li\n", obj->via.i64);
}
- data_obj = find_msgpack_kv(corr_data_obj, "data");
- if ( data_obj == NULL ) {
- ERROR("No data MessagePack object found inside corr_data.\n");
- return NULL;
- }
- if ( data_obj->type != MSGPACK_OBJECT_STR ) {
- ERROR("corr_data.data isn't a binary object.\n");
- return NULL;
- }
- data = (double *)data_obj->via.str.ptr;
+ if ( obj->type == MSGPACK_OBJECT_ARRAY ) {
- shape_obj = find_msgpack_kv(corr_data_obj, "shape");
- if ( shape_obj == NULL ) {
- ERROR("No shape MessagePack object found inside corr_data.\n");
- return NULL;
- }
- if ( shape_obj->type != MSGPACK_OBJECT_ARRAY ) {
- ERROR("corr_data.shape isn't an array object.\n");
- return NULL;
- }
- if ( shape_obj->via.array.size != 2 ) {
- ERROR("corr_data.shape is wrong size (%i, should be 2)\n",
- shape_obj->via.array.size);
- return NULL;
- }
- if ( shape_obj->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER ) {
- ERROR("corr_data.shape contains wrong type of element.\n");
- return NULL;
+ int i;
+ printf("Array %i items\n", obj->via.array.size);
+
+ for ( i=0; i<obj->via.array.size; i++ ) {
+ msgpack_object *obj2 = obj->via.array.ptr[i];
+ printf("Item %i: type %i\n", i, obj2->type);
+ if ( obj2->type == MSGPACK_OBJECT_MAP ) {
+ printf("Map: '%s' -> ");
+ }
+ }
}
- *height = shape_obj->via.array.ptr[0].via.i64;
- *width = shape_obj->via.array.ptr[1].via.i64;
- return data;
+ #endif
+
+ *width = 2068;
+ *height = 2162;
+ return NULL;
}
@@ -272,15 +259,18 @@ static double *find_msgpack_data(msgpack_object *obj, int *width, int *height)
* }
*/
struct image *image_msgpack_read(DataTemplate *dtempl,
- msgpack_object *obj,
+ void *data,
+ size_t data_size,
int no_image_data,
int no_mask_data)
{
struct image *image;
int data_width, data_height;
- double *data;
+ double *image_data;
+ msgpack_unpacked unpacked;
+ int r;
- if ( obj == NULL ) {
+ if ( data == NULL ) {
ERROR("No MessagePack object!\n");
return NULL;
}
@@ -290,6 +280,13 @@ struct image *image_msgpack_read(DataTemplate *dtempl,
return NULL;
}
+ msgpack_unpacked_init(&unpacked);
+ r = msgpack_unpack_next(&unpacked, data, data_size, NULL);
+ if ( r != MSGPACK_UNPACK_SUCCESS ) {
+ ERROR("Msgpack unpack failed: %i\n", r);
+ return NULL;
+ }
+
image = image_new();
if ( image == NULL ) {
ERROR("Couldn't allocate image structure.\n");
@@ -297,13 +294,13 @@ struct image *image_msgpack_read(DataTemplate *dtempl,
}
if ( !no_image_data ) {
- data = find_msgpack_data(obj,
- &data_width, &data_height);
- if ( data == NULL ) {
+ image_data = find_msgpack_data(&unpacked.data,
+ &data_width, &data_height);
+ if ( image_data == NULL ) {
ERROR("No image data in MessagePack object.\n");
return NULL;
}
- unpack_slab(image, dtempl, data,
+ unpack_slab(image, dtempl, image_data,
data_width, data_height);
} else {
image_set_zero_data(image, dtempl);
diff --git a/libcrystfel/src/image-msgpack.h b/libcrystfel/src/image-msgpack.h
index a8e5af34..06fcffec 100644
--- a/libcrystfel/src/image-msgpack.h
+++ b/libcrystfel/src/image-msgpack.h
@@ -3,11 +3,11 @@
*
* Image loading, MessagePack parts
*
- * Copyright © 2012-2020 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2021 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2020 Thomas White <taw@physics.org>
+ * 2020-2021 Thomas White <taw@physics.org>
*
* This file is part of CrystFEL.
*
@@ -33,27 +33,29 @@
#if defined(HAVE_MSGPACK)
-#include <msgpack.h>
-
extern struct image *image_msgpack_read(DataTemplate *dtempl,
- msgpack_object *obj,
+ void *data,
+ size_t data_size,
int no_image_data);
extern ImageFeatureList *image_msgpack_read_peaks(const DataTemplate *dtempl,
- msgpack_object *obj,
+ void *data,
+ size_t data_size,
int half_pixel_shift);
#else /* defined(HAVE_MSGPACK) */
static UNUSED struct image *image_msgpack_read(DataTemplate *dtempl,
- void *obj,
- int no_image_data)
+ void *data,
+ size_t data_size,
+ int no_image_data)
{
return NULL;
}
static UNUSED ImageFeatureList *image_msgpack_read_peaks(const DataTemplate *dtempl,
- void *obj,
+ void *data,
+ size_t data_size,
int half_pixel_shift)
{
return NULL;
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 6615c418..eb0b942f 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -427,14 +427,16 @@ static int run_work(const struct index_args *iargs, Stream *st,
free(line);
- pargs.msgpack_obj = NULL;
+ pargs.zmq_data = NULL;
+ pargs.zmq_data_size = 0;
} else {
- pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
+ pargs.zmq_data = im_zmq_fetch(zmqstuff,
+ &pargs.zmq_data_size);
pargs.filename = strdup("(from ZMQ)");
pargs.event = NULL;
- ser = 0; /* FIXME */
+ ser = 0; /* FIXME: Serial numbers from ZMQ? */
}
@@ -442,10 +444,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- if ( sb->zmq ) {
- im_zmq_clean(zmqstuff);
- }
-
+ free(pargs.zmq_data);
}
im_zmq_shutdown(zmqstuff);
diff --git a/src/im-zmq.c b/src/im-zmq.c
index dea8515b..5c9e90bc 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2020 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -38,7 +38,6 @@
#include <assert.h>
#include <unistd.h>
#include <zmq.h>
-#include <msgpack.h>
#include <image.h>
#include <utils.h>
@@ -53,8 +52,6 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
- msgpack_unpacked unpacked;
- int unpacked_set;
};
@@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
z = malloc(sizeof(struct im_zmq));
if ( z == NULL ) return NULL;
- z->unpacked_set = 0;
-
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
if ( z->socket == NULL ) return NULL;
STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
@@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- STATUS("ZMQ connected.\n");
+ STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n");
+
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) {
+ ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
return z;
}
-msgpack_object *im_zmq_fetch(struct im_zmq *z)
+void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
{
int msg_size;
- int r;
-
- if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
- ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
- return NULL;
- }
+ void *data_copy;
zmq_msg_init(&z->msg);
+ STATUS("requesting data...\n");
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ STATUS("done (got %i bytes)\n", msg_size);
if ( msg_size == -1 ) {
ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
zmq_msg_close(&z->msg);
return NULL;
}
- msgpack_unpacked_init(&z->unpacked);
- r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
- msg_size, NULL);
- if ( r != MSGPACK_UNPACK_SUCCESS ) {
- ERROR("Msgpack unpack failed: %i\n", r);
- zmq_msg_close(&z->msg);
- return NULL;
- }
- z->unpacked_set = 1;
+ data_copy = malloc(msg_size);
+ if ( data_copy == NULL ) return NULL;
+ memcpy(data_copy, zmq_msg_data(&z->msg), msg_size);
- return &z->unpacked.data;
-}
-
-
-/* Clean structures ready for next frame */
-void im_zmq_clean(struct im_zmq *z)
-{
- if ( z->unpacked_set ) {
- msgpack_unpacked_destroy(&z->unpacked);
- zmq_msg_close(&z->msg);
- z->unpacked_set = 0;
- }
+ zmq_msg_close(&z->msg);
+ *pdata_size = msg_size;
+ return data_copy;
}
void im_zmq_shutdown(struct im_zmq *z)
{
if ( z == NULL ) return;
-
- zmq_msg_close(&z->msg);
zmq_close(z->socket);
zmq_ctx_destroy(z->ctx);
}
diff --git a/src/im-zmq.h b/src/im-zmq.h
index 87128895..c6bad6cc 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2019 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -36,28 +36,18 @@
#include <config.h>
#endif
-#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ)
-
-#include <msgpack.h>
+#if defined(HAVE_ZMQ)
extern struct im_zmq *im_zmq_connect(const char *zmq_address);
-
-extern void im_zmq_clean(struct im_zmq *z);
-
extern void im_zmq_shutdown(struct im_zmq *z);
+extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
-extern msgpack_object *im_zmq_fetch(struct im_zmq *z);
-
-#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#else /* defined(HAVE_ZMQ) */
static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
+static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
-static UNUSED void im_zmq_clean(struct im_zmq *z) { return; }
-
-static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; }
-
-static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; }
-
-#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#endif /* defined(HAVE_ZMQ) */
#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/process_image.c b/src/process_image.c
index 825e57d9..52e0a9e0 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -193,10 +193,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
float **prefilter;
int any_crystals;
- if ( pargs->msgpack_obj != NULL ) {
+ if ( pargs->zmq_data != NULL ) {
set_last_task(last_task, "unpacking messagepack object");
image = image_msgpack_read(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->no_image_data);
if ( image == NULL ) return;
} else {
@@ -303,7 +304,8 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
case PEAK_MSGPACK:
image->features = image_msgpack_read_peaks(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->half_pixel_shift);
break;
diff --git a/src/process_image.h b/src/process_image.h
index e8be0c29..ec7e22e2 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -119,11 +119,8 @@ struct pattern_args
/* "Input" */
char *filename;
char *event;
-#ifdef HAVE_MSGPACK
- msgpack_object *msgpack_obj;
-#else
- void *msgpack_obj;
-#endif
+ void *zmq_data;
+ size_t zmq_data_size;
};