diff options
author | Thomas White <taw@physics.org> | 2015-07-11 18:26:51 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2015-07-13 16:00:46 +0200 |
commit | 59b0708b8b89e921ee6914e072ff71eb121d7c41 (patch) | |
tree | 2930fd7d44756b441cc19f6923a36cfd19aacde1 /src | |
parent | cca5d6e35b0ab653b333424abf819b4a874cf911 (diff) |
Clean up semaphore on interrupt
This is necessary because we encourage users to interrupt indexamajig with Ctrl+C.
Also, generally improve the signal handling.
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 143 |
1 file changed, 92 insertions, 51 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 6464202c..5e9c4838 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -96,10 +96,6 @@ struct sandbox }; -/* Horrible global variable for signal handler */ -sem_t *zombie_sem; - - static struct filename_plus_event *get_pattern(FILE *fh, int config_basename, struct detector *det, const char *prefix) @@ -456,8 +452,8 @@ static void try_read(struct sandbox *sb) int fdmax; const int ofd = get_stream_fd(sb->stream); - tv.tv_sec = 5; - tv.tv_usec = 0; + tv.tv_sec = 0; + tv.tv_usec = 500000; FD_ZERO(&fds); fdmax = 0; @@ -523,21 +519,31 @@ static void start_worker_process(struct sandbox *sb, int slot) size_t ll; int i; - /* First, disconnect the signal handler */ - sa.sa_flags = 0; - sigemptyset(&sa.sa_mask); - sa.sa_handler = SIG_DFL; - r = sigaction(SIGCHLD, &sa, NULL); - if ( r == -1 ) { + /* First, disconnect the signal handlers */ + sa.sa_flags = 0; + sigemptyset(&sa.sa_mask); + sa.sa_handler = SIG_DFL; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { ERROR("Failed to set signal handler!\n"); - return; - } + exit(1); + } + r = sigaction(SIGINT, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + exit(1); + } + r = sigaction(SIGQUIT, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + exit(1); + } ll = 64 + strlen(sb->tmpdir); tmp = malloc(ll); if ( tmp == NULL ) { ERROR("Failed to allocate temporary dir\n"); - return; + exit(1); } snprintf(tmp, 63, "%s/worker.%i", sb->tmpdir, slot); @@ -545,14 +551,14 @@ static void start_worker_process(struct sandbox *sb, int slot) if ( stat(tmp, &s) == -1 ) { if ( errno != ENOENT ) { ERROR("Failed to stat temporary folder.\n"); - return; + exit(1); } r = mkdir(tmp, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); if ( r ) { ERROR("Failed to create temporary folder: %s\n", strerror(errno)); - return; + exit(1); } } @@ -596,13 +602,7 @@ static void start_worker_process(struct sandbox *sb, int slot) } -static void 
signal_handler(int sig, siginfo_t *si, void *uc_v) -{ - sem_post(zombie_sem); -} - - -static void handle_zombie(struct sandbox *sb) +static void handle_zombie(struct sandbox *sb, int respawn) { int i; @@ -628,11 +628,15 @@ static void handle_zombie(struct sandbox *sb) } if ( WIFSIGNALED(status) ) { + + if ( (WTERMSIG(status) == SIGINT) + || (WTERMSIG(status) == SIGQUIT) ) continue; + STATUS("Worker %i was killed by signal %i\n", i, WTERMSIG(status)); STATUS("Event ID was: %s\n", sb->shared->last_ev[i]); - start_worker_process(sb, i); + if ( respawn ) start_worker_process(sb, i); } } @@ -715,19 +719,47 @@ static int fill_queue(FILE *fh, int config_basename, struct detector *det, return 0; } +volatile sig_atomic_t at_zombies = 0; +volatile sig_atomic_t at_interrupt = 0; + +static void sigchld_handler(int sig, siginfo_t *si, void *uc_v) +{ + at_zombies = 1; +} + + +static void sigint_handler(int sig, siginfo_t *si, void *uc_v) +{ + at_interrupt = 1; +} + + +static void check_signals(struct sandbox *sb, const char *semname_q, + int respawn) +{ + if ( at_zombies ) { + at_zombies = 0; + handle_zombie(sb, respawn); + } + + if ( at_interrupt ) { + sem_unlink(semname_q); + exit(0); + } +} + void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tempdir) { int i; - struct sigaction sa; - int r; struct sandbox *sb; size_t ll; struct stat s; char semname_q[64]; - char semname_z[64]; + struct sigaction sa; + int r; int allDone = 0; if ( n_proc > MAX_NUM_WORKERS ) { @@ -772,14 +804,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Failed to create semaphore: %s\n", strerror(errno)); return; } - snprintf(semname_z, 64, "indexamajig-z%i", getpid()); - zombie_sem = sem_open(semname_z, O_CREAT | O_EXCL, - S_IRUSR | S_IWUSR, 0); - if ( zombie_sem == SEM_FAILED ) { - ERROR("Failed to create zombie semaphore: %s\n", - strerror(errno)); - return; - } sb->pids = calloc(n_proc, 
sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); @@ -792,16 +816,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - /* Set up signal handler to take action if any children die */ - sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; - sigemptyset(&sa.sa_mask); - sa.sa_sigaction = signal_handler; - r = sigaction(SIGCHLD, &sa, NULL); - if ( r == -1 ) { - ERROR("Failed to set signal handler!\n"); - return; - } - if ( tempdir == NULL ) { tempdir = ""; } @@ -844,15 +858,38 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, start_worker_process(sb, i); } + /* Set up signal handler to take action if any children die */ + sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; + sigemptyset(&sa.sa_mask); + sa.sa_sigaction = sigchld_handler; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + + /* Set up signal handler to clean up semaphore on exit */ + sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; + sigemptyset(&sa.sa_mask); + sa.sa_sigaction = sigint_handler; + r = sigaction(SIGINT, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + r = sigaction(SIGQUIT, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + do { int r; double tNow; try_read(sb); - - /* Check for dead workers */ - if ( sem_trywait(zombie_sem) == 0 ) handle_zombie(sb); + check_signals(sb, semname_q, 1); /* Top up the queue if necessary */ r = 0; @@ -900,6 +937,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, pthread_mutex_unlock(&sb->shared->queue_lock); } while ( !allDone ); + fclose(fh); /* Indicate to the workers that we are finished, and wake them up one @@ -915,9 +953,14 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int status; while ( waitpid(sb->pids[i], &status, WNOHANG) == 0 ) { try_read(sb); + check_signals(sb, semname_q, 0); } + /* If this worker died and got 
waited by the zombie handler, + * waitpid() returns -1 and the loop still exits. */ } + sem_unlink(semname_q); + for ( i=0; i<sb->n_read; i++ ) { fclose(sb->fhs[i]); } @@ -933,8 +976,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, 100.0 * sb->shared->n_hadcrystals / sb->shared->n_processed, sb->shared->n_crystals); - sem_unlink(semname_q); - sem_unlink(semname_z); munmap(sb->shared, sizeof(struct sb_shm)); free(sb); } |