From 2205363dce7447b8e85f1ead14387664c1a98753 Mon Sep 17 00:00:00 2001 From: Jan Kara Date: Mon, 20 Oct 2008 23:50:38 +0200 Subject: ocfs2: Implement quota recovery Implement functions for recovery after a crash. Functions just read local quota file and sync info to global quota file. Signed-off-by: Jan Kara Signed-off-by: Mark Fasheh --- fs/ocfs2/journal.c | 108 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 22 deletions(-) (limited to 'fs/ocfs2/journal.c') diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c index 11a1178d5ee..c60242018d9 100644 --- a/fs/ocfs2/journal.c +++ b/fs/ocfs2/journal.c @@ -45,6 +45,7 @@ #include "slot_map.h" #include "super.h" #include "sysfile.h" +#include "quota.h" #include "buffer_head_io.h" @@ -52,7 +53,7 @@ DEFINE_SPINLOCK(trans_inc_lock); static int ocfs2_force_read_journal(struct inode *inode); static int ocfs2_recover_node(struct ocfs2_super *osb, - int node_num); + int node_num, int slot_num); static int __ocfs2_recovery_thread(void *arg); static int ocfs2_commit_cache(struct ocfs2_super *osb); static int ocfs2_wait_on_mount(struct ocfs2_super *osb); @@ -857,6 +858,7 @@ struct ocfs2_la_recovery_item { int lri_slot; struct ocfs2_dinode *lri_la_dinode; struct ocfs2_dinode *lri_tl_dinode; + struct ocfs2_quota_recovery *lri_qrec; }; /* Does the second half of the recovery process. By this point, the @@ -877,6 +879,7 @@ void ocfs2_complete_recovery(struct work_struct *work) struct ocfs2_super *osb = journal->j_osb; struct ocfs2_dinode *la_dinode, *tl_dinode; struct ocfs2_la_recovery_item *item, *n; + struct ocfs2_quota_recovery *qrec; LIST_HEAD(tmp_la_list); mlog_entry_void(); @@ -922,6 +925,16 @@ void ocfs2_complete_recovery(struct work_struct *work) if (ret < 0) mlog_errno(ret); + qrec = item->lri_qrec; + if (qrec) { + mlog(0, "Recovering quota files"); + ret = ocfs2_finish_quota_recovery(osb, qrec, + item->lri_slot); + if (ret < 0) + mlog_errno(ret); + /* Recovery info is already freed now */ + } + kfree(item); } @@ -935,7 +948,8 @@ void ocfs2_complete_recovery(struct work_struct *work) static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, int slot_num, struct ocfs2_dinode *la_dinode, - struct ocfs2_dinode *tl_dinode) + struct ocfs2_dinode *tl_dinode, + struct ocfs2_quota_recovery *qrec) { struct ocfs2_la_recovery_item *item; @@ -950,6 +964,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, if (tl_dinode) kfree(tl_dinode); + if (qrec) + ocfs2_free_quota_recovery(qrec); + mlog_errno(-ENOMEM); return; } @@ -958,6 +975,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, item->lri_la_dinode = la_dinode; item->lri_slot = slot_num; item->lri_tl_dinode = tl_dinode; + item->lri_qrec = qrec; spin_lock(&journal->j_lock); list_add_tail(&item->lri_list, &journal->j_la_cleanups); @@ -977,6 +995,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) ocfs2_queue_recovery_completion(journal, osb->slot_num, osb->local_alloc_copy, + NULL, NULL); ocfs2_schedule_truncate_log_flush(osb, 0); @@ -985,11 +1004,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) } } +void ocfs2_complete_quota_recovery(struct ocfs2_super *osb) +{ + if (osb->quota_rec) { + ocfs2_queue_recovery_completion(osb->journal, + osb->slot_num, + NULL, + NULL, + osb->quota_rec); + osb->quota_rec = NULL; + } +} + static int __ocfs2_recovery_thread(void *arg) { - int status, node_num; + int status, node_num, slot_num; struct ocfs2_super *osb = arg; struct ocfs2_recovery_map *rm = osb->recovery_map; + int *rm_quota = NULL; + int rm_quota_used = 0, i; + struct ocfs2_quota_recovery *qrec; mlog_entry_void(); @@ -998,6 +1032,11 @@ static int __ocfs2_recovery_thread(void *arg) goto bail; } + rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS); + if (!rm_quota) { + status = -ENOMEM; + goto bail; + } restart: status = ocfs2_super_lock(osb, 1); if (status < 0) { @@ -1011,8 +1050,28 @@ restart: * clear it until ocfs2_recover_node() has succeeded. */ node_num = rm->rm_entries[0]; spin_unlock(&osb->osb_lock); - - status = ocfs2_recover_node(osb, node_num); + mlog(0, "checking node %d\n", node_num); + slot_num = ocfs2_node_num_to_slot(osb, node_num); + if (slot_num == -ENOENT) { + status = 0; + mlog(0, "no slot for this node, so no recovery" + "required.\n"); + goto skip_recovery; + } + mlog(0, "node %d was using slot %d\n", node_num, slot_num); + + /* It is a bit subtle with quota recovery. We cannot do it + * immediately because we have to obtain cluster locks from + * quota files and we also don't want to just skip it because + * then quota usage would be out of sync until some node takes + * the slot. So we remember which nodes need quota recovery + * and when everything else is done, we recover quotas. */ + for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++); + if (i == rm_quota_used) + rm_quota[rm_quota_used++] = slot_num; + + status = ocfs2_recover_node(osb, node_num, slot_num); +skip_recovery: if (!status) { ocfs2_recovery_map_clear(osb, node_num); } else { @@ -1034,13 +1093,27 @@ restart: if (status < 0) mlog_errno(status); + /* Now it is right time to recover quotas... We have to do this under + * superblock lock so that noone can start using the slot (and crash) + * before we recover it */ + for (i = 0; i < rm_quota_used; i++) { + qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]); + if (IS_ERR(qrec)) { + status = PTR_ERR(qrec); + mlog_errno(status); + continue; + } + ocfs2_queue_recovery_completion(osb->journal, rm_quota[i], + NULL, NULL, qrec); + } + ocfs2_super_unlock(osb, 1); /* We always run recovery on our own orphan dir - the dead * node(s) may have disallowd a previos inode delete. Re-processing * is therefore required. */ ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, - NULL); + NULL, NULL); bail: mutex_lock(&osb->recovery_lock); @@ -1055,6 +1128,9 @@ bail: mutex_unlock(&osb->recovery_lock); + if (rm_quota) + kfree(rm_quota); + mlog_exit(status); /* no one is callint kthread_stop() for us so the kthread() api * requires that we call do_exit(). And it isn't exported, but @@ -1282,31 +1358,19 @@ done: * far less concerning. */ static int ocfs2_recover_node(struct ocfs2_super *osb, - int node_num) + int node_num, int slot_num) { int status = 0; - int slot_num; struct ocfs2_dinode *la_copy = NULL; struct ocfs2_dinode *tl_copy = NULL; - mlog_entry("(node_num=%d, osb->node_num = %d)\n", - node_num, osb->node_num); - - mlog(0, "checking node %d\n", node_num); + mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n", + node_num, slot_num, osb->node_num); /* Should not ever be called to recover ourselves -- in that * case we should've called ocfs2_journal_load instead. */ BUG_ON(osb->node_num == node_num); - slot_num = ocfs2_node_num_to_slot(osb, node_num); - if (slot_num == -ENOENT) { - status = 0; - mlog(0, "no slot for this node, so no recovery required.\n"); - goto done; - } - - mlog(0, "node %d was using slot %d\n", node_num, slot_num); - status = ocfs2_replay_journal(osb, node_num, slot_num); if (status < 0) { if (status == -EBUSY) { @@ -1342,7 +1406,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb, /* This will kfree the memory pointed to by la_copy and tl_copy */ ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy, - tl_copy); + tl_copy, NULL); status = 0; done: -- cgit v1.2.3