aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c30
-rw-r--r--src/im-sandbox.h1
2 files changed, 27 insertions, 4 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 8231b041..0602da26 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -488,10 +488,10 @@ static int run_work(const struct index_args *iargs, Stream *st,
free(pargs.event);
pargs.filename = filename;
pargs.event = event;
+ sb->shared->end_of_stream[cookie] = 0;
} else {
if ( finished ) {
- sb->shared->should_shutdown = 1;
- allDone = 1;
+ sb->shared->end_of_stream[cookie] = 1;
}
}
@@ -688,10 +688,13 @@ static void start_worker_process(struct sandbox *sb, int slot)
return;
}
+ pthread_mutex_lock(&sb->shared->queue_lock);
sb->shared->pings[slot] = 0;
+ sb->shared->end_of_stream[slot] = 0;
sb->last_ping[slot] = 0;
sb->shared->time_last_start[slot] = get_monotonic_seconds();
sb->shared->warned_long_running[slot] = 0;
+ pthread_mutex_unlock(&sb->shared->queue_lock);
p = fork();
if ( p == -1 ) {
@@ -1118,6 +1121,17 @@ char *create_tempdir(const char *temp_location)
}
+/* Call under queue_lock */
+static int all_got_end_of_stream(struct sandbox *sb)
+{
+ int i;
+ for ( i=0; i<sb->n_proc; i++ ) {
+ if ( !sb->shared->end_of_stream[i] ) return 0;
+ }
+ return 1;
+}
+
+
/* Returns the number of frames processed (not necessarily indexed).
* If the return value is zero, something is probably wrong. */
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
@@ -1284,16 +1298,24 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Update progress */
try_status(sb, 0);
- /* Have all the events been swallowed? */
+ /* Begin exit criterion checking */
pthread_mutex_lock(&sb->shared->queue_lock);
+
+ /* Case 1: Queue empty and no more coming? */
if ( sb->shared->no_more && (sb->shared->n_events == 0) ) allDone = 1;
+
+ /* Case 2: Worker process requested immediate shutdown */
if ( sb->shared->should_shutdown ) {
- /* Worker process requested immediate shutdown */
allDone = 1;
sb->shared->n_events = 0;
sb->shared->no_more = 1;
}
+
+ /* Case 3: All workers saw end of (ASAP::O) stream */
+ if ( all_got_end_of_stream(sb) ) allDone = 1;
+
pthread_mutex_unlock(&sb->shared->queue_lock);
+ /* End exit criterion checking */
} while ( !allDone );
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 52094a3d..a6adddd5 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -69,6 +69,7 @@ struct sb_shm
char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN];
char last_task[MAX_NUM_WORKERS][MAX_TASK_LEN];
int pings[MAX_NUM_WORKERS];
+ int end_of_stream[MAX_NUM_WORKERS];
time_t time_last_start[MAX_NUM_WORKERS];
int warned_long_running[MAX_NUM_WORKERS];