aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2010-10-10 20:32:41 +0200
committerThomas White <taw@physics.org>2012-02-22 15:27:02 +0100
commit33d4fbfbbee784f734632e543a75222f38bc807d (patch)
treec9281f611a9da26433e4c3770ffce440ef25d3f9
parenta9adbc73158e8f8b225cd59d62ad5ade648c8241 (diff)
calibrate_detecotr: Use new thread pool
-rw-r--r--src/Makefile.am2
-rw-r--r--src/Makefile.in4
-rw-r--r--src/calibrate_detector.c287
-rw-r--r--src/facetron.c8
-rw-r--r--src/thread-pool.c109
-rw-r--r--src/thread-pool.h16
6 files changed, 195 insertions, 231 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 6733dc3a..ee20d0b3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -53,7 +53,7 @@ render_hkl_SOURCES = render_hkl.c cell.c reflections.c utils.c povray.c \
render_hkl_LDADD = @LIBS@
calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \
- filters.c peaks.c detector.c cell.c
+ filters.c peaks.c detector.c cell.c thread-pool.c
calibrate_detector_LDADD = @LIBS@
facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
diff --git a/src/Makefile.in b/src/Makefile.in
index c27c0ec9..c7ed213a 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -56,7 +56,7 @@ PROGRAMS = $(bin_PROGRAMS)
am_calibrate_detector_OBJECTS = calibrate_detector.$(OBJEXT) \
utils.$(OBJEXT) hdf5-file.$(OBJEXT) image.$(OBJEXT) \
filters.$(OBJEXT) peaks.$(OBJEXT) detector.$(OBJEXT) \
- cell.$(OBJEXT)
+ cell.$(OBJEXT) thread-pool.$(OBJEXT)
calibrate_detector_OBJECTS = $(am_calibrate_detector_OBJECTS)
calibrate_detector_DEPENDENCIES =
am_compare_hkl_OBJECTS = compare_hkl.$(OBJEXT) sfac.$(OBJEXT) \
@@ -294,7 +294,7 @@ render_hkl_SOURCES = render_hkl.c cell.c reflections.c utils.c povray.c \
render_hkl_LDADD = @LIBS@
calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \
- filters.c peaks.c detector.c cell.c
+ filters.c peaks.c detector.c cell.c thread-pool.c
calibrate_detector_LDADD = @LIBS@
facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c
index e5945015..2c68f645 100644
--- a/src/calibrate_detector.c
+++ b/src/calibrate_detector.c
@@ -27,6 +27,7 @@
#include "hdf5-file.h"
#include "filters.h"
#include "peaks.h"
+#include "thread-pool.h"
#define INTEGRATION_RADIUS (10)
@@ -39,10 +40,9 @@ typedef enum
SUM_PEAKS
} SumMethod;
-struct process_args
+struct sum_args
{
char *filename;
- int id;
int config_cmfilter;
int config_noisefilter;
double *sum;
@@ -50,12 +50,20 @@ struct process_args
int h;
SumMethod sum_method;
double threshold;
+};
+
- /* Thread control */
- pthread_mutex_t control_mutex; /* Protects the scary stuff below */
- int start;
- int finish;
- int done;
+struct queue_args
+{
+ FILE *fh;
+ char *prefix;
+ int config_cmfilter;
+ int config_noisefilter;
+ double *sum;
+ int w;
+ int h;
+ SumMethod sum_method;
+ double threshold;
};
@@ -151,8 +159,9 @@ static void sum_threshold(struct image *image, double *sum, double threshold)
}
-static void process_image(struct process_args *pargs)
+static void add_image(void *args)
{
+ struct sum_args *pargs = args;
struct hdfile *hdfile;
struct image image;
@@ -160,7 +169,6 @@ static void process_image(struct process_args *pargs)
image.data = NULL;
image.flags = NULL;
image.indexed_cell = NULL;
- image.id = pargs->id;
image.filename = pargs->filename;
image.hits = NULL;
image.n_hits = 0;
@@ -215,71 +223,38 @@ out:
image_feature_list_free(image.features);
if ( image.flags != NULL ) free(image.flags);
hdfile_close(hdfile);
-}
-
-
-static void *worker_thread(void *pargsv)
-{
- struct process_args *pargs = pargsv;
- int finish;
-
- do {
-
- int wakeup;
-
- process_image(pargs);
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 1;
- pargs->start = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Go to sleep until told to exit or process next image */
- do {
-
- pthread_mutex_lock(&pargs->control_mutex);
- /* Either of these can result in the thread waking up */
- wakeup = pargs->start || pargs->finish;
- finish = pargs->finish;
- pthread_mutex_unlock(&pargs->control_mutex);
- usleep(20000);
-
- } while ( !wakeup );
-
- } while ( !pargs->finish );
-
- return NULL;
+ free(pargs->filename);
+ free(pargs);
}
-static void dump_to_file(struct process_args *worker_args[], int nthreads,
- int w, int h, int n, const char *stem)
+static void *get_image(void *qp)
{
- int i;
- double *total;
- char outfile[256];
-
- total = calloc(w*h, sizeof(double));
-
- /* Add the individual sums to the 0th sum */
- for ( i=0; i<nthreads; i++ ) {
-
- int x, y;
-
- for ( x=0; x<w; x++ ) {
- for ( y=0; y<h; y++ ) {
- double val = worker_args[i]->sum[x+w*y];
- total[x+w*y] += val;
- }
- }
-
- }
-
- snprintf(outfile, 255, "%s-%i.h5", stem, n);
-
- hdf5_write(outfile, total, w, h, H5T_NATIVE_DOUBLE);
-
- free(total);
+ char line[1024];
+ struct sum_args *pargs;
+ char *rval;
+ struct queue_args *qargs = qp;
+
+ /* Get the next filename */
+ rval = fgets(line, 1023, qargs->fh);
+ if ( rval == NULL ) return NULL;
+
+ pargs = malloc(sizeof(struct sum_args));
+
+ pargs->w = qargs->w;
+ pargs->h = qargs->h;
+ pargs->sum_method = qargs->sum_method;
+ pargs->threshold = qargs->threshold;
+ pargs->config_cmfilter = qargs->config_cmfilter;
+ pargs->config_noisefilter = qargs->config_noisefilter;
+ pargs->sum = qargs->sum;
+
+ chomp(line);
+ pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1);
+ snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line);
+
+ return pargs;
}
@@ -289,8 +264,7 @@ int main(int argc, char *argv[])
char *filename = NULL;
char *outfile = NULL;
FILE *fh;
- char *rval = NULL;
- int n_images;
+ int n_images = 0;
int config_cmfilter = 0;
int config_noisefilter = 0;
char *prefix = NULL;
@@ -299,12 +273,9 @@ int main(int argc, char *argv[])
double threshold = 400.0;
SumMethod sum;
int nthreads = 1;
- pthread_t workers[MAX_THREADS];
- struct process_args *worker_args[MAX_THREADS];
- int worker_active[MAX_THREADS];
- int i;
- const int w = 1024; /* FIXME! */
- const int h = 1024; /* FIXME! */
+ struct queue_args qargs;
+ int n_done;
+ const int chunk_size = 1000;
/* Long options */
const struct option longopts[] = {
@@ -402,157 +373,43 @@ int main(int argc, char *argv[])
outfile = strdup("summed.h5");
}
- if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) {
+ if ( nthreads == 0 ) {
ERROR("Invalid number of threads.\n");
return 1;
}
- /* Initialise worker arguments */
- for ( i=0; i<nthreads; i++ ) {
-
- worker_args[i] = malloc(sizeof(struct process_args));
- worker_args[i]->filename = malloc(1024);
- worker_args[i]->sum = calloc(w*h, sizeof(double));
- worker_active[i] = 0;
-
- worker_args[i]->w = w;
- worker_args[i]->h = h;
- worker_args[i]->sum_method = sum;
- worker_args[i]->threshold = threshold;
-
- }
-
- n_images = 0;
-
- /* Start threads off */
- for ( i=0; i<nthreads; i++ ) {
-
- char line[1024];
- struct process_args *pargs;
- int r;
-
- pargs = worker_args[i];
+ qargs.w = 1024; /* FIXME! */
+ qargs.h = 1024; /* FIXME! */
+ qargs.sum_method = sum;
+ qargs.threshold = threshold;
+ qargs.config_cmfilter = config_cmfilter;
+ qargs.config_noisefilter = config_noisefilter;
+ qargs.sum = calloc(qargs.w*qargs.h, sizeof(double));
+ qargs.prefix = prefix;
+ qargs.fh = fh;
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) continue;
- chomp(line);
- snprintf(pargs->filename, 1023, "%s%s", prefix, line);
-
- n_images++;
-
- pthread_mutex_init(&pargs->control_mutex, NULL);
- pargs->config_cmfilter = config_cmfilter;
- pargs->config_noisefilter = config_noisefilter;
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pargs->finish = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
-
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, worker_thread, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
- }
-
- /* Keep threads busy until the end of the data */
do {
- int i;
-
- for ( i=0; i<nthreads; i++ ) {
-
- char line[1024];
- struct process_args *pargs;
- int done;
-
- /* Spend time working, not managing threads */
- usleep(100000);
-
- /* Are we using this thread record at all? */
- if ( !worker_active[i] ) continue;
-
- /* Has the thread finished yet? */
- pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- done = pargs->done;
- pthread_mutex_unlock(&pargs->control_mutex);
- if ( !done ) continue;
-
- /* Get the next filename */
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) break;
- chomp(line);
- snprintf(pargs->filename, 1023, "%s%s", prefix, line);
+ n_done = run_threads(nthreads, add_image, get_image,
+ (void *)&qargs, chunk_size);
- n_images++;
-
- STATUS("Done %i images\n", n_images);
-
- /* Wake the thread up ... */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- if ( n_images % 1000 == 0 ) {
- if ( intermediate != NULL ) {
- dump_to_file(worker_args, nthreads,
- w, h, n_images,
- intermediate);
- }
- }
+ n_images += n_done;
+ /* Write intermediate sum if requested */
+ if ( (intermediate != NULL) && (n_done == chunk_size) ) {
+ char outfile[1024];
+ snprintf(outfile, 1023, "%s-%i.h5",
+ intermediate, n_images);
+ hdf5_write(outfile, qargs.sum, qargs.w, qargs.h,
+ H5T_NATIVE_DOUBLE);
}
- } while ( rval != NULL );
-
- /* Join threads */
- for ( i=0; i<nthreads; i++ ) {
-
- if ( !worker_active[i] ) goto free;
-
-
- /* Tell the thread to exit */
- struct process_args *pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->finish = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Wait for it to join */
- pthread_join(workers[i], NULL);
-
- free:
- if ( worker_args[i]->filename != NULL ) {
- free(worker_args[i]->filename);
- }
-
- }
-
- /* Add the individual sums to the 0th sum */
- for ( i=1; i<nthreads; i++ ) {
-
- int x, y;
-
- for ( x=0; x<w; x++ ) {
- for ( y=0; y<h; y++ ) {
- double val = worker_args[i]->sum[x+w*y];
- worker_args[0]->sum[x+w*y] += val;
- }
- }
- free(worker_args[i]->sum);
- free(worker_args[i]);
-
- }
+ } while ( n_done == chunk_size );
- hdf5_write(outfile, worker_args[0]->sum, w, h, H5T_NATIVE_DOUBLE);
+ /* Write the final output */
+ hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, H5T_NATIVE_DOUBLE);
- free(worker_args[0]->sum);
- free(worker_args[0]);
+ free(qargs.sum);
free(prefix);
free(outfile);
if ( intermediate != NULL ) free(intermediate);
diff --git a/src/facetron.c b/src/facetron.c
index 3dc7d486..27fc1608 100644
--- a/src/facetron.c
+++ b/src/facetron.c
@@ -174,8 +174,8 @@ static void refine_all(struct image *images, int n_total_patterns,
}
- munch_threads(n_total_patterns, nthreads, "Refining",
- refine_image, tasks);
+ run_thread_range(n_total_patterns, nthreads, "Refining",
+ refine_image, tasks);
free(tasks);
}
@@ -205,8 +205,8 @@ static void estimate_full(struct image *images, int n_total_patterns,
}
- munch_threads(n_total_patterns, nthreads, "Integrating",
- integrate_image, tasks);
+ run_thread_range(n_total_patterns, nthreads, "Integrating",
+ integrate_image, tasks);
free(tasks);
diff --git a/src/thread-pool.c b/src/thread-pool.c
index ac508b11..358fb4f3 100644
--- a/src/thread-pool.c
+++ b/src/thread-pool.c
@@ -25,6 +25,8 @@
#include "utils.h"
+/* ---------------------------------- Range --------------------------------- */
+
enum {
TASK_READY,
TASK_RUNNING,
@@ -32,7 +34,7 @@ enum {
};
-struct task_queue
+struct task_queue_range
{
pthread_mutex_t lock;
@@ -47,9 +49,9 @@ struct task_queue
};
-static void *worker_thread(void *pargsv)
+static void *range_worker(void *pargsv)
{
- struct task_queue *q = pargsv;
+ struct task_queue_range *q = pargsv;
do {
@@ -89,12 +91,12 @@ static void *worker_thread(void *pargsv)
}
-void munch_threads(int n_tasks, int n_threads, const char *text,
- void (*work)(int, void *), void *work_args)
+void run_thread_range(int n_tasks, int n_threads, const char *text,
+ void (*work)(int, void *), void *work_args)
{
pthread_t *workers;
int i;
- struct task_queue q;
+ struct task_queue_range q;
/* The nation of CrystFEL prides itself on having 0% unemployment. */
if ( n_threads > n_tasks ) n_threads = n_tasks;
@@ -116,7 +118,7 @@ void munch_threads(int n_tasks, int n_threads, const char *text,
/* Start threads */
for ( i=0; i<n_threads; i++ ) {
- if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) {
+ if ( pthread_create(&workers[i], NULL, range_worker, &q) ) {
ERROR("Couldn't start thread %i\n", i);
n_threads = i;
break;
@@ -132,3 +134,96 @@ void munch_threads(int n_tasks, int n_threads, const char *text,
free(q.status);
free(workers);
}
+
+
+/* ---------------------------- Custom get_task() --------------------------- */
+
+struct task_queue
+{
+ pthread_mutex_t lock;
+
+ int n_started;
+ int n_completed;
+ int max;
+
+ void *(*get_task)(void *);
+ void *queue_args;
+ void (*work)(void *);
+};
+
+
+static void *task_worker(void *pargsv)
+{
+ struct task_queue *q = pargsv;
+
+ do {
+
+ void *task;
+
+ /* Get a task */
+ pthread_mutex_lock(&q->lock);
+ if ( q->n_started >= q->max ) {
+ pthread_mutex_unlock(&q->lock);
+ break;
+ }
+ task = q->get_task(q->queue_args);
+
+ /* No more tasks? */
+ if ( task == NULL ) {
+ pthread_mutex_unlock(&q->lock);
+ break;
+ }
+
+ q->n_started++;
+ pthread_mutex_unlock(&q->lock);
+
+ q->work(task);
+
+ /* Update totals etc */
+ pthread_mutex_lock(&q->lock);
+ q->n_completed++;
+ pthread_mutex_unlock(&q->lock);
+
+ } while ( 1 );
+
+ return NULL;
+}
+
+
+int run_threads(int n_threads, void (*work)(void *),
+ void *(*get_task)(void *), void *queue_args, int max)
+{
+ pthread_t *workers;
+ int i;
+ struct task_queue q;
+
+ workers = malloc(n_threads * sizeof(pthread_t));
+
+ pthread_mutex_init(&q.lock, NULL);
+ q.work = work;
+ q.get_task = get_task;
+ q.queue_args = queue_args;
+ q.n_started = 0;
+ q.n_completed = 0;
+ q.max = max;
+
+ /* Start threads */
+ for ( i=0; i<n_threads; i++ ) {
+
+ if ( pthread_create(&workers[i], NULL, task_worker, &q) ) {
+ ERROR("Couldn't start thread %i\n", i);
+ n_threads = i;
+ break;
+ }
+
+ }
+
+ /* Join threads */
+ for ( i=0; i<n_threads; i++ ) {
+ pthread_join(workers[i], NULL);
+ }
+
+ free(workers);
+
+ return q.n_completed;
+}
diff --git a/src/thread-pool.h b/src/thread-pool.h
index 4439f970..11123493 100644
--- a/src/thread-pool.h
+++ b/src/thread-pool.h
@@ -18,8 +18,20 @@
#endif
-extern void munch_threads(int n_tasks, int n_threads, const char *text,
- void (*work)(int, void *), void *work_args);
+/* work() will be called with a number and work_args. The number will be
+ * unique and in the range 0..n_tasks. A progress bar will be shown using
+ * "text" and the progress through the tasks. */
+extern void run_thread_range(int n_tasks, int n_threads, const char *text,
+ void (*work)(int, void *), void *work_args);
+
+
+/* get_task() will be called every time a worker is idle. It returns either
+ * NULL, indicating that no further work is available, or a pointer which will
+ * be passed to work(). Work will stop after 'max' tasks have been processed.
+ * get_task() does not need to be re-entrant.
+ * Returns: the number of tasks processed. */
+extern int run_threads(int n_threads, void (*work)(void *),
+ void *(*get_task)(void *), void *queue_args, int max);
#endif /* THREAD_POOL_H */