aboutsummaryrefslogtreecommitdiff
path: root/src/thread-pool.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/thread-pool.c')
-rw-r--r--src/thread-pool.c109
1 files changed, 102 insertions, 7 deletions
diff --git a/src/thread-pool.c b/src/thread-pool.c
index ac508b11..358fb4f3 100644
--- a/src/thread-pool.c
+++ b/src/thread-pool.c
@@ -25,6 +25,8 @@
#include "utils.h"
+/* ---------------------------------- Range --------------------------------- */
+
enum {
TASK_READY,
TASK_RUNNING,
@@ -32,7 +34,7 @@ enum {
};
-struct task_queue
+struct task_queue_range
{
pthread_mutex_t lock;
@@ -47,9 +49,9 @@ struct task_queue
};
-static void *worker_thread(void *pargsv)
+static void *range_worker(void *pargsv)
{
- struct task_queue *q = pargsv;
+ struct task_queue_range *q = pargsv;
do {
@@ -89,12 +91,12 @@ static void *worker_thread(void *pargsv)
}
-void munch_threads(int n_tasks, int n_threads, const char *text,
- void (*work)(int, void *), void *work_args)
+void run_thread_range(int n_tasks, int n_threads, const char *text,
+ void (*work)(int, void *), void *work_args)
{
pthread_t *workers;
int i;
- struct task_queue q;
+ struct task_queue_range q;
/* The nation of CrystFEL prides itself on having 0% unemployment. */
if ( n_threads > n_tasks ) n_threads = n_tasks;
@@ -116,7 +118,7 @@ void munch_threads(int n_tasks, int n_threads, const char *text,
/* Start threads */
for ( i=0; i<n_threads; i++ ) {
- if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) {
+ if ( pthread_create(&workers[i], NULL, range_worker, &q) ) {
ERROR("Couldn't start thread %i\n", i);
n_threads = i;
break;
@@ -132,3 +134,96 @@ void munch_threads(int n_tasks, int n_threads, const char *text,
free(q.status);
free(workers);
}
+
+
+/* ---------------------------- Custom get_task() --------------------------- */
+
+struct task_queue
+{
+ pthread_mutex_t lock;
+
+ int n_started;
+ int n_completed;
+ int max;
+
+ void *(*get_task)(void *);
+ void *queue_args;
+ void (*work)(void *);
+};
+
+
+static void *task_worker(void *pargsv)
+{
+ struct task_queue *q = pargsv;
+
+ do {
+
+ void *task;
+
+ /* Get a task */
+ pthread_mutex_lock(&q->lock);
+ if ( q->n_started >= q->max ) {
+ pthread_mutex_unlock(&q->lock);
+ break;
+ }
+ task = q->get_task(q->queue_args);
+
+ /* No more tasks? */
+ if ( task == NULL ) {
+ pthread_mutex_unlock(&q->lock);
+ break;
+ }
+
+ q->n_started++;
+ pthread_mutex_unlock(&q->lock);
+
+ q->work(task);
+
+ /* Update totals etc */
+ pthread_mutex_lock(&q->lock);
+ q->n_completed++;
+ pthread_mutex_unlock(&q->lock);
+
+ } while ( 1 );
+
+ return NULL;
+}
+
+
+int run_threads(int n_threads, void (*work)(void *),
+ void *(*get_task)(void *), void *queue_args, int max)
+{
+ pthread_t *workers;
+ int i;
+ struct task_queue q;
+
+ workers = malloc(n_threads * sizeof(pthread_t));
+
+ pthread_mutex_init(&q.lock, NULL);
+ q.work = work;
+ q.get_task = get_task;
+ q.queue_args = queue_args;
+ q.n_started = 0;
+ q.n_completed = 0;
+ q.max = max;
+
+ /* Start threads */
+ for ( i=0; i<n_threads; i++ ) {
+
+ if ( pthread_create(&workers[i], NULL, task_worker, &q) ) {
+ ERROR("Couldn't start thread %i\n", i);
+ n_threads = i;
+ break;
+ }
+
+ }
+
+ /* Join threads */
+ for ( i=0; i<n_threads; i++ ) {
+ pthread_join(workers[i], NULL);
+ }
+
+ free(workers);
+
+ return q.n_completed;
+}