aboutsummaryrefslogtreecommitdiff
path: root/src/thread-pool.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2011-01-11 17:34:50 +0100
committerThomas White <taw@physics.org>2012-02-22 15:27:10 +0100
commit3dc2ba04ab33be723d2ea8dfe1638a127519a0ae (patch)
tree200aaefb204543a0b3840c24b38bcbc21a16e18e /src/thread-pool.c
parent5ed6541407ce99c4036d0e768546fe554f9fa5c2 (diff)
Prefix STATUS() and ERROR() messages with a unique thread number, where appropriate
Diffstat (limited to 'src/thread-pool.c')
-rw-r--r--src/thread-pool.c116
1 files changed, 86 insertions, 30 deletions
diff --git a/src/thread-pool.c b/src/thread-pool.c
index d9f734d8..f44a8f85 100644
--- a/src/thread-pool.c
+++ b/src/thread-pool.c
@@ -26,6 +26,31 @@
#include "utils.h"
+static int use_status_labels = 0;
+static pthread_key_t status_label_key;
+pthread_mutex_t stderr_lock = PTHREAD_MUTEX_INITIALIZER;
+
+struct worker_args
+{
+ struct task_queue_range *tqr;
+ struct task_queue *tq;
+ int id;
+};
+
+
+signed int get_status_label()
+{
+ int *cookie;
+
+ if ( !use_status_labels ) {
+ return -1;
+ }
+
+ cookie = pthread_getspecific(status_label_key);
+ return *cookie;
+}
+
+
/* ---------------------------------- Range --------------------------------- */
enum {
@@ -52,7 +77,15 @@ struct task_queue_range
static void *range_worker(void *pargsv)
{
- struct task_queue_range *q = pargsv;
+ struct worker_args *w = pargsv;
+ struct task_queue_range *q = w->tqr;
+ int *cookie;
+
+ cookie = malloc(sizeof(int));
+ *cookie = w->id;
+ pthread_setspecific(status_label_key, cookie);
+
+ free(w);
do {
@@ -88,6 +121,8 @@ static void *range_worker(void *pargsv)
} while ( 1 );
+ free(cookie);
+
return NULL;
}
@@ -102,6 +137,8 @@ void run_thread_range(int n_tasks, int n_threads, const char *text,
/* The nation of CrystFEL prides itself on having 0% unemployment. */
if ( n_threads > n_tasks ) n_threads = n_tasks;
+ pthread_key_create(&status_label_key, NULL);
+
workers = malloc(n_threads * sizeof(pthread_t));
q.status = malloc(n_tasks * sizeof(int));
@@ -116,11 +153,23 @@ void run_thread_range(int n_tasks, int n_threads, const char *text,
q.status[i] = TASK_READY;
}
+ /* Now it's safe to start using the status labels */
+ if ( n_threads > 1 ) use_status_labels = 1;
+
/* Start threads */
for ( i=0; i<n_threads; i++ ) {
- if ( pthread_create(&workers[i], NULL, range_worker, &q) ) {
- ERROR("Couldn't start thread %i\n", i);
+ struct worker_args *w;
+
+ w = malloc(sizeof(struct worker_args));
+
+ w->tqr = &q;
+ w->tq = NULL;
+ w->id = i;
+
+ if ( pthread_create(&workers[i], NULL, range_worker, &w) ) {
+ /* Not ERROR() here */
+ fprintf(stderr, "Couldn't start thread %i\n", i);
n_threads = i;
break;
}
@@ -132,6 +181,8 @@ void run_thread_range(int n_tasks, int n_threads, const char *text,
pthread_join(workers[i], NULL);
}
+ use_status_labels = 0;
+
free(q.status);
free(workers);
}
@@ -146,8 +197,6 @@ struct task_queue
int n_started;
int n_completed;
int max;
- int n_cookies;
- int *cookies;
void *(*get_task)(void *);
void (*finalise)(void *, void *);
@@ -158,14 +207,21 @@ struct task_queue
static void *task_worker(void *pargsv)
{
- struct task_queue *q = pargsv;
+ struct worker_args *w = pargsv;
+ struct task_queue *q = w->tq;
+ int *cookie;
+
+ cookie = malloc(sizeof(int));
+ *cookie = w->id;
+ pthread_setspecific(status_label_key, cookie);
+ STATUS("Initialised thread %i\n", w->id);
+
+ free(w);
do {
void *task;
- int i;
- int mycookie = -1;
- int found = 0;
+ int cookie;
/* Get a task */
pthread_mutex_lock(&q->lock);
@@ -181,26 +237,15 @@ static void *task_worker(void *pargsv)
break;
}
- /* Find a cookie */
- for ( i=0; i<q->n_cookies; i++ ) {
- if ( q->cookies[i] == 0 ) {
- mycookie = i;
- found = 1;
- q->cookies[i] = 1;
- break;
- }
- }
- assert(found);
-
q->n_started++;
pthread_mutex_unlock(&q->lock);
- q->work(task, mycookie);
+ cookie = *(int *)pthread_getspecific(status_label_key);
+ q->work(task, cookie);
/* Update totals, release cookie etc */
pthread_mutex_lock(&q->lock);
q->n_completed++;
- q->cookies[mycookie] = 0;
if ( q->finalise ) {
q->finalise(q->queue_args, task);
}
@@ -208,6 +253,8 @@ static void *task_worker(void *pargsv)
} while ( 1 );
+ free(cookie);
+
return NULL;
}
@@ -220,6 +267,8 @@ int run_threads(int n_threads, void (*work)(void *, int),
int i;
struct task_queue q;
+ pthread_key_create(&status_label_key, NULL);
+
workers = malloc(n_threads * sizeof(pthread_t));
pthread_mutex_init(&q.lock, NULL);
@@ -230,18 +279,24 @@ int run_threads(int n_threads, void (*work)(void *, int),
q.n_started = 0;
q.n_completed = 0;
q.max = max;
- q.n_cookies = n_threads;
- q.cookies = malloc(q.n_cookies * sizeof(int));
- for ( i=0; i<n_threads; i++ ) {
- q.cookies[i] = 0;
- }
+ /* Now it's safe to start using the status labels */
+ if ( n_threads > 1 ) use_status_labels = 1;
/* Start threads */
for ( i=0; i<n_threads; i++ ) {
- if ( pthread_create(&workers[i], NULL, task_worker, &q) ) {
- ERROR("Couldn't start thread %i\n", i);
+ struct worker_args *w;
+
+ w = malloc(sizeof(struct worker_args));
+
+ w->tq = &q;
+ w->tqr = NULL;
+ w->id = i;
+
+ if ( pthread_create(&workers[i], NULL, task_worker, w) ) {
+ /* Not ERROR() here */
+ fprintf(stderr, "Couldn't start thread %i\n", i);
n_threads = i;
break;
}
@@ -253,8 +308,9 @@ int run_threads(int n_threads, void (*work)(void *, int),
pthread_join(workers[i], NULL);
}
+ use_status_labels = 0;
+
free(workers);
- free(q.cookies);
return q.n_completed;
}