From 77612a5e8a39ffc96c89e4b07fc14fc50d57382f Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 16 Sep 2021 16:20:51 +0200 Subject: ASAP::O: Fixes after testing for stream switching --- src/im-asapo.c | 62 ++++++++++++++++++++++++++-------------------------------- 1 file changed, 28 insertions(+), 34 deletions(-) (limited to 'src/im-asapo.c') diff --git a/src/im-asapo.c b/src/im-asapo.c index 0fc0a2b2..7b3c8b8b 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -124,36 +124,34 @@ struct im_asapo *im_asapo_connect(const char *endpoint, static int select_last_stream(struct im_asapo *a) { AsapoStreamInfosHandle si; - size_t len; + size_t n; int i; AsapoStreamInfoHandle st; AsapoErrorHandle err = asapo_new_handle(); - si = asapo_consumer_get_stream_list(a->consumer, NULL, + si = asapo_consumer_get_stream_list(a->consumer, "", kAllStreams, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get ASAP::O stream list", err); - asapo_free_handle(err); + asapo_free_handle(&err); return 1; } - STATUS("for info: stream list:\n"); + STATUS("Streams available at start:\n"); n = asapo_stream_infos_get_size(si); for ( i=0; istream = strdup(asapo_stream_info_get_name(st)); - asapo_free_handle(st); + STATUS("Starting with the last stream: %s\n", a->stream); - asapo_free_handle(si); - asapo_free_handle(err); + asapo_free_handle(&err); return 0; } @@ -161,39 +159,30 @@ static int select_last_stream(struct im_asapo *a) static int select_next_stream(struct im_asapo *a) { AsapoStreamInfosHandle si; - size_t len; - int i; + AsapoStreamInfoHandle st; AsapoErrorHandle err = asapo_new_handle(); + const char *next_stream; - si = asapo_consumer_get_stream_list(a->consumer, NULL, + si = asapo_consumer_get_stream_list(a->consumer, a->stream, kAllStreams, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get ASAP::O stream list", err); - asapo_free_handle(err); + asapo_free_handle(&err); return 1; } - n = asapo_stream_infos_get_size(si); - for ( i=n-1; i>0; i-- ) { - - AsapoStreamInfoHandle st; - const char *name; - - st = asapo_stream_infos_get_item(si, i); - name = asapo_stream_info_get_name(st); - - if ( strcmp(name, a->stream) == 0 ) { - free(a->stream); - a->stream = strdup(asapo_stream_info_get_next_stream(st)); - asapo_free_handle(st); - break; - } + asapo_free_handle(&err); - asapo_free_handle(st); + st = asapo_stream_infos_get_item(si, 0); + next_stream = asapo_stream_info_get_name(st); + if ( strcmp(next_stream, a->stream) == 0 ) { + STATUS("Waiting for new data...\n"); + } else { + free(a->stream); + a->stream = strdup(next_stream); + STATUS("Selecting next stream: %s\n", a->stream); } - asapo_free_handle(si); - asapo_free_handle(err); return 0; } @@ -201,7 +190,13 @@ static int select_next_stream(struct im_asapo *a) static void skip_to_stream_end(struct im_asapo *a) { - /* FIXME: Implementation */ + int64_t size; + AsapoErrorHandle err = asapo_new_handle(); + + size = asapo_consumer_get_current_size(a->consumer, a->stream, &err); + asapo_consumer_set_last_read_marker(a->consumer, a->group_id, size, + a->stream, &err); + STATUS("Skipping to end of stream (%lli)\n", size); } @@ -231,7 +226,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - return NULL; /* Please call back later! */ + return NULL; /* Sandbox will call try again very soon */ } if ( asapo_is_error(err) ) { @@ -246,7 +241,6 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta)); STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta)); - STATUS("ASAP::O size: %lli\n", (long long int)msg_size); data_copy = malloc(msg_size); if ( data_copy == NULL ) { -- cgit v1.2.3