From 33d4fbfbbee784f734632e543a75222f38bc807d Mon Sep 17 00:00:00 2001 From: Thomas White Date: Sun, 10 Oct 2010 20:32:41 +0200 Subject: calibrate_detecotr: Use new thread pool --- src/calibrate_detector.c | 287 ++++++++++++----------------------------------- 1 file changed, 72 insertions(+), 215 deletions(-) (limited to 'src/calibrate_detector.c') diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c index e5945015..2c68f645 100644 --- a/src/calibrate_detector.c +++ b/src/calibrate_detector.c @@ -27,6 +27,7 @@ #include "hdf5-file.h" #include "filters.h" #include "peaks.h" +#include "thread-pool.h" #define INTEGRATION_RADIUS (10) @@ -39,10 +40,9 @@ typedef enum SUM_PEAKS } SumMethod; -struct process_args +struct sum_args { char *filename; - int id; int config_cmfilter; int config_noisefilter; double *sum; @@ -50,12 +50,20 @@ struct process_args int h; SumMethod sum_method; double threshold; +}; + - /* Thread control */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; +struct queue_args +{ + FILE *fh; + char *prefix; + int config_cmfilter; + int config_noisefilter; + double *sum; + int w; + int h; + SumMethod sum_method; + double threshold; }; @@ -151,8 +159,9 @@ static void sum_threshold(struct image *image, double *sum, double threshold) } -static void process_image(struct process_args *pargs) +static void add_image(void *args) { + struct sum_args *pargs = args; struct hdfile *hdfile; struct image image; @@ -160,7 +169,6 @@ static void process_image(struct process_args *pargs) image.data = NULL; image.flags = NULL; image.indexed_cell = NULL; - image.id = pargs->id; image.filename = pargs->filename; image.hits = NULL; image.n_hits = 0; @@ -215,71 +223,38 @@ out: image_feature_list_free(image.features); if ( image.flags != NULL ) free(image.flags); hdfile_close(hdfile); -} - - -static void *worker_thread(void *pargsv) -{ - struct process_args *pargs = pargsv; - int finish; - - do { - - int wakeup; - - process_image(pargs); - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Go to sleep until told to exit or process next image */ - do { - - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - - } while ( !wakeup ); - - } while ( !pargs->finish ); - - return NULL; + free(pargs->filename); + free(pargs); } -static void dump_to_file(struct process_args *worker_args[], int nthreads, - int w, int h, int n, const char *stem) +static void *get_image(void *qp) { - int i; - double *total; - char outfile[256]; - - total = calloc(w*h, sizeof(double)); - - /* Add the individual sums to the 0th sum */ - for ( i=0; isum[x+w*y]; - total[x+w*y] += val; - } - } - - } - - snprintf(outfile, 255, "%s-%i.h5", stem, n); - - hdf5_write(outfile, total, w, h, H5T_NATIVE_DOUBLE); - - free(total); + char line[1024]; + struct sum_args *pargs; + char *rval; + struct queue_args *qargs = qp; + + /* Get the next filename */ + rval = fgets(line, 1023, qargs->fh); + if ( rval == NULL ) return NULL; + + pargs = malloc(sizeof(struct sum_args)); + + pargs->w = qargs->w; + pargs->h = qargs->h; + pargs->sum_method = qargs->sum_method; + pargs->threshold = qargs->threshold; + pargs->config_cmfilter = qargs->config_cmfilter; + pargs->config_noisefilter = qargs->config_noisefilter; + pargs->sum = qargs->sum; + + chomp(line); + pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1); + snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); + + return pargs; } @@ -289,8 +264,7 @@ int main(int argc, char *argv[]) char *filename = NULL; char *outfile = NULL; FILE *fh; - char *rval = NULL; - int n_images; + int n_images = 0; int config_cmfilter = 0; int config_noisefilter = 0; char *prefix = NULL; @@ -299,12 +273,9 @@ int main(int argc, char *argv[]) double threshold = 400.0; SumMethod sum; int nthreads = 1; - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - int worker_active[MAX_THREADS]; - int i; - const int w = 1024; /* FIXME! */ - const int h = 1024; /* FIXME! */ + struct queue_args qargs; + int n_done; + const int chunk_size = 1000; /* Long options */ const struct option longopts[] = { @@ -402,157 +373,43 @@ int main(int argc, char *argv[]) outfile = strdup("summed.h5"); } - if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) { + if ( nthreads == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } - /* Initialise worker arguments */ - for ( i=0; ifilename = malloc(1024); - worker_args[i]->sum = calloc(w*h, sizeof(double)); - worker_active[i] = 0; - - worker_args[i]->w = w; - worker_args[i]->h = h; - worker_args[i]->sum_method = sum; - worker_args[i]->threshold = threshold; - - } - - n_images = 0; - - /* Start threads off */ - for ( i=0; ifilename, 1023, "%s%s", prefix, line); - - n_images++; - - pthread_mutex_init(&pargs->control_mutex, NULL); - pargs->config_cmfilter = config_cmfilter; - pargs->config_noisefilter = config_noisefilter; - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ do { - int i; - - for ( i=0; icontrol_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Get the next filename */ - rval = fgets(line, 1023, fh); - if ( rval == NULL ) break; - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); + n_done = run_threads(nthreads, add_image, get_image, + (void *)&qargs, chunk_size); - n_images++; - - STATUS("Done %i images\n", n_images); - - /* Wake the thread up ... */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - if ( n_images % 1000 == 0 ) { - if ( intermediate != NULL ) { - dump_to_file(worker_args, nthreads, - w, h, n_images, - intermediate); - } - } + n_images += n_done; + /* Write intermediate sum if requested */ + if ( (intermediate != NULL) && (n_done == chunk_size) ) { + char outfile[1024]; + snprintf(outfile, 1023, "%s-%i.h5", + intermediate, n_images); + hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, + H5T_NATIVE_DOUBLE); } - } while ( rval != NULL ); - - /* Join threads */ - for ( i=0; icontrol_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - - free: - if ( worker_args[i]->filename != NULL ) { - free(worker_args[i]->filename); - } - - } - - /* Add the individual sums to the 0th sum */ - for ( i=1; isum[x+w*y]; - worker_args[0]->sum[x+w*y] += val; - } - } - free(worker_args[i]->sum); - free(worker_args[i]); - - } + } while ( n_done == chunk_size ); - hdf5_write(outfile, worker_args[0]->sum, w, h, H5T_NATIVE_DOUBLE); + /* Write the final output */ + hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, H5T_NATIVE_DOUBLE); - free(worker_args[0]->sum); - free(worker_args[0]); + free(qargs.sum); free(prefix); free(outfile); if ( intermediate != NULL ) free(intermediate); -- cgit v1.2.3