aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2010-10-10 21:23:33 +0200
committerThomas White <taw@physics.org>2012-02-22 15:27:02 +0100
commite05bf79a5d1e1c1c2f55164485704d7bc3a8ec95 (patch)
tree1c16586d4928c9571b3ebcb58fe3d3507ad330e9
parent33d4fbfbbee784f734632e543a75222f38bc807d (diff)
cubeit: Use new thread pool
-rw-r--r--src/Makefile.am2
-rw-r--r--src/Makefile.in4
-rw-r--r--src/cubeit.c355
3 files changed, 135 insertions, 226 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index ee20d0b3..6517a67d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -61,7 +61,7 @@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
facetron_LDADD = @LIBS@
cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \
- filters.c image.c symmetry.c stream.c
+ filters.c image.c symmetry.c stream.c thread-pool.c
cubeit_LDADD = @LIBS@
reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \
diff --git a/src/Makefile.in b/src/Makefile.in
index c7ed213a..7bf59480 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -67,7 +67,7 @@ compare_hkl_DEPENDENCIES =
am_cubeit_OBJECTS = cubeit.$(OBJEXT) cell.$(OBJEXT) \
hdf5-file.$(OBJEXT) utils.$(OBJEXT) detector.$(OBJEXT) \
render.$(OBJEXT) filters.$(OBJEXT) image.$(OBJEXT) \
- symmetry.$(OBJEXT) stream.$(OBJEXT)
+ symmetry.$(OBJEXT) stream.$(OBJEXT) thread-pool.$(OBJEXT)
cubeit_OBJECTS = $(am_cubeit_OBJECTS)
cubeit_DEPENDENCIES =
am_facetron_OBJECTS = facetron.$(OBJEXT) cell.$(OBJEXT) \
@@ -302,7 +302,7 @@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \
facetron_LDADD = @LIBS@
cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \
- filters.c image.c symmetry.c stream.c
+ filters.c image.c symmetry.c stream.c thread-pool.c
cubeit_LDADD = @LIBS@
reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \
diff --git a/src/cubeit.c b/src/cubeit.c
index 4bc466a6..ef892340 100644
--- a/src/cubeit.c
+++ b/src/cubeit.c
@@ -20,11 +20,10 @@
#include <string.h>
#include <unistd.h>
#include <getopt.h>
-#include <pthread.h>
-#include <sys/time.h>
#include <assert.h>
#include <png.h>
#include <fenv.h>
+#include <pthread.h>
#include "utils.h"
#include "hdf5-file.h"
@@ -32,30 +31,28 @@
#include "render.h"
#include "symmetry.h"
#include "stream.h"
+#include "thread-pool.h"
-#define MAX_THREADS (256)
-
-struct process_args
+struct static_sum_args
{
- char *filename;
- int id;
-
- /* Thread control */
- pthread_mutex_t control_mutex; /* Protects the scary stuff below */
- int start;
- int finish;
- int done;
-
- UnitCell *cell;
- pthread_mutex_t vals_mutex; /* Protects "vals" */
+ pthread_mutex_t *vals_mutex; /* Protects "vals" */
double *vals;
int xs;
int ys;
int zs;
int config_angles;
- pthread_mutex_t angles_mutex; /* Protects "angles" */
+ pthread_mutex_t *angles_mutex; /* Protects "angles" */
unsigned int *angles;
+
+ pthread_mutex_t *cell_mutex; /* Protects "angles" */
+ double *as;
+ double *bs;
+ double *cs;
+ double *als;
+ double *bes;
+ double *gas;
+
struct detector *det;
signed int ht;
signed int kt;
@@ -64,6 +61,25 @@ struct process_args
};
+struct sum_args
+{
+ char *filename;
+ UnitCell *cell;
+
+ struct static_sum_args static_args;
+};
+
+
+struct queue_args
+{
+ FILE *fh;
+ char *prefix;
+ int config_basename;
+
+ struct static_sum_args static_args;
+};
+
+
static void show_help(const char *s)
{
printf("Syntax: %s [options]\n\n", s);
@@ -82,6 +98,24 @@ static void show_help(const char *s)
}
+static void add_to_mean(UnitCell *cell, double *ast, double *bst, double *cst,
+ double *alst, double *best, double *gast)
+{
+ double asx, asy, asz;
+ double bsx, bsy, bsz;
+ double csx, csy, csz;
+
+ cell_get_reciprocal(cell, &asx, &asy, &asz, &bsx, &bsy, &bsz,
+ &csx, &csy, &csz);
+ *ast += modulus(asx, asy, asz);
+ *bst += modulus(bsx, bsy, bsz);
+ *cst += modulus(csx, csy, csz);
+ *alst += angle_between(bsx, bsy, bsz, csx, csy, csz);
+ *best += angle_between(asx, asy, asz, csx, csy, csz);
+ *gast += angle_between(asx, asy, asz, bsx, bsy, bsz);
+}
+
+
static void interpolate_linear(double *vals, double v,
int xs, int ys, int zs,
int xv, int yv, double zv)
@@ -166,8 +200,10 @@ static void interpolate_onto_grid(double *vals, double v,
}
-static void process_image(struct process_args *pargs)
+static void sum_image(void *pg)
{
+ struct sum_args *apargs = pg;
+ struct static_sum_args *pargs = &apargs->static_args;
struct hdfile *hdfile;
struct image image;
double ax, ay, az;
@@ -179,8 +215,7 @@ static void process_image(struct process_args *pargs)
image.data = NULL;
image.flags = NULL;
image.indexed_cell = NULL;
- image.id = pargs->id;
- image.filename = pargs->filename;
+ image.filename = apargs->filename;
image.hits = NULL;
image.n_hits = 0;
image.det = pargs->det;
@@ -191,9 +226,9 @@ static void process_image(struct process_args *pargs)
image.orientation.y = 0.0;
image.orientation.z = 0.0;
- STATUS("Processing '%s'\n", pargs->filename);
+ STATUS("Processing '%s'\n", apargs->filename);
- hdfile = hdfile_open(pargs->filename);
+ hdfile = hdfile_open(apargs->filename);
if ( hdfile == NULL ) {
return;
} else if ( hdfile_set_first_image(hdfile, "/") ) {
@@ -204,7 +239,7 @@ static void process_image(struct process_args *pargs)
hdf5_read(hdfile, &image, 1);
- cell_get_cartesian(pargs->cell, &ax, &ay, &az, &bx, &by,
+ cell_get_cartesian(apargs->cell, &ax, &ay, &az, &bx, &by,
&bz, &cx, &cy, &cz);
fesetround(1); /* Round towards nearest */
@@ -243,10 +278,10 @@ static void process_image(struct process_args *pargs)
double v = image.data[x+image.width*y];
- pthread_mutex_lock(&pargs->vals_mutex);
+ pthread_mutex_lock(pargs->vals_mutex);
interpolate_onto_grid(pargs->vals, v, pargs->xs, pargs->ys,
pargs->zs, dh, dk, dl);
- pthread_mutex_unlock(&pargs->vals_mutex);
+ pthread_mutex_unlock(pargs->vals_mutex);
}
}
@@ -259,56 +294,30 @@ static void process_image(struct process_args *pargs)
double ang;
int bin;
- cell_get_reciprocal(pargs->cell, &asx, &asy, &asz,
- &bsx, &bsy, &bsz,
- &csx, &csy, &csz);
+ cell_get_reciprocal(apargs->cell, &asx, &asy, &asz,
+ &bsx, &bsy, &bsz,
+ &csx, &csy, &csz);
ang = angle_between(csx, csy, csz, 0.0, 0.0, 1.0);
ang = rad2deg(ang); /* 0->180 deg */
bin = rint(ang);
- pthread_mutex_lock(&pargs->vals_mutex);
+ pthread_mutex_lock(pargs->angles_mutex);
pargs->angles[bin]++;
- pthread_mutex_unlock(&pargs->vals_mutex);
+ pthread_mutex_unlock(pargs->angles_mutex);
}
+ pthread_mutex_lock(pargs->cell_mutex);
+ add_to_mean(apargs->cell, pargs->as, pargs->bs, pargs->cs,
+ pargs->als, pargs->bes, pargs->gas);
+ pthread_mutex_unlock(pargs->cell_mutex);
+
free(image.data);
- cell_free(pargs->cell);
+ cell_free(apargs->cell);
if ( image.flags != NULL ) free(image.flags);
hdfile_close(hdfile);
-}
-
-
-static void *worker_thread(void *pargsv)
-{
- struct process_args *pargs = pargsv;
- int finish;
-
- do {
-
- int wakeup;
-
- process_image(pargs);
-
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 1;
- pargs->start = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
- /* Go to sleep until told to exit or process next image */
- do {
-
- pthread_mutex_lock(&pargs->control_mutex);
- /* Either of these can result in the thread waking up */
- wakeup = pargs->start || pargs->finish;
- finish = pargs->finish;
- pthread_mutex_unlock(&pargs->control_mutex);
- usleep(20000);
-
- } while ( !wakeup );
-
- } while ( !pargs->finish );
-
- return NULL;
+ free(apargs->filename);
+ free(pargs);
}
@@ -384,21 +393,35 @@ static void write_slice(const char *filename, double *vals, int z,
}
-static void add_to_mean(UnitCell *cell, double *ast, double *bst, double *cst,
- double *alst, double *best, double *gast)
+static void *get_image(void *qp)
{
- double asx, asy, asz;
- double bsx, bsy, bsz;
- double csx, csy, csz;
+ struct sum_args *pargs;
+ struct queue_args *qargs = qp;
+ UnitCell *cell;
+ char *filename;
- cell_get_reciprocal(cell, &asx, &asy, &asz, &bsx, &bsy, &bsz,
- &csx, &csy, &csz);
- *ast += modulus(asx, asy, asz);
- *bst += modulus(bsx, bsy, bsz);
- *cst += modulus(csx, csy, csz);
- *alst += angle_between(bsx, bsy, bsz, csx, csy, csz);
- *best += angle_between(asx, asy, asz, csx, csy, csz);
- *gast += angle_between(asx, asy, asz, bsx, bsy, bsz);
+ /* Get the next filename */
+ if ( find_chunk(qargs->fh, &cell, &filename) ) {
+ return NULL;
+ }
+
+ pargs = malloc(sizeof(struct sum_args));
+
+ if ( qargs->config_basename ) {
+ char *tmp;
+ tmp = strdup(basename(filename));
+ free(filename);
+ filename = tmp;
+ }
+
+ memcpy(&pargs->static_args, &qargs->static_args,
+ sizeof(struct static_sum_args));
+
+ pargs->cell = cell;
+ pargs->filename = malloc(1024);
+ snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, filename);
+
+ return pargs;
}
@@ -408,13 +431,9 @@ int main(int argc, char *argv[])
char *infile = NULL;
char *geomfile = NULL;
FILE *fh;
- int rval;
int n_images;
char *prefix = NULL;
int nthreads = 1;
- pthread_t workers[MAX_THREADS];
- struct process_args *worker_args[MAX_THREADS];
- int worker_active[MAX_THREADS];
int config_basename = 0;
int config_checkprefix = 1;
struct detector *det;
@@ -425,7 +444,16 @@ int main(int argc, char *argv[])
int config_angles = 0;
signed int ht, kt, lt;
char *sym = NULL;
- double as, bs, cs, als, bes, gas;
+ struct queue_args qargs;
+ pthread_mutex_t vals_mutex = PTHREAD_MUTEX_INITIALIZER;
+ pthread_mutex_t angles_mutex = PTHREAD_MUTEX_INITIALIZER;
+ pthread_mutex_t cell_mutex = PTHREAD_MUTEX_INITIALIZER;
+ double as;
+ double bs;
+ double cs;
+ double als;
+ double bes;
+ double gas;
/* Long options */
const struct option longopts[] = {
@@ -509,7 +537,7 @@ int main(int argc, char *argv[])
/* Initialise shape transform array */
vals = calloc(gs*gs*gs, sizeof(double));
- if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) {
+ if ( nthreads == 0 ) {
ERROR("Invalid number of threads.\n");
return 1;
}
@@ -519,149 +547,30 @@ int main(int argc, char *argv[])
as = 0.0; bs = 0.0; cs = 0.0; als = 0.0; bes = 0.0; gas = 0.0;
- /* Initialise worker arguments */
- for ( i=0; i<nthreads; i++ ) {
-
- worker_args[i] = malloc(sizeof(struct process_args));
- worker_args[i]->filename = malloc(1024);
- worker_active[i] = 0;
- worker_args[i]->xs = gs;
- worker_args[i]->ys = gs;
- worker_args[i]->zs = gs;
- worker_args[i]->config_angles = config_angles;
- worker_args[i]->vals = vals;
- worker_args[i]->angles = angles;
- worker_args[i]->det = det;
- pthread_mutex_init(&worker_args[i]->control_mutex, NULL);
- pthread_mutex_init(&worker_args[i]->vals_mutex, NULL);
- pthread_mutex_init(&worker_args[i]->angles_mutex, NULL);
- worker_args[i]->ht = ht;
- worker_args[i]->kt = kt;
- worker_args[i]->lt = lt;
- worker_args[i]->sym = sym;
-
- }
+ qargs.static_args.xs = gs;
+ qargs.static_args.ys = gs;
+ qargs.static_args.zs = gs;
+ qargs.static_args.config_angles = config_angles;
+ qargs.static_args.vals = vals;
+ qargs.static_args.angles = angles;
+ qargs.static_args.det = det;
+ qargs.static_args.vals_mutex = &vals_mutex;
+ qargs.static_args.angles_mutex = &angles_mutex;
+ qargs.static_args.ht = ht;
+ qargs.static_args.kt = kt;
+ qargs.static_args.lt = lt;
+ qargs.static_args.sym = sym;
+ qargs.static_args.cell_mutex = &cell_mutex;
+ qargs.static_args.as = &as;
+ qargs.static_args.bs = &bs;
+ qargs.static_args.cs = &cs;
+ qargs.static_args.als = &als;
+ qargs.static_args.bes = &bes;
+ qargs.static_args.gas = &gas;
n_images = 0;
- /* Start threads off */
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int r;
- int rval;
- char *filename;
- UnitCell *cell;
-
- pargs = worker_args[i];
-
- /* Get the next filename */
- rval = find_chunk(fh, &cell, &filename);
- if ( rval == 1 ) break;
- add_to_mean(cell, &as, &bs, &cs, &als, &bes, &gas);
- if ( config_basename ) {
- char *tmp;
- tmp = strdup(basename(filename));
- free(filename);
- filename = tmp;
- }
- snprintf(pargs->filename, 1023, "%s%s",
- prefix, filename);
- pargs->cell = cell;
- free(filename);
-
- n_images++;
-
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pargs->finish = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, worker_thread, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
- }
-
- /* Keep threads busy until the end of the data */
- do {
-
- int i;
- rval = 0;
-
- for ( i=0; i<nthreads; i++ ) {
-
- struct process_args *pargs;
- int done;
- char *filename;
- UnitCell *cell;
-
- /* Spend time working, not managing threads */
- usleep(100000);
-
- /* Are we using this thread record at all? */
- if ( !worker_active[i] ) continue;
-
- /* Has the thread finished yet? */
- pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- done = pargs->done;
- pthread_mutex_unlock(&pargs->control_mutex);
- if ( !done ) continue;
-
- /* Get the next filename */
- rval = find_chunk(fh, &cell, &filename);
- if ( rval == 1 ) break;
- add_to_mean(cell, &as, &bs, &cs, &als, &bes, &gas);
- if ( config_basename ) {
- char *tmp;
- tmp = strdup(basename(filename));
- free(filename);
- filename = tmp;
- }
- snprintf(pargs->filename, 1023, "%s%s",
- prefix, filename);
- pargs->cell = cell;
- free(filename);
-
- n_images++;
-
- STATUS("Done %i images\n", n_images);
-
- /* Wake the thread up ... */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- }
-
- } while ( rval == 0 );
-
- /* Join threads */
- for ( i=0; i<nthreads; i++ ) {
-
- if ( !worker_active[i] ) goto free;
-
- /* Tell the thread to exit */
- struct process_args *pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->finish = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Wait for it to join */
- pthread_join(workers[i], NULL);
-
- free:
- if ( worker_args[i]->filename != NULL ) {
- free(worker_args[i]->filename);
- }
-
- }
+ run_threads(nthreads, sum_image, get_image, &qargs, 0);
fclose(fh);