diff options
Diffstat (limited to 'src/facetron.c')
-rw-r--r-- | src/facetron.c | 251 |
1 files changed, 58 insertions, 193 deletions
diff --git a/src/facetron.c b/src/facetron.c index 5d760b8a..3dc7d486 100644 --- a/src/facetron.c +++ b/src/facetron.c @@ -20,8 +20,6 @@ #include <string.h> #include <unistd.h> #include <getopt.h> -#include <pthread.h> -#include <sys/time.h> #include <assert.h> #include "utils.h" @@ -31,30 +29,7 @@ #include "stream.h" #include "geometry.h" #include "peaks.h" - - -#define MAX_THREADS (256) - -struct process_args -{ - struct image *image; - - /* Thread control */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; - - /* Analysis routine */ - void (*func)(struct process_args *); - - /* Analysis parameters */ - const char *sym; - pthread_mutex_t *list_lock; /* Protects 'obs', 'i_full' and 'cts' */ - ReflItemList *obs; - double *i_full; - unsigned int *cts; -}; +#include "thread-pool.h" static void show_help(const char *s) @@ -79,14 +54,38 @@ static void show_help(const char *s) } -static void refine_image(struct process_args *pargs) +struct refine_args { + const char *sym; + ReflItemList *obs; + double *i_full; + struct image *image; +}; + + +static void refine_image(int mytask, void *tasks) +{ + struct refine_args *all_args = tasks; + struct refine_args *pargs = &all_args[mytask]; /* Do, er, something. */ } -static void integrate_image(struct process_args *pargs) +struct integrate_args +{ + const char *sym; + ReflItemList *obs; + double *i_full; + unsigned int *cts; + pthread_mutex_t *list_lock; + struct image *image; +}; + + +static void integrate_image(int mytask, void *tasks) { + struct integrate_args *all_args = tasks; + struct integrate_args *pargs = &all_args[mytask]; struct reflhit *spots; int j, n; struct hdfile *hdfile; @@ -158,177 +157,27 @@ static void integrate_image(struct process_args *pargs) } -static void *worker_thread(void *pargsv) -{ - struct process_args *pargs = pargsv; - int finish; - - do { - - int wakeup; - - /* Acknowledge start */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - pargs->func(pargs); - - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Go to sleep until told to exit or process next image */ - do { - - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - - } while ( !wakeup ); - - } while ( !pargs->finish ); - - return NULL; -} - - -static void munch_threads(struct image *images, int n_total_patterns, - struct detector *det, const char *sym, - ReflItemList *obs, double *i_full, unsigned int *cts, - int nthreads, void (*func)(struct process_args *), - const char *text) +static void refine_all(struct image *images, int n_total_patterns, + struct detector *det, const char *sym, + ReflItemList *obs, double *i_full, int nthreads) { - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER; - int worker_active[MAX_THREADS]; + struct refine_args *tasks; int i; - int n_done = 0; - int n_started = 0; - - /* Initialise worker arguments with the unchanging data */ - for ( i=0; i<nthreads; i++ ) { - - worker_args[i] = malloc(sizeof(struct process_args)); - worker_active[i] = 0; - pthread_mutex_init(&worker_args[i]->control_mutex, NULL); - worker_args[i]->sym = sym; - worker_args[i]->obs = obs; - worker_args[i]->i_full = i_full; - worker_args[i]->cts = cts; - worker_args[i]->list_lock = &list_lock; - worker_args[i]->func = func; - - } - - /* Start threads off */ - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int r; - - if ( n_started == n_total_patterns ) break; - - pargs = worker_args[i]; - pargs->image = &images[n_started++]; - - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ - do { - - int i; - - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int done; - - /* Spend time working, not managing threads */ - usleep(100000); - - /* Are we using this thread record at all? */ - if ( !worker_active[i] ) continue; - - /* Has the thread finished yet? */ - pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Reset "done" flag */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - n_done++; - progress_bar(n_done, n_total_patterns, text); - /* If there are no more patterns, "done" will remain - * zero, so the last pattern will not be re-counted. */ - if ( n_started == n_total_patterns ) break; - - /* Start work on the next pattern */ - pargs->image = &images[n_started++]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - } - - } while ( n_started < n_total_patterns ); - - /* Join threads */ - for ( i=0; i<nthreads; i++ ) { - - if ( !worker_active[i] ) continue; - - /* Tell the thread to exit */ - struct process_args *pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - - if ( pargs->done ) { - n_done++; - progress_bar(n_done, n_total_patterns, text); - } /* else this thread was not busy */ + tasks = malloc(n_total_patterns * sizeof(struct refine_args)); + for ( i=0; i<n_total_patterns; i++ ) { - } + tasks[i].sym = sym; + tasks[i].obs = obs; + tasks[i].i_full = i_full; + tasks[i].image = &images[i]; - for ( i=0; i<nthreads; i++ ) { - free(worker_args[i]); } -} + munch_threads(n_total_patterns, nthreads, "Refining", + refine_image, tasks); -static void refine_all(struct image *images, int n_total_patterns, - struct detector *det, const char *sym, - ReflItemList *obs, double *i_full, int nthreads) -{ - munch_threads(images, n_total_patterns, det, sym, obs, i_full, NULL, - nthreads, refine_image, "Refining"); + free(tasks); } @@ -338,12 +187,28 @@ static void estimate_full(struct image *images, int n_total_patterns, { int i; unsigned int *cts; + struct integrate_args *tasks; + pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER; cts = new_list_count(); clear_items(obs); - munch_threads(images, n_total_patterns, det, sym, obs, i_full, cts, - nthreads, integrate_image, "Integrating"); + tasks = malloc(n_total_patterns * sizeof(struct integrate_args)); + for ( i=0; i<n_total_patterns; i++ ) { + + tasks[i].sym = sym; + tasks[i].obs = obs; + tasks[i].i_full = i_full; + tasks[i].cts = cts; + tasks[i].list_lock = &list_lock; + tasks[i].image = &images[i]; + + } + + munch_threads(n_total_patterns, nthreads, "Integrating", + integrate_image, tasks); + + free(tasks); /* Divide the totals to get the means */ for ( i=0; i<num_items(obs); i++ ) { |