aboutsummaryrefslogtreecommitdiff
path: root/src/im-asapo.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2022-06-17 12:37:16 +0200
committerThomas White <taw@physics.org>2022-06-17 15:00:40 +0200
commit889ed82d2e9d37f59e6872401eea4fed78265ab5 (patch)
tree1ce2a38494831b21cd0ab1b2608e38be175f2d87 /src/im-asapo.c
parent07217dbd38976849b50eb2c669ca902f068bdce9 (diff)
ASAP::O: Simplify by removing stream switching logic
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r--src/im-asapo.c211
1 files changed, 23 insertions, 188 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 95445fd0..2685036b 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -49,7 +49,6 @@
struct im_asapo
{
char *stream;
- int online_mode;
AsapoConsumerHandle consumer;
AsapoStringHandle group_id;
};
@@ -63,40 +62,6 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err)
}
-char *im_asapo_make_unique_group_id(const char *endpoint,
- const char *token)
-{
- AsapoConsumerHandle consumer;
- AsapoSourceCredentialsHandle cred;
- AsapoStringHandle group_id;
- AsapoErrorHandle err = asapo_new_handle();
-
- cred = asapo_create_source_credentials(kProcessed,
- "", /* instance ID */
- "", /* pipeline step */
- "", /* beamtime */
- "", /* beamline */
- "", /* data source */
- token);
- consumer = asapo_create_consumer(endpoint, "", 0, cred, &err);
- asapo_free_handle(&cred);
- if ( asapo_is_error(err) ) {
- show_asapo_error("Cannot create temporary ASAP::O consumer", err);
- asapo_free_handle(&consumer);
- return NULL;
- }
-
- group_id = asapo_consumer_generate_new_group_id(consumer, &err);
- asapo_free_handle(&consumer);
- if ( asapo_is_error(err) ) {
- show_asapo_error("Cannot create ASAP::O group ID", err);
- return NULL;
- }
-
- return strdup(asapo_string_c_str(group_id));
-}
-
-
struct im_asapo *im_asapo_connect(const char *endpoint,
const char *token,
const char *beamtime,
@@ -108,6 +73,27 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
AsapoSourceCredentialsHandle cred;
AsapoErrorHandle err = asapo_new_handle();
+ if ( endpoint == NULL ) {
+ ERROR("ASAP::O endpoint not specified.\n");
+ return NULL;
+ }
+ if ( beamtime == NULL ) {
+ ERROR("ASAP::O beamtime not specified.\n");
+ return NULL;
+ }
+ if ( group_id == NULL ) {
+ ERROR("ASAP::O consumer group ID not specified.\n");
+ return NULL;
+ }
+ if ( data_source == NULL ) {
+ ERROR("ASAP::O data source not specified.\n");
+ return NULL;
+ }
+ if ( stream == NULL ) {
+ ERROR("ASAP::O stream not specified.\n");
+ return NULL;
+ }
+
a = malloc(sizeof(struct im_asapo));
if ( a == NULL ) return NULL;
@@ -126,148 +112,14 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
return NULL;
}
+ a->stream = strdup(stream);
asapo_consumer_set_timeout(a->consumer, 3000);
-
a->group_id = asapo_string_from_c_str(group_id);
- if ( stream != NULL ) {
-
- /* Named stream mode */
- AsapoErrorHandle err = asapo_new_handle();
-
- a->stream = strdup(stream);
-
- asapo_consumer_set_last_read_marker(a->consumer,
- a->group_id, 0,
- a->stream, &err);
- if ( asapo_is_error(err) ) {
- show_asapo_error("Failed to skip to start of stream", err);
- } else {
- STATUS("Skipped to start of stream %s\n", a->stream);
- }
-
- asapo_free_handle(&err);
- a->online_mode = 0;
-
- } else {
- /* Online mode */
- a->stream = NULL;
- a->online_mode = 1;
- }
return a;
}
-static int select_last_stream(struct im_asapo *a)
-{
- AsapoStreamInfosHandle si;
- size_t n;
- int i;
- AsapoStreamInfoHandle st;
- AsapoErrorHandle err = asapo_new_handle();
-
- si = asapo_consumer_get_stream_list(a->consumer, "",
- kAllStreams, &err);
-
- if ( asapo_is_error(err) ) {
- show_asapo_error("Couldn't get ASAP::O stream list", err);
- asapo_free_handle(&err);
- return 1;
- }
-
- n = asapo_stream_infos_get_size(si);
- if ( n == 0 ) {
- STATUS("No streams.\n");
- return 1;
- }
-
- STATUS("Streams available at start:\n");
- for ( i=0; i<n; i++ ) {
- AsapoStreamInfoHandle st = asapo_stream_infos_get_item(si, i);
- STATUS("Stream %i: %s\n", i, asapo_stream_info_get_name(st));
- asapo_free_handle(&st);
- }
- STATUS("End of stream list\n");
-
- st = asapo_stream_infos_get_item(si, n-1);
- a->stream = strdup(asapo_stream_info_get_name(st));
- asapo_free_handle(&st);
- STATUS("Starting with the last stream: %s\n", a->stream);
-
- asapo_free_handle(&si);
- asapo_free_handle(&err);
- return 0;
-}
-
-
-static int select_next_stream(struct im_asapo *a)
-{
- AsapoStreamInfosHandle si;
- AsapoStreamInfoHandle st;
- AsapoErrorHandle err = asapo_new_handle();
- const char *next_stream;
-
- si = asapo_consumer_get_stream_list(a->consumer, a->stream,
- kAllStreams, &err);
-
- if ( asapo_is_error(err) ) {
- show_asapo_error("Couldn't get ASAP::O stream list", err);
- asapo_free_handle(&si);
- asapo_free_handle(&err);
- return 1;
- }
-
- asapo_free_handle(&err);
-
- /* Stream list includes the current stream, so we need at least
- * two entries */
- if ( asapo_stream_infos_get_size(si) < 2 ) {
- //STATUS("No newer stream. Waiting for new data...\n");
- asapo_free_handle(&si);
- return 0;
- }
-
- /* Stream list includes the current stream, so look at the second one */
- st = asapo_stream_infos_get_item(si, 1);
- next_stream = asapo_stream_info_get_name(st);
- free(a->stream);
- a->stream = strdup(next_stream);
- STATUS("Selecting next stream: %s\n", a->stream);
- asapo_free_handle(&st);
- asapo_free_handle(&si);
-
- return 0;
-}
-
-
-static void skip_to_stream_end(struct im_asapo *a)
-{
- int64_t size;
- AsapoErrorHandle err = asapo_new_handle();
-
- size = asapo_consumer_get_current_size(a->consumer, a->stream, &err);
- if ( asapo_is_error(err) ) {
- show_asapo_error("Failed to get length of stream", err);
- } else {
-
- AsapoErrorHandle err = asapo_new_handle();
-
- asapo_consumer_set_last_read_marker(a->consumer,
- a->group_id, size,
- a->stream, &err);
- if ( asapo_is_error(err) ) {
- show_asapo_error("Failed to skip to end of stream", err);
- } else {
- STATUS("Skipped to end of stream (%lli)\n", size);
- }
-
- asapo_free_handle(&err);
- }
-
- asapo_free_handle(&err);
-}
-
-
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
int *pfinished)
@@ -280,16 +132,6 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
*pfinished = 0;
- profile_start("select-stream");
- if ( a->stream == NULL ) {
- if ( select_last_stream(a) ) {
- profile_end("select-stream");
- return NULL;
- }
- skip_to_stream_end(a);
- }
- profile_end("select-stream");
-
profile_start("create-handles");
err = asapo_new_handle();
meta = asapo_new_handle();
@@ -304,14 +146,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
asapo_free_handle(&err);
asapo_free_handle(&meta);
asapo_free_handle(&data);
- if ( a->online_mode ) {
- profile_start("next-stream");
- select_next_stream(a);
- profile_end("next-stream");
- /* Sandbox will call to try again very soon */
- } else {
- *pfinished = 1;
- }
+ *pfinished = 1;
return NULL;
}