aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2023-06-06 13:41:52 +0200
committerThomas White <taw@physics.org>2023-06-06 13:41:52 +0200
commit117ed0d0339163e04290349f9fe468b4bb6982e7 (patch)
treeb96e7dfb22cb27d8272a21f155817f7eec257e3e
parentde407aa9fa93192aad6773a53fe51295d9c8cb8e (diff)
parentb5550e4399415fe4863057d6cbd406faf3f2df31 (diff)
Merge branch 'asapo-producer'
-rw-r--r--doc/man/indexamajig.15
-rw-r--r--libcrystfel/src/image-seedee.c28
-rw-r--r--meson.build34
-rw-r--r--src/im-asapo.c165
-rw-r--r--src/im-asapo.h14
-rw-r--r--src/im-sandbox.c11
-rw-r--r--src/indexamajig.c8
-rw-r--r--src/process_image.c5
-rw-r--r--src/process_image.h4
9 files changed, 249 insertions, 25 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index 8f4dc6a2..0f35d4d8 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -199,6 +199,11 @@ Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input
Authentication token, beamtime, data source, consumer group and stream, respectively, for ASAP::O data.
.PD 0
+.IP \fB--asapo-output-stream=\fIstream\fR
+.PD
+Send an output stream via ASAP::O. For non-hits, a small placeholder will be sent.
+
+.PD 0
.IP \fB--asapo-wait-for-stream
.PD
If the ASAP::O stream does not exist, wait for it to be appear. Without this option, indexamajig will exit immediately if the stream is not found.
diff --git a/libcrystfel/src/image-seedee.c b/libcrystfel/src/image-seedee.c
index 60153591..b4ae5179 100644
--- a/libcrystfel/src/image-seedee.c
+++ b/libcrystfel/src/image-seedee.c
@@ -84,6 +84,34 @@ static int load_seedee_data(struct panel_template *p,
}
}
+ } else if ( (array->datatype == 'u')
+ && (array->itemsize == 4)
+ && (array->byteorder == '<') )
+ {
+ int fs, ss;
+ uint32_t *in_data = (uint32_t *)array->data;
+
+ for ( ss=0; ss<PANEL_HEIGHT(p); ss++ ) {
+ for ( fs=0; fs<PANEL_WIDTH(p); fs++ ) {
+ size_t idx = fs+p->orig_min_fs + (ss+p->orig_min_ss)*data_size_fs;
+ data[fs+ss*PANEL_WIDTH(p)] = in_data[idx];
+ }
+ }
+
+ } else if ( (array->datatype == 'f')
+ && (array->itemsize == 8)
+ && (array->byteorder == '<') )
+ {
+ int fs, ss;
+ double *in_data = (double *)array->data;
+
+ for ( ss=0; ss<PANEL_HEIGHT(p); ss++ ) {
+ for ( fs=0; fs<PANEL_WIDTH(p); fs++ ) {
+ size_t idx = fs+p->orig_min_fs + (ss+p->orig_min_ss)*data_size_fs;
+ data[fs+ss*PANEL_WIDTH(p)] = in_data[idx];
+ }
+ }
+
} else {
ERROR("Unrecognised data type %c%i%c\n",
array->datatype, array->itemsize, array->byteorder);
diff --git a/meson.build b/meson.build
index 8f6567fa..c07a326a 100644
--- a/meson.build
+++ b/meson.build
@@ -6,6 +6,7 @@ project('crystfel', 'c',
default_options: ['buildtype=debugoptimized'])
libcrystfel_api_version = 16
+crystfel_rpath = '$ORIGIN/../lib64/:$ORIGIN/../lib:$ORIGIN/../../asapo-libraries/lib:$ORIGIN/../../asapo-libraries/lib64'
add_project_arguments('-DHAVE_CONFIG_H', language: 'c')
@@ -53,9 +54,16 @@ if zmqdep.found()
conf_data.set10('HAVE_ZMQ', true)
endif
-asapodep = dependency('libasapo-consumer', required: false)
-if asapodep.found()
- conf_data.set10('HAVE_ASAPO', true)
+# There are two separate dependencies for ASAP::O (consumer and producer)
+# We need both of them.
+asapoproddep = dependency('libasapo-producer', required: false)
+if asapoproddep.found()
+ asapodep = dependency('libasapo-consumer', required: true)
+ if asapodep.found()
+ conf_data.set10('HAVE_ASAPO', true)
+ endif
+else
+ asapodep = dependency('', required: false)
endif
if cc.has_function('clock_gettime', prefix: '#include <time.h>')
@@ -112,7 +120,7 @@ executable('compare_hkl',
['src/compare_hkl.c', versionc],
dependencies: [mdep, libcrystfeldep, gsldep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
# check_hkl
executable('check_hkl',
@@ -131,21 +139,21 @@ partialator = executable('partialator',
versionc],
dependencies: [mdep, libcrystfeldep, gsldep, pthreaddep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
# ambigator
executable('ambigator',
['src/ambigator.c', versionc],
dependencies: [mdep, libcrystfeldep, gsldep, hdf5dep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
# whirligig
executable('whirligig',
['src/whirligig.c', versionc],
dependencies: [mdep, libcrystfeldep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
# indexamajig
indexamajig_sources = ['src/indexamajig.c', 'src/im-sandbox.c',
@@ -161,9 +169,9 @@ endif
indexamajig = executable('indexamajig', indexamajig_sources,
dependencies: [mdep, libcrystfeldep, gsldep,
- pthreaddep, zmqdep, asapodep],
+ pthreaddep, zmqdep, asapodep, asapoproddep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
# make_pixelmap
if hdf5dep.found()
@@ -171,7 +179,7 @@ if hdf5dep.found()
['src/make_pixelmap.c', versionc],
dependencies: [mdep, libcrystfeldep, hdf5dep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
endif
@@ -206,7 +214,7 @@ if gtkdep.found()
executable('crystfel', gui_sources, gresources,
dependencies: [mdep, libcrystfeldep, gtkdep, gsldep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
install_data(['data/crystfel.desktop'],
install_dir: get_option('datadir') / 'applications')
@@ -218,7 +226,7 @@ executable('render_hkl',
['src/render_hkl.c', versionc],
dependencies: [mdep, libcrystfeldep, cairodep, gsldep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
# cell_explorer
if gtkdep.found()
@@ -226,7 +234,7 @@ if gtkdep.found()
['src/cell_explorer.c', 'src/multihistogram.c', versionc],
dependencies: [mdep, libcrystfeldep, gtkdep, gsldep],
install: true,
- install_rpath: '$ORIGIN/../lib64/:$ORIGIN/../lib')
+ install_rpath: crystfel_rpath)
install_data(['data/cell_explorer.desktop'],
install_dir: get_option('datadir') / 'applications')
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 82934d50..c34954e2 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -3,11 +3,11 @@
*
* ASAP::O data interface
*
- * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY,
- * a research centre of the Helmholtz Association.
+ * Copyright © 2021-2023 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
*
* Authors:
- * 2021 Thomas White <taw@physics.org>
+ * 2021-2023 Thomas White <taw@physics.org>
*
* This file is part of CrystFEL.
*
@@ -35,7 +35,9 @@
#include <stdint.h>
#include <assert.h>
#include <unistd.h>
+#include <sys/time.h>
#include <asapo/consumer_c.h>
+#include <asapo/producer_c.h>
#include <image.h>
#include <utils.h>
@@ -50,6 +52,7 @@ struct im_asapo
{
char *stream;
AsapoConsumerHandle consumer;
+ AsapoProducerHandle producer;
AsapoStringHandle group_id;
int wait_for_stream;
};
@@ -70,6 +73,54 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err)
}
+static int create_producer(struct im_asapo *a, struct im_asapo_params *params)
+{
+ char *source;
+ AsapoSourceCredentialsHandle cred;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ if ( !params->write_output_stream ) {
+ a->producer = NULL;
+ return 0;
+ }
+
+ source = malloc(strlen(params->source)+6);
+ if ( source == NULL ) return 1;
+
+ strcpy(source, params->source);
+ strcat(source, "_hits");
+
+ cred = asapo_create_source_credentials(kProcessed,
+ "auto", /* instance ID */
+ "indexamajig", /* pipeline step */
+ params->beamtime,
+ "", /* beamline */
+ source,
+ params->token);
+ STATUS("Writing hits-only output stream as data source '%s'\n", source);
+ free(source);
+
+ a->producer = asapo_create_producer(params->endpoint,
+ 8, /* Number of sender threads */
+ kTcp,
+ cred,
+ 30000, /* Timeout */
+ &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create ASAP::O producer", err);
+ asapo_free_handle(&cred);
+ asapo_free_handle(&err);
+ return 1;
+ }
+
+ asapo_producer_set_log_level(a->producer, Debug);
+
+ asapo_free_handle(&err);
+ return 0;
+}
+
+
struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
{
struct im_asapo *a;
@@ -108,18 +159,22 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
params->source,
params->token);
a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err);
- asapo_free_handle(&cred);
if ( asapo_is_error(err) ) {
show_asapo_error("Cannot create ASAP::O consumer", err);
+ asapo_free_handle(&cred);
free(a);
return NULL;
}
+ if ( create_producer(a, params) ) return NULL;
+
a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(params->group_id);
a->wait_for_stream = params->wait_for_stream;
+ asapo_free_handle(&cred);
+
return a;
}
@@ -144,7 +199,7 @@ static int stream_empty(struct im_asapo *a)
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
- int *pfinished)
+ int *pfinished, int *pmessageid)
{
void *data_copy;
AsapoMessageMetaHandle meta;
@@ -204,6 +259,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
*pmeta = strdup(asapo_message_meta_get_metadata(meta));
*pfilename = strdup(asapo_message_meta_get_name(meta));
*pevent = strdup("//");
+ *pmessageid = asapo_message_meta_get_id(meta);
profile_end("copy-meta");
asapo_free_handle(&err);
@@ -215,6 +271,105 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
}
+static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload,
+ AsapoErrorHandle err)
+{
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("ASAP::O send error", err);
+ }
+}
+
+
+static void send_real(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+ char filename[1024];
+
+ snprintf(filename, 1024, "processed/%s_hits/%s-%i.data",
+ a->stream, a->stream, image->serial);
+
+ header = asapo_create_message_header(image->serial,
+ image->data_block_size,
+ filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec);
+
+ err = asapo_new_handle();
+ asapo_producer_send(a->producer, header, image->data_block,
+ kDefaultIngestMode, a->stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't send ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+static void send_placeholder(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+ char filename[1024];
+
+ snprintf(filename, 1024, "processed/%s_hits/%s-%i.placeholder",
+ a->stream, a->stream, image->serial);
+
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec);
+
+ header = asapo_create_message_header(image->serial,
+ 8, /* strlen("SKIPPED"+\0) */
+ filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ err = asapo_new_handle();
+ asapo_producer_send(a->producer, header, "SKIPPED",
+ kDefaultIngestMode, a->stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't send ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise,
+ * send a placeholder */
+void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+ if ( a == NULL ) return;
+ if ( a->producer == NULL ) return;
+ profile_start("asapo-send");
+ if ( hit ) {
+ send_real(a, image);
+ } else {
+ send_placeholder(a, image);
+ }
+ profile_end("asapo-send");
+}
+
+
void im_asapo_shutdown(struct im_asapo *a)
{
if ( a == NULL ) return;
diff --git a/src/im-asapo.h b/src/im-asapo.h
index cda9fbd9..43691700 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -43,8 +43,11 @@ struct im_asapo_params
char *source;
char *stream;
int wait_for_stream;
+ int write_output_stream;
};
+struct im_asapo;
+
#if defined(HAVE_ASAPO)
extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params);
@@ -53,7 +56,9 @@ extern void im_asapo_shutdown(struct im_asapo *a);
extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
- int *pfinished);
+ int *pfinished, int *pmessageid);
+
+extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit);
#else /* defined(HAVE_ASAPO) */
@@ -69,16 +74,21 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a)
static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize,
char **pmeta, char **pfilename, char **pevent,
- int *pfinished)
+ int *pfinished, int *pmessageid)
{
*psize = 0;
*pmeta = NULL;
*pfilename = NULL;
*pevent = NULL;
*pfinished = 1;
+ *pmessageid = 0;
return NULL;
}
+static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+}
+
#endif /* defined(HAVE_ASAPO) */
#endif /* CRYSTFEL_ASAPO_H */
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 51d9db99..7f02b1cf 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -476,6 +476,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
char *filename;
char *event;
int finished = 0;
+ int message_id;
profile_start("asapo-fetch");
set_last_task(sb->shared->last_task[cookie], "ASAPO fetch");
@@ -484,7 +485,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
&pargs.asapo_meta,
&filename,
&event,
- &finished);
+ &finished,
+ &message_id);
profile_end("asapo-fetch");
if ( pargs.asapo_data != NULL ) {
ok = 1;
@@ -496,6 +498,10 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.filename = filename;
pargs.event = event;
sb->shared->end_of_stream[cookie] = 0;
+
+ /* We will also use ASAP::O's serial number
+ * instead of our own. */
+ ser = message_id;
} else {
if ( finished ) {
sb->shared->end_of_stream[cookie] = 1;
@@ -510,7 +516,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
sb->shared->time_last_start[cookie] = get_monotonic_seconds();
profile_start("process-image");
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
- sb->shared, sb->shared->last_task[cookie]);
+ sb->shared, sb->shared->last_task[cookie],
+ asapostuff);
profile_end("process-image");
}
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 427843bc..a7b5ec9d 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -449,6 +449,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
break;
case 222 :
+ args->asapo_params.write_output_stream = 1;
+ break;
+
+ case 223 :
args->cpu_pin = 1;
break;
@@ -887,6 +891,7 @@ int main(int argc, char *argv[])
args.asapo_params.source = NULL;
args.asapo_params.stream = NULL;
args.asapo_params.wait_for_stream = 0;
+ args.asapo_params.write_output_stream = 0;
args.cpu_pin = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -1009,7 +1014,8 @@ int main(int argc, char *argv[])
{"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"},
{"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE,
"Wait for ASAP::O stream to appear"},
- {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
+ {"asapo-output-stream", 222, NULL, OPTION_NO_USAGE, "Create an ASAP::O hits-only stream"},
+ {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
diff --git a/src/process_image.c b/src/process_image.c
index 5bc527a1..8acd3e86 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -56,6 +56,7 @@
#include "predict-refine.h"
#include "im-sandbox.h"
#include "im-zmq.h"
+#include "im-asapo.h"
static float **backup_image_data(float **dp, struct detgeom *det)
{
@@ -177,7 +178,7 @@ static struct image *file_wait_open_read(const char *filename,
void process_image(const struct index_args *iargs, struct pattern_args *pargs,
Stream *st, int cookie, const char *tmpdir,
int serial, struct sb_shm *sb_shared,
- char *last_task)
+ char *last_task, struct im_asapo *asapostuff)
{
struct image *image;
int i;
@@ -480,6 +481,8 @@ streamwrite:
"%s\n", n, n>1?"s":"", image->filename, image->ev);
}
+ im_asapo_send(asapostuff, image, image->hit);
+
out:
/* Count crystals which are still good */
set_last_task(last_task, "process_image finalisation");
diff --git a/src/process_image.h b/src/process_image.h
index bb68ce19..cca2f7d2 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -45,6 +45,7 @@ struct index_args;
#include "im-sandbox.h"
#include "peaks.h"
#include "image.h"
+#include "im-asapo.h"
/* Information about the indexing process which is common to all patterns */
@@ -113,7 +114,8 @@ struct pattern_args
extern void process_image(const struct index_args *iargs,
struct pattern_args *pargs, Stream *st,
int cookie, const char *tmpdir, int serial,
- struct sb_shm *sb_shared, char *last_task);
+ struct sb_shm *sb_shared, char *last_task,
+ struct im_asapo *asapostuff);
#endif /* PROCESS_IMAGE_H */