diff options
author | Thomas White <taw@physics.org> | 2022-06-23 13:51:32 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2022-06-23 13:51:32 +0200 |
commit | 6668a7431d44d4c4fc463ab00c21300a7902d3da (patch) | |
tree | 5864f0b45e0cfb826434dd0f9b996bcd2ca0b9e6 /src/im-asapo.c | |
parent | 097e9601f8d869a6f8734bbc7aaa24a22088b909 (diff) |
indexamajig: Add --asapo-wait-for-stream
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r-- | src/im-asapo.c | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c index b19c5bdc..255a942d 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -51,6 +51,7 @@ struct im_asapo char *stream; AsapoConsumerHandle consumer; AsapoStringHandle group_id; + int wait_for_stream; }; @@ -110,11 +111,30 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); + a->wait_for_stream = params->wait_for_stream; return a; } +static int stream_empty(struct im_asapo *a) +{ + AsapoErrorHandle err; + + err = asapo_new_handle(); + int64_t size = asapo_consumer_get_current_size(a->consumer, a->stream, + &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get stream size", err); + asapo_free_handle(&err); + return 0; + } + + return ( size == 0 ); +} + + void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished) @@ -141,7 +161,11 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - *pfinished = 1; + if ( stream_empty(a) && a->wait_for_stream ) { + *pfinished = 0; + } else { + *pfinished = 1; + } return NULL; } |