aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-05-05 17:07:13 +0200
committerThomas White <taw@physics.org>2021-05-05 17:14:04 +0200
commitce270ad7d8136aac47a802a9a72c011344f90527 (patch)
tree1df70574efad2ceb31edb43582ba7ff2c8a2c8ea /src
parente11394ce9133333af01afd88a0f484d6ea70665d (diff)
indexamajig: Add --zmq-request
This (re-)adds the ability to get data via a request/reply socket. See afcb7b568947c for when it was removed.
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c8
-rw-r--r--src/im-sandbox.h3
-rw-r--r--src/im-zmq.c63
-rw-r--r--src/im-zmq.h6
-rw-r--r--src/indexamajig.c16
5 files changed, 74 insertions, 22 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index ae1fe92a..4d40c461 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -102,6 +102,7 @@ struct sandbox
const char *zmq_address;
char **zmq_subscriptions;
int n_zmq_subscriptions;
+ const char *zmq_request;
/* Final output */
Stream *stream;
@@ -335,7 +336,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
if ( sb->zmq ) {
zmqstuff = im_zmq_connect(sb->zmq_address,
sb->zmq_subscriptions,
- sb->n_zmq_subscriptions);
+ sb->n_zmq_subscriptions,
+ sb->zmq_request);
if ( zmqstuff == NULL ) {
ERROR("ZMQ setup failed.\n");
return 1;
@@ -1025,7 +1027,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
Stream *stream, const char *tmpdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
- int n_zmq_subscriptions, int timeout, int profile)
+ int n_zmq_subscriptions, const char *zmq_request,
+ int timeout, int profile)
{
int i;
struct sandbox *sb;
@@ -1061,6 +1064,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->zmq_address = zmq_address;
sb->zmq_subscriptions = zmq_subscriptions;
sb->n_zmq_subscriptions = n_zmq_subscriptions;
+ sb->zmq_request = zmq_request;
} else {
sb->zmq = 0;
}
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 47d23a18..a76e69ec 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -86,6 +86,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
const char *tempdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
- int n_zmq_subscriptions, int timeout, int profile);
+ int n_zmq_subscriptions, const char *zmq_request,
+ int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/im-zmq.c b/src/im-zmq.c
index be3bc544..c2d386cf 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -52,12 +52,14 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
+ const char *request_str;
};
struct im_zmq *im_zmq_connect(const char *zmq_address,
char **subscriptions,
- int n_subscriptions)
+ int n_subscriptions,
+ const char *zmq_request)
{
struct im_zmq *z;
int i;
@@ -68,30 +70,46 @@ struct im_zmq *im_zmq_connect(const char *zmq_address,
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_SUB);
+ if ( zmq_request == NULL ) {
+ STATUS("Connecting ZMQ subscriber to '%s'\n", zmq_address);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
+ } else {
+ STATUS("Connecting ZMQ requester to '%s'\n", zmq_address);
+ z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ }
if ( z->socket == NULL ) return NULL;
- STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
if ( zmq_connect(z->socket, zmq_address) == -1 ) {
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- if ( n_subscriptions == 0 ) {
- ERROR("WARNING: No ZeroMQ subscriptions. You should probably "
- "try again with --zmq-subscribe.\n");
- }
+ if ( zmq_request == NULL ) {
- for ( i=0; i<n_subscriptions; i++ ) {
- STATUS("Subscribing to '%s'\n", subscriptions[i]);
- if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
- subscriptions[i],
- strlen(subscriptions[i])) )
- {
- ERROR("ZMQ subscription failed: %s\n",
- zmq_strerror(errno));
- return NULL;
+ /* SUB mode */
+ if ( n_subscriptions == 0 ) {
+ ERROR("WARNING: No ZeroMQ subscriptions. You should "
+ "probably try again with --zmq-subscribe.\n");
}
+ for ( i=0; i<n_subscriptions; i++ ) {
+ STATUS("Subscribing to '%s'\n", subscriptions[i]);
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
+ subscriptions[i],
+ strlen(subscriptions[i])) )
+ {
+ ERROR("ZMQ subscription failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
+ }
+
+ z->request_str = NULL;
+
+ } else {
+
+ /* REQ mode */
+ z->request_str = zmq_request;
+
}
return z;
@@ -103,6 +121,19 @@ void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
int msg_size;
void *data_copy;
+ if ( z->request_str != NULL ) {
+
+ /* Send the request */
+ if ( zmq_send(z->socket, z->request_str,
+ strlen(z->request_str), 0) == -1 )
+ {
+ ERROR("ZMQ message send failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
+ }
+
+ /* Receive message */
zmq_msg_init(&z->msg);
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
if ( msg_size == -1 ) {
diff --git a/src/im-zmq.h b/src/im-zmq.h
index 9cc36b48..557f42e3 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -40,7 +40,8 @@
extern struct im_zmq *im_zmq_connect(const char *zmq_address,
char **subscriptions,
- int n_subscriptions);
+ int n_subscriptions,
+ const char *zmq_request);
extern void im_zmq_shutdown(struct im_zmq *z);
extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
@@ -48,7 +49,8 @@ extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address,
char *zmq_subscriptions,
- int n_subscriptions) { return NULL; }
+ int n_subscriptions,
+ const char *zmq_request) { return NULL; }
static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 43709cf6..16822989 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -81,6 +81,7 @@ struct indexamajig_arguments
char *indm_str;
int basename;
char *zmq_addr;
+ char *zmq_request;
char *zmq_subscriptions[256];
int n_zmq_subscriptions;
int serial_start;
@@ -371,6 +372,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg);
break;
+ case 212 :
+ args->zmq_request = strdup(arg);
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -786,6 +791,7 @@ int main(int argc, char *argv[])
args.indm_str = NULL;
args.basename = 0;
args.zmq_addr = NULL;
+ args.zmq_request = NULL;
args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -892,6 +898,8 @@ int main(int argc, char *argv[])
{"no-mask-data", 210, NULL, OPTION_NO_USAGE, "Do not load mask data"},
{"zmq-subscribe", 211, "tag", OPTION_NO_USAGE, "Subscribe to ZMQ message"
"type"},
+ {"zmq-request", 212, "str", OPTION_NO_USAGE, "Request messages using"
+ "this string."},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1026,6 +1034,12 @@ int main(int argc, char *argv[])
return 1;
}
+ if ( (args.zmq_request != NULL) && (args.n_zmq_subscriptions > 0) ) {
+ ERROR("The options --zmq-request and --zmq-subscribe are "
+ "mutually exclusive.\n");
+ return 1;
+ }
+
/* Open input */
if ( args.filename != NULL ) {
if ( strcmp(args.filename, "-") == 0 ) {
@@ -1243,7 +1257,7 @@ int main(int argc, char *argv[])
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
fh, st, tmpdir, args.serial_start, args.zmq_addr,
args.zmq_subscriptions, args.n_zmq_subscriptions,
- timeout, args.profile);
+ args.zmq_request, timeout, args.profile);
cell_free(args.iargs.cell);
free(args.prefix);