aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2022-07-06 15:28:38 +0200
committerThomas White <taw@physics.org>2022-07-06 16:15:05 +0200
commit5e06310da1d04436a27354f3d305a520ce993c1d (patch)
tree6d2fc2d6e22b94a68c8bab5ad352063686997ee1 /src
parente1bee0847d0843ca028106dc8940df72e8690eb1 (diff)
indexamajig: Add timeout for ZMQ socket
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c11
-rw-r--r--src/im-zmq.c15
2 files changed, 18 insertions, 8 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 5c4629ce..a9722509 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -452,14 +452,13 @@ static int run_work(const struct index_args *iargs, Stream *st,
profile_start("zmq-fetch");
set_last_task(sb->shared->last_task[cookie], "ZMQ fetch");
- do {
- pargs.zmq_data = im_zmq_fetch(zmqstuff,
- &pargs.zmq_data_size);
- } while ( pargs.zmq_data_size < 15 );
- ok = 1;
-
+ pargs.zmq_data = im_zmq_fetch(zmqstuff,
+ &pargs.zmq_data_size);
profile_end("zmq-fetch");
+ if ( (pargs.zmq_data != NULL)
+ && (pargs.zmq_data_size > 15) ) ok = 1;
+
/* The filename/event, which will be 'fake' values in
* this case, still came via the event queue. More
* importantly, the event queue gave us a unique
diff --git a/src/im-zmq.c b/src/im-zmq.c
index b326e868..af70e7ae 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -53,6 +53,7 @@ struct im_zmq
void *socket;
zmq_msg_t msg;
const char *request_str;
+ int request_sent;
};
@@ -87,6 +88,11 @@ struct im_zmq *im_zmq_connect(struct im_zmq_params *params)
return NULL;
}
+ int timeout = 3000;
+ zmq_setsockopt(z->socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
+ int linger = 0;;
+ zmq_setsockopt(z->socket, ZMQ_LINGER, &linger, sizeof(linger));
+
if ( params->request == NULL ) {
int i;
@@ -114,6 +120,7 @@ struct im_zmq *im_zmq_connect(struct im_zmq_params *params)
/* REQ mode */
z->request_str = params->request;
+ z->request_sent = 0;
}
@@ -126,7 +133,7 @@ void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
int msg_size;
void *data_copy;
- if ( z->request_str != NULL ) {
+ if ( (z->request_str != NULL) && !z->request_sent ) {
/* Send the request */
if ( zmq_send(z->socket, z->request_str,
@@ -136,13 +143,17 @@ void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
zmq_strerror(errno));
return NULL;
}
+
+ z->request_sent = 1;
}
/* Receive message */
zmq_msg_init(&z->msg);
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
if ( msg_size == -1 ) {
- ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
+ if ( errno != EAGAIN ) {
+ ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
+ }
zmq_msg_close(&z->msg);
return NULL;
}