diff options
Diffstat (limited to 'src/thread-pool.c')
-rw-r--r-- | src/thread-pool.c | 109 |
1 files changed, 102 insertions, 7 deletions
diff --git a/src/thread-pool.c b/src/thread-pool.c index ac508b11..358fb4f3 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -25,6 +25,8 @@ #include "utils.h" +/* ---------------------------------- Range --------------------------------- */ + enum { TASK_READY, TASK_RUNNING, @@ -32,7 +34,7 @@ enum { }; -struct task_queue +struct task_queue_range { pthread_mutex_t lock; @@ -47,9 +49,9 @@ struct task_queue }; -static void *worker_thread(void *pargsv) +static void *range_worker(void *pargsv) { - struct task_queue *q = pargsv; + struct task_queue_range *q = pargsv; do { @@ -89,12 +91,12 @@ static void *worker_thread(void *pargsv) } -void munch_threads(int n_tasks, int n_threads, const char *text, - void (*work)(int, void *), void *work_args) +void run_thread_range(int n_tasks, int n_threads, const char *text, + void (*work)(int, void *), void *work_args) { pthread_t *workers; int i; - struct task_queue q; + struct task_queue_range q; /* The nation of CrystFEL prides itself on having 0% unemployment. */ if ( n_threads > n_tasks ) n_threads = n_tasks; @@ -116,7 +118,7 @@ void munch_threads(int n_tasks, int n_threads, const char *text, /* Start threads */ for ( i=0; i<n_threads; i++ ) { - if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) { + if ( pthread_create(&workers[i], NULL, range_worker, &q) ) { ERROR("Couldn't start thread %i\n", i); n_threads = i; break; @@ -132,3 +134,96 @@ void munch_threads(int n_tasks, int n_threads, const char *text, free(q.status); free(workers); } + + +/* ---------------------------- Custom get_task() --------------------------- */ + +struct task_queue +{ + pthread_mutex_t lock; + + int n_started; + int n_completed; + int max; + + void *(*get_task)(void *); + void *queue_args; + void (*work)(void *); +}; + + +static void *task_worker(void *pargsv) +{ + struct task_queue *q = pargsv; + + do { + + void *task; + + /* Get a task */ + pthread_mutex_lock(&q->lock); + if ( q->n_started >= q->max ) { + pthread_mutex_unlock(&q->lock); + break; + } + task = q->get_task(q->queue_args); + + /* No more tasks? */ + if ( task == NULL ) { + pthread_mutex_unlock(&q->lock); + break; + } + + q->n_started++; + pthread_mutex_unlock(&q->lock); + + q->work(task); + + /* Update totals etc */ + pthread_mutex_lock(&q->lock); + q->n_completed++; + pthread_mutex_unlock(&q->lock); + + } while ( 1 ); + + return NULL; +} + + +int run_threads(int n_threads, void (*work)(void *), + void *(*get_task)(void *), void *queue_args, int max) +{ + pthread_t *workers; + int i; + struct task_queue q; + + workers = malloc(n_threads * sizeof(pthread_t)); + + pthread_mutex_init(&q.lock, NULL); + q.work = work; + q.get_task = get_task; + q.queue_args = queue_args; + q.n_started = 0; + q.n_completed = 0; + q.max = max; + + /* Start threads */ + for ( i=0; i<n_threads; i++ ) { + + if ( pthread_create(&workers[i], NULL, task_worker, &q) ) { + ERROR("Couldn't start thread %i\n", i); + n_threads = i; + break; + } + + } + + /* Join threads */ + for ( i=0; i<n_threads; i++ ) { + pthread_join(workers[i], NULL); + } + + free(workers); + + return q.n_completed; +} |