diff options
author | Thomas White <taw@bitwiz.org.uk> | 2010-10-10 22:28:09 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-02-22 15:27:02 +0100 |
commit | 313ef1a1bcc7d506730d7c3d4b19fc2dc393ace3 (patch) | |
tree | 934c7fefc99faf74c2d7909c754bfe0c6c45b317 | |
parent | 6bd5c9e7384efaf44b4f9e7a70c9bcf57ef67ceb (diff) |
reintegrate: Use new thread pool
-rw-r--r-- | src/Makefile.am | 3 | ||||
-rw-r--r-- | src/Makefile.in | 5 | ||||
-rw-r--r-- | src/reintegrate.c | 249 |
3 files changed, 77 insertions, 180 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 6517a67d..46fb77b9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -66,7 +66,8 @@ cubeit_LDADD = @LIBS@ reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \ peaks.c image.c stream.c \ - index.c dirax.c templates.c geometry.c symmetry.c + index.c dirax.c templates.c geometry.c symmetry.c \ + thread-pool.c reintegrate_LDADD = @LIBS@ INCLUDES = "-I$(top_srcdir)/data" diff --git a/src/Makefile.in b/src/Makefile.in index 7bf59480..4ffa7701 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -125,7 +125,7 @@ am_reintegrate_OBJECTS = reintegrate.$(OBJEXT) cell.$(OBJEXT) \ hdf5-file.$(OBJEXT) utils.$(OBJEXT) detector.$(OBJEXT) \ peaks.$(OBJEXT) image.$(OBJEXT) stream.$(OBJEXT) \ index.$(OBJEXT) dirax.$(OBJEXT) templates.$(OBJEXT) \ - geometry.$(OBJEXT) symmetry.$(OBJEXT) + geometry.$(OBJEXT) symmetry.$(OBJEXT) thread-pool.$(OBJEXT) reintegrate_OBJECTS = $(am_reintegrate_OBJECTS) reintegrate_DEPENDENCIES = am_render_hkl_OBJECTS = render_hkl.$(OBJEXT) cell.$(OBJEXT) \ @@ -307,7 +307,8 @@ cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \ cubeit_LDADD = @LIBS@ reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \ peaks.c image.c stream.c \ - index.c dirax.c templates.c geometry.c symmetry.c + index.c dirax.c templates.c geometry.c symmetry.c \ + thread-pool.c reintegrate_LDADD = @LIBS@ INCLUDES = "-I$(top_srcdir)/data" diff --git a/src/reintegrate.c b/src/reintegrate.c index ceb1974e..5b9cf0f2 100644 --- a/src/reintegrate.c +++ b/src/reintegrate.c @@ -30,22 +30,11 @@ #include "peaks.h" #include "stream.h" #include "index.h" +#include "thread-pool.h" -#define MAX_THREADS (256) - -struct process_args +struct static_integration_args { - char *filename; - int id; - - /* Thread control */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; - - UnitCell *cell; struct detector *det; pthread_mutex_t *output_mutex; /* Protects 'stdout' */ int config_cmfilter; @@ -57,6 +46,26 @@ struct process_args }; +struct integration_args +{ + char *filename; + UnitCell *cell; + + struct static_integration_args static_args; +}; + + +struct queue_args +{ + FILE *fh; + const char *prefix; + int config_basename; + + struct static_integration_args static_args; +}; + + + static void show_help(const char *s) { printf("Syntax: %s [options]\n\n", s); @@ -88,8 +97,10 @@ static void show_help(const char *s) } -static void process_image(struct process_args *pargs) +static void process_image(void *pg) { + struct integration_args *apargs = pg; + struct static_integration_args *pargs = &apargs->static_args; struct hdfile *hdfile; struct image image; @@ -97,8 +108,7 @@ static void process_image(struct process_args *pargs) image.data = NULL; image.flags = NULL; image.indexed_cell = NULL; - image.id = pargs->id; - image.filename = pargs->filename; + image.filename = apargs->filename; image.hits = NULL; image.n_hits = 0; image.det = pargs->det; @@ -109,9 +119,9 @@ static void process_image(struct process_args *pargs) image.orientation.y = 0.0; image.orientation.z = 0.0; - STATUS("Processing '%s'\n", pargs->filename); + STATUS("Processing '%s'\n", apargs->filename); - hdfile = hdfile_open(pargs->filename); + hdfile = hdfile_open(apargs->filename); if ( hdfile == NULL ) { return; } else if ( hdfile_set_first_image(hdfile, "/") ) { @@ -132,50 +142,51 @@ static void process_image(struct process_args *pargs) } else { - output_intensities(&image, pargs->cell, + output_intensities(&image, apargs->cell, pargs->output_mutex, pargs->config_polar, pargs->config_sa, pargs->config_closer, stdout, 0, 0.1); } free(image.data); - cell_free(pargs->cell); if ( image.flags != NULL ) free(image.flags); hdfile_close(hdfile); + + free(apargs->filename); + cell_free(apargs->cell); + free(apargs); } -static void *worker_thread(void *pargsv) +static void *get_image(void *qp) { - struct process_args *pargs = pargsv; - int finish; - - do { - - int wakeup; - - process_image(pargs); + struct integration_args *pargs; + struct queue_args *qargs = qp; + UnitCell *cell; + char *filename; - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); + /* Get the next filename */ + if ( find_chunk(qargs->fh, &cell, &filename) ) { + return NULL; + } - /* Go to sleep until told to exit or process next image */ - do { + pargs = malloc(sizeof(struct integration_args)); - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); + if ( qargs->config_basename ) { + char *tmp; + tmp = strdup(basename(filename)); + free(filename); + filename = tmp; + } - } while ( !wakeup ); + memcpy(&pargs->static_args, &qargs->static_args, + sizeof(struct static_integration_args)); - } while ( !pargs->finish ); + pargs->cell = cell; + pargs->filename = malloc(1024); + snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, filename); - return NULL; + return pargs; } @@ -185,141 +196,25 @@ static void integrate_all(int nthreads, struct detector *det, FILE *fh, int config_satcorr, int config_sa, int config_closer, int config_sanity) { - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - int worker_active[MAX_THREADS]; - int i; - int rval; + struct queue_args qargs; pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER; - /* Initialise worker arguments */ - for ( i=0; i<nthreads; i++ ) { - - worker_args[i] = malloc(sizeof(struct process_args)); - worker_args[i]->filename = malloc(1024); - worker_active[i] = 0; - worker_args[i]->det = det; - worker_args[i]->config_cmfilter = config_cmfilter; - worker_args[i]->config_polar = config_polar; - worker_args[i]->config_sanity = config_sanity; - worker_args[i]->config_satcorr = config_satcorr; - worker_args[i]->config_sa = config_sa; - worker_args[i]->config_closer = config_closer; - pthread_mutex_init(&worker_args[i]->control_mutex, NULL); - worker_args[i]->output_mutex = &output_mutex; - - } - - /* Start threads off */ - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int r; - int rval; - char *filename; - UnitCell *cell; - - pargs = worker_args[i]; - - /* Get the next filename */ - rval = find_chunk(fh, &cell, &filename); - if ( rval == 1 ) break; - if ( config_basename ) { - char *tmp; - tmp = strdup(basename(filename)); - free(filename); - filename = tmp; - } - snprintf(pargs->filename, 1023, "%s%s", - prefix, filename); - pargs->cell = cell; - free(filename); - - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ - do { - - int i; - rval = 0; - - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int done; - char *filename; - UnitCell *cell; - - /* Spend time working, not managing threads */ - usleep(100000); - - /* Are we using this thread record at all? */ - if ( !worker_active[i] ) continue; - - /* Has the thread finished yet? */ - pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Get the next filename */ - rval = find_chunk(fh, &cell, &filename); - if ( rval == 1 ) break; - if ( config_basename ) { - char *tmp; - tmp = strdup(basename(filename)); - free(filename); - filename = tmp; - } - snprintf(pargs->filename, 1023, "%s%s", - prefix, filename); - pargs->cell = cell; - free(filename); - - /* Wake the thread up ... */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - } - - } while ( rval == 0 ); - - /* Join threads */ - for ( i=0; i<nthreads; i++ ) { - - if ( !worker_active[i] ) goto free; - - /* Tell the thread to exit */ - struct process_args *pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - - free: - if ( worker_args[i]->filename != NULL ) { - free(worker_args[i]->filename); - } - - } + /* Information required to choose the next image */ + qargs.fh = fh; + qargs.prefix = prefix; + qargs.config_basename = config_basename; + + /* Information for the task which does not vary */ + qargs.static_args.det = det; + qargs.static_args.config_cmfilter = config_cmfilter; + qargs.static_args.config_polar = config_polar; + qargs.static_args.config_satcorr = config_satcorr; + qargs.static_args.config_sa = config_sa; + qargs.static_args.config_closer = config_closer; + qargs.static_args.config_sanity = qargs.static_args.config_sanity; + qargs.static_args.output_mutex = &output_mutex; + + run_threads(nthreads, process_image, get_image, &qargs, 0); } |