aboutsummaryrefslogtreecommitdiff
path: root/kernel
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2009-11-19 18:10:23 +0000
committerDavid Howells <dhowells@redhat.com>2009-11-19 18:10:23 +0000
commit3d7a641e544e428191667e8b1f83f96fa46dbd65 (patch)
tree172aa672eca96b94f5531885b82abb82b43c7d8a /kernel
parent66b00a7c93ec782d118d2c03bd599cfd041e80a1 (diff)
SLOW_WORK: Wait for outstanding work items belonging to a module to clear
Wait for outstanding slow work items belonging to a module to clear when unregistering that module as a user of the facility. This prevents the put_ref code of a work item from being taken away before it returns. Signed-off-by: David Howells <dhowells@redhat.com>
Diffstat (limited to 'kernel')
-rw-r--r--kernel/slow-work.c132
1 files changed, 126 insertions, 6 deletions
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index 0d31135efbf..dd08f376e40 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -22,6 +22,8 @@
#define SLOW_WORK_OOM_TIMEOUT (5 * HZ) /* can't start new threads for 5s after
* OOM */
+#define SLOW_WORK_THREAD_LIMIT 255 /* abs maximum number of slow-work threads */
+
static void slow_work_cull_timeout(unsigned long);
static void slow_work_oom_timeout(unsigned long);
@@ -46,7 +48,7 @@ static unsigned vslow_work_proportion = 50; /* % of threads that may process
#ifdef CONFIG_SYSCTL
static const int slow_work_min_min_threads = 2;
-static int slow_work_max_max_threads = 255;
+static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
static const int slow_work_min_vslow = 1;
static const int slow_work_max_vslow = 99;
@@ -98,6 +100,23 @@ static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
static struct slow_work slow_work_new_thread; /* new thread starter */
/*
+ * slow work ID allocation (use slow_work_queue_lock)
+ */
+static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
+
+/*
+ * Unregistration tracking to prevent put_ref() from disappearing during module
+ * unload
+ */
+#ifdef CONFIG_MODULES
+static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
+static struct module *slow_work_unreg_module;
+static struct slow_work *slow_work_unreg_work_item;
+static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
+static DEFINE_MUTEX(slow_work_unreg_sync_lock);
+#endif
+
+/*
* The queues of work items and the lock governing access to them. These are
* shared between all the CPUs. It doesn't make sense to have per-CPU queues
* as the number of threads bears no relation to the number of CPUs.
@@ -149,8 +168,11 @@ static unsigned slow_work_calc_vsmax(void)
* Attempt to execute stuff queued on a slow thread. Return true if we managed
* it, false if there was nothing to do.
*/
-static bool slow_work_execute(void)
+static bool slow_work_execute(int id)
{
+#ifdef CONFIG_MODULES
+ struct module *module;
+#endif
struct slow_work *work = NULL;
unsigned vsmax;
bool very_slow;
@@ -186,6 +208,12 @@ static bool slow_work_execute(void)
} else {
very_slow = false; /* avoid the compiler warning */
}
+
+#ifdef CONFIG_MODULES
+ if (work)
+ slow_work_thread_processing[id] = work->owner;
+#endif
+
spin_unlock_irq(&slow_work_queue_lock);
if (!work)
@@ -219,7 +247,18 @@ static bool slow_work_execute(void)
spin_unlock_irq(&slow_work_queue_lock);
}
+ /* sort out the race between module unloading and put_ref() */
work->ops->put_ref(work);
+
+#ifdef CONFIG_MODULES
+ module = slow_work_thread_processing[id];
+ slow_work_thread_processing[id] = NULL;
+ smp_mb();
+ if (slow_work_unreg_work_item == work ||
+ slow_work_unreg_module == module)
+ wake_up_all(&slow_work_unreg_wq);
+#endif
+
return true;
auto_requeue:
@@ -232,6 +271,7 @@ auto_requeue:
else
list_add_tail(&work->link, &slow_work_queue);
spin_unlock_irq(&slow_work_queue_lock);
+ slow_work_thread_processing[id] = NULL;
return true;
}
@@ -368,13 +408,22 @@ static inline bool slow_work_available(int vsmax)
*/
static int slow_work_thread(void *_data)
{
- int vsmax;
+ int vsmax, id;
DEFINE_WAIT(wait);
set_freezable();
set_user_nice(current, -5);
+ /* allocate ourselves an ID */
+ spin_lock_irq(&slow_work_queue_lock);
+ id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
+ BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
+ __set_bit(id, slow_work_ids);
+ spin_unlock_irq(&slow_work_queue_lock);
+
+ sprintf(current->comm, "kslowd%03u", id);
+
for (;;) {
vsmax = vslow_work_proportion;
vsmax *= atomic_read(&slow_work_thread_count);
@@ -395,7 +444,7 @@ static int slow_work_thread(void *_data)
vsmax *= atomic_read(&slow_work_thread_count);
vsmax /= 100;
- if (slow_work_available(vsmax) && slow_work_execute()) {
+ if (slow_work_available(vsmax) && slow_work_execute(id)) {
cond_resched();
if (list_empty(&slow_work_queue) &&
list_empty(&vslow_work_queue) &&
@@ -412,6 +461,10 @@ static int slow_work_thread(void *_data)
break;
}
+ spin_lock_irq(&slow_work_queue_lock);
+ __clear_bit(id, slow_work_ids);
+ spin_unlock_irq(&slow_work_queue_lock);
+
if (atomic_dec_and_test(&slow_work_thread_count))
complete_and_exit(&slow_work_last_thread_exited, 0);
return 0;
@@ -475,6 +528,7 @@ static void slow_work_new_thread_execute(struct slow_work *work)
}
static const struct slow_work_ops slow_work_new_thread_ops = {
+ .owner = THIS_MODULE,
.get_ref = slow_work_new_thread_get_ref,
.put_ref = slow_work_new_thread_put_ref,
.execute = slow_work_new_thread_execute,
@@ -546,12 +600,13 @@ static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
/**
* slow_work_register_user - Register a user of the facility
+ * @module: The module about to make use of the facility
*
* Register a user of the facility, starting up the initial threads if there
* aren't any other users at this point. This will return 0 if successful, or
* an error if not.
*/
-int slow_work_register_user(void)
+int slow_work_register_user(struct module *module)
{
struct task_struct *p;
int loop;
@@ -598,14 +653,79 @@ error:
}
EXPORT_SYMBOL(slow_work_register_user);
+/*
+ * wait for all outstanding items from the calling module to complete
+ * - note that more items may be queued whilst we're waiting
+ */
+static void slow_work_wait_for_items(struct module *module)
+{
+ DECLARE_WAITQUEUE(myself, current);
+ struct slow_work *work;
+ int loop;
+
+ mutex_lock(&slow_work_unreg_sync_lock);
+ add_wait_queue(&slow_work_unreg_wq, &myself);
+
+ for (;;) {
+ spin_lock_irq(&slow_work_queue_lock);
+
+ /* first of all, we wait for the last queued item in each list
+ * to be processed */
+ list_for_each_entry_reverse(work, &vslow_work_queue, link) {
+ if (work->owner == module) {
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ slow_work_unreg_work_item = work;
+ goto do_wait;
+ }
+ }
+ list_for_each_entry_reverse(work, &slow_work_queue, link) {
+ if (work->owner == module) {
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ slow_work_unreg_work_item = work;
+ goto do_wait;
+ }
+ }
+
+ /* then we wait for the items being processed to finish */
+ slow_work_unreg_module = module;
+ smp_mb();
+ for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
+ if (slow_work_thread_processing[loop] == module)
+ goto do_wait;
+ }
+ spin_unlock_irq(&slow_work_queue_lock);
+ break; /* okay, we're done */
+
+ do_wait:
+ spin_unlock_irq(&slow_work_queue_lock);
+ schedule();
+ slow_work_unreg_work_item = NULL;
+ slow_work_unreg_module = NULL;
+ }
+
+ remove_wait_queue(&slow_work_unreg_wq, &myself);
+ mutex_unlock(&slow_work_unreg_sync_lock);
+}
+
/**
* slow_work_unregister_user - Unregister a user of the facility
+ * @module: The module whose items should be cleared
*
* Unregister a user of the facility, killing all the threads if this was the
* last one.
+ *
+ * This waits for all the work items belonging to the nominated module to go
+ * away before proceeding.
*/
-void slow_work_unregister_user(void)
+void slow_work_unregister_user(struct module *module)
{
+ /* first of all, wait for all outstanding items from the calling module
+ * to complete */
+ if (module)
+ slow_work_wait_for_items(module);
+
+ /* then we can actually go about shutting down the facility if need
+ * be */
mutex_lock(&slow_work_user_lock);
BUG_ON(slow_work_user_count <= 0);