aboutsummaryrefslogtreecommitdiff
path: root/src/facetron.c
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2010-10-10 15:20:07 +0200
committerThomas White <taw@physics.org>2012-02-22 15:27:02 +0100
commit5a9ad87cc6534c046d23347241fbbcf43b5e3715 (patch)
treefb3fa913324645d4409078d4ce63adf57345a04c /src/facetron.c
parent33a8fc19c75323cc5447add5f268258ae518aff0 (diff)
facetron: Use new thread pool
Diffstat (limited to 'src/facetron.c')
-rw-r--r--src/facetron.c251
1 files changed, 58 insertions, 193 deletions
diff --git a/src/facetron.c b/src/facetron.c
index 5d760b8a..3dc7d486 100644
--- a/src/facetron.c
+++ b/src/facetron.c
@@ -20,8 +20,6 @@
#include <string.h>
#include <unistd.h>
#include <getopt.h>
-#include <pthread.h>
-#include <sys/time.h>
#include <assert.h>
#include "utils.h"
@@ -31,30 +29,7 @@
#include "stream.h"
#include "geometry.h"
#include "peaks.h"
-
-
-#define MAX_THREADS (256)
-
-struct process_args
-{
- struct image *image;
-
- /* Thread control */
- pthread_mutex_t control_mutex; /* Protects the scary stuff below */
- int start;
- int finish;
- int done;
-
- /* Analysis routine */
- void (*func)(struct process_args *);
-
- /* Analysis parameters */
- const char *sym;
- pthread_mutex_t *list_lock; /* Protects 'obs', 'i_full' and 'cts' */
- ReflItemList *obs;
- double *i_full;
- unsigned int *cts;
-};
+#include "thread-pool.h"
static void show_help(const char *s)
@@ -79,14 +54,38 @@ static void show_help(const char *s)
}
-static void refine_image(struct process_args *pargs)
+struct refine_args
{
+ const char *sym;
+ ReflItemList *obs;
+ double *i_full;
+ struct image *image;
+};
+
+
+static void refine_image(int mytask, void *tasks)
+{
+ struct refine_args *all_args = tasks;
+ struct refine_args *pargs = &all_args[mytask];
/* Do, er, something. */
}
-static void integrate_image(struct process_args *pargs)
+struct integrate_args
+{
+ const char *sym;
+ ReflItemList *obs;
+ double *i_full;
+ unsigned int *cts;
+ pthread_mutex_t *list_lock;
+ struct image *image;
+};
+
+
+static void integrate_image(int mytask, void *tasks)
{
+ struct integrate_args *all_args = tasks;
+ struct integrate_args *pargs = &all_args[mytask];
struct reflhit *spots;
int j, n;
struct hdfile *hdfile;
@@ -158,177 +157,27 @@ static void integrate_image(struct process_args *pargs)
}
-static void *worker_thread(void *pargsv)
-{
- struct process_args *pargs = pargsv;
- int finish;
-
- do {
-
- int wakeup;
-
- /* Acknowledge start */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->start = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- pargs->func(pargs);
-
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Go to sleep until told to exit or process next image */
- do {
-
- pthread_mutex_lock(&pargs->control_mutex);
- /* Either of these can result in the thread waking up */
- wakeup = pargs->start || pargs->finish;
- finish = pargs->finish;
- pthread_mutex_unlock(&pargs->control_mutex);
- usleep(20000);
-
- } while ( !wakeup );
-
- } while ( !pargs->finish );
-
- return NULL;
-}
-
-
-static void munch_threads(struct image *images, int n_total_patterns,
- struct detector *det, const char *sym,
- ReflItemList *obs, double *i_full, unsigned int *cts,
- int nthreads, void (*func)(struct process_args *),
- const char *text)
+static void refine_all(struct image *images, int n_total_patterns,
+ struct detector *det, const char *sym,
+ ReflItemList *obs, double *i_full, int nthreads)
{
- pthread_t workers[MAX_THREADS];
- struct process_args *worker_args[MAX_THREADS];
- pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
- int worker_active[MAX_THREADS];
+ struct refine_args *tasks;
int i;
- int n_done = 0;
- int n_started = 0;
-
- /* Initialise worker arguments with the unchanging data */
- for ( i=0; i<nthreads; i++ ) {
-
- worker_args[i] = malloc(sizeof(struct process_args));
- worker_active[i] = 0;
- pthread_mutex_init(&worker_args[i]->control_mutex, NULL);
- worker_args[i]->sym = sym;
- worker_args[i]->obs = obs;
- worker_args[i]->i_full = i_full;
- worker_args[i]->cts = cts;
- worker_args[i]->list_lock = &list_lock;
- worker_args[i]->func = func;
-
- }
-
- /* Start threads off */
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int r;
-
- if ( n_started == n_total_patterns ) break;
-
- pargs = worker_args[i];
- pargs->image = &images[n_started++];
-
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pargs->finish = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, worker_thread, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
- }
-
- /* Keep threads busy until the end of the data */
- do {
-
- int i;
-
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int done;
-
- /* Spend time working, not managing threads */
- usleep(100000);
-
- /* Are we using this thread record at all? */
- if ( !worker_active[i] ) continue;
-
- /* Has the thread finished yet? */
- pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- done = pargs->done;
- pthread_mutex_unlock(&pargs->control_mutex);
- if ( !done ) continue;
-
- /* Reset "done" flag */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- n_done++;
- progress_bar(n_done, n_total_patterns, text);
- /* If there are no more patterns, "done" will remain
- * zero, so the last pattern will not be re-counted. */
- if ( n_started == n_total_patterns ) break;
-
- /* Start work on the next pattern */
- pargs->image = &images[n_started++];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->start = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- }
-
- } while ( n_started < n_total_patterns );
-
- /* Join threads */
- for ( i=0; i<nthreads; i++ ) {
-
- if ( !worker_active[i] ) continue;
-
- /* Tell the thread to exit */
- struct process_args *pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->finish = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Wait for it to join */
- pthread_join(workers[i], NULL);
-
- if ( pargs->done ) {
- n_done++;
- progress_bar(n_done, n_total_patterns, text);
- } /* else this thread was not busy */
+ tasks = malloc(n_total_patterns * sizeof(struct refine_args));
+ for ( i=0; i<n_total_patterns; i++ ) {
- }
+ tasks[i].sym = sym;
+ tasks[i].obs = obs;
+ tasks[i].i_full = i_full;
+ tasks[i].image = &images[i];
- for ( i=0; i<nthreads; i++ ) {
- free(worker_args[i]);
}
-}
+ munch_threads(n_total_patterns, nthreads, "Refining",
+ refine_image, tasks);
-static void refine_all(struct image *images, int n_total_patterns,
- struct detector *det, const char *sym,
- ReflItemList *obs, double *i_full, int nthreads)
-{
- munch_threads(images, n_total_patterns, det, sym, obs, i_full, NULL,
- nthreads, refine_image, "Refining");
+ free(tasks);
}
@@ -338,12 +187,28 @@ static void estimate_full(struct image *images, int n_total_patterns,
{
int i;
unsigned int *cts;
+ struct integrate_args *tasks;
+ pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
cts = new_list_count();
clear_items(obs);
- munch_threads(images, n_total_patterns, det, sym, obs, i_full, cts,
- nthreads, integrate_image, "Integrating");
+ tasks = malloc(n_total_patterns * sizeof(struct integrate_args));
+ for ( i=0; i<n_total_patterns; i++ ) {
+
+ tasks[i].sym = sym;
+ tasks[i].obs = obs;
+ tasks[i].i_full = i_full;
+ tasks[i].cts = cts;
+ tasks[i].list_lock = &list_lock;
+ tasks[i].image = &images[i];
+
+ }
+
+ munch_threads(n_total_patterns, nthreads, "Integrating",
+ integrate_image, tasks);
+
+ free(tasks);
/* Divide the totals to get the means */
for ( i=0; i<num_items(obs); i++ ) {