diff options
Diffstat (limited to 'src/indexamajig.c')
-rw-r--r-- | src/indexamajig.c | 880 |
1 files changed, 534 insertions, 346 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c index efbf4b44..9ddbb6ea 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -1,32 +1,12 @@ /* - * indexamajig.c + * indexamajigFork.c * * Index patterns, output hkl+intensity etc. * - * Copyright © 2012 Deutsches Elektronen-Synchrotron DESY, - * a research centre of the Helmholtz Association. - * Copyright © 2012 Richard Kirian - * Copyright © 2012 Lorenzo Galli + * (c) 2006-2012 Thomas White <taw@physics.org> + * (c) 2012- Chunhong Yoon <chun.hong.yoon@desy.de> * - * Authors: - * 2010-2012 Thomas White <taw@physics.org> - * 2011 Richard Kirian - * 2012 Lorenzo Galli - * - * This file is part of CrystFEL. - * - * CrystFEL is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * CrystFEL is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with CrystFEL. If not, see <http://www.gnu.org/licenses/>. + * Part of CrystFEL - crystallography with a FEL * */ @@ -44,6 +24,8 @@ #include <hdf5.h> #include <gsl/gsl_errno.h> #include <pthread.h> +#include <sys/wait.h> +#include <fcntl.h> #ifdef HAVE_CLOCK_GETTIME #include <time.h> @@ -51,22 +33,46 @@ #include <sys/time.h> #endif -#include "utils.h" -#include "hdf5-file.h" -#include "index.h" -#include "peaks.h" -#include "detector.h" -#include "filters.h" -#include "thread-pool.h" -#include "beam-parameters.h" -#include "geometry.h" -#include "stream.h" -#include "reflist-utils.h" +#include <crystfel/utils.h> +#include <crystfel/hdf5-file.h> +#include <crystfel/index.h> +#include <crystfel/peaks.h> +#include <crystfel/detector.h> +#include <crystfel/filters.h> +#include <crystfel/thread-pool.h> +#include <crystfel/beam-parameters.h> +#include <crystfel/geometry.h> +#include <crystfel/stream.h> +#include <crystfel/reflist-utils.h> /* Write statistics at APPROXIMATELY this interval */ -#define STATS_EVERY_N_SECONDS (5) +#define STATS_EVERY_N_SECONDS (2) + +#define LINE_LENGTH 1024 +#define BUFSIZE 64 + +#ifdef HAVE_CLOCK_GETTIME +static double get_time() +{ + struct timespec tp; + clock_gettime(CLOCK_MONOTONIC, &tp); + double sec = (double) tp.tv_sec+ (double) tp.tv_nsec/1000000000; + return sec; //nano resolution +} +#else +/* Fallback version of the above. The time according to gettimeofday() is not + * monotonic, so measuring intervals based on it will screw up if there's a + * timezone change (e.g. daylight savings) while the program is running. */ +static double get_time() +{ + struct timeval tp; + gettimeofday(&tp, NULL); + double sec = (double) tp.tv_sec+ (double) tp.tv_usec/1000000; + return sec; //micro resolution +} +#endif enum { PEAK_ZAEF, @@ -82,6 +88,7 @@ struct static_index_args int config_noisefilter; int config_verbose; int stream_flags; /* What goes into the output? */ + int config_polar; int config_satcorr; int config_closer; int config_insane; @@ -102,11 +109,14 @@ struct static_index_args double ir_inn; double ir_mid; double ir_out; - + /* Output stream */ pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; + + char *outfile; const struct copy_hdf5_field *copyme; + int nProcesses; }; @@ -136,9 +146,45 @@ struct queue_args int n_processed; int n_indexable_last_stats; int n_processed_last_stats; - int t_last_stats; + + int n_indexableTotal; + int n_processedTotal; + int n_indexable_last_statsTotal; + int n_processed_last_statsTotal; + + int nPerProcess; + int updateReader; + + double t_last_stats; + }; +// Count number of patterns in .lst +int count_patterns(FILE *fh){ + char *rval; + int n_total = 0; + do{ + char line[LINE_LENGTH]; + rval = fgets(line,LINE_LENGTH-1,fh); + if (rval != NULL){ + n_total++; + } + }while(rval!=NULL); + if (ferror(fh)) { + printf("Read error\n"); + return -1; + } + return n_total; +} + +// Assign a batch number to a process +int getBatchNum(int pid[], int length){ + int i, id = 0; + for (i=0; i<length; i++) { + id += ((pid[i]>0)*pow(2,i)); // child = 1, parent = 0 + } + return id; +} static void show_help(const char *s) { @@ -189,34 +235,35 @@ static void show_help(const char *s) "The default is '--record=integrated'.\n" "\n\n" "For more control over the process, you might need:\n\n" -" --cell-reduction=<m> Use <m> as the cell reduction method. Choose from:\n" -" none : no matching, just use the raw cell.\n" -" reduce : full cell reduction.\n" -" compare : match by at most changing the order of\n" -" the indices.\n" -" compare_ab : compare 'a' and 'b' lengths only.\n" -" --tolerance=<tol> Set the tolerances for cell reduction.\n" -" Default: 5,5,5,1.5.\n" -" --filter-cm Perform common-mode noise subtraction on images\n" -" before proceeding. Intensities will be extracted\n" -" from the image as it is after this processing.\n" -" --filter-noise Apply an aggressive noise filter which sets all\n" -" pixels in each 3x3 region to zero if any of them\n" -" have negative values. Intensity measurement will\n" -" be performed on the image as it was before this.\n" -" --no-sat-corr Don't correct values of saturated peaks using a\n" -" table included in the HDF5 file.\n" -" --threshold=<n> Only accept peaks above <n> ADU. Default: 800.\n" -" --min-gradient=<n> Minimum gradient for Zaefferer peak search.\n" -" Default: 100,000.\n" -" --min-snr=<n> Minimum signal-to-noise ratio for peaks.\n" -" Default: 5.\n" -" --min-integration-snr=<n> Minimum signal-to-noise ratio for peaks\n" -" during integration. Default: -infinity.\n" -" --int-radius=<r> Set the integration radii. Default: 4,5,7.\n" -"-e, --image=<element> Use this image from the HDF5 file.\n" -" Example: /data/data0.\n" -" Default: The first one found.\n" +" --cell-reduction=<m> Use <m> as the cell reduction method. Choose from:\n" +" none : no matching, just use the raw cell.\n" +" reduce : full cell reduction.\n" +" compare : match by at most changing the order of\n" +" the indices.\n" +" compare_ab : compare 'a' and 'b' lengths only.\n" +" --tolerance=<a,b,c,angl> Set the tolerance for a,b,c axis (in %%)\n" +" and for the angles (in deg) when reducing\n" +" or comparing (default is 5%% and 1.5deg)\n" +" --filter-cm Perform common-mode noise subtraction on images\n" +" before proceeding. Intensities will be extracted\n" +" from the image as it is after this processing.\n" +" --filter-noise Apply an aggressive noise filter which sets all\n" +" pixels in each 3x3 region to zero if any of them\n" +" have negative values. Intensity measurement will\n" +" be performed on the image as it was before this.\n" +" --unpolarized Don't correct for the polarisation of the X-rays.\n" +" --no-sat-corr Don't correct values of saturated peaks using a\n" +" table included in the HDF5 file.\n" +" --threshold=<n> Only accept peaks above <n> ADU. Default: 800.\n" +" --min-gradient=<n> Minimum gradient for Zaefferer peak search.\n" +" Default: 100,000.\n" +" --min-snr=<n> Minimum signal-to-noise ratio for peaks.\n" +" Default: 5.\n" +" --min-integration-snr=<n> Minimum signal-to-noise ratio for peaks\n" +" during integration. Default: -infinity.\n" +" -e, --image=<element> Use this image from the HDF5 file.\n" +" Example: /data/data0.\n" +" Default: The first one found.\n" "\n" "\nFor time-resolved stuff, you might want to use:\n\n" " --copy-hdf5-field <f> Copy the value of field <f> into the stream. You\n" @@ -245,289 +292,291 @@ static void show_help(const char *s) ); } +int readUpdate(void *qp, int fd_pipe[][2], int finish){ + struct queue_args *qargs = qp; + int i,j; + int rFlag; + int pipesOpen = 0; + char bufferR[BUFSIZE]; + memset(bufferR, 0, BUFSIZE); + int numFields = 2; + + for (i=0; i<qargs->static_args.nProcesses; i++){ + if (finish && i == qargs->updateReader) { + // do nothing + }else{ + rFlag = read(fd_pipe[i][0], bufferR, BUFSIZE-1); // wait till something to read + if (rFlag != 0){ + char * pch; + char delims[] = "#"; + pch = strtok (bufferR, delims); + for (j=0;j<numFields;j++) + { + switch (j){ + case 0: + qargs->n_indexableTotal += atoi(pch); + break; + case 1: + qargs->n_processedTotal += atoi(pch); + break; + } + pch = strtok (NULL, delims); + } + memset(bufferR, 0, BUFSIZE); + pipesOpen++; + } + } + } + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n\n", + qargs->n_indexableTotal, qargs->n_processedTotal, + qargs->n_indexableTotal-qargs->n_indexable_last_statsTotal, + qargs->n_processedTotal-qargs->n_processed_last_statsTotal); -static void process_image(void *pp, int cookie) -{ - struct index_args *pargs = pp; - struct hdfile *hdfile; - struct image image; + qargs->n_indexable_last_statsTotal = qargs->n_indexableTotal; + qargs->n_processed_last_statsTotal = qargs->n_processedTotal; + + return pipesOpen; +} + +// Manipulate image +static void process_image(char ***array, int batch, void *qp, int fd_pipe[][2]) { + + struct queue_args *qargs = qp; float *data_for_measurement; size_t data_size; - char *filename = pargs->filename; - UnitCell *cell = pargs->static_args.cell; - int config_cmfilter = pargs->static_args.config_cmfilter; - int config_noisefilter = pargs->static_args.config_noisefilter; - int config_verbose = pargs->static_args.config_verbose; - IndexingMethod *indm = pargs->static_args.indm; - struct beam_params *beam = pargs->static_args.beam; - - image.features = NULL; - image.data = NULL; - image.flags = NULL; - image.indexed_cell = NULL; - image.id = cookie; - image.filename = filename; - image.det = copy_geom(pargs->static_args.det); - image.copyme = pargs->static_args.copyme; - image.beam = beam; - - if ( beam == NULL ) { - ERROR("Warning: no beam parameters file.\n"); - ERROR("I'm going to assume 1 ADU per photon, which is almost"); - ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); - } - - pargs->indexable = 0; + UnitCell *cell = qargs->static_args.cell; + int config_cmfilter = qargs->static_args.config_cmfilter; + int config_noisefilter = qargs->static_args.config_noisefilter; + int config_verbose = qargs->static_args.config_verbose; + //int config_polar = qargs->static_args.config_polar; + IndexingMethod *indm = qargs->static_args.indm; + struct beam_params *beam = qargs->static_args.beam; + + int i; + int r, check; + struct hdfile *hdfile; + char *outfile = qargs->static_args.outfile; + int nPerBatch = qargs->nPerProcess; + + for (i=0; i<nPerBatch; i++) { + struct image image; + image.features = NULL; + image.data = NULL; + image.flags = NULL; + image.indexed_cell = NULL; + image.det = copy_geom(qargs->static_args.det); + image.copyme = qargs->static_args.copyme; + image.beam = beam; + image.id = batch; // MUST SET ID FOR MOSFLM TO WORK PROPERLY + + if ( beam == NULL ) { + ERROR("Warning: no beam parameters file.\n"); + ERROR("I'm going to assume 1 ADU per photon, which is almost"); + ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); + } - hdfile = hdfile_open(filename); - if ( hdfile == NULL ) return; + //pargs->indexable = 0; - if ( pargs->static_args.element != NULL ) { + char *filename = NULL; + char *line = array[batch][i]; + chomp(line); +//printf("%d ***************%s\n",batch,line); + filename = malloc(strlen(qargs->prefix)+strlen(line)+1); + snprintf(filename,LINE_LENGTH-1,"%s%s",qargs->prefix,line); //maximum print length + free(line); + image.filename = filename; +//printf("%d ***************%s\n",batch,filename); + hdfile = hdfile_open(filename); + if (hdfile == NULL) return; - int r; - r = hdfile_set_image(hdfile, pargs->static_args.element); - if ( r ) { - ERROR("Couldn't select path '%s'\n", - pargs->static_args.element); + r = hdfile_set_first_image(hdfile, "/"); // Need this to read hdf5 files + if (r) { + ERROR("Couldn't select first path\n"); hdfile_close(hdfile); return; } - } else { - - int r; - r = hdfile_set_first_image(hdfile, "/"); - if ( r ) { - ERROR("Couldn't select first path\n"); + check = hdf5_read(hdfile,&image,1); + if (check == 1){ hdfile_close(hdfile); return; } - } - - hdf5_read(hdfile, &image, pargs->static_args.config_satcorr); - - if ( (image.width != image.det->max_fs+1) - || (image.height != image.det->max_ss+1) ) - { - ERROR("Image size doesn't match geometry size" - " - rejecting image.\n"); - ERROR("Image size: %i,%i. Geometry size: %i,%i\n", - image.width, image.height, - image.det->max_fs+1, image.det->max_ss+1); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - - if ( image.lambda < 0.0 ) { - if ( beam != NULL ) { - ERROR("Using nominal photon energy of %.2f eV\n", - beam->photon_energy); - image.lambda = ph_en_to_lambda( - eV_to_J(beam->photon_energy)); - } else { - ERROR("No wavelength in file, so you need to give " - "a beam parameters file with -b.\n"); + if ( (image.width != image.det->max_fs+1) + || (image.height != image.det->max_ss+1) ) + { + ERROR("Image size doesn't match geometry size" + " - rejecting image.\n"); + ERROR("Image size: %i,%i. Geometry size: %i,%i\n", + image.width, image.height, + image.det->max_fs+1, image.det->max_ss+1); hdfile_close(hdfile); free_detector_geometry(image.det); return; } - } - fill_in_values(image.det, hdfile); - if ( config_cmfilter ) { - filter_cm(&image); - } + if ( image.lambda < 0.0 ) { + if ( beam != NULL ) { + ERROR("Using nominal photon energy of %.2f eV\n", + beam->photon_energy); + image.lambda = ph_en_to_lambda( + eV_to_J(beam->photon_energy)); + } else { + ERROR("No wavelength in file, so you need to give " + "a beam parameters file with -b.\n"); + hdfile_close(hdfile); + free_detector_geometry(image.det); + return; + } + } + fill_in_values(image.det, hdfile); - /* Take snapshot of image after CM subtraction but before - * the aggressive noise filter. */ - data_size = image.width*image.height*sizeof(float); - data_for_measurement = malloc(data_size); + if ( config_cmfilter ) { + filter_cm(&image); + } - if ( config_noisefilter ) { - filter_noise(&image, data_for_measurement); - } else { - memcpy(data_for_measurement, image.data, data_size); - } + /* Take snapshot of image after CM subtraction but before + * the aggressive noise filter. */ + data_size = image.width*image.height*sizeof(float); + data_for_measurement = malloc(data_size); - switch ( pargs->static_args.peaks ) - { - case PEAK_HDF5 : - /* Get peaks from HDF5 */ - if ( get_peaks(&image, hdfile, - pargs->static_args.hdf5_peak_path) ) - { - ERROR("Failed to get peaks from HDF5 file.\n"); + if ( config_noisefilter ) { + filter_noise(&image, data_for_measurement); + } else { + memcpy(data_for_measurement, image.data, data_size); } - break; + switch ( qargs->static_args.peaks ) + { + case PEAK_HDF5 : + /* Get peaks from HDF5 */ + if ( get_peaks(&image, hdfile, + qargs->static_args.hdf5_peak_path) ) + { + ERROR("Failed to get peaks from HDF5 file.\n"); + } + break; case PEAK_ZAEF : - search_peaks(&image, pargs->static_args.threshold, - pargs->static_args.min_gradient, - pargs->static_args.min_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - break; - } - - /* Get rid of noise-filtered version at this point - * - it was strictly for the purposes of peak detection. */ - free(image.data); - image.data = data_for_measurement; - - /* Calculate orientation matrix (by magic) */ - image.div = beam->divergence; - image.bw = beam->bandwidth; - image.profile_radius = 0.0001e9; - index_pattern(&image, cell, indm, pargs->static_args.cellr, - config_verbose, pargs->static_args.ipriv, - pargs->static_args.config_insane, - pargs->static_args.tols); - - if ( image.indexed_cell != NULL ) { - - pargs->indexable = 1; - - image.reflections = find_intersections(&image, + search_peaks(&image, qargs->static_args.threshold, + qargs->static_args.min_gradient, + qargs->static_args.min_snr, + qargs->static_args.ir_inn, + qargs->static_args.ir_mid, + qargs->static_args.ir_out); + break; + } + + /* Get rid of noise-filtered version at this point + * - it was strictly for the purposes of peak detection. */ + free(image.data); + image.data = data_for_measurement; + + /* Calculate orientation matrix (by magic) */ + image.div = beam->divergence; + image.bw = beam->bandwidth; + image.profile_radius = 0.0001e9; + + ///// RUN INDEXING HERE ///// + index_pattern(&image, cell, indm, qargs->static_args.cellr, + config_verbose, qargs->static_args.ipriv, + qargs->static_args.config_insane, qargs->static_args.tols); + + if ( image.indexed_cell != NULL ) { + //pargs->indexable = 1; + image.reflections = find_intersections(&image, image.indexed_cell); - - if ( image.reflections != NULL ) { - - integrate_reflections(&image, - pargs->static_args.config_closer, - pargs->static_args.config_bgsub, - pargs->static_args.min_int_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - + if ( image.reflections != NULL ) { + integrate_reflections(&image, + qargs->static_args.config_closer, + qargs->static_args.config_bgsub, + qargs->static_args.min_int_snr, + qargs->static_args.ir_inn, + qargs->static_args.ir_mid, + qargs->static_args.ir_out); + } + } else { + image.reflections = NULL; } - } else { - - image.reflections = NULL; - - } - - pthread_mutex_lock(pargs->static_args.output_mutex); - write_chunk(pargs->static_args.ofh, &image, hdfile, - pargs->static_args.stream_flags); - pthread_mutex_unlock(pargs->static_args.output_mutex); - - /* Only free cell if found */ - cell_free(image.indexed_cell); - - reflist_free(image.reflections); - free(image.data); - if ( image.flags != NULL ) free(image.flags); - image_feature_list_free(image.features); - hdfile_close(hdfile); - free_detector_geometry(image.det); -} - - -static void *get_image(void *qp) -{ - char *line; - struct index_args *pargs; - char *rval; - struct queue_args *qargs = qp; - - /* Initialise new task arguments */ - pargs = malloc(sizeof(struct index_args)); - memcpy(&pargs->static_args, &qargs->static_args, - sizeof(struct static_index_args)); - - /* Get the next filename */ - if ( qargs->use_this_one_instead != NULL ) { - - line = qargs->use_this_one_instead; - qargs->use_this_one_instead = NULL; - - } else { - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, qargs->fh); - if ( rval == NULL ) { - free(pargs); - free(line); - return NULL; + // Write Lock + struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0 }; + int fd; + fl.l_pid = getpid(); + + char *outfilename = NULL; + chomp(outfile); // may not need this +//printf("%d ***************%s\n",batch,outfile); + outfilename = malloc(strlen(outfile)+1); + snprintf(outfilename,LINE_LENGTH-1,"%s",outfile); //maximum print length +//printf("%d ***************%s\n",batch,outfilename); + if ((fd = open(outfilename, O_WRONLY)) == -1) { + perror("Error on opening\n"); + exit(1); + } + + if (fcntl(fd, F_SETLKW, &fl) == -1) { + perror("Error on setting lock wait\n"); + exit(1); + } + + // LOCKED! Write chunk + FILE *fh; +//printf("%d ***************%s\n",batch,outfilename); + fh = fopen(outfilename,"a"); + if (fh == NULL) { + perror("Error inside lock\n"); + } + write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags); + fclose(fh); + + // Unlock stream for other processes + fl.l_type = F_UNLCK; // set to unlock same region + if (fcntl(fd, F_SETLK, &fl) == -1) { + perror("fcntl"); + exit(1); + } + close(fd); + + ///// WRITE UPDATE ///// + double seconds; + qargs->n_indexable += ( image.indexed_cell != NULL ); + qargs->n_processed++; + seconds = get_time(); + if ( seconds >= qargs->t_last_stats+STATS_EVERY_N_SECONDS + || qargs->n_processed == qargs->nPerProcess) { // Must write if finished + + // WRITE PIPE HERE + char bufferW[BUFSIZE]; + memset(bufferW, 0, BUFSIZE); + sprintf(bufferW,"%d#%d", + qargs->n_indexable - qargs->n_indexable_last_stats, + qargs->n_processed - qargs->n_processed_last_stats); + write(fd_pipe[batch][1], bufferW, BUFSIZE-1); + + // Update stats + qargs->n_processed_last_stats = qargs->n_processed; + qargs->n_indexable_last_stats = qargs->n_indexable; + qargs->t_last_stats = seconds; + + ///// READ UPDATE ///// + if (batch == qargs->updateReader) { + readUpdate(qargs,fd_pipe, 0); + } } - chomp(line); - - } - - if ( qargs->config_basename ) { - char *tmp; - tmp = safe_basename(line); - free(line); - line = tmp; - } - - pargs->filename = malloc(strlen(qargs->prefix)+strlen(line)+1); - - snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); - - free(line); - - return pargs; -} - - -#ifdef HAVE_CLOCK_GETTIME - -static time_t get_monotonic_seconds() -{ - struct timespec tp; - clock_gettime(CLOCK_MONOTONIC, &tp); - return tp.tv_sec; -} - -#else - -/* Fallback version of the above. The time according to gettimeofday() is not - * monotonic, so measuring intervals based on it will screw up if there's a - * timezone change (e.g. daylight savings) while the program is running. */ -static time_t get_monotonic_seconds() -{ - struct timeval tp; - gettimeofday(&tp, NULL); - return tp.tv_sec; -} - -#endif - -static void finalise_image(void *qp, void *pp) -{ - struct queue_args *qargs = qp; - struct index_args *pargs = pp; - time_t monotonic_seconds; - - qargs->n_indexable += pargs->indexable; - qargs->n_processed++; - - monotonic_seconds = get_monotonic_seconds(); - if ( monotonic_seconds >= qargs->t_last_stats+STATS_EVERY_N_SECONDS ) { - - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n", - qargs->n_indexable, qargs->n_processed, - qargs->n_indexable - qargs->n_indexable_last_stats, - qargs->n_processed - qargs->n_processed_last_stats); - - qargs->n_processed_last_stats = qargs->n_processed; - qargs->n_indexable_last_stats = qargs->n_indexable; - qargs->t_last_stats = monotonic_seconds; + ///// FREE ///// + /* Only free cell if found */ + cell_free(image.indexed_cell); + reflist_free(image.reflections); + free(image.data); + if ( image.flags != NULL ) free(image.flags); + image_feature_list_free(image.features); + hdfile_close(hdfile); + free_detector_geometry(image.det); } - - free(pargs->filename); - free(pargs); } - static int parse_cell_reduction(const char *scellr, int *err, int *reduction_needs_cell) { @@ -565,6 +614,7 @@ int main(int argc, char *argv[]) int config_cmfilter = 0; int config_noisefilter = 0; int config_verbose = 0; + int config_polar = 1; int config_satcorr = 1; int config_checkprefix = 1; int config_closer = 1; @@ -591,7 +641,7 @@ int main(int argc, char *argv[]) float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */ int cellr; int peaks; - int nthreads = 1; + int nProcesses = 1; pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER; char *prepare_line; char prepare_filename[1024]; @@ -610,7 +660,7 @@ int main(int argc, char *argv[]) float ir_inn = 4.0; float ir_mid = 5.0; float ir_out = 7.0; - + copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { ERROR("Couldn't allocate HDF5 field list.\n"); @@ -689,7 +739,7 @@ int main(int argc, char *argv[]) break; case 'j' : - nthreads = atoi(optarg); + nProcesses = atoi(optarg); break; case 'g' : @@ -828,11 +878,11 @@ int main(int argc, char *argv[]) } else { ofh = fopen(outfile, "w"); } +//printf("***************%s\n",outfile); if ( ofh == NULL ) { ERROR("Failed to open output file '%s'\n", outfile); return 1; } - free(outfile); if ( hdf5_peak_path == NULL ) { hdf5_peak_path = strdup("/processing/hitfinder/peakinfo"); @@ -865,7 +915,7 @@ int main(int argc, char *argv[]) } } - if ( nthreads == 0 ) { + if ( nProcesses == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } @@ -964,7 +1014,7 @@ int main(int argc, char *argv[]) cell = NULL; } free(pdb); - + write_stream_header(ofh, argc, argv); if ( beam != NULL ) { @@ -1010,6 +1060,7 @@ int main(int argc, char *argv[]) qargs.static_args.config_cmfilter = config_cmfilter; qargs.static_args.config_noisefilter = config_noisefilter; qargs.static_args.config_verbose = config_verbose; + qargs.static_args.config_polar = config_polar; qargs.static_args.config_satcorr = config_satcorr; qargs.static_args.config_closer = config_closer; qargs.static_args.config_insane = config_insane; @@ -1034,6 +1085,7 @@ int main(int argc, char *argv[]) qargs.static_args.stream_flags = stream_flags; qargs.static_args.hdf5_peak_path = hdf5_peak_path; qargs.static_args.copyme = copyme; + qargs.static_args.nProcesses = nProcesses; qargs.static_args.ir_inn = ir_inn; qargs.static_args.ir_mid = ir_mid; qargs.static_args.ir_out = ir_out; @@ -1045,28 +1097,164 @@ int main(int argc, char *argv[]) qargs.n_processed = 0; qargs.n_indexable_last_stats = 0; qargs.n_processed_last_stats = 0; - qargs.t_last_stats = get_monotonic_seconds(); - - n_images = run_threads(nthreads, process_image, get_image, - finalise_image, &qargs, 0, - cpu_num, cpu_groupsize, cpu_offset); - - cleanup_indexing(ipriv); - - free(indm); - free(ipriv); - free(prefix); - free_detector_geometry(det); - free(beam); - free(element); - free(hdf5_peak_path); - free_copy_hdf5_field_list(copyme); - cell_free(cell); - if ( fh != stdin ) fclose(fh); - if ( ofh != stdout ) fclose(ofh); - - STATUS("There were %i images, of which %i could be indexed.\n", - n_images, qargs.n_indexable); - - return 0; + qargs.n_indexableTotal = 0; + qargs.n_processedTotal = 0; + qargs.n_indexable_last_statsTotal = 0; + qargs.n_processed_last_statsTotal = 0; + qargs.t_last_stats = get_time(); + qargs.updateReader = nProcesses-1; // last process reads updates + + //////////// Read .lst file /////////////// + int i,j; + double t1, t2; + rewind(fh); // make sure count from start + n_images = count_patterns(fh); + rewind(fh); + // Divide images into nProcesses + int nPerBatch = (int)ceil((float)n_images/nProcesses); + // Malloc 3D array + char ***array; + int nChars = LINE_LENGTH; + array = malloc(nProcesses * sizeof(char **)); + if (array == NULL){ + printf("Error\n"); + return -1; + } + for(i=0;i<nProcesses;i++){ + array[i] = malloc(nPerBatch * sizeof(char *)); + if(array[i] == NULL){ + printf("Error\n"); + return -1; + } + for(j=0;j<nPerBatch;j++){ + array[i][j] = malloc(nChars * sizeof(char)); + if(array[i][j] == NULL){ + printf("Error\n"); + return -1; + } + } + } + // Read from file + for (i=0; i<nProcesses; i++) { + for (j=0; j<nPerBatch; j++) { + if(fgets(&array[i][j][0],(nChars * sizeof(char))-1,fh) == NULL){ + array[i][j] = NULL; + } + } + } + fclose(fh); + printf("\n"); // flush buffer + + // Clear output file content + char *myOutfilename = NULL; + chomp(prefix); + chomp(outfile); + myOutfilename = malloc(strlen(outfile)+1); + snprintf(myOutfilename,LINE_LENGTH-1,"%s",outfile); //maximum print length + FILE *tfh; + tfh = fopen(myOutfilename,"a+"); + if (tfh == NULL) { printf("Error\n"); } + fclose(tfh); + qargs.static_args.outfile = outfile; + + //////////// PIPING /////////////// + char bufferW[BUFSIZE], bufferR[BUFSIZE]; // write and read buffers + int fd_pipe[nProcesses][2]; + + memset(bufferW, 0, BUFSIZE); + memset(bufferR, 0, BUFSIZE); + + for (i=0;i<nProcesses;i++){ // pipe nthread times + pipe(fd_pipe[i]); + } + + //////////// FORKING /////////////// + int power = 10; // 2^power must be larger than nProcesses + int pid[power]; + double num = 0; + int batchNum = 0; + // Fork 2^power times + for (i=0;i<power;i++){ + pid[i] = fork(); + } + // Assign id + for (i=0;i<power;i++){ + if (pid[i] == 0){ // keep parents and kill off children + num += pow(2,i); + } + } + // Kill if batchNum too high + if (num >= nProcesses){ + exit(0); // kill + } + batchNum = (int) num; + + // Calculate how many images to process + qargs.nPerProcess = 0; + i=batchNum; + for (j=0; j<nPerBatch; j++) { + if (array[i][j] != '\0'){ + qargs.nPerProcess++; + } + } + //printf("%d: %d images\n",batchNum,qargs.nPerProcess); + + ////////// PLUMBING ////////// + if (batchNum == qargs.updateReader){ + for (i=0;i<nProcesses;i++){ + if (i != batchNum) { + close(fd_pipe[i][1]); // only open own write pipe + } + // read from all processes + } + } else { + for (i=0;i<nProcesses;i++){ + if (i != batchNum) { + close(fd_pipe[i][1]); // only open own write pipe + } + close(fd_pipe[i][0]); // close all read pipes + } + } + + //////////// INDEXING /////////////// + //printf("PID&PPID: %d %d\n",getpid(),getppid()); + + t1 = get_time(); + + process_image(array,batchNum,&qargs,fd_pipe); + + ///// READ UPDATE ///// + if (batchNum == qargs.updateReader){ // loop until every process is finished + int pipesOpen; + int loopFlag = 1; + while (loopFlag) { + double seconds; + seconds = get_time(); + if ( seconds >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { + pipesOpen = readUpdate(&qargs,fd_pipe,1); + if (pipesOpen == 0) { + // Close all read pipes + for (i=0;i<qargs.static_args.nProcesses;i++){ + close(fd_pipe[i][0]); + } + loopFlag = 0; // Must execute this to get out + } + } + } + } + + close(fd_pipe[batchNum][1]); // close own write pipe + + t2 = get_time(); + + //////////// EXITING /////////////// + + printf("%d: Fork Time %.2fs\n",batchNum,t2-t1); + + if (batchNum == qargs.updateReader){ + STATUS("There were %i images, of which %i could be indexed.\n", + n_images, qargs.n_indexableTotal); + } + + exit(0); } |