aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2022-06-02 12:49:10 +0200
committerThomas White <taw@physics.org>2022-06-02 12:49:10 +0200
commit96c82df69425d5dcb14e5213c789824d9d9e2ea1 (patch)
tree7dd019a250cb3d6b6e61590b38fa6ccd0baccdc7
parent6838dc35ffa07db47c48613aafb67441f1b620fa (diff)
parent0c64576cc6e802129c8eaefaed55101ab02cf15d (diff)
Merge branch 'asapo-consumer'
-rw-r--r--CMakeLists.txt17
-rw-r--r--config.h.cmake.in1
-rw-r--r--config.h.in1
-rw-r--r--doc/man/indexamajig.120
-rw-r--r--libcrystfel/CMakeLists.txt2
-rw-r--r--libcrystfel/libcrystfel-config.h.cmake.in1
-rw-r--r--libcrystfel/libcrystfel-config.h.meson.in1
-rw-r--r--libcrystfel/meson.build11
-rw-r--r--libcrystfel/src/image-seedee.c208
-rw-r--r--libcrystfel/src/image-seedee.h41
-rw-r--r--libcrystfel/src/image.c33
-rw-r--r--libcrystfel/src/image.h5
-rw-r--r--libcrystfel/src/profile.c10
-rw-r--r--libcrystfel/src/profile.h2
-rw-r--r--libcrystfel/src/stream.c5
-rw-r--r--meson.build12
-rw-r--r--src/im-asapo.c363
-rw-r--r--src/im-asapo.h91
-rw-r--r--src/im-sandbox.c146
-rw-r--r--src/im-sandbox.h3
-rw-r--r--src/indexamajig.c57
-rw-r--r--src/process_image.c37
-rw-r--r--src/process_image.h9
-rw-r--r--subprojects/cjson.wrap12
24 files changed, 1045 insertions, 43 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6002d708..7702e643 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -34,6 +34,13 @@ else ()
message(STATUS "ZMQ not found.")
endif ()
+pkg_search_module(ASAPO libasapo-consumer)
+if (ASAPO_FOUND)
+ message(STATUS "Found ASAP::O")
+else ()
+ message(STATUS "ASAP::O not found")
+endif ()
+
# Find out where forkpty() is declared
set(CMAKE_REQUIRED_LIBRARIES "-lutil")
check_symbol_exists(forkpty "pty.h" HAVE_FORKPTY_PTY_H)
@@ -108,6 +115,7 @@ set(HAVE_GDKPIXBUF ${GDKPIXBUF_FOUND})
set(HAVE_GDK ${GDK_FOUND})
set(HAVE_ZMQ ${ZMQ_FOUND})
set(HAVE_HDF5 1)
+set(HAVE_ASAPO ${ASAPO_FOUND})
set(PACKAGE_VERSION ${PROJECT_VERSION})
@@ -247,6 +255,10 @@ if ( ZMQ_FOUND )
list(APPEND INDEXAMAJIG_SOURCES src/im-zmq.c)
endif ()
+if ( ASAPO_FOUND )
+ list(APPEND INDEXAMAJIG_SOURCES src/im-asapo.c)
+endif ()
+
add_executable(indexamajig ${INDEXAMAJIG_SOURCES}
${CMAKE_CURRENT_BINARY_DIR}/version.c)
target_include_directories(indexamajig PRIVATE ${COMMON_INCLUDES})
@@ -258,6 +270,11 @@ if ( ZMQ_FOUND )
target_link_libraries(indexamajig ${ZMQ_LDFLAGS})
endif ()
+if ( ASAPO_FOUND )
+ target_include_directories(indexamajig PRIVATE ${ASAPO_INCLUDE_DIR})
+ target_link_libraries(indexamajig ${ASAPO_LDFLAGS})
+endif ()
+
# ----------------------------------------------------------------------
# get_hkl
diff --git a/config.h.cmake.in b/config.h.cmake.in
index 2c781b3c..c056cc4a 100644
--- a/config.h.cmake.in
+++ b/config.h.cmake.in
@@ -10,3 +10,4 @@
#cmakedefine HAVE_ZMQ
#cmakedefine HAVE_SLURM
#cmakedefine HAVE_HDF5
+#cmakedefine HAVE_ASAPO
diff --git a/config.h.in b/config.h.in
index 058e5cea..afa44161 100644
--- a/config.h.in
+++ b/config.h.in
@@ -10,3 +10,4 @@
#mesondefine HAVE_ZMQ
#mesondefine HAVE_SLURM
#mesondefine HAVE_HDF5
+#mesondefine HAVE_ASAPO
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index a9d80175..25e025ac 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -186,9 +186,27 @@ Subscribe to ZeroMQ message type \fItag\fR. You can use this option multiple ti
Request new data over ZeroMQ by sending string \fImsg\fR. This will cause indexamajig's ZeroMQ socket to use REQ mode instead of SUB. This option and \fB--zmq-subscribe\fR are mutually exclusive.
.PD 0
+.IP \fB--asapo-endpoint=\fIendpoint\fR
+.PD
+Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input\fR are mutually exclusive.
+
+.PD 0
+.IP \fB--asapo-token=\fItoken\fR
+.IP \fB--asapo-beamtime=\fIbeamtime\fR
+.IP \fB--asapo-source=\fIsource\fR
+.IP \fB--asapo-group=\fIgroup\fR
+.PD
+Authentication token, beamtime, data source and consumer group, respectively, for ASAP::O data.
+
+.PD 0
+.IP \fB--asapo-stream=\fIstream\fR
+.PD
+Name of ASAP::O stream to process. If this option is not given, indexamajig will start processing from the end of the current last stream.
+
+.PD 0
.IP \fB--data-format=\fIformat\fR
.PD
-Specify the data format for data received over ZeroMQ. Possible values in this version are \fBmsgpack\fR and \fBhdf5\fR.
+Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR, \fBhdf5\fR and \fBseedee\fR.
.PD 0
.IP \fB--basename\fR
diff --git a/libcrystfel/CMakeLists.txt b/libcrystfel/CMakeLists.txt
index 3ca50b1f..ea475637 100644
--- a/libcrystfel/CMakeLists.txt
+++ b/libcrystfel/CMakeLists.txt
@@ -19,6 +19,7 @@ set(HAVE_FDIP ${FDIP_FOUND})
set(HAVE_MSGPACK ${MSGPACK_FOUND})
set(HAVE_LIBCCP4 ${LIBCCP4_FOUND})
set(HAVE_ZLIB ${ZLIB_FOUND})
+set(HAVE_SEEDEE 0)
# Recent enough version of zlib?
set(CMAKE_REQUIRED_LIBRARIES "-lz")
@@ -62,6 +63,7 @@ set(LIBCRYSTFEL_SOURCES
src/image-hdf5.c
src/fom.c
src/image-msgpack.c
+ src/image-seedee.c
src/profile.c
${BISON_symopp_OUTPUTS}
${FLEX_symopl_OUTPUTS}
diff --git a/libcrystfel/libcrystfel-config.h.cmake.in b/libcrystfel/libcrystfel-config.h.cmake.in
index 830055cb..82d0611a 100644
--- a/libcrystfel/libcrystfel-config.h.cmake.in
+++ b/libcrystfel/libcrystfel-config.h.cmake.in
@@ -12,6 +12,7 @@
#cmakedefine HAVE_MSGPACK
#cmakedefine HAVE_CLOCK_GETTIME
#cmakedefine HAVE_HDF5
+#cmakedefine HAVE_SEEDEE
#cmakedefine HAVE_FORKPTY_PTY_H
#cmakedefine HAVE_FORKPTY_UTIL_H
diff --git a/libcrystfel/libcrystfel-config.h.meson.in b/libcrystfel/libcrystfel-config.h.meson.in
index 7d43147d..302fae0b 100644
--- a/libcrystfel/libcrystfel-config.h.meson.in
+++ b/libcrystfel/libcrystfel-config.h.meson.in
@@ -10,6 +10,7 @@
#mesondefine HAVE_MSGPACK
#mesondefine HAVE_CLOCK_GETTIME
#mesondefine HAVE_HDF5
+#mesondefine HAVE_SEEDEE
#mesondefine HAVE_FORKPTY_PTY_H
#mesondefine HAVE_FORKPTY_UTIL_H
diff --git a/libcrystfel/meson.build b/libcrystfel/meson.build
index b5005aa5..41cfbaa8 100644
--- a/libcrystfel/meson.build
+++ b/libcrystfel/meson.build
@@ -25,6 +25,14 @@ if fftwdep.found()
conf_data.set10('HAVE_FFTW', true)
endif
+seedeedep = dependency('seedee', required: false)
+cjsondep = dependency('cjson',
+ required: true,
+ fallback: ['cjson', 'libcjson_dep'])
+if cjsondep.found() and seedeedep.found()
+ conf_data.set10('HAVE_SEEDEE', 1)
+endif
+
xgandalfdep = dependency('xgandalf',
required: false,
fallback: ['xgandalf', 'xgandalf_dep'])
@@ -125,6 +133,7 @@ libcrystfel_sources = ['src/image.c',
'src/image-cbf.c',
'src/image-hdf5.c',
'src/image-msgpack.c',
+ 'src/image-seedee.c',
'src/indexers/dirax.c',
'src/indexers/felix.c',
'src/indexers/mosflm.c',
@@ -150,7 +159,7 @@ libcrystfel = library('crystfel', [libcrystfel_sources, libcrystfel_versionc],
dependencies: [mdep, utildep, fftwdep, gsldep, zlibdep,
hdf5dep, pthreaddep,
xgandalfdep, pinkindexerdep, fdipdep,
- ccp4dep, msgpackdep],
+ ccp4dep, msgpackdep, seedeedep, cjsondep],
install: true)
libcrystfeldep = declare_dependency(include_directories: libcrystfel_includes,
diff --git a/libcrystfel/src/image-seedee.c b/libcrystfel/src/image-seedee.c
new file mode 100644
index 00000000..537fc9e3
--- /dev/null
+++ b/libcrystfel/src/image-seedee.c
@@ -0,0 +1,208 @@
+/*
+ * image-seedee.c
+ *
+ * Image loading with Seedee
+ *
+ * Copyright © 2017-2022 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2018-2022 Thomas White <taw@physics.org>
+ * 2014 Valerio Mariani
+ * 2017 Stijn de Graaf
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <libcrystfel-config.h>
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <assert.h>
+#include <unistd.h>
+
+#include <image.h>
+#include <utils.h>
+#include <profile.h>
+
+#include "datatemplate_priv.h"
+
+
+#if defined(HAVE_SEEDEE)
+
+#include <seedee/seedee.h>
+#include <cjson/cJSON.h>
+
+
+static int load_seedee_data(struct panel_template *p,
+ struct SeedeeNDArray *array,
+ float **pdata)
+{
+ int data_size_fs, data_size_ss;
+ float *data = NULL;
+
+ data_size_ss = array->shape[0];
+ data_size_fs = array->shape[1];
+
+ if ( (p->orig_min_fs + PANEL_WIDTH(p) > data_size_fs)
+ || (p->orig_min_ss + PANEL_HEIGHT(p) > data_size_ss) )
+ {
+ ERROR("Data for panel %s (%i x %i + %i + %i) is outside data "
+ "array bounds (%i x %i)\n",
+ p->name,
+ PANEL_WIDTH(p), PANEL_HEIGHT(p),
+ p->orig_min_fs, p->orig_min_ss,
+ data_size_fs, data_size_ss);
+ return 1;
+ }
+
+ if ( (array->datatype == 'u')
+ && (array->itemsize == 2)
+ && (array->byteorder == '<') )
+ {
+ int fs, ss;
+ uint16_t *in_data = (uint16_t *)array->data;
+
+ data = malloc(PANEL_WIDTH(p) * PANEL_HEIGHT(p) * sizeof(float));
+ if ( data == NULL ) return 1;
+
+ for ( ss=0; ss<PANEL_HEIGHT(p); ss++ ) {
+ for ( fs=0; fs<PANEL_WIDTH(p); fs++ ) {
+ size_t idx = fs+p->orig_min_fs + (ss+p->orig_min_ss)*data_size_fs;
+ data[fs+ss*PANEL_WIDTH(p)] = in_data[idx];
+ }
+ }
+ *pdata = data;
+
+ } else {
+ ERROR("Unrecognised data type %c%i%c\n",
+ array->datatype, array->itemsize, array->byteorder);
+ return 1;
+ }
+
+ return 0;
+}
+
+
+/* Read the image data from 'data_block' into 'image', according to 'dtempl' */
+int image_seedee_read(struct image *image,
+ DataTemplate *dtempl,
+ void *data_block,
+ size_t data_block_size,
+ char *meta_data)
+{
+ struct SeedeeNDArray array;
+ int r;
+ bool zero_copy;
+ int i;
+ cJSON *json;
+ cJSON *data_format_str;
+
+ json = cJSON_Parse(meta_data);
+ if ( json == NULL ) {
+ ERROR("Failed to parse JSON\n");
+ return 1;
+ }
+
+ data_format_str = cJSON_GetObjectItemCaseSensitive(json, "_data_format");
+ if ( !cJSON_IsString(data_format_str) ) {
+ ERROR("_data_format isn't a string");
+ cJSON_Delete(json);
+ return 1;
+ }
+
+ profile_start("seedee-get-size");
+ array.size = seedee_get_data_size(data_format_str->valuestring,
+ data_block, data_block_size,
+ &zero_copy, &array);
+ profile_end("seedee-get-size");
+ array.data = malloc(array.size);
+ array.shape = malloc(array.ndims*sizeof(int));
+ if ( (array.data == NULL) || (array.shape == NULL) ) {
+ cJSON_Delete(json);
+ free(array.data);
+ free(array.shape);
+ return 1;
+ }
+
+ if ( array.ndims != 2 ) {
+ ERROR("Seedee data has unexpected number of dimensions "
+ "(%i, expected 2)\n", array.ndims);
+ free(array.data);
+ free(array.shape);
+ return 1;
+ }
+
+ profile_start("seedee-deserialize");
+ r = seedee_deserialize_ndarray(data_format_str->valuestring,
+ data_block, data_block_size,
+ 0, &array);
+ profile_end("seedee-deserialize");
+ cJSON_Delete(json);
+ if ( r < 0 ) {
+ ERROR("Seedee deserialiation failed.\n");
+ free(array.data);
+ free(array.shape);
+ return 1;
+ }
+
+ image->dp = malloc(dtempl->n_panels*sizeof(float *));
+ if ( image->dp == NULL ) {
+ ERROR("Failed to allocate data array.\n");
+ free(array.data);
+ free(array.shape);
+ return 1;
+ }
+
+ /* Set all pointers to NULL for easier clean-up */
+ for ( i=0; i<dtempl->n_panels; i++ ) image->dp[i] = NULL;
+
+ profile_start("seedee-panel");
+ for ( i=0; i<dtempl->n_panels; i++ ) {
+ if ( load_seedee_data(&dtempl->panels[i], &array, &image->dp[i]) )
+ {
+ ERROR("Failed to load data for panel '%s'\n",
+ dtempl->panels[i].name);
+ profile_end("seedee-panel");
+ free(array.data);
+ free(array.shape);
+ return 1;
+ }
+ }
+ profile_end("seedee-panel");
+
+ free(array.data);
+ free(array.shape);
+
+ return 0;
+}
+
+
+#else /* defined(HAVE_SEEDEE) */
+
+int image_seedee_read(struct image *image,
+ const DataTemplate *dtempl,
+ void *data,
+ size_t data_size,
+ char *meta_data)
+{
+ ERROR("Seedee is not supported in this installation (read).\n");
+ return 1;
+}
+
+#endif /* defined(HAVE_SEEDEE) */
diff --git a/libcrystfel/src/image-seedee.h b/libcrystfel/src/image-seedee.h
new file mode 100644
index 00000000..9432e74b
--- /dev/null
+++ b/libcrystfel/src/image-seedee.h
@@ -0,0 +1,41 @@
+/*
+ * image-seedee.h
+ *
+ * Image loading, SeeDee parts
+ *
+ * Copyright © 2012-2022 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2020-2022 Thomas White <taw@physics.org>
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifndef IMAGE_SEEDEE_H
+#define IMAGE_SEEDEE_H
+
+#include "datatemplate.h"
+
+
+extern int image_seedee_read(struct image *image,
+ const DataTemplate *dtempl,
+ void *data,
+ size_t data_size,
+ char *meta_data);
+
+#endif /* IMAGE_SEEDEE_H */
diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c
index 2209ffee..5d67ae2d 100644
--- a/libcrystfel/src/image.c
+++ b/libcrystfel/src/image.c
@@ -42,6 +42,7 @@
#include "image-hdf5.h"
#include "image-cbf.h"
#include "image-msgpack.h"
+#include "image-seedee.h"
#include "profile.h"
#include "datatemplate.h"
@@ -803,6 +804,11 @@ static int image_read_image_data(struct image *image,
return image_msgpack_read(image, dtempl, image->data_block,
image->data_block_size);
+ case DATA_SOURCE_TYPE_SEEDEE:
+ return image_seedee_read(image, dtempl, image->data_block,
+ image->data_block_size,
+ image->meta_data);
+
default:
ERROR("Unrecognised file type %i (image_read_image_data)\n",
image->data_source_type);
@@ -881,8 +887,7 @@ static void mark_flagged_pixels_naninf(float *dp, int *bad,
{
long int i;
for ( i=0; i<n; i++ ) {
- float val = dp[i];
- if ( isnan(val) || isinf(val) ) bad[i] = 1;
+ if ( !isfinite(dp[i]) ) bad[i] = 1;
}
}
@@ -898,8 +903,11 @@ static void mark_flagged_pixels(struct panel_template *p,
p_h = p->orig_max_ss - p->orig_min_ss + 1;
n = p_w * p_h;
+ profile_start("nan-inf");
mark_flagged_pixels_naninf(dp, bad, n);
+ profile_end("nan-inf");
+ profile_start("flag-values");
for ( i=0; i<MAX_FLAG_VALUES; i++ ) {
float fv = p->flag_values[i];
@@ -923,6 +931,7 @@ static void mark_flagged_pixels(struct panel_template *p,
}
}
+ profile_end("flag-values");
}
@@ -1096,27 +1105,34 @@ static int create_badmap(struct image *image,
/* Panel marked as bad? */
if ( p->bad ) {
+ profile_start("whole-panel");
/* NB this sets every element to 0x1111,
* but that's OK - value is still 'true'. */
memset(image->bad[i], 1, p_w*p_h);
+ profile_end("whole-panel");
}
/* Add bad regions (skip if panel is bad anyway) */
if ( !p->bad ) {
+ profile_start("flagged-pixels");
mark_flagged_pixels(p, image->dp[i],
image->bad[i]);
+ profile_end("flagged-pixels");
}
/* Mask panel edges (skip if panel is bad anyway) */
if ( (p->mask_edge_pixels > 0) && !p->bad ) {
+ profile_start("panel-edges");
mask_panel_edges(image->bad[i], p_w, p_h,
p->mask_edge_pixels);
+ profile_end("panel-edges");
}
/* Load masks (skip if panel is bad anyway) */
if ( (!no_mask_data) && (!p->bad) ) {
int j;
+ profile_start("load-masks");
for ( j=0; j<MAX_MASKS; j++ ) {
@@ -1138,10 +1154,13 @@ static int create_badmap(struct image *image,
p->masks[j].bad_bits);
}
+ profile_end("load-masks");
}
}
+ profile_start("mark-regions");
mark_bad_regions(image, dtempl);
+ profile_end("mark-regions");
return 0;
}
@@ -1385,13 +1404,13 @@ struct image *image_read(const DataTemplate *dtempl,
struct image *image_read_data_block(const DataTemplate *dtempl,
void *data_block,
size_t data_block_size,
+ char *meta_data,
DataSourceType type,
int serial,
int no_image_data,
int no_mask_data)
{
struct image *image;
- char tmp[64];
if ( dtempl == NULL ) {
ERROR("NULL data template!\n");
@@ -1404,11 +1423,11 @@ struct image *image_read_data_block(const DataTemplate *dtempl,
return NULL;
}
- snprintf(tmp, 63, "datablock-%i", serial);
- image->filename = strdup(tmp);
- image->ev = strdup("//");
+ image->filename = NULL;
+ image->ev = NULL;
image->data_block = data_block;
image->data_block_size = data_block_size;
+ image->meta_data = meta_data;
image->data_source_type = type;
@@ -1433,6 +1452,7 @@ void image_free(struct image *image)
free(image->filename);
free(image->ev);
free(image->data_block);
+ free(image->meta_data);
if ( image->detgeom != NULL ) {
np = image->detgeom->n_panels;
@@ -1478,6 +1498,7 @@ struct image *image_new()
image->ev = NULL;
image->data_block = NULL;
image->data_block_size = 0;
+ image->meta_data = NULL;
image->data_source_type = DATA_SOURCE_TYPE_UNKNOWN;
image->n_cached_headers = 0;
diff --git a/libcrystfel/src/image.h b/libcrystfel/src/image.h
index 3746e115..654bf3b1 100644
--- a/libcrystfel/src/image.h
+++ b/libcrystfel/src/image.h
@@ -103,7 +103,8 @@ typedef enum
DATA_SOURCE_TYPE_HDF5,
DATA_SOURCE_TYPE_CBF,
DATA_SOURCE_TYPE_CBFGZ,
- DATA_SOURCE_TYPE_MSGPACK
+ DATA_SOURCE_TYPE_MSGPACK,
+ DATA_SOURCE_TYPE_SEEDEE
} DataSourceType;
@@ -148,6 +149,7 @@ struct image
* filenename/ev OR this should be filled in, but not both */
void *data_block;
size_t data_block_size;
+ char *meta_data;
/** A list of metadata read from the stream */
struct header_cache_entry *header_cache[HEADER_CACHE_SIZE];
@@ -225,6 +227,7 @@ extern struct image *image_create_for_simulation(const DataTemplate *dtempl);
extern struct image *image_read_data_block(const DataTemplate *dtempl,
void *data_block,
size_t data_block_size,
+ char *meta_data,
DataSourceType type,
int serial,
int no_image_data,
diff --git a/libcrystfel/src/profile.c b/libcrystfel/src/profile.c
index 79228b34..82f0217f 100644
--- a/libcrystfel/src/profile.c
+++ b/libcrystfel/src/profile.c
@@ -166,7 +166,7 @@ static void free_profile_block(struct _profile_block *b)
}
-void profile_print_and_reset()
+void profile_print_and_reset(int worker_id)
{
char *buf;
char *buf2;
@@ -187,10 +187,12 @@ void profile_print_and_reset()
stop_profile_block(pd->root);
buf = format_profile_block(pd->root);
- buf2 = malloc(2+strlen(buf));
- strcpy(buf2, buf);
- strcat(buf2, "\n");
+ buf2 = malloc(8+strlen(buf));
+ size_t len = 8+strlen(buf);
+ snprintf(buf2, len, "%i %s\n", worker_id, buf);
write(STDOUT_FILENO, buf2, strlen(buf2));
+ free(buf);
+ free(buf2);
free_profile_block(pd->root);
pd->root = start_profile_block("root");
diff --git a/libcrystfel/src/profile.h b/libcrystfel/src/profile.h
index 183528ce..61ef20c3 100644
--- a/libcrystfel/src/profile.h
+++ b/libcrystfel/src/profile.h
@@ -35,7 +35,7 @@
*/
extern void profile_init();
-extern void profile_print_and_reset();
+extern void profile_print_and_reset(int worker_id);
extern void profile_start(const char *name);
extern void profile_end(const char *name);
diff --git a/libcrystfel/src/stream.c b/libcrystfel/src/stream.c
index 093e34e4..67297e49 100644
--- a/libcrystfel/src/stream.c
+++ b/libcrystfel/src/stream.c
@@ -1167,6 +1167,11 @@ Stream *stream_open_for_write(const char *filename,
st->dtempl_write = dtempl;
st->dtempl_read = NULL;
+ if ( file_exists(filename) ) {
+ ERROR("Refusing to overwrite stream '%s'!\n", filename);
+ return NULL;
+ }
+
st->fh = fopen(filename, "w");
if ( st->fh == NULL ) {
ERROR("Failed to open stream.\n");
diff --git a/meson.build b/meson.build
index be0f6477..4f97d8ce 100644
--- a/meson.build
+++ b/meson.build
@@ -69,6 +69,11 @@ if zmqdep.found()
conf_data.set10('HAVE_ZMQ', true)
endif
+asapodep = dependency('libasapo-consumer', required: false)
+if asapodep.found()
+ conf_data.set10('HAVE_ASAPO', true)
+endif
+
if cc.has_function('clock_gettime', prefix: '#include <time.h>')
conf_data.set10('HAVE_CLOCK_GETTIME', true)
endif
@@ -167,8 +172,13 @@ if zmqdep.found()
indexamajig_sources += ['src/im-zmq.c']
endif
+if asapodep.found()
+ indexamajig_sources += ['src/im-asapo.c']
+endif
+
indexamajig = executable('indexamajig', indexamajig_sources,
- dependencies: [mdep, libcrystfeldep, gsldep, pthreaddep, zmqdep],
+ dependencies: [mdep, libcrystfeldep, gsldep,
+ pthreaddep, zmqdep, asapodep],
install: true,
install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
diff --git a/src/im-asapo.c b/src/im-asapo.c
new file mode 100644
index 00000000..95445fd0
--- /dev/null
+++ b/src/im-asapo.c
@@ -0,0 +1,363 @@
+/*
+ * im-asapo.c
+ *
+ * ASAP::O data interface
+ *
+ * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2021 Thomas White <taw@physics.org>
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <assert.h>
+#include <unistd.h>
+#include <asapo/consumer_c.h>
+
+#include <image.h>
+#include <utils.h>
+#include <profile.h>
+
+#include "im-asapo.h"
+
+#include "datatemplate_priv.h"
+
+
+struct im_asapo
+{
+ char *stream;
+ int online_mode;
+ AsapoConsumerHandle consumer;
+ AsapoStringHandle group_id;
+};
+
+
+static void show_asapo_error(const char *msg, const AsapoErrorHandle err)
+{
+ char buf[1024];
+ asapo_error_explain(err, buf, sizeof(buf));
+ ERROR("%s: %s\n", msg, buf);
+}
+
+
+char *im_asapo_make_unique_group_id(const char *endpoint,
+ const char *token)
+{
+ AsapoConsumerHandle consumer;
+ AsapoSourceCredentialsHandle cred;
+ AsapoStringHandle group_id;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ cred = asapo_create_source_credentials(kProcessed,
+ "", /* instance ID */
+ "", /* pipeline step */
+ "", /* beamtime */
+ "", /* beamline */
+ "", /* data source */
+ token);
+ consumer = asapo_create_consumer(endpoint, "", 0, cred, &err);
+ asapo_free_handle(&cred);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create temporary ASAP::O consumer", err);
+ asapo_free_handle(&consumer);
+ return NULL;
+ }
+
+ group_id = asapo_consumer_generate_new_group_id(consumer, &err);
+ asapo_free_handle(&consumer);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create ASAP::O group ID", err);
+ return NULL;
+ }
+
+ return strdup(asapo_string_c_str(group_id));
+}
+
+
+struct im_asapo *im_asapo_connect(const char *endpoint,
+ const char *token,
+ const char *beamtime,
+ const char *group_id,
+ const char *data_source,
+ const char *stream)
+{
+ struct im_asapo *a;
+ AsapoSourceCredentialsHandle cred;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ a = malloc(sizeof(struct im_asapo));
+ if ( a == NULL ) return NULL;
+
+ cred = asapo_create_source_credentials(kProcessed,
+ "auto", /* instance ID */
+ "indexamajig", /* pipeline step */
+ beamtime,
+ "", /* beamline */
+ data_source,
+ token);
+ a->consumer = asapo_create_consumer(endpoint, "auto", 0, cred, &err);
+ asapo_free_handle(&cred);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create ASAP::O consumer", err);
+ free(a);
+ return NULL;
+ }
+
+ asapo_consumer_set_timeout(a->consumer, 3000);
+
+ a->group_id = asapo_string_from_c_str(group_id);
+ if ( stream != NULL ) {
+
+ /* Named stream mode */
+ AsapoErrorHandle err = asapo_new_handle();
+
+ a->stream = strdup(stream);
+
+ asapo_consumer_set_last_read_marker(a->consumer,
+ a->group_id, 0,
+ a->stream, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Failed to skip to start of stream", err);
+ } else {
+ STATUS("Skipped to start of stream %s\n", a->stream);
+ }
+
+ asapo_free_handle(&err);
+ a->online_mode = 0;
+
+ } else {
+ /* Online mode */
+ a->stream = NULL;
+ a->online_mode = 1;
+ }
+
+ return a;
+}
+
+
+static int select_last_stream(struct im_asapo *a)
+{
+ AsapoStreamInfosHandle si;
+ size_t n;
+ int i;
+ AsapoStreamInfoHandle st;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ si = asapo_consumer_get_stream_list(a->consumer, "",
+ kAllStreams, &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't get ASAP::O stream list", err);
+ asapo_free_handle(&err);
+ return 1;
+ }
+
+ n = asapo_stream_infos_get_size(si);
+ if ( n == 0 ) {
+ STATUS("No streams.\n");
+ return 1;
+ }
+
+ STATUS("Streams available at start:\n");
+ for ( i=0; i<n; i++ ) {
+ AsapoStreamInfoHandle st = asapo_stream_infos_get_item(si, i);
+ STATUS("Stream %i: %s\n", i, asapo_stream_info_get_name(st));
+ asapo_free_handle(&st);
+ }
+ STATUS("End of stream list\n");
+
+ st = asapo_stream_infos_get_item(si, n-1);
+ a->stream = strdup(asapo_stream_info_get_name(st));
+ asapo_free_handle(&st);
+ STATUS("Starting with the last stream: %s\n", a->stream);
+
+ asapo_free_handle(&si);
+ asapo_free_handle(&err);
+ return 0;
+}
+
+
+static int select_next_stream(struct im_asapo *a)
+{
+ AsapoStreamInfosHandle si;
+ AsapoStreamInfoHandle st;
+ AsapoErrorHandle err = asapo_new_handle();
+ const char *next_stream;
+
+ si = asapo_consumer_get_stream_list(a->consumer, a->stream,
+ kAllStreams, &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't get ASAP::O stream list", err);
+ asapo_free_handle(&si);
+ asapo_free_handle(&err);
+ return 1;
+ }
+
+ asapo_free_handle(&err);
+
+ /* Stream list includes the current stream, so we need at least
+ * two entries */
+ if ( asapo_stream_infos_get_size(si) < 2 ) {
+ //STATUS("No newer stream. Waiting for new data...\n");
+ asapo_free_handle(&si);
+ return 0;
+ }
+
+ /* Stream list includes the current stream, so look at the second one */
+ st = asapo_stream_infos_get_item(si, 1);
+ next_stream = asapo_stream_info_get_name(st);
+ free(a->stream);
+ a->stream = strdup(next_stream);
+ STATUS("Selecting next stream: %s\n", a->stream);
+ asapo_free_handle(&st);
+ asapo_free_handle(&si);
+
+ return 0;
+}
+
+
+static void skip_to_stream_end(struct im_asapo *a)
+{
+ int64_t size;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ size = asapo_consumer_get_current_size(a->consumer, a->stream, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Failed to get length of stream", err);
+ } else {
+
+ AsapoErrorHandle err = asapo_new_handle();
+
+ asapo_consumer_set_last_read_marker(a->consumer,
+ a->group_id, size,
+ a->stream, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Failed to skip to end of stream", err);
+ } else {
+ STATUS("Skipped to end of stream (%lli)\n", size);
+ }
+
+ asapo_free_handle(&err);
+ }
+
+ asapo_free_handle(&err);
+}
+
+
+void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
+ char **pmeta, char **pfilename, char **pevent,
+ int *pfinished)
+{
+ void *data_copy;
+ AsapoMessageMetaHandle meta;
+ AsapoMessageDataHandle data;
+ AsapoErrorHandle err;
+ uint64_t msg_size;
+
+ *pfinished = 0;
+
+ profile_start("select-stream");
+ if ( a->stream == NULL ) {
+ if ( select_last_stream(a) ) {
+ profile_end("select-stream");
+ return NULL;
+ }
+ skip_to_stream_end(a);
+ }
+ profile_end("select-stream");
+
+ profile_start("create-handles");
+ err = asapo_new_handle();
+ meta = asapo_new_handle();
+ data = asapo_new_handle();
+ profile_end("create-handles");
+
+ profile_start("asapo-get-next");
+ asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data,
+ a->stream, &err);
+ profile_end("asapo-get-next");
+ if ( asapo_error_get_type(err) == kEndOfStream ) {
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
+ if ( a->online_mode ) {
+ profile_start("next-stream");
+ select_next_stream(a);
+ profile_end("next-stream");
+ /* Sandbox will call to try again very soon */
+ } else {
+ *pfinished = 1;
+ }
+ return NULL;
+ }
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't get next ASAP::O record", err);
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
+ return NULL;
+ }
+
+ profile_start("get-size");
+ msg_size = asapo_message_meta_get_size(meta);
+ profile_end("get-size");
+
+ profile_start("malloc-copy");
+ data_copy = malloc(msg_size);
+ if ( data_copy == NULL ) {
+ ERROR("Failed to copy data block.\n");
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
+ return NULL;
+ }
+ memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size);
+ profile_end("malloc-copy");
+
+ profile_start("copy-meta");
+ *pmeta = strdup(asapo_message_meta_get_metadata(meta));
+ *pfilename = strdup(asapo_message_meta_get_name(meta));
+ *pevent = strdup("//");
+ profile_end("copy-meta");
+
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
+
+ *pdata_size = msg_size;
+ return data_copy;
+}
+
+
+void im_asapo_shutdown(struct im_asapo *a)
+{
+ if ( a == NULL ) return;
+ asapo_free_handle(&a->consumer);
+ asapo_free_handle(&a->group_id);
+ free(a);
+}
diff --git a/src/im-asapo.h b/src/im-asapo.h
new file mode 100644
index 00000000..ab68e1c2
--- /dev/null
+++ b/src/im-asapo.h
@@ -0,0 +1,91 @@
+/*
+ * im-asapo.h
+ *
+ * ASAP::O data interface
+ *
+ * Copyright © 2021-2022 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
+ *
+ * Authors:
+ * 2021-2022 Thomas White <taw@physics.org>
+ *
+ * This file is part of CrystFEL.
+ *
+ * CrystFEL is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CrystFEL is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+
+#ifndef CRYSTFEL_ASAPO_H
+#define CRYSTFEL_ASAPO_H
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#if defined(HAVE_ASAPO)
+
+extern struct im_asapo *im_asapo_connect(const char *endpoint,
+ const char *token,
+ const char *beamtime,
+ const char *group_id,
+ const char *data_source,
+ const char *stream);
+
+extern void im_asapo_shutdown(struct im_asapo *a);
+
+extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
+ char **pmeta, char **pfilename, char **pevent,
+ int *pfinished);
+
+extern char *im_asapo_make_unique_group_id(const char *endpoint,
+ const char *token);
+
+#else /* defined(HAVE_ASAPO) */
+
+static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint,
+ const char *token,
+ const char *beamtime,
+ const char *group_id,
+ const char *data_source,
+ const char *stream)
+{
+ return NULL;
+}
+
+static UNUSED void im_asapo_shutdown(struct im_asapo *a)
+{
+}
+
+static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize,
+ char **pmeta, char **pfilename, char **pevent,
+ int *pfinished)
+{
+ *psize = 0;
+ *pmeta = NULL;
+ *pfilename = NULL;
+ *pevent = NULL;
+ *pfinished = 1;
+ return NULL;
+}
+
+static char *im_asapo_make_unique_group_id(const char *endpoint,
+ const char *token)
+{
+ return NULL;
+}
+
+#endif /* defined(HAVE_ASAPO) */
+
+#endif /* CRYSTFEL_ASAPO_H */
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index bd3f47e7..83d2f043 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -62,6 +62,7 @@
#include "process_image.h"
#include "im-zmq.h"
#include "profile.h"
+#include "im-asapo.h"
struct sandbox
@@ -104,6 +105,15 @@ struct sandbox
int n_zmq_subscriptions;
const char *zmq_request;
+ /* ASAP::O mode */
+ int asapo;
+ const char *asapo_endpoint;
+ const char *asapo_token;
+ const char *asapo_beamtime;
+ const char *asapo_group_id;
+ const char *asapo_source;
+ const char *asapo_stream;
+
/* Final output */
Stream *stream;
};
@@ -330,6 +340,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
{
int allDone = 0;
struct im_zmq *zmqstuff = NULL;
+ struct im_asapo *asapostuff = NULL;
if ( sb->profile ) {
profile_init();
@@ -347,6 +358,19 @@ static int run_work(const struct index_args *iargs, Stream *st,
}
}
+ if ( sb->asapo ) {
+ asapostuff = im_asapo_connect(sb->asapo_endpoint,
+ sb->asapo_token,
+ sb->asapo_beamtime,
+ sb->asapo_group_id,
+ sb->asapo_source,
+ sb->asapo_stream);
+ if ( asapostuff == NULL ) {
+ ERROR("ASAP::O setup failed.\n");
+ return 1;
+ }
+ }
+
while ( !allDone ) {
struct pattern_args pargs;
@@ -432,42 +456,84 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.event = safe_strdup(event_str);
free(line);
+ ok = 0;
- if ( !sb->zmq ) {
-
- pargs.zmq_data = NULL;
- pargs.zmq_data_size = 0;
+ /* Default values */
+ pargs.zmq_data = NULL;
+ pargs.zmq_data_size = 0;
+ pargs.asapo_data = NULL;
+ pargs.asapo_data_size = 0;
+ pargs.asapo_meta = NULL;
- } else {
+ if ( sb->zmq ) {
do {
pargs.zmq_data = im_zmq_fetch(zmqstuff,
&pargs.zmq_data_size);
} while ( pargs.zmq_data_size < 15 );
+ ok = 1;
/* The filename/event, which will be 'fake' values in
* this case, still came via the event queue. More
* importantly, the event queue gave us a unique
* serial number for this image. */
+ } else if ( sb->asapo ) {
+
+ char *filename;
+ char *event;
+ int finished = 0;
+
+ profile_start("asapo-fetch");
+ set_last_task(sb->shared->last_task[cookie], "ASAPO fetch");
+ pargs.asapo_data = im_asapo_fetch(asapostuff,
+ &pargs.asapo_data_size,
+ &pargs.asapo_meta,
+ &filename,
+ &event,
+ &finished);
+ profile_end("asapo-fetch");
+ if ( pargs.asapo_data != NULL ) {
+ ok = 1;
+
+ /* ASAP::O provides a meaningful filename, which
+ * replaces the placeholder. */
+ free(pargs.filename);
+ free(pargs.event);
+ pargs.filename = filename;
+ pargs.event = event;
+ } else {
+ if ( finished ) {
+ sb->shared->should_shutdown = 1;
+ allDone = 1;
+ }
+ }
+
+ } else {
+ ok = 1;
}
- sb->shared->time_last_start[cookie] = get_monotonic_seconds();
- profile_start("process-image");
- process_image(iargs, &pargs, st, cookie, tmpdir, ser,
- sb->shared, sb->shared->last_task[cookie]);
- profile_end("process-image");
+ if ( ok ) {
+ sb->shared->time_last_start[cookie] = get_monotonic_seconds();
+ profile_start("process-image");
+ process_image(iargs, &pargs, st, cookie, tmpdir, ser,
+ sb->shared, sb->shared->last_task[cookie]);
+ profile_end("process-image");
+ }
- /* pargs.zmq_data will be copied into the image structure, so
- * that it can be queried for "header" values etc. It will
- * eventually be freed by image_free() under process_image() */
+ /* NB pargs.zmq_data, pargs.asapo_data and pargs.asapo_meta
+ * will be copied into the image structure, so
+ * that it can be queried for "header" values etc. They will
+ * eventually be freed by image_free() under process_image(). */
if ( sb->profile ) {
- profile_print_and_reset();
+ profile_print_and_reset(cookie);
}
}
+ /* These are both no-ops if argument is NULL */
im_zmq_shutdown(zmqstuff);
+ im_asapo_shutdown(asapostuff);
cleanup_indexing(iargs->ipriv);
cell_free(iargs->cell);
@@ -843,12 +909,19 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
char *evstr;
if ( sb->zmq ) {
- /* These values will be passed down to process_image,
- * but ignored. The 'real' filename, which is still a
- * 'fake' filename - only for accounting purposes - will
- * be generated by image_read_data_block(). */
+ /* These are just semi-meaningful placeholder values to
+ * be put into the queue, instead of "(null)".
+ * A unique filename is needed so that the GUI can
+ * tell the frames apart from one another.
+ * ASAP::O, for one, will replace this with a filename
+ * that corresponds to something real. */
filename = "ZMQdata";
- evstr = strdup("//");
+ evstr = malloc(64);
+ snprintf(evstr, 64, "//%i", sb->serial);
+ } else if ( sb->asapo ) {
+ filename = "ASAPOdata";
+ evstr = malloc(64);
+ snprintf(evstr, 64, "//%i", sb->serial);
} else {
if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
}
@@ -1056,6 +1129,9 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
Stream *stream, const char *tmpdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
+ const char *asapo_endpoint, const char *asapo_token,
+ const char *asapo_beamtime, const char *asapo_group_id,
+ const char *asapo_source, const char *asapo_stream,
int timeout, int profile)
{
int i;
@@ -1096,6 +1172,38 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->zmq = 0;
}
+ if ( asapo_endpoint != NULL ) {
+ sb->asapo = 1;
+ sb->asapo_endpoint = asapo_endpoint;
+ sb->asapo_token = asapo_token;
+ sb->asapo_beamtime = asapo_beamtime;
+ sb->asapo_source = asapo_source;
+ sb->asapo_stream = asapo_stream;
+ } else {
+ sb->asapo = 0;
+ }
+
+ if ( sb->zmq && sb->asapo ) {
+ ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n");
+ free(sb);
+ return 0;
+ }
+
+ if ( sb->asapo ) {
+ if ( asapo_group_id != NULL ) {
+ sb->asapo_group_id = strdup(asapo_group_id);
+ } else {
+ sb->asapo_group_id = im_asapo_make_unique_group_id(asapo_endpoint,
+ asapo_token);
+ }
+ if ( sb->asapo_group_id == NULL ) {
+ ERROR("Failed to create ASAP::O group ID.\n");
+ return 0;
+ } else {
+ STATUS("The unique ID is %s\n", sb->asapo_group_id);
+ }
+ }
+
sb->fds = NULL;
sb->fhs = NULL;
sb->stream = stream;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index a76e69ec..e1d2e1b9 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -87,6 +87,9 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
const char *tempdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
+ const char *asapo_endpoint, const char *asapo_token,
+ const char *asapo_beamtime, const char *asapo_group_id,
+ const char *asapo_source, const char *asapo_stream,
int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index fc22a562..6787f903 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -84,6 +84,12 @@ struct indexamajig_arguments
char *zmq_request;
char *zmq_subscriptions[256];
int n_zmq_subscriptions;
+ char *asapo_endpoint;
+ char *asapo_token;
+ char *asapo_beamtime;
+ char *asapo_group_id;
+ char *asapo_source;
+ char *asapo_stream;
int serial_start;
char *temp_location;
int if_refine;
@@ -292,6 +298,7 @@ static DataSourceType parse_data_format(const char *str)
{
if ( strcmp(str, "hdf5") == 0 ) return DATA_SOURCE_TYPE_HDF5;
if ( strcmp(str, "msgpack") == 0 ) return DATA_SOURCE_TYPE_MSGPACK;
+ if ( strcmp(str, "seedee") == 0 ) return DATA_SOURCE_TYPE_SEEDEE;
/* CBF and CBFGZ should be added here once image-cbf.c supports
* in-memory access */
return DATA_SOURCE_TYPE_UNKNOWN;
@@ -402,6 +409,26 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->zmq_request = strdup(arg);
break;
+ case 213 :
+ args->asapo_endpoint = strdup(arg);
+ break;
+
+ case 214 :
+ args->asapo_token = strdup(arg);
+ break;
+
+ case 215 :
+ args->asapo_beamtime = strdup(arg);
+ break;
+
+ case 217 :
+ args->asapo_group_id = strdup(arg);
+ break;
+
+ case 218 :
+ args->asapo_source = strdup(arg);
+ break;
+
case 219 :
args->iargs.data_format = parse_data_format(arg);
if ( args->iargs.data_format == DATA_SOURCE_TYPE_UNKNOWN ) {
@@ -410,6 +437,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
}
break;
+ case 220 :
+ args->asapo_stream = strdup(arg);
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -826,6 +857,12 @@ int main(int argc, char *argv[])
args.basename = 0;
args.zmq_addr = NULL;
args.zmq_request = NULL;
+ args.asapo_endpoint = NULL;
+ args.asapo_token = NULL;
+ args.asapo_beamtime = NULL;
+ args.asapo_group_id = NULL;
+ args.asapo_source = NULL;
+ args.asapo_stream = NULL;
args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -934,7 +971,13 @@ int main(int argc, char *argv[])
"type"},
{"zmq-request", 212, "str", OPTION_NO_USAGE, "Request messages using"
"this string."},
+ {"asapo-endpoint", 213, "str", OPTION_NO_USAGE, "ASAP::O endpoint"},
+ {"asapo-token", 214, "str", OPTION_NO_USAGE, "ASAP::O token"},
+ {"asapo-beamtime", 215, "str", OPTION_NO_USAGE, "ASAP::O beamtime ID"},
+ {"asapo-group", 217, "str", OPTION_NO_USAGE, "ASAP::O group ID"},
+ {"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"},
{"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"},
+ {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1051,7 +1094,9 @@ int main(int argc, char *argv[])
if ( argp_parse(&argp, argc, argv, 0, NULL, &args) ) return 1;
/* Check for minimal information */
- if ( (args.filename == NULL) && (args.zmq_addr == NULL) ) {
+ if ( (args.filename == NULL)
+ && (args.zmq_addr == NULL)
+ && (args.asapo_endpoint == NULL) ) {
ERROR("You need to provide the input filename (use -i)\n");
return 1;
}
@@ -1290,9 +1335,13 @@ int main(int argc, char *argv[])
gsl_set_error_handler_off();
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
- fh, st, tmpdir, args.serial_start, args.zmq_addr,
- args.zmq_subscriptions, args.n_zmq_subscriptions,
- args.zmq_request, timeout, args.profile);
+ fh, st, tmpdir, args.serial_start,
+ args.zmq_addr, args.zmq_subscriptions,
+ args.n_zmq_subscriptions, args.zmq_request,
+ args.asapo_endpoint, args.asapo_token,
+ args.asapo_beamtime, args.asapo_group_id,
+ args.asapo_source, args.asapo_stream,
+ timeout, args.profile);
cell_free(args.iargs.cell);
free(args.prefix);
diff --git a/src/process_image.c b/src/process_image.c
index 9151528d..de2d8792 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -188,17 +188,48 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
int any_crystals;
if ( pargs->zmq_data != NULL ) {
- set_last_task(last_task, "unpacking messagepack object");
- profile_start("read-data-block");
+
+ set_last_task(last_task, "unpacking ZMQ data");
+ profile_start("read-zmq-data");
image = image_read_data_block(iargs->dtempl,
pargs->zmq_data,
pargs->zmq_data_size,
+ NULL,
+ iargs->data_format,
+ serial,
+ iargs->no_image_data,
+ iargs->no_mask_data);
+ profile_end("read-zmq-data");
+ if ( image == NULL ) return;
+
+ /* image_read_data_block() will leave the filename/event as
+ * NULL, because there's no file (duh). Fill them in now with
+ * the values passed down to us. For ZMQ, these values are just
+ * placeholders. */
+ image->filename = strdup(pargs->filename);
+ image->ev = strdup(pargs->event);
+
+ } else if ( pargs->asapo_data != NULL ) {
+
+ set_last_task(last_task, "unpacking ASAP::O data");
+ profile_start("read-asapo-data");
+ image = image_read_data_block(iargs->dtempl,
+ pargs->asapo_data,
+ pargs->asapo_data_size,
+ pargs->asapo_meta,
iargs->data_format,
serial,
iargs->no_image_data,
iargs->no_mask_data);
- profile_end("read-data-block");
+ profile_end("read-asapo-data");
if ( image == NULL ) return;
+
+ /* image_read_data_block() will leave the filename/event as
+ * NULL, because there's no file (duh). Fill them in now with
+ * the values passed down to us from ASAP::O. */
+ image->filename = strdup(pargs->filename);
+ image->ev = strdup(pargs->event);
+
} else {
profile_start("file-wait-open-read");
image = file_wait_open_read(pargs->filename, pargs->event,
diff --git a/src/process_image.h b/src/process_image.h
index cbf2713b..e2f792a5 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -3,11 +3,11 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2021 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2022 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2021 Thomas White <taw@physics.org>
+ * 2010-2022 Thomas White <taw@physics.org>
* 2014-2017 Valerio Mariani <valerio.mariani@desy.de>
* 2017-2018 Yaroslav Gevorkov <yaroslav.gevorkov@desy.de>
*
@@ -119,8 +119,13 @@ struct pattern_args
/* "Input" */
char *filename;
char *event;
+
void *zmq_data;
size_t zmq_data_size;
+
+ char *asapo_data;
+ size_t asapo_data_size;
+ char *asapo_meta;
};
diff --git a/subprojects/cjson.wrap b/subprojects/cjson.wrap
new file mode 100644
index 00000000..dc10279a
--- /dev/null
+++ b/subprojects/cjson.wrap
@@ -0,0 +1,12 @@
+[wrap-file]
+directory = cJSON-1.7.15
+source_url = https://github.com/DaveGamble/cJSON/archive/refs/tags/v1.7.15.tar.gz
+source_filename = v1.7.15.tar.gz
+source_hash = 5308fd4bd90cef7aa060558514de6a1a4a0819974a26e6ed13973c5f624c24b2
+patch_filename = cjson_1.7.15-2_patch.zip
+patch_url = https://wrapdb.mesonbuild.com/v2/cjson_1.7.15-2/get_patch
+patch_hash = d83b4bc0ca94e392c62c8c6c7839392f382d66a84974f5e10611074836ef1777
+
+[provide]
+libcjson = libcjson_dep
+