aboutsummaryrefslogtreecommitdiff
path: root/src/indexamajig.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2010-10-11 10:57:46 +0200
committerThomas White <taw@physics.org>2012-02-22 15:27:02 +0100
commita22d0dc84c4411ca1b4583ac7857d5301c690f7c (patch)
treeb7639b8813ef6c8f97160fb8d8a8391a27664eea /src/indexamajig.c
parenta7d2cab127719eeef82584664b1abbbee06656c4 (diff)
indexamajig: Use new thread pool
Diffstat (limited to 'src/indexamajig.c')
-rw-r--r--src/indexamajig.c345
1 files changed, 122 insertions, 223 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c
index a866ac3b..4af459bc 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -35,9 +35,7 @@
#include "sfac.h"
#include "filters.h"
#include "reflections.h"
-
-
-#define MAX_THREADS (96)
+#include "thread-pool.h"
enum {
@@ -46,11 +44,9 @@ enum {
};
-struct process_args
+/* Information about the indexing process which is common to all patterns */
+struct static_index_args
{
- /* Input */
- char *filename;
- int id;
pthread_mutex_t *gpu_mutex; /* Protects "gctx" */
UnitCell *cell;
int config_cmfilter;
@@ -76,20 +72,37 @@ struct process_args
struct gpu_context *gctx;
int peaks;
- /* Thread control and output */
- pthread_mutex_t control_mutex; /* Protects the scary stuff below */
- int start;
- int finish;
- int done;
- int indexable;
- int sane;
-
/* Output stream */
pthread_mutex_t *output_mutex; /* Protects the output stream */
FILE *ofh;
};
+/* Information about the indexing process for one pattern */
+struct index_args
+{
+ /* "Input" */
+ char *filename;
+ struct static_index_args static_args;
+
+ /* "Output" */
+ int indexable;
+ int sane;
+};
+
+
+/* Information needed to choose the next task and dispatch it */
+struct queue_args
+{
+ FILE *fh;
+ char *prefix;
+ struct static_index_args static_args;
+
+ int n_indexable;
+ int n_sane;
+};
+
+
static void show_help(const char *s)
{
printf("Syntax: %s [options]\n\n", s);
@@ -100,6 +113,7 @@ static void show_help(const char *s)
"\n"
" -i, --input=<filename> Specify file containing list of images to process.\n"
" '-' means stdin, which is the default.\n"
+" -o, --output=<filename> Write indexed stream to this file. '-' for stdout.\n"
"\n"
" --indexing=<method> Use 'method' for indexing. Choose from:\n"
" none : no indexing (default)\n"
@@ -273,38 +287,39 @@ static void simulate_and_write(struct image *simage, struct gpu_context **gctx,
}
-static void process_image(struct process_args *pargs)
+static void process_image(void *pp, int cookie)
{
+ struct index_args *pargs = pp;
struct hdfile *hdfile;
struct image image;
struct image *simage;
float *data_for_measurement;
size_t data_size;
char *filename = pargs->filename;
- UnitCell *cell = pargs->cell;
- int config_cmfilter = pargs->config_cmfilter;
- int config_noisefilter = pargs->config_noisefilter;
- int config_writedrx = pargs->config_writedrx;
- int config_dumpfound = pargs->config_dumpfound;
- int config_verbose = pargs->config_verbose;
- int config_alternate = pargs->config_alternate;
- int config_nearbragg = pargs->config_nearbragg;
- int config_gpu = pargs->config_gpu;
- int config_simulate = pargs->config_simulate;
- int config_nomatch = pargs->config_nomatch;
- int config_polar = pargs->config_polar;
- IndexingMethod indm = pargs->indm;
- const double *intensities = pargs->intensities;
- struct gpu_context *gctx = pargs->gctx;
+ UnitCell *cell = pargs->static_args.cell;
+ int config_cmfilter = pargs->static_args.config_cmfilter;
+ int config_noisefilter = pargs->static_args.config_noisefilter;
+ int config_writedrx = pargs->static_args.config_writedrx;
+ int config_dumpfound = pargs->static_args.config_dumpfound;
+ int config_verbose = pargs->static_args.config_verbose;
+ int config_alternate = pargs->static_args.config_alternate;
+ int config_nearbragg = pargs->static_args.config_nearbragg;
+ int config_gpu = pargs->static_args.config_gpu;
+ int config_simulate = pargs->static_args.config_simulate;
+ int config_nomatch = pargs->static_args.config_nomatch;
+ int config_polar = pargs->static_args.config_polar;
+ IndexingMethod indm = pargs->static_args.indm;
+ const double *intensities = pargs->static_args.intensities;
+ struct gpu_context *gctx = pargs->static_args.gctx;
image.features = NULL;
image.data = NULL;
image.indexed_cell = NULL;
- image.id = pargs->id;
+ image.id = cookie;
image.filename = filename;
image.hits = NULL;
image.n_hits = 0;
- image.det = pargs->det;
+ image.det = pargs->static_args.det;
/* View head-on (unit cell is tilted) */
image.orientation.w = 1.0;
@@ -325,7 +340,7 @@ static void process_image(struct process_args *pargs)
return;
}
- hdf5_read(hdfile, &image, pargs->config_satcorr);
+ hdf5_read(hdfile, &image, pargs->static_args.config_satcorr);
if ( config_cmfilter ) {
filter_cm(&image);
@@ -342,7 +357,7 @@ static void process_image(struct process_args *pargs)
memcpy(data_for_measurement, image.data, data_size);
}
- switch ( pargs->peaks )
+ switch ( pargs->static_args.peaks )
{
case PEAK_HDF5 :
/* Get peaks from HDF5 */
@@ -352,7 +367,7 @@ static void process_image(struct process_args *pargs)
}
break;
case PEAK_ZAEF :
- search_peaks(&image, pargs->threshold);
+ search_peaks(&image, pargs->static_args.threshold);
break;
}
@@ -362,7 +377,8 @@ static void process_image(struct process_args *pargs)
image.data = data_for_measurement;
if ( config_dumpfound ) {
- dump_peaks(&image, pargs->ofh, pargs->output_mutex);
+ dump_peaks(&image, pargs->static_args.ofh,
+ pargs->static_args.output_mutex);
}
/* Not indexing nor writing xfel.drx?
@@ -374,7 +390,7 @@ static void process_image(struct process_args *pargs)
/* Calculate orientation matrix (by magic) */
if ( config_writedrx || (indm != INDEXING_NONE) ) {
index_pattern(&image, cell, indm, config_nomatch,
- config_verbose, pargs->ipriv);
+ config_verbose, pargs->static_args.ipriv);
}
/* No cell at this point? Then we're done. */
@@ -382,7 +398,7 @@ static void process_image(struct process_args *pargs)
pargs->indexable = 1;
/* Sanity check */
- if ( pargs->config_sanity
+ if ( pargs->static_args.config_sanity
&& !peak_sanity_check(&image, image.indexed_cell, 0, 0.1) ) {
STATUS("Failed peak sanity check.\n");
goto done;
@@ -393,9 +409,10 @@ static void process_image(struct process_args *pargs)
/* Measure intensities if requested */
if ( config_nearbragg ) {
output_intensities(&image, image.indexed_cell,
- pargs->output_mutex, config_polar,
- pargs->config_sa, pargs->config_closer,
- pargs->ofh, 0, 0.1);
+ pargs->static_args.output_mutex,
+ config_polar, pargs->static_args.config_sa,
+ pargs->static_args.config_closer,
+ pargs->static_args.ofh, 0, 0.1);
}
simage = get_simage(&image, config_alternate);
@@ -403,10 +420,10 @@ static void process_image(struct process_args *pargs)
/* Simulate if requested */
if ( config_simulate ) {
if ( config_gpu ) {
- pthread_mutex_lock(pargs->gpu_mutex);
+ pthread_mutex_lock(pargs->static_args.gpu_mutex);
simulate_and_write(simage, &gctx, intensities,
image.indexed_cell);
- pthread_mutex_unlock(pargs->gpu_mutex);
+ pthread_mutex_unlock(pargs->static_args.gpu_mutex);
} else {
simulate_and_write(simage, NULL, intensities,
image.indexed_cell);
@@ -430,37 +447,40 @@ done:
}
-static void *worker_thread(void *pargsv)
+static void *get_image(void *qp)
{
- struct process_args *pargs = pargsv;
- int finish;
+ char line[1024];
+ struct index_args *pargs;
+ char *rval;
+ struct queue_args *qargs = qp;
- do {
+ /* Get the next filename */
+ rval = fgets(line, 1023, qargs->fh);
+ if ( rval == NULL ) return NULL;
- int wakeup;
+ pargs = malloc(sizeof(struct index_args));
- process_image(pargs);
+ memcpy(&pargs->static_args, &qargs->static_args,
+ sizeof(struct static_index_args));
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 1;
- pargs->start = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
+ chomp(line);
+ pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1);
+ snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line);
- /* Go to sleep until told to exit or process next image */
- do {
+ return pargs;
+}
- pthread_mutex_lock(&pargs->control_mutex);
- /* Either of these can result in the thread waking up */
- wakeup = pargs->start || pargs->finish;
- finish = pargs->finish;
- pthread_mutex_unlock(&pargs->control_mutex);
- usleep(20000);
- } while ( !wakeup );
+static void finalise_image(void *qp, void *pp)
+{
+ struct queue_args *qargs = qp;
+ struct index_args *pargs = pp;
- } while ( !pargs->finish );
+ qargs->n_indexable += pargs->indexable;
+ qargs->n_sane += pargs->sane;
- return NULL;
+ free(pargs->filename);
+ free(pargs);
}
@@ -474,8 +494,6 @@ int main(int argc, char *argv[])
FILE *ofh;
char *rval = NULL;
int n_images;
- int n_indexable;
- int n_sane;
int config_noindex = 0;
int config_dumpfound = 0;
int config_nearbragg = 0;
@@ -506,15 +524,13 @@ int main(int argc, char *argv[])
char *speaks = NULL;
int peaks;
int nthreads = 1;
- pthread_t workers[MAX_THREADS];
- struct process_args *worker_args[MAX_THREADS];
- int worker_active[MAX_THREADS];
int i;
pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t gpu_mutex = PTHREAD_MUTEX_INITIALIZER;
char prepare_line[1024];
char prepare_filename[1024];
IndexingPrivate *ipriv;
+ struct queue_args qargs;
/* Long options */
const struct option longopts[] = {
@@ -670,7 +686,7 @@ int main(int argc, char *argv[])
}
}
- if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) {
+ if ( nthreads == 0 ) {
ERROR("Invalid number of threads.\n");
return 1;
}
@@ -740,158 +756,41 @@ int main(int argc, char *argv[])
}
gsl_set_error_handler_off();
- n_images = 0;
- n_indexable = 0;
- n_sane = 0;
-
- for ( i=0; i<nthreads; i++ ) {
- worker_args[i] = malloc(sizeof(struct process_args));
- worker_args[i]->filename = malloc(1024);
- worker_args[i]->ofh = ofh;
- worker_args[i]->peaks = peaks;
- worker_active[i] = 0;
- }
-
- /* Start threads off */
- for ( i=0; i<nthreads; i++ ) {
-
- char line[1024];
- struct process_args *pargs;
- int r;
-
- pargs = worker_args[i];
- if ( strlen(prepare_line) > 0 ) {
- strcpy(line, prepare_line);
- prepare_line[0] = '\0';
- } else {
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) continue;
- }
- chomp(line);
- snprintf(pargs->filename, 1023, "%s%s", prefix, line);
-
- n_images++;
-
- pargs->output_mutex = &output_mutex;
- pargs->gpu_mutex = &gpu_mutex;
- pthread_mutex_init(&pargs->control_mutex, NULL);
- pargs->config_cmfilter = config_cmfilter;
- pargs->config_noisefilter = config_noisefilter;
- pargs->config_writedrx = config_writedrx;
- pargs->config_dumpfound = config_dumpfound;
- pargs->config_verbose = config_verbose;
- pargs->config_alternate = config_alternate;
- pargs->config_nearbragg = config_nearbragg;
- pargs->config_gpu = config_gpu;
- pargs->config_simulate = config_simulate;
- pargs->config_nomatch = config_nomatch;
- pargs->config_polar = config_polar;
- pargs->config_sanity = config_sanity;
- pargs->config_satcorr = config_satcorr;
- pargs->config_sa = config_sa;
- pargs->config_closer = config_closer;
- pargs->cell = cell;
- pargs->det = det;
- pargs->ipriv = ipriv;
- pargs->indm = indm;
- pargs->intensities = intensities;
- pargs->gctx = gctx;
- pargs->threshold = threshold;
- pargs->id = i;
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pargs->finish = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, worker_thread, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
- }
-
- /* Keep threads busy until the end of the data */
- do {
-
- int i;
-
- for ( i=0; i<nthreads; i++ ) {
-
- char line[1024];
- struct process_args *pargs;
- int done;
-
- /* Spend CPU time indexing, not checking results */
- usleep(100000);
-
- /* Are we using this thread record at all? */
- if ( !worker_active[i] ) continue;
-
- /* Has the thread finished yet? */
- pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- done = pargs->done;
- pthread_mutex_unlock(&pargs->control_mutex);
- if ( !done ) continue;
-
- /* Results will be processed after checking if
- * there are any more images to process. */
-
- /* Get next filename */
- rval = fgets(line, 1023, fh);
- /* In this case, the result of the last file
- * file will be processed when the thread is
- * joined. */
- if ( rval == NULL ) break;
-
- /* Record the result */
- n_indexable += pargs->indexable;
- n_sane += pargs->sane;
-
- chomp(line);
- snprintf(pargs->filename, 1023, "%s%s", prefix, line);
-
- n_images++;
-
- /* Wake the thread up ... */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- }
-
- } while ( rval != NULL );
-
- /* Join threads */
- for ( i=0; i<nthreads; i++ ) {
-
- if ( !worker_active[i] ) goto free;
-
- /* Tell the thread to exit */
- struct process_args *pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->finish = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Wait for it to join */
- pthread_join(workers[i], NULL);
- worker_active[i] = 0;
-
- n_indexable += pargs->indexable;
- n_sane += pargs->sane;
-
- free:
- if ( worker_args[i]->filename != NULL ) {
- free(worker_args[i]->filename);
- }
- free(worker_args[i]);
-
- }
+ qargs.static_args.gpu_mutex = &gpu_mutex;
+ qargs.static_args.cell = cell;
+ qargs.static_args.config_cmfilter = config_cmfilter;
+ qargs.static_args.config_noisefilter = config_noisefilter;
+ qargs.static_args.config_writedrx = config_writedrx;
+ qargs.static_args.config_dumpfound = config_dumpfound;
+ qargs.static_args.config_verbose = config_verbose;
+ qargs.static_args.config_alternate = config_alternate;
+ qargs.static_args.config_nearbragg = config_nearbragg;
+ qargs.static_args.config_gpu = config_gpu;
+ qargs.static_args.config_simulate = config_simulate;
+ qargs.static_args.config_nomatch = config_nomatch;
+ qargs.static_args.config_polar = config_polar;
+ qargs.static_args.config_sanity = config_sanity;
+ qargs.static_args.config_satcorr = config_satcorr;
+ qargs.static_args.config_sa = config_sa;
+ qargs.static_args.config_closer = config_closer;
+ qargs.static_args.threshold = threshold;
+ qargs.static_args.det = det;
+ qargs.static_args.indm = indm;
+ qargs.static_args.ipriv = ipriv;
+ qargs.static_args.intensities = intensities;
+ qargs.static_args.gctx = gctx;
+ qargs.static_args.peaks = peaks;
+ qargs.static_args.output_mutex = &output_mutex;
+ qargs.static_args.ofh = ofh;
+
+ qargs.fh = fh;
+ qargs.prefix = prefix;
+ qargs.n_indexable = 0;
+ qargs.n_sane = 0;
+
+ n_images = run_threads(nthreads, process_image, get_image,
+ finalise_image, &qargs, 0);
cleanup_indexing(ipriv);
@@ -902,7 +801,7 @@ int main(int argc, char *argv[])
fclose(fh);
STATUS("There were %i images. %i could be indexed, of which %i"
- " looked sane.\n", n_images, n_indexable, n_sane);
+ " looked sane.\n", n_images, qargs.n_indexable, qargs.n_sane);
if ( gctx != NULL ) {
cleanup_gpu(gctx);