diff options
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r-- | src/im-asapo.c | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c new file mode 100644 index 00000000..95445fd0 --- /dev/null +++ b/src/im-asapo.c @@ -0,0 +1,363 @@ +/* + * im-asapo.c + * + * ASAP::O data interface + * + * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2021 Thomas White <taw@physics.org> + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdlib.h> +#include <stdio.h> +#include <stdint.h> +#include <assert.h> +#include <unistd.h> +#include <asapo/consumer_c.h> + +#include <image.h> +#include <utils.h> +#include <profile.h> + +#include "im-asapo.h" + +#include "datatemplate_priv.h" + + +struct im_asapo +{ + char *stream; + int online_mode; + AsapoConsumerHandle consumer; + AsapoStringHandle group_id; +}; + + +static void show_asapo_error(const char *msg, const AsapoErrorHandle err) +{ + char buf[1024]; + asapo_error_explain(err, buf, sizeof(buf)); + ERROR("%s: %s\n", msg, buf); +} + + +char *im_asapo_make_unique_group_id(const char *endpoint, + const char *token) +{ + AsapoConsumerHandle consumer; + AsapoSourceCredentialsHandle cred; + AsapoStringHandle group_id; + AsapoErrorHandle err = asapo_new_handle(); + + cred = asapo_create_source_credentials(kProcessed, + "", /* instance ID */ + "", /* pipeline step */ + "", /* beamtime */ + "", /* beamline */ + "", /* data source */ + token); + consumer = asapo_create_consumer(endpoint, "", 0, cred, &err); + asapo_free_handle(&cred); + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create temporary ASAP::O consumer", err); + asapo_free_handle(&consumer); + return NULL; + } + + group_id = asapo_consumer_generate_new_group_id(consumer, &err); + asapo_free_handle(&consumer); + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O group ID", err); + return NULL; + } + + return strdup(asapo_string_c_str(group_id)); +} + + +struct im_asapo *im_asapo_connect(const char *endpoint, + const char *token, + const char *beamtime, + const char *group_id, + const char *data_source, + const char *stream) +{ + struct im_asapo *a; + AsapoSourceCredentialsHandle cred; + AsapoErrorHandle err = asapo_new_handle(); + + a = malloc(sizeof(struct im_asapo)); + if ( a == NULL ) return NULL; + + cred = asapo_create_source_credentials(kProcessed, + "auto", /* instance ID */ + "indexamajig", /* pipeline step */ + beamtime, + "", /* beamline */ + data_source, + token); + a->consumer = asapo_create_consumer(endpoint, "auto", 0, cred, &err); + asapo_free_handle(&cred); + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O consumer", err); + free(a); + return NULL; + } + + asapo_consumer_set_timeout(a->consumer, 3000); + + a->group_id = asapo_string_from_c_str(group_id); + if ( stream != NULL ) { + + /* Named stream mode */ + AsapoErrorHandle err = asapo_new_handle(); + + a->stream = strdup(stream); + + asapo_consumer_set_last_read_marker(a->consumer, + a->group_id, 0, + a->stream, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Failed to skip to start of stream", err); + } else { + STATUS("Skipped to start of stream %s\n", a->stream); + } + + asapo_free_handle(&err); + a->online_mode = 0; + + } else { + /* Online mode */ + a->stream = NULL; + a->online_mode = 1; + } + + return a; +} + + +static int select_last_stream(struct im_asapo *a) +{ + AsapoStreamInfosHandle si; + size_t n; + int i; + AsapoStreamInfoHandle st; + AsapoErrorHandle err = asapo_new_handle(); + + si = asapo_consumer_get_stream_list(a->consumer, "", + kAllStreams, &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get ASAP::O stream list", err); + asapo_free_handle(&err); + return 1; + } + + n = asapo_stream_infos_get_size(si); + if ( n == 0 ) { + STATUS("No streams.\n"); + return 1; + } + + STATUS("Streams available at start:\n"); + for ( i=0; i<n; i++ ) { + AsapoStreamInfoHandle st = asapo_stream_infos_get_item(si, i); + STATUS("Stream %i: %s\n", i, asapo_stream_info_get_name(st)); + asapo_free_handle(&st); + } + STATUS("End of stream list\n"); + + st = asapo_stream_infos_get_item(si, n-1); + a->stream = strdup(asapo_stream_info_get_name(st)); + asapo_free_handle(&st); + STATUS("Starting with the last stream: %s\n", a->stream); + + asapo_free_handle(&si); + asapo_free_handle(&err); + return 0; +} + + +static int select_next_stream(struct im_asapo *a) +{ + AsapoStreamInfosHandle si; + AsapoStreamInfoHandle st; + AsapoErrorHandle err = asapo_new_handle(); + const char *next_stream; + + si = asapo_consumer_get_stream_list(a->consumer, a->stream, + kAllStreams, &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get ASAP::O stream list", err); + asapo_free_handle(&si); + asapo_free_handle(&err); + return 1; + } + + asapo_free_handle(&err); + + /* Stream list includes the current stream, so we need at least + * two entries */ + if ( asapo_stream_infos_get_size(si) < 2 ) { + //STATUS("No newer stream. Waiting for new data...\n"); + asapo_free_handle(&si); + return 0; + } + + /* Stream list includes the current stream, so look at the second one */ + st = asapo_stream_infos_get_item(si, 1); + next_stream = asapo_stream_info_get_name(st); + free(a->stream); + a->stream = strdup(next_stream); + STATUS("Selecting next stream: %s\n", a->stream); + asapo_free_handle(&st); + asapo_free_handle(&si); + + return 0; +} + + +static void skip_to_stream_end(struct im_asapo *a) +{ + int64_t size; + AsapoErrorHandle err = asapo_new_handle(); + + size = asapo_consumer_get_current_size(a->consumer, a->stream, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Failed to get length of stream", err); + } else { + + AsapoErrorHandle err = asapo_new_handle(); + + asapo_consumer_set_last_read_marker(a->consumer, + a->group_id, size, + a->stream, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Failed to skip to end of stream", err); + } else { + STATUS("Skipped to end of stream (%lli)\n", size); + } + + asapo_free_handle(&err); + } + + asapo_free_handle(&err); +} + + +void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, + char **pmeta, char **pfilename, char **pevent, + int *pfinished) +{ + void *data_copy; + AsapoMessageMetaHandle meta; + AsapoMessageDataHandle data; + AsapoErrorHandle err; + uint64_t msg_size; + + *pfinished = 0; + + profile_start("select-stream"); + if ( a->stream == NULL ) { + if ( select_last_stream(a) ) { + profile_end("select-stream"); + return NULL; + } + skip_to_stream_end(a); + } + profile_end("select-stream"); + + profile_start("create-handles"); + err = asapo_new_handle(); + meta = asapo_new_handle(); + data = asapo_new_handle(); + profile_end("create-handles"); + + profile_start("asapo-get-next"); + asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, + a->stream, &err); + profile_end("asapo-get-next"); + if ( asapo_error_get_type(err) == kEndOfStream ) { + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + if ( a->online_mode ) { + profile_start("next-stream"); + select_next_stream(a); + profile_end("next-stream"); + /* Sandbox will call to try again very soon */ + } else { + *pfinished = 1; + } + return NULL; + } + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get next ASAP::O record", err); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + return NULL; + } + + profile_start("get-size"); + msg_size = asapo_message_meta_get_size(meta); + profile_end("get-size"); + + profile_start("malloc-copy"); + data_copy = malloc(msg_size); + if ( data_copy == NULL ) { + ERROR("Failed to copy data block.\n"); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + return NULL; + } + memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); + profile_end("malloc-copy"); + + profile_start("copy-meta"); + *pmeta = strdup(asapo_message_meta_get_metadata(meta)); + *pfilename = strdup(asapo_message_meta_get_name(meta)); + *pevent = strdup("//"); + profile_end("copy-meta"); + + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + + *pdata_size = msg_size; + return data_copy; +} + + +void im_asapo_shutdown(struct im_asapo *a) +{ + if ( a == NULL ) return; + asapo_free_handle(&a->consumer); + asapo_free_handle(&a->group_id); + free(a); +} |