diff options
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r-- | src/im-sandbox.c | 594 |
1 files changed, 267 insertions, 327 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index c8cceb2b..03c553b7 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -3,13 +3,13 @@ * * Sandbox for indexing * - * Copyright © 2012 Deutsches Elektronen-Synchrotron DESY, - * a research centre of the Helmholtz Association. + * Copyright © 2012-2013 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. * Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2012 Thomas White <taw@physics.org> + * 2010-2013 Thomas White <taw@physics.org> * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -41,12 +41,11 @@ #include <string.h> #include <unistd.h> #include <getopt.h> -#include <hdf5.h> -#include <gsl/gsl_errno.h> #include <pthread.h> #include <sys/wait.h> #include <fcntl.h> #include <signal.h> +#include <sys/stat.h> #ifdef HAVE_CLOCK_GETTIME #include <time.h> @@ -54,48 +53,56 @@ #include <sys/time.h> #endif -#include "utils.h" -#include "hdf5-file.h" -#include "index.h" -#include "peaks.h" -#include "detector.h" -#include "filters.h" -#include "thread-pool.h" -#include "beam-parameters.h" -#include "geometry.h" -#include "stream.h" -#include "reflist-utils.h" - #include "im-sandbox.h" +#include "process_image.h" /* Write statistics at APPROXIMATELY this interval */ #define STATS_EVERY_N_SECONDS (5) +struct sb_reader +{ + pthread_mutex_t lock; + int done; + + /* If a worker process dies unexpectedly (e.g. if it segfaults), then + * the pipe for its output can still stay open for a little while while + * its buffer empties. The number of pipes being read from is therefore + * not necessarily the same as the number of worker processes. */ + int n_read; + FILE **fhs; + int *fds; + + /* Final output fd */ + int ofd; +}; + + struct sandbox { pthread_mutex_t lock; - int n_indexable; int n_processed; - int n_indexable_last_stats; + int n_hadcrystals; + int n_crystals; int n_processed_last_stats; + int n_hadcrystals_last_stats; + int n_crystals_last_stats; int t_last_stats; struct index_args *iargs; int n_proc; pid_t *pids; - FILE *ofh; - FILE **fhs; int *running; FILE **result_fhs; int *filename_pipes; - int *stream_pipe_read; int *stream_pipe_write; char **last_filename; + + struct sb_reader *reader; }; @@ -120,6 +127,7 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, { char *line; char *filename; + size_t len; do { @@ -153,7 +161,13 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, line = tmp; } - filename = malloc(strlen(prefix)+strlen(line)+1); + len = strlen(prefix)+strlen(line)+1; + + /* Round the length of the buffer, too keep Valgrind quiet when it gets + * given to write() a bit later on */ + len += 4 - (len % 4); + + filename = malloc(len); snprintf(filename, 1023, "%s%s", prefix, line); @@ -163,187 +177,8 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, } -static void process_image(const struct index_args *iargs, - struct pattern_args *pargs, FILE *ofh, - int cookie) -{ - float *data_for_measurement; - size_t data_size; - UnitCell *cell = iargs->cell; - int config_cmfilter = iargs->config_cmfilter; - int config_noisefilter = iargs->config_noisefilter; - int config_verbose = iargs->config_verbose; - IndexingMethod *indm = iargs->indm; - int check; - struct hdfile *hdfile; - struct image image; - - image.features = NULL; - image.data = NULL; - image.flags = NULL; - image.indexed_cell = NULL; - image.copyme = iargs->copyme; - image.reflections = NULL; - image.n_saturated = 0; - image.id = cookie; - image.filename = pargs->filename; - image.beam = iargs->beam; - image.det = iargs->det; - - hdfile = hdfile_open(image.filename); - if ( hdfile == NULL ) return; - - if ( iargs->element != NULL ) { - - int r; - r = hdfile_set_image(hdfile, iargs->element); - if ( r ) { - ERROR("Couldn't select path '%s'\n", iargs->element); - hdfile_close(hdfile); - return; - } - - } else { - - int r; - r = hdfile_set_first_image(hdfile, "/"); - if ( r ) { - ERROR("Couldn't select first path\n"); - hdfile_close(hdfile); - return; - } - - } - - check = hdf5_read(hdfile, &image, 1); - if ( check ) { - hdfile_close(hdfile); - return; - } - - if ( (image.width != image.det->max_fs + 1 ) - || (image.height != image.det->max_ss + 1)) - { - ERROR("Image size doesn't match geometry size" - " - rejecting image.\n"); - ERROR("Image size: %i,%i. Geometry size: %i,%i\n", - image.width, image.height, - image.det->max_fs + 1, image.det->max_ss + 1); - hdfile_close(hdfile); - return; - } - - fill_in_values(image.det, hdfile); - fill_in_beam_parameters(image.beam, hdfile); - - image.lambda = ph_en_to_lambda(eV_to_J(image.beam->photon_energy)); - - if ( (image.beam->photon_energy < 0.0) || (image.lambda > 1000) ) { - /* Error message covers a silly value in the beam file or in - * the HDF5 file. */ - ERROR("Nonsensical wavelength (%e m or %e eV) value for %s.\n", - image.lambda, image.beam->photon_energy, image.filename); - hdfile_close(hdfile); - return; - } - - if ( config_cmfilter ) { - filter_cm(&image); - } - - /* Take snapshot of image after CM subtraction but before - * the aggressive noise filter. */ - data_size = image.width * image.height * sizeof(float); - data_for_measurement = malloc(data_size); - - if ( config_noisefilter ) { - filter_noise(&image, data_for_measurement); - } else { - memcpy(data_for_measurement, image.data, data_size); - } - - switch ( iargs->peaks ) { - - case PEAK_HDF5: - // Get peaks from HDF5 - if (get_peaks(&image, hdfile, - iargs->hdf5_peak_path)) { - ERROR("Failed to get peaks from HDF5 file.\n"); - } - if ( !iargs->no_revalidate ) { - validate_peaks(&image, iargs->min_int_snr, - iargs->ir_inn, iargs->ir_mid, - iargs->ir_out, iargs->use_saturated); - } - break; - - case PEAK_ZAEF: - search_peaks(&image, iargs->threshold, - iargs->min_gradient, iargs->min_snr, - iargs->ir_inn, iargs->ir_mid, iargs->ir_out, - iargs->use_saturated); - break; - - } - - /* Get rid of noise-filtered version at this point - * - it was strictly for the purposes of peak detection. */ - free(image.data); - image.data = data_for_measurement; - - /* Calculate orientation matrix (by magic) */ - image.div = image.beam->divergence; - image.bw = image.beam->bandwidth; - image.profile_radius = image.beam->profile_radius; - - index_pattern(&image, cell, indm, iargs->cellr, - config_verbose, iargs->ipriv, - iargs->config_insane, iargs->tols); - - if ( image.indexed_cell != NULL ) { - - pargs->indexable = 1; - - if ( iargs->integrate_found ) { - image.reflections = select_intersections(&image, - image.indexed_cell); - } else { - image.reflections = find_intersections(&image, - image.indexed_cell); - } - - if (image.reflections != NULL) { - integrate_reflections(&image, - iargs->config_closer, - iargs->config_bgsub, - iargs->min_int_snr, - iargs->ir_inn, - iargs->ir_mid, - iargs->ir_out, - iargs->integrate_saturated); - } - - } else { - image.reflections = NULL; - } - - write_chunk(ofh, &image, hdfile, iargs->stream_flags); - fprintf(ofh, "END\n"); - fflush(ofh); - - /* Only free cell if found */ - cell_free(image.indexed_cell); - - reflist_free(image.reflections); - free(image.data); - if ( image.flags != NULL ) free(image.flags); - image_feature_list_free(image.features); - hdfile_close(hdfile); -} - - static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe, FILE *ofh, + int filename_pipe, int results_pipe, Stream *st, int cookie) { int allDone = 0; @@ -389,12 +224,12 @@ static void run_work(const struct index_args *iargs, } else { pargs.filename = line; - pargs.indexable = 0; + pargs.n_crystals = 0; - process_image(iargs, &pargs, ofh, cookie); + process_image(iargs, &pargs, st, cookie); /* Request another image */ - c = sprintf(buf, "%i\n", pargs.indexable); + c = sprintf(buf, "%i\n", pargs.n_crystals); w = write(results_pipe, buf, c); if ( w < 0 ) { ERROR("write P0\n"); @@ -406,7 +241,7 @@ static void run_work(const struct index_args *iargs, } - cleanup_indexing(iargs->ipriv); + cleanup_indexing(iargs->indm, iargs->ipriv); free(iargs->indm); free(iargs->ipriv); free_detector_geometry(iargs->det); @@ -442,11 +277,19 @@ static time_t get_monotonic_seconds() #endif +size_t vol = 0; + + +static ssize_t lwrite(int fd, const char *a) +{ + size_t l = strlen(a); + return write(fd, a, l); +} + -static int pump_chunk(FILE *fh, FILE *ofh) +static int pump_chunk(FILE *fh, int ofd) { int chunk_started = 0; - int chunk_finished = 0; do { @@ -457,65 +300,121 @@ static int pump_chunk(FILE *fh, FILE *ofh) if ( rval == NULL ) { if ( feof(fh) ) { - /* Process died */ + /* Whoops, connection lost */ if ( chunk_started ) { ERROR("EOF during chunk!\n"); - fprintf(ofh, "Chunk is unfinished!\n"); - } + lwrite(ofd, "Unfinished chunk!\n"); + lwrite(ofd, CHUNK_END_MARKER"\n"); + } /* else normal end of output */ return 1; - } else { - ERROR("fgets() failed: %s\n", strerror(errno)); } - chunk_finished = 1; - continue; - } + ERROR("fgets() failed: %s\n", strerror(errno)); + return 1; - if ( strcmp(line, "END\n") == 0 ) { - chunk_finished = 1; - } else { - chunk_started = 1; - fprintf(ofh, "%s", line); } - } while ( !chunk_finished ); + if ( strcmp(line, "FLUSH\n") == 0 ) break; + lwrite(ofd, line); + + if ( strcmp(line, CHUNK_END_MARKER"\n") == 0 ) break; + if ( strcmp(line, CHUNK_START_MARKER"\n") == 0 ) break; + } while ( 1 ); return 0; } -static void *run_reader(void *sbv) +/* Add an fd to the list of pipes to be read from */ +static void add_pipe(struct sb_reader *rd, int fd) { - struct sandbox *sb = sbv; - int done = 0; + int *fds_new; + FILE **fhs_new; + int slot; - while ( !done ) { + pthread_mutex_lock(&rd->lock); + + fds_new = realloc(rd->fds, (rd->n_read+1)*sizeof(int)); + if ( fds_new == NULL ) { + ERROR("Failed to allocate memory for new pipe.\n"); + return; + } + + fhs_new = realloc(rd->fhs, (rd->n_read+1)*sizeof(FILE *)); + if ( fhs_new == NULL ) { + ERROR("Failed to allocate memory for new FH.\n"); + return; + } + + rd->fds = fds_new; + rd->fhs = fhs_new; + slot = rd->n_read; + + rd->fds[slot] = fd; + + rd->fhs[slot] = fdopen(fd, "r"); + if ( rd->fhs[slot] == NULL ) { + ERROR("Couldn't fdopen() stream!\n"); + return; + } + + rd->n_read++; + + pthread_mutex_unlock(&rd->lock); +} + + +/* Assumes that the caller is already holding rd->lock! */ +static void remove_pipe(struct sb_reader *rd, int d) +{ + int i; + + for ( i=d; i<rd->n_read; i++ ) { + if ( i < rd->n_read-1 ) { + rd->fds[i] = rd->fds[i+1]; + rd->fhs[i] = rd->fhs[i+1]; + } /* else don't bother */ + } + + rd->n_read--; + + /* We don't bother shrinking the arrays */ +} + + +static void *run_reader(void *rdv) +{ + struct sb_reader *rd = rdv; + + while ( 1 ) { int r, i; struct timeval tv; fd_set fds; int fdmax; - tv.tv_sec = 5; + /* Exit when: + * - No fhs left open to read from + * AND - Main thread says "done" */ + if ( (rd->n_read == 0) && rd->done ) break; + + tv.tv_sec = 1; tv.tv_usec = 0; FD_ZERO(&fds); fdmax = 0; - lock_sandbox(sb); - for ( i=0; i<sb->n_proc; i++ ) { + pthread_mutex_lock(&rd->lock); + for ( i=0; i<rd->n_read; i++ ) { int fd; - if ( !sb->running[i] ) continue; - - fd = sb->stream_pipe_read[i]; + fd = rd->fds[i]; FD_SET(fd, &fds); if ( fd > fdmax ) fdmax = fd; } - - unlock_sandbox(sb); + pthread_mutex_unlock(&rd->lock); r = select(fdmax+1, &fds, NULL, NULL, &tv); @@ -526,38 +425,75 @@ static void *run_reader(void *sbv) continue; } - if ( r == 0 ) continue; /* Nothing this time. Try again */ + pthread_mutex_lock(&rd->lock); + for ( i=0; i<rd->n_read; i++ ) { - lock_sandbox(sb); - for ( i=0; i<sb->n_proc; i++ ) { + if ( !FD_ISSET(rd->fds[i], &fds) ) { + continue; + } - if ( !sb->running[i] ) continue; + /* If the chunk cannot be read, assume the connection + * is broken and that the process will die soon. */ + if ( pump_chunk(rd->fhs[i], rd->ofd) ) { + /* remove_pipe() assumes that the caller is + * holding rd->lock ! */ + remove_pipe(rd, i); + } - if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue; + } + pthread_mutex_unlock(&rd->lock); + + } + + return NULL; +} - if ( pump_chunk(sb->fhs[i], sb->ofh) ) { - sb->running[i] = 0; - } +static int create_temporary_folder(signed int n) +{ + int r; + char tmp[64]; + struct stat s; + + if ( n < 0 ) { + snprintf(tmp, 63, "indexamajig.%i", getpid()); + } else { + snprintf(tmp, 63, "worker.%i", n); + } + + if ( stat(tmp, &s) == -1 ) { + if ( errno != ENOENT ) { + ERROR("Failed to stat temporary folder.\n"); + return 1; } - done = 1; - for ( i=0; i<sb->n_proc; i++ ) { - if ( sb->running[i] ) done = 0; + r = mkdir(tmp, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if ( r ) { + ERROR("Failed to create temporary folder: %s\n", + strerror(errno)); + return 1; } - unlock_sandbox(sb); } - return NULL; + r = chdir(tmp); + if ( r ) { + ERROR("Failed to chdir to temporary folder: %s\n", + strerror(errno)); + return 1; + } + + return 0; } -static void start_worker_process(struct sandbox *sb, int slot) +static void start_worker_process(struct sandbox *sb, int slot, + int argc, char *argv[]) { pid_t p; int filename_pipe[2]; int result_pipe[2]; + int stream_pipe[2]; if ( pipe(filename_pipe) == - 1 ) { ERROR("pipe() failed!\n"); @@ -569,6 +505,11 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } + if ( pipe(stream_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + p = fork(); if ( p == -1 ) { ERROR("fork() failed!\n"); @@ -577,7 +518,7 @@ static void start_worker_process(struct sandbox *sb, int slot) if ( p == 0 ) { - FILE *sfh; + Stream *st; int j; struct sigaction sa; int r; @@ -592,6 +533,8 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } + create_temporary_folder(slot); + /* Free resources which will not be needed by worker */ for ( j=0; j<sb->n_proc; j++ ) { if ( (j != slot) && (sb->running[j]) ) { @@ -600,7 +543,9 @@ static void start_worker_process(struct sandbox *sb, int slot) } for ( j=0; j<sb->n_proc; j++ ) { if ( (j != slot) && (sb->running[j]) ) { - fclose(sb->result_fhs[j]); + if ( sb->result_fhs[j] != NULL ) { + fclose(sb->result_fhs[j]); + } close(sb->filename_pipes[j]); } } @@ -614,10 +559,12 @@ static void start_worker_process(struct sandbox *sb, int slot) close(filename_pipe[1]); close(result_pipe[0]); - sfh = fdopen(sb->stream_pipe_write[slot], "w"); + st = open_stream_fd_for_write(stream_pipe[1]); + write_command(st, argc, argv); + write_line(st, "FLUSH"); run_work(sb->iargs, filename_pipe[0], result_pipe[1], - sfh, slot); - fclose(sfh); + st, slot); + close_stream(st); //close(filename_pipe[0]); close(result_pipe[1]); @@ -630,14 +577,11 @@ static void start_worker_process(struct sandbox *sb, int slot) * and the 'read' end of the result pipe. */ sb->pids[slot] = p; sb->running[slot] = 1; + add_pipe(sb->reader, stream_pipe[0]); close(filename_pipe[0]); close(result_pipe[1]); + close(stream_pipe[1]); sb->filename_pipes[slot] = filename_pipe[1]; - sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r"); - if ( sb->fhs[slot] == NULL ) { - ERROR("Couldn't fdopen() stream!\n"); - return; - } sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); if ( sb->result_fhs[slot] == NULL ) { @@ -685,7 +629,7 @@ static void handle_zombie(struct sandbox *sb) STATUS("Last filename was: %s\n", sb->last_filename[i]); sb->n_processed++; - start_worker_process(sb, i); + start_worker_process(sb, i, 0, NULL); } } @@ -697,7 +641,7 @@ static void handle_zombie(struct sandbox *sb) void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, char *use_this_one_instead, - FILE *ofh) + int ofd, int argc, char *argv[]) { int i; int allDone; @@ -712,48 +656,41 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - sb->n_indexable = 0; + sb->reader = calloc(1, sizeof(struct sb_reader)); + if ( sb->reader == NULL ) { + ERROR("Couldn't allocate memory for SB reader.\n"); + free(sb); + return; + } + + pthread_mutex_init(&sb->lock, NULL); + pthread_mutex_init(&sb->reader->lock, NULL); + sb->n_processed = 0; - sb->n_indexable_last_stats = 0; + sb->n_hadcrystals = 0; + sb->n_crystals = 0; sb->n_processed_last_stats = 0; + sb->n_hadcrystals_last_stats = 0; + sb->n_crystals_last_stats = 0; sb->t_last_stats = get_monotonic_seconds(); sb->n_proc = n_proc; - sb->ofh = ofh; sb->iargs = iargs; - pthread_mutex_init(&sb->lock, NULL); + sb->reader->fds = NULL; + sb->reader->fhs = NULL; + sb->reader->ofd = ofd; - sb->stream_pipe_read = calloc(n_proc, sizeof(int)); sb->stream_pipe_write = calloc(n_proc, sizeof(int)); - if ( sb->stream_pipe_read == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return; - } if ( sb->stream_pipe_write == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } - for ( i=0; i<n_proc; i++ ) { - - int stream_pipe[2]; - - if ( pipe(stream_pipe) == - 1 ) { - ERROR("pipe() failed!\n"); - return; - } - - sb->stream_pipe_read[i] = stream_pipe[0]; - sb->stream_pipe_write[i] = stream_pipe[1]; - - } - lock_sandbox(sb); sb->filename_pipes = calloc(n_proc, sizeof(int)); sb->result_fhs = calloc(n_proc, sizeof(FILE *)); sb->pids = calloc(n_proc, sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); - sb->fhs = calloc(sb->n_proc, sizeof(FILE *)); if ( sb->filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; @@ -776,17 +713,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Couldn't allocate memory for last filename list.\n"); return; } - if ( sb->fhs == NULL ) { - ERROR("Couldn't allocate memory for file handles!\n"); - return; - } unlock_sandbox(sb); - if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { - ERROR("Failed to create reader thread.\n"); - return; - } - if ( pipe(signal_pipe) == -1 ) { ERROR("Failed to create signal pipe.\n"); return; @@ -802,13 +730,23 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } + if ( create_temporary_folder(-1) ) return; + /* Fork the right number of times */ lock_sandbox(sb); for ( i=0; i<n_proc; i++ ) { - start_worker_process(sb, i); + start_worker_process(sb, i, argc, argv); } unlock_sandbox(sb); + /* Start reader thread after forking, so that things are definitely + * "running" */ + if ( pthread_create(&reader_thread, NULL, run_reader, + (void *)sb->reader) ) { + ERROR("Failed to create reader thread.\n"); + return; + } + allDone = 0; while ( !allDone ) { @@ -828,9 +766,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int fd; - if ( !sb->running[i] ) { - continue; - } + if ( sb->result_fhs[i] == NULL) continue; fd = fileno(sb->result_fhs[i]); FD_SET(fd, &fds); @@ -844,14 +780,11 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { - if ( errno != EINTR ) { - ERROR("select() failed: %s\n", strerror(errno)); - } - continue; + if ( errno == EINTR ) continue; + ERROR("select() failed: %s\n", strerror(errno)); + break; } - if ( r == 0 ) continue; /* No progress this time. Try again */ - if ( FD_ISSET(signal_pipe[0], &fds) ) { char d; @@ -869,14 +802,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int fd; char *eptr; - if ( !sb->running[i] ) { - continue; - } + if ( sb->result_fhs[i] == NULL ) continue; fd = fileno(sb->result_fhs[i]); - if ( !FD_ISSET(fd, &fds) ) { - continue; - } + if ( !FD_ISSET(fd, &fds) ) continue; rval = fgets(results, 1024, sb->result_fhs[i]); if ( rval == NULL ) { @@ -884,6 +813,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("fgets() failed: %s\n", strerror(errno)); } + sb->result_fhs[i] = NULL; continue; } @@ -895,8 +825,14 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Invalid result '%s'\n", results); } } else { - sb->n_indexable += atoi(results); + + int nc = atoi(results); + sb->n_crystals += nc; + if ( nc > 0 ) { + sb->n_hadcrystals++; + } sb->n_processed++; + } /* Send next filename */ @@ -929,14 +865,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, tNow = get_monotonic_seconds(); if ( tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS ) { - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n", - sb->n_indexable, sb->n_processed, - sb->n_indexable - sb->n_indexable_last_stats, + STATUS("%4i indexable out of %4i processed, " + "%4i crystals so far. " + "%4i images processed since the last message.\n", + sb->n_hadcrystals, sb->n_processed, + sb->n_crystals, sb->n_processed - sb->n_processed_last_stats); - sb->n_indexable_last_stats = sb->n_indexable; sb->n_processed_last_stats = sb->n_processed; + sb->n_hadcrystals_last_stats = sb->n_hadcrystals; + sb->n_crystals_last_stats = sb->n_crystals; sb->t_last_stats = tNow; } @@ -947,14 +885,18 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, for ( i=0; i<n_proc; i++ ) { if ( sb->running[i] ) allDone = 0; } - unlock_sandbox(sb); } fclose(fh); - pthread_mutex_destroy(&sb->lock); + /* Indicate to the reader thread that we are done */ + pthread_mutex_lock(&sb->reader->lock); + sb->reader->done = 1; + pthread_mutex_unlock(&sb->reader->lock); + + pthread_join(reader_thread, NULL); for ( i=0; i<n_proc; i++ ) { int status; @@ -963,21 +905,19 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, for ( i=0; i<n_proc; i++ ) { close(sb->filename_pipes[i]); - fclose(sb->result_fhs[i]); + if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]); } - for ( i=0; i<sb->n_proc; i++ ) { - fclose(sb->fhs[i]); - } - free(sb->fhs); + free(sb->running); free(sb->filename_pipes); free(sb->result_fhs); free(sb->pids); - free(sb->running); - if ( ofh != stdout ) fclose(ofh); + pthread_mutex_destroy(&sb->lock); - STATUS("There were %i images, of which %i could be indexed.\n", - sb->n_processed, sb->n_indexable); + STATUS("Final:" + " %i images processed, %i had crystals, %i crystals overall.\n", + sb->n_processed, sb->n_hadcrystals, sb->n_crystals); + free(sb); } |