aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2010-10-10 15:20:07 +0200
committerThomas White <taw@physics.org>2012-02-22 15:27:02 +0100
commit5a9ad87cc6534c046d23347241fbbcf43b5e3715 (patch)
treefb3fa913324645d4409078d4ce63adf57345a04c
parent33a8fc19c75323cc5447add5f268258ae518aff0 (diff)
facetron: Use new thread pool
-rw-r--r--Makefile.am2
-rw-r--r--Makefile.in2
-rw-r--r--src/Makefile.am2
-rw-r--r--src/Makefile.in5
-rw-r--r--src/facetron.c251
-rw-r--r--src/thread-pool.c123
-rw-r--r--src/thread-pool.h25
7 files changed, 212 insertions, 198 deletions
diff --git a/Makefile.am b/Makefile.am
index d57c60f5..28617e06 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -10,5 +10,5 @@ EXTRA_DIST = configure src/cell.h src/hdf5-file.h src/image.h \
data/sfac/Ca.nff data/sfac/C.nff data/sfac/Fe.nff data/sfac/H.nff \
data/sfac/Mg.nff data/sfac/N.nff data/sfac/O.nff data/sfac/P.nff \
data/sfac/S.nff data/sfac/f0_WaasKirf.dat src/render_hkl.h \
- src/stream.h
+ src/stream.h src/thread-pool.h
SUBDIRS = src data doc doc/examples scripts
diff --git a/Makefile.in b/Makefile.in
index 5a2466e5..cb24fdaa 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -208,7 +208,7 @@ EXTRA_DIST = configure src/cell.h src/hdf5-file.h src/image.h \
data/sfac/Ca.nff data/sfac/C.nff data/sfac/Fe.nff data/sfac/H.nff \
data/sfac/Mg.nff data/sfac/N.nff data/sfac/O.nff data/sfac/P.nff \
data/sfac/S.nff data/sfac/f0_WaasKirf.dat src/render_hkl.h \
- src/stream.h
+ src/stream.h src/thread-pool.h
SUBDIRS = src data doc doc/examples scripts
all: config.h
diff --git a/src/Makefile.am b/src/Makefile.am
index 721c80f1..6733dc3a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -57,7 +57,7 @@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \
calibrate_detector_LDADD = @LIBS@
facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
- image.c geometry.c reflections.c stream.c
+ image.c geometry.c reflections.c stream.c thread-pool.c
facetron_LDADD = @LIBS@
cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \
diff --git a/src/Makefile.in b/src/Makefile.in
index 7cfc3494..c27c0ec9 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -73,7 +73,7 @@ cubeit_DEPENDENCIES =
am_facetron_OBJECTS = facetron.$(OBJEXT) cell.$(OBJEXT) \
hdf5-file.$(OBJEXT) utils.$(OBJEXT) detector.$(OBJEXT) \
peaks.$(OBJEXT) image.$(OBJEXT) geometry.$(OBJEXT) \
- reflections.$(OBJEXT) stream.$(OBJEXT)
+ reflections.$(OBJEXT) stream.$(OBJEXT) thread-pool.$(OBJEXT)
facetron_OBJECTS = $(am_facetron_OBJECTS)
facetron_DEPENDENCIES =
am_get_hkl_OBJECTS = get_hkl.$(OBJEXT) sfac.$(OBJEXT) cell.$(OBJEXT) \
@@ -298,7 +298,7 @@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \
calibrate_detector_LDADD = @LIBS@
facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
- image.c geometry.c reflections.c stream.c
+ image.c geometry.c reflections.c stream.c thread-pool.c
facetron_LDADD = @LIBS@
cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \
@@ -458,6 +458,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/symmetry.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/templates.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/thread-pool.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/utils.Po@am__quote@
.c.o:
diff --git a/src/facetron.c b/src/facetron.c
index 5d760b8a..3dc7d486 100644
--- a/src/facetron.c
+++ b/src/facetron.c
@@ -20,8 +20,6 @@
#include <string.h>
#include <unistd.h>
#include <getopt.h>
-#include <pthread.h>
-#include <sys/time.h>
#include <assert.h>
#include "utils.h"
@@ -31,30 +29,7 @@
#include "stream.h"
#include "geometry.h"
#include "peaks.h"
-
-
-#define MAX_THREADS (256)
-
-struct process_args
-{
- struct image *image;
-
- /* Thread control */
- pthread_mutex_t control_mutex; /* Protects the scary stuff below */
- int start;
- int finish;
- int done;
-
- /* Analysis routine */
- void (*func)(struct process_args *);
-
- /* Analysis parameters */
- const char *sym;
- pthread_mutex_t *list_lock; /* Protects 'obs', 'i_full' and 'cts' */
- ReflItemList *obs;
- double *i_full;
- unsigned int *cts;
-};
+#include "thread-pool.h"
static void show_help(const char *s)
@@ -79,14 +54,38 @@ static void show_help(const char *s)
}
-static void refine_image(struct process_args *pargs)
+struct refine_args
{
+ const char *sym;
+ ReflItemList *obs;
+ double *i_full;
+ struct image *image;
+};
+
+
+static void refine_image(int mytask, void *tasks)
+{
+ struct refine_args *all_args = tasks;
+ struct refine_args *pargs = &all_args[mytask];
/* Do, er, something. */
}
-static void integrate_image(struct process_args *pargs)
+struct integrate_args
+{
+ const char *sym;
+ ReflItemList *obs;
+ double *i_full;
+ unsigned int *cts;
+ pthread_mutex_t *list_lock;
+ struct image *image;
+};
+
+
+static void integrate_image(int mytask, void *tasks)
{
+ struct integrate_args *all_args = tasks;
+ struct integrate_args *pargs = &all_args[mytask];
struct reflhit *spots;
int j, n;
struct hdfile *hdfile;
@@ -158,177 +157,27 @@ static void integrate_image(struct process_args *pargs)
}
-static void *worker_thread(void *pargsv)
-{
- struct process_args *pargs = pargsv;
- int finish;
-
- do {
-
- int wakeup;
-
- /* Acknowledge start */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->start = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- pargs->func(pargs);
-
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Go to sleep until told to exit or process next image */
- do {
-
- pthread_mutex_lock(&pargs->control_mutex);
- /* Either of these can result in the thread waking up */
- wakeup = pargs->start || pargs->finish;
- finish = pargs->finish;
- pthread_mutex_unlock(&pargs->control_mutex);
- usleep(20000);
-
- } while ( !wakeup );
-
- } while ( !pargs->finish );
-
- return NULL;
-}
-
-
-static void munch_threads(struct image *images, int n_total_patterns,
- struct detector *det, const char *sym,
- ReflItemList *obs, double *i_full, unsigned int *cts,
- int nthreads, void (*func)(struct process_args *),
- const char *text)
+static void refine_all(struct image *images, int n_total_patterns,
+ struct detector *det, const char *sym,
+ ReflItemList *obs, double *i_full, int nthreads)
{
- pthread_t workers[MAX_THREADS];
- struct process_args *worker_args[MAX_THREADS];
- pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
- int worker_active[MAX_THREADS];
+ struct refine_args *tasks;
int i;
- int n_done = 0;
- int n_started = 0;
-
- /* Initialise worker arguments with the unchanging data */
- for ( i=0; i<nthreads; i++ ) {
-
- worker_args[i] = malloc(sizeof(struct process_args));
- worker_active[i] = 0;
- pthread_mutex_init(&worker_args[i]->control_mutex, NULL);
- worker_args[i]->sym = sym;
- worker_args[i]->obs = obs;
- worker_args[i]->i_full = i_full;
- worker_args[i]->cts = cts;
- worker_args[i]->list_lock = &list_lock;
- worker_args[i]->func = func;
-
- }
-
- /* Start threads off */
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int r;
-
- if ( n_started == n_total_patterns ) break;
-
- pargs = worker_args[i];
- pargs->image = &images[n_started++];
-
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pargs->finish = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, worker_thread, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
- }
-
- /* Keep threads busy until the end of the data */
- do {
-
- int i;
-
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int done;
-
- /* Spend time working, not managing threads */
- usleep(100000);
-
- /* Are we using this thread record at all? */
- if ( !worker_active[i] ) continue;
-
- /* Has the thread finished yet? */
- pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- done = pargs->done;
- pthread_mutex_unlock(&pargs->control_mutex);
- if ( !done ) continue;
-
- /* Reset "done" flag */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- n_done++;
- progress_bar(n_done, n_total_patterns, text);
- /* If there are no more patterns, "done" will remain
- * zero, so the last pattern will not be re-counted. */
- if ( n_started == n_total_patterns ) break;
-
- /* Start work on the next pattern */
- pargs->image = &images[n_started++];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->start = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- }
-
- } while ( n_started < n_total_patterns );
-
- /* Join threads */
- for ( i=0; i<nthreads; i++ ) {
-
- if ( !worker_active[i] ) continue;
-
- /* Tell the thread to exit */
- struct process_args *pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->finish = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Wait for it to join */
- pthread_join(workers[i], NULL);
-
- if ( pargs->done ) {
- n_done++;
- progress_bar(n_done, n_total_patterns, text);
- } /* else this thread was not busy */
+ tasks = malloc(n_total_patterns * sizeof(struct refine_args));
+ for ( i=0; i<n_total_patterns; i++ ) {
- }
+ tasks[i].sym = sym;
+ tasks[i].obs = obs;
+ tasks[i].i_full = i_full;
+ tasks[i].image = &images[i];
- for ( i=0; i<nthreads; i++ ) {
- free(worker_args[i]);
}
-}
+ munch_threads(n_total_patterns, nthreads, "Refining",
+ refine_image, tasks);
-static void refine_all(struct image *images, int n_total_patterns,
- struct detector *det, const char *sym,
- ReflItemList *obs, double *i_full, int nthreads)
-{
- munch_threads(images, n_total_patterns, det, sym, obs, i_full, NULL,
- nthreads, refine_image, "Refining");
+ free(tasks);
}
@@ -338,12 +187,28 @@ static void estimate_full(struct image *images, int n_total_patterns,
{
int i;
unsigned int *cts;
+ struct integrate_args *tasks;
+ pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
cts = new_list_count();
clear_items(obs);
- munch_threads(images, n_total_patterns, det, sym, obs, i_full, cts,
- nthreads, integrate_image, "Integrating");
+ tasks = malloc(n_total_patterns * sizeof(struct integrate_args));
+ for ( i=0; i<n_total_patterns; i++ ) {
+
+ tasks[i].sym = sym;
+ tasks[i].obs = obs;
+ tasks[i].i_full = i_full;
+ tasks[i].cts = cts;
+ tasks[i].list_lock = &list_lock;
+ tasks[i].image = &images[i];
+
+ }
+
+ munch_threads(n_total_patterns, nthreads, "Integrating",
+ integrate_image, tasks);
+
+ free(tasks);
/* Divide the totals to get the means */
for ( i=0; i<num_items(obs); i++ ) {
diff --git a/src/thread-pool.c b/src/thread-pool.c
new file mode 100644
index 00000000..b0d3d8c2
--- /dev/null
+++ b/src/thread-pool.c
@@ -0,0 +1,123 @@
+/*
+ * thread-pool.c
+ *
+ * A thread pool implementation
+ *
+ * (c) 2006-2010 Thomas White <taw@physics.org>
+ *
+ * Part of CrystFEL - crystallography with a FEL
+ *
+ */
+
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+
+#include <stdarg.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <pthread.h>
+
+#include "utils.h"
+
+
+struct task_queue
+{
+ pthread_mutex_t lock;
+
+ int n_tasks;
+ int *done;
+ int n_done;
+
+ void (*work)(int, void *);
+ void *work_args;
+
+ const char *text;
+};
+
+
+static void *worker_thread(void *pargsv)
+{
+ struct task_queue *q = pargsv;
+
+ do {
+
+ int i;
+ int found = 0;
+ int mytask = -1;
+
+ /* Get a task */
+ pthread_mutex_lock(&q->lock);
+ for ( i=0; i<q->n_tasks; i++ ) {
+ if ( q->done[i] == 0 ) {
+ mytask = i;
+ found = 1;
+ }
+ }
+ pthread_mutex_unlock(&q->lock);
+
+ /* No more tasks? */
+ if ( !found ) break;
+
+ q->work(mytask, q->work_args);
+
+ /* Mark this task as done, update totals etc */
+ pthread_mutex_lock(&q->lock);
+ q->done[mytask] = 1;
+ q->n_done++;
+ progress_bar(q->n_done, q->n_tasks, q->text);
+ pthread_mutex_unlock(&q->lock);
+
+ } while ( 1 );
+
+ return NULL;
+}
+
+
+void munch_threads(int n_tasks, int n_threads, const char *text,
+ void (*work)(int, void *), void *work_args)
+{
+ pthread_t *workers;
+ int i;
+ struct task_queue q;
+
+ /* The nation of CrystFEL prides itself on having 0% unemployment. */
+ if ( n_threads > n_tasks ) n_threads = n_tasks;
+
+ workers = malloc(n_threads * sizeof(pthread_t));
+
+ q.done = malloc(n_tasks * sizeof(int));
+ pthread_mutex_init(&q.lock, NULL);
+ q.n_tasks = n_tasks;
+ q.work = work;
+ q.work_args = work_args;
+ q.n_done = 0;
+ q.text = text;
+
+ for ( i=0; i<n_tasks; i++ ) {
+ q.done[i] = 0;
+ }
+
+ /* Start threads */
+ for ( i=0; i<n_threads; i++ ) {
+
+ if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) {
+ ERROR("Couldn't start thread %i\n", i);
+ n_threads = i;
+ break;
+ }
+
+ }
+
+ /* Join threads */
+ for ( i=0; i<n_threads; i++ ) {
+ pthread_join(workers[i], NULL);
+ }
+
+ free(q.done);
+ free(workers);
+}
diff --git a/src/thread-pool.h b/src/thread-pool.h
new file mode 100644
index 00000000..4439f970
--- /dev/null
+++ b/src/thread-pool.h
@@ -0,0 +1,25 @@
+/*
+ * thread-pool.h
+ *
+ * A thread pool implementation
+ *
+ * (c) 2006-2010 Thomas White <taw@physics.org>
+ *
+ * Part of CrystFEL - crystallography with a FEL
+ *
+ */
+
+
+#ifndef THREAD_POOL_H
+#define THREAD_POOL_H
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+
+extern void munch_threads(int n_tasks, int n_threads, const char *text,
+ void (*work)(int, void *), void *work_args);
+
+
+#endif /* THREAD_POOL_H */