aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/im-asapo.c20
-rw-r--r--src/im-asapo.h1
-rw-r--r--src/im-sandbox.c9
-rw-r--r--src/indexamajig.c11
4 files changed, 29 insertions, 12 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 6df14e93..9b57883c 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -55,6 +55,7 @@ struct im_asapo
AsapoProducerHandle producer;
AsapoStringHandle group_id;
int wait_for_stream;
+ int use_ack;
};
@@ -173,7 +174,10 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
a->group_id = asapo_string_from_c_str(params->group_id);
a->wait_for_stream = params->wait_for_stream;
- asapo_consumer_set_resend_nacs(a->consumer, 1, 10000, 3);
+ a->use_ack = params->use_ack;
+ if ( a->use_ack ) {
+ asapo_consumer_set_resend_nacs(a->consumer, 1, 60000, 3);
+ }
asapo_free_handle(&cred);
@@ -275,13 +279,15 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
void im_asapo_finalise(struct im_asapo *a, uint64_t message_id)
{
- AsapoErrorHandle err = asapo_new_handle();
- asapo_consumer_acknowledge(a->consumer, a->group_id, message_id,
- a->stream, &err);
- if ( asapo_is_error(err) ) {
- show_asapo_error("Couldn't acknowledge ASAP::O message", err);
+ if ( a->use_ack ) {
+ AsapoErrorHandle err = asapo_new_handle();
+ asapo_consumer_acknowledge(a->consumer, a->group_id, message_id,
+ a->stream, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't acknowledge ASAP::O message", err);
+ }
+ asapo_free_handle(&err);
}
- asapo_free_handle(&err);
}
diff --git a/src/im-asapo.h b/src/im-asapo.h
index d06f9527..70f67fd0 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -45,6 +45,7 @@ struct im_asapo_params
int wait_for_stream;
int write_output_stream;
int consumer_timeout_ms;
+ int use_ack;
};
struct im_asapo;
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 3d9f1f96..2fa19820 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -343,7 +343,6 @@ static int run_work(const struct index_args *iargs, Stream *st,
struct im_asapo *asapostuff = NULL;
Mille *mille;
ImageDataArrays *ida;
- int asapo_message_id;
if ( sb->profile ) {
profile_init();
@@ -491,6 +490,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
char *filename;
char *event;
int finished = 0;
+ int asapo_message_id;
profile_start("asapo-fetch");
set_last_task(sb->shared->last_task[cookie], "ASAPO fetch");
@@ -533,10 +533,11 @@ static int run_work(const struct index_args *iargs, Stream *st,
sb->shared, sb->shared->last_task[cookie],
asapostuff, mille, ida);
profile_end("process-image");
- }
- if ( sb->asapo_params != NULL ) {
- im_asapo_finalise(asapostuff, asapo_message_id);
+ if ( sb->asapo_params != NULL ) {
+ im_asapo_finalise(asapostuff, ser);
+ }
+
}
/* NB pargs.zmq_data, pargs.asapo_data and pargs.asapo_meta
diff --git a/src/indexamajig.c b/src/indexamajig.c
index aa0db210..00b15b55 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -474,6 +474,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
}
break;
+ case 226 :
+ args->asapo_params.use_ack = 1;
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -932,6 +936,7 @@ int main(int argc, char *argv[])
args.asapo_params.wait_for_stream = 0;
args.asapo_params.write_output_stream = 0;
args.asapo_params.consumer_timeout_ms = 500;
+ args.asapo_params.use_ack = 0;
args.cpu_pin = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -1065,6 +1070,7 @@ int main(int argc, char *argv[])
"Shut down after this many seconds without ASAP::O data"},
{"asapo-consumer-timeout", 225, "ms", OPTION_NO_USAGE,
"ASAP::O get_next timeout for one frame (milliseconds)"},
+ {"asapo-acks", 226, NULL, OPTION_NO_USAGE, "Use ASAP::O acknowledgements"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1241,7 +1247,10 @@ int main(int argc, char *argv[])
return 1;
}
- if ( (args.filename != NULL) && is_hdf5_file(args.filename, &err) ) {
+ if ( (args.filename != NULL)
+ && (strcmp(args.filename, "-") != 0)
+ && is_hdf5_file(args.filename, &err) )
+ {
ERROR("Your input file appears to be an HDF5 file.\n");
ERROR("The input file should be a list of data files, not the "
"data file itself.\n");