aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/articles/online.rst27
-rw-r--r--doc/man/indexamajig.19
-rw-r--r--src/im-sandbox.c8
-rw-r--r--src/im-sandbox.h3
-rw-r--r--src/im-zmq.c63
-rw-r--r--src/im-zmq.h6
-rw-r--r--src/indexamajig.c16
7 files changed, 103 insertions, 29 deletions
diff --git a/doc/articles/online.rst b/doc/articles/online.rst
index 84e037ca..6183d7eb 100644
--- a/doc/articles/online.rst
+++ b/doc/articles/online.rst
@@ -7,9 +7,10 @@ socket. Use ``--zmq-input`` instead of ``--input`` or ``-i``. An error will
be generated if you use ``--zmq-input`` and ``--input`` or ``-i``
simultaneously.
-Indexamajig assumes that the socket is a *pub/sub* socket. You will also need
-to specify which message prefixes to subscribe to. Use ``--zmq-subscribe`` for
-this::
+Indexamajig can use either a SUB (subscriber) or a REQ (request) socket. The
+SUB socket type can be used for receiving data from OnDA/OM via the same
+mechanism that the OnDA/OM GUI uses. In this case, you will also need to
+specify which message prefixes to subscribe to using ``--zmq-subscribe``::
indexamajig --zmq-input=tcp://127.0.0.1:5002 \
--zmq-subscribe=ondaframedata \
@@ -18,8 +19,24 @@ this::
You can use ``--zmq-subscribe`` multiple times to subscribe to multiple message
prefixes.
-The option ``--no-image-data`` will be honoured, if given. This makes it
-possible to quickly check streaming data for "indexability".
+Note that this mode of operation does not combine well with multi-threading
+in inddxamajig - all worker processes will receive the same data! For anything
+more than "taking a peek" at the data, use the REQ socket instead by using
+``--zmq-request`` instead of ``--zmq-subscribe``. The argument to this option
+is the string which should be sent in the request message::
+
+ indexamajig --zmq-input=tcp://127.0.0.1:5002 \
+ --zmq-request=next \
+ -o output.stream -g Eiger.geom ...
+
+Because they represent completely different modes of operation, the two options
+``--zmq-request`` and ``--zmq-subscribe`` are mutually exclusive.
+
+In both cases, the option ``--no-image-data`` will be honoured, if given. This
+makes it possible to quickly check streaming data for "indexability". You will
+be able to do almost all of the usual downstream analysis operations on the
+resulting stream, except that attempting to merge it using partialator or
+process_hkl will result in zeroes everywhere.
Data format
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index 5b4e62d4..7045d8de 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -181,12 +181,17 @@ Read the detector geometry description from \fIfilename\fR. See \fBman crystfel
.PD
Receive data over ZeroMQ from \fIaddress\fR. In this version, the ZeroMQ data will be assumed to be serialised with MsgPack, but other formats might be added in future. The options \fB--input\fR and \fB--zmq-input\fR are mutually exclusive - you must specify exactly one of them. Example: \fB--zmq-input=tcp://127.0.0.1:5002\fR.
.IP
-If you use this option, you should also use \fB--zmq-subscribe\fR to add a ZeroMQ subscription.
+If you use this option, you should also use either \fB--zmq-subscribe\fR to add a ZeroMQ subscription, or \fB--zmq-request\fR to define how to request data.
.PD 0
.IP \fB--zmq-subscribe=\fItag\fR
.PD
-Subscribe to ZeroMQ message type \fItag\fR. You can use this option multiple times to add multiple subscriptions.
+Subscribe to ZeroMQ message type \fItag\fR. You can use this option multiple times to add multiple subscriptions. This option and \fB--zmq-request\fR are mutually exclusive.
+
+.PD 0
+.IP \fB--zmq-request=\fImsg\fR
+.PD
+Request new data over ZeroMQ by sending string \fImsg\fR. This will cause indexamajig's ZeroMQ socket to use REQ mode instead of SUB. This option and \fB--zmq-subscribe\fR are mutually exclusive.
.PD 0
.IP \fB--basename\fR
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index ae1fe92a..4d40c461 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -102,6 +102,7 @@ struct sandbox
const char *zmq_address;
char **zmq_subscriptions;
int n_zmq_subscriptions;
+ const char *zmq_request;
/* Final output */
Stream *stream;
@@ -335,7 +336,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
if ( sb->zmq ) {
zmqstuff = im_zmq_connect(sb->zmq_address,
sb->zmq_subscriptions,
- sb->n_zmq_subscriptions);
+ sb->n_zmq_subscriptions,
+ sb->zmq_request);
if ( zmqstuff == NULL ) {
ERROR("ZMQ setup failed.\n");
return 1;
@@ -1025,7 +1027,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
Stream *stream, const char *tmpdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
- int n_zmq_subscriptions, int timeout, int profile)
+ int n_zmq_subscriptions, const char *zmq_request,
+ int timeout, int profile)
{
int i;
struct sandbox *sb;
@@ -1061,6 +1064,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->zmq_address = zmq_address;
sb->zmq_subscriptions = zmq_subscriptions;
sb->n_zmq_subscriptions = n_zmq_subscriptions;
+ sb->zmq_request = zmq_request;
} else {
sb->zmq = 0;
}
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 47d23a18..a76e69ec 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -86,6 +86,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
const char *tempdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
- int n_zmq_subscriptions, int timeout, int profile);
+ int n_zmq_subscriptions, const char *zmq_request,
+ int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/im-zmq.c b/src/im-zmq.c
index be3bc544..c2d386cf 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -52,12 +52,14 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
+ const char *request_str;
};
struct im_zmq *im_zmq_connect(const char *zmq_address,
char **subscriptions,
- int n_subscriptions)
+ int n_subscriptions,
+ const char *zmq_request)
{
struct im_zmq *z;
int i;
@@ -68,30 +70,46 @@ struct im_zmq *im_zmq_connect(const char *zmq_address,
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_SUB);
+ if ( zmq_request == NULL ) {
+ STATUS("Connecting ZMQ subscriber to '%s'\n", zmq_address);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
+ } else {
+ STATUS("Connecting ZMQ requester to '%s'\n", zmq_address);
+ z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ }
if ( z->socket == NULL ) return NULL;
- STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
if ( zmq_connect(z->socket, zmq_address) == -1 ) {
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- if ( n_subscriptions == 0 ) {
- ERROR("WARNING: No ZeroMQ subscriptions. You should probably "
- "try again with --zmq-subscribe.\n");
- }
+ if ( zmq_request == NULL ) {
- for ( i=0; i<n_subscriptions; i++ ) {
- STATUS("Subscribing to '%s'\n", subscriptions[i]);
- if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
- subscriptions[i],
- strlen(subscriptions[i])) )
- {
- ERROR("ZMQ subscription failed: %s\n",
- zmq_strerror(errno));
- return NULL;
+ /* SUB mode */
+ if ( n_subscriptions == 0 ) {
+ ERROR("WARNING: No ZeroMQ subscriptions. You should "
+ "probably try again with --zmq-subscribe.\n");
}
+ for ( i=0; i<n_subscriptions; i++ ) {
+ STATUS("Subscribing to '%s'\n", subscriptions[i]);
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
+ subscriptions[i],
+ strlen(subscriptions[i])) )
+ {
+ ERROR("ZMQ subscription failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
+ }
+
+ z->request_str = NULL;
+
+ } else {
+
+ /* REQ mode */
+ z->request_str = zmq_request;
+
}
return z;
@@ -103,6 +121,19 @@ void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
int msg_size;
void *data_copy;
+ if ( z->request_str != NULL ) {
+
+ /* Send the request */
+ if ( zmq_send(z->socket, z->request_str,
+ strlen(z->request_str), 0) == -1 )
+ {
+ ERROR("ZMQ message send failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
+ }
+
+ /* Receive message */
zmq_msg_init(&z->msg);
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
if ( msg_size == -1 ) {
diff --git a/src/im-zmq.h b/src/im-zmq.h
index 9cc36b48..557f42e3 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -40,7 +40,8 @@
extern struct im_zmq *im_zmq_connect(const char *zmq_address,
char **subscriptions,
- int n_subscriptions);
+ int n_subscriptions,
+ const char *zmq_request);
extern void im_zmq_shutdown(struct im_zmq *z);
extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
@@ -48,7 +49,8 @@ extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address,
char *zmq_subscriptions,
- int n_subscriptions) { return NULL; }
+ int n_subscriptions,
+ const char *zmq_request) { return NULL; }
static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 43709cf6..16822989 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -81,6 +81,7 @@ struct indexamajig_arguments
char *indm_str;
int basename;
char *zmq_addr;
+ char *zmq_request;
char *zmq_subscriptions[256];
int n_zmq_subscriptions;
int serial_start;
@@ -371,6 +372,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg);
break;
+ case 212 :
+ args->zmq_request = strdup(arg);
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -786,6 +791,7 @@ int main(int argc, char *argv[])
args.indm_str = NULL;
args.basename = 0;
args.zmq_addr = NULL;
+ args.zmq_request = NULL;
args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -892,6 +898,8 @@ int main(int argc, char *argv[])
{"no-mask-data", 210, NULL, OPTION_NO_USAGE, "Do not load mask data"},
{"zmq-subscribe", 211, "tag", OPTION_NO_USAGE, "Subscribe to ZMQ message"
"type"},
+ {"zmq-request", 212, "str", OPTION_NO_USAGE, "Request messages using"
+ "this string."},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1026,6 +1034,12 @@ int main(int argc, char *argv[])
return 1;
}
+ if ( (args.zmq_request != NULL) && (args.n_zmq_subscriptions > 0) ) {
+ ERROR("The options --zmq-request and --zmq-subscribe are "
+ "mutually exclusive.\n");
+ return 1;
+ }
+
/* Open input */
if ( args.filename != NULL ) {
if ( strcmp(args.filename, "-") == 0 ) {
@@ -1243,7 +1257,7 @@ int main(int argc, char *argv[])
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
fh, st, tmpdir, args.serial_start, args.zmq_addr,
args.zmq_subscriptions, args.n_zmq_subscriptions,
- timeout, args.profile);
+ args.zmq_request, timeout, args.profile);
cell_free(args.iargs.cell);
free(args.prefix);