aboutsummaryrefslogtreecommitdiff
path: root/src/thread-pool.c
blob: b27c0441670703705fba4f2c5420e67b00a821c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
 * thread-pool.c
 *
 * A thread pool implementation
 *
 * (c) 2006-2010 Thomas White <taw@physics.org>
 *
 * Part of CrystFEL - crystallography with a FEL
 *
 */


#ifdef HAVE_CONFIG_H
#include <config.h>
#endif


#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

#include "utils.h"


/* States of one entry in task_queue.status[] */
enum {
	TASK_READY    = 0,  /* Not yet claimed by any worker thread */
	TASK_RUNNING  = 1,  /* Claimed by a worker; work callback in progress */
	TASK_FINISHED = 2   /* Work callback has returned */
};


/*
 * State shared between munch_threads() and the worker threads it starts.
 * One instance describes a single batch of tasks.
 */
struct task_queue
{
	/* Held by workers whenever they read or modify status[] and n_done */
	pthread_mutex_t  lock;

	/* Number of tasks in the batch, and the length of status[] */
	int              n_tasks;
	/* Per-task state: TASK_READY, TASK_RUNNING or TASK_FINISHED */
	int             *status;
	/* Count of tasks whose work callback has returned */
	int              n_done;

	/* Callback run for each task as work(task index, work_args) */
	void (*work)(int, void *);
	/* Opaque pointer passed unchanged to every work() call */
	void *work_args;

	/* Passed to progress_bar() each time a task finishes */
	const char      *text;
};


/*
 * Thread entry point.  "pargsv" points to the shared struct task_queue.
 *
 * Repeatedly claims the lowest-numbered task still marked TASK_READY, runs
 * the work callback on it and records its completion.  Returns NULL once no
 * ready task is left.
 */
static void *worker_thread(void *pargsv)
{
	struct task_queue *queue = pargsv;

	for ( ;; ) {

		int idx;
		int claimed = -1;

		/* Claim the first task that nobody has picked up yet */
		pthread_mutex_lock(&queue->lock);
		idx = 0;
		while ( idx < queue->n_tasks ) {
			if ( queue->status[idx] == TASK_READY ) {
				queue->status[idx] = TASK_RUNNING;
				claimed = idx;
				break;
			}
			idx++;
		}
		pthread_mutex_unlock(&queue->lock);

		/* Nothing left to claim, so this thread is finished */
		if ( claimed < 0 ) break;

		/* The callback runs without the lock held */
		queue->work(claimed, queue->work_args);

		/* Record completion and refresh the progress display */
		pthread_mutex_lock(&queue->lock);
		queue->status[claimed] = TASK_FINISHED;
		queue->n_done += 1;
		progress_bar(queue->n_done, queue->n_tasks, queue->text);
		pthread_mutex_unlock(&queue->lock);

	}

	return NULL;
}


void munch_threads(int n_tasks, int n_threads, const char *text,
                   void (*work)(int, void *), void *work_args)
{
	pthread_t *workers;
	int i;
	struct task_queue q;

	/* The nation of CrystFEL prides itself on having 0% unemployment. */
	if ( n_threads > n_tasks ) n_threads = n_tasks;

	workers = malloc(n_threads * sizeof(pthread_t));

	q.status = malloc(n_tasks * sizeof(int));
	pthread_mutex_init(&q.lock, NULL);
	q.n_tasks = n_tasks;
	q.work = work;
	q.work_args = work_args;
	q.n_done = 0;
	q.text = text;

	for ( i=0; i<n_tasks; i++ ) {
		q.status[i] = TASK_READY;
	}

	/* Start threads */
	for ( i=0; i<n_threads; i++ ) {

		if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) {
			ERROR("Couldn't start thread %i\n", i);
			n_threads = i;
			break;
		}

	}

	/* Join threads */
	for ( i=0; i<n_threads; i++ ) {
		pthread_join(workers[i], NULL);
	}

	free(q.status);
	free(workers);
}