From 2f4f6ad97467a62ed8bf5cb44040548b89493c0b Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 22 Apr 2021 15:53:56 +0200 Subject: indexamajig: Add --zmq-subscribe --- src/im-sandbox.c | 11 +++++++++-- src/im-sandbox.h | 3 ++- src/im-zmq.c | 24 +++++++++++++++++++----- src/im-zmq.h | 8 ++++++-- src/indexamajig.c | 13 +++++++++++++ 5 files changed, 49 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 22e2c05f..3f689a0d 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -100,6 +100,8 @@ struct sandbox /* ZMQ mode */ int zmq; const char *zmq_address; + char **zmq_subscriptions; + int n_zmq_subscriptions; /* Final output */ Stream *stream; @@ -331,7 +333,9 @@ static int run_work(const struct index_args *iargs, Stream *st, /* Connect via ZMQ */ if ( sb->zmq ) { - zmqstuff = im_zmq_connect(sb->zmq_address); + zmqstuff = im_zmq_connect(sb->zmq_address, + sb->zmq_subscriptions, + sb->n_zmq_subscriptions); if ( zmqstuff == NULL ) { ERROR("ZMQ setup failed.\n"); return 1; @@ -1013,7 +1017,8 @@ char *create_tempdir(const char *temp_location) int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tmpdir, int serial_start, - const char *zmq_address, int timeout, int profile) + const char *zmq_address, char **zmq_subscriptions, + int n_zmq_subscriptions, int timeout, int profile) { int i; struct sandbox *sb; @@ -1047,6 +1052,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( zmq_address != NULL ) { sb->zmq = 1; sb->zmq_address = zmq_address; + sb->zmq_subscriptions = zmq_subscriptions; + sb->n_zmq_subscriptions = n_zmq_subscriptions; } else { sb->zmq = 0; } diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 2e006be4..47d23a18 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -85,6 +85,7 @@ extern void set_last_task(char *lt, const char *task); extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tempdir, int serial_start, - const char *zmq_address, int timeout, int profile); + const char *zmq_address, char **zmq_subscriptions, + int n_zmq_subscriptions, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/im-zmq.c b/src/im-zmq.c index 2f489811..be3bc544 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -55,9 +55,12 @@ struct im_zmq }; -struct im_zmq *im_zmq_connect(const char *zmq_address) +struct im_zmq *im_zmq_connect(const char *zmq_address, + char **subscriptions, + int n_subscriptions) { struct im_zmq *z; + int i; z = malloc(sizeof(struct im_zmq)); if ( z == NULL ) return NULL; @@ -73,11 +76,22 @@ struct im_zmq *im_zmq_connect(const char *zmq_address) ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); return NULL; } - STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n"); - if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) { - ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno)); - return NULL; + if ( n_subscriptions == 0 ) { + ERROR("WARNING: No ZeroMQ subscriptions. You should probably " + "try again with --zmq-subscribe.\n"); + } + + for ( i=0; isocket, ZMQ_SUBSCRIBE, + subscriptions[i], + strlen(subscriptions[i])) ) + { + ERROR("ZMQ subscription failed: %s\n", + zmq_strerror(errno)); + return NULL; + } } return z; diff --git a/src/im-zmq.h b/src/im-zmq.h index c6bad6cc..9cc36b48 100644 --- a/src/im-zmq.h +++ b/src/im-zmq.h @@ -38,13 +38,17 @@ #if defined(HAVE_ZMQ) -extern struct im_zmq *im_zmq_connect(const char *zmq_address); +extern struct im_zmq *im_zmq_connect(const char *zmq_address, + char **subscriptions, + int n_subscriptions); extern void im_zmq_shutdown(struct im_zmq *z); extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size); #else /* defined(HAVE_ZMQ) */ -static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; } +static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address, + char *zmq_subscriptions, + int n_subscriptions) { return NULL; } static UNUSED void im_zmq_shutdown(struct im_zmq *z) { } static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; } diff --git a/src/indexamajig.c b/src/indexamajig.c index 27071256..524022b2 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -81,6 +81,8 @@ struct indexamajig_arguments char *indm_str; int basename; char *zmq_addr; + char *zmq_subscriptions[256]; + int n_zmq_subscriptions; int serial_start; char *temp_location; int if_refine; @@ -361,6 +363,13 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->iargs.no_mask_data = 1; break; + case 211 : + if ( args->n_zmq_subscriptions == 256 ) { + ERROR("Too many ZMQ subscriptions.\n"); + return 1; + } + args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg); + /* ---------- Peak search ---------- */ case 't' : @@ -776,6 +785,7 @@ int main(int argc, char *argv[]) args.indm_str = NULL; args.basename = 0; args.zmq_addr = NULL; + args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; args.if_multi = 0; @@ -879,6 +889,8 @@ int main(int argc, char *argv[]) {"spectrum-file", 209, "fn", OPTION_NO_USAGE | OPTION_HIDDEN, "File containing radiation spectrum"}, {"no-mask-data", 210, NULL, OPTION_NO_USAGE, "Do not load mask data"}, + {"zmq-subscribe", 211, "tag", OPTION_NO_USAGE, "Subscribe to ZMQ message" + "type"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, @@ -1229,6 +1241,7 @@ int main(int argc, char *argv[]) r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename, fh, st, tmpdir, args.serial_start, args.zmq_addr, + args.zmq_subscriptions, args.n_zmq_subscriptions, timeout, args.profile); cell_free(args.iargs.cell); -- cgit v1.2.3