aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/im-asapo.c165
-rw-r--r--src/im-asapo.h14
-rw-r--r--src/im-sandbox.c11
-rw-r--r--src/indexamajig.c8
-rw-r--r--src/process_image.c5
-rw-r--r--src/process_image.h4
6 files changed, 195 insertions, 12 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 82934d50..c34954e2 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -3,11 +3,11 @@
*
* ASAP::O data interface
*
- * Copyright © 2021 Deutsches Elektronen-Synchrotron DESY,
- * a research centre of the Helmholtz Association.
+ * Copyright © 2021-2023 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
*
* Authors:
- * 2021 Thomas White <taw@physics.org>
+ * 2021-2023 Thomas White <taw@physics.org>
*
* This file is part of CrystFEL.
*
@@ -35,7 +35,9 @@
#include <stdint.h>
#include <assert.h>
#include <unistd.h>
+#include <sys/time.h>
#include <asapo/consumer_c.h>
+#include <asapo/producer_c.h>
#include <image.h>
#include <utils.h>
@@ -50,6 +52,7 @@ struct im_asapo
{
char *stream;
AsapoConsumerHandle consumer;
+ AsapoProducerHandle producer;
AsapoStringHandle group_id;
int wait_for_stream;
};
@@ -70,6 +73,54 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err)
}
+static int create_producer(struct im_asapo *a, struct im_asapo_params *params)
+{
+ char *source;
+ AsapoSourceCredentialsHandle cred;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ if ( !params->write_output_stream ) {
+ a->producer = NULL;
+ return 0;
+ }
+
+ source = malloc(strlen(params->source)+6);
+ if ( source == NULL ) return 1;
+
+ strcpy(source, params->source);
+ strcat(source, "_hits");
+
+ cred = asapo_create_source_credentials(kProcessed,
+ "auto", /* instance ID */
+ "indexamajig", /* pipeline step */
+ params->beamtime,
+ "", /* beamline */
+ source,
+ params->token);
+ STATUS("Writing hits-only output stream as data source '%s'\n", source);
+ free(source);
+
+ a->producer = asapo_create_producer(params->endpoint,
+ 8, /* Number of sender threads */
+ kTcp,
+ cred,
+ 30000, /* Timeout */
+ &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create ASAP::O producer", err);
+ asapo_free_handle(&cred);
+ asapo_free_handle(&err);
+ return 1;
+ }
+
+ asapo_producer_set_log_level(a->producer, Debug);
+
+ asapo_free_handle(&err);
+ return 0;
+}
+
+
struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
{
struct im_asapo *a;
@@ -108,18 +159,22 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
params->source,
params->token);
a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err);
- asapo_free_handle(&cred);
if ( asapo_is_error(err) ) {
show_asapo_error("Cannot create ASAP::O consumer", err);
+ asapo_free_handle(&cred);
free(a);
return NULL;
}
+ if ( create_producer(a, params) ) return NULL;
+
a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(params->group_id);
a->wait_for_stream = params->wait_for_stream;
+ asapo_free_handle(&cred);
+
return a;
}
@@ -144,7 +199,7 @@ static int stream_empty(struct im_asapo *a)
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
- int *pfinished)
+ int *pfinished, int *pmessageid)
{
void *data_copy;
AsapoMessageMetaHandle meta;
@@ -204,6 +259,7 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
*pmeta = strdup(asapo_message_meta_get_metadata(meta));
*pfilename = strdup(asapo_message_meta_get_name(meta));
*pevent = strdup("//");
+ *pmessageid = asapo_message_meta_get_id(meta);
profile_end("copy-meta");
asapo_free_handle(&err);
@@ -215,6 +271,105 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
}
+static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload,
+ AsapoErrorHandle err)
+{
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("ASAP::O send error", err);
+ }
+}
+
+
+static void send_real(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+ char filename[1024];
+
+ snprintf(filename, 1024, "processed/%s_hits/%s-%i.data",
+ a->stream, a->stream, image->serial);
+
+ header = asapo_create_message_header(image->serial,
+ image->data_block_size,
+ filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec);
+
+ err = asapo_new_handle();
+ asapo_producer_send(a->producer, header, image->data_block,
+ kDefaultIngestMode, a->stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't send ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+static void send_placeholder(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+ char filename[1024];
+
+ snprintf(filename, 1024, "processed/%s_hits/%s-%i.placeholder",
+ a->stream, a->stream, image->serial);
+
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ STATUS("sent %s at %lli . %lli\n", filename, tv.tv_sec, tv.tv_usec);
+
+ header = asapo_create_message_header(image->serial,
+ 8, /* strlen("SKIPPED"+\0) */
+ filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ err = asapo_new_handle();
+ asapo_producer_send(a->producer, header, "SKIPPED",
+ kDefaultIngestMode, a->stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't send ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise,
+ * send a placeholder */
+void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+ if ( a == NULL ) return;
+ if ( a->producer == NULL ) return;
+ profile_start("asapo-send");
+ if ( hit ) {
+ send_real(a, image);
+ } else {
+ send_placeholder(a, image);
+ }
+ profile_end("asapo-send");
+}
+
+
void im_asapo_shutdown(struct im_asapo *a)
{
if ( a == NULL ) return;
diff --git a/src/im-asapo.h b/src/im-asapo.h
index cda9fbd9..43691700 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -43,8 +43,11 @@ struct im_asapo_params
char *source;
char *stream;
int wait_for_stream;
+ int write_output_stream;
};
+struct im_asapo;
+
#if defined(HAVE_ASAPO)
extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params);
@@ -53,7 +56,9 @@ extern void im_asapo_shutdown(struct im_asapo *a);
extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
- int *pfinished);
+ int *pfinished, int *pmessageid);
+
+extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit);
#else /* defined(HAVE_ASAPO) */
@@ -69,16 +74,21 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a)
static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize,
char **pmeta, char **pfilename, char **pevent,
- int *pfinished)
+ int *pfinished, int *pmessageid)
{
*psize = 0;
*pmeta = NULL;
*pfilename = NULL;
*pevent = NULL;
*pfinished = 1;
+ *pmessageid = 0;
return NULL;
}
+static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+}
+
#endif /* defined(HAVE_ASAPO) */
#endif /* CRYSTFEL_ASAPO_H */
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 51d9db99..7f02b1cf 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -476,6 +476,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
char *filename;
char *event;
int finished = 0;
+ int message_id;
profile_start("asapo-fetch");
set_last_task(sb->shared->last_task[cookie], "ASAPO fetch");
@@ -484,7 +485,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
&pargs.asapo_meta,
&filename,
&event,
- &finished);
+ &finished,
+ &message_id);
profile_end("asapo-fetch");
if ( pargs.asapo_data != NULL ) {
ok = 1;
@@ -496,6 +498,10 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.filename = filename;
pargs.event = event;
sb->shared->end_of_stream[cookie] = 0;
+
+ /* We will also use ASAP::O's serial number
+ * instead of our own. */
+ ser = message_id;
} else {
if ( finished ) {
sb->shared->end_of_stream[cookie] = 1;
@@ -510,7 +516,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
sb->shared->time_last_start[cookie] = get_monotonic_seconds();
profile_start("process-image");
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
- sb->shared, sb->shared->last_task[cookie]);
+ sb->shared, sb->shared->last_task[cookie],
+ asapostuff);
profile_end("process-image");
}
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 427843bc..a7b5ec9d 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -449,6 +449,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
break;
case 222 :
+ args->asapo_params.write_output_stream = 1;
+ break;
+
+ case 223 :
args->cpu_pin = 1;
break;
@@ -887,6 +891,7 @@ int main(int argc, char *argv[])
args.asapo_params.source = NULL;
args.asapo_params.stream = NULL;
args.asapo_params.wait_for_stream = 0;
+ args.asapo_params.write_output_stream = 0;
args.cpu_pin = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -1009,7 +1014,8 @@ int main(int argc, char *argv[])
{"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"},
{"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE,
"Wait for ASAP::O stream to appear"},
- {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
+ {"asapo-output-stream", 222, NULL, OPTION_NO_USAGE, "Create an ASAP::O hits-only stream"},
+ {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
diff --git a/src/process_image.c b/src/process_image.c
index 5bc527a1..8acd3e86 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -56,6 +56,7 @@
#include "predict-refine.h"
#include "im-sandbox.h"
#include "im-zmq.h"
+#include "im-asapo.h"
static float **backup_image_data(float **dp, struct detgeom *det)
{
@@ -177,7 +178,7 @@ static struct image *file_wait_open_read(const char *filename,
void process_image(const struct index_args *iargs, struct pattern_args *pargs,
Stream *st, int cookie, const char *tmpdir,
int serial, struct sb_shm *sb_shared,
- char *last_task)
+ char *last_task, struct im_asapo *asapostuff)
{
struct image *image;
int i;
@@ -480,6 +481,8 @@ streamwrite:
"%s\n", n, n>1?"s":"", image->filename, image->ev);
}
+ im_asapo_send(asapostuff, image, image->hit);
+
out:
/* Count crystals which are still good */
set_last_task(last_task, "process_image finalisation");
diff --git a/src/process_image.h b/src/process_image.h
index bb68ce19..cca2f7d2 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -45,6 +45,7 @@ struct index_args;
#include "im-sandbox.h"
#include "peaks.h"
#include "image.h"
+#include "im-asapo.h"
/* Information about the indexing process which is common to all patterns */
@@ -113,7 +114,8 @@ struct pattern_args
extern void process_image(const struct index_args *iargs,
struct pattern_args *pargs, Stream *st,
int cookie, const char *tmpdir, int serial,
- struct sb_shm *sb_shared, char *last_task);
+ struct sb_shm *sb_shared, char *last_task,
+ struct im_asapo *asapostuff);
#endif /* PROCESS_IMAGE_H */