From df3224acef436c70fd95ac1f990f4dd2a68c3206 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 7 Nov 2022 10:30:16 +0100 Subject: indexamajig: Exit only once all workers received kEndOfStream Previously, the whole program would exit if any worker saw kEndOfStream. Unfortunately, this happens quite often, due to data starvation (too many workers for the data rate) or just general slowness. Therefore we need a more robust criterion. --- src/im-sandbox.c | 30 ++++++++++++++++++++++++++---- src/im-sandbox.h | 1 + 2 files changed, 27 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 8231b041..0602da26 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -488,10 +488,10 @@ static int run_work(const struct index_args *iargs, Stream *st, free(pargs.event); pargs.filename = filename; pargs.event = event; + sb->shared->end_of_stream[cookie] = 0; } else { if ( finished ) { - sb->shared->should_shutdown = 1; - allDone = 1; + sb->shared->end_of_stream[cookie] = 1; } } @@ -688,10 +688,13 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } + pthread_mutex_lock(&sb->shared->queue_lock); sb->shared->pings[slot] = 0; + sb->shared->end_of_stream[slot] = 0; sb->last_ping[slot] = 0; sb->shared->time_last_start[slot] = get_monotonic_seconds(); sb->shared->warned_long_running[slot] = 0; + pthread_mutex_unlock(&sb->shared->queue_lock); p = fork(); if ( p == -1 ) { @@ -1118,6 +1121,17 @@ char *create_tempdir(const char *temp_location) } +/* Call under queue_lock */ +static int all_got_end_of_stream(struct sandbox *sb) +{ + int i; + for ( i=0; i<sb->n_proc; i++ ) { + if ( !sb->shared->end_of_stream[i] ) return 0; + } + return 1; +} + + /* Returns the number of frames processed (not necessarily indexed). * If the return value is zero, something is probably wrong. 
*/ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, @@ -1284,16 +1298,24 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Update progress */ try_status(sb, 0); - /* Have all the events been swallowed? */ + /* Begin exit criterion checking */ pthread_mutex_lock(&sb->shared->queue_lock); + + /* Case 1: Queue empty and no more coming? */ if ( sb->shared->no_more && (sb->shared->n_events == 0) ) allDone = 1; + + /* Case 2: Worker process requested immediate shutdown */ if ( sb->shared->should_shutdown ) { - /* Worker process requested immediate shutdown */ allDone = 1; sb->shared->n_events = 0; sb->shared->no_more = 1; } + + /* Case 3: All workers saw end of (ASAP::O) stream */ + if ( all_got_end_of_stream(sb) ) allDone = 1; + pthread_mutex_unlock(&sb->shared->queue_lock); + /* End exit criterion checking */ } while ( !allDone ); diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 52094a3d..a6adddd5 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -69,6 +69,7 @@ struct sb_shm char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN]; char last_task[MAX_NUM_WORKERS][MAX_TASK_LEN]; int pings[MAX_NUM_WORKERS]; + int end_of_stream[MAX_NUM_WORKERS]; time_t time_last_start[MAX_NUM_WORKERS]; int warned_long_running[MAX_NUM_WORKERS]; -- cgit v1.2.3