aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2019-01-29 16:17:19 +0100
committerThomas White <taw@physics.org>2019-01-29 16:17:19 +0100
commita2c2df1885ea9d078ee8ad712d46c0a0392949e2 (patch)
treea8b8047c682e06bfe2114eca64dd90d1202825d5
parent80cccc97eb5f2b57dd76a14df0ac730c10427bb2 (diff)
Tidy up no_more
-rw-r--r--src/im-sandbox.c15
1 files changed, 8 insertions, 7 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 55ed939c..171ecf80 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -189,6 +189,7 @@ static char *read_prefixed_filename(struct get_pattern_ctx *gpctx, char **event)
if ( !feof(gpctx->fh) ) {
ERROR("Input file read error.\n");
}
+ free(line);
return NULL;
}
chomp(line);
@@ -392,7 +393,8 @@ static void run_work(const struct index_args *iargs, Stream *st,
/* Get the event from the queue */
set_last_task(sb->shared->last_task[cookie], "read_queue");
pthread_mutex_lock(&sb->shared->queue_lock);
- if ( sb->shared->no_more ) {
+ if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) {
+ /* Queue is empty and no more coming, so exit */
pthread_mutex_unlock(&sb->shared->queue_lock);
allDone = 1;
continue;
@@ -1003,7 +1005,6 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
char semname_q[64];
struct sigaction sa;
int r;
- int no_more = 0;
int allDone = 0;
TimeAccounts *taccs;
struct get_pattern_ctx gpctx;
@@ -1071,8 +1072,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Fill the queue */
pthread_mutex_lock(&sb->shared->queue_lock);
sb->shared->n_events = 0;
- fill_queue(&gpctx, sb);
- sb->shared->no_more = 0;
+ sb->shared->no_more = fill_queue(&gpctx, sb);
pthread_mutex_unlock(&sb->shared->queue_lock);
/* Fork the right number of times */
@@ -1124,8 +1124,9 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Top up the queue if necessary */
time_accounts_set(taccs, TACC_QUEUETOPUP);
pthread_mutex_lock(&sb->shared->queue_lock);
- if ( !no_more && (sb->shared->n_events < QUEUE_SIZE/2) ) {
- if ( fill_queue(&gpctx, sb) ) no_more = 1;
+ if ( !sb->shared->no_more && (sb->shared->n_events < QUEUE_SIZE/2) ) {
+ STATUS("filling: %i %i\n", sb->shared->no_more, sb->shared->n_events);
+ if ( fill_queue(&gpctx, sb) ) sb->shared->no_more = 1;
}
pthread_mutex_unlock(&sb->shared->queue_lock);
@@ -1137,7 +1138,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Have all the events been swallowed? */
time_accounts_set(taccs, TACC_ENDCHECK);
pthread_mutex_lock(&sb->shared->queue_lock);
- if ( no_more && (sb->shared->n_events == 0) ) allDone = 1;
+ if ( sb->shared->no_more && (sb->shared->n_events == 0) ) allDone = 1;
pthread_mutex_unlock(&sb->shared->queue_lock);
} while ( !allDone );