diff options
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r-- | src/im-asapo.c | 113 |
1 files changed, 112 insertions, 1 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c index 43455298..37784bf3 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -36,6 +36,7 @@ #include <assert.h> #include <unistd.h> #include <asapo/consumer_c.h> +#include <asapo/producer_c.h> #include <image.h> #include <utils.h> @@ -50,8 +51,10 @@ struct im_asapo { char *stream; AsapoConsumerHandle consumer; + AsapoProducerHandle producer; AsapoStringHandle group_id; int wait_for_stream; + char *output_stream; }; @@ -96,6 +99,10 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) ERROR("ASAP::O stream not specified.\n"); return NULL; } + if ( strcmp(params->stream, params->output_stream) == 0 ) { + ERROR("ASAP::O input and output streams cannot be the same.\n"); + return NULL; + } a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; @@ -108,18 +115,41 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) params->source, params->token); a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err); - asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); + asapo_free_handle(&cred); free(a); return NULL; } + if ( params->output_stream != NULL ) { + a->producer = asapo_create_producer(params->endpoint, + 1, /* Number of sender threads */ + kTcp, + cred, + 60000, /* Timeout */ + &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O producer", err); + asapo_free_handle(&a->consumer); + asapo_free_handle(&a->group_id); + asapo_free_handle(&cred); + free(a); + return NULL; + } + a->output_stream = strdup(params->output_stream); + } else { + a->producer = NULL; + a->output_stream = NULL; + } + a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); a->wait_for_stream = params->wait_for_stream; + asapo_free_handle(&cred); + return a; } @@ -216,6 +246,87 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } +static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload, + AsapoErrorHandle err) +{ + if ( asapo_is_error(err) ) { + show_asapo_error("ASAP::O send error", err); + } else if ( err != NULL ) { + show_asapo_error("ASAP::O send warning", err); + } +} + + +static void send_real(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + + header = asapo_create_message_header(image->serial, + image->data_block_size, + image->filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + asapo_producer_send(a->producer, header, image->data_block, + kDefaultIngestMode, a->output_stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +static void send_placeholder(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + + header = asapo_create_message_header(image->serial, + 8, /* strlen("SKIPPED"+\0) */ + image->filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + asapo_producer_send(a->producer, header, "SKIPPED", + kDefaultIngestMode, a->output_stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise, + * send a placeholder */ +void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ + profile_start("asapo-send"); + if ( hit ) { + send_real(a, image); + } else { + send_placeholder(a, image); + } + profile_end("asapo-send"); +} + + void im_asapo_shutdown(struct im_asapo *a) { if ( a == NULL ) return; |