diff options
-rw-r--r-- | src/im-asapo.c | 122 |
1 files changed, 118 insertions, 4 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c index 5f4f2b65..0fc0a2b2 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -47,6 +47,7 @@ struct im_asapo { + char *stream; AsapoConsumerHandle consumer; AsapoStringHandle group_id; }; @@ -114,23 +115,130 @@ struct im_asapo *im_asapo_connect(const char *endpoint, asapo_consumer_set_timeout(a->consumer, 1000); a->group_id = asapo_string_from_c_str(group_id); + a->stream = NULL; return a; } +static int select_last_stream(struct im_asapo *a) +{ + AsapoStreamInfosHandle si; + size_t len; + int i; + AsapoStreamInfoHandle st; + AsapoErrorHandle err = asapo_new_handle(); + + si = asapo_consumer_get_stream_list(a->consumer, NULL, + kAllStreams, &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get ASAP::O stream list", err); + asapo_free_handle(err); + return 1; + } + + STATUS("for info: stream list:\n"); + n = asapo_stream_infos_get_size(si); + for ( i=0; i<n; i++ ) { + AsapoStreamInfoHandle st; + st = asapo_stream_infos_get_item(si, i); + STATUS("Stream %i: %s\n", i, asapo_stream_info_get_name(st)); + asapo_free_handle(st); + } + STATUS("end of stream list\n"); + + st = asapo_stream_infos_get_item(si, n-1); + a->stream = strdup(asapo_stream_info_get_name(st)); + asapo_free_handle(st); + + asapo_free_handle(si); + asapo_free_handle(err); + return 0; +} + + +static int select_next_stream(struct im_asapo *a) +{ + AsapoStreamInfosHandle si; + size_t len; + int i; + AsapoErrorHandle err = asapo_new_handle(); + + si = asapo_consumer_get_stream_list(a->consumer, NULL, + kAllStreams, &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get ASAP::O stream list", err); + asapo_free_handle(err); + return 1; + } + + n = asapo_stream_infos_get_size(si); + for ( i=n-1; i>0; i-- ) { + + AsapoStreamInfoHandle st; + const char *name; + + st = asapo_stream_infos_get_item(si, i); + name = asapo_stream_info_get_name(st); + + if ( strcmp(name, a->stream) == 0 ) { + free(a->stream); + a->stream = strdup(asapo_stream_info_get_next_stream(st)); + asapo_free_handle(st); + break; + } + + asapo_free_handle(st); + } + asapo_free_handle(si); + asapo_free_handle(err); + + return 0; +} + + +static void skip_to_stream_end(struct im_asapo *a) +{ + /* FIXME: Implementation */ +} + + void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) { void *data_copy; - AsapoMessageMetaHandle meta = asapo_new_handle(); - AsapoMessageDataHandle data = asapo_new_handle(); + AsapoMessageMetaHandle meta; + AsapoMessageDataHandle data; AsapoErrorHandle err = asapo_new_handle(); uint64_t msg_size; + if ( a->stream == NULL ) { + if ( select_last_stream(a) ) { + asapo_free_handle(&err); + return NULL; + } + skip_to_stream_end(a); + } + + meta = asapo_new_handle(); + data = asapo_new_handle(); + asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, - "default", &err); + a->stream, &err); + if ( asapo_error_get_type(err) == kEndOfStream ) { + select_next_stream(a); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + return NULL; /* Please call back later! */ + } + if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get next ASAP::O record", err); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); return NULL; } @@ -141,7 +249,13 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) STATUS("ASAP::O size: %lli\n", (long long int)msg_size); data_copy = malloc(msg_size); - if ( data_copy == NULL ) return NULL; + if ( data_copy == NULL ) { + ERROR("Failed to copy data block.\n"); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + return NULL; + } memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); asapo_free_handle(&err); |