From 095fb15a3136fc20642d3ff12bfe26a1aec1b797 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 28 Mar 2022 14:32:31 +0200 Subject: Skeleton ASAP::O consumer --- CMakeLists.txt | 17 +++++++++++++++++ config.h.cmake.in | 1 + config.h.in | 1 + meson.build | 12 +++++++++++- 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6002d708..7702e643 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,13 @@ else () message(STATUS "ZMQ not found.") endif () +pkg_search_module(ASAPO libasapo-consumer) +if (ASAPO_FOUND) + message(STATUS "Found ASAP::O") +else () + message(STATUS "ASAP::O not found") +endif () + # Find out where forkpty() is declared set(CMAKE_REQUIRED_LIBRARIES "-lutil") check_symbol_exists(forkpty "pty.h" HAVE_FORKPTY_PTY_H) @@ -108,6 +115,7 @@ set(HAVE_GDKPIXBUF ${GDKPIXBUF_FOUND}) set(HAVE_GDK ${GDK_FOUND}) set(HAVE_ZMQ ${ZMQ_FOUND}) set(HAVE_HDF5 1) +set(HAVE_ASAPO ${ASAPO_FOUND}) set(PACKAGE_VERSION ${PROJECT_VERSION}) @@ -247,6 +255,10 @@ if ( ZMQ_FOUND ) list(APPEND INDEXAMAJIG_SOURCES src/im-zmq.c) endif () +if ( ASAPO_FOUND ) + list(APPEND INDEXAMAJIG_SOURCES src/im-asapo.c) +endif () + add_executable(indexamajig ${INDEXAMAJIG_SOURCES} ${CMAKE_CURRENT_BINARY_DIR}/version.c) target_include_directories(indexamajig PRIVATE ${COMMON_INCLUDES}) @@ -258,6 +270,11 @@ if ( ZMQ_FOUND ) target_link_libraries(indexamajig ${ZMQ_LDFLAGS}) endif () +if ( ASAPO_FOUND ) + target_include_directories(indexamajig PRIVATE ${ASAPO_INCLUDE_DIR}) + target_link_libraries(indexamajig ${ASAPO_LDFLAGS}) +endif () + # ---------------------------------------------------------------------- # get_hkl diff --git a/config.h.cmake.in b/config.h.cmake.in index 2c781b3c..c056cc4a 100644 --- a/config.h.cmake.in +++ b/config.h.cmake.in @@ -10,3 +10,4 @@ #cmakedefine HAVE_ZMQ #cmakedefine HAVE_SLURM #cmakedefine HAVE_HDF5 +#cmakedefine HAVE_ASAPO diff --git a/config.h.in b/config.h.in index 058e5cea..afa44161 100644 --- a/config.h.in +++ b/config.h.in @@ -10,3 +10,4 @@ #mesondefine HAVE_ZMQ #mesondefine HAVE_SLURM #mesondefine HAVE_HDF5 +#mesondefine HAVE_ASAPO diff --git a/meson.build b/meson.build index be0f6477..964d3f61 100644 --- a/meson.build +++ b/meson.build @@ -69,6 +69,11 @@ if zmqdep.found() conf_data.set10('HAVE_ZMQ', true) endif +asapodep = dependency('libasapo-consumer', required: false) +if asapodep.found() + conf_data.set10('HAVE_ASAPO', 1) +endif + if cc.has_function('clock_gettime', prefix: '#include ') conf_data.set10('HAVE_CLOCK_GETTIME', true) endif @@ -167,8 +172,13 @@ if zmqdep.found() indexamajig_sources += ['src/im-zmq.c'] endif +if asapodep.found() + indexamajig_sources += ['src/im-asapo.c'] +endif + indexamajig = executable('indexamajig', indexamajig_sources, - dependencies: [mdep, libcrystfeldep, gsldep, pthreaddep, zmqdep], + dependencies: [mdep, libcrystfeldep, gsldep, + pthreaddep, zmqdep, asapodep], install: true, install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib') -- cgit v1.2.3 From b28228b5826235ca917d9f42af7d22f75b38bb13 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 1 Jul 2021 15:50:31 +0200 Subject: ASAP::O guts --- src/im-asapo.c | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/im-asapo.h | 51 +++++++++++++++++++++ src/im-sandbox.c | 36 +++++++++++++++ src/im-sandbox.h | 2 + src/indexamajig.c | 33 ++++++++++++-- 5 files changed, 249 insertions(+), 3 deletions(-) create mode 100644 src/im-asapo.c create mode 100644 src/im-asapo.h diff --git a/src/im-asapo.c b/src/im-asapo.c new file mode 100644 index 00000000..e6061268 --- /dev/null +++ b/src/im-asapo.c @@ -0,0 +1,130 @@ +/* + * asapo.c + * + * ASAP::O data interface + * + * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2021 Thomas White + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see . + * + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "im-asapo.h" + +#include "datatemplate_priv.h" + + +struct im_asapo +{ + AsapoConsumerHandle consumer; + AsapoStringHandle group_id; +}; + + +static void show_asapo_error(const char *msg, const AsapoErrorHandle err) +{ + char buf[1024]; + asapo_error_explain(err, buf, sizeof(buf)); + ERROR("%s: %s\n", msg, buf); +} + + +struct im_asapo *im_asapo_connect(const char *endpoint, + const char *token, + const char *beamtime, + const char *path) +{ + struct im_asapo *a; + AsapoSourceCredentialsHandle cred; + AsapoErrorHandle err = asapo_new_handle(); + + a = malloc(sizeof(struct im_asapo)); + if ( a == NULL ) return NULL; + + cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token); + a->consumer = asapo_create_consumer(endpoint, path, 1, cred, &err); + asapo_free_handle(&cred); + if ( err ) { + show_asapo_error("Cannot create ASAP::O consumer", err); + free(a); + return NULL; + } + + asapo_consumer_set_timeout(a->consumer, 1000); + + a->group_id = asapo_consumer_generate_new_group_id(a->consumer, &err); + if ( err ) { + show_asapo_error("Cannot create ASAP::O group ID", err); + free(a); + return NULL; + } + + return a; +} + + +void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) +{ + void *data_block; + AsapoMessageMetaHandle meta = asapo_new_handle(); + AsapoMessageDataHandle data = asapo_new_handle(); + AsapoErrorHandle err = asapo_new_handle(); + + asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, + "default", &err); + if ( err ) { + show_asapo_error("Couldn't get next ASAP::O record", err); + return NULL; + } + + STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta)); + STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta)); + + data_block = asapo_message_data_get_as_chars(data); + + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + + return data_block; +} + + +void im_asapo_shutdown(struct im_asapo *a) +{ + if ( a == NULL ) return; + asapo_free_handle(&a->consumer); + asapo_free_handle(&a->group_id); + free(a); +} diff --git a/src/im-asapo.h b/src/im-asapo.h new file mode 100644 index 00000000..254e7df3 --- /dev/null +++ b/src/im-asapo.h @@ -0,0 +1,51 @@ +/* + * asapo.h + * + * ASAP::O data interface + * + * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2021 Thomas White + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see . + * + */ + + +#ifndef CRYSTFEL_ASAPO_H +#define CRYSTFEL_ASAPO_H + +#ifdef HAVE_CONFIG_H +#include +#endif + +#if defined(HAVE_ASAPO) + +extern struct im_asapo *im_asapo_connect(); +extern void im_asapo_shutdown(struct im_asapo *a); +extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size); + +#else /* defined(HAVE_ASAPO) */ + +static UNUSED struct im_asapo *im_asapo_connect() { return NULL; } +static UNUSED void im_asapo_shutdown(struct im_asapo *a) { } +static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) { *psize = 0; return NULL; } + +#endif /* defined(HAVE_ASAPO) */ + +#endif /* CRYSTFEL_ASAPO_H */ diff --git a/src/im-sandbox.c b/src/im-sandbox.c index bd3f47e7..ed58a3be 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -104,6 +104,13 @@ struct sandbox int n_zmq_subscriptions; const char *zmq_request; + /* ASAP::O mode */ + int asapo; + const char *asapo_endpoint; + const char *asapo_token; + const char *asapo_beamtime; + const char *asapo_path; + /* Final output */ Stream *stream; }; @@ -330,6 +337,7 @@ static int run_work(const struct index_args *iargs, Stream *st, { int allDone = 0; struct im_zmq *zmqstuff = NULL; + struct im_asapo *asapostuff = NULL; if ( sb->profile ) { profile_init(); @@ -347,6 +355,17 @@ static int run_work(const struct index_args *iargs, Stream *st, } } + if ( sb->asapo ) { + zmqstuff = im_zmq_connect(sb->zmq_address, + sb->zmq_subscriptions, + sb->n_zmq_subscriptions, + sb->zmq_request); + if ( zmqstuff == NULL ) { + ERROR("ZMQ setup failed.\n"); + return 1; + } + } + while ( !allDone ) { struct pattern_args pargs; @@ -1056,6 +1075,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, Stream *stream, const char *tmpdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, + const char *asapo_endpoint, const char *asapo_token, + const char *asapo_beamtime, const char *asapo_path, int timeout, int profile) { int i; @@ -1096,6 +1117,21 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->zmq = 0; } + if ( asapo_endpoint != NULL ) { + sb->asapo = 1; + sb->asapo_endpoint = asapo_endpoint; + sb->asapo_token = asapo_token; + sb->asapo_beamtime = asapo_beamtime; + sb->asapo_path = asapo_path; + } else { + sb->asapo = 0; + } + + if ( sb->zmq && sb->asapo ) { + ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n"); + return 0; + } + sb->fds = NULL; sb->fhs = NULL; sb->stream = stream; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index a76e69ec..5efd3595 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -87,6 +87,8 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *tempdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, + const char *asapo_endpoint, const char *asapo_token, + const char *asapo_beamtime, const char *asapo_path, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index fc22a562..252a9e8f 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -84,6 +84,10 @@ struct indexamajig_arguments char *zmq_request; char *zmq_subscriptions[256]; int n_zmq_subscriptions; + char *asapo_endpoint; + char *asapo_token; + char *asapo_beamtime; + char *asapo_path; int serial_start; char *temp_location; int if_refine; @@ -402,6 +406,22 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->zmq_request = strdup(arg); break; + case 213 : + args->asapo_endpoint = strdup(arg); + break; + + case 214 : + args->asapo_token = strdup(arg); + break; + + case 215 : + args->asapo_beamtime = strdup(arg); + break; + + case 216 : + args->asapo_path = strdup(arg); + break; + case 219 : args->iargs.data_format = parse_data_format(arg); if ( args->iargs.data_format == DATA_SOURCE_TYPE_UNKNOWN ) { @@ -934,6 +954,10 @@ int main(int argc, char *argv[]) "type"}, {"zmq-request", 212, "str", OPTION_NO_USAGE, "Request messages using" "this string."}, + {"asapo-endpoint", 213, "str", OPTION_NO_USAGE, "ASAP::O endpoint"}, + {"asapo-token", 214, "str", OPTION_NO_USAGE, "ASAP::O token"}, + {"asapo-beamtime", 215, "str", OPTION_NO_USAGE, "ASAP::O beamtime ID"}, + {"asapo-path", 216, "str", OPTION_NO_USAGE, "ASAP::O path to files"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, @@ -1290,9 +1314,12 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename, - fh, st, tmpdir, args.serial_start, args.zmq_addr, - args.zmq_subscriptions, args.n_zmq_subscriptions, - args.zmq_request, timeout, args.profile); + fh, st, tmpdir, args.serial_start, + args.zmq_addr, args.zmq_subscriptions, + args.n_zmq_subscriptions, args.zmq_request, + args.asapo_endpoint, args.asapo_token, + args.asapo_beamtime, args.asapo_path, + timeout, args.profile); cell_free(args.iargs.cell); free(args.prefix); -- cgit v1.2.3 From 45d3a8670da1d40145202ed42129c7cddba18df5 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 6 Jul 2021 15:15:21 +0200 Subject: ASAP::O: Group ID stuff --- src/im-asapo.c | 49 ++++++++++++++++++++++++++++++++++++++++++------- src/im-asapo.h | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++---- src/im-sandbox.c | 25 +++++++++++++++++++------ src/im-sandbox.h | 1 + src/indexamajig.c | 12 ++++++++++++ 5 files changed, 124 insertions(+), 17 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index e6061268..4eafd058 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -60,10 +60,50 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } +AsapoStringHandle im_asapo_group_id_from_string(const char *str) +{ + /* FIXME: This function does not yet exist */ + return asapostringhandle_from_string(str); +} + + +AsapoStringHandle im_asapo_make_unique_group_id(const char *endpoint, + const char *token, + const char *beamtime, + const char *path) +{ + AsapoConsumerHandle consumer; + AsapoSourceCredentialsHandle cred; + AsapoStringHandle group_id; + AsapoErrorHandle err = asapo_new_handle(); + + cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token); + consumer = asapo_create_consumer(endpoint, path, 1, cred, &err); + asapo_free_handle(&cred); + if ( err ) { + show_asapo_error("Cannot create temporary ASAP::O consumer", err); + asapo_free_handle(&consumer); + return NULL; + } + + asapo_consumer_set_timeout(consumer, 1000); + + group_id = asapo_consumer_generate_new_group_id(consumer, &err); + asapo_free_handle(&consumer); + if ( err ) { + show_asapo_error("Cannot create ASAP::O group ID", err); + return NULL; + } + + return group_id; +} + + struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, - const char *path) + const char *path, + AsapoStringHandle group_id) { struct im_asapo *a; AsapoSourceCredentialsHandle cred; @@ -83,12 +123,7 @@ struct im_asapo *im_asapo_connect(const char *endpoint, asapo_consumer_set_timeout(a->consumer, 1000); - a->group_id = asapo_consumer_generate_new_group_id(a->consumer, &err); - if ( err ) { - show_asapo_error("Cannot create ASAP::O group ID", err); - free(a); - return NULL; - } + a->group_id = group_id; return a; } diff --git a/src/im-asapo.h b/src/im-asapo.h index 254e7df3..3ffc5808 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -36,15 +36,61 @@ #if defined(HAVE_ASAPO) -extern struct im_asapo *im_asapo_connect(); +#include + +extern struct im_asapo *im_asapo_connect(const char *endpoint, + const char *token, + const char *beamtime, + const char *path, + AsapoStringHandle group_id); + extern void im_asapo_shutdown(struct im_asapo *a); + extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size); +extern AsapoStringHandle im_asapo_group_id_from_string(const char *str); + +extern AsapoStringHandle im_asapo_make_unique_group_id(const char *endpoint, + const char *token, + const char *beamtime, + const char *path); + #else /* defined(HAVE_ASAPO) */ -static UNUSED struct im_asapo *im_asapo_connect() { return NULL; } -static UNUSED void im_asapo_shutdown(struct im_asapo *a) { } -static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) { *psize = 0; return NULL; } +typedef void* AsapoStringHandle; + +static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint, + const char *token, + const char *beamtime, + const char *path, + AsapoStringHandle group_id) +{ + return NULL; +} + +static UNUSED void im_asapo_shutdown(struct im_asapo *a) +{ +} + +static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) +{ + *psize = 0; + return NULL; +} + +static UNUSED AsapoStringHandle im_asapo_group_id_from_string(const char *str) +{ + return NULL; +} + +static UNUSED AsapoStringHandle im_asapo_make_unique_group_id(const char *endpoint, + const char *token, + const char *beamtime, + const char *path + AsapoStringHandle group_id) +{ + return NULL; +} #endif /* defined(HAVE_ASAPO) */ diff --git a/src/im-sandbox.c b/src/im-sandbox.c index ed58a3be..654f07d1 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -62,6 +62,7 @@ #include "process_image.h" #include "im-zmq.h" #include "profile.h" +#include "im-asapo.h" struct sandbox @@ -110,6 +111,7 @@ struct sandbox const char *asapo_token; const char *asapo_beamtime; const char *asapo_path; + AsapoStringHandle asapo_group_id; /* Final output */ Stream *stream; @@ -356,12 +358,13 @@ static int run_work(const struct index_args *iargs, Stream *st, } if ( sb->asapo ) { - zmqstuff = im_zmq_connect(sb->zmq_address, - sb->zmq_subscriptions, - sb->n_zmq_subscriptions, - sb->zmq_request); - if ( zmqstuff == NULL ) { - ERROR("ZMQ setup failed.\n"); + asapostuff = im_asapo_connect(sb->asapo_endpoint, + sb->asapo_token, + sb->asapo_beamtime, + sb->asapo_path, + sb->asapo_group_id); + if ( asapostuff == NULL ) { + ERROR("ASAP::O setup failed.\n"); return 1; } } @@ -1077,6 +1080,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, const char *asapo_beamtime, const char *asapo_path, + const char *asapo_group_id, int timeout, int profile) { int i; @@ -1132,6 +1136,15 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return 0; } + if ( asapo_group_id != NULL ) { + sb->asapo_group_id = im_asapo_group_id_from_string(asapo_group_id); + } else { + sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint, + asapo_token, + asapo_beamtime, + asapo_path); + } + sb->fds = NULL; sb->fhs = NULL; sb->stream = stream; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 5efd3595..0415cb80 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -89,6 +89,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, const char *asapo_beamtime, const char *asapo_path, + const char *asapo_group_id, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index 252a9e8f..ee23fcc3 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -88,6 +88,7 @@ struct indexamajig_arguments char *asapo_token; char *asapo_beamtime; char *asapo_path; + char *asapo_group_id; int serial_start; char *temp_location; int if_refine; @@ -422,6 +423,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->asapo_path = strdup(arg); break; + case 217 : + args->asapo_group_id = strdup(arg); + break; + case 219 : args->iargs.data_format = parse_data_format(arg); if ( args->iargs.data_format == DATA_SOURCE_TYPE_UNKNOWN ) { @@ -846,6 +851,11 @@ int main(int argc, char *argv[]) args.basename = 0; args.zmq_addr = NULL; args.zmq_request = NULL; + args.asapo_endpoint = NULL; + args.asapo_token = NULL; + args.asapo_beamtime = NULL; + args.asapo_path = NULL; + args.asapo_group_id = NULL; args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; @@ -958,6 +968,7 @@ int main(int argc, char *argv[]) {"asapo-token", 214, "str", OPTION_NO_USAGE, "ASAP::O token"}, {"asapo-beamtime", 215, "str", OPTION_NO_USAGE, "ASAP::O beamtime ID"}, {"asapo-path", 216, "str", OPTION_NO_USAGE, "ASAP::O path to files"}, + {"asapo-group", 217, "str", OPTION_NO_USAGE, "ASAP::O group ID"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, @@ -1319,6 +1330,7 @@ int main(int argc, char *argv[]) args.n_zmq_subscriptions, args.zmq_request, args.asapo_endpoint, args.asapo_token, args.asapo_beamtime, args.asapo_path, + args.asapo_group_id, timeout, args.profile); cell_free(args.iargs.cell); -- cgit v1.2.3 From eecd67a344d2f5524fe8dd0175758632b932d6cb Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 6 Jul 2021 15:20:04 +0200 Subject: ASAP::O: Actually get the data --- src/im-sandbox.c | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 654f07d1..7da02970 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -457,10 +457,8 @@ static int run_work(const struct index_args *iargs, Stream *st, if ( !sb->zmq ) { - pargs.zmq_data = NULL; - pargs.zmq_data_size = 0; - - } else { + } + if ( sb->zmq ) { do { pargs.zmq_data = im_zmq_fetch(zmqstuff, @@ -472,6 +470,16 @@ static int run_work(const struct index_args *iargs, Stream *st, * importantly, the event queue gave us a unique * serial number for this image. */ + } else if ( sb->asapo ) { + + /* Temporary (?) abuse of "zmq_data", even though + * data comes via ASAP::O */ + pargs.zmq_data = im_asapo_fetch(asapostuff, + &pargs.zmq_data_size); + + } else { + pargs.zmq_data = NULL; + pargs.zmq_data_size = 0; } sb->shared->time_last_start[cookie] = get_monotonic_seconds(); @@ -489,7 +497,9 @@ static int run_work(const struct index_args *iargs, Stream *st, } } + /* These are both no-ops if argument is NULL */ im_zmq_shutdown(zmqstuff); + im_asapo_shutdown(asapostuff); cleanup_indexing(iargs->ipriv); cell_free(iargs->cell); -- cgit v1.2.3 From 457f8809521d83612552d5bb74c2eeeb0f506980 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 20 Jul 2021 11:20:55 +0200 Subject: ASAP::O: Use asapo_string_from_c_str, which exists now --- src/im-asapo.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 4eafd058..362a73b0 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -62,8 +62,7 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) AsapoStringHandle im_asapo_group_id_from_string(const char *str) { - /* FIXME: This function does not yet exist */ - return asapostringhandle_from_string(str); + return asapo_string_from_c_str(str); } -- cgit v1.2.3 From c1a526cebf6e97d78ff340148952421feaaee9c3 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 20 Jul 2021 12:06:57 +0200 Subject: Fix build without ASAP::O --- src/im-sandbox.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 7da02970..554024b2 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -62,7 +62,10 @@ #include "process_image.h" #include "im-zmq.h" #include "profile.h" + +#ifdef HAVE_ASAPO #include "im-asapo.h" +#endif struct sandbox @@ -111,7 +114,9 @@ struct sandbox const char *asapo_token; const char *asapo_beamtime; const char *asapo_path; +#ifdef HAVE_ASAPO AsapoStringHandle asapo_group_id; +#endif /* Final output */ Stream *stream; @@ -339,7 +344,10 @@ static int run_work(const struct index_args *iargs, Stream *st, { int allDone = 0; struct im_zmq *zmqstuff = NULL; + +#ifdef HAVE_ASAPO struct im_asapo *asapostuff = NULL; +#endif if ( sb->profile ) { profile_init(); @@ -357,6 +365,7 @@ static int run_work(const struct index_args *iargs, Stream *st, } } +#ifdef HAVE_ASAPO if ( sb->asapo ) { asapostuff = im_asapo_connect(sb->asapo_endpoint, sb->asapo_token, @@ -368,6 +377,7 @@ static int run_work(const struct index_args *iargs, Stream *st, return 1; } } +#endif while ( !allDone ) { @@ -472,10 +482,12 @@ static int run_work(const struct index_args *iargs, Stream *st, } else if ( sb->asapo ) { +#ifdef HAVE_ASAPO /* Temporary (?) abuse of "zmq_data", even though * data comes via ASAP::O */ pargs.zmq_data = im_asapo_fetch(asapostuff, &pargs.zmq_data_size); +#endif } else { pargs.zmq_data = NULL; @@ -499,7 +511,9 @@ static int run_work(const struct index_args *iargs, Stream *st, /* These are both no-ops if argument is NULL */ im_zmq_shutdown(zmqstuff); +#ifdef HAVE_ASAPO im_asapo_shutdown(asapostuff); +#endif cleanup_indexing(iargs->ipriv); cell_free(iargs->cell); @@ -1146,6 +1160,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return 0; } +#ifdef HAVE_ASAPO if ( asapo_group_id != NULL ) { sb->asapo_group_id = im_asapo_group_id_from_string(asapo_group_id); } else { @@ -1154,6 +1169,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, asapo_beamtime, asapo_path); } +#endif sb->fds = NULL; sb->fhs = NULL; -- cgit v1.2.3 From 8735bc06caf9ab07b13108f80763a6ff164f0db6 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 23 Jul 2021 17:10:11 +0200 Subject: ASAP::O: Tidy up build process This removes a lot of random #ifdefs from the source code. --- src/im-asapo.c | 20 +++++++------------- src/im-asapo.h | 32 ++++++++++---------------------- src/im-sandbox.c | 20 ++------------------ 3 files changed, 19 insertions(+), 53 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 362a73b0..ea6505a4 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -60,16 +60,10 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } -AsapoStringHandle im_asapo_group_id_from_string(const char *str) -{ - return asapo_string_from_c_str(str); -} - - -AsapoStringHandle im_asapo_make_unique_group_id(const char *endpoint, - const char *token, - const char *beamtime, - const char *path) +char *im_asapo_make_unique_group_id(const char *endpoint, + const char *token, + const char *beamtime, + const char *path) { AsapoConsumerHandle consumer; AsapoSourceCredentialsHandle cred; @@ -94,7 +88,7 @@ AsapoStringHandle im_asapo_make_unique_group_id(const char *endpoint, return NULL; } - return group_id; + return strdup(asapo_string_c_str(group_id)); } @@ -102,7 +96,7 @@ struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *path, - AsapoStringHandle group_id) + const char *group_id) { struct im_asapo *a; AsapoSourceCredentialsHandle cred; @@ -122,7 +116,7 @@ struct im_asapo *im_asapo_connect(const char *endpoint, asapo_consumer_set_timeout(a->consumer, 1000); - a->group_id = group_id; + a->group_id = asapo_string_from_c_str(group_id); return a; } diff --git a/src/im-asapo.h b/src/im-asapo.h index 3ffc5808..b0c413f5 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -36,34 +36,28 @@ #if defined(HAVE_ASAPO) -#include - extern struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *path, - AsapoStringHandle group_id); + const char *group_id); extern void im_asapo_shutdown(struct im_asapo *a); extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size); -extern AsapoStringHandle im_asapo_group_id_from_string(const char *str); - -extern AsapoStringHandle im_asapo_make_unique_group_id(const char *endpoint, - const char *token, - const char *beamtime, - const char *path); +extern char *im_asapo_make_unique_group_id(const char *endpoint, + const char *token, + const char *beamtime, + const char *path); #else /* defined(HAVE_ASAPO) */ -typedef void* AsapoStringHandle; - static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *path, - AsapoStringHandle group_id) + const char *group_id) { return NULL; } @@ -78,16 +72,10 @@ static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) return NULL; } -static UNUSED AsapoStringHandle im_asapo_group_id_from_string(const char *str) -{ - return NULL; -} - -static UNUSED AsapoStringHandle im_asapo_make_unique_group_id(const char *endpoint, - const char *token, - const char *beamtime, - const char *path - AsapoStringHandle group_id) +static char *im_asapo_make_unique_group_id(const char *endpoint, + const char *token, + const char *beamtime, + const char *path) { return NULL; } diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 554024b2..759c5442 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -62,10 +62,7 @@ #include "process_image.h" #include "im-zmq.h" #include "profile.h" - -#ifdef HAVE_ASAPO #include "im-asapo.h" -#endif struct sandbox @@ -114,9 +111,7 @@ struct sandbox const char *asapo_token; const char *asapo_beamtime; const char *asapo_path; -#ifdef HAVE_ASAPO - AsapoStringHandle asapo_group_id; -#endif + const char *asapo_group_id; /* Final output */ Stream *stream; @@ -344,10 +339,7 @@ static int run_work(const struct index_args *iargs, Stream *st, { int allDone = 0; struct im_zmq *zmqstuff = NULL; - -#ifdef HAVE_ASAPO struct im_asapo *asapostuff = NULL; -#endif if ( sb->profile ) { profile_init(); @@ -365,7 +357,6 @@ static int run_work(const struct index_args *iargs, Stream *st, } } -#ifdef HAVE_ASAPO if ( sb->asapo ) { asapostuff = im_asapo_connect(sb->asapo_endpoint, sb->asapo_token, @@ -377,7 +368,6 @@ static int run_work(const struct index_args *iargs, Stream *st, return 1; } } -#endif while ( !allDone ) { @@ -482,12 +472,10 @@ static int run_work(const struct index_args *iargs, Stream *st, } else if ( sb->asapo ) { -#ifdef HAVE_ASAPO /* Temporary (?) abuse of "zmq_data", even though * data comes via ASAP::O */ pargs.zmq_data = im_asapo_fetch(asapostuff, &pargs.zmq_data_size); -#endif } else { pargs.zmq_data = NULL; @@ -511,9 +499,7 @@ static int run_work(const struct index_args *iargs, Stream *st, /* These are both no-ops if argument is NULL */ im_zmq_shutdown(zmqstuff); -#ifdef HAVE_ASAPO im_asapo_shutdown(asapostuff); -#endif cleanup_indexing(iargs->ipriv); cell_free(iargs->cell); @@ -1160,16 +1146,14 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return 0; } -#ifdef HAVE_ASAPO if ( asapo_group_id != NULL ) { - sb->asapo_group_id = im_asapo_group_id_from_string(asapo_group_id); + sb->asapo_group_id = strdup(asapo_group_id); } else { sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint, asapo_token, asapo_beamtime, asapo_path); } -#endif sb->fds = NULL; sb->fhs = NULL; -- cgit v1.2.3 From 817304e411d69426ffdd263ec6cc6fee35c48e22 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 27 Jul 2021 11:49:49 +0200 Subject: ASAP::O: Fix memory leak on error path --- src/im-sandbox.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 759c5442..b646441b 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -1143,6 +1143,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( sb->zmq && sb->asapo ) { ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n"); + free(sb); return 0; } -- cgit v1.2.3 From b4c09272d2754a2add12153685549260885f7ca7 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 11 Aug 2021 14:52:24 +0200 Subject: ASAP::O: Remove vestigial block --- src/im-sandbox.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index b646441b..801b8b88 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -455,9 +455,6 @@ static int run_work(const struct index_args *iargs, Stream *st, free(line); - if ( !sb->zmq ) { - - } if ( sb->zmq ) { do { -- cgit v1.2.3 From 6f11f88810d1d74fbee12fc410e4634cac588f6e Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 11 Aug 2021 14:52:54 +0200 Subject: indexamajig: Special cases for ASAP::O as for ZMQ --- src/im-sandbox.c | 3 +++ src/indexamajig.c | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 801b8b88..468afdc9 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -878,6 +878,9 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) * be generated by image_read_data_block(). */ filename = "ZMQdata"; evstr = strdup("//"); + } else if ( sb->asapo ) { + filename = "ASAPOdata"; + evstr = strdup("//"); } else { if ( !get_pattern(gpctx, &filename, &evstr) ) return 1; } diff --git a/src/indexamajig.c b/src/indexamajig.c index ee23fcc3..52e83c57 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -1086,7 +1086,9 @@ int main(int argc, char *argv[]) if ( argp_parse(&argp, argc, argv, 0, NULL, &args) ) return 1; /* Check for minimal information */ - if ( (args.filename == NULL) && (args.zmq_addr == NULL) ) { + if ( (args.filename == NULL) + && (args.zmq_addr == NULL) + && (args.asapo_endpoint == NULL) ) { ERROR("You need to provide the input filename (use -i)\n"); return 1; } -- cgit v1.2.3 From 39aa39d13cd8ea4bc13b9d1499ab903ab10ecb66 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 11 Aug 2021 16:02:38 +0200 Subject: ASAP::O: Use asapo_is_error() --- src/im-asapo.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index ea6505a4..5008a62b 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -73,7 +73,7 @@ char *im_asapo_make_unique_group_id(const char *endpoint, cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token); consumer = asapo_create_consumer(endpoint, path, 1, cred, &err); asapo_free_handle(&cred); - if ( err ) { + if ( asapo_is_error(err) ) { show_asapo_error("Cannot create temporary ASAP::O consumer", err); asapo_free_handle(&consumer); return NULL; @@ -83,7 +83,7 @@ char *im_asapo_make_unique_group_id(const char *endpoint, group_id = asapo_consumer_generate_new_group_id(consumer, &err); asapo_free_handle(&consumer); - if ( err ) { + if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O group ID", err); return NULL; } @@ -108,7 +108,7 @@ struct im_asapo *im_asapo_connect(const char *endpoint, cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token); a->consumer = asapo_create_consumer(endpoint, path, 1, cred, &err); asapo_free_handle(&cred); - if ( err ) { + if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); free(a); return NULL; @@ -131,7 +131,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, "default", &err); - if ( err ) { + if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get next ASAP::O record", err); return NULL; } -- cgit v1.2.3 From 2d09dfe63742975e39abee6f8c73bc5936d779c8 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 11 Aug 2021 16:02:52 +0200 Subject: indexamajig: Exit if unique ASAP::O group ID can't be created --- src/im-sandbox.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 468afdc9..a175b3ed 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -1155,6 +1155,10 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, asapo_beamtime, asapo_path); } + if ( sb->asapo_group_id == NULL ) { + ERROR("Failed to create ASAP::O group ID.\n"); + return 0; + } sb->fds = NULL; sb->fhs = NULL; -- cgit v1.2.3 From 5cde112fdab9f2913f9207e703322356553a05d1 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 13 Aug 2021 10:50:16 +0200 Subject: indexamajig: Don't do ASAP::O stuff unless asked for --- src/im-sandbox.c | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index a175b3ed..1cc78295 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -1147,17 +1147,19 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return 0; } - if ( asapo_group_id != NULL ) { - sb->asapo_group_id = strdup(asapo_group_id); - } else { - sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint, - asapo_token, - asapo_beamtime, - asapo_path); - } - if ( sb->asapo_group_id == NULL ) { - ERROR("Failed to create ASAP::O group ID.\n"); - return 0; + if ( sb->asapo ) { + if ( asapo_group_id != NULL ) { + sb->asapo_group_id = strdup(asapo_group_id); + } else { + sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint, + asapo_token, + asapo_beamtime, + asapo_path); + } + if ( sb->asapo_group_id == NULL ) { + ERROR("Failed to create ASAP::O group ID.\n"); + return 0; + } } sb->fds = NULL; -- cgit v1.2.3 From ff9e20b38b9033b1151722fa5c3423d3325d4ae2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 8 Sep 2021 15:43:58 +0200 Subject: ASAP::O: Copy the data block We may eventually want to avoid copying the entire data block, but it's an easy solution for now, and matches what we do for ZMQ. --- src/im-asapo.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 5008a62b..13611c2c 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -124,10 +124,11 @@ struct im_asapo *im_asapo_connect(const char *endpoint, void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) { - void *data_block; + void *data_copy; AsapoMessageMetaHandle meta = asapo_new_handle(); AsapoMessageDataHandle data = asapo_new_handle(); AsapoErrorHandle err = asapo_new_handle(); + uint64_t msg_size; asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, "default", &err); @@ -136,16 +137,22 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) return NULL; } + msg_size = asapo_message_meta_get_size(meta); + STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta)); STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta)); + STATUS("ASAP::O size: %lli\n", (long long int)msg_size); + - data_block = asapo_message_data_get_as_chars(data); + data_copy = malloc(msg_size); + if ( data_copy == NULL ) return NULL; + memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - return data_block; + return data_copy; } -- cgit v1.2.3 From 16aec3ce84f8cbd6ea36cccc5dfac6b270bbb653 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 8 Sep 2021 15:54:40 +0200 Subject: ASAP::O: Show the uniquely-generated group ID --- src/im-sandbox.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 1cc78295..3da205f1 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -1159,6 +1159,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( sb->asapo_group_id == NULL ) { ERROR("Failed to create ASAP::O group ID.\n"); return 0; + } else { + STATUS("The unique ID is %s\n", sb->asapo_group_id); } } -- cgit v1.2.3 From 91e1a5751b61efa63b2a425cfb501e5c534b6e66 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 9 Sep 2021 16:18:33 +0200 Subject: Formatting fussiness --- src/im-asapo.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 13611c2c..3920eadd 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -143,7 +143,6 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta)); STATUS("ASAP::O size: %lli\n", (long long int)msg_size); - data_copy = malloc(msg_size); if ( data_copy == NULL ) return NULL; memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); -- cgit v1.2.3 From 71e9837def30091db678cd3e93780b90924e2582 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 9 Sep 2021 16:20:21 +0200 Subject: ASAP::O: Don't send data to process_image if nothing came --- src/im-sandbox.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 3da205f1..512a12df 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -454,6 +454,7 @@ static int run_work(const struct index_args *iargs, Stream *st, pargs.event = safe_strdup(event_str); free(line); + ok = 0; if ( sb->zmq ) { @@ -461,6 +462,7 @@ static int run_work(const struct index_args *iargs, Stream *st, pargs.zmq_data = im_zmq_fetch(zmqstuff, &pargs.zmq_data_size); } while ( pargs.zmq_data_size < 15 ); + ok = 1; /* The filename/event, which will be 'fake' values in * this case, still came via the event queue. More @@ -473,17 +475,23 @@ static int run_work(const struct index_args *iargs, Stream *st, * data comes via ASAP::O */ pargs.zmq_data = im_asapo_fetch(asapostuff, &pargs.zmq_data_size); + if ( pargs.zmq_data != NULL ) { + ok = 1; + } } else { pargs.zmq_data = NULL; pargs.zmq_data_size = 0; + ok = 1; } - sb->shared->time_last_start[cookie] = get_monotonic_seconds(); - profile_start("process-image"); - process_image(iargs, &pargs, st, cookie, tmpdir, ser, - sb->shared, sb->shared->last_task[cookie]); - profile_end("process-image"); + if ( ok ) { + sb->shared->time_last_start[cookie] = get_monotonic_seconds(); + profile_start("process-image"); + process_image(iargs, &pargs, st, cookie, tmpdir, ser, + sb->shared, sb->shared->last_task[cookie]); + profile_end("process-image"); + } /* pargs.zmq_data will be copied into the image structure, so * that it can be queried for "header" values etc. It will -- cgit v1.2.3 From 0c566e3bcc918f8f047fd3b2aad866f0e81e582f Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 9 Sep 2021 16:25:57 +0200 Subject: ASAP::O: Simplify im_asapo_make_unique_group_id --- src/im-asapo.c | 10 +++------- src/im-asapo.h | 8 ++------ src/im-sandbox.c | 4 +--- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 3920eadd..876de222 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -61,17 +61,15 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) char *im_asapo_make_unique_group_id(const char *endpoint, - const char *token, - const char *beamtime, - const char *path) + const char *token) { AsapoConsumerHandle consumer; AsapoSourceCredentialsHandle cred; AsapoStringHandle group_id; AsapoErrorHandle err = asapo_new_handle(); - cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token); - consumer = asapo_create_consumer(endpoint, path, 1, cred, &err); + cred = asapo_create_source_credentials(kProcessed, "", "", "", token); + consumer = asapo_create_consumer(endpoint, "", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create temporary ASAP::O consumer", err); @@ -79,8 +77,6 @@ char *im_asapo_make_unique_group_id(const char *endpoint, return NULL; } - asapo_consumer_set_timeout(consumer, 1000); - group_id = asapo_consumer_generate_new_group_id(consumer, &err); asapo_free_handle(&consumer); if ( asapo_is_error(err) ) { diff --git a/src/im-asapo.h b/src/im-asapo.h index b0c413f5..4f5c13f7 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -47,9 +47,7 @@ extern void im_asapo_shutdown(struct im_asapo *a); extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size); extern char *im_asapo_make_unique_group_id(const char *endpoint, - const char *token, - const char *beamtime, - const char *path); + const char *token); #else /* defined(HAVE_ASAPO) */ @@ -73,9 +71,7 @@ static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) } static char *im_asapo_make_unique_group_id(const char *endpoint, - const char *token, - const char *beamtime, - const char *path) + const char *token) { return NULL; } diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 512a12df..fed3873e 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -1160,9 +1160,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->asapo_group_id = strdup(asapo_group_id); } else { sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint, - asapo_token, - asapo_beamtime, - asapo_path); + asapo_token); } if ( sb->asapo_group_id == NULL ) { ERROR("Failed to create ASAP::O group ID.\n"); -- cgit v1.2.3 From 151924b7fccde4cb6bdb73128ba27c091037eb4b Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 9 Sep 2021 16:30:14 +0200 Subject: ASAP::O: Expose data source --- src/im-asapo.c | 16 +++++++++++++--- src/im-asapo.h | 6 ++++-- src/im-sandbox.c | 7 +++++-- src/im-sandbox.h | 2 +- src/indexamajig.c | 9 ++++++++- 5 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 876de222..9d8fb171 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -92,17 +92,27 @@ struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *path, - const char *group_id) + const char *group_id, + const char *data_source) { struct im_asapo *a; + int has_filesystem; AsapoSourceCredentialsHandle cred; AsapoErrorHandle err = asapo_new_handle(); a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; - cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token); - a->consumer = asapo_create_consumer(endpoint, path, 1, cred, &err); + cred = asapo_create_source_credentials(kProcessed, beamtime, "", + data_source, token); + if ( path == NULL ) { + path = ""; + has_filesystem = 0; + } else { + has_filesystem = 1; + } + a->consumer = asapo_create_consumer(endpoint, path, has_filesystem, + cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); diff --git a/src/im-asapo.h b/src/im-asapo.h index 4f5c13f7..3160c69d 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -40,7 +40,8 @@ extern struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *path, - const char *group_id); + const char *group_id, + const char *data_source); extern void im_asapo_shutdown(struct im_asapo *a); @@ -55,7 +56,8 @@ static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *path, - const char *group_id) + const char *group_id, + const char *data_source) { return NULL; } diff --git a/src/im-sandbox.c b/src/im-sandbox.c index fed3873e..f1c0474f 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -112,6 +112,7 @@ struct sandbox const char *asapo_beamtime; const char *asapo_path; const char *asapo_group_id; + const char *asapo_source; /* Final output */ Stream *stream; @@ -362,7 +363,8 @@ static int run_work(const struct index_args *iargs, Stream *st, sb->asapo_token, sb->asapo_beamtime, sb->asapo_path, - sb->asapo_group_id); + sb->asapo_group_id, + sb->asapo_source); if ( asapostuff == NULL ) { ERROR("ASAP::O setup failed.\n"); return 1; @@ -1098,7 +1100,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, const char *asapo_beamtime, const char *asapo_path, - const char *asapo_group_id, + const char *asapo_group_id, const char *asapo_source, int timeout, int profile) { int i; @@ -1145,6 +1147,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->asapo_token = asapo_token; sb->asapo_beamtime = asapo_beamtime; sb->asapo_path = asapo_path; + sb->asapo_source = asapo_source; } else { sb->asapo = 0; } diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 0415cb80..8d76b376 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -89,7 +89,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, const char *asapo_beamtime, const char *asapo_path, - const char *asapo_group_id, + const char *asapo_group_id, const char *asapo_source, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index 52e83c57..7fbb3f8f 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -89,6 +89,7 @@ struct indexamajig_arguments char *asapo_beamtime; char *asapo_path; char *asapo_group_id; + char *asapo_source; int serial_start; char *temp_location; int if_refine; @@ -427,6 +428,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->asapo_group_id = strdup(arg); break; + case 218 : + args->asapo_source = strdup(arg); + break; + case 219 : args->iargs.data_format = parse_data_format(arg); if ( args->iargs.data_format == DATA_SOURCE_TYPE_UNKNOWN ) { @@ -856,6 +861,7 @@ int main(int argc, char *argv[]) args.asapo_beamtime = NULL; args.asapo_path = NULL; args.asapo_group_id = NULL; + args.asapo_source = NULL; args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; @@ -969,6 +975,7 @@ int main(int argc, char *argv[]) {"asapo-beamtime", 215, "str", OPTION_NO_USAGE, "ASAP::O beamtime ID"}, {"asapo-path", 216, "str", OPTION_NO_USAGE, "ASAP::O path to files"}, {"asapo-group", 217, "str", OPTION_NO_USAGE, "ASAP::O group ID"}, + {"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, @@ -1332,7 +1339,7 @@ int main(int argc, char *argv[]) args.n_zmq_subscriptions, args.zmq_request, args.asapo_endpoint, args.asapo_token, args.asapo_beamtime, args.asapo_path, - args.asapo_group_id, + args.asapo_group_id, args.asapo_source, timeout, args.profile); cell_free(args.iargs.cell); -- cgit v1.2.3 From 24d180fc0173c8f577ffbb3394b612c19fa161ff Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 9 Sep 2021 16:33:50 +0200 Subject: ASAP::O: Remove filesystem path This is only really needed for offline processing. It's an added complication for now. Maybe we'll revisit it later and re-expose this, though. --- src/im-asapo.c | 11 +---------- src/im-asapo.h | 2 -- src/im-sandbox.c | 5 +---- src/im-sandbox.h | 2 +- src/indexamajig.c | 9 +-------- 5 files changed, 4 insertions(+), 25 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 9d8fb171..c44f05a7 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -91,12 +91,10 @@ char *im_asapo_make_unique_group_id(const char *endpoint, struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, - const char *path, const char *group_id, const char *data_source) { struct im_asapo *a; - int has_filesystem; AsapoSourceCredentialsHandle cred; AsapoErrorHandle err = asapo_new_handle(); @@ -105,14 +103,7 @@ struct im_asapo *im_asapo_connect(const char *endpoint, cred = asapo_create_source_credentials(kProcessed, beamtime, "", data_source, token); - if ( path == NULL ) { - path = ""; - has_filesystem = 0; - } else { - has_filesystem = 1; - } - a->consumer = asapo_create_consumer(endpoint, path, has_filesystem, - cred, &err); + a->consumer = asapo_create_consumer(endpoint, "", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); diff --git a/src/im-asapo.h b/src/im-asapo.h index 3160c69d..04856511 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -39,7 +39,6 @@ extern struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, - const char *path, const char *group_id, const char *data_source); @@ -55,7 +54,6 @@ extern char *im_asapo_make_unique_group_id(const char *endpoint, static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, - const char *path, const char *group_id, const char *data_source) { diff --git a/src/im-sandbox.c b/src/im-sandbox.c index f1c0474f..0933ab31 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -110,7 +110,6 @@ struct sandbox const char *asapo_endpoint; const char *asapo_token; const char *asapo_beamtime; - const char *asapo_path; const char *asapo_group_id; const char *asapo_source; @@ -362,7 +361,6 @@ static int run_work(const struct index_args *iargs, Stream *st, asapostuff = im_asapo_connect(sb->asapo_endpoint, sb->asapo_token, sb->asapo_beamtime, - sb->asapo_path, sb->asapo_group_id, sb->asapo_source); if ( asapostuff == NULL ) { @@ -1099,7 +1097,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, const char *asapo_path, + const char *asapo_beamtime, const char *asapo_group_id, const char *asapo_source, int timeout, int profile) { @@ -1146,7 +1144,6 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->asapo_endpoint = asapo_endpoint; sb->asapo_token = asapo_token; sb->asapo_beamtime = asapo_beamtime; - sb->asapo_path = asapo_path; sb->asapo_source = asapo_source; } else { sb->asapo = 0; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 8d76b376..a1b9ed36 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -88,7 +88,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, const char *asapo_path, + const char *asapo_beamtime, const char *asapo_group_id, const char *asapo_source, int timeout, int profile); diff --git a/src/indexamajig.c b/src/indexamajig.c index 7fbb3f8f..8d383d8e 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -87,7 +87,6 @@ struct indexamajig_arguments char *asapo_endpoint; char *asapo_token; char *asapo_beamtime; - char *asapo_path; char *asapo_group_id; char *asapo_source; int serial_start; @@ -420,10 +419,6 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->asapo_beamtime = strdup(arg); break; - case 216 : - args->asapo_path = strdup(arg); - break; - case 217 : args->asapo_group_id = strdup(arg); break; @@ -859,7 +854,6 @@ int main(int argc, char *argv[]) args.asapo_endpoint = NULL; args.asapo_token = NULL; args.asapo_beamtime = NULL; - args.asapo_path = NULL; args.asapo_group_id = NULL; args.asapo_source = NULL; args.n_zmq_subscriptions = 0; @@ -973,7 +967,6 @@ int main(int argc, char *argv[]) {"asapo-endpoint", 213, "str", OPTION_NO_USAGE, "ASAP::O endpoint"}, {"asapo-token", 214, "str", OPTION_NO_USAGE, "ASAP::O token"}, {"asapo-beamtime", 215, "str", OPTION_NO_USAGE, "ASAP::O beamtime ID"}, - {"asapo-path", 216, "str", OPTION_NO_USAGE, "ASAP::O path to files"}, {"asapo-group", 217, "str", OPTION_NO_USAGE, "ASAP::O group ID"}, {"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, @@ -1338,7 +1331,7 @@ int main(int argc, char *argv[]) args.zmq_addr, args.zmq_subscriptions, args.n_zmq_subscriptions, args.zmq_request, args.asapo_endpoint, args.asapo_token, - args.asapo_beamtime, args.asapo_path, + args.asapo_beamtime, args.asapo_group_id, args.asapo_source, timeout, args.profile); -- cgit v1.2.3 From 65b0ee1d76dc47a9d890f94a309b1947260f74ef Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 10 Sep 2021 17:18:10 +0200 Subject: ASAP::O: Pass through data block size --- src/im-asapo.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/im-asapo.c b/src/im-asapo.c index c44f05a7..5f4f2b65 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -148,6 +148,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) asapo_free_handle(&meta); asapo_free_handle(&data); + *pdata_size = msg_size; return data_copy; } -- cgit v1.2.3 From 36ac02772c42c41b454cc0da051cebcddcd433e2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 15 Sep 2021 17:15:52 +0200 Subject: ASAP::O: Stream switching --- src/im-asapo.c | 122 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 118 insertions(+), 4 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 5f4f2b65..0fc0a2b2 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -47,6 +47,7 @@ struct im_asapo { + char *stream; AsapoConsumerHandle consumer; AsapoStringHandle group_id; }; @@ -114,23 +115,130 @@ struct im_asapo *im_asapo_connect(const char *endpoint, asapo_consumer_set_timeout(a->consumer, 1000); a->group_id = asapo_string_from_c_str(group_id); + a->stream = NULL; return a; } +static int select_last_stream(struct im_asapo *a) +{ + AsapoStreamInfosHandle si; + size_t len; + int i; + AsapoStreamInfoHandle st; + AsapoErrorHandle err = asapo_new_handle(); + + si = asapo_consumer_get_stream_list(a->consumer, NULL, + kAllStreams, &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get ASAP::O stream list", err); + asapo_free_handle(err); + return 1; + } + + STATUS("for info: stream list:\n"); + n = asapo_stream_infos_get_size(si); + for ( i=0; istream = strdup(asapo_stream_info_get_name(st)); + asapo_free_handle(st); + + asapo_free_handle(si); + asapo_free_handle(err); + return 0; +} + + +static int select_next_stream(struct im_asapo *a) +{ + AsapoStreamInfosHandle si; + size_t len; + int i; + AsapoErrorHandle err = asapo_new_handle(); + + si = asapo_consumer_get_stream_list(a->consumer, NULL, + kAllStreams, &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get ASAP::O stream list", err); + asapo_free_handle(err); + return 1; + } + + n = asapo_stream_infos_get_size(si); + for ( i=n-1; i>0; i-- ) { + + AsapoStreamInfoHandle st; + const char *name; + + st = asapo_stream_infos_get_item(si, i); + name = asapo_stream_info_get_name(st); + + if ( strcmp(name, a->stream) == 0 ) { + free(a->stream); + a->stream = strdup(asapo_stream_info_get_next_stream(st)); + asapo_free_handle(st); + break; + } + + asapo_free_handle(st); + } + asapo_free_handle(si); + asapo_free_handle(err); + + return 0; +} + + +static void skip_to_stream_end(struct im_asapo *a) +{ + /* FIXME: Implementation */ +} + + void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) { void *data_copy; - AsapoMessageMetaHandle meta = asapo_new_handle(); - AsapoMessageDataHandle data = asapo_new_handle(); + AsapoMessageMetaHandle meta; + AsapoMessageDataHandle data; AsapoErrorHandle err = asapo_new_handle(); uint64_t msg_size; + if ( a->stream == NULL ) { + if ( select_last_stream(a) ) { + asapo_free_handle(&err); + return NULL; + } + skip_to_stream_end(a); + } + + meta = asapo_new_handle(); + data = asapo_new_handle(); + asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, - "default", &err); + a->stream, &err); + if ( asapo_error_get_type(err) == kEndOfStream ) { + select_next_stream(a); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + return NULL; /* Please call back later! */ + } + if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get next ASAP::O record", err); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); return NULL; } @@ -141,7 +249,13 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) STATUS("ASAP::O size: %lli\n", (long long int)msg_size); data_copy = malloc(msg_size); - if ( data_copy == NULL ) return NULL; + if ( data_copy == NULL ) { + ERROR("Failed to copy data block.\n"); + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + return NULL; + } memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); asapo_free_handle(&err); -- cgit v1.2.3 From 77612a5e8a39ffc96c89e4b07fc14fc50d57382f Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 16 Sep 2021 16:20:51 +0200 Subject: ASAP::O: Fixes after testing for stream switching --- src/im-asapo.c | 62 ++++++++++++++++++++++++++-------------------------------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 0fc0a2b2..7b3c8b8b 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -124,36 +124,34 @@ struct im_asapo *im_asapo_connect(const char *endpoint, static int select_last_stream(struct im_asapo *a) { AsapoStreamInfosHandle si; - size_t len; + size_t n; int i; AsapoStreamInfoHandle st; AsapoErrorHandle err = asapo_new_handle(); - si = asapo_consumer_get_stream_list(a->consumer, NULL, + si = asapo_consumer_get_stream_list(a->consumer, "", kAllStreams, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get ASAP::O stream list", err); - asapo_free_handle(err); + asapo_free_handle(&err); return 1; } - STATUS("for info: stream list:\n"); + STATUS("Streams available at start:\n"); n = asapo_stream_infos_get_size(si); for ( i=0; istream = strdup(asapo_stream_info_get_name(st)); - asapo_free_handle(st); + STATUS("Starting with the last stream: %s\n", a->stream); - asapo_free_handle(si); - asapo_free_handle(err); + asapo_free_handle(&err); return 0; } @@ -161,39 +159,30 @@ static int select_last_stream(struct im_asapo *a) static int select_next_stream(struct im_asapo *a) { AsapoStreamInfosHandle si; - size_t len; - int i; + AsapoStreamInfoHandle st; AsapoErrorHandle err = asapo_new_handle(); + const char *next_stream; - si = asapo_consumer_get_stream_list(a->consumer, NULL, + si = asapo_consumer_get_stream_list(a->consumer, a->stream, kAllStreams, &err); if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get ASAP::O stream list", err); - asapo_free_handle(err); + asapo_free_handle(&err); return 1; } - n = asapo_stream_infos_get_size(si); - for ( i=n-1; i>0; i-- ) { - - AsapoStreamInfoHandle st; - const char *name; - - st = asapo_stream_infos_get_item(si, i); - name = asapo_stream_info_get_name(st); - - if ( strcmp(name, a->stream) == 0 ) { - free(a->stream); - a->stream = strdup(asapo_stream_info_get_next_stream(st)); - asapo_free_handle(st); - break; - } + asapo_free_handle(&err); - asapo_free_handle(st); + st = asapo_stream_infos_get_item(si, 0); + next_stream = asapo_stream_info_get_name(st); + if ( strcmp(next_stream, a->stream) == 0 ) { + STATUS("Waiting for new data...\n"); + } else { + free(a->stream); + a->stream = strdup(next_stream); + STATUS("Selecting next stream: %s\n", a->stream); } - asapo_free_handle(si); - asapo_free_handle(err); return 0; } @@ -201,7 +190,13 @@ static int select_next_stream(struct im_asapo *a) static void skip_to_stream_end(struct im_asapo *a) { - /* FIXME: Implementation */ + int64_t size; + AsapoErrorHandle err = asapo_new_handle(); + + size = asapo_consumer_get_current_size(a->consumer, a->stream, &err); + asapo_consumer_set_last_read_marker(a->consumer, a->group_id, size, + a->stream, &err); + STATUS("Skipping to end of stream (%lli)\n", size); } @@ -231,7 +226,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - return NULL; /* Please call back later! */ + return NULL; /* Sandbox will call try again very soon */ } if ( asapo_is_error(err) ) { @@ -246,7 +241,6 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta)); STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta)); - STATUS("ASAP::O size: %lli\n", (long long int)msg_size); data_copy = malloc(msg_size); if ( data_copy == NULL ) { -- cgit v1.2.3 From 1638ec5dc997f0c0fa35ed4fb4f23ed117aafae4 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 16 Sep 2021 16:56:06 +0200 Subject: ASAP::O: Fix memory leaks --- src/im-asapo.c | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 7b3c8b8b..7d4cbf3f 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -141,16 +141,18 @@ static int select_last_stream(struct im_asapo *a) STATUS("Streams available at start:\n"); n = asapo_stream_infos_get_size(si); for ( i=0; istream = strdup(asapo_stream_info_get_name(st)); + asapo_free_handle(&st); STATUS("Starting with the last stream: %s\n", a->stream); + asapo_free_handle(&si); asapo_free_handle(&err); return 0; } @@ -168,6 +170,7 @@ static int select_next_stream(struct im_asapo *a) if ( asapo_is_error(err) ) { show_asapo_error("Couldn't get ASAP::O stream list", err); + asapo_free_handle(&si); asapo_free_handle(&err); return 1; } @@ -176,6 +179,7 @@ static int select_next_stream(struct im_asapo *a) st = asapo_stream_infos_get_item(si, 0); next_stream = asapo_stream_info_get_name(st); + asapo_free_handle(&st); if ( strcmp(next_stream, a->stream) == 0 ) { STATUS("Waiting for new data...\n"); } else { @@ -184,6 +188,8 @@ static int select_next_stream(struct im_asapo *a) STATUS("Selecting next stream: %s\n", a->stream); } + asapo_free_handle(&si); + return 0; } @@ -194,9 +200,25 @@ static void skip_to_stream_end(struct im_asapo *a) AsapoErrorHandle err = asapo_new_handle(); size = asapo_consumer_get_current_size(a->consumer, a->stream, &err); - asapo_consumer_set_last_read_marker(a->consumer, a->group_id, size, - a->stream, &err); - STATUS("Skipping to end of stream (%lli)\n", size); + if ( asapo_is_error(err) ) { + show_asapo_error("Failed to get length of stream", err); + } else { + + AsapoErrorHandle err = asapo_new_handle(); + + asapo_consumer_set_last_read_marker(a->consumer, + a->group_id, size, + a->stream, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Failed to skip to end of stream", err); + } else { + STATUS("Skipped to end of stream (%lli)\n", size); + } + + asapo_free_handle(&err); + } + + asapo_free_handle(&err); } @@ -205,17 +227,17 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) void *data_copy; AsapoMessageMetaHandle meta; AsapoMessageDataHandle data; - AsapoErrorHandle err = asapo_new_handle(); + AsapoErrorHandle err; uint64_t msg_size; if ( a->stream == NULL ) { if ( select_last_stream(a) ) { - asapo_free_handle(&err); return NULL; } skip_to_stream_end(a); } + err = asapo_new_handle(); meta = asapo_new_handle(); data = asapo_new_handle(); -- cgit v1.2.3 From 6db9619ba5172b9e416a5939d375f5119aa652ff Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 16 Sep 2021 17:21:20 +0200 Subject: Refuse to overwrite an existing stream With online processing, we might end up with a very long stream. It should not be so easily deleted if indexamajig is restarted! --- libcrystfel/src/stream.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libcrystfel/src/stream.c b/libcrystfel/src/stream.c index 093e34e4..67297e49 100644 --- a/libcrystfel/src/stream.c +++ b/libcrystfel/src/stream.c @@ -1167,6 +1167,11 @@ Stream *stream_open_for_write(const char *filename, st->dtempl_write = dtempl; st->dtempl_read = NULL; + if ( file_exists(filename) ) { + ERROR("Refusing to overwrite stream '%s'!\n", filename); + return NULL; + } + st->fh = fopen(filename, "w"); if ( st->fh == NULL ) { ERROR("Failed to open stream.\n"); -- cgit v1.2.3 From 9b22f7216f0f6244a97dc3b58cacec1b99c3bd01 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 17 Sep 2021 08:54:57 +0200 Subject: ASAP::O: Fix stream switching --- src/im-asapo.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 7d4cbf3f..85cbb159 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -177,17 +177,21 @@ static int select_next_stream(struct im_asapo *a) asapo_free_handle(&err); - st = asapo_stream_infos_get_item(si, 0); - next_stream = asapo_stream_info_get_name(st); - asapo_free_handle(&st); - if ( strcmp(next_stream, a->stream) == 0 ) { - STATUS("Waiting for new data...\n"); - } else { - free(a->stream); - a->stream = strdup(next_stream); - STATUS("Selecting next stream: %s\n", a->stream); + /* Stream list includes the current stream, so we need at least + * two entries */ + if ( asapo_stream_infos_get_size(si) < 2 ) { + STATUS("No newer stream. Waiting for new data...\n"); + asapo_free_handle(&si); + return 0; } + /* Stream list includes the current stream, so look at the second one */ + st = asapo_stream_infos_get_item(si, 1); + next_stream = asapo_stream_info_get_name(st); + free(a->stream); + a->stream = strdup(next_stream); + STATUS("Selecting next stream: %s\n", a->stream); + asapo_free_handle(&st); asapo_free_handle(&si); return 0; -- cgit v1.2.3 From eb4bb2d227faf1cff7c78ca10d37e74d72bd730e Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 17 Sep 2021 11:31:07 +0200 Subject: indexamajig(1): Mention ASAP::O --- doc/man/indexamajig.1 | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1 index a9d80175..0660167e 100644 --- a/doc/man/indexamajig.1 +++ b/doc/man/indexamajig.1 @@ -185,10 +185,23 @@ Subscribe to ZeroMQ message type \fItag\fR. You can use this option multiple ti .PD Request new data over ZeroMQ by sending string \fImsg\fR. This will cause indexamajig's ZeroMQ socket to use REQ mode instead of SUB. This option and \fB--zmq-subscribe\fR are mutually exclusive. +.PD 0 +.IP \fB--asapo-endpoint=\fIendpoint\fR +.PD +Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input\fR are mutually exclusive. + +.PD 0 +.IP \fB--asapo-token=\fItoken\fR +.IP \fB--asapo-beamtime=\fIbeamtime\fR +.IP \fB--asapo-source=\fIsource\fR +.IP \fB--asapo-group=\fIgroup\fR +.PD +Authentication token, beamtime, data source and consumer group, respectively, for ASAP::O data. + .PD 0 .IP \fB--data-format=\fIformat\fR .PD -Specify the data format for data received over ZeroMQ. Possible values in this version are \fBmsgpack\fR and \fBhdf5\fR. +Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR and \fBhdf5\fR. .PD 0 .IP \fB--basename\fR -- cgit v1.2.3 From 6aff9f9d2fd6cd4e028871e8162536f9e7fa1683 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 17 Sep 2021 11:32:24 +0200 Subject: ASAP::O: Fix comments --- src/im-asapo.c | 2 +- src/im-asapo.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 85cbb159..eac85375 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -1,5 +1,5 @@ /* - * asapo.c + * im-asapo.c * * ASAP::O data interface * diff --git a/src/im-asapo.h b/src/im-asapo.h index 04856511..61d237db 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -1,5 +1,5 @@ /* - * asapo.h + * im-asapo.h * * ASAP::O data interface * -- cgit v1.2.3 From af270239fa00eb4ca295fb4d866064bfd8a767b7 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 1 Dec 2021 16:55:24 +0100 Subject: ASAP::O: Pass filename through --- src/im-asapo.c | 9 +++++---- src/im-asapo.h | 8 ++++++-- src/im-sandbox.c | 11 ++++++++++- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index eac85375..ff5c010a 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -226,7 +226,8 @@ static void skip_to_stream_end(struct im_asapo *a) } -void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) +void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, + char **pfilename, char **pevent) { void *data_copy; AsapoMessageMetaHandle meta; @@ -265,9 +266,6 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) msg_size = asapo_message_meta_get_size(meta); - STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta)); - STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta)); - data_copy = malloc(msg_size); if ( data_copy == NULL ) { ERROR("Failed to copy data block.\n"); @@ -278,6 +276,9 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) } memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); + *pfilename = strdup(asapo_message_meta_get_id(meta)); + *pevent = strdup("//"); + asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); diff --git a/src/im-asapo.h b/src/im-asapo.h index 61d237db..26546a88 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -44,7 +44,8 @@ extern struct im_asapo *im_asapo_connect(const char *endpoint, extern void im_asapo_shutdown(struct im_asapo *a); -extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size); +extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, + char **pfilename, char **pevent); extern char *im_asapo_make_unique_group_id(const char *endpoint, const char *token); @@ -64,9 +65,12 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a) { } -static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) +static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, + char **pfilename, char **pevent) { *psize = 0; + *pfilename = NULL; + *pevent = NULL; return NULL; } diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 0933ab31..059f4a74 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -471,12 +471,21 @@ static int run_work(const struct index_args *iargs, Stream *st, } else if ( sb->asapo ) { + char *filename; + char *event; + /* Temporary (?) abuse of "zmq_data", even though * data comes via ASAP::O */ pargs.zmq_data = im_asapo_fetch(asapostuff, - &pargs.zmq_data_size); + &pargs.zmq_data_size, + &filename, + &event); if ( pargs.zmq_data != NULL ) { ok = 1; + free(pargs.filename); + free(pargs.event); + pargs.filename = filename; + pargs.event = event; } } else { -- cgit v1.2.3 From ac174a2fda0fa6f09093cb18750141785077a187 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 13 Dec 2021 16:39:42 +0100 Subject: ASAP::O: Fix call to get filename --- src/im-asapo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index ff5c010a..63310dfd 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -276,7 +276,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); - *pfilename = strdup(asapo_message_meta_get_id(meta)); + *pfilename = strdup(asapo_message_meta_get_name(meta)); *pevent = strdup("//"); asapo_free_handle(&err); -- cgit v1.2.3 From eaebf189dff79fa40de8dd934c738ceafbb2c678 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 14 Dec 2021 22:30:49 +0100 Subject: ASAP::O: Comment out repetitive status message --- src/im-asapo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 63310dfd..c811c213 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -180,7 +180,7 @@ static int select_next_stream(struct im_asapo *a) /* Stream list includes the current stream, so we need at least * two entries */ if ( asapo_stream_infos_get_size(si) < 2 ) { - STATUS("No newer stream. Waiting for new data...\n"); + //STATUS("No newer stream. Waiting for new data...\n"); asapo_free_handle(&si); return 0; } -- cgit v1.2.3 From 78688611d648e3cff2dfeee72880952043a8b3c8 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 17 Dec 2021 16:15:56 +0100 Subject: ASAP::O: Set path to "auto" --- src/im-asapo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index c811c213..bd98a11f 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -104,7 +104,7 @@ struct im_asapo *im_asapo_connect(const char *endpoint, cred = asapo_create_source_credentials(kProcessed, beamtime, "", data_source, token); - a->consumer = asapo_create_consumer(endpoint, "", 0, cred, &err); + a->consumer = asapo_create_consumer(endpoint, "auto", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); -- cgit v1.2.3 From 79f8df1c73f2cb2baab96750263bf99d89971691 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 17 Dec 2021 16:16:08 +0100 Subject: ASAP::O: Increase timeout to 3 seconds --- src/im-asapo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index bd98a11f..388fe8da 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -112,7 +112,7 @@ struct im_asapo *im_asapo_connect(const char *endpoint, return NULL; } - asapo_consumer_set_timeout(a->consumer, 1000); + asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(group_id); a->stream = NULL; -- cgit v1.2.3 From be3595c63e5d310624f03be05cb2d2006c4963d2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 10 Jan 2022 12:24:53 +0100 Subject: Generate placeholder filename/event earlier Previously, a placeholder was put in the queue ("ZMQdata //" or "ASAPOdata //"), and replaced by image_read_data_block. Instead, the "final" placeholder can be put in the queue already. Note that ASAP::O, at least, will replace this placeholder with a filename delivered by the data transport. --- libcrystfel/src/image.c | 6 ++---- src/im-sandbox.c | 19 +++++++++++++------ src/process_image.c | 8 ++++++++ 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c index 2209ffee..1bad7d00 100644 --- a/libcrystfel/src/image.c +++ b/libcrystfel/src/image.c @@ -1391,7 +1391,6 @@ struct image *image_read_data_block(const DataTemplate *dtempl, int no_mask_data) { struct image *image; - char tmp[64]; if ( dtempl == NULL ) { ERROR("NULL data template!\n"); @@ -1404,9 +1403,8 @@ struct image *image_read_data_block(const DataTemplate *dtempl, return NULL; } - snprintf(tmp, 63, "datablock-%i", serial); - image->filename = strdup(tmp); - image->ev = strdup("//"); + image->filename = NULL; + image->ev = NULL; image->data_block = data_block; image->data_block_size = data_block_size; diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 059f4a74..158da59b 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -482,6 +482,9 @@ static int run_work(const struct index_args *iargs, Stream *st, &event); if ( pargs.zmq_data != NULL ) { ok = 1; + + /* ASAP::O provides a meaningful filename, which + * replaces the placeholder. */ free(pargs.filename); free(pargs.event); pargs.filename = filename; @@ -889,15 +892,19 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) char *evstr; if ( sb->zmq ) { - /* These values will be passed down to process_image, - * but ignored. The 'real' filename, which is still a - * 'fake' filename - only for accounting purposes - will - * be generated by image_read_data_block(). */ + /* These are just semi-meaningful placeholder values to + * be put into the queue, instead of "(null)". + * A unique filename is needed so that the GUI can + * tell the frames apart from one another. + * ASAP::O, for one, will replace this with a filename + * that corresponds to something real. */ filename = "ZMQdata"; - evstr = strdup("//"); + evstr = malloc(64); + snprintf(evstr, 64, "//%i", sb->serial); } else if ( sb->asapo ) { filename = "ASAPOdata"; - evstr = strdup("//"); + evstr = malloc(64); + snprintf(evstr, 64, "//%i", sb->serial); } else { if ( !get_pattern(gpctx, &filename, &evstr) ) return 1; } diff --git a/src/process_image.c b/src/process_image.c index 9151528d..71be87cc 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -199,6 +199,14 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, iargs->no_mask_data); profile_end("read-data-block"); if ( image == NULL ) return; + + /* image_read_data_block() will leave the filename/event as + * NULL, because there's no file (duh). Fill them in now with + * the values passed down to us. These values might be + * meaningful (e.g. ASAP::O), or just placeholders. */ + image->filename = strdup(pargs->filename); + image->ev = strdup(pargs->event); + } else { profile_start("file-wait-open-read"); image = file_wait_open_read(pargs->filename, pargs->event, -- cgit v1.2.3 From 2bf23b74fc94e95b15d93aff65505dcc8bf10176 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 28 Mar 2022 14:57:05 +0200 Subject: Add instance ID and pipeline step --- src/im-asapo.c | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 388fe8da..68635946 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -69,7 +69,13 @@ char *im_asapo_make_unique_group_id(const char *endpoint, AsapoStringHandle group_id; AsapoErrorHandle err = asapo_new_handle(); - cred = asapo_create_source_credentials(kProcessed, "", "", "", token); + cred = asapo_create_source_credentials(kProcessed, + "", /* instance ID */ + "", /* pipeline step */ + "", /* beamtime */ + "", /* beamline */ + "", /* data source */ + token); consumer = asapo_create_consumer(endpoint, "", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { @@ -102,8 +108,13 @@ struct im_asapo *im_asapo_connect(const char *endpoint, a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; - cred = asapo_create_source_credentials(kProcessed, beamtime, "", - data_source, token); + cred = asapo_create_source_credentials(kProcessed, + "auto", /* instance ID */ + "indexamajig", /* pipeline step */ + beamtime, + "", /* beamline */ + data_source, + token); a->consumer = asapo_create_consumer(endpoint, "auto", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { -- cgit v1.2.3 From cad70f87ea137f7e60a36d9e8aa45f3f03c11a4a Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 20 Apr 2022 16:03:35 +0200 Subject: ASAP::O: Give up if there are no streams at all --- src/im-asapo.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 68635946..f336a735 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -149,8 +149,13 @@ static int select_last_stream(struct im_asapo *a) return 1; } - STATUS("Streams available at start:\n"); n = asapo_stream_infos_get_size(si); + if ( n == 0 ) { + STATUS("No streams.\n"); + return 1; + } + + STATUS("Streams available at start:\n"); for ( i=0; i Date: Tue, 3 May 2022 13:48:05 +0200 Subject: ASAP::O: Add profiling bits --- src/im-asapo.c | 3 +++ src/im-sandbox.c | 2 ++ 2 files changed, 5 insertions(+) diff --git a/src/im-asapo.c b/src/im-asapo.c index f336a735..48a32ee4 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -39,6 +39,7 @@ #include #include +#include #include "im-asapo.h" @@ -262,8 +263,10 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, meta = asapo_new_handle(); data = asapo_new_handle(); + profile_start("asapo-get-next"); asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, a->stream, &err); + profile_end("asapo-get-next"); if ( asapo_error_get_type(err) == kEndOfStream ) { select_next_stream(a); asapo_free_handle(&err); diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 158da59b..6e4b7fcd 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -476,10 +476,12 @@ static int run_work(const struct index_args *iargs, Stream *st, /* Temporary (?) abuse of "zmq_data", even though * data comes via ASAP::O */ + profile_start("asapo-fetch"); pargs.zmq_data = im_asapo_fetch(asapostuff, &pargs.zmq_data_size, &filename, &event); + profile_end("asapo-fetch"); if ( pargs.zmq_data != NULL ) { ok = 1; -- cgit v1.2.3 From 992e25b007f8a418fd7860cc34a441932cdf023c Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 3 May 2022 15:28:46 +0200 Subject: indexamajig: Track metadata (e.g. for ASAP::O) --- libcrystfel/src/image.c | 4 ++++ libcrystfel/src/image.h | 2 ++ src/im-asapo.c | 3 ++- src/im-asapo.h | 5 +++-- src/im-sandbox.c | 29 +++++++++++++++++------------ src/process_image.c | 33 ++++++++++++++++++++++++++++----- src/process_image.h | 5 +++++ 7 files changed, 61 insertions(+), 20 deletions(-) diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c index 1bad7d00..10c05525 100644 --- a/libcrystfel/src/image.c +++ b/libcrystfel/src/image.c @@ -1385,6 +1385,7 @@ struct image *image_read(const DataTemplate *dtempl, struct image *image_read_data_block(const DataTemplate *dtempl, void *data_block, size_t data_block_size, + char *meta_data, DataSourceType type, int serial, int no_image_data, @@ -1407,6 +1408,7 @@ struct image *image_read_data_block(const DataTemplate *dtempl, image->ev = NULL; image->data_block = data_block; image->data_block_size = data_block_size; + image->meta_data = meta_data; image->data_source_type = type; @@ -1431,6 +1433,7 @@ void image_free(struct image *image) free(image->filename); free(image->ev); free(image->data_block); + free(image->meta_data); if ( image->detgeom != NULL ) { np = image->detgeom->n_panels; @@ -1476,6 +1479,7 @@ struct image *image_new() image->ev = NULL; image->data_block = NULL; image->data_block_size = 0; + image->meta_data = NULL; image->data_source_type = DATA_SOURCE_TYPE_UNKNOWN; image->n_cached_headers = 0; diff --git a/libcrystfel/src/image.h b/libcrystfel/src/image.h index 3746e115..6c7e2a50 100644 --- a/libcrystfel/src/image.h +++ b/libcrystfel/src/image.h @@ -148,6 +148,7 @@ struct image * filenename/ev OR this should be filled in, but not both */ void *data_block; size_t data_block_size; + char *meta_data; /** A list of metadata read from the stream */ struct header_cache_entry *header_cache[HEADER_CACHE_SIZE]; @@ -225,6 +226,7 @@ extern struct image *image_create_for_simulation(const DataTemplate *dtempl); extern struct image *image_read_data_block(const DataTemplate *dtempl, void *data_block, size_t data_block_size, + char *meta_data, DataSourceType type, int serial, int no_image_data, diff --git a/src/im-asapo.c b/src/im-asapo.c index 48a32ee4..a095c82e 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -244,7 +244,7 @@ static void skip_to_stream_end(struct im_asapo *a) void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, - char **pfilename, char **pevent) + char **pmeta, char **pfilename, char **pevent) { void *data_copy; AsapoMessageMetaHandle meta; @@ -295,6 +295,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); + *pmeta = strdup(asapo_message_meta_get_metadata(meta)); *pfilename = strdup(asapo_message_meta_get_name(meta)); *pevent = strdup("//"); diff --git a/src/im-asapo.h b/src/im-asapo.h index 26546a88..719b5bf1 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -45,7 +45,7 @@ extern struct im_asapo *im_asapo_connect(const char *endpoint, extern void im_asapo_shutdown(struct im_asapo *a); extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, - char **pfilename, char **pevent); + char **pmeta, char **pfilename, char **pevent); extern char *im_asapo_make_unique_group_id(const char *endpoint, const char *token); @@ -66,9 +66,10 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a) } static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, - char **pfilename, char **pevent) + char **pmeta, char **pfilename, char **pevent) { *psize = 0; + *pmeta = NULL; *pfilename = NULL; *pevent = NULL; return NULL; diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 6e4b7fcd..8137db9e 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -456,6 +456,13 @@ static int run_work(const struct index_args *iargs, Stream *st, free(line); ok = 0; + /* Default values */ + pargs.zmq_data = NULL; + pargs.zmq_data_size = 0; + pargs.asapo_data = NULL; + pargs.asapo_data_size = 0; + pargs.asapo_meta = NULL; + if ( sb->zmq ) { do { @@ -474,15 +481,14 @@ static int run_work(const struct index_args *iargs, Stream *st, char *filename; char *event; - /* Temporary (?) abuse of "zmq_data", even though - * data comes via ASAP::O */ profile_start("asapo-fetch"); - pargs.zmq_data = im_asapo_fetch(asapostuff, - &pargs.zmq_data_size, - &filename, - &event); + pargs.asapo_data = im_asapo_fetch(asapostuff, + &pargs.asapo_data_size, + &pargs.asapo_meta, + &filename, + &event); profile_end("asapo-fetch"); - if ( pargs.zmq_data != NULL ) { + if ( pargs.asapo_data != NULL ) { ok = 1; /* ASAP::O provides a meaningful filename, which @@ -494,8 +500,6 @@ static int run_work(const struct index_args *iargs, Stream *st, } } else { - pargs.zmq_data = NULL; - pargs.zmq_data_size = 0; ok = 1; } @@ -507,9 +511,10 @@ static int run_work(const struct index_args *iargs, Stream *st, profile_end("process-image"); } - /* pargs.zmq_data will be copied into the image structure, so - * that it can be queried for "header" values etc. It will - * eventually be freed by image_free() under process_image() */ + /* NB pargs.zmq_data, pargs.asapo_data and pargs.asapo_meta + * will be copied into the image structure, so + * that it can be queried for "header" values etc. They will + * eventually be freed by image_free() under process_image(). */ if ( sb->profile ) { profile_print_and_reset(); diff --git a/src/process_image.c b/src/process_image.c index 71be87cc..de2d8792 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -188,22 +188,45 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, int any_crystals; if ( pargs->zmq_data != NULL ) { - set_last_task(last_task, "unpacking messagepack object"); - profile_start("read-data-block"); + + set_last_task(last_task, "unpacking ZMQ data"); + profile_start("read-zmq-data"); image = image_read_data_block(iargs->dtempl, pargs->zmq_data, pargs->zmq_data_size, + NULL, + iargs->data_format, + serial, + iargs->no_image_data, + iargs->no_mask_data); + profile_end("read-zmq-data"); + if ( image == NULL ) return; + + /* image_read_data_block() will leave the filename/event as + * NULL, because there's no file (duh). Fill them in now with + * the values passed down to us. For ZMQ, these values are just + * placeholders. */ + image->filename = strdup(pargs->filename); + image->ev = strdup(pargs->event); + + } else if ( pargs->asapo_data != NULL ) { + + set_last_task(last_task, "unpacking ASAP::O data"); + profile_start("read-asapo-data"); + image = image_read_data_block(iargs->dtempl, + pargs->asapo_data, + pargs->asapo_data_size, + pargs->asapo_meta, iargs->data_format, serial, iargs->no_image_data, iargs->no_mask_data); - profile_end("read-data-block"); + profile_end("read-asapo-data"); if ( image == NULL ) return; /* image_read_data_block() will leave the filename/event as * NULL, because there's no file (duh). Fill them in now with - * the values passed down to us. These values might be - * meaningful (e.g. ASAP::O), or just placeholders. */ + * the values passed down to us from ASAP::O. */ image->filename = strdup(pargs->filename); image->ev = strdup(pargs->event); diff --git a/src/process_image.h b/src/process_image.h index cbf2713b..5e12ff29 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -119,8 +119,13 @@ struct pattern_args /* "Input" */ char *filename; char *event; + void *zmq_data; size_t zmq_data_size; + + char *asapo_data; + size_t asapo_data_size; + char *asapo_meta; }; -- cgit v1.2.3 From 5a9ad6f30ea2ba5599d50e847d9e9e50b9fbbe1b Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 3 May 2022 14:31:34 +0200 Subject: Seedee deserialization --- doc/man/indexamajig.1 | 2 +- libcrystfel/CMakeLists.txt | 2 + libcrystfel/libcrystfel-config.h.cmake.in | 1 + libcrystfel/libcrystfel-config.h.meson.in | 1 + libcrystfel/meson.build | 11 +- libcrystfel/src/image-seedee.c | 195 ++++++++++++++++++++++++++++++ libcrystfel/src/image-seedee.h | 41 +++++++ libcrystfel/src/image.c | 6 + libcrystfel/src/image.h | 3 +- src/im-asapo.h | 6 +- src/indexamajig.c | 1 + src/process_image.h | 4 +- subprojects/cjson.wrap | 12 ++ 13 files changed, 277 insertions(+), 8 deletions(-) create mode 100644 libcrystfel/src/image-seedee.c create mode 100644 libcrystfel/src/image-seedee.h create mode 100644 subprojects/cjson.wrap diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1 index 0660167e..fa108270 100644 --- a/doc/man/indexamajig.1 +++ b/doc/man/indexamajig.1 @@ -201,7 +201,7 @@ Authentication token, beamtime, data source and consumer group, respectively, fo .PD 0 .IP \fB--data-format=\fIformat\fR .PD -Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR and \fBhdf5\fR. +Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR, \fBhdf5\fR and \fBseedee\fR. .PD 0 .IP \fB--basename\fR diff --git a/libcrystfel/CMakeLists.txt b/libcrystfel/CMakeLists.txt index 3ca50b1f..ea475637 100644 --- a/libcrystfel/CMakeLists.txt +++ b/libcrystfel/CMakeLists.txt @@ -19,6 +19,7 @@ set(HAVE_FDIP ${FDIP_FOUND}) set(HAVE_MSGPACK ${MSGPACK_FOUND}) set(HAVE_LIBCCP4 ${LIBCCP4_FOUND}) set(HAVE_ZLIB ${ZLIB_FOUND}) +set(HAVE_SEEDEE 0) # Recent enough version of zlib? set(CMAKE_REQUIRED_LIBRARIES "-lz") @@ -62,6 +63,7 @@ set(LIBCRYSTFEL_SOURCES src/image-hdf5.c src/fom.c src/image-msgpack.c + src/image-seedee.c src/profile.c ${BISON_symopp_OUTPUTS} ${FLEX_symopl_OUTPUTS} diff --git a/libcrystfel/libcrystfel-config.h.cmake.in b/libcrystfel/libcrystfel-config.h.cmake.in index 830055cb..82d0611a 100644 --- a/libcrystfel/libcrystfel-config.h.cmake.in +++ b/libcrystfel/libcrystfel-config.h.cmake.in @@ -12,6 +12,7 @@ #cmakedefine HAVE_MSGPACK #cmakedefine HAVE_CLOCK_GETTIME #cmakedefine HAVE_HDF5 +#cmakedefine HAVE_SEEDEE #cmakedefine HAVE_FORKPTY_PTY_H #cmakedefine HAVE_FORKPTY_UTIL_H diff --git a/libcrystfel/libcrystfel-config.h.meson.in b/libcrystfel/libcrystfel-config.h.meson.in index 7d43147d..302fae0b 100644 --- a/libcrystfel/libcrystfel-config.h.meson.in +++ b/libcrystfel/libcrystfel-config.h.meson.in @@ -10,6 +10,7 @@ #mesondefine HAVE_MSGPACK #mesondefine HAVE_CLOCK_GETTIME #mesondefine HAVE_HDF5 +#mesondefine HAVE_SEEDEE #mesondefine HAVE_FORKPTY_PTY_H #mesondefine HAVE_FORKPTY_UTIL_H diff --git a/libcrystfel/meson.build b/libcrystfel/meson.build index b5005aa5..41cfbaa8 100644 --- a/libcrystfel/meson.build +++ b/libcrystfel/meson.build @@ -25,6 +25,14 @@ if fftwdep.found() conf_data.set10('HAVE_FFTW', true) endif +seedeedep = dependency('seedee', required: false) +cjsondep = dependency('cjson', + required: true, + fallback: ['cjson', 'libcjson_dep']) +if cjsondep.found() and seedeedep.found() + conf_data.set10('HAVE_SEEDEE', 1) +endif + xgandalfdep = dependency('xgandalf', required: false, fallback: ['xgandalf', 'xgandalf_dep']) @@ -125,6 +133,7 @@ libcrystfel_sources = ['src/image.c', 'src/image-cbf.c', 'src/image-hdf5.c', 'src/image-msgpack.c', + 'src/image-seedee.c', 'src/indexers/dirax.c', 'src/indexers/felix.c', 'src/indexers/mosflm.c', @@ -150,7 +159,7 @@ libcrystfel = library('crystfel', [libcrystfel_sources, libcrystfel_versionc], dependencies: [mdep, utildep, fftwdep, gsldep, zlibdep, hdf5dep, pthreaddep, xgandalfdep, pinkindexerdep, fdipdep, - ccp4dep, msgpackdep], + ccp4dep, msgpackdep, seedeedep, cjsondep], install: true) libcrystfeldep = declare_dependency(include_directories: libcrystfel_includes, diff --git a/libcrystfel/src/image-seedee.c b/libcrystfel/src/image-seedee.c new file mode 100644 index 00000000..1965aefa --- /dev/null +++ b/libcrystfel/src/image-seedee.c @@ -0,0 +1,195 @@ +/* + * image-seedee.c + * + * Image loading with Seedee + * + * Copyright © 2017-2022 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2018-2022 Thomas White + * 2014 Valerio Mariani + * 2017 Stijn de Graaf + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see . + * + */ + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "datatemplate_priv.h" + + +#if defined(HAVE_SEEDEE) + +#include +#include + + +static int load_seedee_data(struct panel_template *p, + struct SeedeeNDArray *array, + float **pdata) +{ + int data_size_fs, data_size_ss; + float *data = NULL; + + data_size_ss = array->shape[0]; + data_size_fs = array->shape[1]; + + if ( (p->orig_min_fs + PANEL_WIDTH(p) > data_size_fs) + || (p->orig_min_ss + PANEL_HEIGHT(p) > data_size_ss) ) + { + ERROR("Data for panel %s (%i x %i + %i + %i) is outside data " + "array bounds (%i x %i)\n", + p->name, + PANEL_WIDTH(p), PANEL_HEIGHT(p), + p->orig_min_fs, p->orig_min_ss, + data_size_fs, data_size_ss); + return 1; + } + + if ( (array->datatype == 'u') + && (array->itemsize == 2) + && (array->byteorder == '<') ) + { + int fs, ss; + uint16_t *in_data = (uint16_t *)array->data; + + data = malloc(PANEL_WIDTH(p) * PANEL_HEIGHT(p) * sizeof(float)); + if ( data == NULL ) return 1; + + for ( ss=0; ssorig_min_fs + (ss+p->orig_min_ss)*data_size_fs; + data[fs+ss*PANEL_WIDTH(p)] = in_data[idx]; + } + } + *pdata = data; + + } else { + ERROR("Unrecognised data type %c%i%c\n", + array->datatype, array->itemsize, array->byteorder); + return 1; + } + + return 0; +} + + +/* Read the image data from 'data_block' into 'image', according to 'dtempl' */ +int image_seedee_read(struct image *image, + DataTemplate *dtempl, + void *data_block, + size_t data_block_size, + char *meta_data) +{ + struct SeedeeNDArray array; + int r; + bool zero_copy; + int i; + cJSON *json; + cJSON *data_format_str; + + json = cJSON_Parse(meta_data); + if ( json == NULL ) { + ERROR("Failed to parse JSON\n"); + return 1; + } + + data_format_str = cJSON_GetObjectItemCaseSensitive(json, "_data_format"); + if ( !cJSON_IsString(data_format_str) ) { + ERROR("_data_format isn't a string"); + cJSON_Delete(json); + return 1; + } + + profile_start("seedee-get-size"); + array.size = seedee_get_data_size(data_format_str->valuestring, + data_block, data_block_size, + &zero_copy, &array); + profile_end("seedee-get-size"); + array.data = malloc(array.size); + array.shape = malloc(array.ndims*sizeof(int)); + if ( (array.data == NULL) || (array.shape == NULL) ) { + cJSON_Delete(json); + return 1; + } + + if ( array.ndims != 2 ) { + ERROR("Seedee data has unexpected number of dimensions " + "(%i, expected 2)\n", array.ndims); + return 1; + } + + profile_start("seedee-deserialize"); + r = seedee_deserialize_ndarray(data_format_str->valuestring, + data_block, data_block_size, + 0, &array); + profile_end("seedee-deserialize"); + cJSON_Delete(json); + if ( r < 0 ) { + ERROR("Seedee deserialiation failed.\n"); + return 1; + } + + image->dp = malloc(dtempl->n_panels*sizeof(float *)); + if ( image->dp == NULL ) { + ERROR("Failed to allocate data array.\n"); + return 1; + } + + /* Set all pointers to NULL for easier clean-up */ + for ( i=0; in_panels; i++ ) image->dp[i] = NULL; + + profile_start("seedee-panel"); + for ( i=0; in_panels; i++ ) { + if ( load_seedee_data(&dtempl->panels[i], &array, &image->dp[i]) ) + { + ERROR("Failed to load data for panel '%s'\n", + dtempl->panels[i].name); + profile_end("seedee-panel"); + return 1; + } + } + profile_end("seedee-panel"); + + return 0; +} + + +#else /* defined(HAVE_SEEDEE) */ + +int image_seedee_read(struct image *image, + const DataTemplate *dtempl, + void *data, + size_t data_size, + char *meta_data) +{ + ERROR("Seedee is not supported in this installation (read).\n"); + return 1; +} + +#endif /* defined(HAVE_SEEDEE) */ diff --git a/libcrystfel/src/image-seedee.h b/libcrystfel/src/image-seedee.h new file mode 100644 index 00000000..9432e74b --- /dev/null +++ b/libcrystfel/src/image-seedee.h @@ -0,0 +1,41 @@ +/* + * image-seedee.h + * + * Image loading, SeeDee parts + * + * Copyright © 2012-2022 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2020-2022 Thomas White + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see . + * + */ + +#ifndef IMAGE_SEEDEE_H +#define IMAGE_SEEDEE_H + +#include "datatemplate.h" + + +extern int image_seedee_read(struct image *image, + const DataTemplate *dtempl, + void *data, + size_t data_size, + char *meta_data); + +#endif /* IMAGE_SEEDEE_H */ diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c index 10c05525..f9c75b19 100644 --- a/libcrystfel/src/image.c +++ b/libcrystfel/src/image.c @@ -42,6 +42,7 @@ #include "image-hdf5.h" #include "image-cbf.h" #include "image-msgpack.h" +#include "image-seedee.h" #include "profile.h" #include "datatemplate.h" @@ -803,6 +804,11 @@ static int image_read_image_data(struct image *image, return image_msgpack_read(image, dtempl, image->data_block, image->data_block_size); + case DATA_SOURCE_TYPE_SEEDEE: + return image_seedee_read(image, dtempl, image->data_block, + image->data_block_size, + image->meta_data); + default: ERROR("Unrecognised file type %i (image_read_image_data)\n", image->data_source_type); diff --git a/libcrystfel/src/image.h b/libcrystfel/src/image.h index 6c7e2a50..654bf3b1 100644 --- a/libcrystfel/src/image.h +++ b/libcrystfel/src/image.h @@ -103,7 +103,8 @@ typedef enum DATA_SOURCE_TYPE_HDF5, DATA_SOURCE_TYPE_CBF, DATA_SOURCE_TYPE_CBFGZ, - DATA_SOURCE_TYPE_MSGPACK + DATA_SOURCE_TYPE_MSGPACK, + DATA_SOURCE_TYPE_SEEDEE } DataSourceType; diff --git a/src/im-asapo.h b/src/im-asapo.h index 719b5bf1..5fd7665c 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -3,11 +3,11 @@ * * ASAP::O data interface * - * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, - * a research centre of the Helmholtz Association. + * Copyright © 2021-2022 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. * * Authors: - * 2021 Thomas White + * 2021-2022 Thomas White * * This file is part of CrystFEL. * diff --git a/src/indexamajig.c b/src/indexamajig.c index 8d383d8e..dbea7261 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -297,6 +297,7 @@ static DataSourceType parse_data_format(const char *str) { if ( strcmp(str, "hdf5") == 0 ) return DATA_SOURCE_TYPE_HDF5; if ( strcmp(str, "msgpack") == 0 ) return DATA_SOURCE_TYPE_MSGPACK; + if ( strcmp(str, "seedee") == 0 ) return DATA_SOURCE_TYPE_SEEDEE; /* CBF and CBFGZ should be added here once image-cbf.c supports * in-memory access */ return DATA_SOURCE_TYPE_UNKNOWN; diff --git a/src/process_image.h b/src/process_image.h index 5e12ff29..e2f792a5 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -3,11 +3,11 @@ * * The processing pipeline for one image * - * Copyright © 2012-2021 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2022 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2010-2021 Thomas White + * 2010-2022 Thomas White * 2014-2017 Valerio Mariani * 2017-2018 Yaroslav Gevorkov * diff --git a/subprojects/cjson.wrap b/subprojects/cjson.wrap new file mode 100644 index 00000000..dc10279a --- /dev/null +++ b/subprojects/cjson.wrap @@ -0,0 +1,12 @@ +[wrap-file] +directory = cJSON-1.7.15 +source_url = https://github.com/DaveGamble/cJSON/archive/refs/tags/v1.7.15.tar.gz +source_filename = v1.7.15.tar.gz +source_hash = 5308fd4bd90cef7aa060558514de6a1a4a0819974a26e6ed13973c5f624c24b2 +patch_filename = cjson_1.7.15-2_patch.zip +patch_url = https://wrapdb.mesonbuild.com/v2/cjson_1.7.15-2/get_patch +patch_hash = d83b4bc0ca94e392c62c8c6c7839392f382d66a84974f5e10611074836ef1777 + +[provide] +libcjson = libcjson_dep + -- cgit v1.2.3 From 8e601d452a8b3d022b89e904c0cecee812f2b636 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 4 May 2022 14:44:37 +0200 Subject: ASAP::O: Add profiling --- src/im-asapo.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/im-asapo.c b/src/im-asapo.c index a095c82e..2e5ed504 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -252,23 +252,30 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, AsapoErrorHandle err; uint64_t msg_size; + profile_start("select-stream"); if ( a->stream == NULL ) { if ( select_last_stream(a) ) { + profile_end("select-stream"); return NULL; } skip_to_stream_end(a); } + profile_end("select-stream"); + profile_start("create-handles"); err = asapo_new_handle(); meta = asapo_new_handle(); data = asapo_new_handle(); + profile_end("create-handles"); profile_start("asapo-get-next"); asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, a->stream, &err); profile_end("asapo-get-next"); if ( asapo_error_get_type(err) == kEndOfStream ) { + profile_start("next-stream"); select_next_stream(a); + profile_end("next-stream"); asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); @@ -283,8 +290,11 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, return NULL; } + profile_start("get-size"); msg_size = asapo_message_meta_get_size(meta); + profile_end("get-size"); + profile_start("malloc-copy"); data_copy = malloc(msg_size); if ( data_copy == NULL ) { ERROR("Failed to copy data block.\n"); @@ -294,10 +304,13 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, return NULL; } memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size); + profile_end("malloc-copy"); + profile_start("copy-meta"); *pmeta = strdup(asapo_message_meta_get_metadata(meta)); *pfilename = strdup(asapo_message_meta_get_name(meta)); *pevent = strdup("//"); + profile_end("copy-meta"); asapo_free_handle(&err); asapo_free_handle(&meta); -- cgit v1.2.3 From ffd98b770d6dfa7c1bef4b2ae54e0b637f2e7ac3 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 4 May 2022 12:14:27 +0200 Subject: indexamajig: Add --asapo-stream --- doc/man/indexamajig.1 | 5 +++++ src/im-asapo.c | 47 ++++++++++++++++++++++++++++++++++++++++------- src/im-asapo.h | 13 +++++++++---- src/im-sandbox.c | 18 ++++++++++++++---- src/im-sandbox.h | 4 ++-- src/indexamajig.c | 11 +++++++++-- 6 files changed, 79 insertions(+), 19 deletions(-) diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1 index fa108270..25e025ac 100644 --- a/doc/man/indexamajig.1 +++ b/doc/man/indexamajig.1 @@ -198,6 +198,11 @@ Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input .PD Authentication token, beamtime, data source and consumer group, respectively, for ASAP::O data. +.PD 0 +.IP \fB--asapo-stream=\fIstream\fR +.PD +Name of ASAP::O stream to process. If this option is not given, indexamajig will start processing from the end of the current last stream. + .PD 0 .IP \fB--data-format=\fIformat\fR .PD diff --git a/src/im-asapo.c b/src/im-asapo.c index 2e5ed504..95445fd0 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -49,6 +49,7 @@ struct im_asapo { char *stream; + int online_mode; AsapoConsumerHandle consumer; AsapoStringHandle group_id; }; @@ -100,7 +101,8 @@ struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *group_id, - const char *data_source) + const char *data_source, + const char *stream) { struct im_asapo *a; AsapoSourceCredentialsHandle cred; @@ -127,7 +129,30 @@ struct im_asapo *im_asapo_connect(const char *endpoint, asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(group_id); - a->stream = NULL; + if ( stream != NULL ) { + + /* Named stream mode */ + AsapoErrorHandle err = asapo_new_handle(); + + a->stream = strdup(stream); + + asapo_consumer_set_last_read_marker(a->consumer, + a->group_id, 0, + a->stream, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Failed to skip to start of stream", err); + } else { + STATUS("Skipped to start of stream %s\n", a->stream); + } + + asapo_free_handle(&err); + a->online_mode = 0; + + } else { + /* Online mode */ + a->stream = NULL; + a->online_mode = 1; + } return a; } @@ -244,7 +269,8 @@ static void skip_to_stream_end(struct im_asapo *a) void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, - char **pmeta, char **pfilename, char **pevent) + char **pmeta, char **pfilename, char **pevent, + int *pfinished) { void *data_copy; AsapoMessageMetaHandle meta; @@ -252,6 +278,8 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, AsapoErrorHandle err; uint64_t msg_size; + *pfinished = 0; + profile_start("select-stream"); if ( a->stream == NULL ) { if ( select_last_stream(a) ) { @@ -273,13 +301,18 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, a->stream, &err); profile_end("asapo-get-next"); if ( asapo_error_get_type(err) == kEndOfStream ) { - profile_start("next-stream"); - select_next_stream(a); - profile_end("next-stream"); asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - return NULL; /* Sandbox will call try again very soon */ + if ( a->online_mode ) { + profile_start("next-stream"); + select_next_stream(a); + profile_end("next-stream"); + /* Sandbox will call to try again very soon */ + } else { + *pfinished = 1; + } + return NULL; } if ( asapo_is_error(err) ) { diff --git a/src/im-asapo.h b/src/im-asapo.h index 5fd7665c..ab68e1c2 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -40,12 +40,14 @@ extern struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *group_id, - const char *data_source); + const char *data_source, + const char *stream); extern void im_asapo_shutdown(struct im_asapo *a); extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, - char **pmeta, char **pfilename, char **pevent); + char **pmeta, char **pfilename, char **pevent, + int *pfinished); extern char *im_asapo_make_unique_group_id(const char *endpoint, const char *token); @@ -56,7 +58,8 @@ static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *group_id, - const char *data_source) + const char *data_source, + const char *stream) { return NULL; } @@ -66,12 +69,14 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a) } static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, - char **pmeta, char **pfilename, char **pevent) + char **pmeta, char **pfilename, char **pevent, + int *pfinished) { *psize = 0; *pmeta = NULL; *pfilename = NULL; *pevent = NULL; + *pfinished = 1; return NULL; } diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 8137db9e..8e1e5004 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -112,6 +112,7 @@ struct sandbox const char *asapo_beamtime; const char *asapo_group_id; const char *asapo_source; + const char *asapo_stream; /* Final output */ Stream *stream; @@ -362,7 +363,8 @@ static int run_work(const struct index_args *iargs, Stream *st, sb->asapo_token, sb->asapo_beamtime, sb->asapo_group_id, - sb->asapo_source); + sb->asapo_source, + sb->asapo_stream); if ( asapostuff == NULL ) { ERROR("ASAP::O setup failed.\n"); return 1; @@ -480,13 +482,15 @@ static int run_work(const struct index_args *iargs, Stream *st, char *filename; char *event; + int finished = 0; profile_start("asapo-fetch"); pargs.asapo_data = im_asapo_fetch(asapostuff, &pargs.asapo_data_size, &pargs.asapo_meta, &filename, - &event); + &event, + &finished); profile_end("asapo-fetch"); if ( pargs.asapo_data != NULL ) { ok = 1; @@ -497,6 +501,11 @@ static int run_work(const struct index_args *iargs, Stream *st, free(pargs.event); pargs.filename = filename; pargs.event = event; + } else { + if ( finished ) { + sb->shared->should_shutdown = 1; + allDone = 1; + } } } else { @@ -1120,8 +1129,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, - const char *asapo_group_id, const char *asapo_source, + const char *asapo_beamtime, const char *asapo_group_id, + const char *asapo_source, const char *asapo_stream, int timeout, int profile) { int i; @@ -1168,6 +1177,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->asapo_token = asapo_token; sb->asapo_beamtime = asapo_beamtime; sb->asapo_source = asapo_source; + sb->asapo_stream = asapo_stream; } else { sb->asapo = 0; } diff --git a/src/im-sandbox.h b/src/im-sandbox.h index a1b9ed36..e1d2e1b9 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -88,8 +88,8 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, - const char *asapo_group_id, const char *asapo_source, + const char *asapo_beamtime, const char *asapo_group_id, + const char *asapo_source, const char *asapo_stream, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index dbea7261..6787f903 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -89,6 +89,7 @@ struct indexamajig_arguments char *asapo_beamtime; char *asapo_group_id; char *asapo_source; + char *asapo_stream; int serial_start; char *temp_location; int if_refine; @@ -436,6 +437,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) } break; + case 220 : + args->asapo_stream = strdup(arg); + break; + /* ---------- Peak search ---------- */ case 't' : @@ -857,6 +862,7 @@ int main(int argc, char *argv[]) args.asapo_beamtime = NULL; args.asapo_group_id = NULL; args.asapo_source = NULL; + args.asapo_stream = NULL; args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; @@ -971,6 +977,7 @@ int main(int argc, char *argv[]) {"asapo-group", 217, "str", OPTION_NO_USAGE, "ASAP::O group ID"}, {"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, + {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, @@ -1332,8 +1339,8 @@ int main(int argc, char *argv[]) args.zmq_addr, args.zmq_subscriptions, args.n_zmq_subscriptions, args.zmq_request, args.asapo_endpoint, args.asapo_token, - args.asapo_beamtime, - args.asapo_group_id, args.asapo_source, + args.asapo_beamtime, args.asapo_group_id, + args.asapo_source, args.asapo_stream, timeout, args.profile); cell_free(args.iargs.cell); -- cgit v1.2.3 From b0bf26431f5e4fb6bb2cb5ae3501fd57a10d15c2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 6 May 2022 12:01:21 +0200 Subject: Fix horrific Seedee memory leak --- libcrystfel/src/image-seedee.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/libcrystfel/src/image-seedee.c b/libcrystfel/src/image-seedee.c index 1965aefa..537fc9e3 100644 --- a/libcrystfel/src/image-seedee.c +++ b/libcrystfel/src/image-seedee.c @@ -135,12 +135,16 @@ int image_seedee_read(struct image *image, array.shape = malloc(array.ndims*sizeof(int)); if ( (array.data == NULL) || (array.shape == NULL) ) { cJSON_Delete(json); + free(array.data); + free(array.shape); return 1; } if ( array.ndims != 2 ) { ERROR("Seedee data has unexpected number of dimensions " "(%i, expected 2)\n", array.ndims); + free(array.data); + free(array.shape); return 1; } @@ -152,12 +156,16 @@ int image_seedee_read(struct image *image, cJSON_Delete(json); if ( r < 0 ) { ERROR("Seedee deserialiation failed.\n"); + free(array.data); + free(array.shape); return 1; } image->dp = malloc(dtempl->n_panels*sizeof(float *)); if ( image->dp == NULL ) { ERROR("Failed to allocate data array.\n"); + free(array.data); + free(array.shape); return 1; } @@ -171,11 +179,16 @@ int image_seedee_read(struct image *image, ERROR("Failed to load data for panel '%s'\n", dtempl->panels[i].name); profile_end("seedee-panel"); + free(array.data); + free(array.shape); return 1; } } profile_end("seedee-panel"); + free(array.data); + free(array.shape); + return 0; } -- cgit v1.2.3 From 540884d85b8fd7449946869bd811d153b7de65a5 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 6 May 2022 12:01:50 +0200 Subject: Use isfinite() instead of isnan() || isinf() These FP calls seem to be slower than expected. Using only one doubles the speed. --- libcrystfel/src/image.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c index f9c75b19..5dccfec6 100644 --- a/libcrystfel/src/image.c +++ b/libcrystfel/src/image.c @@ -887,8 +887,7 @@ static void mark_flagged_pixels_naninf(float *dp, int *bad, { long int i; for ( i=0; i Date: Fri, 6 May 2022 12:02:31 +0200 Subject: Add profiling for bad region generation --- libcrystfel/src/image.c | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c index 5dccfec6..5d67ae2d 100644 --- a/libcrystfel/src/image.c +++ b/libcrystfel/src/image.c @@ -903,8 +903,11 @@ static void mark_flagged_pixels(struct panel_template *p, p_h = p->orig_max_ss - p->orig_min_ss + 1; n = p_w * p_h; + profile_start("nan-inf"); mark_flagged_pixels_naninf(dp, bad, n); + profile_end("nan-inf"); + profile_start("flag-values"); for ( i=0; iflag_values[i]; @@ -928,6 +931,7 @@ static void mark_flagged_pixels(struct panel_template *p, } } + profile_end("flag-values"); } @@ -1101,27 +1105,34 @@ static int create_badmap(struct image *image, /* Panel marked as bad? */ if ( p->bad ) { + profile_start("whole-panel"); /* NB this sets every element to 0x1111, * but that's OK - value is still 'true'. */ memset(image->bad[i], 1, p_w*p_h); + profile_end("whole-panel"); } /* Add bad regions (skip if panel is bad anyway) */ if ( !p->bad ) { + profile_start("flagged-pixels"); mark_flagged_pixels(p, image->dp[i], image->bad[i]); + profile_end("flagged-pixels"); } /* Mask panel edges (skip if panel is bad anyway) */ if ( (p->mask_edge_pixels > 0) && !p->bad ) { + profile_start("panel-edges"); mask_panel_edges(image->bad[i], p_w, p_h, p->mask_edge_pixels); + profile_end("panel-edges"); } /* Load masks (skip if panel is bad anyway) */ if ( (!no_mask_data) && (!p->bad) ) { int j; + profile_start("load-masks"); for ( j=0; jmasks[j].bad_bits); } + profile_end("load-masks"); } } + profile_start("mark-regions"); mark_bad_regions(image, dtempl); + profile_end("mark-regions"); return 0; } -- cgit v1.2.3 From 27d4992f755279a2240059c9cd67f737af0481a2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 6 May 2022 12:02:54 +0200 Subject: Add worker ID to profiling data --- libcrystfel/src/profile.c | 10 ++++++---- libcrystfel/src/profile.h | 2 +- src/im-sandbox.c | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/libcrystfel/src/profile.c b/libcrystfel/src/profile.c index 79228b34..82f0217f 100644 --- a/libcrystfel/src/profile.c +++ b/libcrystfel/src/profile.c @@ -166,7 +166,7 @@ static void free_profile_block(struct _profile_block *b) } -void profile_print_and_reset() +void profile_print_and_reset(int worker_id) { char *buf; char *buf2; @@ -187,10 +187,12 @@ void profile_print_and_reset() stop_profile_block(pd->root); buf = format_profile_block(pd->root); - buf2 = malloc(2+strlen(buf)); - strcpy(buf2, buf); - strcat(buf2, "\n"); + buf2 = malloc(8+strlen(buf)); + size_t len = 8+strlen(buf); + snprintf(buf2, len, "%i %s\n", worker_id, buf); write(STDOUT_FILENO, buf2, strlen(buf2)); + free(buf); + free(buf2); free_profile_block(pd->root); pd->root = start_profile_block("root"); diff --git a/libcrystfel/src/profile.h b/libcrystfel/src/profile.h index 183528ce..61ef20c3 100644 --- a/libcrystfel/src/profile.h +++ b/libcrystfel/src/profile.h @@ -35,7 +35,7 @@ */ extern void profile_init(); -extern void profile_print_and_reset(); +extern void profile_print_and_reset(int worker_id); extern void profile_start(const char *name); extern void profile_end(const char *name); diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 8e1e5004..4337e4a2 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -526,7 +526,7 @@ static int run_work(const struct index_args *iargs, Stream *st, * eventually be freed by image_free() under process_image(). */ if ( sb->profile ) { - profile_print_and_reset(); + profile_print_and_reset(cookie); } } -- cgit v1.2.3 From b3310d6e91010c7b839c3bf599e866877746462d Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 6 May 2022 12:03:05 +0200 Subject: ASAP::O: Add last_task for ASAPO fetch --- src/im-sandbox.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 4337e4a2..83d2f043 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -485,6 +485,7 @@ static int run_work(const struct index_args *iargs, Stream *st, int finished = 0; profile_start("asapo-fetch"); + set_last_task(sb->shared->last_task[cookie], "ASAPO fetch"); pargs.asapo_data = im_asapo_fetch(asapostuff, &pargs.asapo_data_size, &pargs.asapo_meta, -- cgit v1.2.3 From 0c64576cc6e802129c8eaefaed55101ab02cf15d Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 2 Jun 2022 12:47:21 +0200 Subject: Meson: Change HAVE_ASAPO conf_data from '1' to 'true' See 13a36408f3b867938cb029a9d11d952113d9bf64 --- meson.build | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meson.build b/meson.build index 964d3f61..4f97d8ce 100644 --- a/meson.build +++ b/meson.build @@ -71,7 +71,7 @@ endif asapodep = dependency('libasapo-consumer', required: false) if asapodep.found() - conf_data.set10('HAVE_ASAPO', 1) + conf_data.set10('HAVE_ASAPO', true) endif if cc.has_function('clock_gettime', prefix: '#include ') -- cgit v1.2.3