aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c13
-rw-r--r--src/im-zmq.c56
-rw-r--r--src/im-zmq.h24
-rw-r--r--src/process_image.c8
-rw-r--r--src/process_image.h7
5 files changed, 38 insertions, 70 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 6615c418..eb0b942f 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -427,14 +427,16 @@ static int run_work(const struct index_args *iargs, Stream *st,
free(line);
- pargs.msgpack_obj = NULL;
+ pargs.zmq_data = NULL;
+ pargs.zmq_data_size = 0;
} else {
- pargs.msgpack_obj = im_zmq_fetch(zmqstuff);
+ pargs.zmq_data = im_zmq_fetch(zmqstuff,
+ &pargs.zmq_data_size);
pargs.filename = strdup("(from ZMQ)");
pargs.event = NULL;
- ser = 0; /* FIXME */
+ ser = 0; /* FIXME: Serial numbers from ZMQ? */
}
@@ -442,10 +444,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
sb->shared, taccs, sb->shared->last_task[cookie]);
- if ( sb->zmq ) {
- im_zmq_clean(zmqstuff);
- }
-
+ free(pargs.zmq_data);
}
im_zmq_shutdown(zmqstuff);
diff --git a/src/im-zmq.c b/src/im-zmq.c
index dea8515b..5c9e90bc 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2020 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -38,7 +38,6 @@
#include <assert.h>
#include <unistd.h>
#include <zmq.h>
-#include <msgpack.h>
#include <image.h>
#include <utils.h>
@@ -53,8 +52,6 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
- msgpack_unpacked unpacked;
- int unpacked_set;
};
@@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
z = malloc(sizeof(struct im_zmq));
if ( z == NULL ) return NULL;
- z->unpacked_set = 0;
-
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
if ( z->socket == NULL ) return NULL;
STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
@@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- STATUS("ZMQ connected.\n");
+ STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n");
+
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) {
+ ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
return z;
}
-msgpack_object *im_zmq_fetch(struct im_zmq *z)
+void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
{
int msg_size;
- int r;
-
- if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
- ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
- return NULL;
- }
+ void *data_copy;
zmq_msg_init(&z->msg);
+ STATUS("requesting data...\n");
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ STATUS("done (got %i bytes)\n", msg_size);
if ( msg_size == -1 ) {
ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
zmq_msg_close(&z->msg);
return NULL;
}
- msgpack_unpacked_init(&z->unpacked);
- r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
- msg_size, NULL);
- if ( r != MSGPACK_UNPACK_SUCCESS ) {
- ERROR("Msgpack unpack failed: %i\n", r);
- zmq_msg_close(&z->msg);
- return NULL;
- }
- z->unpacked_set = 1;
+ data_copy = malloc(msg_size);
+ if ( data_copy == NULL ) return NULL;
+ memcpy(data_copy, zmq_msg_data(&z->msg), msg_size);
- return &z->unpacked.data;
-}
-
-
-/* Clean structures ready for next frame */
-void im_zmq_clean(struct im_zmq *z)
-{
- if ( z->unpacked_set ) {
- msgpack_unpacked_destroy(&z->unpacked);
- zmq_msg_close(&z->msg);
- z->unpacked_set = 0;
- }
+ zmq_msg_close(&z->msg);
+ *pdata_size = msg_size;
+ return data_copy;
}
void im_zmq_shutdown(struct im_zmq *z)
{
if ( z == NULL ) return;
-
- zmq_msg_close(&z->msg);
zmq_close(z->socket);
zmq_ctx_destroy(z->ctx);
}
diff --git a/src/im-zmq.h b/src/im-zmq.h
index 87128895..c6bad6cc 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2019 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -36,28 +36,18 @@
#include <config.h>
#endif
-#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ)
-
-#include <msgpack.h>
+#if defined(HAVE_ZMQ)
extern struct im_zmq *im_zmq_connect(const char *zmq_address);
-
-extern void im_zmq_clean(struct im_zmq *z);
-
extern void im_zmq_shutdown(struct im_zmq *z);
+extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
-extern msgpack_object *im_zmq_fetch(struct im_zmq *z);
-
-#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#else /* defined(HAVE_ZMQ) */
static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
+static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
-static UNUSED void im_zmq_clean(struct im_zmq *z) { return; }
-
-static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; }
-
-static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; }
-
-#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */
+#endif /* defined(HAVE_ZMQ) */
#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/process_image.c b/src/process_image.c
index 825e57d9..52e0a9e0 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -193,10 +193,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
float **prefilter;
int any_crystals;
- if ( pargs->msgpack_obj != NULL ) {
+ if ( pargs->zmq_data != NULL ) {
set_last_task(last_task, "unpacking messagepack object");
image = image_msgpack_read(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->no_image_data);
if ( image == NULL ) return;
} else {
@@ -303,7 +304,8 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
case PEAK_MSGPACK:
image->features = image_msgpack_read_peaks(iargs->dtempl,
- pargs->msgpack_obj,
+ pargs->zmq_data,
+ pargs->zmq_data_size,
iargs->half_pixel_shift);
break;
diff --git a/src/process_image.h b/src/process_image.h
index e8be0c29..ec7e22e2 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -119,11 +119,8 @@ struct pattern_args
/* "Input" */
char *filename;
char *event;
-#ifdef HAVE_MSGPACK
- msgpack_object *msgpack_obj;
-#else
- void *msgpack_obj;
-#endif
+ void *zmq_data;
+ size_t zmq_data_size;
};