aboutsummaryrefslogtreecommitdiff
path: root/src/im-asapo.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2023-04-12 17:12:48 +0200
committerThomas White <taw@physics.org>2023-06-01 15:31:50 +0200
commit6730fa1c86654ccf84b4ad694f74cc1564cd2cee (patch)
tree1fc974d0853bfcc84c446e246ad2d772aa2aaf1b /src/im-asapo.c
parente922cc649959d2f05dc4df44b0d5889b0fcdb32c (diff)
indexamajig: Add --asapo-output-stream (hits only)
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r--src/im-asapo.c113
1 files changed, 112 insertions, 1 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 43455298..37784bf3 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -36,6 +36,7 @@
#include <assert.h>
#include <unistd.h>
#include <asapo/consumer_c.h>
+#include <asapo/producer_c.h>
#include <image.h>
#include <utils.h>
@@ -50,8 +51,10 @@ struct im_asapo
{
char *stream;
AsapoConsumerHandle consumer;
+ AsapoProducerHandle producer;
AsapoStringHandle group_id;
int wait_for_stream;
+ char *output_stream;
};
@@ -96,6 +99,10 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
ERROR("ASAP::O stream not specified.\n");
return NULL;
}
+ if ( strcmp(params->stream, params->output_stream) == 0 ) {
+ ERROR("ASAP::O input and output streams cannot be the same.\n");
+ return NULL;
+ }
a = malloc(sizeof(struct im_asapo));
if ( a == NULL ) return NULL;
@@ -108,18 +115,41 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
params->source,
params->token);
a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err);
- asapo_free_handle(&cred);
if ( asapo_is_error(err) ) {
show_asapo_error("Cannot create ASAP::O consumer", err);
+ asapo_free_handle(&cred);
free(a);
return NULL;
}
+ if ( params->output_stream != NULL ) {
+ a->producer = asapo_create_producer(params->endpoint,
+ 1, /* Number of sender threads */
+ kTcp,
+ cred,
+ 60000, /* Timeout */
+ &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create ASAP::O producer", err);
+ asapo_free_handle(&a->consumer);
+ asapo_free_handle(&a->group_id);
+ asapo_free_handle(&cred);
+ free(a);
+ return NULL;
+ }
+ a->output_stream = strdup(params->output_stream);
+ } else {
+ a->producer = NULL;
+ a->output_stream = NULL;
+ }
+
a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(params->group_id);
a->wait_for_stream = params->wait_for_stream;
+ asapo_free_handle(&cred);
+
return a;
}
@@ -216,6 +246,87 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
}
+static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload,
+ AsapoErrorHandle err)
+{
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("ASAP::O send error", err);
+ } else if ( err != NULL ) {
+ show_asapo_error("ASAP::O send warning", err);
+ }
+}
+
+
+static void send_real(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+
+ header = asapo_create_message_header(image->serial,
+ image->data_block_size,
+ image->filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ asapo_producer_send(a->producer, header, image->data_block,
+ kDefaultIngestMode, a->output_stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+static void send_placeholder(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+
+ header = asapo_create_message_header(image->serial,
+ 8, /* strlen("SKIPPED"+\0) */
+ image->filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ asapo_producer_send(a->producer, header, "SKIPPED",
+ kDefaultIngestMode, a->output_stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise,
+ * send a placeholder */
+void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+ profile_start("asapo-send");
+ if ( hit ) {
+ send_real(a, image);
+ } else {
+ send_placeholder(a, image);
+ }
+ profile_end("asapo-send");
+}
+
+
void im_asapo_shutdown(struct im_asapo *a)
{
if ( a == NULL ) return;