aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-04-23 16:26:24 +0200
committerThomas White <taw@physics.org>2021-04-23 16:40:56 +0200
commita378be62b61d4c8a67c5523955babbd3f9d17b5c (patch)
tree18062584caeee30145188e9fe1269837e34434ab /src
parent4154688f5e91772f9a9c0d001316886b74686d9e (diff)
indexamajig: Generate a unique filename for ZMQ data
The unique filename is needed by the GUI for looking up results in a stream. Otherwise, the filename is "(null)" for everything and the lookup just returns the first chunk in the stream. The filename is generated based on the unique serial number for each chunk, and is therefore unique across one run of indexamajig regardless of the number of worker processes (-j). This might have to change in future to accommodate jobs run across multiple nodes, if there is any demand for looking at results in one big concatenated stream. This also changes the condition for deciding when to look for a 'real' file, to take into account that there is always a non-NULL filename.
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c167
-rw-r--r--src/process_image.c1
2 files changed, 87 insertions, 81 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 7760693a..ae1fe92a 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -348,88 +348,87 @@ static int run_work(const struct index_args *iargs, Stream *st,
struct pattern_args pargs;
int ser;
+ char *line;
+ size_t len;
+ int i;
+ char *event_str = NULL;
+ char *ser_str = NULL;
+ int ok = 1;
+
+ /* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
+ set_last_task(sb->shared->last_task[cookie], "wait_event");
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
- if ( !sb->zmq ) {
-
- char *line;
- size_t len;
- int i;
- char *event_str = NULL;
- char *ser_str = NULL;
- int ok = 1;
-
- /* Wait until an event is ready */
- time_accounts_set(taccs, TACC_EVENTWAIT);
- set_last_task(sb->shared->last_task[cookie], "wait_event");
- if ( sem_wait(sb->queue_sem) != 0 ) {
- ERROR("Failed to wait on queue semaphore: %s\n",
- strerror(errno));
- }
-
- /* Get the event from the queue */
- set_last_task(sb->shared->last_task[cookie], "read_queue");
- pthread_mutex_lock(&sb->shared->queue_lock);
- if ( ((sb->shared->n_events==0) && (sb->shared->no_more))
- || (sb->shared->should_shutdown) )
- {
- /* Queue is empty and no more are coming,
- * or another process has initiated a shutdown.
- * Either way, it's time to get out of here. */
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- if ( sb->shared->n_events == 0 ) {
- ERROR("Got the semaphore, but no events in queue!\n");
- ERROR("no_more = %i\n", sb->shared->no_more);
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
+ /* Get the event from the queue */
+ set_last_task(sb->shared->last_task[cookie], "read_queue");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( ((sb->shared->n_events==0) && (sb->shared->no_more))
+ || (sb->shared->should_shutdown) )
+ {
+ /* Queue is empty and no more are coming,
+ * or another process has initiated a shutdown.
+ * Either way, it's time to get out of here. */
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
- line = strdup(sb->shared->queue[0]);
+ line = strdup(sb->shared->queue[0]);
- len = strlen(line);
- assert(len > 1);
- for ( i=len-1; i>0; i-- ) {
- if ( line[i] == ' ' ) {
- line[i] = '\0';
- ser_str = &line[i+1];
- break;
- }
- }
- len = strlen(line);
- assert(len > 1);
- for ( i=len-1; i>0; i-- ) {
- if ( line[i] == ' ' ) {
- line[i] = '\0';
- event_str = &line[i+1];
- break;
- }
+ len = strlen(line);
+ assert(len > 1);
+ for ( i=len-1; i>0; i-- ) {
+ if ( line[i] == ' ' ) {
+ line[i] = '\0';
+ ser_str = &line[i+1];
+ break;
}
- if ( (ser_str != NULL) && (event_str != NULL) ) {
- if ( sscanf(ser_str, "%i", &ser) != 1 ) {
- STATUS("Invalid serial number '%s'\n",
- ser_str);
- ok = 0;
- }
+ }
+ len = strlen(line);
+ assert(len > 1);
+ for ( i=len-1; i>0; i-- ) {
+ if ( line[i] == ' ' ) {
+ line[i] = '\0';
+ event_str = &line[i+1];
+ break;
}
- if ( !ok ) {
- STATUS("Invalid event string '%s'\n",
- sb->shared->queue[0]);
+ }
+ if ( (ser_str != NULL) && (event_str != NULL) ) {
+ if ( sscanf(ser_str, "%i", &ser) != 1 ) {
+ STATUS("Invalid serial number '%s'\n",
+ ser_str);
ok = 0;
}
- memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
- MAX_EV_LEN);
- shuffle_events(sb->shared);
- pthread_mutex_unlock(&sb->shared->queue_lock);
+ }
+ if ( !ok ) {
+ STATUS("Invalid event string '%s'\n",
+ sb->shared->queue[0]);
+ ok = 0;
+ }
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb->shared);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
- if ( !ok ) continue;
+ if ( !ok ) continue;
- pargs.filename = strdup(line);
- pargs.event = strdup(event_str);
+ pargs.filename = strdup(line);
+ pargs.event = strdup(event_str);
- free(line);
+ free(line);
+
+ if ( !sb->zmq ) {
pargs.zmq_data = NULL;
pargs.zmq_data_size = 0;
@@ -440,9 +439,11 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.zmq_data = im_zmq_fetch(zmqstuff,
&pargs.zmq_data_size);
} while ( pargs.zmq_data_size < 15 );
- pargs.filename = strdup("(from ZMQ)");
- pargs.event = NULL;
- ser = 0; /* FIXME: Serial numbers from ZMQ? */
+
+ /* The filename/event, which will be 'fake' values in
+ * this case, still came via the event queue. More
+ * importantly, the event queue gave us a unique
+ * serial number for this image. */
}
@@ -820,17 +821,21 @@ static int setup_shm(struct sandbox *sb)
/* Assumes the caller is already holding queue_lock! */
static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
{
- if ( sb->zmq ) {
- /* Do nothing */
- return 0;
- }
-
while ( sb->shared->n_events < QUEUE_SIZE ) {
char *filename;
char *evstr;
- if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
+ if ( sb->zmq ) {
+ /* These values will be passed down to process_image,
+ * but ignored. The 'real' filename, which is still a
+ * 'fake' filename - only for accounting purposes - will
+ * be generated by image_read_data_block(). */
+ filename = "ZMQdata";
+ evstr = strdup("//");
+ } else {
+ if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
+ }
memset(sb->shared->queue[sb->shared->n_events], 0, MAX_EV_LEN);
snprintf(sb->shared->queue[sb->shared->n_events++], MAX_EV_LEN,
diff --git a/src/process_image.c b/src/process_image.c
index 188a4829..281cbab5 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -199,6 +199,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
pargs->zmq_data,
pargs->zmq_data_size,
DST_MSGPACK,
+ serial,
iargs->no_image_data,
iargs->no_mask_data);
if ( image == NULL ) return;