aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2010-07-05 18:39:07 +0200
committerThomas White <taw@physics.org>2012-02-22 15:26:52 +0100
commit8cb69e84552e0c2f144e145881a78f0db674201f (patch)
treedcda1ef1ff4cf54d4106e40bf97d71b25a9b7ba1
parent6aee185fbdaa5c5bb1216066e53ed343f468b4ae (diff)
indexamajig: Rework threading not to use pthread_timedjoin_np()
-rw-r--r--src/indexamajig.c140
1 files changed, 92 insertions, 48 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 87dfb6dc..548d60dc 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -42,6 +42,7 @@
struct process_args
{
+ /* Input */
char *filename;
int id;
pthread_mutex_t *output_mutex; /* Protects stdout */
@@ -66,8 +67,17 @@ struct process_args
const double *intensities;
const unsigned int *counts;
struct gpu_context *gctx;
+
+ /* Thread control and output */
+ pthread_mutex_t control_mutex; /* Protects the scary stuff below */
+ int start;
+ int finish;
+ int done;
+ int hit;
+ int peaks_sane;
};
+
struct process_result
{
int hit;
@@ -237,13 +247,13 @@ static void simulate_and_write(struct image *simage, struct gpu_context **gctx,
}
-static void *process_image(void *pargsv)
+static struct process_result process_image(struct process_args *pargs)
{
- struct process_args *pargs = pargsv;
struct hdfile *hdfile;
struct image image;
struct image *simage;
float *data_for_measurement;
+ struct process_result result;
size_t data_size;
const char *filename = pargs->filename;
UnitCell *cell = pargs->cell;
@@ -262,7 +272,6 @@ static void *process_image(void *pargsv)
const double *intensities = pargs->intensities;
const unsigned int *counts = pargs->counts;
struct gpu_context *gctx = pargs->gctx;
- struct process_result *result;
image.features = NULL;
image.data = NULL;
@@ -281,10 +290,8 @@ static void *process_image(void *pargsv)
STATUS("Processing '%s'\n", image.filename);
- result = malloc(sizeof(*result));
- if ( result == NULL ) return NULL;
- result->peaks_sane = 0;
- result->hit = 0;
+ result.peaks_sane = 0;
+ result.hit = 0;
hdfile = hdfile_open(filename);
if ( hdfile == NULL ) {
@@ -344,7 +351,7 @@ static void *process_image(void *pargsv)
STATUS("Failed peak sanity check.\n");
goto done;
} else {
- result->peaks_sane = 1;
+ result.peaks_sane = 1;
}
/* Measure intensities if requested */
@@ -385,14 +392,51 @@ done:
hdfile_close(hdfile);
if ( image.indexed_cell == NULL ) {
- result->hit = 0;
+ result.hit = 0;
} else {
- result->hit = 1;
+ result.hit = 1;
}
return result;
}
+static void *worker_thread(void *pargsv)
+{
+ struct process_args *pargs = pargsv;
+ int finish;
+
+ do {
+
+ struct process_result result;
+ int wakeup;
+
+ result = process_image(pargs);
+
+ pthread_mutex_lock(&pargs->control_mutex);
+ pargs->hit = result.hit;
+ pargs->peaks_sane = result.peaks_sane;
+ pargs->done = 1;
+ pargs->start = 0;
+ pthread_mutex_unlock(&pargs->control_mutex);
+
+ /* Go to sleep until told to exit or process next image */
+ do {
+
+ pthread_mutex_lock(&pargs->control_mutex);
+ /* Either of these can result in the thread waking up */
+ wakeup = pargs->start || pargs->finish;
+ finish = pargs->finish;
+ pthread_mutex_unlock(&pargs->control_mutex);
+ usleep(20000);
+
+ } while ( !wakeup );
+
+ } while ( !pargs->finish );
+
+ return NULL;
+}
+
+
int main(int argc, char *argv[])
{
int c;
@@ -590,7 +634,7 @@ int main(int argc, char *argv[])
worker_active[i] = 0;
}
- /* Initially, fire off the full number of threads */
+ /* Start threads off */
for ( i=0; i<nthreads; i++ ) {
char line[1024];
@@ -608,6 +652,7 @@ int main(int argc, char *argv[])
pargs->output_mutex = &output_mutex;
pargs->gpu_mutex = &gpu_mutex;
+ pthread_mutex_init(&pargs->control_mutex, NULL);
pargs->config_cmfilter = config_cmfilter;
pargs->config_noisefilter = config_noisefilter;
pargs->config_writedrx = config_writedrx;
@@ -629,9 +674,14 @@ int main(int argc, char *argv[])
pargs->counts = counts;
pargs->gctx = gctx;
pargs->id = i;
+ pthread_mutex_lock(&pargs->control_mutex);
+ pargs->done = 0;
+ pargs->start = 1;
+ pargs->finish = 0;
+ pthread_mutex_unlock(&pargs->control_mutex);
worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, process_image, pargs);
+ r = pthread_create(&workers[i], NULL, worker_thread, pargs);
if ( r != 0 ) {
worker_active[i] = 0;
ERROR("Couldn't start thread %i\n", i);
@@ -639,7 +689,7 @@ int main(int argc, char *argv[])
}
- /* Start new threads as old ones finish */
+ /* Keep threads busy until the end of the data */
do {
int i;
@@ -647,65 +697,59 @@ int main(int argc, char *argv[])
for ( i=0; i<nthreads; i++ ) {
char line[1024];
- int r;
- struct process_result *result = NULL;
- struct timespec t;
- struct timeval tv;
struct process_args *pargs;
+ int done;
+
+ /* Spend CPU time indexing, not checking results */
+ usleep(100000);
+ /* Are we using this thread record at all? */
if ( !worker_active[i] ) continue;
+ /* Has the thread finished yet? */
pargs = worker_args[i];
+ pthread_mutex_lock(&pargs->control_mutex);
+ done = pargs->done;
+ pthread_mutex_unlock(&pargs->control_mutex);
+ if ( !done ) continue;
- gettimeofday(&tv, NULL);
- t.tv_sec = tv.tv_sec;
- t.tv_nsec = tv.tv_usec * 1000 + 20000;
-
- r = pthread_timedjoin_np(workers[i], (void *)&result,
- &t);
- if ( r != 0 ) continue; /* Not ready yet */
-
- worker_active[i] = 0;
-
- if ( result != NULL ) {
- n_hits += result->hit;
- n_sane += result->peaks_sane;
- free(result);
- }
+ /* Record the result */
+ n_hits += pargs->hit;
+ n_sane += pargs->peaks_sane;
+ /* Get next filename */
rval = fgets(line, 1023, fh);
if ( rval == NULL ) break;
chomp(line);
snprintf(pargs->filename, 1023, "%s%s", prefix, line);
- worker_active[i] = 1;
- r = pthread_create(&workers[i], NULL, process_image,
- pargs);
- if ( r != 0 ) {
- worker_active[i] = 0;
- ERROR("Couldn't start thread %i\n", i);
- }
-
n_images++;
+
+ /* Wake the thread up ... */
+ pthread_mutex_lock(&pargs->control_mutex);
+ pargs->done = 0;
+ pargs->start = 1;
+ pthread_mutex_unlock(&pargs->control_mutex);
+
}
} while ( rval != NULL );
- /* Catch all remaining threads */
+ /* Join threads */
for ( i=0; i<nthreads; i++ ) {
- struct process_result *result = NULL;
-
if ( !worker_active[i] ) goto free;
- pthread_join(workers[i], (void *)&result);
+ /* Tell the thread to exit */
+ struct process_args *pargs = worker_args[i];
+ pargs->finish = 1;
+ /* Wait for it to join */
+ pthread_join(workers[i], NULL);
worker_active[i] = 0;
- if ( result != NULL ) {
- n_hits += result->hit;
- free(result);
- }
+ n_hits += pargs->hit;
+ n_sane += pargs->peaks_sane;
free:
if ( worker_args[i]->filename != NULL ) {