From 3dc2ba04ab33be723d2ea8dfe1638a127519a0ae Mon Sep 17 00:00:00 2001 From: Thomas White Date: Tue, 11 Jan 2011 17:34:50 +0100 Subject: Prefix STATUS() and ERROR() messages with a unique thread number, where appropriate --- src/thread-pool.c | 116 ++++++++++++++++++++++++++++++++++++++++-------------- src/thread-pool.h | 3 ++ src/utils.h | 27 ++++++++++++- 3 files changed, 114 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/src/thread-pool.c b/src/thread-pool.c index d9f734d8..f44a8f85 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -26,6 +26,31 @@ #include "utils.h" +static int use_status_labels = 0; +static pthread_key_t status_label_key; +pthread_mutex_t stderr_lock = PTHREAD_MUTEX_INITIALIZER; + +struct worker_args +{ + struct task_queue_range *tqr; + struct task_queue *tq; + int id; +}; + + +signed int get_status_label() +{ + int *cookie; + + if ( !use_status_labels ) { + return -1; + } + + cookie = pthread_getspecific(status_label_key); + return *cookie; +} + + /* ---------------------------------- Range --------------------------------- */ enum { @@ -52,7 +77,15 @@ struct task_queue_range static void *range_worker(void *pargsv) { - struct task_queue_range *q = pargsv; + struct worker_args *w = pargsv; + struct task_queue_range *q = w->tqr; + int *cookie; + + cookie = malloc(sizeof(int)); + *cookie = w->id; + pthread_setspecific(status_label_key, cookie); + + free(w); do { @@ -88,6 +121,8 @@ static void *range_worker(void *pargsv) } while ( 1 ); + free(cookie); + return NULL; } @@ -102,6 +137,8 @@ void run_thread_range(int n_tasks, int n_threads, const char *text, /* The nation of CrystFEL prides itself on having 0% unemployment. */ if ( n_threads > n_tasks ) n_threads = n_tasks; + pthread_key_create(&status_label_key, NULL); + workers = malloc(n_threads * sizeof(pthread_t)); q.status = malloc(n_tasks * sizeof(int)); @@ -116,11 +153,23 @@ void run_thread_range(int n_tasks, int n_threads, const char *text, q.status[i] = TASK_READY; } + /* Now it's safe to start using the status labels */ + if ( n_threads > 1 ) use_status_labels = 1; + /* Start threads */ for ( i=0; itqr = &q; + w->tq = NULL; + w->id = i; + + if ( pthread_create(&workers[i], NULL, range_worker, &w) ) { + /* Not ERROR() here */ + fprintf(stderr, "Couldn't start thread %i\n", i); n_threads = i; break; } @@ -132,6 +181,8 @@ void run_thread_range(int n_tasks, int n_threads, const char *text, pthread_join(workers[i], NULL); } + use_status_labels = 0; + free(q.status); free(workers); } @@ -146,8 +197,6 @@ struct task_queue int n_started; int n_completed; int max; - int n_cookies; - int *cookies; void *(*get_task)(void *); void (*finalise)(void *, void *); @@ -158,14 +207,21 @@ struct task_queue static void *task_worker(void *pargsv) { - struct task_queue *q = pargsv; + struct worker_args *w = pargsv; + struct task_queue *q = w->tq; + int *cookie; + + cookie = malloc(sizeof(int)); + *cookie = w->id; + pthread_setspecific(status_label_key, cookie); + STATUS("Initialised thread %i\n", w->id); + + free(w); do { void *task; - int i; - int mycookie = -1; - int found = 0; + int cookie; /* Get a task */ pthread_mutex_lock(&q->lock); @@ -181,26 +237,15 @@ static void *task_worker(void *pargsv) break; } - /* Find a cookie */ - for ( i=0; in_cookies; i++ ) { - if ( q->cookies[i] == 0 ) { - mycookie = i; - found = 1; - q->cookies[i] = 1; - break; - } - } - assert(found); - q->n_started++; pthread_mutex_unlock(&q->lock); - q->work(task, mycookie); + cookie = *(int *)pthread_getspecific(status_label_key); + q->work(task, cookie); /* Update totals, release cookie etc */ pthread_mutex_lock(&q->lock); q->n_completed++; - q->cookies[mycookie] = 0; if ( q->finalise ) { q->finalise(q->queue_args, task); } @@ -208,6 +253,8 @@ static void *task_worker(void *pargsv) } while ( 1 ); + free(cookie); + return NULL; } @@ -220,6 +267,8 @@ int run_threads(int n_threads, void (*work)(void *, int), int i; struct task_queue q; + pthread_key_create(&status_label_key, NULL); + workers = malloc(n_threads * sizeof(pthread_t)); pthread_mutex_init(&q.lock, NULL); @@ -230,18 +279,24 @@ int run_threads(int n_threads, void (*work)(void *, int), q.n_started = 0; q.n_completed = 0; q.max = max; - q.n_cookies = n_threads; - q.cookies = malloc(q.n_cookies * sizeof(int)); - for ( i=0; i 1 ) use_status_labels = 1; /* Start threads */ for ( i=0; itq = &q; + w->tqr = NULL; + w->id = i; + + if ( pthread_create(&workers[i], NULL, task_worker, w) ) { + /* Not ERROR() here */ + fprintf(stderr, "Couldn't start thread %i\n", i); n_threads = i; break; } @@ -253,8 +308,9 @@ int run_threads(int n_threads, void (*work)(void *, int), pthread_join(workers[i], NULL); } + use_status_labels = 0; + free(workers); - free(q.cookies); return q.n_completed; } diff --git a/src/thread-pool.h b/src/thread-pool.h index 05e2f55a..23c7c632 100644 --- a/src/thread-pool.h +++ b/src/thread-pool.h @@ -18,6 +18,9 @@ #endif +extern signed int get_status_label(void); + + /* work() will be called with a number and work_args. The number will be * unique and in the range 0..n_tasks. A progress bar will be shown using * "text" and the progress through the tasks, unless "text" is NULL. */ diff --git a/src/utils.h b/src/utils.h index 29e2aeda..30ab27c8 100644 --- a/src/utils.h +++ b/src/utils.h @@ -20,6 +20,10 @@ #include #include #include +#include + + +#include "thread-pool.h" /* -------------------------- Fundamental constants ------------------------ */ @@ -216,8 +220,27 @@ extern ReflItemList *intersection_items(ReflItemList *i1, ReflItemList *i2); /* ------------------------------ Message macros ---------------------------- */ -#define ERROR(...) fprintf(stderr, __VA_ARGS__) -#define STATUS(...) fprintf(stderr, __VA_ARGS__) +extern pthread_mutex_t stderr_lock; + +#define ERROR(...) { \ + int val = get_status_label(); \ + pthread_mutex_lock(&stderr_lock); \ + if ( val >= 0 ) { \ + fprintf(stderr, "%3i: ", val); \ + } \ + fprintf(stderr, __VA_ARGS__); \ + pthread_mutex_unlock(&stderr_lock); \ + } + +#define STATUS(...) { \ + int val = get_status_label(); \ + pthread_mutex_lock(&stderr_lock); \ + if ( val >= 0 ) { \ + fprintf(stderr, "%3i: ", val); \ + } \ + fprintf(stderr, __VA_ARGS__); \ + pthread_mutex_unlock(&stderr_lock); \ + } /* ------------------------------ File handling ----------------------------- */ -- cgit v1.2.3