aboutsummaryrefslogtreecommitdiff
path: root/src/im-asapo.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2022-06-23 13:51:32 +0200
committerThomas White <taw@physics.org>2022-06-23 13:51:32 +0200
commit6668a7431d44d4c4fc463ab00c21300a7902d3da (patch)
tree5864f0b45e0cfb826434dd0f9b996bcd2ca0b9e6 /src/im-asapo.c
parent097e9601f8d869a6f8734bbc7aaa24a22088b909 (diff)
indexamajig: Add --asapo-wait-for-stream
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r--src/im-asapo.c26
1 files changed, 25 insertions, 1 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index b19c5bdc..255a942d 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -51,6 +51,7 @@ struct im_asapo
char *stream;
AsapoConsumerHandle consumer;
AsapoStringHandle group_id;
+ int wait_for_stream;
};
@@ -110,11 +111,30 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(params->group_id);
+ a->wait_for_stream = params->wait_for_stream;
return a;
}
+static int stream_empty(struct im_asapo *a)
+{
+ AsapoErrorHandle err;
+
+ err = asapo_new_handle();
+ int64_t size = asapo_consumer_get_current_size(a->consumer, a->stream,
+ &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't get stream size", err);
+ asapo_free_handle(&err);
+ return 0;
+ }
+
+ return ( size == 0 );
+}
+
+
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
int *pfinished)
@@ -141,7 +161,11 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
asapo_free_handle(&err);
asapo_free_handle(&meta);
asapo_free_handle(&data);
- *pfinished = 1;
+ if ( stream_empty(a) && a->wait_for_stream ) {
+ *pfinished = 0;
+ } else {
+ *pfinished = 1;
+ }
return NULL;
}