aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/man/indexamajig.17
-rw-r--r--src/im-sandbox.c11
-rw-r--r--src/im-sandbox.h3
-rw-r--r--src/im-zmq.c24
-rw-r--r--src/im-zmq.h8
-rw-r--r--src/indexamajig.c13
6 files changed, 56 insertions, 10 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index d3077516..ba8205a4 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -180,6 +180,13 @@ Read the detector geometry description from \fIfilename\fR. See \fBman crystfel
.IP \fB--zmq-input=\fIaddress\fR
.PD
Receive data over ZeroMQ from \fIaddress\fR. In this version, the ZeroMQ data will be assumed to be serialised with MsgPack, but other formats might be added in future. The options \fB--input\fR and \fB--zmq-input\fR are mutually exclusive - you must specify exactly one of them. Example: \fB--zmq-input=tcp://127.0.0.1:5002\fR.
+.IP
+If you use this option, you should also use \fB--zmq-subscribe\fR to add a ZeroMQ subscription.
+
+.PD 0
+.IP \fB--zmq-subscribe=\fItag\fR
+.PD
+Subscribe to ZeroMQ message type \fItag\fR. You can use this option multiple times to add multiple subscriptions.
.PD 0
.IP \fB--basename\fR
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 22e2c05f..3f689a0d 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -100,6 +100,8 @@ struct sandbox
/* ZMQ mode */
int zmq;
const char *zmq_address;
+ char **zmq_subscriptions;
+ int n_zmq_subscriptions;
/* Final output */
Stream *stream;
@@ -331,7 +333,9 @@ static int run_work(const struct index_args *iargs, Stream *st,
/* Connect via ZMQ */
if ( sb->zmq ) {
- zmqstuff = im_zmq_connect(sb->zmq_address);
+ zmqstuff = im_zmq_connect(sb->zmq_address,
+ sb->zmq_subscriptions,
+ sb->n_zmq_subscriptions);
if ( zmqstuff == NULL ) {
ERROR("ZMQ setup failed.\n");
return 1;
@@ -1013,7 +1017,8 @@ char *create_tempdir(const char *temp_location)
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
Stream *stream, const char *tmpdir, int serial_start,
- const char *zmq_address, int timeout, int profile)
+ const char *zmq_address, char **zmq_subscriptions,
+ int n_zmq_subscriptions, int timeout, int profile)
{
int i;
struct sandbox *sb;
@@ -1047,6 +1052,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( zmq_address != NULL ) {
sb->zmq = 1;
sb->zmq_address = zmq_address;
+ sb->zmq_subscriptions = zmq_subscriptions;
+ sb->n_zmq_subscriptions = n_zmq_subscriptions;
} else {
sb->zmq = 0;
}
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 2e006be4..47d23a18 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -85,6 +85,7 @@ extern void set_last_task(char *lt, const char *task);
extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
const char *tempdir, int serial_start,
- const char *zmq_address, int timeout, int profile);
+ const char *zmq_address, char **zmq_subscriptions,
+ int n_zmq_subscriptions, int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/im-zmq.c b/src/im-zmq.c
index 2f489811..be3bc544 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -55,9 +55,12 @@ struct im_zmq
};
-struct im_zmq *im_zmq_connect(const char *zmq_address)
+struct im_zmq *im_zmq_connect(const char *zmq_address,
+ char **subscriptions,
+ int n_subscriptions)
{
struct im_zmq *z;
+ int i;
z = malloc(sizeof(struct im_zmq));
if ( z == NULL ) return NULL;
@@ -73,11 +76,22 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n");
- if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) {
- ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno));
- return NULL;
+ if ( n_subscriptions == 0 ) {
+ ERROR("WARNING: No ZeroMQ subscriptions. You should probably "
+ "try again with --zmq-subscribe.\n");
+ }
+
+ for ( i=0; i<n_subscriptions; i++ ) {
+ STATUS("Subscribing to '%s'\n", subscriptions[i]);
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
+ subscriptions[i],
+ strlen(subscriptions[i])) )
+ {
+ ERROR("ZMQ subscription failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
}
return z;
diff --git a/src/im-zmq.h b/src/im-zmq.h
index c6bad6cc..9cc36b48 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -38,13 +38,17 @@
#if defined(HAVE_ZMQ)
-extern struct im_zmq *im_zmq_connect(const char *zmq_address);
+extern struct im_zmq *im_zmq_connect(const char *zmq_address,
+ char **subscriptions,
+ int n_subscriptions);
extern void im_zmq_shutdown(struct im_zmq *z);
extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
#else /* defined(HAVE_ZMQ) */
-static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address,
+ char *zmq_subscriptions,
+ int n_subscriptions) { return NULL; }
static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 27071256..524022b2 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -81,6 +81,8 @@ struct indexamajig_arguments
char *indm_str;
int basename;
char *zmq_addr;
+ char *zmq_subscriptions[256];
+ int n_zmq_subscriptions;
int serial_start;
char *temp_location;
int if_refine;
@@ -361,6 +363,13 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->iargs.no_mask_data = 1;
break;
+ case 211 :
+ if ( args->n_zmq_subscriptions == 256 ) {
+ ERROR("Too many ZMQ subscriptions.\n");
+ return 1;
+ }
+ args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg);
+
/* ---------- Peak search ---------- */
case 't' :
@@ -776,6 +785,7 @@ int main(int argc, char *argv[])
args.indm_str = NULL;
args.basename = 0;
args.zmq_addr = NULL;
+ args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
args.if_multi = 0;
@@ -879,6 +889,8 @@ int main(int argc, char *argv[])
{"spectrum-file", 209, "fn", OPTION_NO_USAGE | OPTION_HIDDEN,
"File containing radiation spectrum"},
{"no-mask-data", 210, NULL, OPTION_NO_USAGE, "Do not load mask data"},
+ {"zmq-subscribe", 211, "tag", OPTION_NO_USAGE, "Subscribe to ZMQ message"
+ "type"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1229,6 +1241,7 @@ int main(int argc, char *argv[])
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
fh, st, tmpdir, args.serial_start, args.zmq_addr,
+ args.zmq_subscriptions, args.n_zmq_subscriptions,
timeout, args.profile);
cell_free(args.iargs.cell);