aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2013-02-22 11:40:46 +0100
committerThomas White <taw@physics.org>2013-02-22 11:40:46 +0100
commite995d87d18a52ff2b18209459b33d2988dd7aab3 (patch)
tree3281f015b014f3a268cb333187feebb9978a8b64 /src
parentb821aae2dd29cbdec5ccd33a3929a61ee047f0e6 (diff)
Even more robust stream handling
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c192
1 files changed, 125 insertions, 67 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 37ccc35c..b8f45247 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -85,6 +85,24 @@ struct pattern_args
};
+struct sb_reader
+{
+ pthread_mutex_t lock;
+ int done;
+
+ /* If a worker process dies unexpectedly (e.g. if it segfaults), then
+ * the pipe for its output can still stay open for a little while while
+ * its buffer empties. The number of pipes being read from is therefore
+ * not necessarily the same as the number of worker processes. */
+ int n_read;
+ FILE **fhs;
+ int *fds;
+
+ /* Final output file handle */
+ FILE *ofh;
+};
+
+
struct sandbox
{
pthread_mutex_t lock;
@@ -101,15 +119,14 @@ struct sandbox
int n_proc;
pid_t *pids;
- FILE *ofh;
- FILE **fhs;
int *running;
FILE **result_fhs;
int *filename_pipes;
- int *stream_pipe_read;
int *stream_pipe_write;
char **last_filename;
+
+ struct sb_reader *reader;
};
@@ -502,40 +519,96 @@ static int pump_chunk(FILE *fh, FILE *ofh)
}
-static void *run_reader(void *sbv)
+/* Add an fd to the list of pipes to be read from */
+static void add_pipe(struct sb_reader *rd, int fd)
{
- struct sandbox *sb = sbv;
- int done = 0;
+ int *fds_new;
+ FILE **fhs_new;
+ int slot;
+
+ pthread_mutex_lock(&rd->lock);
+
+ fds_new = realloc(rd->fds, (rd->n_read+1)*sizeof(int));
+ if ( fds_new == NULL ) {
+ ERROR("Failed to allocate memory for new pipe.\n");
+ return;
+ }
- while ( !done ) {
+ fhs_new = realloc(rd->fhs, (rd->n_read+1)*sizeof(FILE *));
+ if ( fhs_new == NULL ) {
+ ERROR("Failed to allocate memory for new FH.\n");
+ return;
+ }
+
+ rd->fds = fds_new;
+ rd->fhs = fhs_new;
+ slot = rd->n_read;
+
+ rd->fds[slot] = fd;
+
+ rd->fhs[slot] = fdopen(fd, "r");
+ if ( rd->fhs[slot] == NULL ) {
+ ERROR("Couldn't fdopen() stream!\n");
+ return;
+ }
+
+ rd->n_read++;
+
+ pthread_mutex_unlock(&rd->lock);
+}
+
+
+/* Assumes that the caller is already holding rd->lock! */
+static void remove_pipe(struct sb_reader *rd, int d)
+{
+ int i;
+
+ for ( i=d; i<rd->n_read; i++ ) {
+ if ( i < rd->n_read-1 ) {
+ rd->fds[i] = rd->fds[i+1];
+ rd->fhs[i] = rd->fhs[i+1];
+ } /* else don't bother */
+ }
+
+ rd->n_read--;
+
+ /* We don't bother shrinking the arrays */
+}
+
+
+static void *run_reader(void *rdv)
+{
+ struct sb_reader *rd = rdv;
+
+ while ( 1 ) {
int r, i;
struct timeval tv;
fd_set fds;
int fdmax;
+ /* Exit when:
+ * - No fhs left open to read from
+ * AND - Main thread says "done" */
+ if ( (rd->n_read == 0) && rd->done ) break;
+
tv.tv_sec = 1;
tv.tv_usec = 0;
FD_ZERO(&fds);
fdmax = 0;
- lock_sandbox(sb);
- for ( i=0; i<sb->n_proc; i++ ) {
+ pthread_mutex_lock(&rd->lock);
+ for ( i=0; i<rd->n_read; i++ ) {
int fd;
- /* Listen for output from all processes which have a
- * connection, even if they're not "running". */
- if ( sb->fhs[i] == NULL ) continue;
-
- fd = sb->stream_pipe_read[i];
+ fd = rd->fds[i];
FD_SET(fd, &fds);
if ( fd > fdmax ) fdmax = fd;
}
-
- unlock_sandbox(sb);
+ pthread_mutex_unlock(&rd->lock);
r = select(fdmax+1, &fds, NULL, NULL, &tv);
@@ -546,25 +619,23 @@ static void *run_reader(void *sbv)
continue;
}
- lock_sandbox(sb);
- for ( i=0; i<sb->n_proc; i++ ) {
+ pthread_mutex_lock(&rd->lock);
+ for ( i=0; i<rd->n_read; i++ ) {
- if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) {
+ if ( !FD_ISSET(rd->fds[i], &fds) ) {
continue;
}
/* If the chunk cannot be read, assume the connection
* is broken and that the process will die soon. */
- if ( pump_chunk(sb->fhs[i], sb->ofh) ) {
- sb->fhs[i] = NULL;
+ if ( pump_chunk(rd->fhs[i], rd->ofh) ) {
+ /* remove_pipe() assumes that the caller is
+ * holding rd->lock ! */
+ remove_pipe(rd, i);
}
}
-
- done = 1;
- if ( sb->running != NULL ) done = 0;
-
- unlock_sandbox(sb);
+ pthread_mutex_unlock(&rd->lock);
}
@@ -607,6 +678,7 @@ static void start_worker_process(struct sandbox *sb, int slot,
pid_t p;
int filename_pipe[2];
int result_pipe[2];
+ int stream_pipe[2];
if ( pipe(filename_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
@@ -618,6 +690,11 @@ static void start_worker_process(struct sandbox *sb, int slot,
return;
}
+ if ( pipe(stream_pipe) == - 1 ) {
+ ERROR("pipe() failed!\n");
+ return;
+ }
+
p = fork();
if ( p == -1 ) {
ERROR("fork() failed!\n");
@@ -665,7 +742,7 @@ static void start_worker_process(struct sandbox *sb, int slot,
close(filename_pipe[1]);
close(result_pipe[0]);
- st = open_stream_fd_for_write(sb->stream_pipe_write[slot]);
+ st = open_stream_fd_for_write(stream_pipe[1]);
write_command(st, argc, argv);
write_line(st, "FLUSH");
run_work(sb->iargs, filename_pipe[0], result_pipe[1],
@@ -683,14 +760,11 @@ static void start_worker_process(struct sandbox *sb, int slot,
* and the 'read' end of the result pipe. */
sb->pids[slot] = p;
sb->running[slot] = 1;
+ add_pipe(sb->reader, stream_pipe[0]);
close(filename_pipe[0]);
close(result_pipe[1]);
+ close(stream_pipe[1]);
sb->filename_pipes[slot] = filename_pipe[1];
- sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r");
- if ( sb->fhs[slot] == NULL ) {
- ERROR("Couldn't fdopen() stream!\n");
- return;
- }
sb->result_fhs[slot] = fdopen(result_pipe[0], "r");
if ( sb->result_fhs[slot] == NULL ) {
@@ -765,6 +839,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
+ sb->reader = calloc(1, sizeof(struct sb_reader));
+ if ( sb->reader == NULL ) {
+ ERROR("Couldn't allocate memory for SB reader.\n");
+ free(sb);
+ return;
+ }
+
+ pthread_mutex_init(&sb->lock, NULL);
+ pthread_mutex_init(&sb->reader->lock, NULL);
+
sb->n_processed = 0;
sb->n_hadcrystals = 0;
sb->n_crystals = 0;
@@ -775,40 +859,21 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->n_proc = n_proc;
sb->iargs = iargs;
- pthread_mutex_init(&sb->lock, NULL);
+ sb->reader->fds = NULL;
+ sb->reader->fhs = NULL;
+ sb->reader->ofh = ofh;
- sb->ofh = ofh;
- sb->stream_pipe_read = calloc(n_proc, sizeof(int));
sb->stream_pipe_write = calloc(n_proc, sizeof(int));
- if ( sb->stream_pipe_read == NULL ) {
- ERROR("Couldn't allocate memory for pipes.\n");
- return;
- }
if ( sb->stream_pipe_write == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
- for ( i=0; i<n_proc; i++ ) {
-
- int stream_pipe[2];
-
- if ( pipe(stream_pipe) == - 1 ) {
- ERROR("pipe() failed!\n");
- return;
- }
-
- sb->stream_pipe_read[i] = stream_pipe[0];
- sb->stream_pipe_write[i] = stream_pipe[1];
-
- }
-
lock_sandbox(sb);
sb->filename_pipes = calloc(n_proc, sizeof(int));
sb->result_fhs = calloc(n_proc, sizeof(FILE *));
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
- sb->fhs = calloc(sb->n_proc, sizeof(FILE *));
if ( sb->filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
@@ -831,10 +896,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("Couldn't allocate memory for last filename list.\n");
return;
}
- if ( sb->fhs == NULL ) {
- ERROR("Couldn't allocate memory for file handles!\n");
- return;
- }
unlock_sandbox(sb);
if ( pipe(signal_pipe) == -1 ) {
@@ -863,7 +924,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Start reader thread after forking, so that things are definitely
* "running" */
- if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
+ if ( pthread_create(&reader_thread, NULL, run_reader,
+ (void *)sb->reader) ) {
ERROR("Failed to create reader thread.\n");
return;
}
@@ -1016,10 +1078,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
fclose(fh);
/* Indicate to the reader thread that we are done */
- lock_sandbox(sb);
- free(sb->running);
- sb->running = NULL;
- unlock_sandbox(sb);
+ pthread_mutex_lock(&sb->reader->lock);
+ sb->reader->done = 1;
+ pthread_mutex_unlock(&sb->reader->lock);
pthread_join(reader_thread, NULL);
@@ -1033,10 +1094,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]);
}
- for ( i=0; i<sb->n_proc; i++ ) {
- fclose(sb->fhs[i]);
- }
- free(sb->fhs);
+ free(sb->running);
free(sb->filename_pipes);
free(sb->result_fhs);
free(sb->pids);