aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2010-07-06 18:31:20 +0200
committerThomas White <taw@physics.org>2012-02-22 15:26:52 +0100
commitae5c7478b4e5e0fe0ef421c3be7cd7f452913739 (patch)
treea9509b257b13270f2b88d2488f1eee07bc2e6ce0
parent68b8e4272dc7cb808323e7b37bd02170403332bd (diff)
calibrate_detector: Rework multithreading not to use pthread_timedjoin_np()
-rw-r--r--src/calibrate_detector.c108
1 files changed, 79 insertions, 29 deletions
diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c
index 70b70c71..68c9782b 100644
--- a/src/calibrate_detector.c
+++ b/src/calibrate_detector.c
@@ -51,6 +51,12 @@ struct process_args
int h;
SumMethod sum_method;
double threshold;
+
+ /* Thread control */
+ pthread_mutex_t control_mutex; /* Protects the scary stuff below */
+ int start;
+ int finish;
+ int done;
};
@@ -142,9 +148,8 @@ static void sum_threshold(struct image *image, double *sum, double threshold)
}
-static void *process_image(void *pargsv)
+static void process_image(struct process_args *pargs)
{
- struct process_args *pargs = pargsv;
struct hdfile *hdfile;
struct image image;
@@ -168,11 +173,11 @@ static void *process_image(void *pargsv)
hdfile = hdfile_open(pargs->filename);
if ( hdfile == NULL ) {
- return NULL;
+ return;
} else if ( hdfile_set_first_image(hdfile, "/") ) {
ERROR("Couldn't select path\n");
hdfile_close(hdfile);
- return NULL;
+ return;
}
hdf5_read(hdfile, &image, 1);
@@ -206,6 +211,38 @@ out:
free(image.data);
if ( image.flags != NULL ) free(image.flags);
hdfile_close(hdfile);
+}
+
+
+static void *worker_thread(void *pargsv)
+{
+ struct process_args *pargs = pargsv;
+ int finish;
+
+ do {
+
+ int wakeup;
+
+ process_image(pargs);
+
+ pthread_mutex_lock(&pargs->control_mutex);
+ pargs->done = 1;
+ pargs->start = 0;
+ pthread_mutex_unlock(&pargs->control_mutex);
+
+ /* Go to sleep until told to exit or process next image */
+ do {
+
+ pthread_mutex_lock(&pargs->control_mutex);
+ /* Either of these can result in the thread waking up */
+ wakeup = pargs->start || pargs->finish;
+ finish = pargs->finish;
+ pthread_mutex_unlock(&pargs->control_mutex);
+ usleep(20000);
+
+ } while ( !wakeup );
+
+ } while ( !pargs->finish );
return NULL;
}
@@ -383,7 +420,7 @@ int main(int argc, char *argv[])
n_images = 0;
- /* Initially, fire off the full number of threads */
+ /* Start threads off */
for ( i=0; i<nthreads; i++ ) {
char line[1024];
@@ -399,11 +436,18 @@ int main(int argc, char *argv[])
n_images++;
+ pthread_mutex_init(&pargs->control_mutex, NULL);
pargs->config_cmfilter = config_cmfilter;
pargs->config_noisefilter = config_noisefilter;
+ pthread_mutex_lock(&pargs->control_mutex);
+ pargs->done = 0;
+ pargs->start = 1;
+ pargs->finish = 0;
+ pthread_mutex_unlock(&pargs->control_mutex);
+
worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, process_image, pargs);
+ r = pthread_create(&workers[i], NULL, worker_thread, pargs);
if ( r != 0 ) {
worker_active[i] = 0;
ERROR("Couldn't start thread %i\n", i);
@@ -411,7 +455,7 @@ int main(int argc, char *argv[])
}
- /* Start new threads as old ones finish */
+ /* Keep threads busy until the end of the data */
do {
int i;
@@ -419,39 +463,38 @@ int main(int argc, char *argv[])
for ( i=0; i<nthreads; i++ ) {
char line[1024];
- int r;
- struct timespec t;
- struct timeval tv;
struct process_args *pargs;
+ int done;
+
+ /* Spend time working, not managing threads */
+ usleep(100000);
+ /* Are we using this thread record at all? */
if ( !worker_active[i] ) continue;
+ /* Has the thread finished yet? */
pargs = worker_args[i];
+ pthread_mutex_lock(&pargs->control_mutex);
+ done = pargs->done;
+ pthread_mutex_unlock(&pargs->control_mutex);
+ if ( !done ) continue;
- gettimeofday(&tv, NULL);
- t.tv_sec = tv.tv_sec;
- t.tv_nsec = tv.tv_usec * 1000 + 20000;
-
- r = pthread_timedjoin_np(workers[i], NULL, &t);
- if ( r != 0 ) continue; /* Not ready yet */
-
- worker_active[i] = 0;
-
+ /* Get the next filename */
rval = fgets(line, 1023, fh);
if ( rval == NULL ) break;
chomp(line);
snprintf(pargs->filename, 1023, "%s%s", prefix, line);
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, process_image, pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
n_images++;
+
STATUS("Done %i images\n", n_images);
+ /* Wake the thread up ... */
+ pthread_mutex_lock(&pargs->control_mutex);
+ pargs->done = 0;
+ pargs->start = 1;
+ pthread_mutex_unlock(&pargs->control_mutex);
+
if ( n_images % 1000 == 0 ) {
if ( intermediate != NULL ) {
dump_to_file(worker_args, nthreads,
@@ -459,18 +502,25 @@ int main(int argc, char *argv[])
intermediate);
}
}
+
}
} while ( rval != NULL );
- /* Catch all remaining threads */
+ /* Join threads */
for ( i=0; i<nthreads; i++ ) {
if ( !worker_active[i] ) goto free;
- pthread_join(workers[i], NULL);
- worker_active[i] = 0;
+ /* Tell the thread to exit */
+ struct process_args *pargs = worker_args[i];
+ pthread_mutex_lock(&pargs->control_mutex);
+ pargs->finish = 1;
+ pthread_mutex_unlock(&pargs->control_mutex);
+
+ /* Wait for it to join */
+ pthread_join(workers[i], NULL);
free:
if ( worker_args[i]->filename != NULL ) {