aboutsummaryrefslogtreecommitdiff
path: root/src/calibrate_detector.c
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2010-10-10 20:32:41 +0200
committerThomas White <taw@physics.org>2012-02-22 15:27:02 +0100
commit33d4fbfbbee784f734632e543a75222f38bc807d (patch)
treec9281f611a9da26433e4c3770ffce440ef25d3f9 /src/calibrate_detector.c
parenta9adbc73158e8f8b225cd59d62ad5ade648c8241 (diff)
calibrate_detecotr: Use new thread pool
Diffstat (limited to 'src/calibrate_detector.c')
-rw-r--r--src/calibrate_detector.c287
1 files changed, 72 insertions, 215 deletions
diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c
index e5945015..2c68f645 100644
--- a/src/calibrate_detector.c
+++ b/src/calibrate_detector.c
@@ -27,6 +27,7 @@
#include "hdf5-file.h"
#include "filters.h"
#include "peaks.h"
+#include "thread-pool.h"
#define INTEGRATION_RADIUS (10)
@@ -39,10 +40,9 @@ typedef enum
SUM_PEAKS
} SumMethod;
-struct process_args
+struct sum_args
{
char *filename;
- int id;
int config_cmfilter;
int config_noisefilter;
double *sum;
@@ -50,12 +50,20 @@ struct process_args
int h;
SumMethod sum_method;
double threshold;
+};
+
- /* Thread control */
- pthread_mutex_t control_mutex; /* Protects the scary stuff below */
- int start;
- int finish;
- int done;
+struct queue_args
+{
+ FILE *fh;
+ char *prefix;
+ int config_cmfilter;
+ int config_noisefilter;
+ double *sum;
+ int w;
+ int h;
+ SumMethod sum_method;
+ double threshold;
};
@@ -151,8 +159,9 @@ static void sum_threshold(struct image *image, double *sum, double threshold)
}
-static void process_image(struct process_args *pargs)
+static void add_image(void *args)
{
+ struct sum_args *pargs = args;
struct hdfile *hdfile;
struct image image;
@@ -160,7 +169,6 @@ static void process_image(struct process_args *pargs)
image.data = NULL;
image.flags = NULL;
image.indexed_cell = NULL;
- image.id = pargs->id;
image.filename = pargs->filename;
image.hits = NULL;
image.n_hits = 0;
@@ -215,71 +223,38 @@ out:
image_feature_list_free(image.features);
if ( image.flags != NULL ) free(image.flags);
hdfile_close(hdfile);
-}
-
-
-static void *worker_thread(void *pargsv)
-{
- struct process_args *pargs = pargsv;
- int finish;
-
- do {
-
- int wakeup;
-
- process_image(pargs);
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 1;
- pargs->start = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Go to sleep until told to exit or process next image */
- do {
-
- pthread_mutex_lock(&pargs->control_mutex);
- /* Either of these can result in the thread waking up */
- wakeup = pargs->start || pargs->finish;
- finish = pargs->finish;
- pthread_mutex_unlock(&pargs->control_mutex);
- usleep(20000);
-
- } while ( !wakeup );
-
- } while ( !pargs->finish );
-
- return NULL;
+ free(pargs->filename);
+ free(pargs);
}
-static void dump_to_file(struct process_args *worker_args[], int nthreads,
- int w, int h, int n, const char *stem)
+static void *get_image(void *qp)
{
- int i;
- double *total;
- char outfile[256];
-
- total = calloc(w*h, sizeof(double));
-
- /* Add the individual sums to the 0th sum */
- for ( i=0; i<nthreads; i++ ) {
-
- int x, y;
-
- for ( x=0; x<w; x++ ) {
- for ( y=0; y<h; y++ ) {
- double val = worker_args[i]->sum[x+w*y];
- total[x+w*y] += val;
- }
- }
-
- }
-
- snprintf(outfile, 255, "%s-%i.h5", stem, n);
-
- hdf5_write(outfile, total, w, h, H5T_NATIVE_DOUBLE);
-
- free(total);
+ char line[1024];
+ struct sum_args *pargs;
+ char *rval;
+ struct queue_args *qargs = qp;
+
+ /* Get the next filename */
+ rval = fgets(line, 1023, qargs->fh);
+ if ( rval == NULL ) return NULL;
+
+ pargs = malloc(sizeof(struct sum_args));
+
+ pargs->w = qargs->w;
+ pargs->h = qargs->h;
+ pargs->sum_method = qargs->sum_method;
+ pargs->threshold = qargs->threshold;
+ pargs->config_cmfilter = qargs->config_cmfilter;
+ pargs->config_noisefilter = qargs->config_noisefilter;
+ pargs->sum = qargs->sum;
+
+ chomp(line);
+ pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1);
+ snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line);
+
+ return pargs;
}
@@ -289,8 +264,7 @@ int main(int argc, char *argv[])
char *filename = NULL;
char *outfile = NULL;
FILE *fh;
- char *rval = NULL;
- int n_images;
+ int n_images = 0;
int config_cmfilter = 0;
int config_noisefilter = 0;
char *prefix = NULL;
@@ -299,12 +273,9 @@ int main(int argc, char *argv[])
double threshold = 400.0;
SumMethod sum;
int nthreads = 1;
- pthread_t workers[MAX_THREADS];
- struct process_args *worker_args[MAX_THREADS];
- int worker_active[MAX_THREADS];
- int i;
- const int w = 1024; /* FIXME! */
- const int h = 1024; /* FIXME! */
+ struct queue_args qargs;
+ int n_done;
+ const int chunk_size = 1000;
/* Long options */
const struct option longopts[] = {
@@ -402,157 +373,43 @@ int main(int argc, char *argv[])
outfile = strdup("summed.h5");
}
- if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) {
+ if ( nthreads == 0 ) {
ERROR("Invalid number of threads.\n");
return 1;
}
- /* Initialise worker arguments */
- for ( i=0; i<nthreads; i++ ) {
-
- worker_args[i] = malloc(sizeof(struct process_args));
- worker_args[i]->filename = malloc(1024);
- worker_args[i]->sum = calloc(w*h, sizeof(double));
- worker_active[i] = 0;
-
- worker_args[i]->w = w;
- worker_args[i]->h = h;
- worker_args[i]->sum_method = sum;
- worker_args[i]->threshold = threshold;
-
- }
-
- n_images = 0;
-
- /* Start threads off */
- for ( i=0; i<nthreads; i++ ) {
-
- char line[1024];
- struct process_args *pargs;
- int r;
-
- pargs = worker_args[i];
+ qargs.w = 1024; /* FIXME! */
+ qargs.h = 1024; /* FIXME! */
+ qargs.sum_method = sum;
+ qargs.threshold = threshold;
+ qargs.config_cmfilter = config_cmfilter;
+ qargs.config_noisefilter = config_noisefilter;
+ qargs.sum = calloc(qargs.w*qargs.h, sizeof(double));
+ qargs.prefix = prefix;
+ qargs.fh = fh;
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) continue;
- chomp(line);
- snprintf(pargs->filename, 1023, "%s%s", prefix, line);
-
- n_images++;
-
- pthread_mutex_init(&pargs->control_mutex, NULL);
- pargs->config_cmfilter = config_cmfilter;
- pargs->config_noisefilter = config_noisefilter;
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pargs->finish = 0;
- pthread_mutex_unlock(&pargs->control_mutex);
-
-
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, worker_thread, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
- }
-
- /* Keep threads busy until the end of the data */
do {
- int i;
-
- for ( i=0; i<nthreads; i++ ) {
-
- char line[1024];
- struct process_args *pargs;
- int done;
-
- /* Spend time working, not managing threads */
- usleep(100000);
-
- /* Are we using this thread record at all? */
- if ( !worker_active[i] ) continue;
-
- /* Has the thread finished yet? */
- pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- done = pargs->done;
- pthread_mutex_unlock(&pargs->control_mutex);
- if ( !done ) continue;
-
- /* Get the next filename */
- rval = fgets(line, 1023, fh);
- if ( rval == NULL ) break;
- chomp(line);
- snprintf(pargs->filename, 1023, "%s%s", prefix, line);
+ n_done = run_threads(nthreads, add_image, get_image,
+ (void *)&qargs, chunk_size);
- n_images++;
-
- STATUS("Done %i images\n", n_images);
-
- /* Wake the thread up ... */
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->done = 0;
- pargs->start = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- if ( n_images % 1000 == 0 ) {
- if ( intermediate != NULL ) {
- dump_to_file(worker_args, nthreads,
- w, h, n_images,
- intermediate);
- }
- }
+ n_images += n_done;
+ /* Write intermediate sum if requested */
+ if ( (intermediate != NULL) && (n_done == chunk_size) ) {
+ char outfile[1024];
+ snprintf(outfile, 1023, "%s-%i.h5",
+ intermediate, n_images);
+ hdf5_write(outfile, qargs.sum, qargs.w, qargs.h,
+ H5T_NATIVE_DOUBLE);
}
- } while ( rval != NULL );
-
- /* Join threads */
- for ( i=0; i<nthreads; i++ ) {
-
- if ( !worker_active[i] ) goto free;
-
-
- /* Tell the thread to exit */
- struct process_args *pargs = worker_args[i];
- pthread_mutex_lock(&pargs->control_mutex);
- pargs->finish = 1;
- pthread_mutex_unlock(&pargs->control_mutex);
-
- /* Wait for it to join */
- pthread_join(workers[i], NULL);
-
- free:
- if ( worker_args[i]->filename != NULL ) {
- free(worker_args[i]->filename);
- }
-
- }
-
- /* Add the individual sums to the 0th sum */
- for ( i=1; i<nthreads; i++ ) {
-
- int x, y;
-
- for ( x=0; x<w; x++ ) {
- for ( y=0; y<h; y++ ) {
- double val = worker_args[i]->sum[x+w*y];
- worker_args[0]->sum[x+w*y] += val;
- }
- }
- free(worker_args[i]->sum);
- free(worker_args[i]);
-
- }
+ } while ( n_done == chunk_size );
- hdf5_write(outfile, worker_args[0]->sum, w, h, H5T_NATIVE_DOUBLE);
+ /* Write the final output */
+ hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, H5T_NATIVE_DOUBLE);
- free(worker_args[0]->sum);
- free(worker_args[0]);
+ free(qargs.sum);
free(prefix);
free(outfile);
if ( intermediate != NULL ) free(intermediate);