aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c35
-rw-r--r--src/im-sandbox.h4
-rw-r--r--src/im-zmq.c27
-rw-r--r--src/im-zmq.h18
-rw-r--r--src/indexamajig.c33
5 files changed, 51 insertions, 66 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index de2e08b6..1e4a95a6 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -98,12 +98,8 @@ struct sandbox
const char *tmpdir;
- /* ZMQ mode */
- int zmq;
- const char *zmq_address;
- char **zmq_subscriptions;
- int n_zmq_subscriptions;
- const char *zmq_request;
+ /* If non-NULL, we are using ZMQ */
+ struct im_zmq_params *zmq_params;
/* If non-NULL, we are using ASAP::O */
struct im_asapo_params *asapo_params;
@@ -341,11 +337,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
}
/* Connect via ZMQ */
- if ( sb->zmq ) {
- zmqstuff = im_zmq_connect(sb->zmq_address,
- sb->zmq_subscriptions,
- sb->n_zmq_subscriptions,
- sb->zmq_request);
+ if ( sb->zmq_params != NULL ) {
+ zmqstuff = im_zmq_connect(sb->zmq_params);
if ( zmqstuff == NULL ) {
ERROR("ZMQ setup failed.\n");
return 1;
@@ -455,7 +448,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.asapo_data_size = 0;
pargs.asapo_meta = NULL;
- if ( sb->zmq ) {
+ if ( sb->zmq_params != NULL ) {
do {
pargs.zmq_data = im_zmq_fetch(zmqstuff,
@@ -898,7 +891,7 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
char *filename;
char *evstr;
- if ( sb->zmq ) {
+ if ( sb->zmq_params != NULL ) {
/* These are just semi-meaningful placeholder values to
* be put into the queue, instead of "(null)".
* A unique filename is needed so that the GUI can
@@ -1117,8 +1110,7 @@ char *create_tempdir(const char *temp_location)
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
Stream *stream, const char *tmpdir, int serial_start,
- const char *zmq_address, char **zmq_subscriptions,
- int n_zmq_subscriptions, const char *zmq_request,
+ struct im_zmq_params *zmq_params,
struct im_asapo_params *asapo_params,
int timeout, int profile)
{
@@ -1150,14 +1142,11 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->tmpdir = tmpdir;
sb->profile = profile;
sb->timeout = timeout;
- if ( zmq_address != NULL ) {
- sb->zmq = 1;
- sb->zmq_address = zmq_address;
- sb->zmq_subscriptions = zmq_subscriptions;
- sb->n_zmq_subscriptions = n_zmq_subscriptions;
- sb->zmq_request = zmq_request;
+
+ if ( zmq_params->addr != NULL ) {
+ sb->zmq_params = zmq_params;
} else {
- sb->zmq = 0;
+ sb->zmq_params = NULL;
}
if ( asapo_params->endpoint != NULL ) {
@@ -1166,7 +1155,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->asapo_params = NULL;
}
- if ( sb->zmq && sb->asapo_params ) {
+ if ( sb->zmq_params && sb->asapo_params ) {
ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n");
free(sb);
return 0;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 78f542b9..52094a3d 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -42,6 +42,7 @@ struct sb_shm;
#include "stream.h"
#include "cell.h"
#include "process_image.h"
+#include "im-zmq.h"
#include "im-asapo.h"
/* Length of event queue */
@@ -86,8 +87,7 @@ extern void set_last_task(char *lt, const char *task);
extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
const char *tempdir, int serial_start,
- const char *zmq_address, char **zmq_subscriptions,
- int n_zmq_subscriptions, const char *zmq_request,
+ struct im_zmq_params *zmq_params,
struct im_asapo_params *asapo_params,
int timeout, int profile);
diff --git a/src/im-zmq.c b/src/im-zmq.c
index 3fd04428..b326e868 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -56,10 +56,7 @@ struct im_zmq
};
-struct im_zmq *im_zmq_connect(const char *zmq_address,
- char **subscriptions,
- int n_subscriptions,
- const char *zmq_request)
+struct im_zmq *im_zmq_connect(struct im_zmq_params *params)
{
struct im_zmq *z;
@@ -72,11 +69,11 @@ struct im_zmq *im_zmq_connect(const char *zmq_address,
return NULL;
}
- if ( zmq_request == NULL ) {
- STATUS("Connecting ZMQ subscriber to '%s'\n", zmq_address);
+ if ( params->request == NULL ) {
+ STATUS("Connecting ZMQ subscriber to '%s'\n", params->addr);
z->socket = zmq_socket(z->ctx, ZMQ_SUB);
} else {
- STATUS("Connecting ZMQ requester to '%s'\n", zmq_address);
+ STATUS("Connecting ZMQ requester to '%s'\n", params->addr);
z->socket = zmq_socket(z->ctx, ZMQ_REQ);
}
if ( z->socket == NULL ) {
@@ -84,26 +81,26 @@ struct im_zmq *im_zmq_connect(const char *zmq_address,
return NULL;
}
- if ( zmq_connect(z->socket, zmq_address) == -1 ) {
+ if ( zmq_connect(z->socket, params->addr) == -1 ) {
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
free(z);
return NULL;
}
- if ( zmq_request == NULL ) {
+ if ( params->request == NULL ) {
int i;
/* SUB mode */
- if ( n_subscriptions == 0 ) {
+ if ( params->n_subscriptions == 0 ) {
ERROR("WARNING: No ZeroMQ subscriptions. You should "
"probably try again with --zmq-subscribe.\n");
}
- for ( i=0; i<n_subscriptions; i++ ) {
- STATUS("Subscribing to '%s'\n", subscriptions[i]);
+ for ( i=0; i<params->n_subscriptions; i++ ) {
+ STATUS("Subscribing to '%s'\n", params->subscriptions[i]);
if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
- subscriptions[i],
- strlen(subscriptions[i])) )
+ params->subscriptions[i],
+ strlen(params->subscriptions[i])) )
{
ERROR("ZMQ subscription failed: %s\n",
zmq_strerror(errno));
@@ -116,7 +113,7 @@ struct im_zmq *im_zmq_connect(const char *zmq_address,
} else {
/* REQ mode */
- z->request_str = zmq_request;
+ z->request_str = params->request;
}
diff --git a/src/im-zmq.h b/src/im-zmq.h
index 88c0a568..cfb98ff4 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -36,21 +36,23 @@
#include <config.h>
#endif
+struct im_zmq_params
+{
+ char *addr;
+ char *request;
+ char *subscriptions[256];
+ int n_subscriptions;
+};
+
#if defined(HAVE_ZMQ)
-extern struct im_zmq *im_zmq_connect(const char *zmq_address,
- char **subscriptions,
- int n_subscriptions,
- const char *zmq_request);
+extern struct im_zmq *im_zmq_connect(struct im_zmq_params *params);
extern void im_zmq_shutdown(struct im_zmq *z);
extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
#else /* defined(HAVE_ZMQ) */
-static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address,
- char **zmq_subscriptions,
- int n_subscriptions,
- const char *zmq_request) { return NULL; }
+static UNUSED struct im_zmq *im_zmq_connect(struct im_zmq_params *params) { return NULL; }
static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 020612a8..af1a5704 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -63,6 +63,7 @@
#include <datatemplate.h>
#include "im-sandbox.h"
+#include "im-zmq.h"
#include "im-asapo.h"
#include "version.h"
#include "json-utils.h"
@@ -81,10 +82,7 @@ struct indexamajig_arguments
char *cellfile;
char *indm_str;
int basename;
- char *zmq_addr;
- char *zmq_request;
- char *zmq_subscriptions[256];
- int n_zmq_subscriptions;
+ struct im_zmq_params zmq_params;
struct im_asapo_params asapo_params;
int serial_start;
char *temp_location;
@@ -378,7 +376,7 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
break;
case 207 :
- args->zmq_addr = strdup(arg);
+ args->zmq_params.addr = strdup(arg);
break;
case 208 :
@@ -394,15 +392,15 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
break;
case 211 :
- if ( args->n_zmq_subscriptions == 256 ) {
+ if ( args->zmq_params.n_subscriptions == 256 ) {
ERROR("Too many ZMQ subscriptions.\n");
return 1;
}
- args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg);
+ args->zmq_params.subscriptions[args->zmq_params.n_subscriptions++] = strdup(arg);
break;
case 212 :
- args->zmq_request = strdup(arg);
+ args->zmq_params.request = strdup(arg);
break;
case 213 :
@@ -851,15 +849,15 @@ int main(int argc, char *argv[])
args.cellfile = NULL;
args.indm_str = NULL;
args.basename = 0;
- args.zmq_addr = NULL;
- args.zmq_request = NULL;
+ args.zmq_params.addr = NULL;
+ args.zmq_params.request = NULL;
+ args.zmq_params.n_subscriptions = 0;
args.asapo_params.endpoint = NULL;
args.asapo_params.token = NULL;
args.asapo_params.beamtime = NULL;
args.asapo_params.group_id = NULL;
args.asapo_params.source = NULL;
args.asapo_params.stream = NULL;
- args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
args.if_multi = 0;
@@ -1091,7 +1089,7 @@ int main(int argc, char *argv[])
/* Check for minimal information */
if ( (args.filename == NULL)
- && (args.zmq_addr == NULL)
+ && (args.zmq_params.addr == NULL)
&& (args.asapo_params.endpoint == NULL) ) {
ERROR("You need to provide the input filename (use -i)\n");
return 1;
@@ -1105,7 +1103,7 @@ int main(int argc, char *argv[])
return 1;
}
- if ( (args.filename != NULL) && (args.zmq_addr != NULL) ) {
+ if ( (args.filename != NULL) && (args.zmq_params.addr != NULL) ) {
ERROR("The options --input and --zmq-input are mutually "
"exclusive.\n");
return 1;
@@ -1117,13 +1115,13 @@ int main(int argc, char *argv[])
return 1;
}
- if ( (args.asapo_params.endpoint != NULL) && (args.zmq_addr != NULL) ) {
+ if ( (args.asapo_params.endpoint != NULL) && (args.zmq_params.addr != NULL) ) {
ERROR("The options --asapo-endpoint and --zmq-input are mutually "
"exclusive.\n");
return 1;
}
- if ( (args.zmq_request != NULL) && (args.n_zmq_subscriptions > 0) ) {
+ if ( (args.zmq_params.request != NULL) && (args.zmq_params.n_subscriptions > 0) ) {
ERROR("The options --zmq-request and --zmq-subscribe are "
"mutually exclusive.\n");
return 1;
@@ -1345,9 +1343,8 @@ int main(int argc, char *argv[])
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
fh, st, tmpdir, args.serial_start,
- args.zmq_addr, args.zmq_subscriptions,
- args.n_zmq_subscriptions, args.zmq_request,
- &args.asapo_params, timeout, args.profile);
+ &args.zmq_params, &args.asapo_params,
+ timeout, args.profile);
cell_free(args.iargs.cell);
free(args.prefix);