diff options
author | Thomas White <taw@physics.org> | 2021-07-01 15:50:31 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2022-06-02 12:15:38 +0200 |
commit | b28228b5826235ca917d9f42af7d22f75b38bb13 (patch) | |
tree | 57738a8725df12bce74abd80777d8ff62fbb8029 | |
parent | 095fb15a3136fc20642d3ff12bfe26a1aec1b797 (diff) |
ASAP::O guts
-rw-r--r-- | src/im-asapo.c | 130 | ||||
-rw-r--r-- | src/im-asapo.h | 51 | ||||
-rw-r--r-- | src/im-sandbox.c | 36 | ||||
-rw-r--r-- | src/im-sandbox.h | 2 | ||||
-rw-r--r-- | src/indexamajig.c | 33 |
5 files changed, 249 insertions, 3 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c new file mode 100644 index 00000000..e6061268 --- /dev/null +++ b/src/im-asapo.c @@ -0,0 +1,130 @@ +/* + * asapo.c + * + * ASAP::O data interface + * + * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2021 Thomas White <taw@physics.org> + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdlib.h> +#include <stdio.h> +#include <stdint.h> +#include <assert.h> +#include <unistd.h> +#include <asapo/consumer_c.h> + +#include <image.h> +#include <utils.h> + +#include "im-asapo.h" + +#include "datatemplate_priv.h" + + +struct im_asapo +{ + AsapoConsumerHandle consumer; + AsapoStringHandle group_id; +}; + + +static void show_asapo_error(const char *msg, const AsapoErrorHandle err) +{ + char buf[1024]; + asapo_error_explain(err, buf, sizeof(buf)); + ERROR("%s: %s\n", msg, buf); +} + + +struct im_asapo *im_asapo_connect(const char *endpoint, + const char *token, + const char *beamtime, + const char *path) +{ + struct im_asapo *a; + AsapoSourceCredentialsHandle cred; + AsapoErrorHandle err = asapo_new_handle(); + + a = malloc(sizeof(struct im_asapo)); + if ( a == NULL ) return NULL; + + cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token); + a->consumer = asapo_create_consumer(endpoint, path, 1, cred, &err); + asapo_free_handle(&cred); + if ( err ) { + show_asapo_error("Cannot create ASAP::O consumer", err); + free(a); + return NULL; + } + + asapo_consumer_set_timeout(a->consumer, 1000); + + a->group_id = asapo_consumer_generate_new_group_id(a->consumer, &err); + if ( err ) { + show_asapo_error("Cannot create ASAP::O group ID", err); + free(a); + return NULL; + } + + return a; +} + + +void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size) +{ + void *data_block; + AsapoMessageMetaHandle meta = asapo_new_handle(); + AsapoMessageDataHandle data = asapo_new_handle(); + AsapoErrorHandle err = asapo_new_handle(); + + asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data, + "default", &err); + if ( err ) { + show_asapo_error("Couldn't get next ASAP::O record", err); + return NULL; + } + + STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta)); + STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta)); + + data_block = asapo_message_data_get_as_chars(data); + + asapo_free_handle(&err); + asapo_free_handle(&meta); + asapo_free_handle(&data); + + return data_block; +} + + +void im_asapo_shutdown(struct im_asapo *a) +{ + if ( a == NULL ) return; + asapo_free_handle(&a->consumer); + asapo_free_handle(&a->group_id); + free(a); +} diff --git a/src/im-asapo.h b/src/im-asapo.h new file mode 100644 index 00000000..254e7df3 --- /dev/null +++ b/src/im-asapo.h @@ -0,0 +1,51 @@ +/* + * asapo.h + * + * ASAP::O data interface + * + * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * + * Authors: + * 2021 Thomas White <taw@physics.org> + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * + */ + + +#ifndef CRYSTFEL_ASAPO_H +#define CRYSTFEL_ASAPO_H + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#if defined(HAVE_ASAPO) + +extern struct im_asapo *im_asapo_connect(); +extern void im_asapo_shutdown(struct im_asapo *a); +extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size); + +#else /* defined(HAVE_ASAPO) */ + +static UNUSED struct im_asapo *im_asapo_connect() { return NULL; } +static UNUSED void im_asapo_shutdown(struct im_asapo *a) { } +static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) { *psize = 0; return NULL; } + +#endif /* defined(HAVE_ASAPO) */ + +#endif /* CRYSTFEL_ASAPO_H */ diff --git a/src/im-sandbox.c b/src/im-sandbox.c index bd3f47e7..ed58a3be 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -104,6 +104,13 @@ struct sandbox int n_zmq_subscriptions; const char *zmq_request; + /* ASAP::O mode */ + int asapo; + const char *asapo_endpoint; + const char *asapo_token; + const char *asapo_beamtime; + const char *asapo_path; + /* Final output */ Stream *stream; }; @@ -330,6 +337,7 @@ static int run_work(const struct index_args *iargs, Stream *st, { int allDone = 0; struct im_zmq *zmqstuff = NULL; + struct im_asapo *asapostuff = NULL; if ( sb->profile ) { profile_init(); @@ -347,6 +355,17 @@ static int run_work(const struct index_args *iargs, Stream *st, } } + if ( sb->asapo ) { + zmqstuff = im_zmq_connect(sb->zmq_address, + sb->zmq_subscriptions, + sb->n_zmq_subscriptions, + sb->zmq_request); + if ( zmqstuff == NULL ) { + ERROR("ZMQ setup failed.\n"); + return 1; + } + } + while ( !allDone ) { struct pattern_args pargs; @@ -1056,6 +1075,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, Stream *stream, const char *tmpdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, + const char *asapo_endpoint, const char *asapo_token, + const char *asapo_beamtime, const char *asapo_path, int timeout, int profile) { int i; @@ -1096,6 +1117,21 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->zmq = 0; } + if ( asapo_endpoint != NULL ) { + sb->asapo = 1; + sb->asapo_endpoint = asapo_endpoint; + sb->asapo_token = asapo_token; + sb->asapo_beamtime = asapo_beamtime; + sb->asapo_path = asapo_path; + } else { + sb->asapo = 0; + } + + if ( sb->zmq && sb->asapo ) { + ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n"); + return 0; + } + sb->fds = NULL; sb->fhs = NULL; sb->stream = stream; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index a76e69ec..5efd3595 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -87,6 +87,8 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *tempdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, + const char *asapo_endpoint, const char *asapo_token, + const char *asapo_beamtime, const char *asapo_path, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index fc22a562..252a9e8f 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -84,6 +84,10 @@ struct indexamajig_arguments char *zmq_request; char *zmq_subscriptions[256]; int n_zmq_subscriptions; + char *asapo_endpoint; + char *asapo_token; + char *asapo_beamtime; + char *asapo_path; int serial_start; char *temp_location; int if_refine; @@ -402,6 +406,22 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->zmq_request = strdup(arg); break; + case 213 : + args->asapo_endpoint = strdup(arg); + break; + + case 214 : + args->asapo_token = strdup(arg); + break; + + case 215 : + args->asapo_beamtime = strdup(arg); + break; + + case 216 : + args->asapo_path = strdup(arg); + break; + case 219 : args->iargs.data_format = parse_data_format(arg); if ( args->iargs.data_format == DATA_SOURCE_TYPE_UNKNOWN ) { @@ -934,6 +954,10 @@ int main(int argc, char *argv[]) "type"}, {"zmq-request", 212, "str", OPTION_NO_USAGE, "Request messages using" "this string."}, + {"asapo-endpoint", 213, "str", OPTION_NO_USAGE, "ASAP::O endpoint"}, + {"asapo-token", 214, "str", OPTION_NO_USAGE, "ASAP::O token"}, + {"asapo-beamtime", 215, "str", OPTION_NO_USAGE, "ASAP::O beamtime ID"}, + {"asapo-path", 216, "str", OPTION_NO_USAGE, "ASAP::O path to files"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, @@ -1290,9 +1314,12 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename, - fh, st, tmpdir, args.serial_start, args.zmq_addr, - args.zmq_subscriptions, args.n_zmq_subscriptions, - args.zmq_request, timeout, args.profile); + fh, st, tmpdir, args.serial_start, + args.zmq_addr, args.zmq_subscriptions, + args.n_zmq_subscriptions, args.zmq_request, + args.asapo_endpoint, args.asapo_token, + args.asapo_beamtime, args.asapo_path, + timeout, args.profile); cell_free(args.iargs.cell); free(args.prefix); |