diff options
author | Thomas White <taw@physics.org> | 2011-01-11 17:34:50 +0100 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-02-22 15:27:10 +0100 |
commit | 3dc2ba04ab33be723d2ea8dfe1638a127519a0ae (patch) | |
tree | 200aaefb204543a0b3840c24b38bcbc21a16e18e /src/thread-pool.c | |
parent | 5ed6541407ce99c4036d0e768546fe554f9fa5c2 (diff) |
Prefix STATUS() and ERROR() messages with a unique thread number, where appropriate
Diffstat (limited to 'src/thread-pool.c')
-rw-r--r-- | src/thread-pool.c | 116 |
1 files changed, 86 insertions, 30 deletions
diff --git a/src/thread-pool.c b/src/thread-pool.c index d9f734d8..f44a8f85 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -26,6 +26,31 @@ #include "utils.h" +static int use_status_labels = 0; +static pthread_key_t status_label_key; +pthread_mutex_t stderr_lock = PTHREAD_MUTEX_INITIALIZER; + +struct worker_args +{ + struct task_queue_range *tqr; + struct task_queue *tq; + int id; +}; + + +signed int get_status_label() +{ + int *cookie; + + if ( !use_status_labels ) { + return -1; + } + + cookie = pthread_getspecific(status_label_key); + return *cookie; +} + + /* ---------------------------------- Range --------------------------------- */ enum { @@ -52,7 +77,15 @@ struct task_queue_range static void *range_worker(void *pargsv) { - struct task_queue_range *q = pargsv; + struct worker_args *w = pargsv; + struct task_queue_range *q = w->tqr; + int *cookie; + + cookie = malloc(sizeof(int)); + *cookie = w->id; + pthread_setspecific(status_label_key, cookie); + + free(w); do { @@ -88,6 +121,8 @@ static void *range_worker(void *pargsv) } while ( 1 ); + free(cookie); + return NULL; } @@ -102,6 +137,8 @@ void run_thread_range(int n_tasks, int n_threads, const char *text, /* The nation of CrystFEL prides itself on having 0% unemployment. */ if ( n_threads > n_tasks ) n_threads = n_tasks; + pthread_key_create(&status_label_key, NULL); + workers = malloc(n_threads * sizeof(pthread_t)); q.status = malloc(n_tasks * sizeof(int)); @@ -116,11 +153,23 @@ void run_thread_range(int n_tasks, int n_threads, const char *text, q.status[i] = TASK_READY; } + /* Now it's safe to start using the status labels */ + if ( n_threads > 1 ) use_status_labels = 1; + /* Start threads */ for ( i=0; i<n_threads; i++ ) { - if ( pthread_create(&workers[i], NULL, range_worker, &q) ) { - ERROR("Couldn't start thread %i\n", i); + struct worker_args *w; + + w = malloc(sizeof(struct worker_args)); + + w->tqr = &q; + w->tq = NULL; + w->id = i; + + if ( pthread_create(&workers[i], NULL, range_worker, &w) ) { + /* Not ERROR() here */ + fprintf(stderr, "Couldn't start thread %i\n", i); n_threads = i; break; } @@ -132,6 +181,8 @@ void run_thread_range(int n_tasks, int n_threads, const char *text, pthread_join(workers[i], NULL); } + use_status_labels = 0; + free(q.status); free(workers); } @@ -146,8 +197,6 @@ struct task_queue int n_started; int n_completed; int max; - int n_cookies; - int *cookies; void *(*get_task)(void *); void (*finalise)(void *, void *); @@ -158,14 +207,21 @@ struct task_queue static void *task_worker(void *pargsv) { - struct task_queue *q = pargsv; + struct worker_args *w = pargsv; + struct task_queue *q = w->tq; + int *cookie; + + cookie = malloc(sizeof(int)); + *cookie = w->id; + pthread_setspecific(status_label_key, cookie); + STATUS("Initialised thread %i\n", w->id); + + free(w); do { void *task; - int i; - int mycookie = -1; - int found = 0; + int cookie; /* Get a task */ pthread_mutex_lock(&q->lock); @@ -181,26 +237,15 @@ static void *task_worker(void *pargsv) break; } - /* Find a cookie */ - for ( i=0; i<q->n_cookies; i++ ) { - if ( q->cookies[i] == 0 ) { - mycookie = i; - found = 1; - q->cookies[i] = 1; - break; - } - } - assert(found); - q->n_started++; pthread_mutex_unlock(&q->lock); - q->work(task, mycookie); + cookie = *(int *)pthread_getspecific(status_label_key); + q->work(task, cookie); /* Update totals, release cookie etc */ pthread_mutex_lock(&q->lock); q->n_completed++; - q->cookies[mycookie] = 0; if ( q->finalise ) { q->finalise(q->queue_args, task); } @@ -208,6 +253,8 @@ static void *task_worker(void *pargsv) } while ( 1 ); + free(cookie); + return NULL; } @@ -220,6 +267,8 @@ int run_threads(int n_threads, void (*work)(void *, int), int i; struct task_queue q; + pthread_key_create(&status_label_key, NULL); + workers = malloc(n_threads * sizeof(pthread_t)); pthread_mutex_init(&q.lock, NULL); @@ -230,18 +279,24 @@ int run_threads(int n_threads, void (*work)(void *, int), q.n_started = 0; q.n_completed = 0; q.max = max; - q.n_cookies = n_threads; - q.cookies = malloc(q.n_cookies * sizeof(int)); - for ( i=0; i<n_threads; i++ ) { - q.cookies[i] = 0; - } + /* Now it's safe to start using the status labels */ + if ( n_threads > 1 ) use_status_labels = 1; /* Start threads */ for ( i=0; i<n_threads; i++ ) { - if ( pthread_create(&workers[i], NULL, task_worker, &q) ) { - ERROR("Couldn't start thread %i\n", i); + struct worker_args *w; + + w = malloc(sizeof(struct worker_args)); + + w->tq = &q; + w->tqr = NULL; + w->id = i; + + if ( pthread_create(&workers[i], NULL, task_worker, w) ) { + /* Not ERROR() here */ + fprintf(stderr, "Couldn't start thread %i\n", i); n_threads = i; break; } @@ -253,8 +308,9 @@ int run_threads(int n_threads, void (*work)(void *, int), pthread_join(workers[i], NULL); } + use_status_labels = 0; + free(workers); - free(q.cookies); return q.n_completed; } |