From 16b97e9eb6e453d518bd081c94b54c373410ac01 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sun, 15 Jul 2012 10:21:16 -0400 Subject: Sandboxy stuff --- src/im-sandbox.c | 502 ++++++++++++++++++++++++++++++------------------------ src/indexamajig.c | 2 +- 2 files changed, 279 insertions(+), 225 deletions(-) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index a5806734..85576ba1 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -75,10 +75,31 @@ struct sandbox { + int n_indexable; + int n_processed; + int n_indexable_last_stats; + int n_processed_last_stats; + int t_last_stats; + + struct index_args *iargs; + + int n_proc; + pid_t *pids; + FILE *ofh; + int *running; + FILE **result_fhs; + int *filename_pipes; + int *stream_pipe_read; + int *stream_pipe_write; + char **last_filename; }; +/* Horrible global variable for signal handler */ +struct sandbox *sb; + + static char *get_pattern(FILE *fh, char **use_this_one_instead, int config_basename, const char *prefix) { @@ -285,6 +306,7 @@ static void run_work(const struct index_args *iargs, { int allDone = 0; FILE *fh; + int w; fh = fdopen(filename_pipe, "r"); if ( fh == NULL ) { @@ -292,13 +314,18 @@ static void run_work(const struct index_args *iargs, return; } + w = write(results_pipe, "\n", 1); + if ( w < 0 ) { + ERROR("Failed to send request for first filename.\n"); + } + while ( !allDone ) { struct pattern_args pargs; - int w, c; - char buf[1024]; + int c; char *line; char *rval; + char buf[1024]; line = malloc(1024*sizeof(char)); rval = fgets(line, 1023, fh); @@ -306,6 +333,7 @@ static void run_work(const struct index_args *iargs, free(line); if ( feof(fh) ) { allDone = 1; + STATUS("Exiting!\n"); continue; } else { ERROR("Read error!\n"); @@ -377,7 +405,7 @@ static time_t get_monotonic_seconds() #endif -static void pump_chunk(FILE *fh, int *finished, FILE *ofh) +static int pump_chunk(FILE *fh, FILE *ofh) { int chunk_started = 0; int chunk_finished = 0; @@ -392,11 +420,11 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh) if ( feof(fh) ) { /* Process died */ - *finished = 1; if ( chunk_started ) { ERROR("EOF during chunk!\n"); fprintf(ofh, "Chunk is unfinished!\n"); } + return 1; } else { ERROR("fgets() failed: %s\n", strerror(errno)); } @@ -413,33 +441,29 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh) } } while ( !chunk_finished ); + + return 0; } -static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) +static void *run_reader(void *sbv) { + struct sandbox *sb = sbv; int done = 0; - int *finished; FILE **fhs; int i; - finished = calloc(n_proc, sizeof(int)); - if ( finished == NULL ) { - ERROR("Couldn't allocate memory for flags!\n"); - exit(1); - } - - fhs = calloc(n_proc, sizeof(FILE *)); + fhs = calloc(sb->n_proc, sizeof(FILE *)); if ( fhs == NULL ) { ERROR("Couldn't allocate memory for file handles!\n"); - exit(1); + return NULL; } - for ( i=0; in_proc; i++ ) { + fhs[i] = fdopen(sb->stream_pipe_read[i], "r"); if ( fhs[i] == NULL ) { ERROR("Couldn't fdopen() stream!\n"); - exit(1); + return NULL; } } @@ -455,13 +479,13 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) FD_ZERO(&fds); fdmax = 0; - for ( i=0; in_proc; i++ ) { int fd; - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - fd = stream_pipe_read[i]; + fd = sb->stream_pipe_read[i]; FD_SET(fd, &fds); if ( fd > fdmax ) fdmax = fd; @@ -471,45 +495,176 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); + if ( errno != EINTR ) { + ERROR("select() failed: %s\n", strerror(errno)); + } /* Otherwise no big deal */ continue; } if ( r == 0 ) continue; /* Nothing this time. Try again */ - for ( i=0; in_proc; i++ ) { - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue; + if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue; - pump_chunk(fhs[i], &finished[i], ofh); + if ( pump_chunk(fhs[i], sb->ofh) ) { + sb->running[i] = 0; + } } done = 1; - for ( i=0; in_proc; i++ ) { + if ( sb->running[i] ) done = 0; } } - free(finished); - - for ( i=0; in_proc; i++ ) { fclose(fhs[i]); } free(fhs); - if ( ofh != stdout ) fclose(ofh); + return NULL; +} + + +static void start_worker_process(struct sandbox *sb, int slot) +{ + pid_t p; + int filename_pipe[2]; + int result_pipe[2]; + + if ( pipe(filename_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + + if ( pipe(result_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + + p = fork(); + if ( p == -1 ) { + ERROR("fork() failed!\n"); + return; + } + + if ( p == 0 ) { + + FILE *sfh; + int j; + struct sigaction sa; + int r; + + /* First, disconnect the signal handler */ + sa.sa_flags = 0; + sigemptyset(&sa.sa_mask); + sa.sa_handler = SIG_DFL; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + + /* Free resources which will not be needed by worker */ + for ( j=0; jn_proc; j++ ) { + if ( (j != slot) && (sb->running[j]) ) { + close(sb->stream_pipe_write[j]); + } + } + for ( j=0; jn_proc; j++ ) { + if ( (j != slot) && (sb->running[j]) ) { + fclose(sb->result_fhs[j]); + close(sb->filename_pipes[j]); + } + } + free(sb->filename_pipes); + free(sb->result_fhs); + free(sb->pids); + /* Also prefix, use_this_one_instead and fh */ + + /* Child process gets the 'read' end of the filename + * pipe, and the 'write' end of the result pipe. */ + close(filename_pipe[1]); + close(result_pipe[0]); + + sfh = fdopen(sb->stream_pipe_write[slot], "w"); + run_work(sb->iargs, filename_pipe[0], result_pipe[1], + sfh, slot); + fclose(sfh); + + free(sb->stream_pipe_write); + close(filename_pipe[0]); + close(result_pipe[1]); + + exit(0); + + } + + /* Parent process gets the 'write' end of the filename pipe + * and the 'read' end of the result pipe. */ + sb->pids[slot] = p; + sb->running[slot] = 1; + close(filename_pipe[0]); + close(result_pipe[1]); + sb->filename_pipes[slot] = filename_pipe[1]; + + sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); + if ( sb->result_fhs[slot] == NULL ) { + ERROR("fdopen() failed.\n"); + return; + } } static void signal_handler(int sig, siginfo_t *si, void *uc_v) { - struct ucontext_t *uc = uc_v; + int i, found; - STATUS("Signal!\n"); + if ( si->si_signo != SIGCHLD ) { + ERROR("Unhandled signal %i?\n", si->si_signo); + return; + } + + found = 0; + for ( i=0; in_proc; i++ ) { + if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) { + found = 1; + break; + } + } + + if ( !found ) { + ERROR("SIGCHLD from unknown child %i?\n", si->si_pid); + return; + } + + if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED) + || (si->si_code == CLD_CONTINUED) ) return; + + if ( si->si_code == CLD_EXITED ) + { + sb->running[i] = 0; + STATUS("Worker process %i exited normally.\n", i); + return; + } + + if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) { + ERROR("Unhandled si_code %i (worker process %i).\n", + si->si_code, i); + return; + } + + ERROR("Worker process %i exited abnormally!\n", i); + ERROR(" -> Signal %i, last filename %s.\n", + si->si_signo, sb->last_filename[i]); + + sb->running[i] = 0; + //start_worker_process(sb, i); } @@ -517,34 +672,34 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, char *use_this_one_instead, FILE *ofh) { - int n_indexable, n_processed, n_indexable_last_stats; - int n_processed_last_stats; - int t_last_stats; - pid_t *pids; - int *filename_pipes; - int *stream_pipe_read; - int *stream_pipe_write; - FILE **result_fhs; int i; int allDone; - int *finished; - pid_t pr; struct sigaction sa; int r; + pthread_t reader_thread; - n_indexable = 0; - n_processed = 0; - n_indexable_last_stats = 0; - n_processed_last_stats = 0; - t_last_stats = get_monotonic_seconds(); + sb = calloc(1, sizeof(struct sandbox)); + if ( sb == NULL ) { + ERROR("Couldn't allocate memory for sandbox.\n"); + return; + } - stream_pipe_read = calloc(n_proc, sizeof(int)); - stream_pipe_write = calloc(n_proc, sizeof(int)); - if ( stream_pipe_read == NULL ) { + sb->n_indexable = 0; + sb->n_processed = 0; + sb->n_indexable_last_stats = 0; + sb->n_processed_last_stats = 0; + sb->t_last_stats = get_monotonic_seconds(); + sb->n_proc = n_proc; + sb->ofh = ofh; + sb->iargs = iargs; + + sb->stream_pipe_read = calloc(n_proc, sizeof(int)); + sb->stream_pipe_write = calloc(n_proc, sizeof(int)); + if ( sb->stream_pipe_read == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } - if ( stream_pipe_write == NULL ) { + if ( sb->stream_pipe_write == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } @@ -558,46 +713,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - stream_pipe_read[i] = stream_pipe[0]; - stream_pipe_write[i] = stream_pipe[1]; + sb->stream_pipe_read[i] = stream_pipe[0]; + sb->stream_pipe_write[i] = stream_pipe[1]; } - pr = fork(); - if ( pr == - 1 ) { - ERROR("fork() failed (for reader process)\n"); + if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { + ERROR("Failed to create reader thread.\n"); return; } - if ( pr == 0 ) { - - /* Free resources not needed by reader - * (but which will be needed by worker or master) */ - for ( i=0; iipriv); - free(iargs->indm); - free(iargs->ipriv); - free_detector_geometry(iargs->det); - free(iargs->beam); - free(iargs->element); - free(iargs->hdf5_peak_path); - free_copy_hdf5_field_list(iargs->copyme); - cell_free(iargs->cell); - fclose(fh); - - run_reader(stream_pipe_read, n_proc, ofh); - - free(stream_pipe_read); - - exit(0); - - } - /* Set up signal handler to take action if any children die */ sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; sigemptyset(&sa.sa_mask); @@ -608,119 +733,37 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - /* Free resources needed by reader only */ - if ( ofh != stdout ) fclose(ofh); - for ( i=0; ifilename_pipes = calloc(n_proc, sizeof(int)); + sb->result_fhs = calloc(n_proc, sizeof(FILE *)); + sb->pids = calloc(n_proc, sizeof(pid_t)); + sb->running = calloc(n_proc, sizeof(int)); + if ( sb->filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } - if ( result_fhs == NULL ) { + if ( sb->result_fhs == NULL ) { ERROR("Couldn't allocate memory for pipe file handles.\n"); return; } - if ( pids == NULL ) { + if ( sb->pids == NULL ) { ERROR("Couldn't allocate memory for PIDs.\n"); return; } + if ( sb->running == NULL ) { + ERROR("Couldn't allocate memory for process flags.\n"); + return; + } - /* Fork the right number of times */ - for ( i=0; ilast_filename = calloc(n_proc, sizeof(char *)); + if ( sb->last_filename == NULL ) { + ERROR("Couldn't allocate memory for last filename list.\n"); + return; } - /* Free resources which will not be used by the main thread */ - cleanup_indexing(iargs->ipriv); - free(iargs->indm); - free(iargs->ipriv); - free_detector_geometry(iargs->det); - free(iargs->beam); - free(iargs->element); - free(iargs->hdf5_peak_path); - free_copy_hdf5_field_list(iargs->copyme); - cell_free(iargs->cell); + /* Fork the right number of times */ for ( i=0; ilast_filename[i]); + sb->last_filename[i] = strdup(nextImage); + + write(sb->filename_pipes[i], nextImage, + strlen(nextImage)); + write(sb->filename_pipes[i], "\n", 1); free(nextImage); @@ -742,7 +789,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int r; /* No more files to process.. already? */ - r = write(filename_pipes[i], "\n", 1); + r = write(sb->filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("Write pipe\n"); } @@ -751,12 +798,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } - finished = calloc(n_proc, sizeof(int)); - if ( finished == NULL ) { - ERROR("Couldn't allocate memory for process flags.\n"); - return; - } - allDone = 0; while ( !allDone ) { @@ -775,9 +816,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int fd; - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - fd = fileno(result_fhs[i]); + fd = fileno(sb->result_fhs[i]); FD_SET(fd, &fds); if ( fd > fdmax ) fdmax = fd; @@ -786,7 +827,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); + if ( errno != EINTR ) { + ERROR("select() failed: %s\n", strerror(errno)); + } continue; } @@ -798,17 +841,18 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, char results[1024]; char *rval; int fd; + int n; + char *eptr; - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - fd = fileno(result_fhs[i]); + fd = fileno(sb->result_fhs[i]); if ( !FD_ISSET(fd, &fds) ) continue; - rval = fgets(results, 1024, result_fhs[i]); + rval = fgets(results, 1024, sb->result_fhs[i]); if ( rval == NULL ) { - if ( feof(result_fhs[i]) ) { - /* Process died */ - finished[i] = 1; + if ( feof(sb->result_fhs[i]) ) { + ERROR("EOF from process %i.\n", i); } else { ERROR("fgets() failed: %s\n", strerror(errno)); @@ -817,8 +861,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } chomp(results); - n_indexable += atoi(results); - n_processed++; + + n = strtol(results, &eptr, 10); + if ( eptr == results ) { + if ( strlen(results) > 0 ) { + ERROR("Invalid result '%s'\n", results); + } + } else { + sb->n_indexable += atoi(results); + sb->n_processed++; + } /* Send next filename */ nextImage = get_pattern(fh, &use_this_one_instead, @@ -826,15 +878,15 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( nextImage == NULL ) { /* No more images */ - r = write(filename_pipes[i], "\n", 1); + r = write(sb->filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("Write pipe\n"); } } else { - r = write(filename_pipes[i], nextImage, + r = write(sb->filename_pipes[i], nextImage, strlen(nextImage)); - r -= write(filename_pipes[i], "\n", 1); + r -= write(sb->filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("write pipe\n"); } @@ -845,23 +897,23 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Update progress */ tNow = get_monotonic_seconds(); - if ( tNow >= t_last_stats+STATS_EVERY_N_SECONDS ) { + if ( tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS ) { STATUS("%i out of %i indexed so far," " %i out of %i since the last message.\n", - n_indexable, n_processed, - n_indexable - n_indexable_last_stats, - n_processed - n_processed_last_stats); + sb->n_indexable, sb->n_processed, + sb->n_indexable - sb->n_indexable_last_stats, + sb->n_processed - sb->n_processed_last_stats); - n_indexable_last_stats = n_indexable; - n_processed_last_stats = n_processed; - t_last_stats = tNow; + sb->n_indexable_last_stats = sb->n_indexable; + sb->n_processed_last_stats = sb->n_processed; + sb->t_last_stats = tNow; } allDone = 1; for ( i=0; irunning[i] ) allDone = 0; } } @@ -870,20 +922,22 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, for ( i=0; ipids[i], &status, 0); } for ( i=0; ifilename_pipes[i]); + fclose(sb->result_fhs[i]); } - free(filename_pipes); - free(result_fhs); - free(pids); - free(finished); + free(sb->filename_pipes); + free(sb->result_fhs); + free(sb->pids); + free(sb->running); + + if ( ofh != stdout ) fclose(ofh); STATUS("There were %i images, of which %i could be indexed.\n", - n_processed, n_indexable); + sb->n_processed, sb->n_indexable); } diff --git a/src/indexamajig.c b/src/indexamajig.c index c2c9ed2c..294bce1d 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -637,7 +637,7 @@ int main(int argc, char *argv[]) iargs.ir_out = ir_out; create_sandbox(&iargs, n_proc, prefix, config_basename, fh, - use_this_one_instead, ofh); + use_this_one_instead, ofh); free(prefix); -- cgit v1.2.3