From ce270ad7d8136aac47a802a9a72c011344f90527 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 5 May 2021 17:07:13 +0200 Subject: indexamajig: Add --zmq-request This (re-)adds the ability to get data via a request/reply socket. See afcb7b568947c for when it was removed. --- src/im-sandbox.c | 8 +++++-- src/im-sandbox.h | 3 ++- src/im-zmq.c | 63 +++++++++++++++++++++++++++++++++++++++++-------------- src/im-zmq.h | 6 ++++-- src/indexamajig.c | 16 +++++++++++++- 5 files changed, 74 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index ae1fe92a..4d40c461 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -102,6 +102,7 @@ struct sandbox const char *zmq_address; char **zmq_subscriptions; int n_zmq_subscriptions; + const char *zmq_request; /* Final output */ Stream *stream; @@ -335,7 +336,8 @@ static int run_work(const struct index_args *iargs, Stream *st, if ( sb->zmq ) { zmqstuff = im_zmq_connect(sb->zmq_address, sb->zmq_subscriptions, - sb->n_zmq_subscriptions); + sb->n_zmq_subscriptions, + sb->zmq_request); if ( zmqstuff == NULL ) { ERROR("ZMQ setup failed.\n"); return 1; @@ -1025,7 +1027,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tmpdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, - int n_zmq_subscriptions, int timeout, int profile) + int n_zmq_subscriptions, const char *zmq_request, + int timeout, int profile) { int i; struct sandbox *sb; @@ -1061,6 +1064,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->zmq_address = zmq_address; sb->zmq_subscriptions = zmq_subscriptions; sb->n_zmq_subscriptions = n_zmq_subscriptions; + sb->zmq_request = zmq_request; } else { sb->zmq = 0; } diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 47d23a18..a76e69ec 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -86,6 +86,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tempdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, - int n_zmq_subscriptions, int timeout, int profile); + int n_zmq_subscriptions, const char *zmq_request, + int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/im-zmq.c b/src/im-zmq.c index be3bc544..c2d386cf 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -52,12 +52,14 @@ struct im_zmq void *ctx; void *socket; zmq_msg_t msg; + const char *request_str; }; struct im_zmq *im_zmq_connect(const char *zmq_address, char **subscriptions, - int n_subscriptions) + int n_subscriptions, + const char *zmq_request) { struct im_zmq *z; int i; @@ -68,30 +70,46 @@ struct im_zmq *im_zmq_connect(const char *zmq_address, z->ctx = zmq_ctx_new(); if ( z->ctx == NULL ) return NULL; - z->socket = zmq_socket(z->ctx, ZMQ_SUB); + if ( zmq_request == NULL ) { + STATUS("Connecting ZMQ subscriber to '%s'\n", zmq_address); + z->socket = zmq_socket(z->ctx, ZMQ_SUB); + } else { + STATUS("Connecting ZMQ requester to '%s'\n", zmq_address); + z->socket = zmq_socket(z->ctx, ZMQ_REQ); + } if ( z->socket == NULL ) return NULL; - STATUS("Connecting to ZMQ at '%s'\n", zmq_address); if ( zmq_connect(z->socket, zmq_address) == -1 ) { ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); return NULL; } - if ( n_subscriptions == 0 ) { - ERROR("WARNING: No ZeroMQ subscriptions. You should probably " - "try again with --zmq-subscribe.\n"); - } + if ( zmq_request == NULL ) { - for ( i=0; isocket, ZMQ_SUBSCRIBE, - subscriptions[i], - strlen(subscriptions[i])) ) - { - ERROR("ZMQ subscription failed: %s\n", - zmq_strerror(errno)); - return NULL; + /* SUB mode */ + if ( n_subscriptions == 0 ) { + ERROR("WARNING: No ZeroMQ subscriptions. You should " + "probably try again with --zmq-subscribe.\n"); } + for ( i=0; isocket, ZMQ_SUBSCRIBE, + subscriptions[i], + strlen(subscriptions[i])) ) + { + ERROR("ZMQ subscription failed: %s\n", + zmq_strerror(errno)); + return NULL; + } + } + + z->request_str = NULL; + + } else { + + /* REQ mode */ + z->request_str = zmq_request; + } return z; @@ -103,6 +121,19 @@ void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size) int msg_size; void *data_copy; + if ( z->request_str != NULL ) { + + /* Send the request */ + if ( zmq_send(z->socket, z->request_str, + strlen(z->request_str), 0) == -1 ) + { + ERROR("ZMQ message send failed: %s\n", + zmq_strerror(errno)); + return NULL; + } + } + + /* Receive message */ zmq_msg_init(&z->msg); msg_size = zmq_msg_recv(&z->msg, z->socket, 0); if ( msg_size == -1 ) { diff --git a/src/im-zmq.h b/src/im-zmq.h index 9cc36b48..557f42e3 100644 --- a/src/im-zmq.h +++ b/src/im-zmq.h @@ -40,7 +40,8 @@ extern struct im_zmq *im_zmq_connect(const char *zmq_address, char **subscriptions, - int n_subscriptions); + int n_subscriptions, + const char *zmq_request); extern void im_zmq_shutdown(struct im_zmq *z); extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size); @@ -48,7 +49,8 @@ extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size); static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address, char *zmq_subscriptions, - int n_subscriptions) { return NULL; } + int n_subscriptions, + const char *zmq_request) { return NULL; } static UNUSED void im_zmq_shutdown(struct im_zmq *z) { } static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; } diff --git a/src/indexamajig.c b/src/indexamajig.c index 43709cf6..16822989 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -81,6 +81,7 @@ struct indexamajig_arguments char *indm_str; int basename; char *zmq_addr; + char *zmq_request; char *zmq_subscriptions[256]; int n_zmq_subscriptions; int serial_start; @@ -371,6 +372,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg); break; + case 212 : + args->zmq_request = strdup(arg); + break; + /* ---------- Peak search ---------- */ case 't' : @@ -786,6 +791,7 @@ int main(int argc, char *argv[]) args.indm_str = NULL; args.basename = 0; args.zmq_addr = NULL; + args.zmq_request = NULL; args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; @@ -892,6 +898,8 @@ int main(int argc, char *argv[]) {"no-mask-data", 210, NULL, OPTION_NO_USAGE, "Do not load mask data"}, {"zmq-subscribe", 211, "tag", OPTION_NO_USAGE, "Subscribe to ZMQ message" "type"}, + {"zmq-request", 212, "str", OPTION_NO_USAGE, "Request messages using" + "this string."}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, @@ -1026,6 +1034,12 @@ int main(int argc, char *argv[]) return 1; } + if ( (args.zmq_request != NULL) && (args.n_zmq_subscriptions > 0) ) { + ERROR("The options --zmq-request and --zmq-subscribe are " + "mutually exclusive.\n"); + return 1; + } + /* Open input */ if ( args.filename != NULL ) { if ( strcmp(args.filename, "-") == 0 ) { @@ -1243,7 +1257,7 @@ int main(int argc, char *argv[]) r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename, fh, st, tmpdir, args.serial_start, args.zmq_addr, args.zmq_subscriptions, args.n_zmq_subscriptions, - timeout, args.profile); + args.zmq_request, timeout, args.profile); cell_free(args.iargs.cell); free(args.prefix); -- cgit v1.2.3