diff options
author | Thomas White <taw@physics.org> | 2015-07-11 10:16:56 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2015-07-13 16:00:16 +0200 |
commit | cca5d6e35b0ab653b333424abf819b4a874cf911 (patch) | |
tree | 711984cb36323f8cc250f09bdf8b4a8ecf610af6 /src | |
parent | 65d5d478b4288b26a455e260ee9bc153a1789f29 (diff) |
Use named semaphores instead of unnamed
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 68 | ||||
-rw-r--r-- | src/im-sandbox.h | 1 |
2 files changed, 48 insertions, 21 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 1b3dbfcd..6464202c 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -82,6 +82,7 @@ struct sandbox int serial; struct sb_shm *shared; + sem_t *queue_sem; char *tmpdir; @@ -96,7 +97,7 @@ struct sandbox /* Horrible global variable for signal handler */ -sem_t zombie_sem; +sem_t *zombie_sem; static struct filename_plus_event *get_pattern(FILE *fh, int config_basename, @@ -246,7 +247,7 @@ static void shuffle_events(struct sb_shm *sb_shared) static void run_work(const struct index_args *iargs, Stream *st, - int cookie, const char *tmpdir, struct sb_shm *sb_shared) + int cookie, const char *tmpdir, struct sandbox *sb) { int allDone = 0; @@ -260,25 +261,35 @@ static void run_work(const struct index_args *iargs, Stream *st, int r; /* Wait until an event is ready */ - sem_wait(&sb_shared->queue_sem); + if ( sem_wait(sb->queue_sem) != 0 ) { + ERROR("Failed to wait on queue semaphore: %s\n", + strerror(errno)); + } /* Get the event from the queue */ - pthread_mutex_lock(&sb_shared->queue_lock); - if ( sb_shared->no_more ) { - pthread_mutex_unlock(&sb_shared->queue_lock); + pthread_mutex_lock(&sb->shared->queue_lock); + if ( sb->shared->no_more ) { + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + if ( sb->shared->n_events == 0 ) { + ERROR("Got the semaphore, but no events in queue!\n"); + ERROR("no_more = %i\n", sb->shared->no_more); + pthread_mutex_unlock(&sb->shared->queue_lock); allDone = 1; continue; } - r = sscanf(sb_shared->queue[0], "%s %s %i", + r = sscanf(sb->shared->queue[0], "%s %s %i", filename, event_str, &ser); if ( r != 3 ) { STATUS("Invalid event string '%s'\n", - sb_shared->queue[0]); + sb->shared->queue[0]); } - memcpy(sb_shared->last_ev[cookie], sb_shared->queue[0], + memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], MAX_EV_LEN); - shuffle_events(sb_shared); - pthread_mutex_unlock(&sb_shared->queue_lock); + shuffle_events(sb->shared); + pthread_mutex_unlock(&sb->shared->queue_lock); if ( r != 3 ) continue; @@ -301,7 +312,7 @@ static void run_work(const struct index_args *iargs, Stream *st, } process_image(iargs, &pargs, st, cookie, tmpdir, ser, - sb_shared); + sb->shared); free_filename_plus_event(pargs.filename_p_e); @@ -562,7 +573,7 @@ static void start_worker_process(struct sandbox *sb, int slot) */ st = open_stream_fd_for_write(stream_pipe[1]); - run_work(sb->iargs, st, slot, tmp, sb->shared); + run_work(sb->iargs, st, slot, tmp, sb); close_stream(st); free(tmp); @@ -587,7 +598,7 @@ static void start_worker_process(struct sandbox *sb, int slot) static void signal_handler(int sig, siginfo_t *si, void *uc_v) { - sem_post(&zombie_sem); + sem_post(zombie_sem); } @@ -697,7 +708,7 @@ static int fill_queue(FILE *fh, int config_basename, struct detector *det, maybe_get_event_string(ne->ev), sb->serial++); memcpy(sb->shared->queue[sb->shared->n_events++], ev_string, MAX_EV_LEN); - sem_post(&sb->shared->queue_sem); + sem_post(sb->queue_sem); free_filename_plus_event(ne); } @@ -715,6 +726,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, struct sandbox *sb; size_t ll; struct stat s; + char semname_q[64]; + char semname_z[64]; int allDone = 0; if ( n_proc > MAX_NUM_WORKERS ) { @@ -751,8 +764,22 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->shared->n_hadcrystals = 0; sb->shared->n_crystals = 0; - sem_init(&sb->shared->queue_sem, 1, 0); - sem_init(&zombie_sem, 0, 0); + /* Set up semaphore to control work queue */ + snprintf(semname_q, 64, "indexamajig-q%i", getpid()); + sb->queue_sem = sem_open(semname_q, O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR, 0); + if ( sb->queue_sem == SEM_FAILED ) { + ERROR("Failed to create semaphore: %s\n", strerror(errno)); + return; + } + snprintf(semname_z, 64, "indexamajig-z%i", getpid()); + zombie_sem = sem_open(semname_z, O_CREAT | O_EXCL, + S_IRUSR | S_IWUSR, 0); + if ( zombie_sem == SEM_FAILED ) { + ERROR("Failed to create zombie semaphore: %s\n", + strerror(errno)); + return; + } sb->pids = calloc(n_proc, sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); @@ -823,10 +850,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, double tNow; try_read(sb); - sleep(5); /* Check for dead workers */ - if ( sem_trywait(&zombie_sem) == 0 ) handle_zombie(sb); + if ( sem_trywait(zombie_sem) == 0 ) handle_zombie(sb); /* Top up the queue if necessary */ r = 0; @@ -883,7 +909,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->shared->no_more = 1; pthread_mutex_unlock(&sb->shared->queue_lock); for ( i=0; i<n_proc; i++ ) { - sem_post(&sb->shared->queue_sem); + sem_post(sb->queue_sem); } for ( i=0; i<n_proc; i++ ) { int status; @@ -907,6 +933,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, 100.0 * sb->shared->n_hadcrystals / sb->shared->n_processed, sb->shared->n_crystals); + sem_unlink(semname_q); + sem_unlink(semname_z); munmap(sb->shared, sizeof(struct sb_shm)); free(sb); } diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 67bc88c8..b39f9566 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -61,7 +61,6 @@ struct sb_shm char queue[QUEUE_SIZE][MAX_EV_LEN]; int no_more; char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN]; - sem_t queue_sem; pthread_mutex_t totals_lock; int n_processed; |