aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/man/indexamajig.15
-rw-r--r--src/im-asapo.c26
-rw-r--r--src/im-asapo.h1
-rw-r--r--src/indexamajig.c7
4 files changed, 38 insertions, 1 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index 25e025ac..4ad84325 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -204,6 +204,11 @@ Authentication token, beamtime, data source and consumer group, respectively, fo
Name of ASAP::O stream to process. If this option is not given, indexamajig will start processing from the end of the current last stream.
.PD 0
+.IP \fB--asapo-wait-for-stream
+.PD
+If the ASAP::O stream does not exist, wait for it to be appear. Without this option, indexamajig will exit immediately if the stream is not found.
+
+.PD 0
.IP \fB--data-format=\fIformat\fR
.PD
Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR, \fBhdf5\fR and \fBseedee\fR.
diff --git a/src/im-asapo.c b/src/im-asapo.c
index b19c5bdc..255a942d 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -51,6 +51,7 @@ struct im_asapo
char *stream;
AsapoConsumerHandle consumer;
AsapoStringHandle group_id;
+ int wait_for_stream;
};
@@ -110,11 +111,30 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(params->group_id);
+ a->wait_for_stream = params->wait_for_stream;
return a;
}
+static int stream_empty(struct im_asapo *a)
+{
+ AsapoErrorHandle err;
+
+ err = asapo_new_handle();
+ int64_t size = asapo_consumer_get_current_size(a->consumer, a->stream,
+ &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't get stream size", err);
+ asapo_free_handle(&err);
+ return 0;
+ }
+
+ return ( size == 0 );
+}
+
+
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
int *pfinished)
@@ -141,7 +161,11 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
asapo_free_handle(&err);
asapo_free_handle(&meta);
asapo_free_handle(&data);
- *pfinished = 1;
+ if ( stream_empty(a) && a->wait_for_stream ) {
+ *pfinished = 0;
+ } else {
+ *pfinished = 1;
+ }
return NULL;
}
diff --git a/src/im-asapo.h b/src/im-asapo.h
index f2e9e40a..cda9fbd9 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -42,6 +42,7 @@ struct im_asapo_params
char *group_id;
char *source;
char *stream;
+ int wait_for_stream;
};
#if defined(HAVE_ASAPO)
diff --git a/src/indexamajig.c b/src/indexamajig.c
index af1a5704..6e821434 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -435,6 +435,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->asapo_params.stream = strdup(arg);
break;
+ case 221 :
+ args->asapo_params.wait_for_stream = 1;
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -858,6 +862,7 @@ int main(int argc, char *argv[])
args.asapo_params.group_id = NULL;
args.asapo_params.source = NULL;
args.asapo_params.stream = NULL;
+ args.asapo_params.wait_for_stream = 0;
args.serial_start = 1;
args.if_peaks = 1;
args.if_multi = 0;
@@ -972,6 +977,8 @@ int main(int argc, char *argv[])
{"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"},
{"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"},
{"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"},
+ {"asapo-wait-for-stream", 221, "str", OPTION_NO_USAGE,
+ "Wait for ASAP::O stream to appear"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},