aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2010-10-10 22:28:09 +0200
committerThomas White <taw@physics.org>2012-02-22 15:27:02 +0100
commit313ef1a1bcc7d506730d7c3d4b19fc2dc393ace3 (patch)
tree934c7fefc99faf74c2d7909c754bfe0c6c45b317
parent6bd5c9e7384efaf44b4f9e7a70c9bcf57ef67ceb (diff)
reintegrate: Use new thread pool
-rw-r--r--src/Makefile.am3
-rw-r--r--src/Makefile.in5
-rw-r--r--src/reintegrate.c249
3 files changed, 77 insertions, 180 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 6517a67d..46fb77b9 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -66,7 +66,8 @@ cubeit_LDADD = @LIBS@
reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \
peaks.c image.c stream.c \
- index.c dirax.c templates.c geometry.c symmetry.c
+ index.c dirax.c templates.c geometry.c symmetry.c \
+ thread-pool.c
reintegrate_LDADD = @LIBS@
INCLUDES = "-I$(top_srcdir)/data"
diff --git a/src/Makefile.in b/src/Makefile.in
index 7bf59480..4ffa7701 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -125,7 +125,7 @@ am_reintegrate_OBJECTS = reintegrate.$(OBJEXT) cell.$(OBJEXT) \
hdf5-file.$(OBJEXT) utils.$(OBJEXT) detector.$(OBJEXT) \
peaks.$(OBJEXT) image.$(OBJEXT) stream.$(OBJEXT) \
index.$(OBJEXT) dirax.$(OBJEXT) templates.$(OBJEXT) \
- geometry.$(OBJEXT) symmetry.$(OBJEXT)
+ geometry.$(OBJEXT) symmetry.$(OBJEXT) thread-pool.$(OBJEXT)
reintegrate_OBJECTS = $(am_reintegrate_OBJECTS)
reintegrate_DEPENDENCIES =
am_render_hkl_OBJECTS = render_hkl.$(OBJEXT) cell.$(OBJEXT) \
@@ -307,7 +307,8 @@ cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \
cubeit_LDADD = @LIBS@
reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \
peaks.c image.c stream.c \
- index.c dirax.c templates.c geometry.c symmetry.c
+ index.c dirax.c templates.c geometry.c symmetry.c \
+ thread-pool.c
reintegrate_LDADD = @LIBS@
INCLUDES = "-I$(top_srcdir)/data"
diff --git a/src/reintegrate.c b/src/reintegrate.c
index ceb1974e..5b9cf0f2 100644
--- a/src/reintegrate.c
+++ b/src/reintegrate.c
@@ -30,22 +30,11 @@
#include "peaks.h"
#include "stream.h"
#include "index.h"
+#include "thread-pool.h"
-#define MAX_THREADS (256)
-
-struct process_args
+struct static_integration_args
{
- char *filename;
- int id;
-
- /* Thread control */
- pthread_mutex_t control_mutex; /* Protects the scary stuff below */
- int start;
- int finish;
- int done;
-
- UnitCell *cell;
struct detector *det;
pthread_mutex_t *output_mutex; /* Protects 'stdout' */
int config_cmfilter;
@@ -57,6 +46,26 @@ struct process_args
};
+struct integration_args
+{
+ char *filename;
+ UnitCell *cell;
+
+ struct static_integration_args static_args;
+};
+
+
+struct queue_args
+{
+ FILE *fh;
+ const char *prefix;
+ int config_basename;
+
+ struct static_integration_args static_args;
+};
+
+
+
static void show_help(const char *s)
{
printf("Syntax: %s [options]\n\n", s);
@@ -88,8 +97,10 @@ static void show_help(const char *s)
}
-static void process_image(struct process_args *pargs)
+static void process_image(void *pg)
{
+ struct integration_args *apargs = pg;
+ struct static_integration_args *pargs = &apargs->static_args;
struct hdfile *hdfile;
struct image image;
@@ -97,8 +108,7 @@ static void process_image(struct process_args *pargs)
image.data = NULL;
image.flags = NULL;
image.indexed_cell = NULL;
- image.id = pargs->id;
- image.filename = pargs->filename;
+ image.filename = apargs->filename;
image.hits = NULL;
image.n_hits = 0;
image.det = pargs->det;
@@ -109,9 +119,9 @@ static void process_image(struct process_args *pargs)
image.orientation.y = 0.0;
image.orientation.z = 0.0;
- STATUS("Processing '%s'\n", pargs->filename);
+ STATUS("Processing '%s'\n", apargs->filename);
- hdfile = hdfile_open(pargs->filename);
+ hdfile = hdfile_open(apargs->filename);
if ( hdfile == NULL ) {
return;
} else if ( hdfile_set_first_image(hdfile, "/") ) {
@@ -132,50 +142,51 @@ static void process_image(struct process_args *pargs)
} else {
- output_intensities(&image, pargs->cell,
+ output_intensities(&image, apargs->cell,
pargs->output_mutex, pargs->config_polar,
pargs->config_sa, pargs->config_closer,
stdout, 0, 0.1);
}
free(image.data);
- cell_free(pargs->cell);
if ( image.flags != NULL ) free(image.flags);
hdfile_close(hdfile);
+
+ free(apargs->filename);
+ cell_free(apargs->cell);
+ free(apargs);
}
-static void *worker_thread(void *pargsv)
+static void *get_image(void *qp)
{
- struct process_args *pargs = pargsv;
- int finish;
-
- do {
-
- int wakeup;
-
- process_image(pargs);
+ struct integration_args *pargs;
+ struct queue_args *qargs = qp;
+ UnitCell *cell;
+ char *filename;
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 1;
- pargs->start = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
+ /* Get the next filename */
+ if ( find_chunk(qargs->fh, &cell, &filename) ) {
+ return NULL;
+ }
- /* Go to sleep until told to exit or process next image */
- do {
+ pargs = malloc(sizeof(struct integration_args));
- pthread_mutex_lock(&pargs->control_mutex);
- /* Either of these can result in the thread waking up */
- wakeup = pargs->start || pargs->finish;
- finish = pargs->finish;
- pthread_mutex_unlock(&pargs->control_mutex);
- usleep(20000);
+ if ( qargs->config_basename ) {
+ char *tmp;
+ tmp = strdup(basename(filename));
+ free(filename);
+ filename = tmp;
+ }
- } while ( !wakeup );
+ memcpy(&pargs->static_args, &qargs->static_args,
+ sizeof(struct static_integration_args));
- } while ( !pargs->finish );
+ pargs->cell = cell;
+ pargs->filename = malloc(1024);
+ snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, filename);
- return NULL;
+ return pargs;
}
@@ -185,141 +196,25 @@ static void integrate_all(int nthreads, struct detector *det, FILE *fh,
int config_satcorr, int config_sa, int config_closer,
int config_sanity)
{
- pthread_t workers[MAX_THREADS];
- struct process_args *worker_args[MAX_THREADS];
- int worker_active[MAX_THREADS];
- int i;
- int rval;
+ struct queue_args qargs;
pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER;
- /* Initialise worker arguments */
- for ( i=0; i<nthreads; i++ ) {
-
- worker_args[i] = malloc(sizeof(struct process_args));
- worker_args[i]->filename = malloc(1024);
- worker_active[i] = 0;
- worker_args[i]->det = det;
- worker_args[i]->config_cmfilter = config_cmfilter;
- worker_args[i]->config_polar = config_polar;
- worker_args[i]->config_sanity = config_sanity;
- worker_args[i]->config_satcorr = config_satcorr;
- worker_args[i]->config_sa = config_sa;
- worker_args[i]->config_closer = config_closer;
- pthread_mutex_init(&worker_args[i]->control_mutex, NULL);
- worker_args[i]->output_mutex = &output_mutex;
-
- }
-
- /* Start threads off */
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int r;
- int rval;
- char *filename;
- UnitCell *cell;
-
- pargs = worker_args[i];
-
- /* Get the next filename */
- rval = find_chunk(fh, &cell, &filename);
- if ( rval == 1 ) break;
- if ( config_basename ) {
- char *tmp;
- tmp = strdup(basename(filename));
- free(filename);
- filename = tmp;
- }
- snprintf(pargs->filename, 1023, "%s%s",
- prefix, filename);
- pargs->cell = cell;
- free(filename);
-
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pargs->finish = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, worker_thread, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
- }
-
- /* Keep threads busy until the end of the data */
- do {
-
- int i;
- rval = 0;
-
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int done;
- char *filename;
- UnitCell *cell;
-
- /* Spend time working, not managing threads */
- usleep(100000);
-
- /* Are we using this thread record at all? */
- if ( !worker_active[i] ) continue;
-
- /* Has the thread finished yet? */
- pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- done = pargs->done;
- pthread_mutex_unlock(&pargs->control_mutex);
- if ( !done ) continue;
-
- /* Get the next filename */
- rval = find_chunk(fh, &cell, &filename);
- if ( rval == 1 ) break;
- if ( config_basename ) {
- char *tmp;
- tmp = strdup(basename(filename));
- free(filename);
- filename = tmp;
- }
- snprintf(pargs->filename, 1023, "%s%s",
- prefix, filename);
- pargs->cell = cell;
- free(filename);
-
- /* Wake the thread up ... */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- }
-
- } while ( rval == 0 );
-
- /* Join threads */
- for ( i=0; i<nthreads; i++ ) {
-
- if ( !worker_active[i] ) goto free;
-
- /* Tell the thread to exit */
- struct process_args *pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->finish = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Wait for it to join */
- pthread_join(workers[i], NULL);
-
- free:
- if ( worker_args[i]->filename != NULL ) {
- free(worker_args[i]->filename);
- }
-
- }
+ /* Information required to choose the next image */
+ qargs.fh = fh;
+ qargs.prefix = prefix;
+ qargs.config_basename = config_basename;
+
+ /* Information for the task which does not vary */
+ qargs.static_args.det = det;
+ qargs.static_args.config_cmfilter = config_cmfilter;
+ qargs.static_args.config_polar = config_polar;
+ qargs.static_args.config_satcorr = config_satcorr;
+ qargs.static_args.config_sa = config_sa;
+ qargs.static_args.config_closer = config_closer;
+ qargs.static_args.config_sanity = qargs.static_args.config_sanity;
+ qargs.static_args.output_mutex = &output_mutex;
+
+ run_threads(nthreads, process_image, get_image, &qargs, 0);
}