From 1d1a2a700d229852d9ba2e34b382a2f7b3c147f3 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 2 Jun 2023 14:00:03 +0200 Subject: ASAP::O: Use XX_hits for producer data source --- src/im-asapo.c | 68 +++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/im-asapo.c b/src/im-asapo.c index f7851b7b..4ae4d291 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -73,6 +73,53 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } +static int create_producer(struct im_asapo *a, struct im_asapo_params *params) +{ + char *source; + AsapoSourceCredentialsHandle cred; + AsapoErrorHandle err = asapo_new_handle(); + + if ( params->output_stream == NULL ) { + a->output_stream = NULL; + a->producer = NULL; + return 0; + } + + source = malloc(strlen(params->source)+6); + if ( source == NULL ) return 1; + + strcpy(source, params->source); + strcat(source, "_hits"); + + cred = asapo_create_source_credentials(kProcessed, + "auto", /* instance ID */ + "indexamajig", /* pipeline step */ + params->beamtime, + "", /* beamline */ + source, + params->token); + free(source); + + a->producer = asapo_create_producer(params->endpoint, + 1, /* Number of sender threads */ + kTcp, + cred, + 60000, /* Timeout */ + &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O producer", err); + asapo_free_handle(&cred); + asapo_free_handle(&err); + return 1; + } + + asapo_free_handle(&err); + a->output_stream = strdup(params->output_stream); + return 0; +} + + struct im_asapo *im_asapo_connect(struct im_asapo_params *params) { struct im_asapo *a; @@ -124,26 +171,7 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) return NULL; } - if ( params->output_stream != NULL ) { - a->producer = asapo_create_producer(params->endpoint, - 1, /* Number of sender threads */ - kTcp, - cred, - 60000, /* Timeout */ - &err); - if ( asapo_is_error(err) ) { - show_asapo_error("Cannot create ASAP::O producer", err); - asapo_free_handle(&a->consumer); - asapo_free_handle(&a->group_id); - asapo_free_handle(&cred); - free(a); - return NULL; - } - a->output_stream = strdup(params->output_stream); - } else { - a->producer = NULL; - a->output_stream = NULL; - } + if ( create_producer(a, params) ) return NULL; a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); -- cgit v1.2.3