From e995d87d18a52ff2b18209459b33d2988dd7aab3 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 22 Feb 2013 11:40:46 +0100 Subject: Even more robust stream handling --- src/im-sandbox.c | 192 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 125 insertions(+), 67 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 37ccc35c..b8f45247 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -85,6 +85,24 @@ struct pattern_args }; +struct sb_reader +{ + pthread_mutex_t lock; + int done; + + /* If a worker process dies unexpectedly (e.g. if it segfaults), then + * the pipe for its output can still stay open for a little while while + * its buffer empties. The number of pipes being read from is therefore + * not necessarily the same as the number of worker processes. */ + int n_read; + FILE **fhs; + int *fds; + + /* Final output file handle */ + FILE *ofh; +}; + + struct sandbox { pthread_mutex_t lock; @@ -101,15 +119,14 @@ struct sandbox int n_proc; pid_t *pids; - FILE *ofh; - FILE **fhs; int *running; FILE **result_fhs; int *filename_pipes; - int *stream_pipe_read; int *stream_pipe_write; char **last_filename; + + struct sb_reader *reader; }; @@ -502,40 +519,96 @@ static int pump_chunk(FILE *fh, FILE *ofh) } -static void *run_reader(void *sbv) +/* Add an fd to the list of pipes to be read from */ +static void add_pipe(struct sb_reader *rd, int fd) { - struct sandbox *sb = sbv; - int done = 0; + int *fds_new; + FILE **fhs_new; + int slot; + + pthread_mutex_lock(&rd->lock); + + fds_new = realloc(rd->fds, (rd->n_read+1)*sizeof(int)); + if ( fds_new == NULL ) { + ERROR("Failed to allocate memory for new pipe.\n"); + return; + } - while ( !done ) { + fhs_new = realloc(rd->fhs, (rd->n_read+1)*sizeof(FILE *)); + if ( fhs_new == NULL ) { + ERROR("Failed to allocate memory for new FH.\n"); + return; + } + + rd->fds = fds_new; + rd->fhs = fhs_new; + slot = rd->n_read; + + rd->fds[slot] = fd; + + rd->fhs[slot] = fdopen(fd, "r"); + if ( rd->fhs[slot] == NULL ) { + ERROR("Couldn't fdopen() stream!\n"); + return; + } + + rd->n_read++; + + pthread_mutex_unlock(&rd->lock); +} + + +/* Assumes that the caller is already holding rd->lock! */ +static void remove_pipe(struct sb_reader *rd, int d) +{ + int i; + + for ( i=d; in_read; i++ ) { + if ( i < rd->n_read-1 ) { + rd->fds[i] = rd->fds[i+1]; + rd->fhs[i] = rd->fhs[i+1]; + } /* else don't bother */ + } + + rd->n_read--; + + /* We don't bother shrinking the arrays */ +} + + +static void *run_reader(void *rdv) +{ + struct sb_reader *rd = rdv; + + while ( 1 ) { int r, i; struct timeval tv; fd_set fds; int fdmax; + /* Exit when: + * - No fhs left open to read from + * AND - Main thread says "done" */ + if ( (rd->n_read == 0) && rd->done ) break; + tv.tv_sec = 1; tv.tv_usec = 0; FD_ZERO(&fds); fdmax = 0; - lock_sandbox(sb); - for ( i=0; in_proc; i++ ) { + pthread_mutex_lock(&rd->lock); + for ( i=0; in_read; i++ ) { int fd; - /* Listen for output from all processes which have a - * connection, even if they're not "running". */ - if ( sb->fhs[i] == NULL ) continue; - - fd = sb->stream_pipe_read[i]; + fd = rd->fds[i]; FD_SET(fd, &fds); if ( fd > fdmax ) fdmax = fd; } - - unlock_sandbox(sb); + pthread_mutex_unlock(&rd->lock); r = select(fdmax+1, &fds, NULL, NULL, &tv); @@ -546,25 +619,23 @@ static void *run_reader(void *sbv) continue; } - lock_sandbox(sb); - for ( i=0; in_proc; i++ ) { + pthread_mutex_lock(&rd->lock); + for ( i=0; in_read; i++ ) { - if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) { + if ( !FD_ISSET(rd->fds[i], &fds) ) { continue; } /* If the chunk cannot be read, assume the connection * is broken and that the process will die soon. */ - if ( pump_chunk(sb->fhs[i], sb->ofh) ) { - sb->fhs[i] = NULL; + if ( pump_chunk(rd->fhs[i], rd->ofh) ) { + /* remove_pipe() assumes that the caller is + * holding rd->lock ! */ + remove_pipe(rd, i); } } - - done = 1; - if ( sb->running != NULL ) done = 0; - - unlock_sandbox(sb); + pthread_mutex_unlock(&rd->lock); } @@ -607,6 +678,7 @@ static void start_worker_process(struct sandbox *sb, int slot, pid_t p; int filename_pipe[2]; int result_pipe[2]; + int stream_pipe[2]; if ( pipe(filename_pipe) == - 1 ) { ERROR("pipe() failed!\n"); @@ -618,6 +690,11 @@ static void start_worker_process(struct sandbox *sb, int slot, return; } + if ( pipe(stream_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + p = fork(); if ( p == -1 ) { ERROR("fork() failed!\n"); @@ -665,7 +742,7 @@ static void start_worker_process(struct sandbox *sb, int slot, close(filename_pipe[1]); close(result_pipe[0]); - st = open_stream_fd_for_write(sb->stream_pipe_write[slot]); + st = open_stream_fd_for_write(stream_pipe[1]); write_command(st, argc, argv); write_line(st, "FLUSH"); run_work(sb->iargs, filename_pipe[0], result_pipe[1], @@ -683,14 +760,11 @@ static void start_worker_process(struct sandbox *sb, int slot, * and the 'read' end of the result pipe. */ sb->pids[slot] = p; sb->running[slot] = 1; + add_pipe(sb->reader, stream_pipe[0]); close(filename_pipe[0]); close(result_pipe[1]); + close(stream_pipe[1]); sb->filename_pipes[slot] = filename_pipe[1]; - sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r"); - if ( sb->fhs[slot] == NULL ) { - ERROR("Couldn't fdopen() stream!\n"); - return; - } sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); if ( sb->result_fhs[slot] == NULL ) { @@ -765,6 +839,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } + sb->reader = calloc(1, sizeof(struct sb_reader)); + if ( sb->reader == NULL ) { + ERROR("Couldn't allocate memory for SB reader.\n"); + free(sb); + return; + } + + pthread_mutex_init(&sb->lock, NULL); + pthread_mutex_init(&sb->reader->lock, NULL); + sb->n_processed = 0; sb->n_hadcrystals = 0; sb->n_crystals = 0; @@ -775,40 +859,21 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->n_proc = n_proc; sb->iargs = iargs; - pthread_mutex_init(&sb->lock, NULL); + sb->reader->fds = NULL; + sb->reader->fhs = NULL; + sb->reader->ofh = ofh; - sb->ofh = ofh; - sb->stream_pipe_read = calloc(n_proc, sizeof(int)); sb->stream_pipe_write = calloc(n_proc, sizeof(int)); - if ( sb->stream_pipe_read == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return; - } if ( sb->stream_pipe_write == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } - for ( i=0; istream_pipe_read[i] = stream_pipe[0]; - sb->stream_pipe_write[i] = stream_pipe[1]; - - } - lock_sandbox(sb); sb->filename_pipes = calloc(n_proc, sizeof(int)); sb->result_fhs = calloc(n_proc, sizeof(FILE *)); sb->pids = calloc(n_proc, sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); - sb->fhs = calloc(sb->n_proc, sizeof(FILE *)); if ( sb->filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; @@ -831,10 +896,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Couldn't allocate memory for last filename list.\n"); return; } - if ( sb->fhs == NULL ) { - ERROR("Couldn't allocate memory for file handles!\n"); - return; - } unlock_sandbox(sb); if ( pipe(signal_pipe) == -1 ) { @@ -863,7 +924,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Start reader thread after forking, so that things are definitely * "running" */ - if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { + if ( pthread_create(&reader_thread, NULL, run_reader, + (void *)sb->reader) ) { ERROR("Failed to create reader thread.\n"); return; } @@ -1016,10 +1078,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, fclose(fh); /* Indicate to the reader thread that we are done */ - lock_sandbox(sb); - free(sb->running); - sb->running = NULL; - unlock_sandbox(sb); + pthread_mutex_lock(&sb->reader->lock); + sb->reader->done = 1; + pthread_mutex_unlock(&sb->reader->lock); pthread_join(reader_thread, NULL); @@ -1033,10 +1094,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]); } - for ( i=0; in_proc; i++ ) { - fclose(sb->fhs[i]); - } - free(sb->fhs); + free(sb->running); free(sb->filename_pipes); free(sb->result_fhs); free(sb->pids); -- cgit v1.2.3