diff options
author | Thomas White <taw@bitwiz.org.uk> | 2012-07-15 06:15:59 -0400 |
---|---|---|
committer | Thomas White <taw@bitwiz.org.uk> | 2012-07-15 06:15:59 -0400 |
commit | 9ff3210f4027c679d86c2e0b63b71ed12aebf74b (patch) | |
tree | 4c2e1781844b9321a5d3358b8d5f72525a0bfdd1 /src/indexamajig.c | |
parent | 57a44a9738308d73ada759b5663c84dc0f535256 (diff) |
Move indexer sandbox to a new file
Diffstat (limited to 'src/indexamajig.c')
-rw-r--r-- | src/indexamajig.c | 859 |
1 files changed, 3 insertions, 856 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c index b8e57048..c2c9ed2c 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -44,10 +44,6 @@ #include <getopt.h> #include <hdf5.h> #include <gsl/gsl_errno.h> -#include <pthread.h> -#include <sys/wait.h> -#include <fcntl.h> -#include <signal.h> #ifdef HAVE_CLOCK_GETTIME #include <time.h> @@ -67,57 +63,7 @@ #include "stream.h" #include "reflist-utils.h" - -/* Write statistics at APPROXIMATELY this interval */ -#define STATS_EVERY_N_SECONDS (5) - -enum { - PEAK_ZAEF, - PEAK_HDF5, -}; - - -/* Information about the indexing process which is common to all patterns */ -struct index_args -{ - UnitCell *cell; - int config_cmfilter; - int config_noisefilter; - int config_verbose; - int stream_flags; /* What goes into the output? */ - int config_satcorr; - int config_closer; - int config_insane; - int config_bgsub; - float threshold; - float min_gradient; - float min_snr; - double min_int_snr; - struct detector *det; - IndexingMethod *indm; - IndexingPrivate **ipriv; - int peaks; /* Peak detection method */ - int cellr; - float tols[4]; - struct beam_params *beam; - char *element; - char *hdf5_peak_path; - double ir_inn; - double ir_mid; - double ir_out; - struct copy_hdf5_field *copyme; -}; - - -/* Information about the indexing process for one pattern */ -struct pattern_args -{ - /* "Input" */ - char *filename; - - /* "Output" */ - int indexable; -}; +#include "im-sandbox.h" static void show_help(const char *s) @@ -218,305 +164,6 @@ static void show_help(const char *s) } -static char *get_pattern(FILE *fh, char **use_this_one_instead, - int config_basename, const char *prefix) -{ - char *line; - char *filename; - - do { - - /* Get the next filename */ - if ( *use_this_one_instead != NULL ) { - - line = *use_this_one_instead; - *use_this_one_instead = NULL; - - } else { - - char *rval; - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { - free(line); - return NULL; - } - - } - - chomp(line); - - } while ( strlen(line) == 0 ); - - if ( config_basename ) { - char *tmp; - tmp = safe_basename(line); - free(line); - line = tmp; - } - - filename = malloc(strlen(prefix)+strlen(line)+1); - - snprintf(filename, 1023, "%s%s", prefix, line); - - free(line); - - return filename; -} - - -static void process_image(const struct index_args *iargs, - struct pattern_args *pargs, FILE *ofh, - int cookie) -{ - float *data_for_measurement; - size_t data_size; - UnitCell *cell = iargs->cell; - int config_cmfilter = iargs->config_cmfilter; - int config_noisefilter = iargs->config_noisefilter; - int config_verbose = iargs->config_verbose; - IndexingMethod *indm = iargs->indm; - struct beam_params *beam = iargs->beam; - int r, check; - struct hdfile *hdfile; - struct image image; - - image.features = NULL; - image.data = NULL; - image.flags = NULL; - image.indexed_cell = NULL; - image.det = copy_geom(iargs->det); - image.copyme = iargs->copyme; - image.reflections = NULL; - image.id = cookie; - image.filename = pargs->filename; - image.beam = beam; - - hdfile = hdfile_open(image.filename); - if ( hdfile == NULL ) return; - - r = hdfile_set_first_image(hdfile, "/"); - if ( r ) { - ERROR("Couldn't select first path\n"); - hdfile_close(hdfile); - return; - } - - check = hdf5_read(hdfile, &image, 1); - if ( check ) { - hdfile_close(hdfile); - return; - } - - if ( (image.width != image.det->max_fs + 1 ) - || (image.height != image.det->max_ss + 1)) - { - ERROR("Image size doesn't match geometry size" - " - rejecting image.\n"); - ERROR("Image size: %i,%i. Geometry size: %i,%i\n", - image.width, image.height, - image.det->max_fs + 1, image.det->max_ss + 1); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - - if ( image.lambda < 0.0 ) { - if ( beam != NULL ) { - ERROR("Using nominal photon energy of %.2f eV\n", - beam->photon_energy); - image.lambda = ph_en_to_lambda( - eV_to_J(beam->photon_energy)); - } else { - ERROR("No wavelength in file, so you need to give " - "a beam parameters file with -b.\n"); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - } - fill_in_values(image.det, hdfile); - - if ( config_cmfilter ) { - filter_cm(&image); - } - - /* Take snapshot of image after CM subtraction but before - * the aggressive noise filter. */ - data_size = image.width * image.height * sizeof(float); - data_for_measurement = malloc(data_size); - - if ( config_noisefilter ) { - filter_noise(&image, data_for_measurement); - } else { - memcpy(data_for_measurement, image.data, data_size); - } - - switch ( iargs->peaks ) { - - case PEAK_HDF5: - // Get peaks from HDF5 - if (get_peaks(&image, hdfile, - iargs->hdf5_peak_path)) { - ERROR("Failed to get peaks from HDF5 file.\n"); - } - break; - - case PEAK_ZAEF: - search_peaks(&image, iargs->threshold, - iargs->min_gradient, iargs->min_snr, - iargs->ir_inn, iargs->ir_mid, iargs->ir_out); - break; - - } - - /* Get rid of noise-filtered version at this point - * - it was strictly for the purposes of peak detection. */ - free(image.data); - image.data = data_for_measurement; - - /* Calculate orientation matrix (by magic) */ - image.div = beam->divergence; - image.bw = beam->bandwidth; - image.profile_radius = 0.0001e9; - - index_pattern(&image, cell, indm, iargs->cellr, - config_verbose, iargs->ipriv, - iargs->config_insane, iargs->tols); - - if ( image.indexed_cell != NULL ) { - pargs->indexable = 1; - image.reflections = find_intersections(&image, - image.indexed_cell); - if (image.reflections != NULL) { - integrate_reflections(&image, - iargs->config_closer, - iargs->config_bgsub, - iargs->min_int_snr, - iargs->ir_inn, - iargs->ir_mid, - iargs->ir_out); - } - } else { - image.reflections = NULL; - } - - write_chunk(ofh, &image, hdfile, iargs->stream_flags); - fprintf(ofh, "END\n"); - fflush(ofh); - - /* Only free cell if found */ - cell_free(image.indexed_cell); - - reflist_free(image.reflections); - free(image.data); - if ( image.flags != NULL ) free(image.flags); - image_feature_list_free(image.features); - hdfile_close(hdfile); - free_detector_geometry(image.det); -} - - -static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe, FILE *ofh, - int cookie) -{ - int allDone = 0; - FILE *fh; - - fh = fdopen(filename_pipe, "r"); - if ( fh == NULL ) { - ERROR("Failed to fdopen() the filename pipe!\n"); - return; - } - - while ( !allDone ) { - - struct pattern_args pargs; - int w, c; - char buf[1024]; - char *line; - char *rval; - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { - free(line); - if ( feof(fh) ) { - allDone = 1; - continue; - } else { - ERROR("Read error!\n"); - break; - } - } - - chomp(line); - - if ( strlen(line) == 0 ) { - - allDone = 1; - - } else { - - pargs.filename = line; - pargs.indexable = 0; - - process_image(iargs, &pargs, ofh, cookie); - - /* Request another image */ - c = sprintf(buf, "%i\n", pargs.indexable); - w = write(results_pipe, buf, c); - if ( w < 0 ) { - ERROR("write P0\n"); - } - - } - - free(line); - - } - - /* close my pipes */ - fclose(fh); - - cleanup_indexing(iargs->ipriv); - free(iargs->indm); - free(iargs->ipriv); - free_detector_geometry(iargs->det); - free(iargs->beam); - free(iargs->element); - free(iargs->hdf5_peak_path); - free_copy_hdf5_field_list(iargs->copyme); - cell_free(iargs->cell); -} - - -#ifdef HAVE_CLOCK_GETTIME - -static time_t get_monotonic_seconds() -{ - struct timespec tp; - clock_gettime(CLOCK_MONOTONIC, &tp); - return tp.tv_sec; -} - -#else - -/* Fallback version of the above. The time according to gettimeofday() is not - * monotonic, so measuring intervals based on it will screw up if there's a - * timezone change (e.g. daylight savings) while the program is running. */ -static time_t get_monotonic_seconds() -{ - struct timeval tp; - gettimeofday(&tp, NULL); - return tp.tv_sec; -} - -#endif - - static int parse_cell_reduction(const char *scellr, int *err, int *reduction_needs_cell) { @@ -541,142 +188,6 @@ static int parse_cell_reduction(const char *scellr, int *err, } -static void pump_chunk(FILE *fh, int *finished, FILE *ofh) -{ - int chunk_started = 0; - int chunk_finished = 0; - - do { - - char line[1024]; - char *rval; - - rval = fgets(line, 1024, fh); - if ( rval == NULL ) { - - if ( feof(fh) ) { - /* Process died */ - *finished = 1; - if ( chunk_started ) { - ERROR("EOF during chunk!\n"); - fprintf(ofh, "Chunk is unfinished!\n"); - } - } else { - ERROR("fgets() failed: %s\n", strerror(errno)); - } - chunk_finished = 1; - continue; - - } - - if ( strcmp(line, "END\n") == 0 ) { - chunk_finished = 1; - } else { - chunk_started = 1; - fprintf(ofh, "%s", line); - } - - } while ( !chunk_finished ); -} - - -static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) -{ - int done = 0; - int *finished; - FILE **fhs; - int i; - - finished = calloc(n_proc, sizeof(int)); - if ( finished == NULL ) { - ERROR("Couldn't allocate memory for flags!\n"); - exit(1); - } - - fhs = calloc(n_proc, sizeof(FILE *)); - if ( fhs == NULL ) { - ERROR("Couldn't allocate memory for file handles!\n"); - exit(1); - } - - for ( i=0; i<n_proc; i++ ) { - fhs[i] = fdopen(stream_pipe_read[i], "r"); - if ( fhs[i] == NULL ) { - ERROR("Couldn't fdopen() stream!\n"); - exit(1); - } - } - - while ( !done ) { - - int r, i; - struct timeval tv; - fd_set fds; - int fdmax; - - tv.tv_sec = 5; - tv.tv_usec = 0; - - FD_ZERO(&fds); - fdmax = 0; - for ( i=0; i<n_proc; i++ ) { - - int fd; - - if ( finished[i] ) continue; - - fd = stream_pipe_read[i]; - - FD_SET(fd, &fds); - if ( fd > fdmax ) fdmax = fd; - - } - - r = select(fdmax+1, &fds, NULL, NULL, &tv); - - if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); - continue; - } - - if ( r == 0 ) continue; /* Nothing this time. Try again */ - - for ( i=0; i<n_proc; i++ ) { - - if ( finished[i] ) continue; - - if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue; - - pump_chunk(fhs[i], &finished[i], ofh); - - } - - done = 1; - for ( i=0; i<n_proc; i++ ) { - if ( !finished[i] ) done = 0; - } - - } - - free(finished); - - for ( i=0; i<n_proc; i++ ) { - fclose(fhs[i]); - } - free(fhs); - - if ( ofh != stdout ) fclose(ofh); -} - - -static void signal_handler(int sig, siginfo_t *si, void *uc_v) -{ - struct ucontext_t *uc = uc_v; - - STATUS("Signal!\n"); -} - - int main(int argc, char *argv[]) { int c; @@ -730,20 +241,6 @@ int main(int argc, char *argv[]) float ir_inn = 4.0; float ir_mid = 5.0; float ir_out = 7.0; - int n_indexable, n_processed, n_indexable_last_stats; - int n_processed_last_stats; - int t_last_stats; - pid_t *pids; - int *filename_pipes; - int *stream_pipe_read; - int *stream_pipe_write; - FILE **result_fhs; - int i; - int allDone; - int *finished; - pid_t pr; - struct sigaction sa; - int r; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -1139,360 +636,10 @@ int main(int argc, char *argv[]) iargs.ir_mid = ir_mid; iargs.ir_out = ir_out; - n_indexable = 0; - n_processed = 0; - n_indexable_last_stats = 0; - n_processed_last_stats = 0; - t_last_stats = get_monotonic_seconds(); - - stream_pipe_read = calloc(n_proc, sizeof(int)); - stream_pipe_write = calloc(n_proc, sizeof(int)); - if ( stream_pipe_read == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return 1; - } - if ( stream_pipe_write == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return 1; - } - - for ( i=0; i<n_proc; i++ ) { - - int stream_pipe[2]; - - if ( pipe(stream_pipe) == - 1 ) { - ERROR("pipe() failed!\n"); - return 1; - } - - stream_pipe_read[i] = stream_pipe[0]; - stream_pipe_write[i] = stream_pipe[1]; - - } - - pr = fork(); - if ( pr == - 1 ) { - ERROR("fork() failed (for reader process)\n"); - return 1; - } - - if ( pr == 0 ) { - - /* Free resources not needed by reader - * (but which will be needed by worker or master) */ - for ( i=0; i<n_proc; i++ ) { - close(stream_pipe_write[i]); - } - free(prefix); - free(use_this_one_instead); - free(stream_pipe_write); - cleanup_indexing(ipriv); - free(indm); - free(ipriv); - free_detector_geometry(det); - free(beam); - free(element); - free(hdf5_peak_path); - free_copy_hdf5_field_list(copyme); - cell_free(cell); - fclose(fh); - - run_reader(stream_pipe_read, n_proc, ofh); - - free(stream_pipe_read); - - exit(0); - - } - - /* Set up signal handler to take action if any children die */ - sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; - sigemptyset(&sa.sa_mask); - sa.sa_sigaction = signal_handler; - r = sigaction(SIGCHLD, &sa, NULL); - if ( r == -1 ) { - ERROR("Failed to set signal handler!\n"); - return 1; - } - - /* Free resources needed by reader only */ - if ( ofh != stdout ) fclose(ofh); - for ( i=0; i<n_proc; i++ ) { - close(stream_pipe_read[i]); - } - free(stream_pipe_read); - - filename_pipes = calloc(n_proc, sizeof(int)); - result_fhs = calloc(n_proc, sizeof(FILE *)); - pids = calloc(n_proc, sizeof(pid_t)); - if ( filename_pipes == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return 1; - } - if ( result_fhs == NULL ) { - ERROR("Couldn't allocate memory for pipe file handles.\n"); - return 1; - } - if ( pids == NULL ) { - ERROR("Couldn't allocate memory for PIDs.\n"); - return 1; - } - - /* Fork the right number of times */ - for ( i=0; i<n_proc; i++ ) { - - pid_t p; - int filename_pipe[2]; - int result_pipe[2]; - - if ( pipe(filename_pipe) == - 1 ) { - ERROR("pipe() failed!\n"); - return 1; - } - - if ( pipe(result_pipe) == - 1 ) { - ERROR("pipe() failed!\n"); - return 1; - } - - p = fork(); - if ( p == -1 ) { - ERROR("fork() failed!\n"); - return 1; - } - - if ( p == 0 ) { - - FILE *sfh; - int j; - - /* Free resources which will not be needed by worker */ - for ( j=0; j<n_proc; j++ ) { - if ( i != j ) close(stream_pipe_write[j]); - } - for ( j=0; j<i-1; j++ ) { - fclose(result_fhs[j]); - close(filename_pipes[j]); - } - free(prefix); - free(use_this_one_instead); - free(filename_pipes); - free(result_fhs); - fclose(fh); - free(pids); - - /* Child process gets the 'read' end of the filename - * pipe, and the 'write' end of the result pipe. */ - close(filename_pipe[1]); - close(result_pipe[0]); - - sfh = fdopen(stream_pipe_write[i], "w"); - run_work(&iargs, filename_pipe[0], result_pipe[1], - sfh, i); - fclose(sfh); - - free(stream_pipe_write); - close(filename_pipe[0]); - close(result_pipe[1]); - - exit(0); - - } - - /* Parent process gets the 'write' end of the filename pipe - * and the 'read' end of the result pipe. */ - pids[i] = p; - close(filename_pipe[0]); - close(result_pipe[1]); - filename_pipes[i] = filename_pipe[1]; - - result_fhs[i] = fdopen(result_pipe[0], "r"); - if ( result_fhs[i] == NULL ) { - ERROR("fdopen() failed.\n"); - return 1; - } - - } - - /* Free resources which will not be used by the main thread */ - cleanup_indexing(ipriv); - free(indm); - free(ipriv); - free_detector_geometry(det); - free(beam); - free(element); - free(hdf5_peak_path); - free_copy_hdf5_field_list(copyme); - cell_free(cell); - for ( i=0; i<n_proc; i++ ) { - close(stream_pipe_write[i]); - } - free(stream_pipe_write); - - /* Send first image to all children */ - for ( i=0; i<n_proc; i++ ) { - - char *nextImage; - - nextImage = get_pattern(fh, &use_this_one_instead, - config_basename, prefix); - - if ( nextImage != NULL ) { - - write(filename_pipes[i], nextImage, strlen(nextImage)); - write(filename_pipes[i], "\n", 1); - - free(nextImage); - - } else { - - int r; - - /* No more files to process.. already? */ - r = write(filename_pipes[i], "\n", 1); - if ( r < 0 ) { - ERROR("Write pipe\n"); - } - - } - - } - - finished = calloc(n_proc, sizeof(int)); - if ( finished == NULL ) { - ERROR("Couldn't allocate memory for process flags.\n"); - return 1; - } - - allDone = 0; - while ( !allDone ) { - - int r, i; - struct timeval tv; - fd_set fds; - double tNow; - int fdmax; - - tv.tv_sec = 5; - tv.tv_usec = 0; - - FD_ZERO(&fds); - fdmax = 0; - for ( i=0; i<n_proc; i++ ) { - - int fd; - - if ( finished[i] ) continue; - - fd = fileno(result_fhs[i]); - FD_SET(fd, &fds); - if ( fd > fdmax ) fdmax = fd; - - } - - r = select(fdmax+1, &fds, NULL, NULL, &tv); - - if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); - continue; - } - - if ( r == 0 ) continue; /* No progress this time. Try again */ - - for ( i=0; i<n_proc; i++ ) { - - char *nextImage; - char results[1024]; - char *rval; - int fd; - - if ( finished[i] ) continue; - - fd = fileno(result_fhs[i]); - if ( !FD_ISSET(fd, &fds) ) continue; - - rval = fgets(results, 1024, result_fhs[i]); - if ( rval == NULL ) { - if ( feof(result_fhs[i]) ) { - /* Process died */ - finished[i] = 1; - } else { - ERROR("fgets() failed: %s\n", - strerror(errno)); - } - continue; - } - - chomp(results); - n_indexable += atoi(results); - n_processed++; - - /* Send next filename */ - nextImage = get_pattern(fh, &use_this_one_instead, - config_basename, prefix); - - if ( nextImage == NULL ) { - /* No more images */ - r = write(filename_pipes[i], "\n", 1); - if ( r < 0 ) { - ERROR("Write pipe\n"); - } - } else { - - r = write(filename_pipes[i], nextImage, - strlen(nextImage)); - r -= write(filename_pipes[i], "\n", 1); - if ( r < 0 ) { - ERROR("write pipe\n"); - } - free(nextImage); - } - - } - - /* Update progress */ - tNow = get_monotonic_seconds(); - if ( tNow >= t_last_stats+STATS_EVERY_N_SECONDS ) { - - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n", - n_indexable, n_processed, - n_indexable - n_indexable_last_stats, - n_processed - n_processed_last_stats); - - n_indexable_last_stats = n_indexable; - n_processed_last_stats = n_processed; - t_last_stats = tNow; - - } - - allDone = 1; - for ( i=0; i<n_proc; i++ ) { - if ( !finished[i] ) allDone = 0; - } - - } - - fclose(fh); - - for ( i=0; i<n_proc; i++ ) { - int status; - waitpid(pids[i], &status, 0); - } - - for ( i=0; i<n_proc; i++ ) { - close(filename_pipes[i]); - fclose(result_fhs[i]); - } + create_sandbox(&iargs, n_proc, prefix, config_basename, fh, + use_this_one_instead, ofh); free(prefix); - free(filename_pipes); - free(result_fhs); - free(pids); - free(finished); - - STATUS("There were %i images, of which %i could be indexed.\n", - n_processed, n_indexable); return 0; } |