From 7ce60a63de7750123d94ac77318547ba6e8b4cf1 Mon Sep 17 00:00:00 2001 From: Chuck Date: Thu, 29 Mar 2012 12:22:06 +0200 Subject: Fork indexamajig --- src/indexamajig.c | 880 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 534 insertions(+), 346 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index efbf4b44..9ddbb6ea 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -1,32 +1,12 @@ /* - * indexamajig.c + * indexamajigFork.c * * Index patterns, output hkl+intensity etc. * - * Copyright © 2012 Deutsches Elektronen-Synchrotron DESY, - * a research centre of the Helmholtz Association. - * Copyright © 2012 Richard Kirian - * Copyright © 2012 Lorenzo Galli + * (c) 2006-2012 Thomas White + * (c) 2012- Chunhong Yoon * - * Authors: - * 2010-2012 Thomas White - * 2011 Richard Kirian - * 2012 Lorenzo Galli - * - * This file is part of CrystFEL. - * - * CrystFEL is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * CrystFEL is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with CrystFEL. If not, see . + * Part of CrystFEL - crystallography with a FEL * */ @@ -44,6 +24,8 @@ #include #include #include +#include +#include #ifdef HAVE_CLOCK_GETTIME #include @@ -51,22 +33,46 @@ #include #endif -#include "utils.h" -#include "hdf5-file.h" -#include "index.h" -#include "peaks.h" -#include "detector.h" -#include "filters.h" -#include "thread-pool.h" -#include "beam-parameters.h" -#include "geometry.h" -#include "stream.h" -#include "reflist-utils.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include /* Write statistics at APPROXIMATELY this interval */ -#define STATS_EVERY_N_SECONDS (5) +#define STATS_EVERY_N_SECONDS (2) + +#define LINE_LENGTH 1024 +#define BUFSIZE 64 + +#ifdef HAVE_CLOCK_GETTIME +static double get_time() +{ + struct timespec tp; + clock_gettime(CLOCK_MONOTONIC, &tp); + double sec = (double) tp.tv_sec+ (double) tp.tv_nsec/1000000000; + return sec; //nano resolution +} +#else +/* Fallback version of the above. The time according to gettimeofday() is not + * monotonic, so measuring intervals based on it will screw up if there's a + * timezone change (e.g. daylight savings) while the program is running. */ +static double get_time() +{ + struct timeval tp; + gettimeofday(&tp, NULL); + double sec = (double) tp.tv_sec+ (double) tp.tv_usec/1000000; + return sec; //micro resolution +} +#endif enum { PEAK_ZAEF, @@ -82,6 +88,7 @@ struct static_index_args int config_noisefilter; int config_verbose; int stream_flags; /* What goes into the output? */ + int config_polar; int config_satcorr; int config_closer; int config_insane; @@ -102,11 +109,14 @@ struct static_index_args double ir_inn; double ir_mid; double ir_out; - + /* Output stream */ pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; + + char *outfile; const struct copy_hdf5_field *copyme; + int nProcesses; }; @@ -136,9 +146,45 @@ struct queue_args int n_processed; int n_indexable_last_stats; int n_processed_last_stats; - int t_last_stats; + + int n_indexableTotal; + int n_processedTotal; + int n_indexable_last_statsTotal; + int n_processed_last_statsTotal; + + int nPerProcess; + int updateReader; + + double t_last_stats; + }; +// Count number of patterns in .lst +int count_patterns(FILE *fh){ + char *rval; + int n_total = 0; + do{ + char line[LINE_LENGTH]; + rval = fgets(line,LINE_LENGTH-1,fh); + if (rval != NULL){ + n_total++; + } + }while(rval!=NULL); + if (ferror(fh)) { + printf("Read error\n"); + return -1; + } + return n_total; +} + +// Assign a batch number to a process +int getBatchNum(int pid[], int length){ + int i, id = 0; + for (i=0; i0)*pow(2,i)); // child = 1, parent = 0 + } + return id; +} static void show_help(const char *s) { @@ -189,34 +235,35 @@ static void show_help(const char *s) "The default is '--record=integrated'.\n" "\n\n" "For more control over the process, you might need:\n\n" -" --cell-reduction= Use as the cell reduction method. Choose from:\n" -" none : no matching, just use the raw cell.\n" -" reduce : full cell reduction.\n" -" compare : match by at most changing the order of\n" -" the indices.\n" -" compare_ab : compare 'a' and 'b' lengths only.\n" -" --tolerance= Set the tolerances for cell reduction.\n" -" Default: 5,5,5,1.5.\n" -" --filter-cm Perform common-mode noise subtraction on images\n" -" before proceeding. Intensities will be extracted\n" -" from the image as it is after this processing.\n" -" --filter-noise Apply an aggressive noise filter which sets all\n" -" pixels in each 3x3 region to zero if any of them\n" -" have negative values. Intensity measurement will\n" -" be performed on the image as it was before this.\n" -" --no-sat-corr Don't correct values of saturated peaks using a\n" -" table included in the HDF5 file.\n" -" --threshold= Only accept peaks above ADU. Default: 800.\n" -" --min-gradient= Minimum gradient for Zaefferer peak search.\n" -" Default: 100,000.\n" -" --min-snr= Minimum signal-to-noise ratio for peaks.\n" -" Default: 5.\n" -" --min-integration-snr= Minimum signal-to-noise ratio for peaks\n" -" during integration. Default: -infinity.\n" -" --int-radius= Set the integration radii. Default: 4,5,7.\n" -"-e, --image= Use this image from the HDF5 file.\n" -" Example: /data/data0.\n" -" Default: The first one found.\n" +" --cell-reduction= Use as the cell reduction method. Choose from:\n" +" none : no matching, just use the raw cell.\n" +" reduce : full cell reduction.\n" +" compare : match by at most changing the order of\n" +" the indices.\n" +" compare_ab : compare 'a' and 'b' lengths only.\n" +" --tolerance= Set the tolerance for a,b,c axis (in %%)\n" +" and for the angles (in deg) when reducing\n" +" or comparing (default is 5%% and 1.5deg)\n" +" --filter-cm Perform common-mode noise subtraction on images\n" +" before proceeding. Intensities will be extracted\n" +" from the image as it is after this processing.\n" +" --filter-noise Apply an aggressive noise filter which sets all\n" +" pixels in each 3x3 region to zero if any of them\n" +" have negative values. Intensity measurement will\n" +" be performed on the image as it was before this.\n" +" --unpolarized Don't correct for the polarisation of the X-rays.\n" +" --no-sat-corr Don't correct values of saturated peaks using a\n" +" table included in the HDF5 file.\n" +" --threshold= Only accept peaks above ADU. Default: 800.\n" +" --min-gradient= Minimum gradient for Zaefferer peak search.\n" +" Default: 100,000.\n" +" --min-snr= Minimum signal-to-noise ratio for peaks.\n" +" Default: 5.\n" +" --min-integration-snr= Minimum signal-to-noise ratio for peaks\n" +" during integration. Default: -infinity.\n" +" -e, --image= Use this image from the HDF5 file.\n" +" Example: /data/data0.\n" +" Default: The first one found.\n" "\n" "\nFor time-resolved stuff, you might want to use:\n\n" " --copy-hdf5-field Copy the value of field into the stream. You\n" @@ -245,289 +292,291 @@ static void show_help(const char *s) ); } +int readUpdate(void *qp, int fd_pipe[][2], int finish){ + struct queue_args *qargs = qp; + int i,j; + int rFlag; + int pipesOpen = 0; + char bufferR[BUFSIZE]; + memset(bufferR, 0, BUFSIZE); + int numFields = 2; + + for (i=0; istatic_args.nProcesses; i++){ + if (finish && i == qargs->updateReader) { + // do nothing + }else{ + rFlag = read(fd_pipe[i][0], bufferR, BUFSIZE-1); // wait till something to read + if (rFlag != 0){ + char * pch; + char delims[] = "#"; + pch = strtok (bufferR, delims); + for (j=0;jn_indexableTotal += atoi(pch); + break; + case 1: + qargs->n_processedTotal += atoi(pch); + break; + } + pch = strtok (NULL, delims); + } + memset(bufferR, 0, BUFSIZE); + pipesOpen++; + } + } + } + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n\n", + qargs->n_indexableTotal, qargs->n_processedTotal, + qargs->n_indexableTotal-qargs->n_indexable_last_statsTotal, + qargs->n_processedTotal-qargs->n_processed_last_statsTotal); -static void process_image(void *pp, int cookie) -{ - struct index_args *pargs = pp; - struct hdfile *hdfile; - struct image image; + qargs->n_indexable_last_statsTotal = qargs->n_indexableTotal; + qargs->n_processed_last_statsTotal = qargs->n_processedTotal; + + return pipesOpen; +} + +// Manipulate image +static void process_image(char ***array, int batch, void *qp, int fd_pipe[][2]) { + + struct queue_args *qargs = qp; float *data_for_measurement; size_t data_size; - char *filename = pargs->filename; - UnitCell *cell = pargs->static_args.cell; - int config_cmfilter = pargs->static_args.config_cmfilter; - int config_noisefilter = pargs->static_args.config_noisefilter; - int config_verbose = pargs->static_args.config_verbose; - IndexingMethod *indm = pargs->static_args.indm; - struct beam_params *beam = pargs->static_args.beam; - - image.features = NULL; - image.data = NULL; - image.flags = NULL; - image.indexed_cell = NULL; - image.id = cookie; - image.filename = filename; - image.det = copy_geom(pargs->static_args.det); - image.copyme = pargs->static_args.copyme; - image.beam = beam; - - if ( beam == NULL ) { - ERROR("Warning: no beam parameters file.\n"); - ERROR("I'm going to assume 1 ADU per photon, which is almost"); - ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); - } - - pargs->indexable = 0; + UnitCell *cell = qargs->static_args.cell; + int config_cmfilter = qargs->static_args.config_cmfilter; + int config_noisefilter = qargs->static_args.config_noisefilter; + int config_verbose = qargs->static_args.config_verbose; + //int config_polar = qargs->static_args.config_polar; + IndexingMethod *indm = qargs->static_args.indm; + struct beam_params *beam = qargs->static_args.beam; + + int i; + int r, check; + struct hdfile *hdfile; + char *outfile = qargs->static_args.outfile; + int nPerBatch = qargs->nPerProcess; + + for (i=0; istatic_args.det); + image.copyme = qargs->static_args.copyme; + image.beam = beam; + image.id = batch; // MUST SET ID FOR MOSFLM TO WORK PROPERLY + + if ( beam == NULL ) { + ERROR("Warning: no beam parameters file.\n"); + ERROR("I'm going to assume 1 ADU per photon, which is almost"); + ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); + } - hdfile = hdfile_open(filename); - if ( hdfile == NULL ) return; + //pargs->indexable = 0; - if ( pargs->static_args.element != NULL ) { + char *filename = NULL; + char *line = array[batch][i]; + chomp(line); +//printf("%d ***************%s\n",batch,line); + filename = malloc(strlen(qargs->prefix)+strlen(line)+1); + snprintf(filename,LINE_LENGTH-1,"%s%s",qargs->prefix,line); //maximum print length + free(line); + image.filename = filename; +//printf("%d ***************%s\n",batch,filename); + hdfile = hdfile_open(filename); + if (hdfile == NULL) return; - int r; - r = hdfile_set_image(hdfile, pargs->static_args.element); - if ( r ) { - ERROR("Couldn't select path '%s'\n", - pargs->static_args.element); + r = hdfile_set_first_image(hdfile, "/"); // Need this to read hdf5 files + if (r) { + ERROR("Couldn't select first path\n"); hdfile_close(hdfile); return; } - } else { - - int r; - r = hdfile_set_first_image(hdfile, "/"); - if ( r ) { - ERROR("Couldn't select first path\n"); + check = hdf5_read(hdfile,&image,1); + if (check == 1){ hdfile_close(hdfile); return; } - } - - hdf5_read(hdfile, &image, pargs->static_args.config_satcorr); - - if ( (image.width != image.det->max_fs+1) - || (image.height != image.det->max_ss+1) ) - { - ERROR("Image size doesn't match geometry size" - " - rejecting image.\n"); - ERROR("Image size: %i,%i. Geometry size: %i,%i\n", - image.width, image.height, - image.det->max_fs+1, image.det->max_ss+1); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - - if ( image.lambda < 0.0 ) { - if ( beam != NULL ) { - ERROR("Using nominal photon energy of %.2f eV\n", - beam->photon_energy); - image.lambda = ph_en_to_lambda( - eV_to_J(beam->photon_energy)); - } else { - ERROR("No wavelength in file, so you need to give " - "a beam parameters file with -b.\n"); + if ( (image.width != image.det->max_fs+1) + || (image.height != image.det->max_ss+1) ) + { + ERROR("Image size doesn't match geometry size" + " - rejecting image.\n"); + ERROR("Image size: %i,%i. Geometry size: %i,%i\n", + image.width, image.height, + image.det->max_fs+1, image.det->max_ss+1); hdfile_close(hdfile); free_detector_geometry(image.det); return; } - } - fill_in_values(image.det, hdfile); - if ( config_cmfilter ) { - filter_cm(&image); - } + if ( image.lambda < 0.0 ) { + if ( beam != NULL ) { + ERROR("Using nominal photon energy of %.2f eV\n", + beam->photon_energy); + image.lambda = ph_en_to_lambda( + eV_to_J(beam->photon_energy)); + } else { + ERROR("No wavelength in file, so you need to give " + "a beam parameters file with -b.\n"); + hdfile_close(hdfile); + free_detector_geometry(image.det); + return; + } + } + fill_in_values(image.det, hdfile); - /* Take snapshot of image after CM subtraction but before - * the aggressive noise filter. */ - data_size = image.width*image.height*sizeof(float); - data_for_measurement = malloc(data_size); + if ( config_cmfilter ) { + filter_cm(&image); + } - if ( config_noisefilter ) { - filter_noise(&image, data_for_measurement); - } else { - memcpy(data_for_measurement, image.data, data_size); - } + /* Take snapshot of image after CM subtraction but before + * the aggressive noise filter. */ + data_size = image.width*image.height*sizeof(float); + data_for_measurement = malloc(data_size); - switch ( pargs->static_args.peaks ) - { - case PEAK_HDF5 : - /* Get peaks from HDF5 */ - if ( get_peaks(&image, hdfile, - pargs->static_args.hdf5_peak_path) ) - { - ERROR("Failed to get peaks from HDF5 file.\n"); + if ( config_noisefilter ) { + filter_noise(&image, data_for_measurement); + } else { + memcpy(data_for_measurement, image.data, data_size); } - break; + switch ( qargs->static_args.peaks ) + { + case PEAK_HDF5 : + /* Get peaks from HDF5 */ + if ( get_peaks(&image, hdfile, + qargs->static_args.hdf5_peak_path) ) + { + ERROR("Failed to get peaks from HDF5 file.\n"); + } + break; case PEAK_ZAEF : - search_peaks(&image, pargs->static_args.threshold, - pargs->static_args.min_gradient, - pargs->static_args.min_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - break; - } - - /* Get rid of noise-filtered version at this point - * - it was strictly for the purposes of peak detection. */ - free(image.data); - image.data = data_for_measurement; - - /* Calculate orientation matrix (by magic) */ - image.div = beam->divergence; - image.bw = beam->bandwidth; - image.profile_radius = 0.0001e9; - index_pattern(&image, cell, indm, pargs->static_args.cellr, - config_verbose, pargs->static_args.ipriv, - pargs->static_args.config_insane, - pargs->static_args.tols); - - if ( image.indexed_cell != NULL ) { - - pargs->indexable = 1; - - image.reflections = find_intersections(&image, + search_peaks(&image, qargs->static_args.threshold, + qargs->static_args.min_gradient, + qargs->static_args.min_snr, + qargs->static_args.ir_inn, + qargs->static_args.ir_mid, + qargs->static_args.ir_out); + break; + } + + /* Get rid of noise-filtered version at this point + * - it was strictly for the purposes of peak detection. */ + free(image.data); + image.data = data_for_measurement; + + /* Calculate orientation matrix (by magic) */ + image.div = beam->divergence; + image.bw = beam->bandwidth; + image.profile_radius = 0.0001e9; + + ///// RUN INDEXING HERE ///// + index_pattern(&image, cell, indm, qargs->static_args.cellr, + config_verbose, qargs->static_args.ipriv, + qargs->static_args.config_insane, qargs->static_args.tols); + + if ( image.indexed_cell != NULL ) { + //pargs->indexable = 1; + image.reflections = find_intersections(&image, image.indexed_cell); - - if ( image.reflections != NULL ) { - - integrate_reflections(&image, - pargs->static_args.config_closer, - pargs->static_args.config_bgsub, - pargs->static_args.min_int_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - + if ( image.reflections != NULL ) { + integrate_reflections(&image, + qargs->static_args.config_closer, + qargs->static_args.config_bgsub, + qargs->static_args.min_int_snr, + qargs->static_args.ir_inn, + qargs->static_args.ir_mid, + qargs->static_args.ir_out); + } + } else { + image.reflections = NULL; } - } else { - - image.reflections = NULL; - - } - - pthread_mutex_lock(pargs->static_args.output_mutex); - write_chunk(pargs->static_args.ofh, &image, hdfile, - pargs->static_args.stream_flags); - pthread_mutex_unlock(pargs->static_args.output_mutex); - - /* Only free cell if found */ - cell_free(image.indexed_cell); - - reflist_free(image.reflections); - free(image.data); - if ( image.flags != NULL ) free(image.flags); - image_feature_list_free(image.features); - hdfile_close(hdfile); - free_detector_geometry(image.det); -} - - -static void *get_image(void *qp) -{ - char *line; - struct index_args *pargs; - char *rval; - struct queue_args *qargs = qp; - - /* Initialise new task arguments */ - pargs = malloc(sizeof(struct index_args)); - memcpy(&pargs->static_args, &qargs->static_args, - sizeof(struct static_index_args)); - - /* Get the next filename */ - if ( qargs->use_this_one_instead != NULL ) { - - line = qargs->use_this_one_instead; - qargs->use_this_one_instead = NULL; - - } else { - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, qargs->fh); - if ( rval == NULL ) { - free(pargs); - free(line); - return NULL; + // Write Lock + struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0 }; + int fd; + fl.l_pid = getpid(); + + char *outfilename = NULL; + chomp(outfile); // may not need this +//printf("%d ***************%s\n",batch,outfile); + outfilename = malloc(strlen(outfile)+1); + snprintf(outfilename,LINE_LENGTH-1,"%s",outfile); //maximum print length +//printf("%d ***************%s\n",batch,outfilename); + if ((fd = open(outfilename, O_WRONLY)) == -1) { + perror("Error on opening\n"); + exit(1); + } + + if (fcntl(fd, F_SETLKW, &fl) == -1) { + perror("Error on setting lock wait\n"); + exit(1); + } + + // LOCKED! Write chunk + FILE *fh; +//printf("%d ***************%s\n",batch,outfilename); + fh = fopen(outfilename,"a"); + if (fh == NULL) { + perror("Error inside lock\n"); + } + write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags); + fclose(fh); + + // Unlock stream for other processes + fl.l_type = F_UNLCK; // set to unlock same region + if (fcntl(fd, F_SETLK, &fl) == -1) { + perror("fcntl"); + exit(1); + } + close(fd); + + ///// WRITE UPDATE ///// + double seconds; + qargs->n_indexable += ( image.indexed_cell != NULL ); + qargs->n_processed++; + seconds = get_time(); + if ( seconds >= qargs->t_last_stats+STATS_EVERY_N_SECONDS + || qargs->n_processed == qargs->nPerProcess) { // Must write if finished + + // WRITE PIPE HERE + char bufferW[BUFSIZE]; + memset(bufferW, 0, BUFSIZE); + sprintf(bufferW,"%d#%d", + qargs->n_indexable - qargs->n_indexable_last_stats, + qargs->n_processed - qargs->n_processed_last_stats); + write(fd_pipe[batch][1], bufferW, BUFSIZE-1); + + // Update stats + qargs->n_processed_last_stats = qargs->n_processed; + qargs->n_indexable_last_stats = qargs->n_indexable; + qargs->t_last_stats = seconds; + + ///// READ UPDATE ///// + if (batch == qargs->updateReader) { + readUpdate(qargs,fd_pipe, 0); + } } - chomp(line); - - } - - if ( qargs->config_basename ) { - char *tmp; - tmp = safe_basename(line); - free(line); - line = tmp; - } - - pargs->filename = malloc(strlen(qargs->prefix)+strlen(line)+1); - - snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); - - free(line); - - return pargs; -} - - -#ifdef HAVE_CLOCK_GETTIME - -static time_t get_monotonic_seconds() -{ - struct timespec tp; - clock_gettime(CLOCK_MONOTONIC, &tp); - return tp.tv_sec; -} - -#else - -/* Fallback version of the above. The time according to gettimeofday() is not - * monotonic, so measuring intervals based on it will screw up if there's a - * timezone change (e.g. daylight savings) while the program is running. */ -static time_t get_monotonic_seconds() -{ - struct timeval tp; - gettimeofday(&tp, NULL); - return tp.tv_sec; -} - -#endif - -static void finalise_image(void *qp, void *pp) -{ - struct queue_args *qargs = qp; - struct index_args *pargs = pp; - time_t monotonic_seconds; - - qargs->n_indexable += pargs->indexable; - qargs->n_processed++; - - monotonic_seconds = get_monotonic_seconds(); - if ( monotonic_seconds >= qargs->t_last_stats+STATS_EVERY_N_SECONDS ) { - - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n", - qargs->n_indexable, qargs->n_processed, - qargs->n_indexable - qargs->n_indexable_last_stats, - qargs->n_processed - qargs->n_processed_last_stats); - - qargs->n_processed_last_stats = qargs->n_processed; - qargs->n_indexable_last_stats = qargs->n_indexable; - qargs->t_last_stats = monotonic_seconds; + ///// FREE ///// + /* Only free cell if found */ + cell_free(image.indexed_cell); + reflist_free(image.reflections); + free(image.data); + if ( image.flags != NULL ) free(image.flags); + image_feature_list_free(image.features); + hdfile_close(hdfile); + free_detector_geometry(image.det); } - - free(pargs->filename); - free(pargs); } - static int parse_cell_reduction(const char *scellr, int *err, int *reduction_needs_cell) { @@ -565,6 +614,7 @@ int main(int argc, char *argv[]) int config_cmfilter = 0; int config_noisefilter = 0; int config_verbose = 0; + int config_polar = 1; int config_satcorr = 1; int config_checkprefix = 1; int config_closer = 1; @@ -591,7 +641,7 @@ int main(int argc, char *argv[]) float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */ int cellr; int peaks; - int nthreads = 1; + int nProcesses = 1; pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER; char *prepare_line; char prepare_filename[1024]; @@ -610,7 +660,7 @@ int main(int argc, char *argv[]) float ir_inn = 4.0; float ir_mid = 5.0; float ir_out = 7.0; - + copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { ERROR("Couldn't allocate HDF5 field list.\n"); @@ -689,7 +739,7 @@ int main(int argc, char *argv[]) break; case 'j' : - nthreads = atoi(optarg); + nProcesses = atoi(optarg); break; case 'g' : @@ -828,11 +878,11 @@ int main(int argc, char *argv[]) } else { ofh = fopen(outfile, "w"); } +//printf("***************%s\n",outfile); if ( ofh == NULL ) { ERROR("Failed to open output file '%s'\n", outfile); return 1; } - free(outfile); if ( hdf5_peak_path == NULL ) { hdf5_peak_path = strdup("/processing/hitfinder/peakinfo"); @@ -865,7 +915,7 @@ int main(int argc, char *argv[]) } } - if ( nthreads == 0 ) { + if ( nProcesses == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } @@ -964,7 +1014,7 @@ int main(int argc, char *argv[]) cell = NULL; } free(pdb); - + write_stream_header(ofh, argc, argv); if ( beam != NULL ) { @@ -1010,6 +1060,7 @@ int main(int argc, char *argv[]) qargs.static_args.config_cmfilter = config_cmfilter; qargs.static_args.config_noisefilter = config_noisefilter; qargs.static_args.config_verbose = config_verbose; + qargs.static_args.config_polar = config_polar; qargs.static_args.config_satcorr = config_satcorr; qargs.static_args.config_closer = config_closer; qargs.static_args.config_insane = config_insane; @@ -1034,6 +1085,7 @@ int main(int argc, char *argv[]) qargs.static_args.stream_flags = stream_flags; qargs.static_args.hdf5_peak_path = hdf5_peak_path; qargs.static_args.copyme = copyme; + qargs.static_args.nProcesses = nProcesses; qargs.static_args.ir_inn = ir_inn; qargs.static_args.ir_mid = ir_mid; qargs.static_args.ir_out = ir_out; @@ -1045,28 +1097,164 @@ int main(int argc, char *argv[]) qargs.n_processed = 0; qargs.n_indexable_last_stats = 0; qargs.n_processed_last_stats = 0; - qargs.t_last_stats = get_monotonic_seconds(); - - n_images = run_threads(nthreads, process_image, get_image, - finalise_image, &qargs, 0, - cpu_num, cpu_groupsize, cpu_offset); - - cleanup_indexing(ipriv); - - free(indm); - free(ipriv); - free(prefix); - free_detector_geometry(det); - free(beam); - free(element); - free(hdf5_peak_path); - free_copy_hdf5_field_list(copyme); - cell_free(cell); - if ( fh != stdin ) fclose(fh); - if ( ofh != stdout ) fclose(ofh); - - STATUS("There were %i images, of which %i could be indexed.\n", - n_images, qargs.n_indexable); - - return 0; + qargs.n_indexableTotal = 0; + qargs.n_processedTotal = 0; + qargs.n_indexable_last_statsTotal = 0; + qargs.n_processed_last_statsTotal = 0; + qargs.t_last_stats = get_time(); + qargs.updateReader = nProcesses-1; // last process reads updates + + //////////// Read .lst file /////////////// + int i,j; + double t1, t2; + rewind(fh); // make sure count from start + n_images = count_patterns(fh); + rewind(fh); + // Divide images into nProcesses + int nPerBatch = (int)ceil((float)n_images/nProcesses); + // Malloc 3D array + char ***array; + int nChars = LINE_LENGTH; + array = malloc(nProcesses * sizeof(char **)); + if (array == NULL){ + printf("Error\n"); + return -1; + } + for(i=0;i= nProcesses){ + exit(0); // kill + } + batchNum = (int) num; + + // Calculate how many images to process + qargs.nPerProcess = 0; + i=batchNum; + for (j=0; j= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { + pipesOpen = readUpdate(&qargs,fd_pipe,1); + if (pipesOpen == 0) { + // Close all read pipes + for (i=0;i Date: Wed, 9 May 2012 16:19:29 +0200 Subject: Multiprocess indexamajig --- src/indexamajig.c | 550 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 344 insertions(+), 206 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 281cac43..9372c447 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -12,6 +12,7 @@ * 2010-2012 Thomas White * 2011 Richard Kirian * 2012 Lorenzo Galli + * 2012 Chunhong Yoon * * This file is part of CrystFEL. * @@ -44,6 +45,8 @@ #include #include #include +#include +#include #ifdef HAVE_CLOCK_GETTIME #include @@ -51,22 +54,25 @@ #include #endif -#include "utils.h" -#include "hdf5-file.h" -#include "index.h" -#include "peaks.h" -#include "detector.h" -#include "filters.h" -#include "thread-pool.h" -#include "beam-parameters.h" -#include "geometry.h" -#include "stream.h" -#include "reflist-utils.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include /* Write statistics at APPROXIMATELY this interval */ #define STATS_EVERY_N_SECONDS (5) +#define LINE_LENGTH 1024 + +#define BUFFER PIPE_BUF enum { PEAK_ZAEF, @@ -107,6 +113,7 @@ struct static_index_args pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; const struct copy_hdf5_field *copyme; + char *outfile; }; @@ -137,6 +144,7 @@ struct queue_args int n_indexable_last_stats; int n_processed_last_stats; int t_last_stats; + int updateReader; }; @@ -228,9 +236,8 @@ static void show_help(const char *s) "\n" "\nOptions you probably won't need:\n\n" " --no-check-prefix Don't attempt to correct the --prefix.\n" -" --no-closer-peak Don't integrate from the location of a nearby peak\n" -" instead of the position closest to the reciprocal\n" -" lattice point.\n" +" --closer-peak Don't integrate from the location of a nearby peak\n" +" instead of the predicted spot. Don't use.\n" " --insane Don't check that the reduced cell accounts for at\n" " least 10%% of the located peaks.\n" " --no-bg-sub Don't subtract local background estimates from\n" @@ -246,83 +253,94 @@ static void show_help(const char *s) } -static void process_image(void *pp, int cookie) +// Get next pattern in .lst +char* get_pattern(FILE *fh) { + char *rval; + char line[LINE_LENGTH]; + rval = fgets(line, LINE_LENGTH - 1, fh); + if (ferror(fh)) { + printf("Read error\n"); + rval = NULL; + } + return rval; +} + + +static void process_image(void *qp, void *pp, int cookie) { struct index_args *pargs = pp; - struct hdfile *hdfile; - struct image image; + struct queue_args *qargs = qp; float *data_for_measurement; size_t data_size; - char *filename = pargs->filename; - UnitCell *cell = pargs->static_args.cell; - int config_cmfilter = pargs->static_args.config_cmfilter; - int config_noisefilter = pargs->static_args.config_noisefilter; - int config_verbose = pargs->static_args.config_verbose; - IndexingMethod *indm = pargs->static_args.indm; - struct beam_params *beam = pargs->static_args.beam; + UnitCell *cell = qargs->static_args.cell; + int config_cmfilter = qargs->static_args.config_cmfilter; + int config_noisefilter = qargs->static_args.config_noisefilter; + int config_verbose = qargs->static_args.config_verbose; + IndexingMethod *indm = qargs->static_args.indm; + struct beam_params *beam = qargs->static_args.beam; + int r, check; + struct hdfile *hdfile; + char *outfile = qargs->static_args.outfile; + struct image image; image.features = NULL; image.data = NULL; image.flags = NULL; image.indexed_cell = NULL; - image.id = cookie; - image.filename = filename; - image.det = copy_geom(pargs->static_args.det); - image.copyme = pargs->static_args.copyme; + image.det = copy_geom(qargs->static_args.det); + image.copyme = qargs->static_args.copyme; image.beam = beam; + image.id = cookie; // MUST SET ID FOR MOSFLM TO WORK PROPERLY - pargs->indexable = 0; + if (beam == NULL) { + ERROR("Warning: no beam parameters file.\n"); + ERROR("I'm going to assume 1 ADU per photon, which is almost"); + ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); + } + char *filename = NULL; + char *imagename = pargs->filename; + chomp(imagename); + filename = malloc(strlen(qargs->prefix) + strlen(imagename) + 1); + snprintf(filename, LINE_LENGTH - 1, "%s%s", qargs->prefix, imagename); + image.filename = filename; hdfile = hdfile_open(filename); - if ( hdfile == NULL ) return; - - if ( pargs->static_args.element != NULL ) { - - int r; - r = hdfile_set_image(hdfile, pargs->static_args.element); - if ( r ) { - ERROR("Couldn't select path '%s'\n", - pargs->static_args.element); - hdfile_close(hdfile); - return; - } - - } else { - - int r; - r = hdfile_set_first_image(hdfile, "/"); - if ( r ) { - ERROR("Couldn't select first path\n"); - hdfile_close(hdfile); - return; - } + if (hdfile == NULL) return; + r = hdfile_set_first_image(hdfile, "/"); // Need this to read hdf5 files + if (r) { + ERROR("Couldn't select first path\n"); + hdfile_close(hdfile); + return; } - hdf5_read(hdfile, &image, pargs->static_args.config_satcorr); + check = hdf5_read(hdfile, &image, 1); + if (check == 1) { + hdfile_close(hdfile); + return; + } - if ( (image.width != image.det->max_fs+1) - || (image.height != image.det->max_ss+1) ) - { + if ((image.width != image.det->max_fs + 1) + || (image.height != image.det->max_ss + 1)) { ERROR("Image size doesn't match geometry size" - " - rejecting image.\n"); + " - rejecting image.\n"); ERROR("Image size: %i,%i. Geometry size: %i,%i\n", - image.width, image.height, - image.det->max_fs+1, image.det->max_ss+1); + image.width, image.height, + image.det->max_fs + 1, image.det->max_ss + 1); hdfile_close(hdfile); free_detector_geometry(image.det); return; } - if ( image.lambda < 0.0 ) { - if ( beam != NULL ) { + if (image.lambda < 0.0) { + if (beam != NULL) { ERROR("Using nominal photon energy of %.2f eV\n", - beam->photon_energy); + beam->photon_energy); image.lambda = ph_en_to_lambda( - eV_to_J(beam->photon_energy)); + eV_to_J(beam->photon_energy)); } else { ERROR("No wavelength in file, so you need to give " - "a beam parameters file with -b.\n"); + "a beam parameters file with -b.\n"); hdfile_close(hdfile); free_detector_geometry(image.det); return; @@ -330,40 +348,37 @@ static void process_image(void *pp, int cookie) } fill_in_values(image.det, hdfile); - if ( config_cmfilter ) { + if (config_cmfilter) { filter_cm(&image); } - /* Take snapshot of image after CM subtraction but before - * the aggressive noise filter. */ - data_size = image.width*image.height*sizeof(float); + // Take snapshot of image after CM subtraction but before + // the aggressive noise filter. + data_size = image.width * image.height * sizeof (float); data_for_measurement = malloc(data_size); - if ( config_noisefilter ) { + if (config_noisefilter) { filter_noise(&image, data_for_measurement); } else { memcpy(data_for_measurement, image.data, data_size); } - switch ( pargs->static_args.peaks ) - { - case PEAK_HDF5 : - /* Get peaks from HDF5 */ - if ( get_peaks(&image, hdfile, - pargs->static_args.hdf5_peak_path) ) - { - ERROR("Failed to get peaks from HDF5 file.\n"); - } - break; - - case PEAK_ZAEF : - search_peaks(&image, pargs->static_args.threshold, - pargs->static_args.min_gradient, - pargs->static_args.min_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - break; + switch (qargs->static_args.peaks) { + case PEAK_HDF5: + // Get peaks from HDF5 + if (get_peaks(&image, hdfile, + qargs->static_args.hdf5_peak_path)) { + ERROR("Failed to get peaks from HDF5 file.\n"); + } + break; + case PEAK_ZAEF: + search_peaks(&image, qargs->static_args.threshold, + qargs->static_args.min_gradient, + qargs->static_args.min_snr, + qargs->static_args.ir_inn, + qargs->static_args.ir_mid, + qargs->static_args.ir_out); + break; } /* Get rid of noise-filtered version at this point @@ -375,40 +390,66 @@ static void process_image(void *pp, int cookie) image.div = beam->divergence; image.bw = beam->bandwidth; image.profile_radius = 0.0001e9; - index_pattern(&image, cell, indm, pargs->static_args.cellr, - config_verbose, pargs->static_args.ipriv, - pargs->static_args.config_insane, - pargs->static_args.tols); - if ( image.indexed_cell != NULL ) { + /* RUN INDEXING HERE */ + index_pattern(&image, cell, indm, qargs->static_args.cellr, + config_verbose, qargs->static_args.ipriv, + qargs->static_args.config_insane, qargs->static_args.tols); + if (image.indexed_cell != NULL) { pargs->indexable = 1; - image.reflections = find_intersections(&image, - image.indexed_cell); - - if ( image.reflections != NULL ) { - + image.indexed_cell); + if (image.reflections != NULL) { integrate_reflections(&image, - pargs->static_args.config_closer, - pargs->static_args.config_bgsub, - pargs->static_args.min_int_snr, - pargs->static_args.ir_inn, - pargs->static_args.ir_mid, - pargs->static_args.ir_out); - + qargs->static_args.config_closer, + qargs->static_args.config_bgsub, + qargs->static_args.min_int_snr, + qargs->static_args.ir_inn, + qargs->static_args.ir_mid, + qargs->static_args.ir_out); } - } else { - image.reflections = NULL; + } + /* Write Lock */ + struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0}; + int fd; + fl.l_pid = getpid(); + + char *outfilename = NULL; + chomp(outfile); + outfilename = malloc(strlen(outfile) + 1); + snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile); + if ((fd = open(outfilename, O_WRONLY)) == -1) { + perror("Error on opening\n"); + exit(1); + } + if (fcntl(fd, F_SETLKW, &fl) == -1) { + perror("Error on setting lock wait\n"); + exit(1); + } + + /* LOCKED! Write chunk */ + FILE *fh; + fh = fopen(outfilename, "a"); + if (fh == NULL) { + perror("Error inside lock\n"); + } + write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags); + fclose(fh); + + /* Unlock stream for other processes */ + fl.l_type = F_UNLCK; /* set to unlock same region */ + if (fcntl(fd, F_SETLK, &fl) == -1) { + perror("fcntl"); + exit(1); } + close(fd); - pthread_mutex_lock(pargs->static_args.output_mutex); - write_chunk(pargs->static_args.ofh, &image, hdfile, - pargs->static_args.stream_flags); - pthread_mutex_unlock(pargs->static_args.output_mutex); + qargs->n_indexable += pargs->indexable; + qargs->n_processed++; /* Only free cell if found */ cell_free(image.indexed_cell); @@ -422,54 +463,6 @@ static void process_image(void *pp, int cookie) } -static void *get_image(void *qp) -{ - char *line; - struct index_args *pargs; - char *rval; - struct queue_args *qargs = qp; - - /* Initialise new task arguments */ - pargs = malloc(sizeof(struct index_args)); - memcpy(&pargs->static_args, &qargs->static_args, - sizeof(struct static_index_args)); - - /* Get the next filename */ - if ( qargs->use_this_one_instead != NULL ) { - - line = qargs->use_this_one_instead; - qargs->use_this_one_instead = NULL; - - } else { - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, qargs->fh); - if ( rval == NULL ) { - free(pargs); - free(line); - return NULL; - } - chomp(line); - - } - - if ( qargs->config_basename ) { - char *tmp; - tmp = safe_basename(line); - free(line); - line = tmp; - } - - pargs->filename = malloc(strlen(qargs->prefix)+strlen(line)+1); - - snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); - - free(line); - - return pargs; -} - - #ifdef HAVE_CLOCK_GETTIME static time_t get_monotonic_seconds() @@ -493,34 +486,6 @@ static time_t get_monotonic_seconds() #endif -static void finalise_image(void *qp, void *pp) -{ - struct queue_args *qargs = qp; - struct index_args *pargs = pp; - time_t monotonic_seconds; - - qargs->n_indexable += pargs->indexable; - qargs->n_processed++; - - monotonic_seconds = get_monotonic_seconds(); - if ( monotonic_seconds >= qargs->t_last_stats+STATS_EVERY_N_SECONDS ) { - - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n", - qargs->n_indexable, qargs->n_processed, - qargs->n_indexable - qargs->n_indexable_last_stats, - qargs->n_processed - qargs->n_processed_last_stats); - - qargs->n_processed_last_stats = qargs->n_processed; - qargs->n_indexable_last_stats = qargs->n_indexable; - qargs->t_last_stats = monotonic_seconds; - - } - - free(pargs->filename); - free(pargs); -} - static int parse_cell_reduction(const char *scellr, int *err, int *reduction_needs_cell) @@ -554,14 +519,13 @@ int main(int argc, char *argv[]) FILE *fh; FILE *ofh; char *rval = NULL; - int n_images; int config_noindex = 0; int config_cmfilter = 0; int config_noisefilter = 0; int config_verbose = 0; int config_satcorr = 1; int config_checkprefix = 1; - int config_closer = 1; + int config_closer = 0; int config_insane = 0; int config_bgsub = 1; int config_basename = 0; @@ -585,11 +549,11 @@ int main(int argc, char *argv[]) float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */ int cellr; int peaks; - int nthreads = 1; - pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER; + int nProcesses = 1; char *prepare_line; - char prepare_filename[1024]; + char prepare_filename[LINE_LENGTH]; struct queue_args qargs; + struct index_args pargs; struct beam_params *beam = NULL; char *element = NULL; double nominal_photon_energy; @@ -630,6 +594,7 @@ int main(int argc, char *argv[]) {"threshold", 1, NULL, 't'}, {"no-check-prefix", 0, &config_checkprefix, 0}, {"no-closer-peak", 0, &config_closer, 0}, + {"closer-peak", 0, &config_closer, 1}, {"insane", 0, &config_insane, 1}, {"image", 1, NULL, 'e'}, {"basename", 0, &config_basename, 1}, @@ -653,7 +618,7 @@ int main(int argc, char *argv[]) }; /* Short options */ - while ((c = getopt_long(argc, argv, "hi:wp:j:x:g:t:o:b:e:", + while ((c = getopt_long(argc, argv, "hi:o:z:p:x:j:g:t:b:e:", longopts, NULL)) != -1) { switch (c) { @@ -683,7 +648,7 @@ int main(int argc, char *argv[]) break; case 'j' : - nthreads = atoi(optarg); + nProcesses = atoi(optarg); break; case 'g' : @@ -826,7 +791,6 @@ int main(int argc, char *argv[]) ERROR("Failed to open output file '%s'\n", outfile); return 1; } - free(outfile); if ( hdf5_peak_path == NULL ) { hdf5_peak_path = strdup("/processing/hitfinder/peakinfo"); @@ -859,8 +823,8 @@ int main(int argc, char *argv[]) } } - if ( nthreads == 0 ) { - ERROR("Invalid number of threads.\n"); + if ( nProcesses == 0 ) { + ERROR("Invalid number of processes.\n"); return 1; } @@ -976,8 +940,8 @@ int main(int argc, char *argv[]) } /* Get first filename and use it to set up the indexing */ - prepare_line = malloc(1024*sizeof(char)); - rval = fgets(prepare_line, 1023, fh); + prepare_line = malloc(LINE_LENGTH*sizeof(char)); + rval = fgets(prepare_line, LINE_LENGTH-1, fh); if ( rval == NULL ) { ERROR("Failed to get filename to prepare indexing.\n"); return 1; @@ -989,7 +953,7 @@ int main(int argc, char *argv[]) free(prepare_line); prepare_line = tmp; } - snprintf(prepare_filename, 1023, "%s%s", prefix, prepare_line); + snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line); qargs.use_this_one_instead = prepare_line; /* Prepare the indexer */ @@ -1027,7 +991,6 @@ int main(int argc, char *argv[]) qargs.static_args.indm = indm; qargs.static_args.ipriv = ipriv; qargs.static_args.peaks = peaks; - qargs.static_args.output_mutex = &output_mutex; qargs.static_args.ofh = ofh; qargs.static_args.beam = beam; qargs.static_args.element = element; @@ -1045,11 +1008,185 @@ int main(int argc, char *argv[]) qargs.n_processed = 0; qargs.n_indexable_last_stats = 0; qargs.n_processed_last_stats = 0; + qargs.updateReader = 0; /* first process updates */ qargs.t_last_stats = get_monotonic_seconds(); - n_images = run_threads(nthreads, process_image, get_image, - finalise_image, &qargs, 0, - cpu_num, cpu_groupsize, cpu_offset); + /* Read .lst file */ + register int i; + rewind(fh); /* make sure to read from start */ + + /* Clear output file content */ + char *myOutfilename = NULL; + chomp(prefix); + chomp(outfile); + myOutfilename = malloc(strlen(outfile) + 1); + snprintf(myOutfilename, LINE_LENGTH - 1, "%s", outfile); + FILE *tfh; + tfh = fopen(myOutfilename, "a+"); + if (tfh == NULL) { + ERROR("No output filename\n"); + } + fclose(tfh); + qargs.static_args.outfile = outfile; + int ready_fd; + int buff_count; + fd_set fdset,tmpset; + char buffR[BUFFER], buffW[BUFFER]; + int fd_pipeIn[nProcesses][2]; /* Process0 In */ + int fd_pipeOut[nProcesses][2]; /* Process0 Out */ + unsigned int opts; + + FD_ZERO(&fdset); /* clear the fd_set */ + /* set pipeIn as non-blocking */ + for ( i=0; i max_fd) { /* find max_fd */ + max_fd = fd_pipeIn[i][0]; + } + } + max_fd = max_fd+1; + /* copy file set to tmpset */ + memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set)); + + /**** FORKING ****/ + int power = 10; /* 2^power must be larger than nProcesses */ + int pid[power]; + double num = 0; + int batchNum = 0; + /* Fork 2^power times */ + for ( i=0; i= nProcesses + 1) { + exit(0); /* kill */ + } + batchNum = (int) num; + + /**** PLUMBING ****/ + if (batchNum == qargs.updateReader) { + for ( i=0; i 0) { + for ( i=0; i= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n\n", + qargs.n_indexable, qargs.n_processed, + qargs.n_indexable - qargs.n_indexable_last_stats, + qargs.n_processed - qargs.n_processed_last_stats); + + qargs.n_indexable_last_stats = qargs.n_indexable; + qargs.n_processed_last_stats = qargs.n_processed; + qargs.t_last_stats = tNow; + } + } + /* close my pipes */ + for ( i=0; i 0) { + /* process image */ + pargs.filename = buffR; + pargs.indexable = 0; + process_image(&qargs, &pargs, batchNum); + /* request another image */ + buff_count = sprintf(buffW, "%d\n", pargs.indexable); + if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) + perror("write P0"); + } else if (buff_count == 0) { + allDone = 1; /* EXIT */ + } + } + /* close my pipes */ + close(fd_pipeIn[batchNum-1][1]); + close(fd_pipeOut[batchNum-1][0]); + } cleanup_indexing(ipriv); @@ -1065,8 +1202,9 @@ int main(int argc, char *argv[]) if ( fh != stdin ) fclose(fh); if ( ofh != stdout ) fclose(ofh); - STATUS("There were %i images, of which %i could be indexed.\n", - n_images, qargs.n_indexable); - + if (batchNum == qargs.updateReader) { + STATUS("There were %i images, of which %i could be indexed.\n", + qargs.n_processed, qargs.n_indexable); + } return 0; } -- cgit v1.2.3 From 3cd69bcd2b4c8b19d70de7a4236a79aa3c4c158b Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 22 May 2012 15:14:48 +0200 Subject: indexamajig: Remove NUMA options (no longer needed) --- src/indexamajig.c | 49 ++----------------------------------------------- 1 file changed, 2 insertions(+), 47 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 9372c447..cf049aba 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -242,13 +242,6 @@ static void show_help(const char *s) " least 10%% of the located peaks.\n" " --no-bg-sub Don't subtract local background estimates from\n" " integrated intensities.\n" -"\n" -"\nYou can tune the CPU affinities for enhanced performance on NUMA machines:\n" -"\n" -" --cpus= Specify number of CPUs. This is NOT the same as\n" -" giving the number of analyses to run in parallel.\n" -" --cpugroup= Batch threads in groups of this size.\n" -" --cpuoffset= Start using CPUs at this group number.\n" ); } @@ -558,9 +551,6 @@ int main(int argc, char *argv[]) char *element = NULL; double nominal_photon_energy; int stream_flags = STREAM_INTEGRATED; - int cpu_num = 0; - int cpu_groupsize = 1; - int cpu_offset = 0; char *endptr; char *hdf5_peak_path = NULL; struct copy_hdf5_field *copyme; @@ -690,39 +680,10 @@ int main(int argc, char *argv[]) break; case 6 : - cpu_num = strtol(optarg, &endptr, 10); - if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { - ERROR("Invalid number of CPUs ('%s')\n", - optarg); - return 1; - } - break; - case 7 : - cpu_groupsize = strtol(optarg, &endptr, 10); - if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { - ERROR("Invalid CPU group size ('%s')\n", - optarg); - return 1; - } - if ( cpu_groupsize < 1 ) { - ERROR("CPU group size cannot be" - " less than 1.\n"); - return 1; - } - break; - case 8 : - cpu_offset = strtol(optarg, &endptr, 10); - if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { - ERROR("Invalid CPU offset ('%s')\n", - optarg); - return 1; - } - if ( cpu_offset < 0 ) { - ERROR("CPU offset must be positive.\n"); - return 1; - } + ERROR("The options --cpus, --cpugroup and --cpuoffset" + " are no longer used by indexamajig.\n"); break; case 9 : @@ -759,12 +720,6 @@ int main(int argc, char *argv[]) } - if ( (cpu_num > 0) && (cpu_num % cpu_groupsize != 0) ) { - ERROR("Number of CPUs must be divisible by" - " the CPU group size.\n"); - return 1; - } - if ( filename == NULL ) { filename = strdup("-"); } -- cgit v1.2.3 From 85253a254b404b3d2be8d0503e2c174ab0a27925 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 22 May 2012 15:14:59 +0200 Subject: Remove trailing whitespace --- src/indexamajig.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index cf049aba..f4bbb72e 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -346,7 +346,7 @@ static void process_image(void *qp, void *pp, int cookie) } // Take snapshot of image after CM subtraction but before - // the aggressive noise filter. + // the aggressive noise filter. data_size = image.width * image.height * sizeof (float); data_for_measurement = malloc(data_size); @@ -1015,7 +1015,7 @@ int main(int argc, char *argv[]) /* copy file set to tmpset */ memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set)); - /**** FORKING ****/ + /**** FORKING ****/ int power = 10; /* 2^power must be larger than nProcesses */ int pid[power]; double num = 0; @@ -1074,12 +1074,12 @@ int main(int argc, char *argv[]) if (ready_fd > 0) { for ( i=0; i Date: Tue, 22 May 2012 17:44:52 +0200 Subject: WIP on tidy-up --- src/indexamajig.c | 423 +++++++++++++++++++++++------------------------------- 1 file changed, 181 insertions(+), 242 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index f4bbb72e..ed84c6a8 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -81,7 +81,7 @@ enum { /* Information about the indexing process which is common to all patterns */ -struct static_index_args +struct index_args { UnitCell *cell; int config_cmfilter; @@ -110,7 +110,6 @@ struct static_index_args double ir_out; /* Output stream */ - pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; const struct copy_hdf5_field *copyme; char *outfile; @@ -118,36 +117,16 @@ struct static_index_args /* Information about the indexing process for one pattern */ -struct index_args +struct pattern_args { /* "Input" */ char *filename; - struct static_index_args static_args; /* "Output" */ int indexable; }; -/* Information needed to choose the next task and dispatch it */ -struct queue_args -{ - FILE *fh; - char *prefix; - int config_basename; - struct static_index_args static_args; - - char *use_this_one_instead; - - int n_indexable; - int n_processed; - int n_indexable_last_stats; - int n_processed_last_stats; - int t_last_stats; - int updateReader; -}; - - static void show_help(const char *s) { printf("Syntax: %s [options]\n\n", s); @@ -246,16 +225,16 @@ static void show_help(const char *s) } -// Get next pattern in .lst -char* get_pattern(FILE *fh) { - char *rval; - char line[LINE_LENGTH]; - rval = fgets(line, LINE_LENGTH - 1, fh); - if (ferror(fh)) { - printf("Read error\n"); - rval = NULL; - } - return rval; +static char *get_pattern(FILE *fh) +{ + char *rval; + char line[LINE_LENGTH]; + rval = fgets(line, LINE_LENGTH - 1, fh); + if ( ferror(fh) ) { + ERROR("Failed to get next filename from list.\n"); + rval = NULL; + } + return rval; } @@ -416,11 +395,11 @@ static void process_image(void *qp, void *pp, int cookie) outfilename = malloc(strlen(outfile) + 1); snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile); if ((fd = open(outfilename, O_WRONLY)) == -1) { - perror("Error on opening\n"); + ERROR("Error on opening\n"); exit(1); } if (fcntl(fd, F_SETLKW, &fl) == -1) { - perror("Error on setting lock wait\n"); + ERROR("Error on setting lock wait\n"); exit(1); } @@ -428,7 +407,7 @@ static void process_image(void *qp, void *pp, int cookie) FILE *fh; fh = fopen(outfilename, "a"); if (fh == NULL) { - perror("Error inside lock\n"); + ERROR("Error inside lock\n"); } write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags); fclose(fh); @@ -436,7 +415,7 @@ static void process_image(void *qp, void *pp, int cookie) /* Unlock stream for other processes */ fl.l_type = F_UNLCK; /* set to unlock same region */ if (fcntl(fd, F_SETLK, &fl) == -1) { - perror("fcntl"); + ERROR("fcntl"); exit(1); } close(fd); @@ -456,6 +435,36 @@ static void process_image(void *qp, void *pp, int cookie) } +static void run_work(const struct index_args *iargs, + int filename_pipe, int results_pipe) +{ + int allDone = 0; + + while ( !allDone ) { + + /* read from pipe and return number of bytes read */ + if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) { + ERROR("read1"); + } else if (buff_count > 0) { + /* process image */ + pargs.filename = buffR; + pargs.indexable = 0; + process_image(&qargs, &pargs, batchNum); + /* request another image */ + buff_count = sprintf(buffW, "%d\n", pargs.indexable); + if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) + ERROR("write P0"); + } else if (buff_count == 0) { + allDone = 1; + } + + } + /* close my pipes */ + close(filename_pipe); + close(results_pipe); +} + + #ifdef HAVE_CLOCK_GETTIME static time_t get_monotonic_seconds() @@ -542,7 +551,7 @@ int main(int argc, char *argv[]) float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */ int cellr; int peaks; - int nProcesses = 1; + int n_proc = 1; char *prepare_line; char prepare_filename[LINE_LENGTH]; struct queue_args qargs; @@ -558,6 +567,12 @@ int main(int argc, char *argv[]) float ir_inn = 4.0; float ir_mid = 5.0; float ir_out = 7.0; + int n_indexable, n_processed, n_indexable_last_stats; + int n_processed_last_stats; + int t_last_stats; + pid_t *pids; + int *filename_pipes; + int *result_pipes; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -638,7 +653,7 @@ int main(int argc, char *argv[]) break; case 'j' : - nProcesses = atoi(optarg); + n_proc = atoi(optarg); break; case 'g' : @@ -909,7 +924,7 @@ int main(int argc, char *argv[]) prepare_line = tmp; } snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line); - qargs.use_this_one_instead = prepare_line; + rewind(fh); /* Prepare the indexer */ if ( indm != NULL ) { @@ -925,223 +940,147 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); - qargs.static_args.cell = cell; - qargs.static_args.config_cmfilter = config_cmfilter; - qargs.static_args.config_noisefilter = config_noisefilter; - qargs.static_args.config_verbose = config_verbose; - qargs.static_args.config_satcorr = config_satcorr; - qargs.static_args.config_closer = config_closer; - qargs.static_args.config_insane = config_insane; - qargs.static_args.config_bgsub = config_bgsub; - qargs.static_args.cellr = cellr; - qargs.static_args.tols[0] = tols[0]; - qargs.static_args.tols[1] = tols[1]; - qargs.static_args.tols[2] = tols[2]; - qargs.static_args.tols[3] = tols[3]; - qargs.static_args.threshold = threshold; - qargs.static_args.min_gradient = min_gradient; - qargs.static_args.min_snr = min_snr; - qargs.static_args.min_int_snr = min_int_snr; - qargs.static_args.det = det; - qargs.static_args.indm = indm; - qargs.static_args.ipriv = ipriv; - qargs.static_args.peaks = peaks; - qargs.static_args.ofh = ofh; - qargs.static_args.beam = beam; - qargs.static_args.element = element; - qargs.static_args.stream_flags = stream_flags; - qargs.static_args.hdf5_peak_path = hdf5_peak_path; - qargs.static_args.copyme = copyme; - qargs.static_args.ir_inn = ir_inn; - qargs.static_args.ir_mid = ir_mid; - qargs.static_args.ir_out = ir_out; - - qargs.fh = fh; - qargs.prefix = prefix; - qargs.config_basename = config_basename; - qargs.n_indexable = 0; - qargs.n_processed = 0; - qargs.n_indexable_last_stats = 0; - qargs.n_processed_last_stats = 0; - qargs.updateReader = 0; /* first process updates */ - qargs.t_last_stats = get_monotonic_seconds(); - - /* Read .lst file */ - register int i; - rewind(fh); /* make sure to read from start */ - - /* Clear output file content */ - char *myOutfilename = NULL; - chomp(prefix); - chomp(outfile); - myOutfilename = malloc(strlen(outfile) + 1); - snprintf(myOutfilename, LINE_LENGTH - 1, "%s", outfile); - FILE *tfh; - tfh = fopen(myOutfilename, "a+"); - if (tfh == NULL) { - ERROR("No output filename\n"); - } - fclose(tfh); - qargs.static_args.outfile = outfile; - int ready_fd; - int buff_count; - fd_set fdset,tmpset; - char buffR[BUFFER], buffW[BUFFER]; - int fd_pipeIn[nProcesses][2]; /* Process0 In */ - int fd_pipeOut[nProcesses][2]; /* Process0 Out */ - unsigned int opts; - - FD_ZERO(&fdset); /* clear the fd_set */ - /* set pipeIn as non-blocking */ - for ( i=0; i max_fd) { /* find max_fd */ - max_fd = fd_pipeIn[i][0]; + p = fork(); + if ( p == -1 ) { + ERROR("fork() failed!\n"); + return 1; } - } - max_fd = max_fd+1; - /* copy file set to tmpset */ - memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set)); - - /**** FORKING ****/ - int power = 10; /* 2^power must be larger than nProcesses */ - int pid[power]; - double num = 0; - int batchNum = 0; - /* Fork 2^power times */ - for ( i=0; i= nProcesses + 1) { - exit(0); /* kill */ - } - batchNum = (int) num; - /**** PLUMBING ****/ - if (batchNum == qargs.updateReader) { - for ( i=0; i 0) { - for ( i=0; i 0) { + for ( i=0; i= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n\n", - qargs.n_indexable, qargs.n_processed, - qargs.n_indexable - qargs.n_indexable_last_stats, - qargs.n_processed - qargs.n_processed_last_stats); - - qargs.n_indexable_last_stats = qargs.n_indexable; - qargs.n_processed_last_stats = qargs.n_processed; - qargs.t_last_stats = tNow; - } } - /* close my pipes */ - for ( i=0; i= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n\n", + qargs.n_indexable, qargs.n_processed, + qargs.n_indexable - qargs.n_indexable_last_stats, + qargs.n_processed - qargs.n_processed_last_stats); + + qargs.n_indexable_last_stats = qargs.n_indexable; + qargs.n_processed_last_stats = qargs.n_processed; + qargs.t_last_stats = tNow; } - tEnd = get_monotonic_seconds(); - printf("Compute Time: %.2fs\n", tEnd - tStart); - } else { - while(!allDone){ - /* read from pipe and return number of bytes read */ - if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) { - perror("read1"); - } else if (buff_count > 0) { - /* process image */ - pargs.filename = buffR; - pargs.indexable = 0; - process_image(&qargs, &pargs, batchNum); - /* request another image */ - buff_count = sprintf(buffW, "%d\n", pargs.indexable); - if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) - perror("write P0"); - } else if (buff_count == 0) { - allDone = 1; /* EXIT */ - } - } - /* close my pipes */ - close(fd_pipeIn[batchNum-1][1]); - close(fd_pipeOut[batchNum-1][0]); } + /* close my pipes */ + for ( i=0; i Date: Fri, 25 May 2012 17:25:54 +0200 Subject: WIP on tidy-up --- src/indexamajig.c | 385 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 220 insertions(+), 165 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index ed84c6a8..75fcb1f5 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -70,10 +70,6 @@ /* Write statistics at APPROXIMATELY this interval */ #define STATS_EVERY_N_SECONDS (5) -#define LINE_LENGTH 1024 - -#define BUFFER PIPE_BUF - enum { PEAK_ZAEF, PEAK_HDF5, @@ -228,72 +224,73 @@ static void show_help(const char *s) static char *get_pattern(FILE *fh) { char *rval; - char line[LINE_LENGTH]; - rval = fgets(line, LINE_LENGTH - 1, fh); + char *line; + + line = malloc(1024); + if ( line == NULL ) { + ERROR("Couldn't allocate memory for filename\n"); + return NULL; + } + + rval = fgets(line, 1023, fh); if ( ferror(fh) ) { ERROR("Failed to get next filename from list.\n"); rval = NULL; } + return rval; } -static void process_image(void *qp, void *pp, int cookie) +static void process_image(const struct index_args *iargs, + struct pattern_args *pargs, int cookie) { - struct index_args *pargs = pp; - struct queue_args *qargs = qp; float *data_for_measurement; size_t data_size; - UnitCell *cell = qargs->static_args.cell; - int config_cmfilter = qargs->static_args.config_cmfilter; - int config_noisefilter = qargs->static_args.config_noisefilter; - int config_verbose = qargs->static_args.config_verbose; - IndexingMethod *indm = qargs->static_args.indm; - struct beam_params *beam = qargs->static_args.beam; + UnitCell *cell = iargs->cell; + int config_cmfilter = iargs->config_cmfilter; + int config_noisefilter = iargs->config_noisefilter; + int config_verbose = iargs->config_verbose; + IndexingMethod *indm = iargs->indm; + struct beam_params *beam = iargs->beam; int r, check; struct hdfile *hdfile; - char *outfile = qargs->static_args.outfile; - + char *outfile = iargs->outfile; struct image image; + char *outfilename; + int fd; + FILE *fh; + struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0}; + image.features = NULL; image.data = NULL; image.flags = NULL; image.indexed_cell = NULL; - image.det = copy_geom(qargs->static_args.det); - image.copyme = qargs->static_args.copyme; + image.det = copy_geom(iargs->det); + image.copyme = iargs->copyme; image.beam = beam; - image.id = cookie; // MUST SET ID FOR MOSFLM TO WORK PROPERLY + image.id = cookie; + image.filename = pargs->filename; - if (beam == NULL) { - ERROR("Warning: no beam parameters file.\n"); - ERROR("I'm going to assume 1 ADU per photon, which is almost"); - ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); - } + hdfile = hdfile_open(image.filename); + if ( hdfile == NULL ) return; - char *filename = NULL; - char *imagename = pargs->filename; - chomp(imagename); - filename = malloc(strlen(qargs->prefix) + strlen(imagename) + 1); - snprintf(filename, LINE_LENGTH - 1, "%s%s", qargs->prefix, imagename); - image.filename = filename; - hdfile = hdfile_open(filename); - if (hdfile == NULL) return; - - r = hdfile_set_first_image(hdfile, "/"); // Need this to read hdf5 files - if (r) { + r = hdfile_set_first_image(hdfile, "/"); + if ( r ) { ERROR("Couldn't select first path\n"); hdfile_close(hdfile); return; } check = hdf5_read(hdfile, &image, 1); - if (check == 1) { + if ( check ) { hdfile_close(hdfile); return; } - if ((image.width != image.det->max_fs + 1) - || (image.height != image.det->max_ss + 1)) { + if ( (image.width != image.det->max_fs + 1 ) + || (image.height != image.det->max_ss + 1)) + { ERROR("Image size doesn't match geometry size" " - rejecting image.\n"); ERROR("Image size: %i,%i. Geometry size: %i,%i\n", @@ -304,8 +301,8 @@ static void process_image(void *qp, void *pp, int cookie) return; } - if (image.lambda < 0.0) { - if (beam != NULL) { + if ( image.lambda < 0.0 ) { + if ( beam != NULL ) { ERROR("Using nominal photon energy of %.2f eV\n", beam->photon_energy); image.lambda = ph_en_to_lambda( @@ -320,37 +317,40 @@ static void process_image(void *qp, void *pp, int cookie) } fill_in_values(image.det, hdfile); - if (config_cmfilter) { + if ( config_cmfilter ) { filter_cm(&image); } - // Take snapshot of image after CM subtraction but before - // the aggressive noise filter. - data_size = image.width * image.height * sizeof (float); + /* Take snapshot of image after CM subtraction but before + * the aggressive noise filter. */ + data_size = image.width * image.height * sizeof(float); data_for_measurement = malloc(data_size); - if (config_noisefilter) { + if ( config_noisefilter ) { filter_noise(&image, data_for_measurement); } else { memcpy(data_for_measurement, image.data, data_size); } - switch (qargs->static_args.peaks) { + switch ( iargs->peaks ) { + case PEAK_HDF5: // Get peaks from HDF5 - if (get_peaks(&image, hdfile, - qargs->static_args.hdf5_peak_path)) { - ERROR("Failed to get peaks from HDF5 file.\n"); - } - break; + if (get_peaks(&image, hdfile, + iargs->hdf5_peak_path)) { + ERROR("Failed to get peaks from HDF5 file.\n"); + } + break; + case PEAK_ZAEF: - search_peaks(&image, qargs->static_args.threshold, - qargs->static_args.min_gradient, - qargs->static_args.min_snr, - qargs->static_args.ir_inn, - qargs->static_args.ir_mid, - qargs->static_args.ir_out); - break; + search_peaks(&image, iargs->threshold, + iargs->min_gradient, + iargs->min_snr, + iargs->ir_inn, + iargs->ir_mid, + iargs->ir_out); + break; + } /* Get rid of noise-filtered version at this point @@ -364,37 +364,32 @@ static void process_image(void *qp, void *pp, int cookie) image.profile_radius = 0.0001e9; /* RUN INDEXING HERE */ - index_pattern(&image, cell, indm, qargs->static_args.cellr, - config_verbose, qargs->static_args.ipriv, - qargs->static_args.config_insane, qargs->static_args.tols); + index_pattern(&image, cell, indm, iargs->cellr, + config_verbose, iargs->ipriv, + iargs->config_insane, iargs->tols); - if (image.indexed_cell != NULL) { + if ( image.indexed_cell != NULL ) { pargs->indexable = 1; image.reflections = find_intersections(&image, image.indexed_cell); if (image.reflections != NULL) { integrate_reflections(&image, - qargs->static_args.config_closer, - qargs->static_args.config_bgsub, - qargs->static_args.min_int_snr, - qargs->static_args.ir_inn, - qargs->static_args.ir_mid, - qargs->static_args.ir_out); + iargs->config_closer, + iargs->config_bgsub, + iargs->min_int_snr, + iargs->ir_inn, + iargs->ir_mid, + iargs->ir_out); } } else { image.reflections = NULL; } /* Write Lock */ - struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0}; - int fd; fl.l_pid = getpid(); - char *outfilename = NULL; - chomp(outfile); - outfilename = malloc(strlen(outfile) + 1); - snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile); - if ((fd = open(outfilename, O_WRONLY)) == -1) { + fd = open(outfile, O_WRONLY); + if ( fd == -1) { ERROR("Error on opening\n"); exit(1); } @@ -404,25 +399,21 @@ static void process_image(void *qp, void *pp, int cookie) } /* LOCKED! Write chunk */ - FILE *fh; fh = fopen(outfilename, "a"); if (fh == NULL) { ERROR("Error inside lock\n"); } - write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags); + write_chunk(fh, &image, hdfile, iargs->stream_flags); fclose(fh); /* Unlock stream for other processes */ fl.l_type = F_UNLCK; /* set to unlock same region */ - if (fcntl(fd, F_SETLK, &fl) == -1) { + if ( fcntl(fd, F_SETLK, &fl) == -1 ) { ERROR("fcntl"); exit(1); } close(fd); - qargs->n_indexable += pargs->indexable; - qargs->n_processed++; - /* Only free cell if found */ cell_free(image.indexed_cell); @@ -436,25 +427,42 @@ static void process_image(void *qp, void *pp, int cookie) static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe) + int filename_pipe, int results_pipe, int cookie) { int allDone = 0; while ( !allDone ) { - /* read from pipe and return number of bytes read */ - if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) { - ERROR("read1"); - } else if (buff_count > 0) { - /* process image */ - pargs.filename = buffR; + struct pattern_args pargs; + int r, w; + char buf[1024]; + + r = read(filename_pipe, buf, 1024); + + if ( r < 0 ) { + + ERROR("read() failed!\n"); + + } else if ( r > 0 ) { + + int c; + + /* Process image */ + pargs.filename = buf; pargs.indexable = 0; - process_image(&qargs, &pargs, batchNum); - /* request another image */ - buff_count = sprintf(buffW, "%d\n", pargs.indexable); - if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0) + + STATUS("Got filename: '%s'\n", buf); + + process_image(iargs, &pargs, cookie); + + /* Request another image */ + c = sprintf(buf, "%i\n", pargs.indexable); + w = write(results_pipe, buf, c); + if ( w < 0 ) { ERROR("write P0"); - } else if (buff_count == 0) { + } + + } else { allDone = 1; } @@ -553,14 +561,12 @@ int main(int argc, char *argv[]) int peaks; int n_proc = 1; char *prepare_line; - char prepare_filename[LINE_LENGTH]; - struct queue_args qargs; - struct index_args pargs; + char prepare_filename[1024]; + struct index_args iargs; struct beam_params *beam = NULL; char *element = NULL; double nominal_photon_energy; int stream_flags = STREAM_INTEGRATED; - char *endptr; char *hdf5_peak_path = NULL; struct copy_hdf5_field *copyme; char *intrad = NULL; @@ -573,6 +579,9 @@ int main(int argc, char *argv[]) pid_t *pids; int *filename_pipes; int *result_pipes; + fd_set fds; + int i; + int allDone, nFinished; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -793,7 +802,7 @@ int main(int argc, char *argv[]) } } - if ( nProcesses == 0 ) { + if ( n_proc == 0 ) { ERROR("Invalid number of processes.\n"); return 1; } @@ -900,18 +909,15 @@ int main(int argc, char *argv[]) } else { STATUS("No beam parameters file was given, so I'm taking the" " nominal photon energy to be 2 keV.\n"); + ERROR("I'm also going to assume 1 ADU per photon, which is"); + ERROR(" almost certainly wrong. Peak sigmas will be" + " incorrect.\n"); nominal_photon_energy = 2000.0; } - if ( beam == NULL ) { - ERROR("Warning: no beam parameters file.\n"); - ERROR("I'm going to assume 1 ADU per photon, which is almost"); - ERROR(" certainly wrong. Peak sigmas will be incorrect.\n"); - } - /* Get first filename and use it to set up the indexing */ - prepare_line = malloc(LINE_LENGTH*sizeof(char)); - rval = fgets(prepare_line, LINE_LENGTH-1, fh); + prepare_line = malloc(1024); + rval = fgets(prepare_line, 1023, fh); if ( rval == NULL ) { ERROR("Failed to get filename to prepare indexing.\n"); return 1; @@ -923,7 +929,7 @@ int main(int argc, char *argv[]) free(prepare_line); prepare_line = tmp; } - snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line); + snprintf(prepare_filename, 1023, "%s%s", prefix, prepare_line); rewind(fh); /* Prepare the indexer */ @@ -979,6 +985,20 @@ int main(int argc, char *argv[]) n_processed_last_stats = 0; t_last_stats = get_monotonic_seconds(); + FD_ZERO(&fds); + filename_pipes = calloc(n_proc, sizeof(int)); + result_pipes = calloc(n_proc, sizeof(int)); + if ( (filename_pipes == NULL) || (result_pipes == NULL) ) { + ERROR("Couldn't allocate memory for pipes.\n"); + return 1; + } + + pids = calloc(n_proc, sizeof(pid_t)); + if ( pids == NULL ) { + ERROR("Couldn't allocate memory for PIDs.\n"); + return 1; + } + /* Fork the right number of times */ for ( i=0; i 0) { - for ( i=0; i= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) { - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n\n", - qargs.n_indexable, qargs.n_processed, - qargs.n_indexable - qargs.n_indexable_last_stats, - qargs.n_processed - qargs.n_processed_last_stats); - - qargs.n_indexable_last_stats = qargs.n_indexable; - qargs.n_processed_last_stats = qargs.n_processed; - qargs.t_last_stats = tNow; + + /* Update progress */ + tNow = get_monotonic_seconds(); + if ( tNow >= t_last_stats+STATS_EVERY_N_SECONDS ) { + + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n\n", + n_indexable, n_processed, + n_indexable - n_indexable_last_stats, + n_processed - n_processed_last_stats); + + n_indexable_last_stats = n_indexable; + n_processed_last_stats = n_processed; + t_last_stats = tNow; + } + } - /* close my pipes */ - for ( i=0; i Date: Tue, 29 May 2012 12:03:46 +0200 Subject: indexamajig: Fix includes --- src/indexamajig.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 75fcb1f5..21a60d7f 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -54,17 +54,17 @@ #include #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "utils.h" +#include "hdf5-file.h" +#include "index.h" +#include "peaks.h" +#include "detector.h" +#include "filters.h" +#include "thread-pool.h" +#include "beam-parameters.h" +#include "geometry.h" +#include "stream.h" +#include "reflist-utils.h" /* Write statistics at APPROXIMATELY this interval */ -- cgit v1.2.3 From dde3608a7d1b74d37f26d4fcf9ab89a092a4dd33 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 29 May 2012 17:49:54 +0200 Subject: WIP --- src/indexamajig.c | 130 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 85 insertions(+), 45 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 21a60d7f..bb5be4b5 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -221,24 +221,47 @@ static void show_help(const char *s) } -static char *get_pattern(FILE *fh) +static char *get_pattern(FILE *fh, char **use_this_one_instead, + int config_basename, const char *prefix) { - char *rval; char *line; + char *filename; + + /* Get the next filename */ + if ( *use_this_one_instead != NULL ) { + + line = *use_this_one_instead; + *use_this_one_instead = NULL; + + } else { + + char *rval; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + return NULL; + } - line = malloc(1024); - if ( line == NULL ) { - ERROR("Couldn't allocate memory for filename\n"); - return NULL; } - rval = fgets(line, 1023, fh); - if ( ferror(fh) ) { - ERROR("Failed to get next filename from list.\n"); - rval = NULL; + chomp(line); + + if ( config_basename ) { + char *tmp; + tmp = safe_basename(line); + free(line); + line = tmp; } - return rval; + filename = malloc(strlen(prefix)+strlen(line)+1); + + snprintf(filename, 1023, "%s%s", prefix, line); + + free(line); + + return filename; } @@ -257,7 +280,7 @@ static void process_image(const struct index_args *iargs, struct hdfile *hdfile; char *outfile = iargs->outfile; struct image image; - char *outfilename; + char *outfilename = iargs->outfile; int fd; FILE *fh; struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0}; @@ -363,7 +386,6 @@ static void process_image(const struct index_args *iargs, image.bw = beam->bandwidth; image.profile_radius = 0.0001e9; - /* RUN INDEXING HERE */ index_pattern(&image, cell, indm, iargs->cellr, config_verbose, iargs->ipriv, iargs->config_insane, iargs->tols); @@ -398,10 +420,9 @@ static void process_image(const struct index_args *iargs, exit(1); } - /* LOCKED! Write chunk */ fh = fopen(outfilename, "a"); - if (fh == NULL) { - ERROR("Error inside lock\n"); + if ( fh == NULL ) { + ERROR("Couldn't open stream '%s'.\n", outfilename); } write_chunk(fh, &image, hdfile, iargs->stream_flags); fclose(fh); @@ -430,44 +451,53 @@ static void run_work(const struct index_args *iargs, int filename_pipe, int results_pipe, int cookie) { int allDone = 0; + FILE *fh; + + fh = fdopen(filename_pipe, "r"); + if ( fh == NULL ) { + ERROR("Failed to fdopen() the filename pipe!\n"); + close(filename_pipe); + close(results_pipe); + return; + } while ( !allDone ) { struct pattern_args pargs; - int r, w; + int w, c; char buf[1024]; + char *line; + char *rval; + + line = malloc(1024*sizeof(char)); + STATUS("Waiting for filename...\n"); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + ERROR("Read error!\n"); + return; + } - r = read(filename_pipe, buf, 1024); - - if ( r < 0 ) { - - ERROR("read() failed!\n"); - - } else if ( r > 0 ) { - - int c; - - /* Process image */ - pargs.filename = buf; - pargs.indexable = 0; - - STATUS("Got filename: '%s'\n", buf); + chomp(line); + pargs.filename = line; + pargs.indexable = 0; - process_image(iargs, &pargs, cookie); + STATUS("Got filename: '%s'\n", line); - /* Request another image */ - c = sprintf(buf, "%i\n", pargs.indexable); - w = write(results_pipe, buf, c); - if ( w < 0 ) { - ERROR("write P0"); - } + process_image(iargs, &pargs, cookie); - } else { - allDone = 1; + /* Request another image */ + c = sprintf(buf, "%i\n", pargs.indexable); + w = write(results_pipe, buf, c); + if ( w < 0 ) { + ERROR("write P0"); } + free(line); + } /* close my pipes */ + fclose(fh); close(filename_pipe); close(results_pipe); } @@ -562,6 +592,7 @@ int main(int argc, char *argv[]) int n_proc = 1; char *prepare_line; char prepare_filename[1024]; + char *use_this_one_instead; struct index_args iargs; struct beam_params *beam = NULL; char *element = NULL; @@ -922,6 +953,7 @@ int main(int argc, char *argv[]) ERROR("Failed to get filename to prepare indexing.\n"); return 1; } + use_this_one_instead = strdup(prepare_line); chomp(prepare_line); if ( config_basename ) { char *tmp; @@ -930,7 +962,6 @@ int main(int argc, char *argv[]) prepare_line = tmp; } snprintf(prepare_filename, 1023, "%s%s", prefix, prepare_line); - rewind(fh); /* Prepare the indexer */ if ( indm != NULL ) { @@ -1048,11 +1079,14 @@ int main(int argc, char *argv[]) char *nextImage; - nextImage = get_pattern(fh); + nextImage = get_pattern(fh, &use_this_one_instead, + config_basename, prefix); write(filename_pipes[i], nextImage, strlen(nextImage)); write(filename_pipes[i], "\n", 1); + free(nextImage); + } allDone = 0; @@ -1095,7 +1129,11 @@ int main(int argc, char *argv[]) n_processed++; /* Send next filename */ - nextImage = get_pattern(fh); + nextImage = get_pattern(fh, + &use_this_one_instead, + config_basename, + prefix); + if ( nextImage == NULL ) { /* no more images */ nFinished++; @@ -1106,6 +1144,7 @@ int main(int argc, char *argv[]) r = write(filename_pipes[i], nextImage, strlen(nextImage)); + r -= write(filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("write pipe"); } @@ -1120,7 +1159,7 @@ int main(int argc, char *argv[]) if ( tNow >= t_last_stats+STATS_EVERY_N_SECONDS ) { STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n\n", + " %i out of %i since the last message.\n", n_indexable, n_processed, n_indexable - n_indexable_last_stats, n_processed - n_processed_last_stats); @@ -1149,6 +1188,7 @@ int main(int argc, char *argv[]) free(hdf5_peak_path); free_copy_hdf5_field_list(copyme); cell_free(cell); + free(use_this_one_instead); if ( fh != stdin ) fclose(fh); if ( ofh != stdout ) fclose(ofh); -- cgit v1.2.3 From 3627a2a69d8ca7f803286569d8fe770ab10bde77 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 29 May 2012 18:01:33 +0200 Subject: Fix select() call --- src/indexamajig.c | 94 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 49 insertions(+), 45 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index bb5be4b5..aab6a0eb 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -470,7 +470,6 @@ static void run_work(const struct index_args *iargs, char *rval; line = malloc(1024*sizeof(char)); - STATUS("Waiting for filename...\n"); rval = fgets(line, 1023, fh); if ( rval == NULL ) { free(line); @@ -482,8 +481,6 @@ static void run_work(const struct index_args *iargs, pargs.filename = line; pargs.indexable = 0; - STATUS("Got filename: '%s'\n", line); - process_image(iargs, &pargs, cookie); /* Request another image */ @@ -1070,8 +1067,6 @@ int main(int argc, char *argv[]) filename_pipes[i] = filename_pipe[1]; result_pipes[i] = result_pipe[0]; - FD_SET(result_pipes[i], &fds); - } /* Send first image to all children */ @@ -1093,63 +1088,72 @@ int main(int argc, char *argv[]) nFinished = 0; while ( !allDone ) { - int r; + int r, i; struct timeval tv; - fd_set fds_copy; + fd_set fds; double tNow; + int fdmax; tv.tv_sec = 5; tv.tv_usec = 0; - memcpy(&fds_copy, &fds, sizeof(fd_set)); - r = select(n_proc, &fds, NULL, NULL, &tv); + FD_ZERO(&fds); + fdmax = 0; + for ( i=0; i fdmax ) { + fdmax = result_pipes[i]; + } + } - if ( r < 0 ) { + r = select(fdmax+1, &fds, NULL, NULL, &tv); + if ( r == -1 ) { ERROR("select() failed!\n"); + continue; + } - } else { + if ( r == 0 ) { + STATUS("Timeout\n"); + continue; + } - for ( i=0; i Date: Tue, 29 May 2012 18:23:10 +0200 Subject: Handle the end of the file list cleanly --- src/indexamajig.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index aab6a0eb..3c7b0cb9 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -473,8 +473,13 @@ static void run_work(const struct index_args *iargs, rval = fgets(line, 1023, fh); if ( rval == NULL ) { free(line); - ERROR("Read error!\n"); - return; + if ( feof(fh) ) { + allDone = 1; + continue; + } else { + ERROR("Read error!\n"); + return; + } } chomp(line); -- cgit v1.2.3 From fd647af47d1fe8fddc7e0c72c43bfdd31eaa2129 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 30 May 2012 11:10:07 +0200 Subject: Tighten up file and error handling --- src/indexamajig.c | 104 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 75 insertions(+), 29 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 3c7b0cb9..223403ad 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -492,7 +492,7 @@ static void run_work(const struct index_args *iargs, c = sprintf(buf, "%i\n", pargs.indexable); w = write(results_pipe, buf, c); if ( w < 0 ) { - ERROR("write P0"); + ERROR("write P0\n"); } free(line); @@ -500,7 +500,6 @@ static void run_work(const struct index_args *iargs, } /* close my pipes */ fclose(fh); - close(filename_pipe); close(results_pipe); } @@ -611,10 +610,11 @@ int main(int argc, char *argv[]) int t_last_stats; pid_t *pids; int *filename_pipes; - int *result_pipes; + FILE **result_fhs; fd_set fds; int i; - int allDone, nFinished; + int allDone; + int *finished; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -1020,11 +1020,15 @@ int main(int argc, char *argv[]) FD_ZERO(&fds); filename_pipes = calloc(n_proc, sizeof(int)); - result_pipes = calloc(n_proc, sizeof(int)); - if ( (filename_pipes == NULL) || (result_pipes == NULL) ) { + result_fhs = calloc(n_proc, sizeof(FILE *)); + if ( filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return 1; } + if ( result_fhs == NULL ) { + ERROR("Couldn't allocate memory for pipe file handles.\n"); + return 1; + } pids = calloc(n_proc, sizeof(pid_t)); if ( pids == NULL ) { @@ -1032,6 +1036,12 @@ int main(int argc, char *argv[]) return 1; } + finished = calloc(n_proc, sizeof(int)); + if ( finished == NULL ) { + ERROR("Couldn't allocate memory for process flags.\n"); + return 1; + } + /* Fork the right number of times */ for ( i=0; i fdmax ) { - fdmax = result_pipes[i]; - } + + int fd; + + if ( finished[i] ) continue; + + fd = fileno(result_fhs[i]); + FD_SET(fd, &fds); + if ( fd > fdmax ) fdmax = fd; + } r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { - ERROR("select() failed!\n"); + ERROR("select() failed: %s\n", strerror(errno)); continue; } @@ -1127,15 +1155,27 @@ int main(int argc, char *argv[]) char *nextImage; char results[1024]; - - if ( !FD_ISSET(result_pipes[i], &fds) ) continue; - - r = read(result_pipes[i], results, 1024); - if ( r < 0 ) { - ERROR("read() failed!"); + char *rval; + int fd; + + if ( finished[i] ) continue; + + fd = fileno(result_fhs[i]); + if ( !FD_ISSET(fd, &fds) ) continue; + + rval = fgets(results, 1024, result_fhs[i]); + if ( rval == NULL ) { + if ( feof(result_fhs[i]) ) { + /* Process died */ + finished[i] = 1; + } else { + ERROR("fgets() failed: %s\n", + strerror(errno)); + } continue; } + chomp(results); n_indexable += atoi(results); n_processed++; @@ -1146,18 +1186,15 @@ int main(int argc, char *argv[]) prefix); if ( nextImage == NULL ) { - /* no more images */ - nFinished++; - if ( nFinished == n_proc ) { - allDone = 1; - } + /* No more images */ + finished[i] = 1; } else { r = write(filename_pipes[i], nextImage, strlen(nextImage)); r -= write(filename_pipes[i], "\n", 1); if ( r < 0 ) { - ERROR("write pipe"); + ERROR("write pipe\n"); } } @@ -1179,12 +1216,21 @@ int main(int argc, char *argv[]) } + allDone = 1; + for ( i=0; i Date: Wed, 6 Jun 2012 16:47:54 +0200 Subject: Formatting --- src/indexamajig.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 223403ad..8061b1ae 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -411,11 +411,11 @@ static void process_image(const struct index_args *iargs, fl.l_pid = getpid(); fd = open(outfile, O_WRONLY); - if ( fd == -1) { + if ( fd == -1 ) { ERROR("Error on opening\n"); exit(1); } - if (fcntl(fd, F_SETLKW, &fl) == -1) { + if ( fcntl(fd, F_SETLKW, &fl) == -1 ) { ERROR("Error on setting lock wait\n"); exit(1); } -- cgit v1.2.3 From f36ea6e42ac2909db5beaa30104bb09844fd225d Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 3 Jul 2012 11:37:34 +0200 Subject: indexamajig: Remove benign "timeout" message --- src/indexamajig.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 2c1af284..fe4960fc 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -1147,10 +1147,7 @@ int main(int argc, char *argv[]) continue; } - if ( r == 0 ) { - STATUS("Timeout\n"); - continue; - } + if ( r == 0 ) continue; /* No progress this time. Try again */ for ( i=0; i Date: Tue, 3 Jul 2012 11:40:06 +0200 Subject: Locking fussiness --- src/indexamajig.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index fe4960fc..1981908e 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -283,7 +283,7 @@ static void process_image(const struct index_args *iargs, char *outfilename = iargs->outfile; int fd; FILE *fh; - struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0}; + struct flock fl; image.features = NULL; image.data = NULL; @@ -409,7 +409,10 @@ static void process_image(const struct index_args *iargs, } /* Write Lock */ - fl.l_pid = getpid(); + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = 0; + fl.l_len = 0; /* Means "lock the whole file" */ fd = open(outfile, O_WRONLY); if ( fd == -1 ) { -- cgit v1.2.3 From ac964c54628af8534b811466e68583ef6e685bd2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 3 Jul 2012 11:40:16 +0200 Subject: Use fdopen() instead of fopen() --- src/indexamajig.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 1981908e..e9ee0f7a 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -424,12 +424,12 @@ static void process_image(const struct index_args *iargs, exit(1); } - fh = fopen(outfilename, "a"); + fh = fdopen(fd, "a"); if ( fh == NULL ) { ERROR("Couldn't open stream '%s'.\n", outfilename); } write_chunk(fh, &image, hdfile, iargs->stream_flags); - fclose(fh); + fflush(fh); /* Unlock stream for other processes */ fl.l_type = F_UNLCK; /* set to unlock same region */ @@ -437,7 +437,8 @@ static void process_image(const struct index_args *iargs, ERROR("fcntl"); exit(1); } - close(fd); + + fclose(fh); /* close(fd) happens as well because fd was not dup'd */ /* Only free cell if found */ cell_free(image.indexed_cell); -- cgit v1.2.3 From fa3a72ba31301b625eef3fc01c5a9d3fbf7ef296 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 3 Jul 2012 12:02:16 +0200 Subject: Tweak error messages --- src/indexamajig.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index e9ee0f7a..b09a00aa 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -416,17 +416,18 @@ static void process_image(const struct index_args *iargs, fd = open(outfile, O_WRONLY); if ( fd == -1 ) { - ERROR("Error on opening\n"); + ERROR("Couldn't open output stream ('%s').\n", outfile); exit(1); } if ( fcntl(fd, F_SETLKW, &fl) == -1 ) { - ERROR("Error on setting lock wait\n"); + ERROR("Couldn't get lock on output stream.\n"); exit(1); } fh = fdopen(fd, "a"); if ( fh == NULL ) { - ERROR("Couldn't open stream '%s'.\n", outfilename); + ERROR("Couldn't fdopen() the output stream.\n"); + exit(1); } write_chunk(fh, &image, hdfile, iargs->stream_flags); fflush(fh); -- cgit v1.2.3 From 61b69a8c846cda4506db71b285dafbef79f41bac Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sun, 8 Jul 2012 22:09:50 +0200 Subject: Write stream without using a lock at all --- src/indexamajig.c | 275 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 209 insertions(+), 66 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index b09a00aa..3e8ab4de 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -106,9 +106,7 @@ struct index_args double ir_out; /* Output stream */ - FILE *ofh; const struct copy_hdf5_field *copyme; - char *outfile; }; @@ -227,26 +225,30 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, char *line; char *filename; - /* Get the next filename */ - if ( *use_this_one_instead != NULL ) { + do { - line = *use_this_one_instead; - *use_this_one_instead = NULL; + /* Get the next filename */ + if ( *use_this_one_instead != NULL ) { - } else { + line = *use_this_one_instead; + *use_this_one_instead = NULL; - char *rval; + } else { + + char *rval; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + return NULL; + } - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { - free(line); - return NULL; } - } + chomp(line); - chomp(line); + } while ( strlen(line) == 0 ); if ( config_basename ) { char *tmp; @@ -266,7 +268,8 @@ static char *get_pattern(FILE *fh, char **use_this_one_instead, static void process_image(const struct index_args *iargs, - struct pattern_args *pargs, int cookie) + struct pattern_args *pargs, FILE *ofh, + int cookie) { float *data_for_measurement; size_t data_size; @@ -278,12 +281,7 @@ static void process_image(const struct index_args *iargs, struct beam_params *beam = iargs->beam; int r, check; struct hdfile *hdfile; - char *outfile = iargs->outfile; struct image image; - char *outfilename = iargs->outfile; - int fd; - FILE *fh; - struct flock fl; image.features = NULL; image.data = NULL; @@ -408,38 +406,9 @@ static void process_image(const struct index_args *iargs, image.reflections = NULL; } - /* Write Lock */ - fl.l_type = F_WRLCK; - fl.l_whence = SEEK_SET; - fl.l_start = 0; - fl.l_len = 0; /* Means "lock the whole file" */ - - fd = open(outfile, O_WRONLY); - if ( fd == -1 ) { - ERROR("Couldn't open output stream ('%s').\n", outfile); - exit(1); - } - if ( fcntl(fd, F_SETLKW, &fl) == -1 ) { - ERROR("Couldn't get lock on output stream.\n"); - exit(1); - } - - fh = fdopen(fd, "a"); - if ( fh == NULL ) { - ERROR("Couldn't fdopen() the output stream.\n"); - exit(1); - } - write_chunk(fh, &image, hdfile, iargs->stream_flags); - fflush(fh); - - /* Unlock stream for other processes */ - fl.l_type = F_UNLCK; /* set to unlock same region */ - if ( fcntl(fd, F_SETLK, &fl) == -1 ) { - ERROR("fcntl"); - exit(1); - } - - fclose(fh); /* close(fd) happens as well because fd was not dup'd */ + write_chunk(ofh, &image, hdfile, iargs->stream_flags); + fprintf(ofh, "END\n"); + fflush(ofh); /* Only free cell if found */ cell_free(image.indexed_cell); @@ -454,7 +423,8 @@ static void process_image(const struct index_args *iargs, static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe, int cookie) + int filename_pipe, int results_pipe, FILE *ofh, + int cookie) { int allDone = 0; FILE *fh; @@ -489,16 +459,25 @@ static void run_work(const struct index_args *iargs, } chomp(line); - pargs.filename = line; - pargs.indexable = 0; - process_image(iargs, &pargs, cookie); + if ( strlen(line) == 0 ) { + + allDone = 1; + + } else { + + pargs.filename = line; + pargs.indexable = 0; + + process_image(iargs, &pargs, ofh, cookie); + + /* Request another image */ + c = sprintf(buf, "%i\n", pargs.indexable); + w = write(results_pipe, buf, c); + if ( w < 0 ) { + ERROR("write P0\n"); + } - /* Request another image */ - c = sprintf(buf, "%i\n", pargs.indexable); - w = write(results_pipe, buf, c); - if ( w < 0 ) { - ERROR("write P0\n"); } free(line); @@ -558,6 +537,118 @@ static int parse_cell_reduction(const char *scellr, int *err, } +static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) +{ + int done = 0; + int *finished; + FILE **fhs; + int i; + int chunk_finished; + + finished = calloc(n_proc, sizeof(int)); + if ( finished == NULL ) { + ERROR("Couldn't allocate memory for flags!\n"); + exit(1); + } + + fhs = calloc(n_proc, sizeof(FILE *)); + if ( fhs == NULL ) { + ERROR("Couldn't allocate memory for file handles!\n"); + exit(1); + } + + for ( i=0; i fdmax ) fdmax = fd; + + } + + r = select(fdmax+1, &fds, NULL, NULL, &tv); + + if ( r == -1 ) { + ERROR("select() failed: %s\n", strerror(errno)); + continue; + } + + if ( r == 0 ) continue; /* Nothing this time. Try again */ + + for ( i=0; i Date: Sun, 8 Jul 2012 22:10:08 +0200 Subject: Formatting --- src/indexamajig.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 3e8ab4de..1db8b353 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -395,12 +395,12 @@ static void process_image(const struct index_args *iargs, image.indexed_cell); if (image.reflections != NULL) { integrate_reflections(&image, - iargs->config_closer, - iargs->config_bgsub, - iargs->min_int_snr, - iargs->ir_inn, - iargs->ir_mid, - iargs->ir_out); + iargs->config_closer, + iargs->config_bgsub, + iargs->min_int_snr, + iargs->ir_inn, + iargs->ir_mid, + iargs->ir_out); } } else { image.reflections = NULL; -- cgit v1.2.3 From 849df8b6dd7de1d45eaf8af1f4f2b1c1d72e5c9c Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 11 Jul 2012 22:54:53 +0200 Subject: Fix a load of memory leaks --- src/indexamajig.c | 154 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 109 insertions(+), 45 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 1db8b353..33caddfa 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -99,14 +99,12 @@ struct index_args int cellr; float tols[4]; struct beam_params *beam; - const char *element; - const char *hdf5_peak_path; + char *element; + char *hdf5_peak_path; double ir_inn; double ir_mid; double ir_out; - - /* Output stream */ - const struct copy_hdf5_field *copyme; + struct copy_hdf5_field *copyme; }; @@ -366,11 +364,8 @@ static void process_image(const struct index_args *iargs, case PEAK_ZAEF: search_peaks(&image, iargs->threshold, - iargs->min_gradient, - iargs->min_snr, - iargs->ir_inn, - iargs->ir_mid, - iargs->ir_out); + iargs->min_gradient, iargs->min_snr, + iargs->ir_inn, iargs->ir_mid, iargs->ir_out); break; } @@ -483,9 +478,20 @@ static void run_work(const struct index_args *iargs, free(line); } + /* close my pipes */ fclose(fh); close(results_pipe); + + cleanup_indexing(iargs->ipriv); + free(iargs->indm); + free(iargs->ipriv); + free_detector_geometry(iargs->det); + free(iargs->beam); + free(iargs->element); + free(iargs->hdf5_peak_path); + free_copy_hdf5_field_list(iargs->copyme); + cell_free(iargs->cell); } @@ -616,10 +622,13 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) if ( feof(fhs[i]) ) { /* Process died */ finished[i] = 1; + ERROR("EOF during chunk\n"); } else { ERROR("fgets() failed: %s\n", strerror(errno)); } + fprintf(ofh, "Chunk is unfinished!\n"); + chunk_finished = 1; continue; } @@ -646,6 +655,9 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) for ( i=0; i Date: Wed, 11 Jul 2012 23:42:31 +0200 Subject: More memory leaks --- src/indexamajig.c | 117 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 65 insertions(+), 52 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 33caddfa..35e92c65 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -543,13 +543,51 @@ static int parse_cell_reduction(const char *scellr, int *err, } +static void pump_chunk(FILE *fh, int *finished, FILE *ofh) +{ + int chunk_started = 0; + int chunk_finished = 0; + + do { + + char line[1024]; + char *rval; + + rval = fgets(line, 1024, fh); + if ( rval == NULL ) { + + if ( feof(fh) ) { + /* Process died */ + *finished = 1; + if ( chunk_started ) { + ERROR("EOF during chunk!\n"); + fprintf(ofh, "Chunk is unfinished!\n"); + } + } else { + ERROR("fgets() failed: %s\n", strerror(errno)); + } + chunk_finished = 1; + continue; + + } + + if ( strcmp(line, "END\n") == 0 ) { + chunk_finished = 1; + } else { + chunk_started = 1; + fprintf(ofh, "%s", line); + } + + } while ( !chunk_finished ); +} + + static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) { int done = 0; int *finished; FILE **fhs; int i; - int chunk_finished; finished = calloc(n_proc, sizeof(int)); if ( finished == NULL ) { @@ -611,35 +649,7 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue; - chunk_finished = 0; - do { - - char line[1024]; - char *rval; - - rval = fgets(line, 1024, fhs[i]); - if ( rval == NULL ) { - if ( feof(fhs[i]) ) { - /* Process died */ - finished[i] = 1; - ERROR("EOF during chunk\n"); - } else { - ERROR("fgets() failed: %s\n", - strerror(errno)); - } - fprintf(ofh, "Chunk is unfinished!\n"); - chunk_finished = 1; - continue; - } - - - if ( strcmp(line, "END\n") == 0 ) { - chunk_finished = 1; - } else { - fprintf(ofh, "%s", line); - } - - } while ( !chunk_finished ); + pump_chunk(fhs[i], &finished[i], ofh); } @@ -722,7 +732,6 @@ int main(int argc, char *argv[]) int *stream_pipe_read; int *stream_pipe_write; FILE **result_fhs; - fd_set fds; int i; int allDone; int *finished; @@ -1127,19 +1136,8 @@ int main(int argc, char *argv[]) n_processed_last_stats = 0; t_last_stats = get_monotonic_seconds(); - FD_ZERO(&fds); - filename_pipes = calloc(n_proc, sizeof(int)); - result_fhs = calloc(n_proc, sizeof(FILE *)); stream_pipe_read = calloc(n_proc, sizeof(int)); stream_pipe_write = calloc(n_proc, sizeof(int)); - if ( filename_pipes == NULL ) { - ERROR("Couldn't allocate memory for pipes.\n"); - return 1; - } - if ( result_fhs == NULL ) { - ERROR("Couldn't allocate memory for pipe file handles.\n"); - return 1; - } if ( stream_pipe_read == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return 1; @@ -1149,12 +1147,6 @@ int main(int argc, char *argv[]) return 1; } - pids = calloc(n_proc, sizeof(pid_t)); - if ( pids == NULL ) { - ERROR("Couldn't allocate memory for PIDs.\n"); - return 1; - } - for ( i=0; i Date: Thu, 12 Jul 2012 00:02:06 +0200 Subject: Formatting --- src/indexamajig.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 35e92c65..7310d391 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -1405,10 +1405,8 @@ int main(int argc, char *argv[]) n_processed++; /* Send next filename */ - nextImage = get_pattern(fh, - &use_this_one_instead, - config_basename, - prefix); + nextImage = get_pattern(fh, &use_this_one_instead, + config_basename, prefix); if ( nextImage == NULL ) { /* No more images */ -- cgit v1.2.3 From cb2623e64fed28efdc0d3b3de2338ab57684985b Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 12 Jul 2012 01:00:31 +0200 Subject: Fix leaked fds --- src/indexamajig.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 7310d391..5a8bd816 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -1257,6 +1257,10 @@ int main(int argc, char *argv[]) free(result_fhs); fclose(fh); free(pids); + for ( j=0; j Date: Fri, 13 Jul 2012 18:52:06 +0200 Subject: Skeleton signal handler --- src/indexamajig.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index 5a8bd816..f560b7b8 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -47,6 +47,7 @@ #include #include #include +#include #ifdef HAVE_CLOCK_GETTIME #include @@ -671,6 +672,14 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) } +static void signal_handler(int sig, siginfo_t *si, void *uc_v) +{ + struct ucontext_t *uc = uc_v; + + STATUS("Signal!\n"); +} + + int main(int argc, char *argv[]) { int c; @@ -736,6 +745,8 @@ int main(int argc, char *argv[]) int allDone; int *finished; pid_t pr; + struct sigaction sa; + int r; copyme = new_copy_hdf5_field_list(); if ( copyme == NULL ) { @@ -1196,6 +1207,16 @@ int main(int argc, char *argv[]) } + /* Set up signal handler to take action if any children die */ + sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; + sigemptyset(&sa.sa_mask); + sa.sa_sigaction = signal_handler; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return 1; + } + /* Free resources needed by reader only */ if ( ofh != stdout ) fclose(ofh); for ( i=0; i Date: Fri, 13 Jul 2012 18:52:35 +0200 Subject: More memory leaks and exit paths --- src/indexamajig.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/indexamajig.c b/src/indexamajig.c index f560b7b8..b8e57048 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -428,8 +428,6 @@ static void run_work(const struct index_args *iargs, fh = fdopen(filename_pipe, "r"); if ( fh == NULL ) { ERROR("Failed to fdopen() the filename pipe!\n"); - close(filename_pipe); - close(results_pipe); return; } @@ -450,7 +448,7 @@ static void run_work(const struct index_args *iargs, continue; } else { ERROR("Read error!\n"); - return; + break; } } @@ -482,7 +480,6 @@ static void run_work(const struct index_args *iargs, /* close my pipes */ fclose(fh); - close(results_pipe); cleanup_indexing(iargs->ipriv); free(iargs->indm); @@ -1034,6 +1031,7 @@ int main(int argc, char *argv[]) ERROR("Invalid parameters for '--int-radius'\n"); return 1; } + free(intrad); } else { STATUS("WARNING: You did not specify --int-radius.\n"); STATUS("WARNING: I will use the default values, which are" @@ -1272,16 +1270,16 @@ int main(int argc, char *argv[]) for ( j=0; j Date: Sun, 15 Jul 2012 06:15:59 -0400 Subject: Move indexer sandbox to a new file --- src/im-sandbox.c | 889 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/im-sandbox.h | 86 ++++++ src/indexamajig.c | 859 +--------------------------------------------------- 3 files changed, 978 insertions(+), 856 deletions(-) create mode 100644 src/im-sandbox.c create mode 100644 src/im-sandbox.h (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c new file mode 100644 index 00000000..a5806734 --- /dev/null +++ b/src/im-sandbox.c @@ -0,0 +1,889 @@ +/* + * im-sandbox.c + * + * Sandbox for indexing + * + * Copyright © 2012 Deutsches Elektronen-Synchrotron DESY, + * a research centre of the Helmholtz Association. + * Copyright © 2012 Richard Kirian + * Copyright © 2012 Lorenzo Galli + * + * Authors: + * 2010-2012 Thomas White + * 2011 Richard Kirian + * 2012 Lorenzo Galli + * 2012 Chunhong Yoon + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see . + * + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef HAVE_CLOCK_GETTIME +#include +#else +#include +#endif + +#include "utils.h" +#include "hdf5-file.h" +#include "index.h" +#include "peaks.h" +#include "detector.h" +#include "filters.h" +#include "thread-pool.h" +#include "beam-parameters.h" +#include "geometry.h" +#include "stream.h" +#include "reflist-utils.h" + +#include "im-sandbox.h" + + +/* Write statistics at APPROXIMATELY this interval */ +#define STATS_EVERY_N_SECONDS (5) + + +struct sandbox +{ + +}; + + +static char *get_pattern(FILE *fh, char **use_this_one_instead, + int config_basename, const char *prefix) +{ + char *line; + char *filename; + + do { + + /* Get the next filename */ + if ( *use_this_one_instead != NULL ) { + + line = *use_this_one_instead; + *use_this_one_instead = NULL; + + } else { + + char *rval; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + return NULL; + } + + } + + chomp(line); + + } while ( strlen(line) == 0 ); + + if ( config_basename ) { + char *tmp; + tmp = safe_basename(line); + free(line); + line = tmp; + } + + filename = malloc(strlen(prefix)+strlen(line)+1); + + snprintf(filename, 1023, "%s%s", prefix, line); + + free(line); + + return filename; +} + + +static void process_image(const struct index_args *iargs, + struct pattern_args *pargs, FILE *ofh, + int cookie) +{ + float *data_for_measurement; + size_t data_size; + UnitCell *cell = iargs->cell; + int config_cmfilter = iargs->config_cmfilter; + int config_noisefilter = iargs->config_noisefilter; + int config_verbose = iargs->config_verbose; + IndexingMethod *indm = iargs->indm; + struct beam_params *beam = iargs->beam; + int r, check; + struct hdfile *hdfile; + struct image image; + + image.features = NULL; + image.data = NULL; + image.flags = NULL; + image.indexed_cell = NULL; + image.det = copy_geom(iargs->det); + image.copyme = iargs->copyme; + image.reflections = NULL; + image.id = cookie; + image.filename = pargs->filename; + image.beam = beam; + + hdfile = hdfile_open(image.filename); + if ( hdfile == NULL ) return; + + r = hdfile_set_first_image(hdfile, "/"); + if ( r ) { + ERROR("Couldn't select first path\n"); + hdfile_close(hdfile); + return; + } + + check = hdf5_read(hdfile, &image, 1); + if ( check ) { + hdfile_close(hdfile); + return; + } + + if ( (image.width != image.det->max_fs + 1 ) + || (image.height != image.det->max_ss + 1)) + { + ERROR("Image size doesn't match geometry size" + " - rejecting image.\n"); + ERROR("Image size: %i,%i. Geometry size: %i,%i\n", + image.width, image.height, + image.det->max_fs + 1, image.det->max_ss + 1); + hdfile_close(hdfile); + free_detector_geometry(image.det); + return; + } + + if ( image.lambda < 0.0 ) { + if ( beam != NULL ) { + ERROR("Using nominal photon energy of %.2f eV\n", + beam->photon_energy); + image.lambda = ph_en_to_lambda( + eV_to_J(beam->photon_energy)); + } else { + ERROR("No wavelength in file, so you need to give " + "a beam parameters file with -b.\n"); + hdfile_close(hdfile); + free_detector_geometry(image.det); + return; + } + } + fill_in_values(image.det, hdfile); + + if ( config_cmfilter ) { + filter_cm(&image); + } + + /* Take snapshot of image after CM subtraction but before + * the aggressive noise filter. */ + data_size = image.width * image.height * sizeof(float); + data_for_measurement = malloc(data_size); + + if ( config_noisefilter ) { + filter_noise(&image, data_for_measurement); + } else { + memcpy(data_for_measurement, image.data, data_size); + } + + switch ( iargs->peaks ) { + + case PEAK_HDF5: + // Get peaks from HDF5 + if (get_peaks(&image, hdfile, + iargs->hdf5_peak_path)) { + ERROR("Failed to get peaks from HDF5 file.\n"); + } + break; + + case PEAK_ZAEF: + search_peaks(&image, iargs->threshold, + iargs->min_gradient, iargs->min_snr, + iargs->ir_inn, iargs->ir_mid, iargs->ir_out); + break; + + } + + /* Get rid of noise-filtered version at this point + * - it was strictly for the purposes of peak detection. */ + free(image.data); + image.data = data_for_measurement; + + /* Calculate orientation matrix (by magic) */ + image.div = beam->divergence; + image.bw = beam->bandwidth; + image.profile_radius = 0.0001e9; + + index_pattern(&image, cell, indm, iargs->cellr, + config_verbose, iargs->ipriv, + iargs->config_insane, iargs->tols); + + if ( image.indexed_cell != NULL ) { + pargs->indexable = 1; + image.reflections = find_intersections(&image, + image.indexed_cell); + if (image.reflections != NULL) { + integrate_reflections(&image, + iargs->config_closer, + iargs->config_bgsub, + iargs->min_int_snr, + iargs->ir_inn, + iargs->ir_mid, + iargs->ir_out); + } + } else { + image.reflections = NULL; + } + + write_chunk(ofh, &image, hdfile, iargs->stream_flags); + fprintf(ofh, "END\n"); + fflush(ofh); + + /* Only free cell if found */ + cell_free(image.indexed_cell); + + reflist_free(image.reflections); + free(image.data); + if ( image.flags != NULL ) free(image.flags); + image_feature_list_free(image.features); + hdfile_close(hdfile); + free_detector_geometry(image.det); +} + + +static void run_work(const struct index_args *iargs, + int filename_pipe, int results_pipe, FILE *ofh, + int cookie) +{ + int allDone = 0; + FILE *fh; + + fh = fdopen(filename_pipe, "r"); + if ( fh == NULL ) { + ERROR("Failed to fdopen() the filename pipe!\n"); + return; + } + + while ( !allDone ) { + + struct pattern_args pargs; + int w, c; + char buf[1024]; + char *line; + char *rval; + + line = malloc(1024*sizeof(char)); + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + if ( feof(fh) ) { + allDone = 1; + continue; + } else { + ERROR("Read error!\n"); + break; + } + } + + chomp(line); + + if ( strlen(line) == 0 ) { + + allDone = 1; + + } else { + + pargs.filename = line; + pargs.indexable = 0; + + process_image(iargs, &pargs, ofh, cookie); + + /* Request another image */ + c = sprintf(buf, "%i\n", pargs.indexable); + w = write(results_pipe, buf, c); + if ( w < 0 ) { + ERROR("write P0\n"); + } + + } + + free(line); + + } + + /* close my pipes */ + fclose(fh); + + cleanup_indexing(iargs->ipriv); + free(iargs->indm); + free(iargs->ipriv); + free_detector_geometry(iargs->det); + free(iargs->beam); + free(iargs->element); + free(iargs->hdf5_peak_path); + free_copy_hdf5_field_list(iargs->copyme); + cell_free(iargs->cell); +} + + +#ifdef HAVE_CLOCK_GETTIME + +static time_t get_monotonic_seconds() +{ + struct timespec tp; + clock_gettime(CLOCK_MONOTONIC, &tp); + return tp.tv_sec; +} + +#else + +/* Fallback version of the above. The time according to gettimeofday() is not + * monotonic, so measuring intervals based on it will screw up if there's a + * timezone change (e.g. daylight savings) while the program is running. */ +static time_t get_monotonic_seconds() +{ + struct timeval tp; + gettimeofday(&tp, NULL); + return tp.tv_sec; +} + +#endif + +static void pump_chunk(FILE *fh, int *finished, FILE *ofh) +{ + int chunk_started = 0; + int chunk_finished = 0; + + do { + + char line[1024]; + char *rval; + + rval = fgets(line, 1024, fh); + if ( rval == NULL ) { + + if ( feof(fh) ) { + /* Process died */ + *finished = 1; + if ( chunk_started ) { + ERROR("EOF during chunk!\n"); + fprintf(ofh, "Chunk is unfinished!\n"); + } + } else { + ERROR("fgets() failed: %s\n", strerror(errno)); + } + chunk_finished = 1; + continue; + + } + + if ( strcmp(line, "END\n") == 0 ) { + chunk_finished = 1; + } else { + chunk_started = 1; + fprintf(ofh, "%s", line); + } + + } while ( !chunk_finished ); +} + + +static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) +{ + int done = 0; + int *finished; + FILE **fhs; + int i; + + finished = calloc(n_proc, sizeof(int)); + if ( finished == NULL ) { + ERROR("Couldn't allocate memory for flags!\n"); + exit(1); + } + + fhs = calloc(n_proc, sizeof(FILE *)); + if ( fhs == NULL ) { + ERROR("Couldn't allocate memory for file handles!\n"); + exit(1); + } + + for ( i=0; i fdmax ) fdmax = fd; + + } + + r = select(fdmax+1, &fds, NULL, NULL, &tv); + + if ( r == -1 ) { + ERROR("select() failed: %s\n", strerror(errno)); + continue; + } + + if ( r == 0 ) continue; /* Nothing this time. Try again */ + + for ( i=0; iipriv); + free(iargs->indm); + free(iargs->ipriv); + free_detector_geometry(iargs->det); + free(iargs->beam); + free(iargs->element); + free(iargs->hdf5_peak_path); + free_copy_hdf5_field_list(iargs->copyme); + cell_free(iargs->cell); + fclose(fh); + + run_reader(stream_pipe_read, n_proc, ofh); + + free(stream_pipe_read); + + exit(0); + + } + + /* Set up signal handler to take action if any children die */ + sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; + sigemptyset(&sa.sa_mask); + sa.sa_sigaction = signal_handler; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + + /* Free resources needed by reader only */ + if ( ofh != stdout ) fclose(ofh); + for ( i=0; iipriv); + free(iargs->indm); + free(iargs->ipriv); + free_detector_geometry(iargs->det); + free(iargs->beam); + free(iargs->element); + free(iargs->hdf5_peak_path); + free_copy_hdf5_field_list(iargs->copyme); + cell_free(iargs->cell); + for ( i=0; i fdmax ) fdmax = fd; + + } + + r = select(fdmax+1, &fds, NULL, NULL, &tv); + + if ( r == -1 ) { + ERROR("select() failed: %s\n", strerror(errno)); + continue; + } + + if ( r == 0 ) continue; /* No progress this time. Try again */ + + for ( i=0; i= t_last_stats+STATS_EVERY_N_SECONDS ) { + + STATUS("%i out of %i indexed so far," + " %i out of %i since the last message.\n", + n_indexable, n_processed, + n_indexable - n_indexable_last_stats, + n_processed - n_processed_last_stats); + + n_indexable_last_stats = n_indexable; + n_processed_last_stats = n_processed; + t_last_stats = tNow; + + } + + allDone = 1; + for ( i=0; i + * 2011 Richard Kirian + * 2012 Lorenzo Galli + * 2012 Chunhong Yoon + * + * This file is part of CrystFEL. + * + * CrystFEL is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * CrystFEL is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with CrystFEL. If not, see . + * + */ + +enum { + PEAK_ZAEF, + PEAK_HDF5, +}; + + +/* Information about the indexing process which is common to all patterns */ +struct index_args +{ + UnitCell *cell; + int config_cmfilter; + int config_noisefilter; + int config_verbose; + int stream_flags; /* What goes into the output? */ + int config_satcorr; + int config_closer; + int config_insane; + int config_bgsub; + float threshold; + float min_gradient; + float min_snr; + double min_int_snr; + struct detector *det; + IndexingMethod *indm; + IndexingPrivate **ipriv; + int peaks; /* Peak detection method */ + int cellr; + float tols[4]; + struct beam_params *beam; + char *element; + char *hdf5_peak_path; + double ir_inn; + double ir_mid; + double ir_out; + struct copy_hdf5_field *copyme; +}; + + + + +/* Information about the indexing process for one pattern */ +struct pattern_args +{ + /* "Input" */ + char *filename; + + /* "Output" */ + int indexable; +}; + +extern void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, + int config_basename, FILE *fh, + char *use_this_one_instead, FILE *ofh); diff --git a/src/indexamajig.c b/src/indexamajig.c index b8e57048..c2c9ed2c 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -44,10 +44,6 @@ #include #include #include -#include -#include -#include -#include #ifdef HAVE_CLOCK_GETTIME #include @@ -67,57 +63,7 @@ #include "stream.h" #include "reflist-utils.h" - -/* Write statistics at APPROXIMATELY this interval */ -#define STATS_EVERY_N_SECONDS (5) - -enum { - PEAK_ZAEF, - PEAK_HDF5, -}; - - -/* Information about the indexing process which is common to all patterns */ -struct index_args -{ - UnitCell *cell; - int config_cmfilter; - int config_noisefilter; - int config_verbose; - int stream_flags; /* What goes into the output? */ - int config_satcorr; - int config_closer; - int config_insane; - int config_bgsub; - float threshold; - float min_gradient; - float min_snr; - double min_int_snr; - struct detector *det; - IndexingMethod *indm; - IndexingPrivate **ipriv; - int peaks; /* Peak detection method */ - int cellr; - float tols[4]; - struct beam_params *beam; - char *element; - char *hdf5_peak_path; - double ir_inn; - double ir_mid; - double ir_out; - struct copy_hdf5_field *copyme; -}; - - -/* Information about the indexing process for one pattern */ -struct pattern_args -{ - /* "Input" */ - char *filename; - - /* "Output" */ - int indexable; -}; +#include "im-sandbox.h" static void show_help(const char *s) @@ -218,305 +164,6 @@ static void show_help(const char *s) } -static char *get_pattern(FILE *fh, char **use_this_one_instead, - int config_basename, const char *prefix) -{ - char *line; - char *filename; - - do { - - /* Get the next filename */ - if ( *use_this_one_instead != NULL ) { - - line = *use_this_one_instead; - *use_this_one_instead = NULL; - - } else { - - char *rval; - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { - free(line); - return NULL; - } - - } - - chomp(line); - - } while ( strlen(line) == 0 ); - - if ( config_basename ) { - char *tmp; - tmp = safe_basename(line); - free(line); - line = tmp; - } - - filename = malloc(strlen(prefix)+strlen(line)+1); - - snprintf(filename, 1023, "%s%s", prefix, line); - - free(line); - - return filename; -} - - -static void process_image(const struct index_args *iargs, - struct pattern_args *pargs, FILE *ofh, - int cookie) -{ - float *data_for_measurement; - size_t data_size; - UnitCell *cell = iargs->cell; - int config_cmfilter = iargs->config_cmfilter; - int config_noisefilter = iargs->config_noisefilter; - int config_verbose = iargs->config_verbose; - IndexingMethod *indm = iargs->indm; - struct beam_params *beam = iargs->beam; - int r, check; - struct hdfile *hdfile; - struct image image; - - image.features = NULL; - image.data = NULL; - image.flags = NULL; - image.indexed_cell = NULL; - image.det = copy_geom(iargs->det); - image.copyme = iargs->copyme; - image.reflections = NULL; - image.id = cookie; - image.filename = pargs->filename; - image.beam = beam; - - hdfile = hdfile_open(image.filename); - if ( hdfile == NULL ) return; - - r = hdfile_set_first_image(hdfile, "/"); - if ( r ) { - ERROR("Couldn't select first path\n"); - hdfile_close(hdfile); - return; - } - - check = hdf5_read(hdfile, &image, 1); - if ( check ) { - hdfile_close(hdfile); - return; - } - - if ( (image.width != image.det->max_fs + 1 ) - || (image.height != image.det->max_ss + 1)) - { - ERROR("Image size doesn't match geometry size" - " - rejecting image.\n"); - ERROR("Image size: %i,%i. Geometry size: %i,%i\n", - image.width, image.height, - image.det->max_fs + 1, image.det->max_ss + 1); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - - if ( image.lambda < 0.0 ) { - if ( beam != NULL ) { - ERROR("Using nominal photon energy of %.2f eV\n", - beam->photon_energy); - image.lambda = ph_en_to_lambda( - eV_to_J(beam->photon_energy)); - } else { - ERROR("No wavelength in file, so you need to give " - "a beam parameters file with -b.\n"); - hdfile_close(hdfile); - free_detector_geometry(image.det); - return; - } - } - fill_in_values(image.det, hdfile); - - if ( config_cmfilter ) { - filter_cm(&image); - } - - /* Take snapshot of image after CM subtraction but before - * the aggressive noise filter. */ - data_size = image.width * image.height * sizeof(float); - data_for_measurement = malloc(data_size); - - if ( config_noisefilter ) { - filter_noise(&image, data_for_measurement); - } else { - memcpy(data_for_measurement, image.data, data_size); - } - - switch ( iargs->peaks ) { - - case PEAK_HDF5: - // Get peaks from HDF5 - if (get_peaks(&image, hdfile, - iargs->hdf5_peak_path)) { - ERROR("Failed to get peaks from HDF5 file.\n"); - } - break; - - case PEAK_ZAEF: - search_peaks(&image, iargs->threshold, - iargs->min_gradient, iargs->min_snr, - iargs->ir_inn, iargs->ir_mid, iargs->ir_out); - break; - - } - - /* Get rid of noise-filtered version at this point - * - it was strictly for the purposes of peak detection. */ - free(image.data); - image.data = data_for_measurement; - - /* Calculate orientation matrix (by magic) */ - image.div = beam->divergence; - image.bw = beam->bandwidth; - image.profile_radius = 0.0001e9; - - index_pattern(&image, cell, indm, iargs->cellr, - config_verbose, iargs->ipriv, - iargs->config_insane, iargs->tols); - - if ( image.indexed_cell != NULL ) { - pargs->indexable = 1; - image.reflections = find_intersections(&image, - image.indexed_cell); - if (image.reflections != NULL) { - integrate_reflections(&image, - iargs->config_closer, - iargs->config_bgsub, - iargs->min_int_snr, - iargs->ir_inn, - iargs->ir_mid, - iargs->ir_out); - } - } else { - image.reflections = NULL; - } - - write_chunk(ofh, &image, hdfile, iargs->stream_flags); - fprintf(ofh, "END\n"); - fflush(ofh); - - /* Only free cell if found */ - cell_free(image.indexed_cell); - - reflist_free(image.reflections); - free(image.data); - if ( image.flags != NULL ) free(image.flags); - image_feature_list_free(image.features); - hdfile_close(hdfile); - free_detector_geometry(image.det); -} - - -static void run_work(const struct index_args *iargs, - int filename_pipe, int results_pipe, FILE *ofh, - int cookie) -{ - int allDone = 0; - FILE *fh; - - fh = fdopen(filename_pipe, "r"); - if ( fh == NULL ) { - ERROR("Failed to fdopen() the filename pipe!\n"); - return; - } - - while ( !allDone ) { - - struct pattern_args pargs; - int w, c; - char buf[1024]; - char *line; - char *rval; - - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { - free(line); - if ( feof(fh) ) { - allDone = 1; - continue; - } else { - ERROR("Read error!\n"); - break; - } - } - - chomp(line); - - if ( strlen(line) == 0 ) { - - allDone = 1; - - } else { - - pargs.filename = line; - pargs.indexable = 0; - - process_image(iargs, &pargs, ofh, cookie); - - /* Request another image */ - c = sprintf(buf, "%i\n", pargs.indexable); - w = write(results_pipe, buf, c); - if ( w < 0 ) { - ERROR("write P0\n"); - } - - } - - free(line); - - } - - /* close my pipes */ - fclose(fh); - - cleanup_indexing(iargs->ipriv); - free(iargs->indm); - free(iargs->ipriv); - free_detector_geometry(iargs->det); - free(iargs->beam); - free(iargs->element); - free(iargs->hdf5_peak_path); - free_copy_hdf5_field_list(iargs->copyme); - cell_free(iargs->cell); -} - - -#ifdef HAVE_CLOCK_GETTIME - -static time_t get_monotonic_seconds() -{ - struct timespec tp; - clock_gettime(CLOCK_MONOTONIC, &tp); - return tp.tv_sec; -} - -#else - -/* Fallback version of the above. The time according to gettimeofday() is not - * monotonic, so measuring intervals based on it will screw up if there's a - * timezone change (e.g. daylight savings) while the program is running. */ -static time_t get_monotonic_seconds() -{ - struct timeval tp; - gettimeofday(&tp, NULL); - return tp.tv_sec; -} - -#endif - - static int parse_cell_reduction(const char *scellr, int *err, int *reduction_needs_cell) { @@ -541,142 +188,6 @@ static int parse_cell_reduction(const char *scellr, int *err, } -static void pump_chunk(FILE *fh, int *finished, FILE *ofh) -{ - int chunk_started = 0; - int chunk_finished = 0; - - do { - - char line[1024]; - char *rval; - - rval = fgets(line, 1024, fh); - if ( rval == NULL ) { - - if ( feof(fh) ) { - /* Process died */ - *finished = 1; - if ( chunk_started ) { - ERROR("EOF during chunk!\n"); - fprintf(ofh, "Chunk is unfinished!\n"); - } - } else { - ERROR("fgets() failed: %s\n", strerror(errno)); - } - chunk_finished = 1; - continue; - - } - - if ( strcmp(line, "END\n") == 0 ) { - chunk_finished = 1; - } else { - chunk_started = 1; - fprintf(ofh, "%s", line); - } - - } while ( !chunk_finished ); -} - - -static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) -{ - int done = 0; - int *finished; - FILE **fhs; - int i; - - finished = calloc(n_proc, sizeof(int)); - if ( finished == NULL ) { - ERROR("Couldn't allocate memory for flags!\n"); - exit(1); - } - - fhs = calloc(n_proc, sizeof(FILE *)); - if ( fhs == NULL ) { - ERROR("Couldn't allocate memory for file handles!\n"); - exit(1); - } - - for ( i=0; i fdmax ) fdmax = fd; - - } - - r = select(fdmax+1, &fds, NULL, NULL, &tv); - - if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); - continue; - } - - if ( r == 0 ) continue; /* Nothing this time. Try again */ - - for ( i=0; i fdmax ) fdmax = fd; - - } - - r = select(fdmax+1, &fds, NULL, NULL, &tv); - - if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); - continue; - } - - if ( r == 0 ) continue; /* No progress this time. Try again */ - - for ( i=0; i= t_last_stats+STATS_EVERY_N_SECONDS ) { - - STATUS("%i out of %i indexed so far," - " %i out of %i since the last message.\n", - n_indexable, n_processed, - n_indexable - n_indexable_last_stats, - n_processed - n_processed_last_stats); - - n_indexable_last_stats = n_indexable; - n_processed_last_stats = n_processed; - t_last_stats = tNow; - - } - - allDone = 1; - for ( i=0; i Date: Sun, 15 Jul 2012 10:21:16 -0400 Subject: Sandboxy stuff --- src/im-sandbox.c | 502 ++++++++++++++++++++++++++++++------------------------ src/indexamajig.c | 2 +- 2 files changed, 279 insertions(+), 225 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index a5806734..85576ba1 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -75,10 +75,31 @@ struct sandbox { + int n_indexable; + int n_processed; + int n_indexable_last_stats; + int n_processed_last_stats; + int t_last_stats; + + struct index_args *iargs; + + int n_proc; + pid_t *pids; + FILE *ofh; + int *running; + FILE **result_fhs; + int *filename_pipes; + int *stream_pipe_read; + int *stream_pipe_write; + char **last_filename; }; +/* Horrible global variable for signal handler */ +struct sandbox *sb; + + static char *get_pattern(FILE *fh, char **use_this_one_instead, int config_basename, const char *prefix) { @@ -285,6 +306,7 @@ static void run_work(const struct index_args *iargs, { int allDone = 0; FILE *fh; + int w; fh = fdopen(filename_pipe, "r"); if ( fh == NULL ) { @@ -292,13 +314,18 @@ static void run_work(const struct index_args *iargs, return; } + w = write(results_pipe, "\n", 1); + if ( w < 0 ) { + ERROR("Failed to send request for first filename.\n"); + } + while ( !allDone ) { struct pattern_args pargs; - int w, c; - char buf[1024]; + int c; char *line; char *rval; + char buf[1024]; line = malloc(1024*sizeof(char)); rval = fgets(line, 1023, fh); @@ -306,6 +333,7 @@ static void run_work(const struct index_args *iargs, free(line); if ( feof(fh) ) { allDone = 1; + STATUS("Exiting!\n"); continue; } else { ERROR("Read error!\n"); @@ -377,7 +405,7 @@ static time_t get_monotonic_seconds() #endif -static void pump_chunk(FILE *fh, int *finished, FILE *ofh) +static int pump_chunk(FILE *fh, FILE *ofh) { int chunk_started = 0; int chunk_finished = 0; @@ -392,11 +420,11 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh) if ( feof(fh) ) { /* Process died */ - *finished = 1; if ( chunk_started ) { ERROR("EOF during chunk!\n"); fprintf(ofh, "Chunk is unfinished!\n"); } + return 1; } else { ERROR("fgets() failed: %s\n", strerror(errno)); } @@ -413,33 +441,29 @@ static void pump_chunk(FILE *fh, int *finished, FILE *ofh) } } while ( !chunk_finished ); + + return 0; } -static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) +static void *run_reader(void *sbv) { + struct sandbox *sb = sbv; int done = 0; - int *finished; FILE **fhs; int i; - finished = calloc(n_proc, sizeof(int)); - if ( finished == NULL ) { - ERROR("Couldn't allocate memory for flags!\n"); - exit(1); - } - - fhs = calloc(n_proc, sizeof(FILE *)); + fhs = calloc(sb->n_proc, sizeof(FILE *)); if ( fhs == NULL ) { ERROR("Couldn't allocate memory for file handles!\n"); - exit(1); + return NULL; } - for ( i=0; in_proc; i++ ) { + fhs[i] = fdopen(sb->stream_pipe_read[i], "r"); if ( fhs[i] == NULL ) { ERROR("Couldn't fdopen() stream!\n"); - exit(1); + return NULL; } } @@ -455,13 +479,13 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) FD_ZERO(&fds); fdmax = 0; - for ( i=0; in_proc; i++ ) { int fd; - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - fd = stream_pipe_read[i]; + fd = sb->stream_pipe_read[i]; FD_SET(fd, &fds); if ( fd > fdmax ) fdmax = fd; @@ -471,45 +495,176 @@ static void run_reader(int *stream_pipe_read, int n_proc, FILE *ofh) r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); + if ( errno != EINTR ) { + ERROR("select() failed: %s\n", strerror(errno)); + } /* Otherwise no big deal */ continue; } if ( r == 0 ) continue; /* Nothing this time. Try again */ - for ( i=0; in_proc; i++ ) { - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - if ( !FD_ISSET(stream_pipe_read[i], &fds) ) continue; + if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue; - pump_chunk(fhs[i], &finished[i], ofh); + if ( pump_chunk(fhs[i], sb->ofh) ) { + sb->running[i] = 0; + } } done = 1; - for ( i=0; in_proc; i++ ) { + if ( sb->running[i] ) done = 0; } } - free(finished); - - for ( i=0; in_proc; i++ ) { fclose(fhs[i]); } free(fhs); - if ( ofh != stdout ) fclose(ofh); + return NULL; +} + + +static void start_worker_process(struct sandbox *sb, int slot) +{ + pid_t p; + int filename_pipe[2]; + int result_pipe[2]; + + if ( pipe(filename_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + + if ( pipe(result_pipe) == - 1 ) { + ERROR("pipe() failed!\n"); + return; + } + + p = fork(); + if ( p == -1 ) { + ERROR("fork() failed!\n"); + return; + } + + if ( p == 0 ) { + + FILE *sfh; + int j; + struct sigaction sa; + int r; + + /* First, disconnect the signal handler */ + sa.sa_flags = 0; + sigemptyset(&sa.sa_mask); + sa.sa_handler = SIG_DFL; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + + /* Free resources which will not be needed by worker */ + for ( j=0; jn_proc; j++ ) { + if ( (j != slot) && (sb->running[j]) ) { + close(sb->stream_pipe_write[j]); + } + } + for ( j=0; jn_proc; j++ ) { + if ( (j != slot) && (sb->running[j]) ) { + fclose(sb->result_fhs[j]); + close(sb->filename_pipes[j]); + } + } + free(sb->filename_pipes); + free(sb->result_fhs); + free(sb->pids); + /* Also prefix, use_this_one_instead and fh */ + + /* Child process gets the 'read' end of the filename + * pipe, and the 'write' end of the result pipe. */ + close(filename_pipe[1]); + close(result_pipe[0]); + + sfh = fdopen(sb->stream_pipe_write[slot], "w"); + run_work(sb->iargs, filename_pipe[0], result_pipe[1], + sfh, slot); + fclose(sfh); + + free(sb->stream_pipe_write); + close(filename_pipe[0]); + close(result_pipe[1]); + + exit(0); + + } + + /* Parent process gets the 'write' end of the filename pipe + * and the 'read' end of the result pipe. */ + sb->pids[slot] = p; + sb->running[slot] = 1; + close(filename_pipe[0]); + close(result_pipe[1]); + sb->filename_pipes[slot] = filename_pipe[1]; + + sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); + if ( sb->result_fhs[slot] == NULL ) { + ERROR("fdopen() failed.\n"); + return; + } } static void signal_handler(int sig, siginfo_t *si, void *uc_v) { - struct ucontext_t *uc = uc_v; + int i, found; - STATUS("Signal!\n"); + if ( si->si_signo != SIGCHLD ) { + ERROR("Unhandled signal %i?\n", si->si_signo); + return; + } + + found = 0; + for ( i=0; in_proc; i++ ) { + if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) { + found = 1; + break; + } + } + + if ( !found ) { + ERROR("SIGCHLD from unknown child %i?\n", si->si_pid); + return; + } + + if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED) + || (si->si_code == CLD_CONTINUED) ) return; + + if ( si->si_code == CLD_EXITED ) + { + sb->running[i] = 0; + STATUS("Worker process %i exited normally.\n", i); + return; + } + + if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) { + ERROR("Unhandled si_code %i (worker process %i).\n", + si->si_code, i); + return; + } + + ERROR("Worker process %i exited abnormally!\n", i); + ERROR(" -> Signal %i, last filename %s.\n", + si->si_signo, sb->last_filename[i]); + + sb->running[i] = 0; + //start_worker_process(sb, i); } @@ -517,34 +672,34 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, char *use_this_one_instead, FILE *ofh) { - int n_indexable, n_processed, n_indexable_last_stats; - int n_processed_last_stats; - int t_last_stats; - pid_t *pids; - int *filename_pipes; - int *stream_pipe_read; - int *stream_pipe_write; - FILE **result_fhs; int i; int allDone; - int *finished; - pid_t pr; struct sigaction sa; int r; + pthread_t reader_thread; - n_indexable = 0; - n_processed = 0; - n_indexable_last_stats = 0; - n_processed_last_stats = 0; - t_last_stats = get_monotonic_seconds(); + sb = calloc(1, sizeof(struct sandbox)); + if ( sb == NULL ) { + ERROR("Couldn't allocate memory for sandbox.\n"); + return; + } - stream_pipe_read = calloc(n_proc, sizeof(int)); - stream_pipe_write = calloc(n_proc, sizeof(int)); - if ( stream_pipe_read == NULL ) { + sb->n_indexable = 0; + sb->n_processed = 0; + sb->n_indexable_last_stats = 0; + sb->n_processed_last_stats = 0; + sb->t_last_stats = get_monotonic_seconds(); + sb->n_proc = n_proc; + sb->ofh = ofh; + sb->iargs = iargs; + + sb->stream_pipe_read = calloc(n_proc, sizeof(int)); + sb->stream_pipe_write = calloc(n_proc, sizeof(int)); + if ( sb->stream_pipe_read == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } - if ( stream_pipe_write == NULL ) { + if ( sb->stream_pipe_write == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } @@ -558,46 +713,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - stream_pipe_read[i] = stream_pipe[0]; - stream_pipe_write[i] = stream_pipe[1]; + sb->stream_pipe_read[i] = stream_pipe[0]; + sb->stream_pipe_write[i] = stream_pipe[1]; } - pr = fork(); - if ( pr == - 1 ) { - ERROR("fork() failed (for reader process)\n"); + if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { + ERROR("Failed to create reader thread.\n"); return; } - if ( pr == 0 ) { - - /* Free resources not needed by reader - * (but which will be needed by worker or master) */ - for ( i=0; iipriv); - free(iargs->indm); - free(iargs->ipriv); - free_detector_geometry(iargs->det); - free(iargs->beam); - free(iargs->element); - free(iargs->hdf5_peak_path); - free_copy_hdf5_field_list(iargs->copyme); - cell_free(iargs->cell); - fclose(fh); - - run_reader(stream_pipe_read, n_proc, ofh); - - free(stream_pipe_read); - - exit(0); - - } - /* Set up signal handler to take action if any children die */ sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; sigemptyset(&sa.sa_mask); @@ -608,119 +733,37 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - /* Free resources needed by reader only */ - if ( ofh != stdout ) fclose(ofh); - for ( i=0; ifilename_pipes = calloc(n_proc, sizeof(int)); + sb->result_fhs = calloc(n_proc, sizeof(FILE *)); + sb->pids = calloc(n_proc, sizeof(pid_t)); + sb->running = calloc(n_proc, sizeof(int)); + if ( sb->filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; } - if ( result_fhs == NULL ) { + if ( sb->result_fhs == NULL ) { ERROR("Couldn't allocate memory for pipe file handles.\n"); return; } - if ( pids == NULL ) { + if ( sb->pids == NULL ) { ERROR("Couldn't allocate memory for PIDs.\n"); return; } + if ( sb->running == NULL ) { + ERROR("Couldn't allocate memory for process flags.\n"); + return; + } - /* Fork the right number of times */ - for ( i=0; ilast_filename = calloc(n_proc, sizeof(char *)); + if ( sb->last_filename == NULL ) { + ERROR("Couldn't allocate memory for last filename list.\n"); + return; } - /* Free resources which will not be used by the main thread */ - cleanup_indexing(iargs->ipriv); - free(iargs->indm); - free(iargs->ipriv); - free_detector_geometry(iargs->det); - free(iargs->beam); - free(iargs->element); - free(iargs->hdf5_peak_path); - free_copy_hdf5_field_list(iargs->copyme); - cell_free(iargs->cell); + /* Fork the right number of times */ for ( i=0; ilast_filename[i]); + sb->last_filename[i] = strdup(nextImage); + + write(sb->filename_pipes[i], nextImage, + strlen(nextImage)); + write(sb->filename_pipes[i], "\n", 1); free(nextImage); @@ -742,7 +789,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int r; /* No more files to process.. already? */ - r = write(filename_pipes[i], "\n", 1); + r = write(sb->filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("Write pipe\n"); } @@ -751,12 +798,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } - finished = calloc(n_proc, sizeof(int)); - if ( finished == NULL ) { - ERROR("Couldn't allocate memory for process flags.\n"); - return; - } - allDone = 0; while ( !allDone ) { @@ -775,9 +816,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int fd; - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - fd = fileno(result_fhs[i]); + fd = fileno(sb->result_fhs[i]); FD_SET(fd, &fds); if ( fd > fdmax ) fdmax = fd; @@ -786,7 +827,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { - ERROR("select() failed: %s\n", strerror(errno)); + if ( errno != EINTR ) { + ERROR("select() failed: %s\n", strerror(errno)); + } continue; } @@ -798,17 +841,18 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, char results[1024]; char *rval; int fd; + int n; + char *eptr; - if ( finished[i] ) continue; + if ( !sb->running[i] ) continue; - fd = fileno(result_fhs[i]); + fd = fileno(sb->result_fhs[i]); if ( !FD_ISSET(fd, &fds) ) continue; - rval = fgets(results, 1024, result_fhs[i]); + rval = fgets(results, 1024, sb->result_fhs[i]); if ( rval == NULL ) { - if ( feof(result_fhs[i]) ) { - /* Process died */ - finished[i] = 1; + if ( feof(sb->result_fhs[i]) ) { + ERROR("EOF from process %i.\n", i); } else { ERROR("fgets() failed: %s\n", strerror(errno)); @@ -817,8 +861,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } chomp(results); - n_indexable += atoi(results); - n_processed++; + + n = strtol(results, &eptr, 10); + if ( eptr == results ) { + if ( strlen(results) > 0 ) { + ERROR("Invalid result '%s'\n", results); + } + } else { + sb->n_indexable += atoi(results); + sb->n_processed++; + } /* Send next filename */ nextImage = get_pattern(fh, &use_this_one_instead, @@ -826,15 +878,15 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( nextImage == NULL ) { /* No more images */ - r = write(filename_pipes[i], "\n", 1); + r = write(sb->filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("Write pipe\n"); } } else { - r = write(filename_pipes[i], nextImage, + r = write(sb->filename_pipes[i], nextImage, strlen(nextImage)); - r -= write(filename_pipes[i], "\n", 1); + r -= write(sb->filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("write pipe\n"); } @@ -845,23 +897,23 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Update progress */ tNow = get_monotonic_seconds(); - if ( tNow >= t_last_stats+STATS_EVERY_N_SECONDS ) { + if ( tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS ) { STATUS("%i out of %i indexed so far," " %i out of %i since the last message.\n", - n_indexable, n_processed, - n_indexable - n_indexable_last_stats, - n_processed - n_processed_last_stats); + sb->n_indexable, sb->n_processed, + sb->n_indexable - sb->n_indexable_last_stats, + sb->n_processed - sb->n_processed_last_stats); - n_indexable_last_stats = n_indexable; - n_processed_last_stats = n_processed; - t_last_stats = tNow; + sb->n_indexable_last_stats = sb->n_indexable; + sb->n_processed_last_stats = sb->n_processed; + sb->t_last_stats = tNow; } allDone = 1; for ( i=0; irunning[i] ) allDone = 0; } } @@ -870,20 +922,22 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, for ( i=0; ipids[i], &status, 0); } for ( i=0; ifilename_pipes[i]); + fclose(sb->result_fhs[i]); } - free(filename_pipes); - free(result_fhs); - free(pids); - free(finished); + free(sb->filename_pipes); + free(sb->result_fhs); + free(sb->pids); + free(sb->running); + + if ( ofh != stdout ) fclose(ofh); STATUS("There were %i images, of which %i could be indexed.\n", - n_processed, n_indexable); + sb->n_processed, sb->n_indexable); } diff --git a/src/indexamajig.c b/src/indexamajig.c index c2c9ed2c..294bce1d 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -637,7 +637,7 @@ int main(int argc, char *argv[]) iargs.ir_out = ir_out; create_sandbox(&iargs, n_proc, prefix, config_basename, fh, - use_this_one_instead, ofh); + use_this_one_instead, ofh); free(prefix); -- cgit v1.2.3 From 14100fed56471e4331f83acc46c2fccd67125911 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sun, 15 Jul 2012 11:46:25 -0400 Subject: Add locking --- src/im-sandbox.c | 174 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 89 insertions(+), 85 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 85576ba1..02c90cdc 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -75,6 +75,8 @@ struct sandbox { + pthread_mutex_t lock; + int n_indexable; int n_processed; int n_indexable_last_stats; @@ -86,6 +88,7 @@ struct sandbox int n_proc; pid_t *pids; FILE *ofh; + FILE **fhs; int *running; FILE **result_fhs; @@ -367,8 +370,7 @@ static void run_work(const struct index_args *iargs, } - /* close my pipes */ - fclose(fh); + STATUS("Got command to exit. Shutting down!\n"); cleanup_indexing(iargs->ipriv); free(iargs->indm); @@ -379,6 +381,7 @@ static void run_work(const struct index_args *iargs, free(iargs->hdf5_peak_path); free_copy_hdf5_field_list(iargs->copyme); cell_free(iargs->cell); + fclose(fh); } @@ -450,22 +453,6 @@ static void *run_reader(void *sbv) { struct sandbox *sb = sbv; int done = 0; - FILE **fhs; - int i; - - fhs = calloc(sb->n_proc, sizeof(FILE *)); - if ( fhs == NULL ) { - ERROR("Couldn't allocate memory for file handles!\n"); - return NULL; - } - - for ( i=0; in_proc; i++ ) { - fhs[i] = fdopen(sb->stream_pipe_read[i], "r"); - if ( fhs[i] == NULL ) { - ERROR("Couldn't fdopen() stream!\n"); - return NULL; - } - } while ( !done ) { @@ -479,6 +466,7 @@ static void *run_reader(void *sbv) FD_ZERO(&fds); fdmax = 0; + pthread_mutex_lock(&sb->lock); for ( i=0; in_proc; i++ ) { int fd; @@ -492,6 +480,8 @@ static void *run_reader(void *sbv) } + pthread_mutex_unlock(&sb->lock); + r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { @@ -503,13 +493,14 @@ static void *run_reader(void *sbv) if ( r == 0 ) continue; /* Nothing this time. Try again */ + pthread_mutex_lock(&sb->lock); for ( i=0; in_proc; i++ ) { if ( !sb->running[i] ) continue; if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue; - if ( pump_chunk(fhs[i], sb->ofh) ) { + if ( pump_chunk(sb->fhs[i], sb->ofh) ) { sb->running[i] = 0; } @@ -519,14 +510,10 @@ static void *run_reader(void *sbv) for ( i=0; in_proc; i++ ) { if ( sb->running[i] ) done = 0; } + pthread_mutex_unlock(&sb->lock); } - for ( i=0; in_proc; i++ ) { - fclose(fhs[i]); - } - free(fhs); - return NULL; } @@ -547,9 +534,11 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } + pthread_mutex_lock(&sb->lock); p = fork(); if ( p == -1 ) { ERROR("fork() failed!\n"); + pthread_mutex_unlock(&sb->lock); return; } @@ -560,6 +549,9 @@ static void start_worker_process(struct sandbox *sb, int slot) struct sigaction sa; int r; + /* FIXME: Is lock inherited? */ + pthread_mutex_unlock(&sb->lock); + /* First, disconnect the signal handler */ sa.sa_flags = 0; sigemptyset(&sa.sa_mask); @@ -597,8 +589,7 @@ static void start_worker_process(struct sandbox *sb, int slot) sfh, slot); fclose(sfh); - free(sb->stream_pipe_write); - close(filename_pipe[0]); + //close(filename_pipe[0]); close(result_pipe[1]); exit(0); @@ -612,12 +603,21 @@ static void start_worker_process(struct sandbox *sb, int slot) close(filename_pipe[0]); close(result_pipe[1]); sb->filename_pipes[slot] = filename_pipe[1]; + sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r"); + if ( sb->fhs[slot] == NULL ) { + ERROR("Couldn't fdopen() stream!\n"); + pthread_mutex_unlock(&sb->lock); + return; + } sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); if ( sb->result_fhs[slot] == NULL ) { ERROR("fdopen() failed.\n"); + pthread_mutex_unlock(&sb->lock); return; } + + pthread_mutex_unlock(&sb->lock); } @@ -625,12 +625,17 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) { int i, found; + STATUS("Signal!\n"); + if ( si->si_signo != SIGCHLD ) { ERROR("Unhandled signal %i?\n", si->si_signo); return; } found = 0; + STATUS("Getting lock...\n"); fflush(stderr); + pthread_mutex_lock(&sb->lock); + STATUS("Got it.\n"); fflush(stderr); for ( i=0; in_proc; i++ ) { if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) { found = 1; @@ -640,22 +645,29 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) if ( !found ) { ERROR("SIGCHLD from unknown child %i?\n", si->si_pid); + pthread_mutex_unlock(&sb->lock); return; } if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED) - || (si->si_code == CLD_CONTINUED) ) return; + || (si->si_code == CLD_CONTINUED) ) + { + pthread_mutex_unlock(&sb->lock); + return; + } if ( si->si_code == CLD_EXITED ) { sb->running[i] = 0; STATUS("Worker process %i exited normally.\n", i); + pthread_mutex_unlock(&sb->lock); return; } if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) { ERROR("Unhandled si_code %i (worker process %i).\n", si->si_code, i); + pthread_mutex_unlock(&sb->lock); return; } @@ -663,8 +675,8 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) ERROR(" -> Signal %i, last filename %s.\n", si->si_signo, sb->last_filename[i]); - sb->running[i] = 0; - //start_worker_process(sb, i); + pthread_mutex_unlock(&sb->lock); + start_worker_process(sb, i); } @@ -693,6 +705,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->ofh = ofh; sb->iargs = iargs; + pthread_mutex_init(&sb->lock, NULL); + sb->stream_pipe_read = calloc(n_proc, sizeof(int)); sb->stream_pipe_write = calloc(n_proc, sizeof(int)); if ( sb->stream_pipe_read == NULL ) { @@ -718,25 +732,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } - if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { - ERROR("Failed to create reader thread.\n"); - return; - } - - /* Set up signal handler to take action if any children die */ - sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; - sigemptyset(&sa.sa_mask); - sa.sa_sigaction = signal_handler; - r = sigaction(SIGCHLD, &sa, NULL); - if ( r == -1 ) { - ERROR("Failed to set signal handler!\n"); - return; - } - + pthread_mutex_lock(&sb->lock); sb->filename_pipes = calloc(n_proc, sizeof(int)); sb->result_fhs = calloc(n_proc, sizeof(FILE *)); sb->pids = calloc(n_proc, sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); + sb->fhs = calloc(sb->n_proc, sizeof(FILE *)); if ( sb->filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); return; @@ -759,43 +760,30 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Couldn't allocate memory for last filename list.\n"); return; } - - /* Fork the right number of times */ - for ( i=0; ifhs == NULL ) { + ERROR("Couldn't allocate memory for file handles!\n"); + return; } + pthread_mutex_unlock(&sb->lock); - /* Send first image to all children */ - for ( i=0; ilast_filename[i]); - sb->last_filename[i] = strdup(nextImage); - - write(sb->filename_pipes[i], nextImage, - strlen(nextImage)); - write(sb->filename_pipes[i], "\n", 1); - - free(nextImage); - - } else { - - int r; - - /* No more files to process.. already? */ - r = write(sb->filename_pipes[i], "\n", 1); - if ( r < 0 ) { - ERROR("Write pipe\n"); - } + if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { + ERROR("Failed to create reader thread.\n"); + return; + } - } + /* Set up signal handler to take action if any children die */ + sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; + sigemptyset(&sa.sa_mask); + sa.sa_sigaction = signal_handler; + r = sigaction(SIGCHLD, &sa, NULL); + if ( r == -1 ) { + ERROR("Failed to set signal handler!\n"); + return; + } + /* Fork the right number of times */ + for ( i=0; ilock); for ( i=0; irunning[i] ) continue; + if ( !sb->running[i] ) { + pthread_mutex_unlock(&sb->lock); + continue; + } fd = fileno(sb->result_fhs[i]); FD_SET(fd, &fds); @@ -824,8 +816,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } + pthread_mutex_unlock(&sb->lock); r = select(fdmax+1, &fds, NULL, NULL, &tv); - if ( r == -1 ) { if ( errno != EINTR ) { ERROR("select() failed: %s\n", strerror(errno)); @@ -835,6 +827,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( r == 0 ) continue; /* No progress this time. Try again */ + pthread_mutex_lock(&sb->lock); for ( i=0; irunning[i] ) continue; + if ( !sb->running[i] ) { + continue; + } fd = fileno(sb->result_fhs[i]); - if ( !FD_ISSET(fd, &fds) ) continue; + if ( !FD_ISSET(fd, &fds) ) { + continue; + } rval = fgets(results, 1024, sb->result_fhs[i]); if ( rval == NULL ) { - if ( feof(sb->result_fhs[i]) ) { - ERROR("EOF from process %i.\n", i); - } else { + if ( !feof(sb->result_fhs[i]) ) { ERROR("fgets() failed: %s\n", strerror(errno)); } @@ -883,7 +878,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Write pipe\n"); } } else { - r = write(sb->filename_pipes[i], nextImage, strlen(nextImage)); r -= write(sb->filename_pipes[i], "\n", 1); @@ -916,10 +910,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( sb->running[i] ) allDone = 0; } + pthread_mutex_unlock(&sb->lock); + } + STATUS("Done. Waiting..\n"); + fclose(fh); + pthread_mutex_destroy(&sb->lock); + for ( i=0; ipids[i], &status, 0); @@ -930,6 +930,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, fclose(sb->result_fhs[i]); } + for ( i=0; in_proc; i++ ) { + fclose(sb->fhs[i]); + } + free(sb->fhs); free(sb->filename_pipes); free(sb->result_fhs); free(sb->pids); -- cgit v1.2.3 From cd127611f596921fa2d18dcdbd8641234a123f02 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sun, 15 Jul 2012 12:10:15 -0400 Subject: Tweak messages and locking --- src/im-sandbox.c | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 02c90cdc..c315b64a 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -370,8 +370,6 @@ static void run_work(const struct index_args *iargs, } - STATUS("Got command to exit. Shutting down!\n"); - cleanup_indexing(iargs->ipriv); free(iargs->indm); free(iargs->ipriv); @@ -625,49 +623,41 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) { int i, found; - STATUS("Signal!\n"); - if ( si->si_signo != SIGCHLD ) { ERROR("Unhandled signal %i?\n", si->si_signo); return; } found = 0; - STATUS("Getting lock...\n"); fflush(stderr); pthread_mutex_lock(&sb->lock); - STATUS("Got it.\n"); fflush(stderr); for ( i=0; in_proc; i++ ) { if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) { found = 1; break; } } + pthread_mutex_unlock(&sb->lock); if ( !found ) { ERROR("SIGCHLD from unknown child %i?\n", si->si_pid); - pthread_mutex_unlock(&sb->lock); return; } if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED) - || (si->si_code == CLD_CONTINUED) ) - { - pthread_mutex_unlock(&sb->lock); - return; - } + || (si->si_code == CLD_CONTINUED) ) return; if ( si->si_code == CLD_EXITED ) { + pthread_mutex_lock(&sb->lock); sb->running[i] = 0; - STATUS("Worker process %i exited normally.\n", i); pthread_mutex_unlock(&sb->lock); + STATUS("Worker process %i exited normally.\n", i); return; } if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) { ERROR("Unhandled si_code %i (worker process %i).\n", si->si_code, i); - pthread_mutex_unlock(&sb->lock); return; } @@ -675,7 +665,6 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) ERROR(" -> Signal %i, last filename %s.\n", si->si_signo, sb->last_filename[i]); - pthread_mutex_unlock(&sb->lock); start_worker_process(sb, i); } @@ -795,7 +784,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, double tNow; int fdmax; - tv.tv_sec = 5; + tv.tv_sec = 1; tv.tv_usec = 0; FD_ZERO(&fds); @@ -806,7 +795,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int fd; if ( !sb->running[i] ) { - pthread_mutex_unlock(&sb->lock); continue; } @@ -815,8 +803,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( fd > fdmax ) fdmax = fd; } - pthread_mutex_unlock(&sb->lock); + r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { if ( errno != EINTR ) { @@ -914,8 +902,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } - STATUS("Done. Waiting..\n"); - fclose(fh); pthread_mutex_destroy(&sb->lock); -- cgit v1.2.3 From 29861ba57c9dc68946084a29eb22e3ba1547711e Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 18 Jul 2012 22:32:35 -0400 Subject: Block SIGCHLD while mutex is locked --- src/im-sandbox.c | 71 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index c315b64a..e67acfd9 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -76,6 +76,7 @@ struct sandbox { pthread_mutex_t lock; + sigset_t old_sigmask; int n_indexable; int n_processed; @@ -103,6 +104,36 @@ struct sandbox struct sandbox *sb; +static void lock_sandbox(struct sandbox *sb) +{ + int r; + sigset_t set; + + sigemptyset(&set); + sigaddset(&set, SIGCHLD); + + r = pthread_sigmask(SIG_BLOCK, &set, &sb->old_sigmask); + if ( r != 0 ) { + ERROR("Failed to block signals.\n"); + } + + pthread_mutex_lock(&sb->lock); +} + + +static void unlock_sandbox(struct sandbox *sb) +{ + int r; + + pthread_mutex_unlock(&sb->lock); + + r = pthread_sigmask(SIG_SETMASK, &sb->old_sigmask, NULL); + if ( r != 0 ) { + ERROR("Failed to block signals.\n"); + } +} + + static char *get_pattern(FILE *fh, char **use_this_one_instead, int config_basename, const char *prefix) { @@ -464,7 +495,7 @@ static void *run_reader(void *sbv) FD_ZERO(&fds); fdmax = 0; - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); for ( i=0; in_proc; i++ ) { int fd; @@ -478,7 +509,7 @@ static void *run_reader(void *sbv) } - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); r = select(fdmax+1, &fds, NULL, NULL, &tv); @@ -491,7 +522,7 @@ static void *run_reader(void *sbv) if ( r == 0 ) continue; /* Nothing this time. Try again */ - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); for ( i=0; in_proc; i++ ) { if ( !sb->running[i] ) continue; @@ -508,7 +539,7 @@ static void *run_reader(void *sbv) for ( i=0; in_proc; i++ ) { if ( sb->running[i] ) done = 0; } - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); } @@ -532,11 +563,11 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); p = fork(); if ( p == -1 ) { ERROR("fork() failed!\n"); - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); return; } @@ -548,7 +579,7 @@ static void start_worker_process(struct sandbox *sb, int slot) int r; /* FIXME: Is lock inherited? */ - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); /* First, disconnect the signal handler */ sa.sa_flags = 0; @@ -604,18 +635,18 @@ static void start_worker_process(struct sandbox *sb, int slot) sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r"); if ( sb->fhs[slot] == NULL ) { ERROR("Couldn't fdopen() stream!\n"); - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); return; } sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); if ( sb->result_fhs[slot] == NULL ) { ERROR("fdopen() failed.\n"); - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); return; } - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); } @@ -629,14 +660,14 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) } found = 0; - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); for ( i=0; in_proc; i++ ) { if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) { found = 1; break; } } - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); if ( !found ) { ERROR("SIGCHLD from unknown child %i?\n", si->si_pid); @@ -648,9 +679,9 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v) if ( si->si_code == CLD_EXITED ) { - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); sb->running[i] = 0; - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); STATUS("Worker process %i exited normally.\n", i); return; } @@ -721,7 +752,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); sb->filename_pipes = calloc(n_proc, sizeof(int)); sb->result_fhs = calloc(n_proc, sizeof(FILE *)); sb->pids = calloc(n_proc, sizeof(pid_t)); @@ -753,7 +784,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Couldn't allocate memory for file handles!\n"); return; } - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { ERROR("Failed to create reader thread.\n"); @@ -789,7 +820,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, FD_ZERO(&fds); fdmax = 0; - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); for ( i=0; i fdmax ) fdmax = fd; } - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { @@ -815,7 +846,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( r == 0 ) continue; /* No progress this time. Try again */ - pthread_mutex_lock(&sb->lock); + lock_sandbox(sb); for ( i=0; irunning[i] ) allDone = 0; } - pthread_mutex_unlock(&sb->lock); + unlock_sandbox(sb); } -- cgit v1.2.3 From c78895d1964ade09c9a006d4d600e592d773e542 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 20 Jul 2012 14:59:10 -0400 Subject: Simplify signal handling --- src/im-sandbox.c | 119 +++++++++++++++++++++++++------------------------------ 1 file changed, 55 insertions(+), 64 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index e67acfd9..8187612d 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -76,7 +76,6 @@ struct sandbox { pthread_mutex_t lock; - sigset_t old_sigmask; int n_indexable; int n_processed; @@ -101,36 +100,18 @@ struct sandbox /* Horrible global variable for signal handler */ -struct sandbox *sb; +int signal_pipe[2]; static void lock_sandbox(struct sandbox *sb) { - int r; - sigset_t set; - - sigemptyset(&set); - sigaddset(&set, SIGCHLD); - - r = pthread_sigmask(SIG_BLOCK, &set, &sb->old_sigmask); - if ( r != 0 ) { - ERROR("Failed to block signals.\n"); - } - pthread_mutex_lock(&sb->lock); } static void unlock_sandbox(struct sandbox *sb) { - int r; - pthread_mutex_unlock(&sb->lock); - - r = pthread_sigmask(SIG_SETMASK, &sb->old_sigmask, NULL); - if ( r != 0 ) { - ERROR("Failed to block signals.\n"); - } } @@ -563,11 +544,9 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } - lock_sandbox(sb); p = fork(); if ( p == -1 ) { ERROR("fork() failed!\n"); - unlock_sandbox(sb); return; } @@ -578,9 +557,6 @@ static void start_worker_process(struct sandbox *sb, int slot) struct sigaction sa; int r; - /* FIXME: Is lock inherited? */ - unlock_sandbox(sb); - /* First, disconnect the signal handler */ sa.sa_flags = 0; sigemptyset(&sa.sa_mask); @@ -635,68 +611,62 @@ static void start_worker_process(struct sandbox *sb, int slot) sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r"); if ( sb->fhs[slot] == NULL ) { ERROR("Couldn't fdopen() stream!\n"); - unlock_sandbox(sb); return; } sb->result_fhs[slot] = fdopen(result_pipe[0], "r"); if ( sb->result_fhs[slot] == NULL ) { ERROR("fdopen() failed.\n"); - unlock_sandbox(sb); return; } - - unlock_sandbox(sb); } static void signal_handler(int sig, siginfo_t *si, void *uc_v) { - int i, found; + write(signal_pipe[1], "\n", 1); +} - if ( si->si_signo != SIGCHLD ) { - ERROR("Unhandled signal %i?\n", si->si_signo); - return; - } - found = 0; +static void handle_zombie(struct sandbox *sb) +{ + int i; + lock_sandbox(sb); for ( i=0; in_proc; i++ ) { - if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) { - found = 1; - break; + + int status, p; + + if ( !sb->running[i] ) continue; + + p = waitpid(sb->pids[i], &status, WNOHANG); + + if ( p == -1 ) { + ERROR("waitpid() failed.\n"); + continue; } - } - unlock_sandbox(sb); - if ( !found ) { - ERROR("SIGCHLD from unknown child %i?\n", si->si_pid); - return; - } + if ( p == sb->pids[i] ) { - if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED) - || (si->si_code == CLD_CONTINUED) ) return; + sb->running[i] = 0; - if ( si->si_code == CLD_EXITED ) - { - lock_sandbox(sb); - sb->running[i] = 0; - unlock_sandbox(sb); - STATUS("Worker process %i exited normally.\n", i); - return; - } + if ( WIFEXITED(status) ) { + STATUS("Worker %i exited normally.\n", i); + continue; + } - if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) { - ERROR("Unhandled si_code %i (worker process %i).\n", - si->si_code, i); - return; - } + if ( WIFSIGNALED(status) ) { + STATUS("Worker %i was killed by signal %i\n", + i, WTERMSIG(status)); + STATUS("Last filename was: %s\n", + sb->last_filename[i]); + start_worker_process(sb, i); + } - ERROR("Worker process %i exited abnormally!\n", i); - ERROR(" -> Signal %i, last filename %s.\n", - si->si_signo, sb->last_filename[i]); + } - start_worker_process(sb, i); + } + unlock_sandbox(sb); } @@ -709,6 +679,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, struct sigaction sa; int r; pthread_t reader_thread; + struct sandbox *sb; sb = calloc(1, sizeof(struct sandbox)); if ( sb == NULL ) { @@ -791,6 +762,11 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } + if ( pipe(signal_pipe) == -1 ) { + ERROR("Failed to create signal pipe.\n"); + return; + } + /* Set up signal handler to take action if any children die */ sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; sigemptyset(&sa.sa_mask); @@ -802,9 +778,11 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } /* Fork the right number of times */ + lock_sandbox(sb); for ( i=0; i fdmax ) fdmax = signal_pipe[0]; + r = select(fdmax+1, &fds, NULL, NULL, &tv); if ( r == -1 ) { if ( errno != EINTR ) { @@ -846,6 +827,14 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( r == 0 ) continue; /* No progress this time. Try again */ + if ( FD_ISSET(signal_pipe[0], &fds) ) { + + char d; + read(signal_pipe[0], &d, 1); + handle_zombie(sb); + + } + lock_sandbox(sb); for ( i=0; ilast_filename[i]); + sb->last_filename[i] = nextImage; + if ( nextImage == NULL ) { /* No more images */ r = write(sb->filename_pipes[i], "\n", 1); @@ -903,7 +895,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, if ( r < 0 ) { ERROR("write pipe\n"); } - free(nextImage); } } -- cgit v1.2.3 From 87f6f2a16d642df8f846f432151cb803b766b3cb Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 20 Jul 2012 15:01:48 -0400 Subject: Tidy up the exit path from run_work() --- src/im-sandbox.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 8187612d..eb823683 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -345,15 +345,12 @@ static void run_work(const struct index_args *iargs, line = malloc(1024*sizeof(char)); rval = fgets(line, 1023, fh); if ( rval == NULL ) { + + ERROR("Read error!\n"); free(line); - if ( feof(fh) ) { - allDone = 1; - STATUS("Exiting!\n"); - continue; - } else { - ERROR("Read error!\n"); - break; - } + allDone = 1; + continue; + } chomp(line); -- cgit v1.2.3 From 6eb751c2230226723884cc7bc473d65b91064b81 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Fri, 20 Jul 2012 15:25:52 -0400 Subject: Break down locking a bit --- src/im-sandbox.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index eb823683..b2f14bbb 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -895,8 +895,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } } + unlock_sandbox(sb); /* Update progress */ + lock_sandbox(sb); tNow = get_monotonic_seconds(); if ( tNow >= sb->t_last_stats+STATS_EVERY_N_SECONDS ) { @@ -911,8 +913,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->t_last_stats = tNow; } + unlock_sandbox(sb); allDone = 1; + lock_sandbox(sb); for ( i=0; irunning[i] ) allDone = 0; } -- cgit v1.2.3