diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 30 | ||||
-rw-r--r-- | src/im-sandbox.h | 1 |
2 files changed, 27 insertions, 4 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 8231b041..0602da26 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -488,10 +488,10 @@ static int run_work(const struct index_args *iargs, Stream *st, free(pargs.event); pargs.filename = filename; pargs.event = event; + sb->shared->end_of_stream[cookie] = 0; } else { if ( finished ) { - sb->shared->should_shutdown = 1; - allDone = 1; + sb->shared->end_of_stream[cookie] = 1; } } @@ -688,10 +688,13 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } + pthread_mutex_lock(&sb->shared->queue_lock); sb->shared->pings[slot] = 0; + sb->shared->end_of_stream[slot] = 0; sb->last_ping[slot] = 0; sb->shared->time_last_start[slot] = get_monotonic_seconds(); sb->shared->warned_long_running[slot] = 0; + pthread_mutex_unlock(&sb->shared->queue_lock); p = fork(); if ( p == -1 ) { @@ -1118,6 +1121,17 @@ char *create_tempdir(const char *temp_location) } +/* Call under queue_lock */ +static int all_got_end_of_stream(struct sandbox *sb) +{ + int i; + for ( i=0; i<sb->n_proc; i++ ) { + if ( !sb->shared->end_of_stream[i] ) return 0; + } + return 1; +} + + /* Returns the number of frames processed (not necessarily indexed). * If the return value is zero, something is probably wrong. */ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, @@ -1284,16 +1298,24 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Update progress */ try_status(sb, 0); - /* Have all the events been swallowed? */ + /* Begin exit criterion checking */ pthread_mutex_lock(&sb->shared->queue_lock); + + /* Case 1: Queue empty and no more coming? */ if ( sb->shared->no_more && (sb->shared->n_events == 0) ) allDone = 1; + + /* Case 2: Worker process requested immediate shutdown */ if ( sb->shared->should_shutdown ) { - /* Worker process requested immediate shutdown */ allDone = 1; sb->shared->n_events = 0; sb->shared->no_more = 1; } + + /* Case 3: All workers saw end of (ASAP::O) stream */ + if ( all_got_end_of_stream(sb) ) allDone = 1; + pthread_mutex_unlock(&sb->shared->queue_lock); + /* End exit criterion checking */ } while ( !allDone ); diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 52094a3d..a6adddd5 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -69,6 +69,7 @@ struct sb_shm char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN]; char last_task[MAX_NUM_WORKERS][MAX_TASK_LEN]; int pings[MAX_NUM_WORKERS]; + int end_of_stream[MAX_NUM_WORKERS]; time_t time_last_start[MAX_NUM_WORKERS]; int warned_long_running[MAX_NUM_WORKERS]; |