From 889ed82d2e9d37f59e6872401eea4fed78265ab5 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 17 Jun 2022 12:37:16 +0200 Subject: ASAP::O: Simplify by removing stream switching logic --- src/im-asapo.c | 211 +++++++-------------------------------------------------- 1 file changed, 23 insertions(+), 188 deletions(-) (limited to 'src/im-asapo.c') diff --git a/src/im-asapo.c b/src/im-asapo.c index 95445fd0..2685036b 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -49,7 +49,6 @@ struct im_asapo { char *stream; - int online_mode; AsapoConsumerHandle consumer; AsapoStringHandle group_id; }; @@ -63,40 +62,6 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } -char *im_asapo_make_unique_group_id(const char *endpoint, - const char *token) -{ - AsapoConsumerHandle consumer; - AsapoSourceCredentialsHandle cred; - AsapoStringHandle group_id; - AsapoErrorHandle err = asapo_new_handle(); - - cred = asapo_create_source_credentials(kProcessed, - "", /* instance ID */ - "", /* pipeline step */ - "", /* beamtime */ - "", /* beamline */ - "", /* data source */ - token); - consumer = asapo_create_consumer(endpoint, "", 0, cred, &err); - asapo_free_handle(&cred); - if ( asapo_is_error(err) ) { - show_asapo_error("Cannot create temporary ASAP::O consumer", err); - asapo_free_handle(&consumer); - return NULL; - } - - group_id = asapo_consumer_generate_new_group_id(consumer, &err); - asapo_free_handle(&consumer); - if ( asapo_is_error(err) ) { - show_asapo_error("Cannot create ASAP::O group ID", err); - return NULL; - } - - return strdup(asapo_string_c_str(group_id)); -} - - struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, @@ -108,6 +73,27 @@ struct im_asapo *im_asapo_connect(const char *endpoint, AsapoSourceCredentialsHandle cred; AsapoErrorHandle err = asapo_new_handle(); + if ( endpoint == NULL ) { + ERROR("ASAP::O endpoint not specified.\n"); + return NULL; + } + if ( beamtime == NULL ) { + ERROR("ASAP::O beamtime not specified.\n"); + return NULL; + } + if ( group_id == NULL ) { + ERROR("ASAP::O consumer group ID not specified.\n"); + return NULL; + } + if ( data_source == NULL ) { + ERROR("ASAP::O data source not specified.\n"); + return NULL; + } + if ( stream == NULL ) { + ERROR("ASAP::O stream not specified.\n"); + return NULL; + } + a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; @@ -126,148 +112,14 @@ struct im_asapo *im_asapo_connect(const char *endpoint, return NULL; } + a->stream = strdup(stream); asapo_consumer_set_timeout(a->consumer, 3000); - a->group_id = asapo_string_from_c_str(group_id); - if ( stream != NULL ) { - - /* Named stream mode */ - AsapoErrorHandle err = asapo_new_handle(); - - a->stream = strdup(stream); - - asapo_consumer_set_last_read_marker(a->consumer, - a->group_id, 0, - a->stream, &err); - if ( asapo_is_error(err) ) { - show_asapo_error("Failed to skip to start of stream", err); - } else { - STATUS("Skipped to start of stream %s\n", a->stream); - } - - asapo_free_handle(&err); - a->online_mode = 0; - - } else { - /* Online mode */ - a->stream = NULL; - a->online_mode = 1; - } return a; } -static int select_last_stream(struct im_asapo *a) -{ - AsapoStreamInfosHandle si; - size_t n; - int i; - AsapoStreamInfoHandle st; - AsapoErrorHandle err = asapo_new_handle(); - - si = asapo_consumer_get_stream_list(a->consumer, "", - kAllStreams, &err); - - if ( asapo_is_error(err) ) { - show_asapo_error("Couldn't get ASAP::O stream list", err); - asapo_free_handle(&err); - return 1; - } - - n = asapo_stream_infos_get_size(si); - if ( n == 0 ) { - STATUS("No streams.\n"); - return 1; - } - - STATUS("Streams available at start:\n"); - for ( i=0; istream = strdup(asapo_stream_info_get_name(st)); - asapo_free_handle(&st); - STATUS("Starting with the last stream: %s\n", a->stream); - - asapo_free_handle(&si); - asapo_free_handle(&err); - return 0; -} - - -static int select_next_stream(struct im_asapo *a) -{ - AsapoStreamInfosHandle si; - AsapoStreamInfoHandle st; - AsapoErrorHandle err = asapo_new_handle(); - const char *next_stream; - - si = asapo_consumer_get_stream_list(a->consumer, a->stream, - kAllStreams, &err); - - if ( asapo_is_error(err) ) { - show_asapo_error("Couldn't get ASAP::O stream list", err); - asapo_free_handle(&si); - asapo_free_handle(&err); - return 1; - } - - asapo_free_handle(&err); - - /* Stream list includes the current stream, so we need at least - * two entries */ - if ( asapo_stream_infos_get_size(si) < 2 ) { - //STATUS("No newer stream. Waiting for new data...\n"); - asapo_free_handle(&si); - return 0; - } - - /* Stream list includes the current stream, so look at the second one */ - st = asapo_stream_infos_get_item(si, 1); - next_stream = asapo_stream_info_get_name(st); - free(a->stream); - a->stream = strdup(next_stream); - STATUS("Selecting next stream: %s\n", a->stream); - asapo_free_handle(&st); - asapo_free_handle(&si); - - return 0; -} - - -static void skip_to_stream_end(struct im_asapo *a) -{ - int64_t size; - AsapoErrorHandle err = asapo_new_handle(); - - size = asapo_consumer_get_current_size(a->consumer, a->stream, &err); - if ( asapo_is_error(err) ) { - show_asapo_error("Failed to get length of stream", err); - } else { - - AsapoErrorHandle err = asapo_new_handle(); - - asapo_consumer_set_last_read_marker(a->consumer, - a->group_id, size, - a->stream, &err); - if ( asapo_is_error(err) ) { - show_asapo_error("Failed to skip to end of stream", err); - } else { - STATUS("Skipped to end of stream (%lli)\n", size); - } - - asapo_free_handle(&err); - } - - asapo_free_handle(&err); -} - - void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished) @@ -280,16 +132,6 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, *pfinished = 0; - profile_start("select-stream"); - if ( a->stream == NULL ) { - if ( select_last_stream(a) ) { - profile_end("select-stream"); - return NULL; - } - skip_to_stream_end(a); - } - profile_end("select-stream"); - profile_start("create-handles"); err = asapo_new_handle(); meta = asapo_new_handle(); @@ -304,14 +146,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - if ( a->online_mode ) { - profile_start("next-stream"); - select_next_stream(a); - profile_end("next-stream"); - /* Sandbox will call to try again very soon */ - } else { - *pfinished = 1; - } + *pfinished = 1; return NULL; } -- cgit v1.2.3