diff options
author | Thomas White <taw@physics.org> | 2012-07-23 15:00:33 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-07-23 15:00:33 +0200 |
commit | 9964cdeafc1338f5841270bc5392ad8e3c8eb33e (patch) | |
tree | 0fe784c1f27967034690b32bc67cb4426d79aae3 /src | |
parent | e63626f4207fa58d3ea70b68f2862630d281598d (diff) | |
parent | 6eb751c2230226723884cc7bc473d65b91064b81 (diff) |
Merge branch 'tom/speed'
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 956 | ||||
-rw-r--r-- | src/im-sandbox.h | 86 | ||||
-rw-r--r-- | src/indexamajig.c | 540 | ||||
-rw-r--r-- | src/partial_sim.c | 4 |
4 files changed, 1098 insertions, 488 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c new file mode 100644 index 00000000..b2f14bbb --- /dev/null +++ b/src/im-sandbox.c @@ -0,0 +1,956 @@ +/* + * im-sandbox.c + * + * Sandbox for indexing + * + * Copyright © 2012 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * Copyright © 2012 Richard Kirian + * Copyright © 2012 Lorenzo Galli + * + * Authors: + * 2010-2012 Thomas White <taw@physics.org> + * 2011 Richard Kirian + * 2012 Lorenzo Galli + * 2012 Chunhong Yoon + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdarg.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <getopt.h> +#include <hdf5.h> +#include <gsl/gsl_errno.h> +#include <pthread.h> +#include <sys/wait.h> +#include <fcntl.h> +#include <signal.h> + +#ifdef HAVE_CLOCK_GETTIME +#include <time.h> +#else +#include <sys/time.h> +#endif + +#include "utils.h" +#include "hdf5-file.h" +#include "index.h" +#include "peaks.h" +#include "detector.h" +#include "filters.h" +#include "thread-pool.h" +#include "beam-parameters.h" +#include "geometry.h" +#include "stream.h" +#include "reflist-utils.h" + +#include "im-sandbox.h" + + +/* Write statistics at APPROXIMATELY this interval */ +#define STATS_EVERY_N_SECONDS (5) + + +struct sandbox +{ + pthread_mutex_t lock; + + int n_indexable; + int n_processed; + int n_indexable_last_stats; + int n_processed_last_stats; + int t_last_stats; + + struct index_args *iargs; + + int n_proc; + pid_t *pids; + FILE *ofh; + FILE **fhs; + + int *running; + FILE **result_fhs; + int *filename_pipes; + int *stream_pipe_read; + int *stream_pipe_write; + char **last_filename; +}; + + +/* Horrible global variable for signal handler */ +int signal_pipe[2]; + + +static void lock_sandbox(struct sandbox *sb) +{ + pthread_mutex_lock(&sb->lock); +} + + +static void unlock_sandbox(struct sandbox *sb) +{ + pthread_mutex_unlock(&sb->lock); +} + + +static char *get_pattern(FILE *fh, char **use_this_one_instead, + int config_basename, const char *prefix) +{ + char *line; + char *filename; + + do { + + /* Get the next filename */ + if ( *use_this_one_instead != NULL ) { + + line = *use_this_one_instead; + *use_this_one_instead = NULL; + + } else { + + char *rval; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + return NULL; + } + + } + + chomp(line); + + } while ( strlen(line) == 0 ); + + if ( config_basename ) { + char *tmp; + tmp = safe_basename(line); + free(line); + line = tmp; + } + + filename = malloc(strlen(prefix)+strlen(line)+1); + + snprintf(filename, 1023, "%s%s", prefix, line); + + free(line); + + return filename; +} + + +static void process_image(const struct index_args *iargs, + struct pattern_args *pargs, FILE *ofh, + int cookie) +{ + float *data_for_measurement; + size_t data_size; + UnitCell *cell = iargs->cell; + int config_cmfilter = iargs->config_cmfilter; + int config_noisefilter = iargs->config_noisefilter; + int config_verbose = iargs->config_verbose; + IndexingMethod *indm = iargs->indm; + struct beam_params *beam = iargs->beam; + int r, check; + struct hdfile *hdfile; + struct image image; + + image.features = NULL; + image.data = NULL; + image.flags = NULL; + image.indexed_cell = NULL; + image.det = copy_geom(iargs->det); + image.copyme = iargs->copyme; + image.reflections = NULL; + image.id = cookie; + image.filename = pargs->filename; + image.beam = beam; + + hdfile = hdfile_open(image.filename); + if ( hdfile == NULL ) return; + + r = hdfile_set_first_image(hdfile, "/"); + if ( r ) { + ERROR("Couldn't select first path\n"); + hdfile_close(hdfile); + return; + } + + check = hdf5_read(hdfile, &image, 1); + if ( check ) { + hdfile_close(hdfile); + return; + } + + if ( (image.width != image.det->max_fs + 1 ) + || (image.height != image.det->max_ss + 1)) + { + ERROR("Image size doesn't match geometry size" + " - rejecting image.\n"); + ERROR("Image size: %i,%i. Geometry size: %i,%i\n", + image.width, image.height, + image.det->max_fs + 1, image.det->max_ss + 1); + hdfile_close(hdfile); + free_detector_geometry(image.det); + return; + } + + if ( image.lambda < 0.0 ) { + if ( beam != NULL ) { + ERROR("Using nominal photon energy of %.2f eV\n", + beam->photon_energy); + image.lambda = ph_en_to_lambda( + eV_to_J(beam->photon_energy)); + } else { + ERROR("No wavelength in file, so you need to give " + "a beam parameters file with -b.\n"); + hdfile_close(hdfile); + free_detector_geometry(image.det); + return; + } + } + fill_in_values(image.det, hdfile); + + if ( config_cmfilter ) { + filter_cm(&image); + } + + /* Take snapshot of image after CM subtraction but before + * the aggressive noise filter. */ + data_size = image.width * image.height * sizeof(float); + data_for_measurement = malloc(data_size); + + if ( config_noisefilter ) { + filter_noise(&image, data_for_measurement); + } else { + memcpy(data_for_measurement, image.data, data_size); + } + + switch ( iargs->peaks ) { + + case PEAK_HDF5: + // Get peaks from HDF5 + if (get_peaks(&image, hdfile, + iargs->hdf5_peak_path)) { + ERROR("Failed to get peaks from HDF5 file.\n"); + } + break; + + case PEAK_ZAEF: + search_peaks(&image, iargs->threshold, + iargs->min_gradient, iargs->min_snr, + iargs->ir_inn, iargs->ir_mid, iargs->ir_out); + break; + + } + + /* Get rid of noise-filtered version at this point + * - it was strictly for the purposes of peak detection. */ + free(image.data); + image.data = data_for_measurement; + + /* Calculate orientation matrix (by magic) */ + image.div = beam->divergence; + image.bw = beam->bandwidth; + image.profile_radius = 0.0001e9; + + index_pattern(&image, cell, indm, iargs->cellr, + config_verbose, iargs->ipriv, + iargs->config_insane, iargs->tols); + + if ( image.indexed_cell != NULL ) { + pargs->indexable = 1; + image.reflections = find_intersections(&image, + image.indexed_cell); + if (image.reflections != NULL) { + integrate_reflections(&image, + iargs->config_closer, + iargs->config_bgsub, + iargs->min_int_snr, + iargs->ir_inn, + iargs->ir_mid, + iargs->ir_out); + } + } else { + image.reflections = NULL; + } + + write_chunk(ofh, &image, hdfile, iargs->stream_flags); + fprintf(ofh, "END\n"); + fflush(ofh); + + /* Only free cell if found */ + cell_free(image.indexed_cell); + + reflist_free(image.reflections); + free(image.data); + if ( image.flags != NULL ) free(image.flags); + image_feature_list_free(image.features); + hdfile_close(hdfile); + free_detector_geometry(image.det); +} + + +static void run_work(const struct index_args *iargs, + int filename_pipe, int results_pipe, FILE *ofh, + int cookie) +{ + int allDone = 0; + FILE *fh; + int w; + + fh = fdopen(filename_pipe, "r"); + if ( fh == NULL ) { + ERROR("Failed to fdopen() the filename pipe!\n"); + return; + } + + w = write(results_pipe, "\n", 1); + if ( w < 0 ) { + ERROR("Failed to send request for first filename.\n"); + } + + while ( !allDone ) { + + struct pattern_args pargs; + int c; + char *line; + char *rval; + char buf[1024]; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + + ERROR("Read error!\n"); + free(line); + allDone = 1; + continue; + + } + + chomp(line); + + if ( strlen(line) == 0 ) { + + allDone = 1; + + } else { + + pargs.filename = line; + pargs.indexable = 0; + + process_image(iargs, &pargs, ofh, cookie); + + /* Request another image */ + c = sprintf(buf, "%i\n", pargs.indexable); + w = write(results_pipe, buf, c); + if ( w < 0 ) { + ERROR("write P0\n"); + } + + } + + free(line); + + } + + cleanup_indexing(iargs->ipriv); + free(iargs->indm); + free(iargs->ipriv); + free_detector_geometry(iargs->det); + free(iargs->beam); + free(iargs->element); + free(iargs->hdf5_peak_path); + free_copy_hdf5_field_list(iargs->copyme); + cell_free(iargs->cell); + fclose(fh); +} + + +#ifdef HAVE_CLOCK_GETTIME + +static time_t get_monotonic_seconds() +{ + struct timespec tp; + clock_gettime(CLOCK_MONOTONIC, &tp); + return tp.tv_sec; +} + +#else + +/* Fallback version of the above. The time according to gettimeofday() is not + * monotonic, so measuring intervals based on it will screw up if there's a + * timezone change (e.g. daylight savings) while the program is running. */ +static time_t get_monotonic_seconds() +{ + struct timeval tp; + gettimeofday(&tp, NULL); + return tp.tv_sec; +} + +#endif + +static int pump_chunk(FILE *fh, FILE *ofh) +{ + int chunk_started = 0; + int chunk_finished = 0; + + do { + + char line[1024]; + char *rval; + + rval = fgets(line, 1024, fh); + if ( rval == NULL ) { + + if ( feof(fh) ) { + /* Process died */ + if ( chunk_started ) { + ERROR("EOF during chunk!\n"); + fprintf(ofh, "Chunk is unfinished!\n"); + } + return 1; + } else { + ERROR("fgets() failed: %s\n", strerror(errno)); + } + chunk_finished = 1; + continue; + + } + + if ( strcmp(line, "END\n") == 0 ) { + chunk_finished = 1; + } else { + chunk_started = 1; + fprintf(ofh, "%s", line); + } + + } while ( !chunk_finished ); + + return 0; +} + + +static void *run_reader(void *sbv) +{ + struct sandbox *sb = sbv; + int done = 0; + + while ( !done ) { + + int r, i; + struct timeval tv; + fd_set fds; + int fdmax; + + tv.tv_sec = 5; + tv.tv_usec = 0; + + FD_ZERO(&fds); + fdmax = 0; + lock_sandbox(sb); + for ( i=0; i<sb->n_proc; i++ ) { + + int fd; + + if ( !sb->running[i] ) continue; + + fd = sb->stream_pipe_read[i]; + + FD_SET(fd, &fds); + if ( fd > fdmax ) fdmax = fd; + + } + + unlock_sandbox(sb); + + r = select(fdmax+1, &fds, NULL, NULL, &tv); + + if ( r == -1 ) { + if ( errno != EINTR ) { + ERROR("select() failed: %s\n", strerror(errno)); + } /* Otherwise no big deal */ + continue; + } + + if ( r == 0 ) continue; /* Nothing this time. Try again */ + + lock_sandbox(sb); + for ( i=0; i<sb->n_proc; i++ ) { + + if ( !sb->running[i] ) continue; + + if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue; + + if ( pump_chunk(sb->fhs[i], sb->ofh) ) { + sb->running[i] = 0; + } + + } + + done = 1; + for ( i=0; i<sb->n_proc; i++ ) { + if ( sb->running[i] ) done = 0; + } + unlock_sandbox(sb); + + } + + return NULL; +} + + +static void start_worker_process(struct sandbox *sb, int slot) +{ + pid_t p; + int filename_pipe[2]; + int result_pipe[2]; + + if ( pipe(filename_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + + if ( pipe(result_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + + p = fork(); + if ( p == -1 ) { + ERROR("fork() failed!\n"); + return; + } + + if ( p == 0 ) { + + FILE *sfh; + int j; + struct sigaction sa; + int r; + + /* First, disconnect the signal handler */ + sa.sa_flags = 0; + sigemptyset(&sa.sa_mask); + sa.sa_handler = SIG_DFL; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + + /* Free resources which will not be needed by worker */ + for ( j=0; j<sb->n_proc; j++ ) { + if ( (j != slot) && (sb->running[j]) ) { + close(sb->stream_pipe_write[j]); + } + } + for ( j=0; j<sb->n_proc; j++ ) { + if ( (j != slot) && (sb->running[j]) ) { + fclose(sb->result_fhs[j]); + close(sb->filename_pipes[j]); + } + } + free(sb->filename_pipes); + free(sb->result_fhs); + free(sb->pids); + /* Also prefix, use_this_one_instead and fh */ + + /* Child process gets the 'read' end of the filename + * pipe, and the 'write' end of the result pipe. */ + close(filename_pipe[1]); + close(result_pipe[0]); + + sfh = fdopen(sb->stream_pipe_write[slot], "w"); + run_work(sb->iargs, filename_pipe[0], result_pipe[1], + sfh, slot); + fclose(sfh); + + //close(filename_pipe[0]); + close(result_pipe[1]); + + exit(0); + + } + + /* Parent process gets the 'write' end of the filename pipe + * and the 'read' end of the result pipe. */ + sb->pids[slot] = p; + sb->running[slot] = 1; + close(filename_pipe[0]); + close(result_pipe[1]); + sb->filename_pipes[slot] = filename_pipe[1]; + sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r"); + if ( sb->fhs[slot] == NULL ) { + ERROR("Couldn't fdopen() stream!\n"); + return; + } + + sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); + if ( sb->result_fhs[slot] == NULL ) { + ERROR("fdopen() failed.\n"); + return; + } +} + + +static void signal_handler(int sig, siginfo_t *si, void *uc_v) +{ + write(signal_pipe[1], "\n", 1); +} + + +static void handle_zombie(struct sandbox *sb) +{ + int i; + + lock_sandbox(sb); + for ( i=0; i<sb->n_proc; i++ ) { + + int status, p; + + if ( !sb->running[i] ) continue; + + p = waitpid(sb->pids[i], &status, WNOHANG); + + if ( p == -1 ) { + ERROR("waitpid() failed.\n"); + continue; + } + + if ( p == sb->pids[i] ) { + + sb->running[i] = 0; + + if ( WIFEXITED(status) ) { + STATUS("Worker %i exited normally.\n", i); + continue; + } + + if ( WIFSIGNALED(status) ) { + STATUS("Worker %i was killed by signal %i\n", + i, WTERMSIG(status)); + STATUS("Last filename was: %s\n", + sb->last_filename[i]); + start_worker_process(sb, i); + } + + } + + } + unlock_sandbox(sb); +} + + +void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, + int config_basename, FILE *fh, char *use_this_one_instead, + FILE *ofh) +{ + int i; + int allDone; + struct sigaction sa; + int r; + pthread_t reader_thread; + struct sandbox *sb; + + sb = calloc(1, sizeof(struct sandbox)); + if ( sb == NULL ) { + ERROR("Couldn't allocate memory for sandbox.\n"); + return; + } + + sb->n_indexable = 0; + sb->n_processed = 0; + sb->n_indexable_last_stats = 0; + sb->n_processed_last_stats = 0; + sb->t_last_stats = get_monotonic_seconds(); + sb->n_proc = n_proc; + sb->ofh = ofh; + sb->iargs = iargs; + + pthread_mutex_init(&sb->lock, NULL); + + sb->stream_pipe_read = calloc(n_proc, sizeof(int)); + sb->stream_pipe_write = calloc(n_proc, sizeof(int)); + if ( sb->stream_pipe_read == NULL ) { + ERROR("Couldn't allocate memory for pipes.\n"); + return; + } + if ( sb->stream_pipe_write == NULL ) { + ERROR("Couldn't allocate memory for pipes.\n"); + return; + } + + for ( i=0; i<n_proc; i++ ) { + + int stream_pipe[2]; + + if ( pipe(stream_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + + sb->stream_pipe_read[i] = stream_pipe[0]; + sb->stream_pipe_write[i] = stream_pipe[1]; + + } + + lock_sandbox(sb); + sb->filename_pipes = calloc(n_proc, sizeof(int)); + sb->result_fhs = calloc(n_proc, sizeof(FILE *)); + sb->pids = calloc(n_proc, sizeof(pid_t)); + sb->running = calloc(n_proc, sizeof(int)); + sb->fhs = calloc(sb->n_proc, sizeof(FILE *)); + if ( sb->filename_pipes == NULL ) { + ERROR("Couldn't allocate memory for pipes.\n"); + return; + } + if ( sb->result_fhs == NULL ) { + ERROR("Couldn't allocate memory for pipe file handles.\n"); + return; + } + if ( sb->pids == NULL ) { + ERROR("Couldn't allocate memory for PIDs.\n"); + return; + } + if ( sb->running == NULL ) { + ERROR("Couldn't allocate memory for process flags.\n"); + return; + } + + sb->last_filename = calloc(n_proc, sizeof(char *)); + if ( sb->last_filename == NULL ) { + ERROR("Couldn't allocate memory for last filename list.\n"); + return; + } + if ( sb->fhs == NULL ) { + ERROR("Couldn't allocate memory for file handles!\n"); + return; + } + unlock_sandbox(sb); + + if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { + ERROR("Failed to create reader thread.\n"); + return; + } + + if ( pipe(signal_pipe) == -1 ) { + ERROR("Failed to create signal pipe.\n"); + return; + } + + /* Set up signal handler to take action if any children die */ + sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; + sigemptyset(&sa.sa_mask); + sa.sa_sigaction = signal_handler; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + + /* Fork the right number of times */ + lock_sandbox(sb); + for ( i=0; i<n_proc; i++ ) { + start_worker_process(sb, i); + } + unlock_sandbox(sb); + + allDone = 0; + while ( !allDone ) { + + int r, i; + struct timeval tv; + fd_set fds; + double tNow; + int fdmax; + + tv.tv_sec = 1; + tv.tv_usec = 0; + + FD_ZERO(&fds); + fdmax = 0; + lock_sandbox(sb); + for ( i=0; i<n_proc; i++ ) { + + int fd; + + if ( !sb->running[i] ) { + continue; + } + + fd = fileno(sb->result_fhs[i]); + FD_SET(fd, &fds); + if ( fd > fdmax ) fdmax = fd; + + } + unlock_sandbox(sb); + + FD_SET(signal_pipe[0], &fds); + if ( signal_pipe[0] > fdmax ) fdmax = signal_pipe[0]; + + r = select(fdmax+1, &fds, NULL, NULL, &tv); + if ( r == -1 ) { + if ( errno != EINTR ) { + ERROR("select() failed: %s\n", strerror(errno)); + } + continue; + } + + if ( r == 0 ) continue; /* No progress this time. Try again */ + + if ( FD_ISSET(signal_pipe[0], &fds) ) { + + char d; + read(signal_pipe[0], &d, 1); + handle_zombie(sb); + + } + + lock_sandbox(sb); + for ( i=0; i<n_proc; i++ ) { + + char *nextImage; + char results[1024]; + char *rval; + int fd; + int n; + char *eptr; + + if ( !sb->running[i] ) { + continue; + } + + fd = fileno(sb->result_fhs[i]); + if ( !FD_ISSET(fd, &fds) ) { + continue; + } + + rval = fgets(results, 1024, sb->result_fhs[i]); + if ( rval == NULL ) { + if ( !feof(sb->result_fhs[i]) ) { + ERROR("fgets() failed: %s\n", + strerror(errno)); + } + continue; + } + + chomp(results); + + n = strtol(results, &eptr, 10); + if ( eptr == results ) { + if ( strlen(results) > 0 ) { + ERROR("Invalid result '%s'\n", results); + } + } else { + sb->n_indexable += atoi(results); + sb->n_processed++; + } + + /* Send next filename */ + nextImage = get_pattern(fh, &use_this_one_instead, + config_basename, prefix); + + free(sb->last_filename[i]); + sb->last_filename[i] = nextImage; + + if ( nextImage == NULL ) { + /* No more images */ + r = write(sb->filename_pipes[i], "\n", 1); + if ( r < 0 ) { + ERROR("Write pipe\n"); + } + } else { + r = write(sb->filename_pipes[i], nextImage, + strlen(nextImage)); + r -= write(sb->filename_pipes[i], "\n", 1); + if ( r < 0 ) { + ERROR("write pipe\n"); + } + } + + } + unlock_sandbox(sb); + + /* Update progress */ + lock_sandbox(sb); + tNow = get_monotonic_seconds(); + if ( tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS ) { + + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n", + sb->n_indexable, sb->n_processed, + sb->n_indexable - sb->n_indexable_last_stats, + sb->n_processed - sb->n_processed_last_stats); + + sb->n_indexable_last_stats = sb->n_indexable; + sb->n_processed_last_stats = sb->n_processed; + sb->t_last_stats = tNow; + + } + unlock_sandbox(sb); + + allDone = 1; + lock_sandbox(sb); + for ( i=0; i<n_proc; i++ ) { + if ( sb->running[i] ) allDone = 0; + } + + unlock_sandbox(sb); + + } + + fclose(fh); + + pthread_mutex_destroy(&sb->lock); + + for ( i=0; i<n_proc; i++ ) { + int status; + waitpid(sb->pids[i], &status, 0); + } + + for ( i=0; i<n_proc; i++ ) { + close(sb->filename_pipes[i]); + fclose(sb->result_fhs[i]); + } + + for ( i=0; i<sb->n_proc; i++ ) { + fclose(sb->fhs[i]); + } + free(sb->fhs); + free(sb->filename_pipes); + free(sb->result_fhs); + free(sb->pids); + free(sb->running); + + if ( ofh != stdout ) fclose(ofh); + + STATUS("There were %i images, of which %i could be indexed.\n", + sb->n_processed, sb->n_indexable); + +} diff --git a/src/im-sandbox.h b/src/im-sandbox.h new file mode 100644 index 00000000..8ad9fd90 --- /dev/null +++ b/src/im-sandbox.h @@ -0,0 +1,86 @@ +/* + * im-sandbox.h + * + * Sandbox for indexing + * + * Copyright © 2012 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * Copyright © 2012 Richard Kirian + * Copyright © 2012 Lorenzo Galli + * + * Authors: + * 2010-2012 Thomas White <taw@physics.org> + * 2011 Richard Kirian + * 2012 Lorenzo Galli + * 2012 Chunhong Yoon + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * + */ + +enum { + PEAK_ZAEF, + PEAK_HDF5, +}; + + +/* Information about the indexing process which is common to all patterns */ +struct index_args +{ + UnitCell *cell; + int config_cmfilter; + int config_noisefilter; + int config_verbose; + int stream_flags; /* What goes into the output? */ + int config_satcorr; + int config_closer; + int config_insane; + int config_bgsub; + float threshold; + float min_gradient; + float min_snr; + double min_int_snr; + struct detector *det; + IndexingMethod *indm; + IndexingPrivate **ipriv; + int peaks; /* Peak detection method */ + int cellr; + float tols[4]; + struct beam_params *beam; + char *element; + char *hdf5_peak_path; + double ir_inn; + double ir_mid; + double ir_out; + struct copy_hdf5_field *copyme; +}; + + + + +/* Information about the indexing process for one pattern */ +struct pattern_args +{ + /* "Input" */ + char *filename; + + /* "Output" */ + int indexable; +}; + +extern void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, + int config_basename, FILE *fh, + char *use_this_one_instead, FILE *ofh); diff --git a/src/indexamajig.c b/src/indexamajig.c index 1ff392e9..294bce1d 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -12,6 +12,7 @@ * 2010-2012 Thomas White <taw@physics.org> * 2011 Richard Kirian * 2012 Lorenzo Galli + * 2012 Chunhong Yoon * * This file is part of CrystFEL. * @@ -43,7 +44,6 @@ #include <getopt.h> #include <hdf5.h> #include <gsl/gsl_errno.h> -#include <pthread.h> #ifdef HAVE_CLOCK_GETTIME #include <time.h> @@ -63,81 +63,7 @@ #include "stream.h" #include "reflist-utils.h" - -/* Write statistics at APPROXIMATELY this interval */ -#define STATS_EVERY_N_SECONDS (5) - - -enum { - PEAK_ZAEF, - PEAK_HDF5, -}; - - -/* Information about the indexing process which is common to all patterns */ -struct static_index_args -{ - UnitCell *cell; - int config_cmfilter; - int config_noisefilter; - int config_verbose; - int stream_flags; /* What goes into the output? */ - int config_satcorr; - int config_closer; - int config_insane; - int config_bgsub; - float threshold; - float min_gradient; - float min_snr; - double min_int_snr; - struct detector *det; - IndexingMethod *indm; - IndexingPrivate **ipriv; - int peaks; /* Peak detection method */ - int cellr; - float tols[4]; - struct beam_params *beam; - const char *element; - const char *hdf5_peak_path; - double ir_inn; - double ir_mid; - double ir_out; - - /* Output stream */ - pthread_mutex_t *output_mutex; /* Protects the output stream */ - FILE *ofh; - const struct copy_hdf5_field *copyme; -}; - - -/* Information about the indexing process for one pattern */ -struct index_args -{ - /* "Input" */ - char *filename; - struct static_index_args static_args; - - /* "Output" */ - int indexable; -}; - - -/* Information needed to choose the next task and dispatch it */ -struct queue_args -{ - FILE *fh; - char *prefix; - int config_basename; - struct static_index_args static_args; - - char *use_this_one_instead; - - int n_indexable; - int n_processed; - int n_indexable_last_stats; - int n_processed_last_stats; - int t_last_stats; -}; +#include "im-sandbox.h" static void show_help(const char *s) @@ -234,294 +160,10 @@ static void show_help(const char *s) " least 10%% of the located peaks.\n" " --no-bg-sub Don't subtract local background estimates from\n" " integrated intensities.\n" -"\n" -"\nYou can tune the CPU affinities for enhanced performance on NUMA machines:\n" -"\n" -" --cpus=<n> Specify number of CPUs. This is NOT the same as\n" -" giving the number of analyses to run in parallel.\n" -" --cpugroup=<n> Batch threads in groups of this size.\n" -" --cpuoffset=<n> Start using CPUs at this group number.\n" ); } -static void process_image(void *pp, int cookie) -{ - struct index_args *pargs = pp; - struct hdfile *hdfile; - struct image image; - float *data_for_measurement; - size_t data_size; - char *filename = pargs->filename; - UnitCell *cell = pargs->static_args.cell; - int config_cmfilter = pargs->static_args.config_cmfilter; - int config_noisefilter = pargs->static_args.config_noisefilter; - int config_verbose = pargs->static_args.config_verbose; - IndexingMethod *indm = pargs->static_args.indm; - struct beam_params *beam = pargs->static_args.beam; - - image.features = NULL; - image.data = NULL; - image.flags = NULL; - image.indexed_cell = NULL; - image.reflections = NULL; - image.id = cookie; - image.filename = filename; - image.det = copy_geom(pargs->static_args.det); - image.copyme = pargs->static_args.copyme; - image.beam = beam; - - pargs->indexable = 0; - - hdfile = hdfile_open(filename); - if ( hdfile == NULL ) return; - - if ( pargs->static_args.element != NULL ) { - - int r; - r = hdfile_set_image(hdfile, pargs->static_args.element); - if ( r ) { - ERROR("Couldn't select path '%s'\n", - pargs->static_args.element); - hdfile_close(hdfile); - return; - } - - } else { - - int r; - r = hdfile_set_first_image(hdfile, "/"); - if ( r ) { - ERROR("Couldn't select first path\n"); - hdfile_close(hdfile); - return; - } - - } - - hdf5_read(hdfile, &image, pargs->static_args.config_satcorr); - - if ( (image.width != image.det->max_fs+1) - || (image.height != image.det->max_ss+1) ) - { - ERROR("Image size doesn't match geometry size" - " - rejecting image.\n"); - ERROR("Image size: %i,%i. Geometry size: %i,%i\n", - image.width, image.height, - image.det->max_fs+1, image.det->max_ss+1); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - - if ( image.lambda < 0.0 ) { - if ( beam != NULL ) { - ERROR("Using nominal photon energy of %.2f eV\n", - beam->photon_energy); - image.lambda = ph_en_to_lambda( - eV_to_J(beam->photon_energy)); - } else { - ERROR("No wavelength in file, so you need to give " - "a beam parameters file with -b.\n"); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - } - fill_in_values(image.det, hdfile); - - if ( config_cmfilter ) { - filter_cm(&image); - } - - /* Take snapshot of image after CM subtraction but before - * the aggressive noise filter. */ - data_size = image.width*image.height*sizeof(float); - data_for_measurement = malloc(data_size); - - if ( config_noisefilter ) { - filter_noise(&image, data_for_measurement); - } else { - memcpy(data_for_measurement, image.data, data_size); - } - - switch ( pargs->static_args.peaks ) - { - case PEAK_HDF5 : - /* Get peaks from HDF5 */ - if ( get_peaks(&image, hdfile, - pargs->static_args.hdf5_peak_path) ) - { - ERROR("Failed to get peaks from HDF5 file.\n"); - } - break; - - case PEAK_ZAEF : - search_peaks(&image, pargs->static_args.threshold, - pargs->static_args.min_gradient, - pargs->static_args.min_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - break; - } - - /* Get rid of noise-filtered version at this point - * - it was strictly for the purposes of peak detection. */ - free(image.data); - image.data = data_for_measurement; - - /* Calculate orientation matrix (by magic) */ - image.div = beam->divergence; - image.bw = beam->bandwidth; - image.profile_radius = 0.0001e9; - index_pattern(&image, cell, indm, pargs->static_args.cellr, - config_verbose, pargs->static_args.ipriv, - pargs->static_args.config_insane, - pargs->static_args.tols); - - if ( image.indexed_cell != NULL ) { - - pargs->indexable = 1; - - image.reflections = find_intersections(&image, - image.indexed_cell); - - if ( image.reflections != NULL ) { - - integrate_reflections(&image, - pargs->static_args.config_closer, - pargs->static_args.config_bgsub, - pargs->static_args.min_int_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - - } - - } else { - - image.reflections = NULL; - - } - - pthread_mutex_lock(pargs->static_args.output_mutex); - write_chunk(pargs->static_args.ofh, &image, hdfile, - pargs->static_args.stream_flags); - pthread_mutex_unlock(pargs->static_args.output_mutex); - - /* Only free cell if found */ - cell_free(image.indexed_cell); - - reflist_free(image.reflections); - free(image.data); - if ( image.flags != NULL ) free(image.flags); - image_feature_list_free(image.features); - hdfile_close(hdfile); - free_detector_geometry(image.det); -} - - -static void *get_image(void *qp) -{ - char *line; - struct index_args *pargs; - char *rval; - struct queue_args *qargs = qp; - - /* Initialise new task arguments */ - pargs = malloc(sizeof(struct index_args)); - memcpy(&pargs->static_args, &qargs->static_args, - sizeof(struct static_index_args)); - - /* Get the next filename */ - if ( qargs->use_this_one_instead != NULL ) { - - line = qargs->use_this_one_instead; - qargs->use_this_one_instead = NULL; - - } else { - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, qargs->fh); - if ( rval == NULL ) { - free(pargs); - free(line); - return NULL; - } - chomp(line); - - } - - if ( qargs->config_basename ) { - char *tmp; - tmp = safe_basename(line); - free(line); - line = tmp; - } - - pargs->filename = malloc(strlen(qargs->prefix)+strlen(line)+1); - - snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); - - free(line); - - return pargs; -} - - -#ifdef HAVE_CLOCK_GETTIME - -static time_t get_monotonic_seconds() -{ - struct timespec tp; - clock_gettime(CLOCK_MONOTONIC, &tp); - return tp.tv_sec; -} - -#else - -/* Fallback version of the above. The time according to gettimeofday() is not - * monotonic, so measuring intervals based on it will screw up if there's a - * timezone change (e.g. daylight savings) while the program is running. */ -static time_t get_monotonic_seconds() -{ - struct timeval tp; - gettimeofday(&tp, NULL); - return tp.tv_sec; -} - -#endif - -static void finalise_image(void *qp, void *pp) -{ - struct queue_args *qargs = qp; - struct index_args *pargs = pp; - time_t monotonic_seconds; - - qargs->n_indexable += pargs->indexable; - qargs->n_processed++; - - monotonic_seconds = get_monotonic_seconds(); - if ( monotonic_seconds >= qargs->t_last_stats+STATS_EVERY_N_SECONDS ) { - - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n", - qargs->n_indexable, qargs->n_processed, - qargs->n_indexable - qargs->n_indexable_last_stats, - qargs->n_processed - qargs->n_processed_last_stats); - - qargs->n_processed_last_stats = qargs->n_processed; - qargs->n_indexable_last_stats = qargs->n_indexable; - qargs->t_last_stats = monotonic_seconds; - - } - - free(pargs->filename); - free(pargs); -} - - static int parse_cell_reduction(const char *scellr, int *err, int *reduction_needs_cell) { @@ -554,7 +196,6 @@ int main(int argc, char *argv[]) FILE *fh; FILE *ofh; char *rval = NULL; - int n_images; int config_noindex = 0; int config_cmfilter = 0; int config_noisefilter = 0; @@ -585,19 +226,15 @@ int main(int argc, char *argv[]) float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */ int cellr; int peaks; - int nthreads = 1; - pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER; + int n_proc = 1; char *prepare_line; char prepare_filename[1024]; - struct queue_args qargs; + char *use_this_one_instead; + struct index_args iargs; struct beam_params *beam = NULL; char *element = NULL; double nominal_photon_energy; int stream_flags = STREAM_INTEGRATED; - int cpu_num = 0; - int cpu_groupsize = 1; - int cpu_offset = 0; - char *endptr; char *hdf5_peak_path = NULL; struct copy_hdf5_field *copyme; char *intrad = NULL; @@ -684,7 +321,7 @@ int main(int argc, char *argv[]) break; case 'j' : - nthreads = atoi(optarg); + n_proc = atoi(optarg); break; case 'g' : @@ -726,39 +363,10 @@ int main(int argc, char *argv[]) break; case 6 : - cpu_num = strtol(optarg, &endptr, 10); - if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { - ERROR("Invalid number of CPUs ('%s')\n", - optarg); - return 1; - } - break; - case 7 : - cpu_groupsize = strtol(optarg, &endptr, 10); - if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { - ERROR("Invalid CPU group size ('%s')\n", - optarg); - return 1; - } - if ( cpu_groupsize < 1 ) { - ERROR("CPU group size cannot be" - " less than 1.\n"); - return 1; - } - break; - case 8 : - cpu_offset = strtol(optarg, &endptr, 10); - if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { - ERROR("Invalid CPU offset ('%s')\n", - optarg); - return 1; - } - if ( cpu_offset < 0 ) { - ERROR("CPU offset must be positive.\n"); - return 1; - } + ERROR("The options --cpus, --cpugroup and --cpuoffset" + " are no longer used by indexamajig.\n"); break; case 9 : @@ -795,12 +403,6 @@ int main(int argc, char *argv[]) } - if ( (cpu_num > 0) && (cpu_num % cpu_groupsize != 0) ) { - ERROR("Number of CPUs must be divisible by" - " the CPU group size.\n"); - return 1; - } - if ( filename == NULL ) { filename = strdup("-"); } @@ -816,18 +418,15 @@ int main(int argc, char *argv[]) free(filename); if ( outfile == NULL ) { - outfile = strdup("-"); - } - if ( strcmp(outfile, "-") == 0 ) { ofh = stdout; } else { ofh = fopen(outfile, "w"); + if ( ofh == NULL ) { + ERROR("Failed to open output file '%s'\n", outfile); + return 1; + } + free(outfile); } - if ( ofh == NULL ) { - ERROR("Failed to open output file '%s'\n", outfile); - return 1; - } - free(outfile); if ( hdf5_peak_path == NULL ) { hdf5_peak_path = strdup("/processing/hitfinder/peakinfo"); @@ -860,8 +459,8 @@ int main(int argc, char *argv[]) } } - if ( nthreads == 0 ) { - ERROR("Invalid number of threads.\n"); + if ( n_proc == 0 ) { + ERROR("Invalid number of processes.\n"); return 1; } @@ -929,6 +528,7 @@ int main(int argc, char *argv[]) ERROR("Invalid parameters for '--int-radius'\n"); return 1; } + free(intrad); } else { STATUS("WARNING: You did not specify --int-radius.\n"); STATUS("WARNING: I will use the default values, which are" @@ -967,22 +567,20 @@ int main(int argc, char *argv[]) } else { STATUS("No beam parameters file was given, so I'm taking the" " nominal photon energy to be 2 keV.\n"); + ERROR("I'm also going to assume 1 ADU per photon, which is"); + ERROR(" almost certainly wrong. Peak sigmas will be" + " incorrect.\n"); nominal_photon_energy = 2000.0; } - if ( beam == NULL ) { - ERROR("Warning: no beam parameters file.\n"); - ERROR("I'm going to assume 1 ADU per photon, which is almost"); - ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); - } - /* Get first filename and use it to set up the indexing */ - prepare_line = malloc(1024*sizeof(char)); + prepare_line = malloc(1024); rval = fgets(prepare_line, 1023, fh); if ( rval == NULL ) { ERROR("Failed to get filename to prepare indexing.\n"); return 1; } + use_this_one_instead = strdup(prepare_line); chomp(prepare_line); if ( config_basename ) { char *tmp; @@ -991,7 +589,7 @@ int main(int argc, char *argv[]) prepare_line = tmp; } snprintf(prepare_filename, 1023, "%s%s", prefix, prepare_line); - qargs.use_this_one_instead = prepare_line; + free(prepare_line); /* Prepare the indexer */ if ( indm != NULL ) { @@ -1007,67 +605,41 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); - qargs.static_args.cell = cell; - qargs.static_args.config_cmfilter = config_cmfilter; - qargs.static_args.config_noisefilter = config_noisefilter; - qargs.static_args.config_verbose = config_verbose; - qargs.static_args.config_satcorr = config_satcorr; - qargs.static_args.config_closer = config_closer; - qargs.static_args.config_insane = config_insane; - qargs.static_args.config_bgsub = config_bgsub; - qargs.static_args.cellr = cellr; - qargs.static_args.tols[0] = tols[0]; - qargs.static_args.tols[1] = tols[1]; - qargs.static_args.tols[2] = tols[2]; - qargs.static_args.tols[3] = tols[3]; - qargs.static_args.threshold = threshold; - qargs.static_args.min_gradient = min_gradient; - qargs.static_args.min_snr = min_snr; - qargs.static_args.min_int_snr = min_int_snr; - qargs.static_args.det = det; - qargs.static_args.indm = indm; - qargs.static_args.ipriv = ipriv; - qargs.static_args.peaks = peaks; - qargs.static_args.output_mutex = &output_mutex; - qargs.static_args.ofh = ofh; - qargs.static_args.beam = beam; - qargs.static_args.element = element; - qargs.static_args.stream_flags = stream_flags; - qargs.static_args.hdf5_peak_path = hdf5_peak_path; - qargs.static_args.copyme = copyme; - qargs.static_args.ir_inn = ir_inn; - qargs.static_args.ir_mid = ir_mid; - qargs.static_args.ir_out = ir_out; - - qargs.fh = fh; - qargs.prefix = prefix; - qargs.config_basename = config_basename; - qargs.n_indexable = 0; - qargs.n_processed = 0; - qargs.n_indexable_last_stats = 0; - qargs.n_processed_last_stats = 0; - qargs.t_last_stats = get_monotonic_seconds(); - - n_images = run_threads(nthreads, process_image, get_image, - finalise_image, &qargs, 0, - cpu_num, cpu_groupsize, cpu_offset); - - cleanup_indexing(ipriv); - - free(indm); - free(ipriv); + /* Static worker args */ + iargs.cell = cell; + iargs.config_cmfilter = config_cmfilter; + iargs.config_noisefilter = config_noisefilter; + iargs.config_verbose = config_verbose; + iargs.config_satcorr = config_satcorr; + iargs.config_closer = config_closer; + iargs.config_insane = config_insane; + iargs.config_bgsub = config_bgsub; + iargs.cellr = cellr; + iargs.tols[0] = tols[0]; + iargs.tols[1] = tols[1]; + iargs.tols[2] = tols[2]; + iargs.tols[3] = tols[3]; + iargs.threshold = threshold; + iargs.min_gradient = min_gradient; + iargs.min_snr = min_snr; + iargs.min_int_snr = min_int_snr; + iargs.det = det; + iargs.indm = indm; + iargs.ipriv = ipriv; + iargs.peaks = peaks; + iargs.beam = beam; + iargs.element = element; + iargs.stream_flags = stream_flags; + iargs.hdf5_peak_path = hdf5_peak_path; + iargs.copyme = copyme; + iargs.ir_inn = ir_inn; + iargs.ir_mid = ir_mid; + iargs.ir_out = ir_out; + + create_sandbox(&iargs, n_proc, prefix, config_basename, fh, + use_this_one_instead, ofh); + free(prefix); - free_detector_geometry(det); - free(beam); - free(element); - free(hdf5_peak_path); - free_copy_hdf5_field_list(copyme); - cell_free(cell); - if ( fh != stdin ) fclose(fh); - if ( ofh != stdout ) fclose(ofh); - - STATUS("There were %i images, of which %i could be indexed.\n", - n_images, qargs.n_indexable); return 0; } diff --git a/src/partial_sim.c b/src/partial_sim.c index 899b9154..3cfb94a2 100644 --- a/src/partial_sim.c +++ b/src/partial_sim.c @@ -178,8 +178,6 @@ static void show_help(const char *s) " -c, --cnoise=<val> Add random noise, with a flat distribution, to the\n" " reciprocal lattice vector components given in the\n" " stream, with maximum error +/- <val> percent.\n" -" --pgraph=<file> Write reflection counts and partiality statistics\n" -" to <file>.\n" "\n" ); } @@ -549,8 +547,6 @@ int main(int argc, char *argv[]) if ( fh != NULL ) { - fprintf(fh, "1/d_nm #refl pmean pmax\n"); - for ( i=0; i<NBINS; i++ ) { double rcen; |