From 6668a7431d44d4c4fc463ab00c21300a7902d3da Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 23 Jun 2022 13:51:32 +0200 Subject: indexamajig: Add --asapo-wait-for-stream --- src/im-asapo.c | 26 +++++++++++++++++++++++++- src/im-asapo.h | 1 + src/indexamajig.c | 7 +++++++ 3 files changed, 33 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/im-asapo.c b/src/im-asapo.c index b19c5bdc..255a942d 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -51,6 +51,7 @@ struct im_asapo char *stream; AsapoConsumerHandle consumer; AsapoStringHandle group_id; + int wait_for_stream; }; @@ -110,11 +111,30 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); + a->wait_for_stream = params->wait_for_stream; return a; } +static int stream_empty(struct im_asapo *a) +{ + AsapoErrorHandle err; + + err = asapo_new_handle(); + int64_t size = asapo_consumer_get_current_size(a->consumer, a->stream, + &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get stream size", err); + asapo_free_handle(&err); + return 0; + } + + return ( size == 0 ); +} + + void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished) @@ -141,7 +161,11 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - *pfinished = 1; + if ( stream_empty(a) && a->wait_for_stream ) { + *pfinished = 0; + } else { + *pfinished = 1; + } return NULL; } diff --git a/src/im-asapo.h b/src/im-asapo.h index f2e9e40a..cda9fbd9 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -42,6 +42,7 @@ struct im_asapo_params char *group_id; char *source; char *stream; + int wait_for_stream; }; #if defined(HAVE_ASAPO) diff --git a/src/indexamajig.c b/src/indexamajig.c index af1a5704..6e821434 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -435,6 +435,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->asapo_params.stream = strdup(arg); break; + case 221 : + args->asapo_params.wait_for_stream = 1; + break; + /* ---------- Peak search ---------- */ case 't' : @@ -858,6 +862,7 @@ int main(int argc, char *argv[]) args.asapo_params.group_id = NULL; args.asapo_params.source = NULL; args.asapo_params.stream = NULL; + args.asapo_params.wait_for_stream = 0; args.serial_start = 1; args.if_peaks = 1; args.if_multi = 0; @@ -972,6 +977,8 @@ int main(int argc, char *argv[]) {"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, + {"asapo-wait-for-stream", 221, "str", OPTION_NO_USAGE, + "Wait for ASAP::O stream to appear"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, -- cgit v1.2.3