aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Thomas White <taw@physics.org> 2013-02-22 10:15:23 +0100
committer Thomas White <taw@physics.org> 2013-02-22 10:15:23 +0100
commit b821aae2dd29cbdec5ccd33a3929a61ee047f0e6 (patch)
tree 73a12f565f8f6e94100c5f4d16fafd7d42fc5eae /src
parent 78ef5671c7533799834b6001c0a08f73f16195f9 (diff)
More robust stream marshalling
Diffstat (limited to 'src')
-rw-r--r-- src/im-sandbox.c 98
1 file changed, 39 insertions, 59 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 0afe9ff3..37ccc35c 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -105,7 +105,6 @@ struct sandbox
FILE **fhs;
int *running;
- int *waiting;
FILE **result_fhs;
int *filename_pipes;
int *stream_pipe_read;
@@ -426,8 +425,6 @@ static void run_work(const struct index_args *iargs,
}
- write_line(st, "DONE");
-
cleanup_indexing(iargs->indm, iargs->ipriv);
free(iargs->indm);
free(iargs->ipriv);
@@ -468,7 +465,6 @@ static time_t get_monotonic_seconds()
static int pump_chunk(FILE *fh, FILE *ofh)
{
int chunk_started = 0;
- int chunk_finished = 0;
do {
@@ -479,40 +475,29 @@ static int pump_chunk(FILE *fh, FILE *ofh)
if ( rval == NULL ) {
if ( feof(fh) ) {
- /* Process died */
+ /* Whoops, connection lost */
if ( chunk_started ) {
ERROR("EOF during chunk!\n");
fprintf(ofh, "Chunk is unfinished!\n");
- }
+ fprintf(ofh, CHUNK_END_MARKER"\n");
+ } /* else normal end of output */
return 1;
} else {
ERROR("fgets() failed: %s\n", strerror(errno));
}
- chunk_finished = 1;
- continue;
+ break;
}
- if ( strcmp(line, "FLUSH\n") == 0 ) {
- chunk_finished = 1;
- continue;
- }
-
- if ( strcmp(line, "DONE\n") == 0 ) {
- return 1;
- }
+ if ( strcmp(line, "FLUSH\n") == 0 ) break;
fprintf(ofh, "%s", line);
- if ( strcmp(line, CHUNK_END_MARKER"\n") == 0 ) {
- chunk_finished = 1;
- }
- if ( strcmp(line, CHUNK_START_MARKER"\n") == 0 ) {
- chunk_started = 1;
- }
+ if ( strcmp(line, CHUNK_END_MARKER"\n") == 0 ) break;
+ if ( strcmp(line, CHUNK_START_MARKER"\n") == 0 ) break;
- } while ( !chunk_finished );
+ } while ( 1 );
return 0;
}
@@ -529,7 +514,7 @@ static void *run_reader(void *sbv)
fd_set fds;
int fdmax;
- tv.tv_sec = 5;
+ tv.tv_sec = 1;
tv.tv_usec = 0;
FD_ZERO(&fds);
@@ -539,7 +524,9 @@ static void *run_reader(void *sbv)
int fd;
- if ( !sb->running[i] ) continue;
+ /* Listen for output from all processes which have a
+ * connection, even if they're not "running". */
+ if ( sb->fhs[i] == NULL ) continue;
fd = sb->stream_pipe_read[i];
@@ -559,28 +546,24 @@ static void *run_reader(void *sbv)
continue;
}
- if ( r == 0 ) continue; /* Nothing this time. Try again */
-
lock_sandbox(sb);
for ( i=0; i<sb->n_proc; i++ ) {
- if ( !sb->running[i] ) continue;
-
if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) {
continue;
}
+ /* If the chunk cannot be read, assume the connection
+ * is broken and that the process will die soon. */
if ( pump_chunk(sb->fhs[i], sb->ofh) ) {
- sb->running[i] = 0;
- sb->waiting[i] = 0;
+ sb->fhs[i] = NULL;
}
}
done = 1;
- for ( i=0; i<sb->n_proc; i++ ) {
- if ( sb->running[i] ) done = 0;
- }
+ if ( sb->running != NULL ) done = 0;
+
unlock_sandbox(sb);
}
@@ -733,7 +716,6 @@ static void handle_zombie(struct sandbox *sb)
int status, p;
if ( !sb->running[i] ) continue;
- if ( sb->waiting[i] ) continue;
p = waitpid(sb->pids[i], &status, WNOHANG);
@@ -745,7 +727,6 @@ static void handle_zombie(struct sandbox *sb)
if ( p == sb->pids[i] ) {
sb->running[i] = 0;
- sb->waiting[i] = 1;
if ( WIFEXITED(status) ) {
continue;
@@ -827,7 +808,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->result_fhs = calloc(n_proc, sizeof(FILE *));
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
- sb->waiting = calloc(n_proc, sizeof(int));
sb->fhs = calloc(sb->n_proc, sizeof(FILE *));
if ( sb->filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
@@ -845,10 +825,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("Couldn't allocate memory for process flags.\n");
return;
}
- if ( sb->waiting == NULL ) {
- ERROR("Couldn't allocate memory for process flags.\n");
- return;
- }
sb->last_filename = calloc(n_proc, sizeof(char *));
if ( sb->last_filename == NULL ) {
@@ -861,11 +837,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
unlock_sandbox(sb);
- if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
- ERROR("Failed to create reader thread.\n");
- return;
- }
-
if ( pipe(signal_pipe) == -1 ) {
ERROR("Failed to create signal pipe.\n");
return;
@@ -890,6 +861,13 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
unlock_sandbox(sb);
+ /* Start reader thread after forking, so that things are definitely
+ * "running" */
+ if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
+ ERROR("Failed to create reader thread.\n");
+ return;
+ }
+
allDone = 0;
while ( !allDone ) {
@@ -909,9 +887,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int fd;
- if ( !sb->running[i] ) {
- continue;
- }
+ if ( sb->result_fhs[i] == NULL) continue;
fd = fileno(sb->result_fhs[i]);
FD_SET(fd, &fds);
@@ -949,14 +925,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int fd;
char *eptr;
- if ( !sb->running[i] ) {
- continue;
- }
+ if ( sb->result_fhs[i] == NULL ) continue;
fd = fileno(sb->result_fhs[i]);
- if ( !FD_ISSET(fd, &fds) ) {
- continue;
- }
+ if ( !FD_ISSET(fd, &fds) ) continue;
rval = fgets(results, 1024, sb->result_fhs[i]);
if ( rval == NULL ) {
@@ -964,6 +936,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("fgets() failed: %s\n",
strerror(errno));
}
+ sb->result_fhs[i] = NULL;
continue;
}
@@ -1042,7 +1015,13 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
fclose(fh);
- pthread_mutex_destroy(&sb->lock);
+ /* Indicate to the reader thread that we are done */
+ lock_sandbox(sb);
+ free(sb->running);
+ sb->running = NULL;
+ unlock_sandbox(sb);
+
+ pthread_join(reader_thread, NULL);
for ( i=0; i<n_proc; i++ ) {
int status;
@@ -1051,7 +1030,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
for ( i=0; i<n_proc; i++ ) {
close(sb->filename_pipes[i]);
- fclose(sb->result_fhs[i]);
+ if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]);
}
for ( i=0; i<sb->n_proc; i++ ) {
@@ -1061,11 +1040,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
free(sb->filename_pipes);
free(sb->result_fhs);
free(sb->pids);
- free(sb->running);
- free(sb->waiting);
+
+ pthread_mutex_destroy(&sb->lock);
STATUS("Final:"
" %i images processed, %i had crystals, %i crystals overall.\n",
sb->n_processed, sb->n_hadcrystals, sb->n_crystals);
+ free(sb);
}