aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-04-23 16:26:24 +0200
committerThomas White <taw@physics.org>2021-04-23 16:40:56 +0200
commita378be62b61d4c8a67c5523955babbd3f9d17b5c (patch)
tree18062584caeee30145188e9fe1269837e34434ab
parent4154688f5e91772f9a9c0d001316886b74686d9e (diff)
indexamajig: Generate a unique filename for ZMQ data
The unique filename is needed by the GUI for looking up results in a stream. Otherwise, the filename is "(null)" for everything and the lookup just returns the first chunk in the stream. The filename is generated based on the unique serial number for each chunk, and is therefore unique across one run of indexamajig regardless of the number of worker processes (-j). This might have to change in future to accommodate jobs run across multiple nodes, if there is any demand for looking at results in one big concatenated stream. This also changes the condition for deciding when to look for a 'real' file, to take into account that there is always a non-NULL filename.
-rw-r--r--libcrystfel/src/image.c9
-rw-r--r--libcrystfel/src/image.h1
-rw-r--r--src/im-sandbox.c167
-rw-r--r--src/process_image.c1
4 files changed, 94 insertions, 84 deletions
diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c
index ad651cae..b40819ef 100644
--- a/libcrystfel/src/image.c
+++ b/libcrystfel/src/image.c
@@ -669,7 +669,7 @@ int image_set_zero_mask(struct image *image,
static int image_read_image_data(struct image *image,
const DataTemplate *dtempl)
{
- if ( (image->filename != NULL)
+ if ( (image->data_block == NULL)
&& (!file_exists(image->filename)) )
{
ERROR("File not found: %s (read data)\n", image->filename);
@@ -1201,10 +1201,12 @@ struct image *image_read_data_block(const DataTemplate *dtempl,
void *data_block,
size_t data_block_size,
DataSourceType type,
+ int serial,
int no_image_data,
int no_mask_data)
{
struct image *image;
+ char tmp[64];
if ( dtempl == NULL ) {
ERROR("NULL data template!\n");
@@ -1217,8 +1219,9 @@ struct image *image_read_data_block(const DataTemplate *dtempl,
return NULL;
}
- image->filename = NULL;
- image->ev = NULL;
+ snprintf(tmp, 63, "datablock-%i", serial);
+ image->filename = strdup(tmp);
+ image->ev = strdup("//");
image->data_block = data_block;
image->data_block_size = data_block_size;
diff --git a/libcrystfel/src/image.h b/libcrystfel/src/image.h
index 58228e35..045f3e77 100644
--- a/libcrystfel/src/image.h
+++ b/libcrystfel/src/image.h
@@ -216,6 +216,7 @@ extern struct image *image_read_data_block(const DataTemplate *dtempl,
void *data_block,
size_t data_block_size,
DataSourceType type,
+ int serial,
int no_image_data,
int no_mask_data);
extern void image_free(struct image *image);
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 7760693a..ae1fe92a 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -348,88 +348,87 @@ static int run_work(const struct index_args *iargs, Stream *st,
struct pattern_args pargs;
int ser;
+ char *line;
+ size_t len;
+ int i;
+ char *event_str = NULL;
+ char *ser_str = NULL;
+ int ok = 1;
+
+ /* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
+ set_last_task(sb->shared->last_task[cookie], "wait_event");
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
- if ( !sb->zmq ) {
-
- char *line;
- size_t len;
- int i;
- char *event_str = NULL;
- char *ser_str = NULL;
- int ok = 1;
-
- /* Wait until an event is ready */
- time_accounts_set(taccs, TACC_EVENTWAIT);
- set_last_task(sb->shared->last_task[cookie], "wait_event");
- if ( sem_wait(sb->queue_sem) != 0 ) {
- ERROR("Failed to wait on queue semaphore: %s\n",
- strerror(errno));
- }
-
- /* Get the event from the queue */
- set_last_task(sb->shared->last_task[cookie], "read_queue");
- pthread_mutex_lock(&sb->shared->queue_lock);
- if ( ((sb->shared->n_events==0) && (sb->shared->no_more))
- || (sb->shared->should_shutdown) )
- {
- /* Queue is empty and no more are coming,
- * or another process has initiated a shutdown.
- * Either way, it's time to get out of here. */
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- if ( sb->shared->n_events == 0 ) {
- ERROR("Got the semaphore, but no events in queue!\n");
- ERROR("no_more = %i\n", sb->shared->no_more);
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
+ /* Get the event from the queue */
+ set_last_task(sb->shared->last_task[cookie], "read_queue");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( ((sb->shared->n_events==0) && (sb->shared->no_more))
+ || (sb->shared->should_shutdown) )
+ {
+ /* Queue is empty and no more are coming,
+ * or another process has initiated a shutdown.
+ * Either way, it's time to get out of here. */
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
- line = strdup(sb->shared->queue[0]);
+ line = strdup(sb->shared->queue[0]);
- len = strlen(line);
- assert(len > 1);
- for ( i=len-1; i>0; i-- ) {
- if ( line[i] == ' ' ) {
- line[i] = '\0';
- ser_str = &line[i+1];
- break;
- }
- }
- len = strlen(line);
- assert(len > 1);
- for ( i=len-1; i>0; i-- ) {
- if ( line[i] == ' ' ) {
- line[i] = '\0';
- event_str = &line[i+1];
- break;
- }
+ len = strlen(line);
+ assert(len > 1);
+ for ( i=len-1; i>0; i-- ) {
+ if ( line[i] == ' ' ) {
+ line[i] = '\0';
+ ser_str = &line[i+1];
+ break;
}
- if ( (ser_str != NULL) && (event_str != NULL) ) {
- if ( sscanf(ser_str, "%i", &ser) != 1 ) {
- STATUS("Invalid serial number '%s'\n",
- ser_str);
- ok = 0;
- }
+ }
+ len = strlen(line);
+ assert(len > 1);
+ for ( i=len-1; i>0; i-- ) {
+ if ( line[i] == ' ' ) {
+ line[i] = '\0';
+ event_str = &line[i+1];
+ break;
}
- if ( !ok ) {
- STATUS("Invalid event string '%s'\n",
- sb->shared->queue[0]);
+ }
+ if ( (ser_str != NULL) && (event_str != NULL) ) {
+ if ( sscanf(ser_str, "%i", &ser) != 1 ) {
+ STATUS("Invalid serial number '%s'\n",
+ ser_str);
ok = 0;
}
- memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
- MAX_EV_LEN);
- shuffle_events(sb->shared);
- pthread_mutex_unlock(&sb->shared->queue_lock);
+ }
+ if ( !ok ) {
+ STATUS("Invalid event string '%s'\n",
+ sb->shared->queue[0]);
+ ok = 0;
+ }
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb->shared);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
- if ( !ok ) continue;
+ if ( !ok ) continue;
- pargs.filename = strdup(line);
- pargs.event = strdup(event_str);
+ pargs.filename = strdup(line);
+ pargs.event = strdup(event_str);
- free(line);
+ free(line);
+
+ if ( !sb->zmq ) {
pargs.zmq_data = NULL;
pargs.zmq_data_size = 0;
@@ -440,9 +439,11 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.zmq_data = im_zmq_fetch(zmqstuff,
&pargs.zmq_data_size);
} while ( pargs.zmq_data_size < 15 );
- pargs.filename = strdup("(from ZMQ)");
- pargs.event = NULL;
- ser = 0; /* FIXME: Serial numbers from ZMQ? */
+
+ /* The filename/event, which will be 'fake' values in
+ * this case, still came via the event queue. More
+ * importantly, the event queue gave us a unique
+ * serial number for this image. */
}
@@ -820,17 +821,21 @@ static int setup_shm(struct sandbox *sb)
/* Assumes the caller is already holding queue_lock! */
static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
{
- if ( sb->zmq ) {
- /* Do nothing */
- return 0;
- }
-
while ( sb->shared->n_events < QUEUE_SIZE ) {
char *filename;
char *evstr;
- if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
+ if ( sb->zmq ) {
+ /* These values will be passed down to process_image,
+ * but ignored. The 'real' filename, which is still a
+ * 'fake' filename - only for accounting purposes - will
+ * be generated by image_read_data_block(). */
+ filename = "ZMQdata";
+ evstr = strdup("//");
+ } else {
+ if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
+ }
memset(sb->shared->queue[sb->shared->n_events], 0, MAX_EV_LEN);
snprintf(sb->shared->queue[sb->shared->n_events++], MAX_EV_LEN,
diff --git a/src/process_image.c b/src/process_image.c
index 188a4829..281cbab5 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -199,6 +199,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
pargs->zmq_data,
pargs->zmq_data_size,
DST_MSGPACK,
+ serial,
iargs->no_image_data,
iargs->no_mask_data);
if ( image == NULL ) return;