diff options
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/Makefile.in | 4 | ||||
-rw-r--r-- | src/calibrate_detector.c | 287 | ||||
-rw-r--r-- | src/facetron.c | 8 | ||||
-rw-r--r-- | src/thread-pool.c | 109 | ||||
-rw-r--r-- | src/thread-pool.h | 16 |
6 files changed, 195 insertions, 231 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 6733dc3a..ee20d0b3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -53,7 +53,7 @@ render_hkl_SOURCES = render_hkl.c cell.c reflections.c utils.c povray.c \ render_hkl_LDADD = @LIBS@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \ - filters.c peaks.c detector.c cell.c + filters.c peaks.c detector.c cell.c thread-pool.c calibrate_detector_LDADD = @LIBS@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \ diff --git a/src/Makefile.in b/src/Makefile.in index c27c0ec9..c7ed213a 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -56,7 +56,7 @@ PROGRAMS = $(bin_PROGRAMS) am_calibrate_detector_OBJECTS = calibrate_detector.$(OBJEXT) \ utils.$(OBJEXT) hdf5-file.$(OBJEXT) image.$(OBJEXT) \ filters.$(OBJEXT) peaks.$(OBJEXT) detector.$(OBJEXT) \ - cell.$(OBJEXT) + cell.$(OBJEXT) thread-pool.$(OBJEXT) calibrate_detector_OBJECTS = $(am_calibrate_detector_OBJECTS) calibrate_detector_DEPENDENCIES = am_compare_hkl_OBJECTS = compare_hkl.$(OBJEXT) sfac.$(OBJEXT) \ @@ -294,7 +294,7 @@ render_hkl_SOURCES = render_hkl.c cell.c reflections.c utils.c povray.c \ render_hkl_LDADD = @LIBS@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \ - filters.c peaks.c detector.c cell.c + filters.c peaks.c detector.c cell.c thread-pool.c calibrate_detector_LDADD = @LIBS@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \ diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c index e5945015..2c68f645 100644 --- a/src/calibrate_detector.c +++ b/src/calibrate_detector.c @@ -27,6 +27,7 @@ #include "hdf5-file.h" #include "filters.h" #include "peaks.h" +#include "thread-pool.h" #define INTEGRATION_RADIUS (10) @@ -39,10 +40,9 @@ typedef enum SUM_PEAKS } SumMethod; -struct process_args +struct sum_args { char *filename; - int id; int config_cmfilter; int config_noisefilter; double *sum; @@ -50,12 +50,20 @@ struct process_args int h; SumMethod sum_method; double threshold; +}; + - /* Thread control */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; +struct queue_args +{ + FILE *fh; + char *prefix; + int config_cmfilter; + int config_noisefilter; + double *sum; + int w; + int h; + SumMethod sum_method; + double threshold; }; @@ -151,8 +159,9 @@ static void sum_threshold(struct image *image, double *sum, double threshold) } -static void process_image(struct process_args *pargs) +static void add_image(void *args) { + struct sum_args *pargs = args; struct hdfile *hdfile; struct image image; @@ -160,7 +169,6 @@ static void process_image(struct process_args *pargs) image.data = NULL; image.flags = NULL; image.indexed_cell = NULL; - image.id = pargs->id; image.filename = pargs->filename; image.hits = NULL; image.n_hits = 0; @@ -215,71 +223,38 @@ out: image_feature_list_free(image.features); if ( image.flags != NULL ) free(image.flags); hdfile_close(hdfile); -} - - -static void *worker_thread(void *pargsv) -{ - struct process_args *pargs = pargsv; - int finish; - - do { - - int wakeup; - - process_image(pargs); - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Go to sleep until told to exit or process next image */ - do { - - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - - } while ( !wakeup ); - - } while ( !pargs->finish ); - - return NULL; + free(pargs->filename); + free(pargs); } -static void dump_to_file(struct process_args *worker_args[], int nthreads, - int w, int h, int n, const char *stem) +static void *get_image(void *qp) { - int i; - double *total; - char outfile[256]; - - total = calloc(w*h, sizeof(double)); - - /* Add the individual sums to the 0th sum */ - for ( i=0; i<nthreads; i++ ) { - - int x, y; - - for ( x=0; x<w; x++ ) { - for ( y=0; y<h; y++ ) { - double val = worker_args[i]->sum[x+w*y]; - total[x+w*y] += val; - } - } - - } - - snprintf(outfile, 255, "%s-%i.h5", stem, n); - - hdf5_write(outfile, total, w, h, H5T_NATIVE_DOUBLE); - - free(total); + char line[1024]; + struct sum_args *pargs; + char *rval; + struct queue_args *qargs = qp; + + /* Get the next filename */ + rval = fgets(line, 1023, qargs->fh); + if ( rval == NULL ) return NULL; + + pargs = malloc(sizeof(struct sum_args)); + + pargs->w = qargs->w; + pargs->h = qargs->h; + pargs->sum_method = qargs->sum_method; + pargs->threshold = qargs->threshold; + pargs->config_cmfilter = qargs->config_cmfilter; + pargs->config_noisefilter = qargs->config_noisefilter; + pargs->sum = qargs->sum; + + chomp(line); + pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1); + snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); + + return pargs; } @@ -289,8 +264,7 @@ int main(int argc, char *argv[]) char *filename = NULL; char *outfile = NULL; FILE *fh; - char *rval = NULL; - int n_images; + int n_images = 0; int config_cmfilter = 0; int config_noisefilter = 0; char *prefix = NULL; @@ -299,12 +273,9 @@ int main(int argc, char *argv[]) double threshold = 400.0; SumMethod sum; int nthreads = 1; - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - int worker_active[MAX_THREADS]; - int i; - const int w = 1024; /* FIXME! */ - const int h = 1024; /* FIXME! */ + struct queue_args qargs; + int n_done; + const int chunk_size = 1000; /* Long options */ const struct option longopts[] = { @@ -402,157 +373,43 @@ int main(int argc, char *argv[]) outfile = strdup("summed.h5"); } - if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) { + if ( nthreads == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } - /* Initialise worker arguments */ - for ( i=0; i<nthreads; i++ ) { - - worker_args[i] = malloc(sizeof(struct process_args)); - worker_args[i]->filename = malloc(1024); - worker_args[i]->sum = calloc(w*h, sizeof(double)); - worker_active[i] = 0; - - worker_args[i]->w = w; - worker_args[i]->h = h; - worker_args[i]->sum_method = sum; - worker_args[i]->threshold = threshold; - - } - - n_images = 0; - - /* Start threads off */ - for ( i=0; i<nthreads; i++ ) { - - char line[1024]; - struct process_args *pargs; - int r; - - pargs = worker_args[i]; + qargs.w = 1024; /* FIXME! */ + qargs.h = 1024; /* FIXME! */ + qargs.sum_method = sum; + qargs.threshold = threshold; + qargs.config_cmfilter = config_cmfilter; + qargs.config_noisefilter = config_noisefilter; + qargs.sum = calloc(qargs.w*qargs.h, sizeof(double)); + qargs.prefix = prefix; + qargs.fh = fh; - rval = fgets(line, 1023, fh); - if ( rval == NULL ) continue; - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); - - n_images++; - - pthread_mutex_init(&pargs->control_mutex, NULL); - pargs->config_cmfilter = config_cmfilter; - pargs->config_noisefilter = config_noisefilter; - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ do { - int i; - - for ( i=0; i<nthreads; i++ ) { - - char line[1024]; - struct process_args *pargs; - int done; - - /* Spend time working, not managing threads */ - usleep(100000); - - /* Are we using this thread record at all? */ - if ( !worker_active[i] ) continue; - - /* Has the thread finished yet? */ - pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Get the next filename */ - rval = fgets(line, 1023, fh); - if ( rval == NULL ) break; - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); + n_done = run_threads(nthreads, add_image, get_image, + (void *)&qargs, chunk_size); - n_images++; - - STATUS("Done %i images\n", n_images); - - /* Wake the thread up ... */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - if ( n_images % 1000 == 0 ) { - if ( intermediate != NULL ) { - dump_to_file(worker_args, nthreads, - w, h, n_images, - intermediate); - } - } + n_images += n_done; + /* Write intermediate sum if requested */ + if ( (intermediate != NULL) && (n_done == chunk_size) ) { + char outfile[1024]; + snprintf(outfile, 1023, "%s-%i.h5", + intermediate, n_images); + hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, + H5T_NATIVE_DOUBLE); } - } while ( rval != NULL ); - - /* Join threads */ - for ( i=0; i<nthreads; i++ ) { - - if ( !worker_active[i] ) goto free; - - - /* Tell the thread to exit */ - struct process_args *pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - - free: - if ( worker_args[i]->filename != NULL ) { - free(worker_args[i]->filename); - } - - } - - /* Add the individual sums to the 0th sum */ - for ( i=1; i<nthreads; i++ ) { - - int x, y; - - for ( x=0; x<w; x++ ) { - for ( y=0; y<h; y++ ) { - double val = worker_args[i]->sum[x+w*y]; - worker_args[0]->sum[x+w*y] += val; - } - } - free(worker_args[i]->sum); - free(worker_args[i]); - - } + } while ( n_done == chunk_size ); - hdf5_write(outfile, worker_args[0]->sum, w, h, H5T_NATIVE_DOUBLE); + /* Write the final output */ + hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, H5T_NATIVE_DOUBLE); - free(worker_args[0]->sum); - free(worker_args[0]); + free(qargs.sum); free(prefix); free(outfile); if ( intermediate != NULL ) free(intermediate); diff --git a/src/facetron.c b/src/facetron.c index 3dc7d486..27fc1608 100644 --- a/src/facetron.c +++ b/src/facetron.c @@ -174,8 +174,8 @@ static void refine_all(struct image *images, int n_total_patterns, } - munch_threads(n_total_patterns, nthreads, "Refining", - refine_image, tasks); + run_thread_range(n_total_patterns, nthreads, "Refining", + refine_image, tasks); free(tasks); } @@ -205,8 +205,8 @@ static void estimate_full(struct image *images, int n_total_patterns, } - munch_threads(n_total_patterns, nthreads, "Integrating", - integrate_image, tasks); + run_thread_range(n_total_patterns, nthreads, "Integrating", + integrate_image, tasks); free(tasks); diff --git a/src/thread-pool.c b/src/thread-pool.c index ac508b11..358fb4f3 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -25,6 +25,8 @@ #include "utils.h" +/* ---------------------------------- Range --------------------------------- */ + enum { TASK_READY, TASK_RUNNING, @@ -32,7 +34,7 @@ enum { }; -struct task_queue +struct task_queue_range { pthread_mutex_t lock; @@ -47,9 +49,9 @@ struct task_queue }; -static void *worker_thread(void *pargsv) +static void *range_worker(void *pargsv) { - struct task_queue *q = pargsv; + struct task_queue_range *q = pargsv; do { @@ -89,12 +91,12 @@ static void *worker_thread(void *pargsv) } -void munch_threads(int n_tasks, int n_threads, const char *text, - void (*work)(int, void *), void *work_args) +void run_thread_range(int n_tasks, int n_threads, const char *text, + void (*work)(int, void *), void *work_args) { pthread_t *workers; int i; - struct task_queue q; + struct task_queue_range q; /* The nation of CrystFEL prides itself on having 0% unemployment. */ if ( n_threads > n_tasks ) n_threads = n_tasks; @@ -116,7 +118,7 @@ void munch_threads(int n_tasks, int n_threads, const char *text, /* Start threads */ for ( i=0; i<n_threads; i++ ) { - if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) { + if ( pthread_create(&workers[i], NULL, range_worker, &q) ) { ERROR("Couldn't start thread %i\n", i); n_threads = i; break; @@ -132,3 +134,96 @@ void munch_threads(int n_tasks, int n_threads, const char *text, free(q.status); free(workers); } + + +/* ---------------------------- Custom get_task() --------------------------- */ + +struct task_queue +{ + pthread_mutex_t lock; + + int n_started; + int n_completed; + int max; + + void *(*get_task)(void *); + void *queue_args; + void (*work)(void *); +}; + + +static void *task_worker(void *pargsv) +{ + struct task_queue *q = pargsv; + + do { + + void *task; + + /* Get a task */ + pthread_mutex_lock(&q->lock); + if ( q->n_started >= q->max ) { + pthread_mutex_unlock(&q->lock); + break; + } + task = q->get_task(q->queue_args); + + /* No more tasks? */ + if ( task == NULL ) { + pthread_mutex_unlock(&q->lock); + break; + } + + q->n_started++; + pthread_mutex_unlock(&q->lock); + + q->work(task); + + /* Update totals etc */ + pthread_mutex_lock(&q->lock); + q->n_completed++; + pthread_mutex_unlock(&q->lock); + + } while ( 1 ); + + return NULL; +} + + +int run_threads(int n_threads, void (*work)(void *), + void *(*get_task)(void *), void *queue_args, int max) +{ + pthread_t *workers; + int i; + struct task_queue q; + + workers = malloc(n_threads * sizeof(pthread_t)); + + pthread_mutex_init(&q.lock, NULL); + q.work = work; + q.get_task = get_task; + q.queue_args = queue_args; + q.n_started = 0; + q.n_completed = 0; + q.max = max; + + /* Start threads */ + for ( i=0; i<n_threads; i++ ) { + + if ( pthread_create(&workers[i], NULL, task_worker, &q) ) { + ERROR("Couldn't start thread %i\n", i); + n_threads = i; + break; + } + + } + + /* Join threads */ + for ( i=0; i<n_threads; i++ ) { + pthread_join(workers[i], NULL); + } + + free(workers); + + return q.n_completed; +} diff --git a/src/thread-pool.h b/src/thread-pool.h index 4439f970..11123493 100644 --- a/src/thread-pool.h +++ b/src/thread-pool.h @@ -18,8 +18,20 @@ #endif -extern void munch_threads(int n_tasks, int n_threads, const char *text, - void (*work)(int, void *), void *work_args); +/* work() will be called with a number and work_args. The number will be + * unique and in the range 0..n_tasks. A progress bar will be shown using + * "text" and the progress through the tasks. */ +extern void run_thread_range(int n_tasks, int n_threads, const char *text, + void (*work)(int, void *), void *work_args); + + +/* get_task() will be called every time a worker is idle. It returns either + * NULL, indicating that no further work is available, or a pointer which will + * be passed to work(). Work will stop after 'max' tasks have been processed. + * get_task() does not need to be re-entrant. + * Returns: the number of tasks processed. */ +extern int run_threads(int n_threads, void (*work)(void *), + void *(*get_task)(void *), void *queue_args, int max); #endif /* THREAD_POOL_H */ |