aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/im-asapo.c62
1 files changed, 28 insertions, 34 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 0fc0a2b2..7b3c8b8b 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -124,36 +124,34 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
static int select_last_stream(struct im_asapo *a)
{
AsapoStreamInfosHandle si;
- size_t len;
+ size_t n;
int i;
AsapoStreamInfoHandle st;
AsapoErrorHandle err = asapo_new_handle();
- si = asapo_consumer_get_stream_list(a->consumer, NULL,
+ si = asapo_consumer_get_stream_list(a->consumer, "",
kAllStreams, &err);
if ( asapo_is_error(err) ) {
show_asapo_error("Couldn't get ASAP::O stream list", err);
- asapo_free_handle(err);
+ asapo_free_handle(&err);
return 1;
}
- STATUS("for info: stream list:\n");
+ STATUS("Streams available at start:\n");
n = asapo_stream_infos_get_size(si);
for ( i=0; i<n; i++ ) {
AsapoStreamInfoHandle st;
st = asapo_stream_infos_get_item(si, i);
STATUS("Stream %i: %s\n", i, asapo_stream_info_get_name(st));
- asapo_free_handle(st);
}
- STATUS("end of stream list\n");
+ STATUS("End of stream list\n");
st = asapo_stream_infos_get_item(si, n-1);
a->stream = strdup(asapo_stream_info_get_name(st));
- asapo_free_handle(st);
+ STATUS("Starting with the last stream: %s\n", a->stream);
- asapo_free_handle(si);
- asapo_free_handle(err);
+ asapo_free_handle(&err);
return 0;
}
@@ -161,39 +159,30 @@ static int select_last_stream(struct im_asapo *a)
static int select_next_stream(struct im_asapo *a)
{
AsapoStreamInfosHandle si;
- size_t len;
- int i;
+ AsapoStreamInfoHandle st;
AsapoErrorHandle err = asapo_new_handle();
+ const char *next_stream;
- si = asapo_consumer_get_stream_list(a->consumer, NULL,
+ si = asapo_consumer_get_stream_list(a->consumer, a->stream,
kAllStreams, &err);
if ( asapo_is_error(err) ) {
show_asapo_error("Couldn't get ASAP::O stream list", err);
- asapo_free_handle(err);
+ asapo_free_handle(&err);
return 1;
}
- n = asapo_stream_infos_get_size(si);
- for ( i=n-1; i>0; i-- ) {
-
- AsapoStreamInfoHandle st;
- const char *name;
-
- st = asapo_stream_infos_get_item(si, i);
- name = asapo_stream_info_get_name(st);
-
- if ( strcmp(name, a->stream) == 0 ) {
- free(a->stream);
- a->stream = strdup(asapo_stream_info_get_next_stream(st));
- asapo_free_handle(st);
- break;
- }
+ asapo_free_handle(&err);
- asapo_free_handle(st);
+ st = asapo_stream_infos_get_item(si, 0);
+ next_stream = asapo_stream_info_get_name(st);
+ if ( strcmp(next_stream, a->stream) == 0 ) {
+ STATUS("Waiting for new data...\n");
+ } else {
+ free(a->stream);
+ a->stream = strdup(next_stream);
+ STATUS("Selecting next stream: %s\n", a->stream);
}
- asapo_free_handle(si);
- asapo_free_handle(err);
return 0;
}
@@ -201,7 +190,13 @@ static int select_next_stream(struct im_asapo *a)
static void skip_to_stream_end(struct im_asapo *a)
{
- /* FIXME: Implementation */
+ int64_t size;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ size = asapo_consumer_get_current_size(a->consumer, a->stream, &err);
+ asapo_consumer_set_last_read_marker(a->consumer, a->group_id, size,
+ a->stream, &err);
+ STATUS("Skipping to end of stream (%lli)\n", size);
}
@@ -231,7 +226,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size)
asapo_free_handle(&err);
asapo_free_handle(&meta);
asapo_free_handle(&data);
- return NULL; /* Please call back later! */
+ return NULL; /* Sandbox will call try again very soon */
}
if ( asapo_is_error(err) ) {
@@ -246,7 +241,6 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size)
STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta));
STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta));
- STATUS("ASAP::O size: %lli\n", (long long int)msg_size);
data_copy = malloc(msg_size);
if ( data_copy == NULL ) {