From d7329d0a207cd17cde5651abd989a66c1b5da82c Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 24 Apr 2024 13:47:54 +0200 Subject: indexamajig: Add ASAP::O acknowledgements --- src/im-asapo.c | 14 ++++++++++++++ src/im-asapo.h | 6 ++++++ src/im-sandbox.c | 10 +++++++--- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/im-asapo.c b/src/im-asapo.c index 07666d97..6df14e93 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -173,6 +173,8 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) a->group_id = asapo_string_from_c_str(params->group_id); a->wait_for_stream = params->wait_for_stream; + asapo_consumer_set_resend_nacs(a->consumer, 1, 10000, 3); + asapo_free_handle(&cred); return a; @@ -271,6 +273,18 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } +void im_asapo_finalise(struct im_asapo *a, uint64_t message_id) +{ + AsapoErrorHandle err = asapo_new_handle(); + asapo_consumer_acknowledge(a->consumer, a->group_id, message_id, + a->stream, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't acknowledge ASAP::O message", err); + } + asapo_free_handle(&err); +} + + static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload, AsapoErrorHandle err) { diff --git a/src/im-asapo.h b/src/im-asapo.h index 424d8b59..d06f9527 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -59,6 +59,8 @@ extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished, int *pmessageid); +extern void im_asapo_finalise(struct im_asapo *a, uint64_t message_id); + extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit); #else /* defined(HAVE_ASAPO) */ @@ -90,6 +92,10 @@ static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hi { } +static UNUSED void im_asapo_finalise(struct im_asapo *a, uint64_t message_id) +{ +} + #endif /* defined(HAVE_ASAPO) */ #endif /* CRYSTFEL_ASAPO_H */ diff --git a/src/im-sandbox.c b/src/im-sandbox.c index e97e204f..3d9f1f96 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -343,6 +343,7 @@ static int run_work(const struct index_args *iargs, Stream *st, struct im_asapo *asapostuff = NULL; Mille *mille; ImageDataArrays *ida; + int asapo_message_id; if ( sb->profile ) { profile_init(); @@ -490,7 +491,6 @@ static int run_work(const struct index_args *iargs, Stream *st, char *filename; char *event; int finished = 0; - int message_id; profile_start("asapo-fetch"); set_last_task(sb->shared->last_task[cookie], "ASAPO fetch"); @@ -500,7 +500,7 @@ static int run_work(const struct index_args *iargs, Stream *st, &filename, &event, &finished, - &message_id); + &asapo_message_id); profile_end("asapo-fetch"); if ( pargs.asapo_data != NULL ) { ok = 1; @@ -515,7 +515,7 @@ static int run_work(const struct index_args *iargs, Stream *st, /* We will also use ASAP::O's serial number * instead of our own. */ - ser = message_id; + ser = asapo_message_id; } else { if ( finished ) { sb->shared->end_of_stream[cookie] = 1; @@ -535,6 +535,10 @@ static int run_work(const struct index_args *iargs, Stream *st, profile_end("process-image"); } + if ( sb->asapo_params != NULL ) { + im_asapo_finalise(asapostuff, asapo_message_id); + } + /* NB pargs.zmq_data, pargs.asapo_data and pargs.asapo_meta * will be copied into the image structure, so * that it can be queried for "header" values etc. They will -- cgit v1.2.3