aboutsummaryrefslogtreecommitdiff
path: root/src/im-sandbox.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r--src/im-sandbox.c502
1 files changed, 278 insertions, 224 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index a5806734..85576ba1 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -75,10 +75,31 @@
struct sandbox
{
+ int n_indexable;
+ int n_processed;
+ int n_indexable_last_stats;
+ int n_processed_last_stats;
+ int t_last_stats;
+
+ struct index_args *iargs;
+
+ int n_proc;
+ pid_t *pids;
+ FILE *ofh;
+ int *running;
+ FILE **result_fhs;
+ int *filename_pipes;
+ int *stream_pipe_read;
+ int *stream_pipe_write;
+ char **last_filename;
};
+/* Horrible global variable for signal handler */
+struct sandbox *sb;
+
+
static char *get_pattern(FILE *fh, char **use_this_one_instead,
int config_basename, const char *prefix)
{
@@ -285,6 +306,7 @@ static void run_work(const struct index_args *iargs,
{
int allDone = 0;
FILE *fh;
+ int w;
fh = fdopen(filename_pipe, "r");
if ( fh == NULL ) {
@@ -292,13 +314,18 @@ static void run_work(const struct index_args *iargs,
return;
}
+ w = write(results_pipe, "\n", 1);
+ if ( w < 0 ) {
+ ERROR("Failed to send request for first filename.\n");
+ }
+
while ( !allDone ) {
struct pattern_args pargs;
- int w, c;
- char buf[1024];
+ int c;
char *line;
char *rval;
+ char buf[1024];
line = malloc(1024*sizeof(char));
rval = fgets(line, 1023, fh);
@@ -306,6 +333,7 @@ static void run_work(const struct index_args *iargs,
free(line);
if ( feof(fh) ) {
allDone = 1;
+ STATUS("Exiting!\n");
continue;
} else {
ERROR("Read error!\n");
@@ -377,7 +405,7 @@ static time_t get_monotonic_seconds()
#endif
-static void pump_chunk(FILE *fh, int *finished, FILE *ofh)
+static int pump_chunk(FILE *fh, FILE *ofh)
{
int chunk_started = 0;
int chunk_finished = 0;
@@ -392,11 +420,11 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh)
if ( feof(fh) ) {
/* Process died */
- *finished = 1;
if ( chunk_started ) {
ERROR("EOF during chunk!\n");
fprintf(ofh, "Chunk is unfinished!\n");
}
+ return 1;
} else {
ERROR("fgets() failed: %s\n", strerror(errno));
}
@@ -413,33 +441,29 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh)
}
} while ( !chunk_finished );
+
+ return 0;
}
-static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
+static void *run_reader(void *sbv)
{
+ struct sandbox *sb = sbv;
int done = 0;
- int *finished;
FILE **fhs;
int i;
- finished = calloc(n_proc, sizeof(int));
- if ( finished == NULL ) {
- ERROR("Couldn't allocate memory for flags!\n");
- exit(1);
- }
-
- fhs = calloc(n_proc, sizeof(FILE *));
+ fhs = calloc(sb->n_proc, sizeof(FILE *));
if ( fhs == NULL ) {
ERROR("Couldn't allocate memory for file handles!\n");
- exit(1);
+ return NULL;
}
- for ( i=0; i<n_proc; i++ ) {
- fhs[i] = fdopen(stream_pipe_read[i], "r");
+ for ( i=0; i<sb->n_proc; i++ ) {
+ fhs[i] = fdopen(sb->stream_pipe_read[i], "r");
if ( fhs[i] == NULL ) {
ERROR("Couldn't fdopen() stream!\n");
- exit(1);
+ return NULL;
}
}
@@ -455,13 +479,13 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
FD_ZERO(&fds);
fdmax = 0;
- for ( i=0; i<n_proc; i++ ) {
+ for ( i=0; i<sb->n_proc; i++ ) {
int fd;
- if ( finished[i] ) continue;
+ if ( !sb->running[i] ) continue;
- fd = stream_pipe_read[i];
+ fd = sb->stream_pipe_read[i];
FD_SET(fd, &fds);
if ( fd > fdmax ) fdmax = fd;
@@ -471,45 +495,176 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh)
r = select(fdmax+1, &fds, NULL, NULL, &tv);
if ( r == -1 ) {
- ERROR("select() failed: %s\n", strerror(errno));
+ if ( errno != EINTR ) {
+ ERROR("select() failed: %s\n", strerror(errno));
+ } /* Otherwise no big deal */
continue;
}
if ( r == 0 ) continue; /* Nothing this time. Try again */
- for ( i=0; i<n_proc; i++ ) {
+ for ( i=0; i<sb->n_proc; i++ ) {
- if ( finished[i] ) continue;
+ if ( !sb->running[i] ) continue;
- if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue;
+ if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue;
- pump_chunk(fhs[i], &finished[i], ofh);
+ if ( pump_chunk(fhs[i], sb->ofh) ) {
+ sb->running[i] = 0;
+ }
}
done = 1;
- for ( i=0; i<n_proc; i++ ) {
- if ( !finished[i] ) done = 0;
+ for ( i=0; i<sb->n_proc; i++ ) {
+ if ( sb->running[i] ) done = 0;
}
}
- free(finished);
-
- for ( i=0; i<n_proc; i++ ) {
+ for ( i=0; i<sb->n_proc; i++ ) {
fclose(fhs[i]);
}
free(fhs);
- if ( ofh != stdout ) fclose(ofh);
+ return NULL;
+}
+
+
+static void start_worker_process(struct sandbox *sb, int slot)
+{
+ pid_t p;
+ int filename_pipe[2];
+ int result_pipe[2];
+
+ if ( pipe(filename_pipe) == - 1 ) {
+ ERROR("pipe() failed!\n");
+ return;
+ }
+
+ if ( pipe(result_pipe) == - 1 ) {
+ ERROR("pipe() failed!\n");
+ return;
+ }
+
+ p = fork();
+ if ( p == -1 ) {
+ ERROR("fork() failed!\n");
+ return;
+ }
+
+ if ( p == 0 ) {
+
+ FILE *sfh;
+ int j;
+ struct sigaction sa;
+ int r;
+
+ /* First, disconnect the signal handler */
+ sa.sa_flags = 0;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_handler = SIG_DFL;
+ r = sigaction(SIGCHLD, &sa, NULL);
+ if ( r == -1 ) {
+ ERROR("Failed to set signal handler!\n");
+ return;
+ }
+
+ /* Free resources which will not be needed by worker */
+ for ( j=0; j<sb->n_proc; j++ ) {
+ if ( (j != slot) && (sb->running[j]) ) {
+ close(sb->stream_pipe_write[j]);
+ }
+ }
+ for ( j=0; j<sb->n_proc; j++ ) {
+ if ( (j != slot) && (sb->running[j]) ) {
+ fclose(sb->result_fhs[j]);
+ close(sb->filename_pipes[j]);
+ }
+ }
+ free(sb->filename_pipes);
+ free(sb->result_fhs);
+ free(sb->pids);
+ /* Also prefix, use_this_one_instead and fh */
+
+ /* Child process gets the 'read' end of the filename
+ * pipe, and the 'write' end of the result pipe. */
+ close(filename_pipe[1]);
+ close(result_pipe[0]);
+
+ sfh = fdopen(sb->stream_pipe_write[slot], "w");
+ run_work(sb->iargs, filename_pipe[0], result_pipe[1],
+ sfh, slot);
+ fclose(sfh);
+
+ free(sb->stream_pipe_write);
+ close(filename_pipe[0]);
+ close(result_pipe[1]);
+
+ exit(0);
+
+ }
+
+ /* Parent process gets the 'write' end of the filename pipe
+ * and the 'read' end of the result pipe. */
+ sb->pids[slot] = p;
+ sb->running[slot] = 1;
+ close(filename_pipe[0]);
+ close(result_pipe[1]);
+ sb->filename_pipes[slot] = filename_pipe[1];
+
+ sb->result_fhs[slot] = fdopen(result_pipe[0], "r");
+ if ( sb->result_fhs[slot] == NULL ) {
+ ERROR("fdopen() failed.\n");
+ return;
+ }
}
static void signal_handler(int sig, siginfo_t *si, void *uc_v)
{
- struct ucontext_t *uc = uc_v;
+ int i, found;
- STATUS("Signal!\n");
+ if ( si->si_signo != SIGCHLD ) {
+ ERROR("Unhandled signal %i?\n", si->si_signo);
+ return;
+ }
+
+ found = 0;
+ for ( i=0; i<sb->n_proc; i++ ) {
+ if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) {
+ found = 1;
+ break;
+ }
+ }
+
+ if ( !found ) {
+ ERROR("SIGCHLD from unknown child %i?\n", si->si_pid);
+ return;
+ }
+
+ if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED)
+ || (si->si_code == CLD_CONTINUED) ) return;
+
+ if ( si->si_code == CLD_EXITED )
+ {
+ sb->running[i] = 0;
+ STATUS("Worker process %i exited normally.\n", i);
+ return;
+ }
+
+ if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) {
+ ERROR("Unhandled si_code %i (worker process %i).\n",
+ si->si_code, i);
+ return;
+ }
+
+ ERROR("Worker process %i exited abnormally!\n", i);
+ ERROR(" -> Signal %i, last filename %s.\n",
+ si->si_signo, sb->last_filename[i]);
+
+ sb->running[i] = 0;
+ //start_worker_process(sb, i);
}
@@ -517,34 +672,34 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, char *use_this_one_instead,
FILE *ofh)
{
- int n_indexable, n_processed, n_indexable_last_stats;
- int n_processed_last_stats;
- int t_last_stats;
- pid_t *pids;
- int *filename_pipes;
- int *stream_pipe_read;
- int *stream_pipe_write;
- FILE **result_fhs;
int i;
int allDone;
- int *finished;
- pid_t pr;
struct sigaction sa;
int r;
+ pthread_t reader_thread;
- n_indexable = 0;
- n_processed = 0;
- n_indexable_last_stats = 0;
- n_processed_last_stats = 0;
- t_last_stats = get_monotonic_seconds();
+ sb = calloc(1, sizeof(struct sandbox));
+ if ( sb == NULL ) {
+ ERROR("Couldn't allocate memory for sandbox.\n");
+ return;
+ }
- stream_pipe_read = calloc(n_proc, sizeof(int));
- stream_pipe_write = calloc(n_proc, sizeof(int));
- if ( stream_pipe_read == NULL ) {
+ sb->n_indexable = 0;
+ sb->n_processed = 0;
+ sb->n_indexable_last_stats = 0;
+ sb->n_processed_last_stats = 0;
+ sb->t_last_stats = get_monotonic_seconds();
+ sb->n_proc = n_proc;
+ sb->ofh = ofh;
+ sb->iargs = iargs;
+
+ sb->stream_pipe_read = calloc(n_proc, sizeof(int));
+ sb->stream_pipe_write = calloc(n_proc, sizeof(int));
+ if ( sb->stream_pipe_read == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
- if ( stream_pipe_write == NULL ) {
+ if ( sb->stream_pipe_write == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
@@ -558,46 +713,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- stream_pipe_read[i] = stream_pipe[0];
- stream_pipe_write[i] = stream_pipe[1];
+ sb->stream_pipe_read[i] = stream_pipe[0];
+ sb->stream_pipe_write[i] = stream_pipe[1];
}
- pr = fork();
- if ( pr == - 1 ) {
- ERROR("fork() failed (for reader process)\n");
+ if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
+ ERROR("Failed to create reader thread.\n");
return;
}
- if ( pr == 0 ) {
-
- /* Free resources not needed by reader
- * (but which will be needed by worker or master) */
- for ( i=0; i<n_proc; i++ ) {
- close(stream_pipe_write[i]);
- }
- free(prefix);
- free(use_this_one_instead);
- free(stream_pipe_write);
- cleanup_indexing(iargs->ipriv);
- free(iargs->indm);
- free(iargs->ipriv);
- free_detector_geometry(iargs->det);
- free(iargs->beam);
- free(iargs->element);
- free(iargs->hdf5_peak_path);
- free_copy_hdf5_field_list(iargs->copyme);
- cell_free(iargs->cell);
- fclose(fh);
-
- run_reader(stream_pipe_read, n_proc, ofh);
-
- free(stream_pipe_read);
-
- exit(0);
-
- }
-
/* Set up signal handler to take action if any children die */
sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
sigemptyset(&sa.sa_mask);
@@ -608,119 +733,37 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- /* Free resources needed by reader only */
- if ( ofh != stdout ) fclose(ofh);
- for ( i=0; i<n_proc; i++ ) {
- close(stream_pipe_read[i]);
- }
- free(stream_pipe_read);
-
- filename_pipes = calloc(n_proc, sizeof(int));
- result_fhs = calloc(n_proc, sizeof(FILE *));
- pids = calloc(n_proc, sizeof(pid_t));
- if ( filename_pipes == NULL ) {
+ sb->filename_pipes = calloc(n_proc, sizeof(int));
+ sb->result_fhs = calloc(n_proc, sizeof(FILE *));
+ sb->pids = calloc(n_proc, sizeof(pid_t));
+ sb->running = calloc(n_proc, sizeof(int));
+ if ( sb->filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
- if ( result_fhs == NULL ) {
+ if ( sb->result_fhs == NULL ) {
ERROR("Couldn't allocate memory for pipe file handles.\n");
return;
}
- if ( pids == NULL ) {
+ if ( sb->pids == NULL ) {
ERROR("Couldn't allocate memory for PIDs.\n");
return;
}
+ if ( sb->running == NULL ) {
+ ERROR("Couldn't allocate memory for process flags.\n");
+ return;
+ }
- /* Fork the right number of times */
- for ( i=0; i<n_proc; i++ ) {
-
- pid_t p;
- int filename_pipe[2];
- int result_pipe[2];
-
- if ( pipe(filename_pipe) == - 1 ) {
- ERROR("pipe() failed!\n");
- return;
- }
-
- if ( pipe(result_pipe) == - 1 ) {
- ERROR("pipe() failed!\n");
- return;
- }
-
- p = fork();
- if ( p == -1 ) {
- ERROR("fork() failed!\n");
- return;
- }
-
- if ( p == 0 ) {
-
- FILE *sfh;
- int j;
-
- /* Free resources which will not be needed by worker */
- for ( j=0; j<n_proc; j++ ) {
- if ( i != j ) close(stream_pipe_write[j]);
- }
- for ( j=0; j<i-1; j++ ) {
- fclose(result_fhs[j]);
- close(filename_pipes[j]);
- }
- free(prefix);
- free(use_this_one_instead);
- free(filename_pipes);
- free(result_fhs);
- fclose(fh);
- free(pids);
-
- /* Child process gets the 'read' end of the filename
- * pipe, and the 'write' end of the result pipe. */
- close(filename_pipe[1]);
- close(result_pipe[0]);
-
- sfh = fdopen(stream_pipe_write[i], "w");
- run_work(iargs, filename_pipe[0], result_pipe[1],
- sfh, i);
- fclose(sfh);
-
- free(stream_pipe_write);
- close(filename_pipe[0]);
- close(result_pipe[1]);
-
- exit(0);
-
- }
-
- /* Parent process gets the 'write' end of the filename pipe
- * and the 'read' end of the result pipe. */
- pids[i] = p;
- close(filename_pipe[0]);
- close(result_pipe[1]);
- filename_pipes[i] = filename_pipe[1];
-
- result_fhs[i] = fdopen(result_pipe[0], "r");
- if ( result_fhs[i] == NULL ) {
- ERROR("fdopen() failed.\n");
- return;
- }
-
+ sb->last_filename = calloc(n_proc, sizeof(char *));
+ if ( sb->last_filename == NULL ) {
+ ERROR("Couldn't allocate memory for last filename list.\n");
+ return;
}
- /* Free resources which will not be used by the main thread */
- cleanup_indexing(iargs->ipriv);
- free(iargs->indm);
- free(iargs->ipriv);
- free_detector_geometry(iargs->det);
- free(iargs->beam);
- free(iargs->element);
- free(iargs->hdf5_peak_path);
- free_copy_hdf5_field_list(iargs->copyme);
- cell_free(iargs->cell);
+ /* Fork the right number of times */
for ( i=0; i<n_proc; i++ ) {
- close(stream_pipe_write[i]);
+ start_worker_process(sb, i);
}
- free(stream_pipe_write);
/* Send first image to all children */
for ( i=0; i<n_proc; i++ ) {
@@ -732,8 +775,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( nextImage != NULL ) {
- write(filename_pipes[i], nextImage, strlen(nextImage));
- write(filename_pipes[i], "\n", 1);
+ free(sb->last_filename[i]);
+ sb->last_filename[i] = strdup(nextImage);
+
+ write(sb->filename_pipes[i], nextImage,
+ strlen(nextImage));
+ write(sb->filename_pipes[i], "\n", 1);
free(nextImage);
@@ -742,7 +789,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int r;
/* No more files to process.. already? */
- r = write(filename_pipes[i], "\n", 1);
+ r = write(sb->filename_pipes[i], "\n", 1);
if ( r < 0 ) {
ERROR("Write pipe\n");
}
@@ -751,12 +798,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
- finished = calloc(n_proc, sizeof(int));
- if ( finished == NULL ) {
- ERROR("Couldn't allocate memory for process flags.\n");
- return;
- }
-
allDone = 0;
while ( !allDone ) {
@@ -775,9 +816,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int fd;
- if ( finished[i] ) continue;
+ if ( !sb->running[i] ) continue;
- fd = fileno(result_fhs[i]);
+ fd = fileno(sb->result_fhs[i]);
FD_SET(fd, &fds);
if ( fd > fdmax ) fdmax = fd;
@@ -786,7 +827,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
r = select(fdmax+1, &fds, NULL, NULL, &tv);
if ( r == -1 ) {
- ERROR("select() failed: %s\n", strerror(errno));
+ if ( errno != EINTR ) {
+ ERROR("select() failed: %s\n", strerror(errno));
+ }
continue;
}
@@ -798,17 +841,18 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
char results[1024];
char *rval;
int fd;
+ int n;
+ char *eptr;
- if ( finished[i] ) continue;
+ if ( !sb->running[i] ) continue;
- fd = fileno(result_fhs[i]);
+ fd = fileno(sb->result_fhs[i]);
if ( !FD_ISSET(fd, &fds) ) continue;
- rval = fgets(results, 1024, result_fhs[i]);
+ rval = fgets(results, 1024, sb->result_fhs[i]);
if ( rval == NULL ) {
- if ( feof(result_fhs[i]) ) {
- /* Process died */
- finished[i] = 1;
+ if ( feof(sb->result_fhs[i]) ) {
+ ERROR("EOF from process %i.\n", i);
} else {
ERROR("fgets() failed: %s\n",
strerror(errno));
@@ -817,8 +861,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
chomp(results);
- n_indexable += atoi(results);
- n_processed++;
+
+ n = strtol(results, &eptr, 10);
+ if ( eptr == results ) {
+ if ( strlen(results) > 0 ) {
+ ERROR("Invalid result '%s'\n", results);
+ }
+ } else {
+ sb->n_indexable += atoi(results);
+ sb->n_processed++;
+ }
/* Send next filename */
nextImage = get_pattern(fh, &use_this_one_instead,
@@ -826,15 +878,15 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( nextImage == NULL ) {
/* No more images */
- r = write(filename_pipes[i], "\n", 1);
+ r = write(sb->filename_pipes[i], "\n", 1);
if ( r < 0 ) {
ERROR("Write pipe\n");
}
} else {
- r = write(filename_pipes[i], nextImage,
+ r = write(sb->filename_pipes[i], nextImage,
strlen(nextImage));
- r -= write(filename_pipes[i], "\n", 1);
+ r -= write(sb->filename_pipes[i], "\n", 1);
if ( r < 0 ) {
ERROR("write pipe\n");
}
@@ -845,23 +897,23 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Update progress */
tNow = get_monotonic_seconds();
- if ( tNow >= t_last_stats+STATS_EVERY_N_SECONDS ) {
+ if ( tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS ) {
STATUS("%i out of %i indexed so far,"
" %i out of %i since the last message.\n",
- n_indexable, n_processed,
- n_indexable - n_indexable_last_stats,
- n_processed - n_processed_last_stats);
+ sb->n_indexable, sb->n_processed,
+ sb->n_indexable - sb->n_indexable_last_stats,
+ sb->n_processed - sb->n_processed_last_stats);
- n_indexable_last_stats = n_indexable;
- n_processed_last_stats = n_processed;
- t_last_stats = tNow;
+ sb->n_indexable_last_stats = sb->n_indexable;
+ sb->n_processed_last_stats = sb->n_processed;
+ sb->t_last_stats = tNow;
}
allDone = 1;
for ( i=0; i<n_proc; i++ ) {
- if ( !finished[i] ) allDone = 0;
+ if ( sb->running[i] ) allDone = 0;
}
}
@@ -870,20 +922,22 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
for ( i=0; i<n_proc; i++ ) {
int status;
- waitpid(pids[i], &status, 0);
+ waitpid(sb->pids[i], &status, 0);
}
for ( i=0; i<n_proc; i++ ) {
- close(filename_pipes[i]);
- fclose(result_fhs[i]);
+ close(sb->filename_pipes[i]);
+ fclose(sb->result_fhs[i]);
}
- free(filename_pipes);
- free(result_fhs);
- free(pids);
- free(finished);
+ free(sb->filename_pipes);
+ free(sb->result_fhs);
+ free(sb->pids);
+ free(sb->running);
+
+ if ( ofh != stdout ) fclose(ofh);
STATUS("There were %i images, of which %i could be indexed.\n",
- n_processed, n_indexable);
+ sb->n_processed, sb->n_indexable);
}