aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-04-15 15:30:25 +0200
committerThomas White <taw@physics.org>2021-04-15 16:43:40 +0200
commitafcb7b568947c20fb3477a178be5aefe3203b603 (patch)
tree25d3d8a475bd4353083ce00f893e766525e1e625 /src
parent0dcd6c7e2fbfe78e4d2f26e01de0d4ea032d8fd6 (diff)
Separate ZMQ from MessagePack, switch to pub/sub socket
Indexamajig uses only ZMQ, to receive streaming data, while libcrystfel uses only msgpack to implement another type of image format. The two of these are eventually tied together in process_image, which does this: if ( have_zmq_data ) interpret_zmq_data_as_msgpack; - however, they would be easy to split up if we wanted to do something else (CBF data over ZMQ, anyone?). This commit also switches the ZMQ connector to use a pub/sub socket instead of a request/reply one. This matches changes in OnDA. At the moment, the MessagePack image reader simply dumps the object to disk.
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c13
-rw-r--r--src/im-zmq.c56
-rw-r--r--src/im-zmq.h24
-rw-r--r--src/process_image.c8
-rw-r--r--src/process_image.h7
5 files changed, 38 insertions, 70 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 6615c418..eb0b942f 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -427,14 +427,16 @@ static int run_work(const struct index_args *iargs, Stream *st,
free(line);
- pargs.msgpack_obj = NULL;
+ pargs.zmq_data = NULL;
+ pargs.zmq_data_size = 0;
} else {
- pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
+ pargs.zmq_data = im_zmq_fetch(zmqstuff,
+ &pargs.zmq_data_size);
pargs.filename = strdup("(from ZMQ)");
pargs.event = NULL;
- ser = 0; /* FIXME */
+ ser = 0; /* FIXME: Serial numbers from ZMQ? */
}
@@ -442,10 +444,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- if ( sb->zmq ) {
- im_zmq_clean(zmqstuff);
- }
-
+ free(pargs.zmq_data);
}
im_zmq_shutdown(zmqstuff);
diff --git a/src/im-zmq.c b/src/im-zmq.c
index dea8515b..5c9e90bc 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2020 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -38,7 +38,6 @@
#include <assert.h>
#include <unistd.h>
#include <zmq.h>
-#include <msgpack.h>
#include <image.h>
#include <utils.h>
@@ -53,8 +52,6 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
- msgpack_unpacked unpacked;
- int unpacked_set;
};
@@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
z = malloc(sizeof(struct im_zmq));
if ( z == NULL ) return NULL;
- z->unpacked_set = 0;
-
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
if ( z->socket == NULL ) return NULL;
STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
@@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- STATUS("ZMQ connected.\n");
+ STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n");
+
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) {
+ ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
return z;
}
-msgpack_object *im_zmq_fetch(struct im_zmq *z)
+void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
{
int msg_size;
- int r;
-
- if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
- ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
- return NULL;
- }
+ void *data_copy;
zmq_msg_init(&z->msg);
+ STATUS("requesting data...\n");
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ STATUS("done (got %i bytes)\n", msg_size);
if ( msg_size == -1 ) {
ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
zmq_msg_close(&z->msg);
return NULL;
}
- msgpack_unpacked_init(&z->unpacked);
- r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
- msg_size, NULL);
- if ( r != MSGPACK_UNPACK_SUCCESS ) {
- ERROR("Msgpack unpack failed: %i\n", r);
- zmq_msg_close(&z->msg);
- return NULL;
- }
- z->unpacked_set = 1;
+ data_copy = malloc(msg_size);
+ if ( data_copy == NULL ) return NULL;
+ memcpy(data_copy, zmq_msg_data(&z->msg), msg_size);
- return &z->unpacked.data;
-}
-
-
-/* Clean structures ready for next frame */
-void im_zmq_clean(struct im_zmq *z)
-{
- if ( z->unpacked_set ) {
- msgpack_unpacked_destroy(&z->unpacked);
- zmq_msg_close(&z->msg);
- z->unpacked_set = 0;
- }
+ zmq_msg_close(&z->msg);
+ *pdata_size = msg_size;
+ return data_copy;
}
void im_zmq_shutdown(struct im_zmq *z)
{
if ( z == NULL ) return;
-
- zmq_msg_close(&z->msg);
zmq_close(z->socket);
zmq_ctx_destroy(z->ctx);
}
diff --git a/src/im-zmq.h b/src/im-zmq.h
index 87128895..c6bad6cc 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2019 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -36,28 +36,18 @@
#include <config.h>
#endif
-#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ)
-
-#include <msgpack.h>
+#if defined(HAVE_ZMQ)
extern struct im_zmq *im_zmq_connect(const char *zmq_address);
-
-extern void im_zmq_clean(struct im_zmq *z);
-
extern void im_zmq_shutdown(struct im_zmq *z);
+extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
-extern msgpack_object *im_zmq_fetch(struct im_zmq *z);
-
-#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#else /* defined(HAVE_ZMQ) */
static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
+static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
-static UNUSED void im_zmq_clean(struct im_zmq *z) { return; }
-
-static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; }
-
-static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; }
-
-#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#endif /* defined(HAVE_ZMQ) */
#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/process_image.c b/src/process_image.c
index 825e57d9..52e0a9e0 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -193,10 +193,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
float **prefilter;
int any_crystals;
- if ( pargs->msgpack_obj != NULL ) {
+ if ( pargs->zmq_data != NULL ) {
set_last_task(last_task, "unpacking messagepack object");
image = image_msgpack_read(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->no_image_data);
if ( image == NULL ) return;
} else {
@@ -303,7 +304,8 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
case PEAK_MSGPACK:
image->features = image_msgpack_read_peaks(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->half_pixel_shift);
break;
diff --git a/src/process_image.h b/src/process_image.h
index e8be0c29..ec7e22e2 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -119,11 +119,8 @@ struct pattern_args
/* "Input" */
char *filename;
char *event;
-#ifdef HAVE_MSGPACK
- msgpack_object *msgpack_obj;
-#else
- void *msgpack_obj;
-#endif
+ void *zmq_data;
+ size_t zmq_data_size;
};