aboutsummaryrefslogtreecommitdiff
path: root/src/im-sandbox.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r--src/im-sandbox.c594
1 files changed, 267 insertions, 327 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index c8cceb2b..03c553b7 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012 Deutsches Elektronen-Synchrotron DESY,
- * a research centre of the Helmholtz Association.
+ * Copyright © 2012-2013 Deutsches Elektronen-Synchrotron DESY,
+ * a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2012 Thomas White <taw@physics.org>
+ * 2010-2013 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -41,12 +41,11 @@
#include <string.h>
#include <unistd.h>
#include <getopt.h>
-#include <hdf5.h>
-#include <gsl/gsl_errno.h>
#include <pthread.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <signal.h>
+#include <sys/stat.h>
#ifdef HAVE_CLOCK_GETTIME
#include <time.h>
@@ -54,48 +53,56 @@
#include <sys/time.h>
#endif
-#include "utils.h"
-#include "hdf5-file.h"
-#include "index.h"
-#include "peaks.h"
-#include "detector.h"
-#include "filters.h"
-#include "thread-pool.h"
-#include "beam-parameters.h"
-#include "geometry.h"
-#include "stream.h"
-#include "reflist-utils.h"
-
#include "im-sandbox.h"
+#include "process_image.h"
/* Write statistics at APPROXIMATELY this interval */
#define STATS_EVERY_N_SECONDS (5)
+struct sb_reader
+{
+ pthread_mutex_t lock;
+ int done;
+
+ /* If a worker process dies unexpectedly (e.g. if it segfaults), then
+ * the pipe for its output can still stay open for a little while while
+ * its buffer empties. The number of pipes being read from is therefore
+ * not necessarily the same as the number of worker processes. */
+ int n_read;
+ FILE **fhs;
+ int *fds;
+
+ /* Final output fd */
+ int ofd;
+};
+
+
struct sandbox
{
pthread_mutex_t lock;
- int n_indexable;
int n_processed;
- int n_indexable_last_stats;
+ int n_hadcrystals;
+ int n_crystals;
int n_processed_last_stats;
+ int n_hadcrystals_last_stats;
+ int n_crystals_last_stats;
int t_last_stats;
struct index_args *iargs;
int n_proc;
pid_t *pids;
- FILE *ofh;
- FILE **fhs;
int *running;
FILE **result_fhs;
int *filename_pipes;
- int *stream_pipe_read;
int *stream_pipe_write;
char **last_filename;
+
+ struct sb_reader *reader;
};
@@ -120,6 +127,7 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead,
{
char *line;
char *filename;
+ size_t len;
do {
@@ -153,7 +161,13 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead,
line = tmp;
}
- filename = malloc(strlen(prefix)+strlen(line)+1);
+ len = strlen(prefix)+strlen(line)+1;
+
+ /* Round the length of the buffer, too keep Valgrind quiet when it gets
+ * given to write() a bit later on */
+ len += 4 - (len % 4);
+
+ filename = malloc(len);
snprintf(filename, 1023, "%s%s", prefix, line);
@@ -163,187 +177,8 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead,
}
-static void process_image(const struct index_args *iargs,
- struct pattern_args *pargs, FILE *ofh,
- int cookie)
-{
- float *data_for_measurement;
- size_t data_size;
- UnitCell *cell = iargs->cell;
- int config_cmfilter = iargs->config_cmfilter;
- int config_noisefilter = iargs->config_noisefilter;
- int config_verbose = iargs->config_verbose;
- IndexingMethod *indm = iargs->indm;
- int check;
- struct hdfile *hdfile;
- struct image image;
-
- image.features = NULL;
- image.data = NULL;
- image.flags = NULL;
- image.indexed_cell = NULL;
- image.copyme = iargs->copyme;
- image.reflections = NULL;
- image.n_saturated = 0;
- image.id = cookie;
- image.filename = pargs->filename;
- image.beam = iargs->beam;
- image.det = iargs->det;
-
- hdfile = hdfile_open(image.filename);
- if ( hdfile == NULL ) return;
-
- if ( iargs->element != NULL ) {
-
- int r;
- r = hdfile_set_image(hdfile, iargs->element);
- if ( r ) {
- ERROR("Couldn't select path '%s'\n", iargs->element);
- hdfile_close(hdfile);
- return;
- }
-
- } else {
-
- int r;
- r = hdfile_set_first_image(hdfile, "/");
- if ( r ) {
- ERROR("Couldn't select first path\n");
- hdfile_close(hdfile);
- return;
- }
-
- }
-
- check = hdf5_read(hdfile, &image, 1);
- if ( check ) {
- hdfile_close(hdfile);
- return;
- }
-
- if ( (image.width != image.det->max_fs + 1 )
- || (image.height != image.det->max_ss + 1))
- {
- ERROR("Image size doesn't match geometry size"
- " - rejecting image.\n");
- ERROR("Image size: %i,%i. Geometry size: %i,%i\n",
- image.width, image.height,
- image.det->max_fs + 1, image.det->max_ss + 1);
- hdfile_close(hdfile);
- return;
- }
-
- fill_in_values(image.det, hdfile);
- fill_in_beam_parameters(image.beam, hdfile);
-
- image.lambda = ph_en_to_lambda(eV_to_J(image.beam->photon_energy));
-
- if ( (image.beam->photon_energy < 0.0) || (image.lambda > 1000) ) {
- /* Error message covers a silly value in the beam file or in
- * the HDF5 file. */
- ERROR("Nonsensical wavelength (%e m or %e eV) value for %s.\n",
- image.lambda, image.beam->photon_energy, image.filename);
- hdfile_close(hdfile);
- return;
- }
-
- if ( config_cmfilter ) {
- filter_cm(&image);
- }
-
- /* Take snapshot of image after CM subtraction but before
- * the aggressive noise filter. */
- data_size = image.width * image.height * sizeof(float);
- data_for_measurement = malloc(data_size);
-
- if ( config_noisefilter ) {
- filter_noise(&image, data_for_measurement);
- } else {
- memcpy(data_for_measurement, image.data, data_size);
- }
-
- switch ( iargs->peaks ) {
-
- case PEAK_HDF5:
- // Get peaks from HDF5
- if (get_peaks(&image, hdfile,
- iargs->hdf5_peak_path)) {
- ERROR("Failed to get peaks from HDF5 file.\n");
- }
- if ( !iargs->no_revalidate ) {
- validate_peaks(&image, iargs->min_int_snr,
- iargs->ir_inn, iargs->ir_mid,
- iargs->ir_out, iargs->use_saturated);
- }
- break;
-
- case PEAK_ZAEF:
- search_peaks(&image, iargs->threshold,
- iargs->min_gradient, iargs->min_snr,
- iargs->ir_inn, iargs->ir_mid, iargs->ir_out,
- iargs->use_saturated);
- break;
-
- }
-
- /* Get rid of noise-filtered version at this point
- * - it was strictly for the purposes of peak detection. */
- free(image.data);
- image.data = data_for_measurement;
-
- /* Calculate orientation matrix (by magic) */
- image.div = image.beam->divergence;
- image.bw = image.beam->bandwidth;
- image.profile_radius = image.beam->profile_radius;
-
- index_pattern(&image, cell, indm, iargs->cellr,
- config_verbose, iargs->ipriv,
- iargs->config_insane, iargs->tols);
-
- if ( image.indexed_cell != NULL ) {
-
- pargs->indexable = 1;
-
- if ( iargs->integrate_found ) {
- image.reflections = select_intersections(&image,
- image.indexed_cell);
- } else {
- image.reflections = find_intersections(&image,
- image.indexed_cell);
- }
-
- if (image.reflections != NULL) {
- integrate_reflections(&image,
- iargs->config_closer,
- iargs->config_bgsub,
- iargs->min_int_snr,
- iargs->ir_inn,
- iargs->ir_mid,
- iargs->ir_out,
- iargs->integrate_saturated);
- }
-
- } else {
- image.reflections = NULL;
- }
-
- write_chunk(ofh, &image, hdfile, iargs->stream_flags);
- fprintf(ofh, "END\n");
- fflush(ofh);
-
- /* Only free cell if found */
- cell_free(image.indexed_cell);
-
- reflist_free(image.reflections);
- free(image.data);
- if ( image.flags != NULL ) free(image.flags);
- image_feature_list_free(image.features);
- hdfile_close(hdfile);
-}
-
-
static void run_work(const struct index_args *iargs,
- int filename_pipe, int results_pipe, FILE *ofh,
+ int filename_pipe, int results_pipe, Stream *st,
int cookie)
{
int allDone = 0;
@@ -389,12 +224,12 @@ static void run_work(const struct index_args *iargs,
} else {
pargs.filename = line;
- pargs.indexable = 0;
+ pargs.n_crystals = 0;
- process_image(iargs, &pargs, ofh, cookie);
+ process_image(iargs, &pargs, st, cookie);
/* Request another image */
- c = sprintf(buf, "%i\n", pargs.indexable);
+ c = sprintf(buf, "%i\n", pargs.n_crystals);
w = write(results_pipe, buf, c);
if ( w < 0 ) {
ERROR("write P0\n");
@@ -406,7 +241,7 @@ static void run_work(const struct index_args *iargs,
}
- cleanup_indexing(iargs->ipriv);
+ cleanup_indexing(iargs->indm, iargs->ipriv);
free(iargs->indm);
free(iargs->ipriv);
free_detector_geometry(iargs->det);
@@ -442,11 +277,19 @@ static time_t get_monotonic_seconds()
#endif
+size_t vol = 0;
+
+
+static ssize_t lwrite(int fd, const char *a)
+{
+ size_t l = strlen(a);
+ return write(fd, a, l);
+}
+
-static int pump_chunk(FILE *fh, FILE *ofh)
+static int pump_chunk(FILE *fh, int ofd)
{
int chunk_started = 0;
- int chunk_finished = 0;
do {
@@ -457,65 +300,121 @@ static int pump_chunk(FILE *fh, FILE *ofh)
if ( rval == NULL ) {
if ( feof(fh) ) {
- /* Process died */
+ /* Whoops, connection lost */
if ( chunk_started ) {
ERROR("EOF during chunk!\n");
- fprintf(ofh, "Chunk is unfinished!\n");
- }
+ lwrite(ofd, "Unfinished chunk!\n");
+ lwrite(ofd, CHUNK_END_MARKER"\n");
+ } /* else normal end of output */
return 1;
- } else {
- ERROR("fgets() failed: %s\n", strerror(errno));
}
- chunk_finished = 1;
- continue;
- }
+ ERROR("fgets() failed: %s\n", strerror(errno));
+ return 1;
- if ( strcmp(line, "END\n") == 0 ) {
- chunk_finished = 1;
- } else {
- chunk_started = 1;
- fprintf(ofh, "%s", line);
}
- } while ( !chunk_finished );
+ if ( strcmp(line, "FLUSH\n") == 0 ) break;
+ lwrite(ofd, line);
+
+ if ( strcmp(line, CHUNK_END_MARKER"\n") == 0 ) break;
+ if ( strcmp(line, CHUNK_START_MARKER"\n") == 0 ) break;
+ } while ( 1 );
return 0;
}
-static void *run_reader(void *sbv)
+/* Add an fd to the list of pipes to be read from */
+static void add_pipe(struct sb_reader *rd, int fd)
{
- struct sandbox *sb = sbv;
- int done = 0;
+ int *fds_new;
+ FILE **fhs_new;
+ int slot;
- while ( !done ) {
+ pthread_mutex_lock(&rd->lock);
+
+ fds_new = realloc(rd->fds, (rd->n_read+1)*sizeof(int));
+ if ( fds_new == NULL ) {
+ ERROR("Failed to allocate memory for new pipe.\n");
+ return;
+ }
+
+ fhs_new = realloc(rd->fhs, (rd->n_read+1)*sizeof(FILE *));
+ if ( fhs_new == NULL ) {
+ ERROR("Failed to allocate memory for new FH.\n");
+ return;
+ }
+
+ rd->fds = fds_new;
+ rd->fhs = fhs_new;
+ slot = rd->n_read;
+
+ rd->fds[slot] = fd;
+
+ rd->fhs[slot] = fdopen(fd, "r");
+ if ( rd->fhs[slot] == NULL ) {
+ ERROR("Couldn't fdopen() stream!\n");
+ return;
+ }
+
+ rd->n_read++;
+
+ pthread_mutex_unlock(&rd->lock);
+}
+
+
+/* Assumes that the caller is already holding rd->lock! */
+static void remove_pipe(struct sb_reader *rd, int d)
+{
+ int i;
+
+ for ( i=d; i<rd->n_read; i++ ) {
+ if ( i < rd->n_read-1 ) {
+ rd->fds[i] = rd->fds[i+1];
+ rd->fhs[i] = rd->fhs[i+1];
+ } /* else don't bother */
+ }
+
+ rd->n_read--;
+
+ /* We don't bother shrinking the arrays */
+}
+
+
+static void *run_reader(void *rdv)
+{
+ struct sb_reader *rd = rdv;
+
+ while ( 1 ) {
int r, i;
struct timeval tv;
fd_set fds;
int fdmax;
- tv.tv_sec = 5;
+ /* Exit when:
+ * - No fhs left open to read from
+ * AND - Main thread says "done" */
+ if ( (rd->n_read == 0) && rd->done ) break;
+
+ tv.tv_sec = 1;
tv.tv_usec = 0;
FD_ZERO(&fds);
fdmax = 0;
- lock_sandbox(sb);
- for ( i=0; i<sb->n_proc; i++ ) {
+ pthread_mutex_lock(&rd->lock);
+ for ( i=0; i<rd->n_read; i++ ) {
int fd;
- if ( !sb->running[i] ) continue;
-
- fd = sb->stream_pipe_read[i];
+ fd = rd->fds[i];
FD_SET(fd, &fds);
if ( fd > fdmax ) fdmax = fd;
}
-
- unlock_sandbox(sb);
+ pthread_mutex_unlock(&rd->lock);
r = select(fdmax+1, &fds, NULL, NULL, &tv);
@@ -526,38 +425,75 @@ static void *run_reader(void *sbv)
continue;
}
- if ( r == 0 ) continue; /* Nothing this time. Try again */
+ pthread_mutex_lock(&rd->lock);
+ for ( i=0; i<rd->n_read; i++ ) {
- lock_sandbox(sb);
- for ( i=0; i<sb->n_proc; i++ ) {
+ if ( !FD_ISSET(rd->fds[i], &fds) ) {
+ continue;
+ }
- if ( !sb->running[i] ) continue;
+ /* If the chunk cannot be read, assume the connection
+ * is broken and that the process will die soon. */
+ if ( pump_chunk(rd->fhs[i], rd->ofd) ) {
+ /* remove_pipe() assumes that the caller is
+ * holding rd->lock ! */
+ remove_pipe(rd, i);
+ }
- if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue;
+ }
+ pthread_mutex_unlock(&rd->lock);
+
+ }
+
+ return NULL;
+}
- if ( pump_chunk(sb->fhs[i], sb->ofh) ) {
- sb->running[i] = 0;
- }
+static int create_temporary_folder(signed int n)
+{
+ int r;
+ char tmp[64];
+ struct stat s;
+
+ if ( n < 0 ) {
+ snprintf(tmp, 63, "indexamajig.%i", getpid());
+ } else {
+ snprintf(tmp, 63, "worker.%i", n);
+ }
+
+ if ( stat(tmp, &s) == -1 ) {
+ if ( errno != ENOENT ) {
+ ERROR("Failed to stat temporary folder.\n");
+ return 1;
}
- done = 1;
- for ( i=0; i<sb->n_proc; i++ ) {
- if ( sb->running[i] ) done = 0;
+ r = mkdir(tmp, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+ if ( r ) {
+ ERROR("Failed to create temporary folder: %s\n",
+ strerror(errno));
+ return 1;
}
- unlock_sandbox(sb);
}
- return NULL;
+ r = chdir(tmp);
+ if ( r ) {
+ ERROR("Failed to chdir to temporary folder: %s\n",
+ strerror(errno));
+ return 1;
+ }
+
+ return 0;
}
-static void start_worker_process(struct sandbox *sb, int slot)
+static void start_worker_process(struct sandbox *sb, int slot,
+ int argc, char *argv[])
{
pid_t p;
int filename_pipe[2];
int result_pipe[2];
+ int stream_pipe[2];
if ( pipe(filename_pipe) == - 1 ) {
ERROR("pipe() failed!\n");
@@ -569,6 +505,11 @@ static void start_worker_process(struct sandbox *sb, int slot)
return;
}
+ if ( pipe(stream_pipe) == - 1 ) {
+ ERROR("pipe() failed!\n");
+ return;
+ }
+
p = fork();
if ( p == -1 ) {
ERROR("fork() failed!\n");
@@ -577,7 +518,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
if ( p == 0 ) {
- FILE *sfh;
+ Stream *st;
int j;
struct sigaction sa;
int r;
@@ -592,6 +533,8 @@ static void start_worker_process(struct sandbox *sb, int slot)
return;
}
+ create_temporary_folder(slot);
+
/* Free resources which will not be needed by worker */
for ( j=0; j<sb->n_proc; j++ ) {
if ( (j != slot) && (sb->running[j]) ) {
@@ -600,7 +543,9 @@ static void start_worker_process(struct sandbox *sb, int slot)
}
for ( j=0; j<sb->n_proc; j++ ) {
if ( (j != slot) && (sb->running[j]) ) {
- fclose(sb->result_fhs[j]);
+ if ( sb->result_fhs[j] != NULL ) {
+ fclose(sb->result_fhs[j]);
+ }
close(sb->filename_pipes[j]);
}
}
@@ -614,10 +559,12 @@ static void start_worker_process(struct sandbox *sb, int slot)
close(filename_pipe[1]);
close(result_pipe[0]);
- sfh = fdopen(sb->stream_pipe_write[slot], "w");
+ st = open_stream_fd_for_write(stream_pipe[1]);
+ write_command(st, argc, argv);
+ write_line(st, "FLUSH");
run_work(sb->iargs, filename_pipe[0], result_pipe[1],
- sfh, slot);
- fclose(sfh);
+ st, slot);
+ close_stream(st);
//close(filename_pipe[0]);
close(result_pipe[1]);
@@ -630,14 +577,11 @@ static void start_worker_process(struct sandbox *sb, int slot)
* and the 'read' end of the result pipe. */
sb->pids[slot] = p;
sb->running[slot] = 1;
+ add_pipe(sb->reader, stream_pipe[0]);
close(filename_pipe[0]);
close(result_pipe[1]);
+ close(stream_pipe[1]);
sb->filename_pipes[slot] = filename_pipe[1];
- sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r");
- if ( sb->fhs[slot] == NULL ) {
- ERROR("Couldn't fdopen() stream!\n");
- return;
- }
sb->result_fhs[slot] = fdopen(result_pipe[0], "r");
if ( sb->result_fhs[slot] == NULL ) {
@@ -685,7 +629,7 @@ static void handle_zombie(struct sandbox *sb)
STATUS("Last filename was: %s\n",
sb->last_filename[i]);
sb->n_processed++;
- start_worker_process(sb, i);
+ start_worker_process(sb, i, 0, NULL);
}
}
@@ -697,7 +641,7 @@ static void handle_zombie(struct sandbox *sb)
void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, char *use_this_one_instead,
- FILE *ofh)
+ int ofd, int argc, char *argv[])
{
int i;
int allDone;
@@ -712,48 +656,41 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- sb->n_indexable = 0;
+ sb->reader = calloc(1, sizeof(struct sb_reader));
+ if ( sb->reader == NULL ) {
+ ERROR("Couldn't allocate memory for SB reader.\n");
+ free(sb);
+ return;
+ }
+
+ pthread_mutex_init(&sb->lock, NULL);
+ pthread_mutex_init(&sb->reader->lock, NULL);
+
sb->n_processed = 0;
- sb->n_indexable_last_stats = 0;
+ sb->n_hadcrystals = 0;
+ sb->n_crystals = 0;
sb->n_processed_last_stats = 0;
+ sb->n_hadcrystals_last_stats = 0;
+ sb->n_crystals_last_stats = 0;
sb->t_last_stats = get_monotonic_seconds();
sb->n_proc = n_proc;
- sb->ofh = ofh;
sb->iargs = iargs;
- pthread_mutex_init(&sb->lock, NULL);
+ sb->reader->fds = NULL;
+ sb->reader->fhs = NULL;
+ sb->reader->ofd = ofd;
- sb->stream_pipe_read = calloc(n_proc, sizeof(int));
sb->stream_pipe_write = calloc(n_proc, sizeof(int));
- if ( sb->stream_pipe_read == NULL ) {
- ERROR("Couldn't allocate memory for pipes.\n");
- return;
- }
if ( sb->stream_pipe_write == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
}
- for ( i=0; i<n_proc; i++ ) {
-
- int stream_pipe[2];
-
- if ( pipe(stream_pipe) == - 1 ) {
- ERROR("pipe() failed!\n");
- return;
- }
-
- sb->stream_pipe_read[i] = stream_pipe[0];
- sb->stream_pipe_write[i] = stream_pipe[1];
-
- }
-
lock_sandbox(sb);
sb->filename_pipes = calloc(n_proc, sizeof(int));
sb->result_fhs = calloc(n_proc, sizeof(FILE *));
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
- sb->fhs = calloc(sb->n_proc, sizeof(FILE *));
if ( sb->filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
@@ -776,17 +713,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("Couldn't allocate memory for last filename list.\n");
return;
}
- if ( sb->fhs == NULL ) {
- ERROR("Couldn't allocate memory for file handles!\n");
- return;
- }
unlock_sandbox(sb);
- if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
- ERROR("Failed to create reader thread.\n");
- return;
- }
-
if ( pipe(signal_pipe) == -1 ) {
ERROR("Failed to create signal pipe.\n");
return;
@@ -802,13 +730,23 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
+ if ( create_temporary_folder(-1) ) return;
+
/* Fork the right number of times */
lock_sandbox(sb);
for ( i=0; i<n_proc; i++ ) {
- start_worker_process(sb, i);
+ start_worker_process(sb, i, argc, argv);
}
unlock_sandbox(sb);
+ /* Start reader thread after forking, so that things are definitely
+ * "running" */
+ if ( pthread_create(&reader_thread, NULL, run_reader,
+ (void *)sb->reader) ) {
+ ERROR("Failed to create reader thread.\n");
+ return;
+ }
+
allDone = 0;
while ( !allDone ) {
@@ -828,9 +766,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int fd;
- if ( !sb->running[i] ) {
- continue;
- }
+ if ( sb->result_fhs[i] == NULL) continue;
fd = fileno(sb->result_fhs[i]);
FD_SET(fd, &fds);
@@ -844,14 +780,11 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
r = select(fdmax+1, &fds, NULL, NULL, &tv);
if ( r == -1 ) {
- if ( errno != EINTR ) {
- ERROR("select() failed: %s\n", strerror(errno));
- }
- continue;
+ if ( errno == EINTR ) continue;
+ ERROR("select() failed: %s\n", strerror(errno));
+ break;
}
- if ( r == 0 ) continue; /* No progress this time. Try again */
-
if ( FD_ISSET(signal_pipe[0], &fds) ) {
char d;
@@ -869,14 +802,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int fd;
char *eptr;
- if ( !sb->running[i] ) {
- continue;
- }
+ if ( sb->result_fhs[i] == NULL ) continue;
fd = fileno(sb->result_fhs[i]);
- if ( !FD_ISSET(fd, &fds) ) {
- continue;
- }
+ if ( !FD_ISSET(fd, &fds) ) continue;
rval = fgets(results, 1024, sb->result_fhs[i]);
if ( rval == NULL ) {
@@ -884,6 +813,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("fgets() failed: %s\n",
strerror(errno));
}
+ sb->result_fhs[i] = NULL;
continue;
}
@@ -895,8 +825,14 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("Invalid result '%s'\n", results);
}
} else {
- sb->n_indexable += atoi(results);
+
+ int nc = atoi(results);
+ sb->n_crystals += nc;
+ if ( nc > 0 ) {
+ sb->n_hadcrystals++;
+ }
sb->n_processed++;
+
}
/* Send next filename */
@@ -929,14 +865,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
tNow = get_monotonic_seconds();
if ( tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS ) {
- STATUS("%i out of %i indexed so far,"
- " %i out of %i since the last message.\n",
- sb->n_indexable, sb->n_processed,
- sb->n_indexable - sb->n_indexable_last_stats,
+ STATUS("%4i indexable out of %4i processed, "
+ "%4i crystals so far. "
+ "%4i images processed since the last message.\n",
+ sb->n_hadcrystals, sb->n_processed,
+ sb->n_crystals,
sb->n_processed - sb->n_processed_last_stats);
- sb->n_indexable_last_stats = sb->n_indexable;
sb->n_processed_last_stats = sb->n_processed;
+ sb->n_hadcrystals_last_stats = sb->n_hadcrystals;
+ sb->n_crystals_last_stats = sb->n_crystals;
sb->t_last_stats = tNow;
}
@@ -947,14 +885,18 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
for ( i=0; i<n_proc; i++ ) {
if ( sb->running[i] ) allDone = 0;
}
-
unlock_sandbox(sb);
}
fclose(fh);
- pthread_mutex_destroy(&sb->lock);
+ /* Indicate to the reader thread that we are done */
+ pthread_mutex_lock(&sb->reader->lock);
+ sb->reader->done = 1;
+ pthread_mutex_unlock(&sb->reader->lock);
+
+ pthread_join(reader_thread, NULL);
for ( i=0; i<n_proc; i++ ) {
int status;
@@ -963,21 +905,19 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
for ( i=0; i<n_proc; i++ ) {
close(sb->filename_pipes[i]);
- fclose(sb->result_fhs[i]);
+ if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]);
}
- for ( i=0; i<sb->n_proc; i++ ) {
- fclose(sb->fhs[i]);
- }
- free(sb->fhs);
+ free(sb->running);
free(sb->filename_pipes);
free(sb->result_fhs);
free(sb->pids);
- free(sb->running);
- if ( ofh != stdout ) fclose(ofh);
+ pthread_mutex_destroy(&sb->lock);
- STATUS("There were %i images, of which %i could be indexed.\n",
- sb->n_processed, sb->n_indexable);
+ STATUS("Final:"
+ " %i images processed, %i had crystals, %i crystals overall.\n",
+ sb->n_processed, sb->n_hadcrystals, sb->n_crystals);
+ free(sb);
}