From 14100fed56471e4331f83acc46c2fccd67125911 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sun, 15 Jul 2012 11:46:25 -0400 Subject: Add locking --- src/im-sandbox.c | 174 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 89 insertions(+), 85 deletions(-) (limited to 'src/im-sandbox.c') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 85576ba1..02c90cdc 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -75,6 +75,8 @@ struct sandbox { + pthread_mutex_t lock; + int n_indexable; int n_processed; int n_indexable_last_stats; @@ -86,6 +88,7 @@ struct sandbox int n_proc; pid_t *pids; FILE *ofh; + FILE **fhs; int *running; FILE **result_fhs; @@ -367,8 +370,7 @@ static void run_work(const struct index_args *iargs, } - /* close my pipes */ - fclose(fh); + STATUS("Got command to exit. Shutting down!\n"); cleanup_indexing(iargs->ipriv); free(iargs->indm); @@ -379,6 +381,7 @@ static void run_work(const struct index_args *iargs, free(iargs->hdf5_peak_path); free_copy_hdf5_field_list(iargs->copyme); cell_free(iargs->cell); + fclose(fh); } @@ -450,22 +453,6 @@ static void *run_reader(void *sbv) { struct sandbox *sb = sbv; int done = 0; - FILE **fhs; - int i; - - fhs = calloc(sb->n_proc, sizeof(FILE *)); - if ( fhs == NULL ) { - ERROR("Couldn't allocate memory for file handles!\n"); - return NULL; - } - - for ( i=0; in_proc; i++ ) { - fhs[i] = fdopen(sb->stream_pipe_read[i], "r"); - if ( fhs[i] == NULL ) { - ERROR("Couldn't fdopen() stream!\n"); - return NULL; - } - } while ( !done ) { @@ -479,6 +466,7 @@ static void *run_reader(void *sbv) FD_ZERO(&fds); fdmax = 0; + pthread_mutex_lock(&sb->lock); for ( i=0; in_proc; i++ ) { int fd; @@ -492,6 +480,8 @@ static void *run_reader(void *sbv) } + pthread_mutex_unlock(&sb->lock); + r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { @@ -503,13 +493,14 @@ static void *run_reader(void *sbv) if ( r == 0 ) continue; /* Nothing this time. Try again */ + pthread_mutex_lock(&sb->lock); for ( i=0; in_proc; i++ ) { if ( !sb->running[i] ) continue; if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue; - if ( pump_chunk(fhs[i], sb->ofh) ) { + if ( pump_chunk(sb->fhs[i], sb->ofh) ) { sb->running[i] = 0; } @@ -519,14 +510,10 @@ static void *run_reader(void *sbv) for ( i=0; in_proc; i++ ) { if ( sb->running[i] ) done = 0; } + pthread_mutex_unlock(&sb->lock); } - for ( i=0; in_proc; i++ ) { - fclose(fhs[i]); - } - free(fhs); - return NULL; } @@ -547,9 +534,11 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } + pthread_mutex_lock(&sb->lock); p = fork(); if ( p == -1 ) { ERROR("fork() failed!\n"); + pthread_mutex_unlock(&sb->lock); return; } @@ -560,6 +549,9 @@ static void start_worker_process(struct sandbox *sb, int slot) struct sigaction sa; int r; + /* FIXME: Is lock inherited? */ + pthread_mutex_unlock(&sb->lock); + /* First, disconnect the signal handler */ sa.sa_flags = 0; sigemptyset(&sa.sa_mask); @@ -597,8 +589,7 @@ static void start_worker_process(struct sandbox *sb, int slot) sfh, slot); fclose(sfh); - free(sb->stream_pipe_write); - close(filename_pipe[0]); + //close(filename_pipe[0]); close(result_pipe[1]); exit(0); @@ -612,12 +603,21 @@ static void start_worker_process(struct sandbox *sb, int slot) close(filename_pipe[0]); close(result_pipe[1]); sb->filename_pipes[slot] = filename_pipe[1]; + sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r"); + if ( sb->fhs[slot] == NULL ) { + ERROR("Couldn't fdopen() stream!\n"); + pthread_mutex_unlock(&sb->lock); + return; + } sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); if ( sb->result_fhs[slot] == NULL ) { ERROR("fdopen() failed.\n"); + pthread_mutex_unlock(&sb->lock); return; } + + pthread_mutex_unlock(&sb->lock); } @@ -625,12 +625,17 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) { int i, found; + STATUS("Signal!\n"); + if ( si->si_signo != SIGCHLD ) { ERROR("Unhandled signal %i?\n", si->si_signo); return; } found = 0; + STATUS("Getting lock...\n"); fflush(stderr); + pthread_mutex_lock(&sb->lock); + STATUS("Got it.\n"); fflush(stderr); for ( i=0; in_proc; i++ ) { if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) { found = 1; @@ -640,22 +645,29 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) if ( !found ) { ERROR("SIGCHLD from unknown child %i?\n", si->si_pid); + pthread_mutex_unlock(&sb->lock); return; } if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED) - || (si->si_code == CLD_CONTINUED) ) return; + || (si->si_code == CLD_CONTINUED) ) + { + pthread_mutex_unlock(&sb->lock); + return; + } if ( si->si_code == CLD_EXITED ) { sb->running[i] = 0; STATUS("Worker process %i exited normally.\n", i); + pthread_mutex_unlock(&sb->lock); return; } if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) { ERROR("Unhandled si_code %i (worker process %i).\n", si->si_code, i); + pthread_mutex_unlock(&sb->lock); return; } @@ -663,8 +675,8 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) ERROR(" -> Signal %i, last filename %s.\n", si->si_signo, sb->last_filename[i]); - sb->running[i] = 0; - //start_worker_process(sb, i); + pthread_mutex_unlock(&sb->lock); + start_worker_process(sb, i); } @@ -693,6 +705,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->ofh = ofh; sb->iargs = iargs; + pthread_mutex_init(&sb->lock, NULL); + sb->stream_pipe_read = calloc(n_proc, sizeof(int)); sb->stream_pipe_write = calloc(n_proc, sizeof(int)); if ( sb->stream_pipe_read == NULL ) { @@ -718,25 +732,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } - if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { - ERROR("Failed to create reader thread.\n"); - return; - } - - /* Set up signal handler to take action if any children die */ - sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; - sigemptyset(&sa.sa_mask); - sa.sa_sigaction = signal_handler; - r = sigaction(SIGCHLD, &sa, NULL); - if ( r == -1 ) { - ERROR("Failed to set signal handler!\n"); - return; - } - + pthread_mutex_lock(&sb->lock); sb->filename_pipes = calloc(n_proc, sizeof(int)); sb->result_fhs = calloc(n_proc, sizeof(FILE *)); sb->pids = calloc(n_proc, sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); + sb->fhs = calloc(sb->n_proc, sizeof(FILE *)); if ( sb->filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; @@ -759,43 +760,30 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Couldn't allocate memory for last filename list.\n"); return; } - - /* Fork the right number of times */ - for ( i=0; ifhs == NULL ) { + ERROR("Couldn't allocate memory for file handles!\n"); + return; } + pthread_mutex_unlock(&sb->lock); - /* Send first image to all children */ - for ( i=0; ilast_filename[i]); - sb->last_filename[i] = strdup(nextImage); - - write(sb->filename_pipes[i], nextImage, - strlen(nextImage)); - write(sb->filename_pipes[i], "\n", 1); - - free(nextImage); - - } else { - - int r; - - /* No more files to process.. already? */ - r = write(sb->filename_pipes[i], "\n", 1); - if ( r < 0 ) { - ERROR("Write pipe\n"); - } + if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { + ERROR("Failed to create reader thread.\n"); + return; + } - } + /* Set up signal handler to take action if any children die */ + sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; + sigemptyset(&sa.sa_mask); + sa.sa_sigaction = signal_handler; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + /* Fork the right number of times */ + for ( i=0; ilock); for ( i=0; irunning[i] ) continue; + if ( !sb->running[i] ) { + pthread_mutex_unlock(&sb->lock); + continue; + } fd = fileno(sb->result_fhs[i]); FD_SET(fd, &fds); @@ -824,8 +816,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } + pthread_mutex_unlock(&sb->lock); r = select(fdmax+1, &fds, NULL, NULL, &tv); - if ( r == -1 ) { if ( errno != EINTR ) { ERROR("select() failed: %s\n", strerror(errno)); @@ -835,6 +827,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( r == 0 ) continue; /* No progress this time. Try again */ + pthread_mutex_lock(&sb->lock); for ( i=0; irunning[i] ) continue; + if ( !sb->running[i] ) { + continue; + } fd = fileno(sb->result_fhs[i]); - if ( !FD_ISSET(fd, &fds) ) continue; + if ( !FD_ISSET(fd, &fds) ) { + continue; + } rval = fgets(results, 1024, sb->result_fhs[i]); if ( rval == NULL ) { - if ( feof(sb->result_fhs[i]) ) { - ERROR("EOF from process %i.\n", i); - } else { + if ( !feof(sb->result_fhs[i]) ) { ERROR("fgets() failed: %s\n", strerror(errno)); } @@ -883,7 +878,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Write pipe\n"); } } else { - r = write(sb->filename_pipes[i], nextImage, strlen(nextImage)); r -= write(sb->filename_pipes[i], "\n", 1); @@ -916,10 +910,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( sb->running[i] ) allDone = 0; } + pthread_mutex_unlock(&sb->lock); + } + STATUS("Done. Waiting..\n"); + fclose(fh); + pthread_mutex_destroy(&sb->lock); + for ( i=0; ipids[i], &status, 0); @@ -930,6 +930,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, fclose(sb->result_fhs[i]); } + for ( i=0; in_proc; i++ ) { + fclose(sb->fhs[i]); + } + free(sb->fhs); free(sb->filename_pipes); free(sb->result_fhs); free(sb->pids); -- cgit v1.2.3