/* * im-asapo.c * * ASAP::O data interface * * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: * 2021 Thomas White * * This file is part of CrystFEL. * * CrystFEL is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * CrystFEL is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with CrystFEL. If not, see . * */ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #include #include #include "im-asapo.h" #include "datatemplate_priv.h" struct im_asapo { char *stream; int online_mode; AsapoConsumerHandle consumer; AsapoStringHandle group_id; }; static void show_asapo_error(const char *msg, const AsapoErrorHandle err) { char buf[1024]; asapo_error_explain(err, buf, sizeof(buf)); ERROR("%s: %s\n", msg, buf); } char *im_asapo_make_unique_group_id(const char *endpoint, const char *token) { AsapoConsumerHandle consumer; AsapoSourceCredentialsHandle cred; AsapoStringHandle group_id; AsapoErrorHandle err = asapo_new_handle(); cred = asapo_create_source_credentials(kProcessed, "", /* instance ID */ "", /* pipeline step */ "", /* beamtime */ "", /* beamline */ "", /* data source */ token); consumer = asapo_create_consumer(endpoint, "", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create temporary ASAP::O consumer", err); asapo_free_handle(&consumer); return NULL; } group_id = asapo_consumer_generate_new_group_id(consumer, &err); asapo_free_handle(&consumer); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O group ID", err); return NULL; } return strdup(asapo_string_c_str(group_id)); } struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *group_id, const char *data_source, const char *stream) { struct im_asapo *a; AsapoSourceCredentialsHandle cred; AsapoErrorHandle err = asapo_new_handle(); a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; cred = asapo_create_source_credentials(kProcessed, "auto", /* instance ID */ "indexamajig", /* pipeline step */ beamtime, "", /* beamline */ data_source, token); a->consumer = asapo_create_consumer(endpoint, "auto", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); free(a); return NULL; } asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(group_id); if ( stream != NULL ) { /* Named stream mode */ AsapoErrorHandle err = asapo_new_handle(); a->stream = strdup(stream); asapo_consumer_set_last_read_marker(a->consumer, a->group_id, 0, a->stream, &err); if ( asapo_is_error(err) ) { show_asapo_error("Failed to skip to start of stream", err); } else { STATUS("Skipped to start of stream %s\n", a->stream); } asapo_free_handle(&err); a->online_mode = 0; } else { /* Online mode */ a->stream = NULL; a->online_mode = 1; } return a; } static int select_last_stream(struct im_asapo *a) { AsapoStreamInfosHandle si; size_t n; int i; AsapoStreamInfoHandle st; AsapoErrorHandle err = asapo_new_handle(); si = asapo_consumer_get_stream_list(a->consumer, "", kAllStreams, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get ASAP::O stream list", err); asapo_free_handle(&err); return 1; } n = asapo_stream_infos_get_size(si); if ( n == 0 ) { STATUS("No streams.\n"); return 1; } STATUS("Streams available at start:\n"); for ( i=0; istream = strdup(asapo_stream_info_get_name(st)); asapo_free_handle(&st); STATUS("Starting with the last stream: %s\n", a->stream); asapo_free_handle(&si); asapo_free_handle(&err); return 0; } static int select_next_stream(struct im_asapo *a) { AsapoStreamInfosHandle si; AsapoStreamInfoHandle st; AsapoErrorHandle err = asapo_new_handle(); const char *next_stream; si = asapo_consumer_get_stream_list(a->consumer, a->stream, kAllStreams, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get ASAP::O stream list", err); asapo_free_handle(&si); asapo_free_handle(&err); return 1; } asapo_free_handle(&err); /* Stream list includes the current stream, so we need at least * two entries */ if ( asapo_stream_infos_get_size(si) < 2 ) { //STATUS("No newer stream. Waiting for new data...\n"); asapo_free_handle(&si); return 0; } /* Stream list includes the current stream, so look at the second one */ st = asapo_stream_infos_get_item(si, 1); next_stream = asapo_stream_info_get_name(st); free(a->stream); a->stream = strdup(next_stream); STATUS("Selecting next stream: %s\n", a->stream); asapo_free_handle(&st); asapo_free_handle(&si); return 0; } static void skip_to_stream_end(struct im_asapo *a) { int64_t size; AsapoErrorHandle err = asapo_new_handle(); size = asapo_consumer_get_current_size(a->consumer, a->stream, &err); if ( asapo_is_error(err) ) { show_asapo_error("Failed to get length of stream", err); } else { AsapoErrorHandle err = asapo_new_handle(); asapo_consumer_set_last_read_marker(a->consumer, a->group_id, size, a->stream, &err); if ( asapo_is_error(err) ) { show_asapo_error("Failed to skip to end of stream", err); } else { STATUS("Skipped to end of stream (%lli)\n", size); } asapo_free_handle(&err); } asapo_free_handle(&err); } void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished) { void *data_copy; AsapoMessageMetaHandle meta; AsapoMessageDataHandle data; AsapoErrorHandle err; uint64_t msg_size; *pfinished = 0; profile_start("select-stream"); if ( a->stream == NULL ) { if ( select_last_stream(a) ) { profile_end("select-stream"); return NULL; } skip_to_stream_end(a); } profile_end("select-stream"); profile_start("create-handles"); err = asapo_new_handle(); meta = asapo_new_handle(); data = asapo_new_handle(); profile_end("create-handles"); profile_start("asapo-get-next"); asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, a->stream, &err); profile_end("asapo-get-next"); if ( asapo_error_get_type(err) == kEndOfStream ) { asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); if ( a->online_mode ) { profile_start("next-stream"); select_next_stream(a); profile_end("next-stream"); /* Sandbox will call to try again very soon */ } else { *pfinished = 1; } return NULL; } if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get next ASAP::O record", err); asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); return NULL; } profile_start("get-size"); msg_size = asapo_message_meta_get_size(meta); profile_end("get-size"); profile_start("malloc-copy"); data_copy = malloc(msg_size); if ( data_copy == NULL ) { ERROR("Failed to copy data block.\n"); asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); return NULL; } memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); profile_end("malloc-copy"); profile_start("copy-meta"); *pmeta = strdup(asapo_message_meta_get_metadata(meta)); *pfilename = strdup(asapo_message_meta_get_name(meta)); *pevent = strdup("//"); profile_end("copy-meta"); asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); *pdata_size = msg_size; return data_copy; } void im_asapo_shutdown(struct im_asapo *a) { if ( a == NULL ) return; asapo_free_handle(&a->consumer); asapo_free_handle(&a->group_id); free(a); }