aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2015-07-13 14:01:45 +0200
committerThomas White <taw@physics.org>2015-07-13 16:00:16 +0200
commit65d5d478b4288b26a455e260ee9bc153a1789f29 (patch)
tree61c602653c4c216bdd54005b872a88dc40df9e86
parent7591530ff83fc538ac5d92792d01d3b6fe88c781 (diff)
indexamajig: Avoid forking while multithreaded
Conflicts: src/im-sandbox.c
-rw-r--r--src/im-sandbox.c201
1 files changed, 69 insertions, 132 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 747c7d03..1b3dbfcd 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -65,27 +65,6 @@
#include "process_image.h"
-/* Write statistics at APPROXIMATELY this interval */
-#define STATS_EVERY_N_SECONDS (5)
-
-struct sb_reader
-{
- pthread_mutex_t lock;
- int done;
-
- /* If a worker process dies unexpectedly (e.g. if it segfaults), then
- * the pipe for its output can still stay open for a little while while
- * its buffer empties. The number of pipes being read from is therefore
- * not necessarily the same as the number of worker processes. */
- int n_read;
- FILE **fhs;
- int *fds;
-
- /* Final output */
- Stream *stream;
-};
-
-
struct sandbox
{
int n_processed_last_stats;
@@ -106,7 +85,13 @@ struct sandbox
char *tmpdir;
- struct sb_reader *reader;
+ /* Streams to read from */
+ int n_read;
+ FILE **fhs;
+ int *fds;
+
+ /* Final output */
+ Stream *stream;
};
@@ -399,129 +384,105 @@ static int pump_chunk(FILE *fh, int ofd)
/* Add an fd to the list of pipes to be read from */
-static void add_pipe(struct sb_reader *rd, int fd)
+static void add_pipe(struct sandbox *sb, int fd)
{
int *fds_new;
FILE **fhs_new;
int slot;
- pthread_mutex_lock(&rd->lock);
-
- fds_new = realloc(rd->fds, (rd->n_read+1)*sizeof(int));
+ fds_new = realloc(sb->fds, (sb->n_read+1)*sizeof(int));
if ( fds_new == NULL ) {
ERROR("Failed to allocate memory for new pipe.\n");
return;
}
- fhs_new = realloc(rd->fhs, (rd->n_read+1)*sizeof(FILE *));
+ fhs_new = realloc(sb->fhs, (sb->n_read+1)*sizeof(FILE *));
if ( fhs_new == NULL ) {
ERROR("Failed to allocate memory for new FH.\n");
return;
}
- rd->fds = fds_new;
- rd->fhs = fhs_new;
- slot = rd->n_read;
+ sb->fds = fds_new;
+ sb->fhs = fhs_new;
+ slot = sb->n_read;
- rd->fds[slot] = fd;
+ sb->fds[slot] = fd;
- rd->fhs[slot] = fdopen(fd, "r");
- if ( rd->fhs[slot] == NULL ) {
+ sb->fhs[slot] = fdopen(fd, "r");
+ if ( sb->fhs[slot] == NULL ) {
ERROR("Couldn't fdopen() stream!\n");
return;
}
- rd->n_read++;
-
- pthread_mutex_unlock(&rd->lock);
+ sb->n_read++;
}
-/* Assumes that the caller is already holding rd->lock! */
-static void remove_pipe(struct sb_reader *rd, int d)
+static void remove_pipe(struct sandbox *sb, int d)
{
int i;
- fclose(rd->fhs[d]);
+ fclose(sb->fhs[d]);
- for ( i=d; i<rd->n_read; i++ ) {
- if ( i < rd->n_read-1 ) {
- rd->fds[i] = rd->fds[i+1];
- rd->fhs[i] = rd->fhs[i+1];
+ for ( i=d; i<sb->n_read; i++ ) {
+ if ( i < sb->n_read-1 ) {
+ sb->fds[i] = sb->fds[i+1];
+ sb->fhs[i] = sb->fhs[i+1];
} /* else don't bother */
}
- rd->n_read--;
+ sb->n_read--;
/* We don't bother shrinking the arrays */
}
-static void *run_reader(void *rdv)
+static void try_read(struct sandbox *sb)
{
- struct sb_reader *rd = rdv;
- const int ofd = get_stream_fd(rd->stream);
-
- while ( 1 ) {
+ int r, i;
+ struct timeval tv;
+ fd_set fds;
+ int fdmax;
+ const int ofd = get_stream_fd(sb->stream);
- int r, i;
- struct timeval tv;
- fd_set fds;
- int fdmax;
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
- /* Exit when:
- * - No fhs left open to read from
- * AND - Main thread says "done" */
- if ( (rd->n_read == 0) && rd->done ) break;
+ FD_ZERO(&fds);
+ fdmax = 0;
+ for ( i=0; i<sb->n_read; i++ ) {
- tv.tv_sec = 1;
- tv.tv_usec = 0;
+ int fd;
- FD_ZERO(&fds);
- fdmax = 0;
- pthread_mutex_lock(&rd->lock);
- for ( i=0; i<rd->n_read; i++ ) {
+ fd = sb->fds[i];
- int fd;
+ FD_SET(fd, &fds);
+ if ( fd > fdmax ) fdmax = fd;
- fd = rd->fds[i];
+ }
- FD_SET(fd, &fds);
- if ( fd > fdmax ) fdmax = fd;
+ r = select(fdmax+1, &fds, NULL, NULL, &tv);
- }
- pthread_mutex_unlock(&rd->lock);
+ if ( r == -1 ) {
+ if ( errno != EINTR ) {
+ ERROR("select() failed: %s\n", strerror(errno));
+ } /* Otherwise no big deal */
+ return;
+ }
- r = select(fdmax+1, &fds, NULL, NULL, &tv);
+ for ( i=0; i<sb->n_read; i++ ) {
- if ( r == -1 ) {
- if ( errno != EINTR ) {
- ERROR("select() failed: %s\n", strerror(errno));
- } /* Otherwise no big deal */
+ if ( !FD_ISSET(sb->fds[i], &fds) ) {
continue;
}
- pthread_mutex_lock(&rd->lock);
- for ( i=0; i<rd->n_read; i++ ) {
-
- if ( !FD_ISSET(rd->fds[i], &fds) ) {
- continue;
- }
-
- /* If the chunk cannot be read, assume the connection
- * is broken and that the process will die soon. */
- if ( pump_chunk(rd->fhs[i], ofd) ) {
- /* remove_pipe() assumes that the caller is
- * holding rd->lock ! */
- remove_pipe(rd, i);
- }
-
+ /* If the chunk cannot be read, assume the connection
+ * is broken and that the process will die soon. */
+ if ( pump_chunk(sb->fhs[i], ofd) ) {
+ remove_pipe(sb, i);
}
- pthread_mutex_unlock(&rd->lock);
}
-
- return NULL;
}
@@ -586,12 +547,11 @@ static void start_worker_process(struct sandbox *sb, int slot)
/* Free resources which will not be needed by worker */
free(sb->pids);
- for ( i=0; i<sb->reader->n_read; i++ ) {
- fclose(sb->reader->fhs[i]);
+ for ( i=0; i<sb->n_read; i++ ) {
+ fclose(sb->fhs[i]);
}
- free(sb->reader->fhs);
- free(sb->reader->fds);
- free(sb->reader);
+ free(sb->fhs);
+ free(sb->fds);
free(sb->tmpdir);
free(sb->running);
/* Not freed because it's not worth passing them down just for
@@ -620,7 +580,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
* and the 'read' end of the result pipe. */
sb->pids[slot] = p;
sb->running[slot] = 1;
- add_pipe(sb->reader, stream_pipe[0]);
+ add_pipe(sb, stream_pipe[0]);
close(stream_pipe[1]);
}
@@ -752,7 +712,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int i;
struct sigaction sa;
int r;
- pthread_t reader_thread;
struct sandbox *sb;
size_t ll;
struct stat s;
@@ -770,15 +729,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- sb->reader = calloc(1, sizeof(struct sb_reader));
- if ( sb->reader == NULL ) {
- ERROR("Couldn't allocate memory for SB reader.\n");
- free(sb);
- return;
- }
-
- pthread_mutex_init(&sb->reader->lock, NULL);
-
sb->n_processed_last_stats = 0;
sb->n_hadcrystals_last_stats = 0;
sb->n_crystals_last_stats = 0;
@@ -787,9 +737,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->iargs = iargs;
sb->serial = 1;
- sb->reader->fds = NULL;
- sb->reader->fhs = NULL;
- sb->reader->stream = stream;
+ sb->fds = NULL;
+ sb->fhs = NULL;
+ sb->stream = stream;
if ( setup_shm(sb) ) {
ERROR("Failed to set up SHM.\n");
@@ -867,19 +817,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
start_worker_process(sb, i);
}
- /* Start reader thread after forking, so that things are definitely
- * "running" */
- if ( pthread_create(&reader_thread, NULL, run_reader,
- (void *)sb->reader) ) {
- ERROR("Failed to create reader thread.\n");
- return;
- }
-
do {
int r;
double tNow;
+ try_read(sb);
sleep(5);
/* Check for dead workers */
@@ -944,25 +887,19 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
for ( i=0; i<n_proc; i++ ) {
int status;
- waitpid(sb->pids[i], &status, 0);
+ while ( waitpid(sb->pids[i], &status, WNOHANG) == 0 ) {
+ try_read(sb);
+ }
}
- /* Indicate to the reader thread that we are done */
- pthread_mutex_lock(&sb->reader->lock);
- sb->reader->done = 1;
- pthread_mutex_unlock(&sb->reader->lock);
-
- pthread_join(reader_thread, NULL);
-
- for ( i=0; i<sb->reader->n_read; i++ ) {
- fclose(sb->reader->fhs[i]);
+ for ( i=0; i<sb->n_read; i++ ) {
+ fclose(sb->fhs[i]);
}
- free(sb->reader->fhs);
- free(sb->reader->fds);
+ free(sb->fhs);
+ free(sb->fds);
free(sb->running);
free(sb->pids);
free(sb->tmpdir);
- free(sb->reader);
STATUS("Final: %i images processed, %i had crystals (%.1f%%),"
" %i crystals overall.\n",