diff options
-rw-r--r-- | src/partialator.c | 78 | ||||
-rw-r--r-- | src/thread-pool.c | 143 | ||||
-rw-r--r-- | src/thread-pool.h | 8 |
3 files changed, 57 insertions, 172 deletions
diff --git a/src/partialator.c b/src/partialator.c index ea3eb50c..33dcad68 100644 --- a/src/partialator.c +++ b/src/partialator.c @@ -71,40 +71,76 @@ struct refine_args }; -static void refine_image(int mytask, void *tasks) +struct queue_args { - struct refine_args *all_args = tasks; - struct refine_args *pargs = &all_args[mytask]; + int n; + int n_done; + int n_total_patterns; + struct image *images; + struct refine_args task_defaults; +}; + + +static void refine_image(void *task, int id) +{ + struct refine_args *pargs = task; struct image *image = pargs->image; + image->id = id; pr_refine(image, pargs->full, pargs->sym); } -static void refine_all(struct image *images, int n_total_patterns, - struct detector *det, const char *sym, - ReflItemList *obs, RefList *full, int nthreads, - FILE *graph, FILE *pgraph) +static void *get_image(void *vqargs) { - struct refine_args *tasks; - int i; + struct refine_args *task; + struct queue_args *qargs = vqargs; - tasks = malloc(n_total_patterns * sizeof(struct refine_args)); - for ( i=0; i<n_total_patterns; i++ ) { + task = malloc(sizeof(struct refine_args)); + memcpy(task, &qargs->task_defaults, sizeof(struct refine_args)); - tasks[i].sym = sym; - tasks[i].obs = obs; - tasks[i].full = full; - tasks[i].image = &images[i]; - tasks[i].graph = graph; - tasks[i].pgraph = pgraph; + task->image = &qargs->images[qargs->n]; - } + qargs->n++; - run_thread_range(n_total_patterns, nthreads, "Refining", - refine_image, tasks, 0, 0, 0); + return task; +} + + +static void done_image(void *vqargs, void *task) +{ + struct queue_args *qargs = vqargs; + + qargs->n_done++; + + progress_bar(qargs->n_done, qargs->n_total_patterns, "Refining"); + free(task); +} - free(tasks); + +static void refine_all(struct image *images, int n_total_patterns, + struct detector *det, const char *sym, + ReflItemList *obs, RefList *full, int nthreads, + FILE *graph, FILE *pgraph) +{ + struct refine_args task_defaults; + struct queue_args qargs; + + task_defaults.sym = sym; + task_defaults.obs = obs; + task_defaults.full = full; + task_defaults.image = NULL; + task_defaults.graph = graph; + task_defaults.pgraph = pgraph; + + qargs.task_defaults = task_defaults; + qargs.n = 0; + qargs.n_done = 0; + qargs.n_total_patterns = n_total_patterns; + qargs.images = images; + + run_threads(nthreads, refine_image, get_image, done_image, + &qargs, n_total_patterns, 0, 0, 0); } diff --git a/src/thread-pool.c b/src/thread-pool.c index c91c7002..133cbfcf 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -114,149 +114,6 @@ signed int get_status_label() } -/* ---------------------------------- Range --------------------------------- */ - -enum { - TASK_READY, - TASK_RUNNING, - TASK_FINISHED, -}; - - -struct task_queue_range -{ - pthread_mutex_t lock; - - int n_tasks; - int *status; - int n_done; - - void (*work)(int, void *); - void *work_args; - - const char *text; -}; - - -static void *range_worker(void *pargsv) -{ - struct worker_args *w = pargsv; - struct task_queue_range *q = w->tqr; - int *cookie; - - set_affinity(w->id, w->cpu_num, w->cpu_groupsize, w->cpu_offset); - - cookie = malloc(sizeof(int)); - *cookie = w->id; - pthread_setspecific(status_label_key, cookie); - - free(w); - - do { - - int i; - int found = 0; - int mytask = -1; - - /* Get a task */ - pthread_mutex_lock(&q->lock); - for ( i=0; i<q->n_tasks; i++ ) { - if ( q->status[i] == TASK_READY ) { - mytask = i; - found = 1; - q->status[i] = TASK_RUNNING; - break; - } - } - pthread_mutex_unlock(&q->lock); - - /* No more tasks? */ - if ( !found ) break; - - q->work(mytask, q->work_args); - - /* Mark this task as done, update totals etc */ - pthread_mutex_lock(&q->lock); - q->status[mytask] = TASK_FINISHED; - q->n_done++; - if ( q->text != NULL ) { - progress_bar(q->n_done, q->n_tasks, q->text); - } - pthread_mutex_unlock(&q->lock); - - } while ( 1 ); - - free(cookie); - - return NULL; -} - - -void run_thread_range(int n_tasks, int n_threads, const char *text, - void (*work)(int, void *), void *work_args, - int cpu_num, int cpu_groupsize, int cpu_offset) -{ - pthread_t *workers; - int i; - struct task_queue_range q; - - /* The nation of CrystFEL prides itself on having 0% unemployment. */ - if ( n_threads > n_tasks ) n_threads = n_tasks; - - pthread_key_create(&status_label_key, NULL); - - workers = malloc(n_threads * sizeof(pthread_t)); - - q.status = malloc(n_tasks * sizeof(int)); - pthread_mutex_init(&q.lock, NULL); - q.n_tasks = n_tasks; - q.work = work; - q.work_args = work_args; - q.n_done = 0; - q.text = text; - - for ( i=0; i<n_tasks; i++ ) { - q.status[i] = TASK_READY; - } - - /* Now it's safe to start using the status labels */ - if ( n_threads > 1 ) use_status_labels = 1; - - /* Start threads */ - for ( i=0; i<n_threads; i++ ) { - - struct worker_args *w; - - w = malloc(sizeof(struct worker_args)); - - w->tqr = &q; - w->tq = NULL; - w->id = i; - w->cpu_num = cpu_num; - w->cpu_groupsize = cpu_groupsize; - w->cpu_offset = cpu_offset; - - if ( pthread_create(&workers[i], NULL, range_worker, w) ) { - /* Not ERROR() here */ - fprintf(stderr, "Couldn't start thread %i\n", i); - n_threads = i; - break; - } - - } - - /* Join threads */ - for ( i=0; i<n_threads; i++ ) { - pthread_join(workers[i], NULL); - } - - use_status_labels = 0; - - free(q.status); - free(workers); -} - - /* ---------------------------- Custom get_task() --------------------------- */ struct task_queue diff --git a/src/thread-pool.h b/src/thread-pool.h index 04a9e19b..9c2a86a0 100644 --- a/src/thread-pool.h +++ b/src/thread-pool.h @@ -24,14 +24,6 @@ extern pthread_mutex_t stderr_lock; extern signed int get_status_label(void); -/* work() will be called with a number and work_args. The number will be - * unique and in the range 0..n_tasks. A progress bar will be shown using - * "text" and the progress through the tasks, unless "text" is NULL. */ -extern void run_thread_range(int n_tasks, int n_threads, const char *text, - void (*work)(int, void *), void *work_args, - int cpu_num, int cpu_groupsize, int cpu_offset); - - /* get_task() will be called every time a worker is idle. It returns either * NULL, indicating that no further work is available, or a pointer which will * be passed to work(). Work will stop after 'max' tasks have been processed. |