aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-07-01 15:50:31 +0200
committerThomas White <taw@physics.org>2022-06-02 12:15:38 +0200
commitb28228b5826235ca917d9f42af7d22f75b38bb13 (patch)
tree57738a8725df12bce74abd80777d8ff62fbb8029
parent095fb15a3136fc20642d3ff12bfe26a1aec1b797 (diff)
ASAP::O guts
-rw-r--r--src/im-asapo.c130
-rw-r--r--src/im-asapo.h51
-rw-r--r--src/im-sandbox.c36
-rw-r--r--src/im-sandbox.h2
-rw-r--r--src/indexamajig.c33
5 files changed, 249 insertions, 3 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
new file mode 100644
index 00000000..e6061268
--- /dev/null
+++ b/src/im-asapo.c
@@ -0,0 +1,130 @@
+/*
+ * asapo.c
+ *
+ * ASAP::O data interface
+ *
+ * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2021 Thomas White <taw@physics.org>
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <assert.h>
+#include <unistd.h>
+#include <asapo/consumer_c.h>
+
+#include <image.h>
+#include <utils.h>
+
+#include "im-asapo.h"
+
+#include "datatemplate_priv.h"
+
+
+struct im_asapo
+{
+ AsapoConsumerHandle consumer;
+ AsapoStringHandle group_id;
+};
+
+
+static void show_asapo_error(const char *msg, const AsapoErrorHandle err)
+{
+ char buf[1024];
+ asapo_error_explain(err, buf, sizeof(buf));
+ ERROR("%s: %s\n", msg, buf);
+}
+
+
+struct im_asapo *im_asapo_connect(const char *endpoint,
+ const char *token,
+ const char *beamtime,
+ const char *path)
+{
+ struct im_asapo *a;
+ AsapoSourceCredentialsHandle cred;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ a = malloc(sizeof(struct im_asapo));
+ if ( a == NULL ) return NULL;
+
+ cred = asapo_create_source_credentials(kProcessed, beamtime, "", "", token);
+ a->consumer = asapo_create_consumer(endpoint, path, 1, cred, &err);
+ asapo_free_handle(&cred);
+ if ( err ) {
+ show_asapo_error("Cannot create ASAP::O consumer", err);
+ free(a);
+ return NULL;
+ }
+
+ asapo_consumer_set_timeout(a->consumer, 1000);
+
+ a->group_id = asapo_consumer_generate_new_group_id(a->consumer, &err);
+ if ( err ) {
+ show_asapo_error("Cannot create ASAP::O group ID", err);
+ free(a);
+ return NULL;
+ }
+
+ return a;
+}
+
+
+void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size)
+{
+ void *data_block;
+ AsapoMessageMetaHandle meta = asapo_new_handle();
+ AsapoMessageDataHandle data = asapo_new_handle();
+ AsapoErrorHandle err = asapo_new_handle();
+
+ asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data,
+ "default", &err);
+ if ( err ) {
+ show_asapo_error("Couldn't get next ASAP::O record", err);
+ return NULL;
+ }
+
+ STATUS("ASAP::O ID: %llu\n", asapo_message_meta_get_id(meta));
+ STATUS("ASAP::O filename: %s\n", asapo_message_meta_get_name(meta));
+
+ data_block = asapo_message_data_get_as_chars(data);
+
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
+
+ return data_block;
+}
+
+
+void im_asapo_shutdown(struct im_asapo *a)
+{
+ if ( a == NULL ) return;
+ asapo_free_handle(&a->consumer);
+ asapo_free_handle(&a->group_id);
+ free(a);
+}
diff --git a/src/im-asapo.h b/src/im-asapo.h
new file mode 100644
index 00000000..254e7df3
--- /dev/null
+++ b/src/im-asapo.h
@@ -0,0 +1,51 @@
+/*
+ * asapo.h
+ *
+ * ASAP::O data interface
+ *
+ * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2021 Thomas White <taw@physics.org>
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+
+#ifndef CRYSTFEL_ASAPO_H
+#define CRYSTFEL_ASAPO_H
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#if defined(HAVE_ASAPO)
+
+extern struct im_asapo *im_asapo_connect();
+extern void im_asapo_shutdown(struct im_asapo *a);
+extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size);
+
+#else /* defined(HAVE_ASAPO) */
+
+static UNUSED struct im_asapo *im_asapo_connect() { return NULL; }
+static UNUSED void im_asapo_shutdown(struct im_asapo *a) { }
+static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize) { *psize = 0; return NULL; }
+
+#endif /* defined(HAVE_ASAPO) */
+
+#endif /* CRYSTFEL_ASAPO_H */
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index bd3f47e7..ed58a3be 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -104,6 +104,13 @@ struct sandbox
int n_zmq_subscriptions;
const char *zmq_request;
+ /* ASAP::O mode */
+ int asapo;
+ const char *asapo_endpoint;
+ const char *asapo_token;
+ const char *asapo_beamtime;
+ const char *asapo_path;
+
/* Final output */
Stream *stream;
};
@@ -330,6 +337,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
{
int allDone = 0;
struct im_zmq *zmqstuff = NULL;
+ struct im_asapo *asapostuff = NULL;
if ( sb->profile ) {
profile_init();
@@ -347,6 +355,17 @@ static int run_work(const struct index_args *iargs, Stream *st,
}
}
+ if ( sb->asapo ) {
+ zmqstuff = im_zmq_connect(sb->zmq_address,
+ sb->zmq_subscriptions,
+ sb->n_zmq_subscriptions,
+ sb->zmq_request);
+ if ( zmqstuff == NULL ) {
+ ERROR("ZMQ setup failed.\n");
+ return 1;
+ }
+ }
+
while ( !allDone ) {
struct pattern_args pargs;
@@ -1056,6 +1075,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
Stream *stream, const char *tmpdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
+ const char *asapo_endpoint, const char *asapo_token,
+ const char *asapo_beamtime, const char *asapo_path,
int timeout, int profile)
{
int i;
@@ -1096,6 +1117,21 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->zmq = 0;
}
+ if ( asapo_endpoint != NULL ) {
+ sb->asapo = 1;
+ sb->asapo_endpoint = asapo_endpoint;
+ sb->asapo_token = asapo_token;
+ sb->asapo_beamtime = asapo_beamtime;
+ sb->asapo_path = asapo_path;
+ } else {
+ sb->asapo = 0;
+ }
+
+ if ( sb->zmq && sb->asapo ) {
+ ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n");
+ return 0;
+ }
+
sb->fds = NULL;
sb->fhs = NULL;
sb->stream = stream;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index a76e69ec..5efd3595 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -87,6 +87,8 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
const char *tempdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
+ const char *asapo_endpoint, const char *asapo_token,
+ const char *asapo_beamtime, const char *asapo_path,
int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index fc22a562..252a9e8f 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -84,6 +84,10 @@ struct indexamajig_arguments
char *zmq_request;
char *zmq_subscriptions[256];
int n_zmq_subscriptions;
+ char *asapo_endpoint;
+ char *asapo_token;
+ char *asapo_beamtime;
+ char *asapo_path;
int serial_start;
char *temp_location;
int if_refine;
@@ -402,6 +406,22 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->zmq_request = strdup(arg);
break;
+ case 213 :
+ args->asapo_endpoint = strdup(arg);
+ break;
+
+ case 214 :
+ args->asapo_token = strdup(arg);
+ break;
+
+ case 215 :
+ args->asapo_beamtime = strdup(arg);
+ break;
+
+ case 216 :
+ args->asapo_path = strdup(arg);
+ break;
+
case 219 :
args->iargs.data_format = parse_data_format(arg);
if ( args->iargs.data_format == DATA_SOURCE_TYPE_UNKNOWN ) {
@@ -934,6 +954,10 @@ int main(int argc, char *argv[])
"type"},
{"zmq-request", 212, "str", OPTION_NO_USAGE, "Request messages using"
"this string."},
+ {"asapo-endpoint", 213, "str", OPTION_NO_USAGE, "ASAP::O endpoint"},
+ {"asapo-token", 214, "str", OPTION_NO_USAGE, "ASAP::O token"},
+ {"asapo-beamtime", 215, "str", OPTION_NO_USAGE, "ASAP::O beamtime ID"},
+ {"asapo-path", 216, "str", OPTION_NO_USAGE, "ASAP::O path to files"},
{"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
@@ -1290,9 +1314,12 @@ int main(int argc, char *argv[])
gsl_set_error_handler_off();
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
- fh, st, tmpdir, args.serial_start, args.zmq_addr,
- args.zmq_subscriptions, args.n_zmq_subscriptions,
- args.zmq_request, timeout, args.profile);
+ fh, st, tmpdir, args.serial_start,
+ args.zmq_addr, args.zmq_subscriptions,
+ args.n_zmq_subscriptions, args.zmq_request,
+ args.asapo_endpoint, args.asapo_token,
+ args.asapo_beamtime, args.asapo_path,
+ timeout, args.profile);
cell_free(args.iargs.cell);
free(args.prefix);