/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>
#include <linux/sched/signal.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"
#include "acl.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
        struct list_head        mw_item;
        int                     mw_status;
        struct completion       mw_complete;
        unsigned long           mw_mask;
        unsigned long           mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
        ktime_t                 mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
        UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
        UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
                                      * ->post_unlock callback */
        UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
                                      * ->post_unlock() callback. */
};
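
/*
 * As an illustration only (this worker is hypothetical, not one of the
 * lock types below): a downconvert worker with no special teardown
 * work would simply let the downconvert proceed:
 *
 *      static int example_convert_worker(struct ocfs2_lock_res *lockres,
 *                                        int blocking)
 *      {
 *              return UNBLOCK_CONTINUE;
 *      }
 *
 * Workers that must run a ->post_unlock() callback afterwards (e.g. to
 * free the object backing the lock) return UNBLOCK_CONTINUE_POST or
 * UNBLOCK_STOP_POST instead.
 */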

struct ocfs2_unblock_ctl {
        int requeue;
        enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
#endif

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
                                        int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                                     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
                                       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
                                            int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
                                         int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
                                     const char *function,
                                     unsigned int line,
                                     struct ocfs2_lock_res *lockres)
{
        struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

        mlog(level, "LVB information for %s (called from %s:%u):\n",
             lockres->l_name, function, line);
        mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
             lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
             be32_to_cpu(lvb->lvb_igeneration));
        mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
             (unsigned long long)be64_to_cpu(lvb->lvb_isize),
             be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
             be16_to_cpu(lvb->lvb_imode));
        mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
             "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
             (long long)be64_to_cpu(lvb->lvb_iatime_packed),
             (long long)be64_to_cpu(lvb->lvb_ictime_packed),
             (long long)be64_to_cpu(lvb->lvb_imtime_packed),
             be32_to_cpu(lvb->lvb_iattr));
}

/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
        /*
         * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
         * this callback if ->l_priv is not an ocfs2_super pointer
         */
        struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

        /*
         * Optionally called in the downconvert thread after a
         * successful downconvert. The lockres will not be referenced
         * after this callback is called, so it is safe to free
         * memory, etc.
         *
         * The exact semantics of when this is called are controlled
         * by ->downconvert_worker()
         */
        void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

        /*
         * Allow a lock type to add checks to determine whether it is
         * safe to downconvert a lock. Return 0 to re-queue the
         * downconvert at a later time, nonzero to continue.
         *
         * For most locks, the default checks that there are no
         * incompatible holders are sufficient.
         *
         * Called with the lockres spinlock held.
         */
        int (*check_downconvert)(struct ocfs2_lock_res *, int);

        /*
         * Allows a lock type to populate the lock value block. This
         * is called on downconvert, and when we drop a lock.
         *
         * Locks that want to use this should set LOCK_TYPE_USES_LVB
         * in the flags field.
         *
         * Called with the lockres spinlock held.
         */
        void (*set_lvb)(struct ocfs2_lock_res *);

        /*
         * Called from the downconvert thread when it is determined
         * that a lock will be downconverted. This is called without
         * any locks held so the function can do work that might
         * schedule (syncing out data, etc).
         *
         * This should return any one of the ocfs2_unblock_action
         * values, depending on what it wants the thread to do.
         */
        int (*downconvert_worker)(struct ocfs2_lock_res *, int);

        /*
         * LOCK_TYPE_* flags which describe the specific requirements
         * of a lock type. Descriptions of each individual flag follow.
         */
        int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1
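
/*
 * Illustrative sketch of the refresh contract described above (the
 * real helpers implementing it live further down in this file):
 *
 *      ocfs2_cluster_lock(...);    // ast may set OCFS2_LOCK_NEEDS_REFRESH
 *      if (lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH) {
 *              // ... re-read the object from disk and/or the LVB ...
 *              // ... then clear OCFS2_LOCK_NEEDS_REFRESH ...
 *      }
 */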

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB              0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .check_downconvert = ocfs2_check_meta_downconvert,
        .set_lvb        = ocfs2_set_meta_lvb,
        .downconvert_worker = ocfs2_data_convert_worker,
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
        .flags          = LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
        .get_osb        = ocfs2_get_dentry_osb,
        .post_unlock    = ocfs2_dentry_post_unlock,
        .downconvert_worker = ocfs2_dentry_convert_worker,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
        .get_osb        = ocfs2_get_file_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
        .set_lvb        = ocfs2_set_qinfo_lvb,
        .get_osb        = ocfs2_get_qinfo_osb,
        .flags          = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
        .check_downconvert = ocfs2_check_refcount_downconvert,
        .downconvert_worker = ocfs2_refcount_convert_worker,
        .flags          = 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
                lockres->l_type == OCFS2_LOCK_TYPE_RW ||
                lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
        return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
        BUG_ON(!ocfs2_is_inode_lock(lockres));

        return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
        BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

        return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
        BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

        return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
        return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
        if (lockres->l_ops->get_osb)
                return lockres->l_ops->get_osb(lockres);

        return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                   struct ocfs2_lock_res *lockres,
                                   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres,
                                        int level)
{
        __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {                                 \
        if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)                               \
                mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",        \
                     _err, _func, _lockres->l_name);                                    \
        else                                                                            \
                mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",  \
                     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,  \
                     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));                \
} while (0)
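
/*
 * Dentry lock names embed a raw big-endian block number starting at
 * byte OCFS2_DENTRY_LOCK_INO_START (see ocfs2_dentry_lock_res_init()
 * below), so the macro above prints only the string prefix and decodes
 * the inode number separately instead of printing raw bytes.
 */
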
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
                                  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
                                              int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                                  struct ocfs2_lock_res *lockres,
                                  int new_level,
                                  int lvb,
                                  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
                                struct ocfs2_lock_res *lockres);

static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                                  u64 blkno,
                                  u32 generation,
                                  char *name)
{
        int len;

        BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

        len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
                       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
                       (long long)blkno, generation);

        BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

        mlog(0, "built lock resource with name: %s\n", name);
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
                                       struct ocfs2_dlm_debug *dlm_debug)
{
        mlog(0, "Add tracking for lockres %s\n", res->l_name);

        spin_lock(&ocfs2_dlm_tracking_lock);
        list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
        spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
        spin_lock(&ocfs2_dlm_tracking_lock);
        if (!list_empty(&res->l_debug_list))
                list_del_init(&res->l_debug_list);
        spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
        res->l_lock_refresh = 0;
        memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
        memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
                                    struct ocfs2_mask_waiter *mw, int ret)
{
        u32 usec;
        ktime_t kt;
        struct ocfs2_lock_stats *stats;

        if (level == LKM_PRMODE)
                stats = &res->l_lock_prmode;
        else if (level == LKM_EXMODE)
                stats = &res->l_lock_exmode;
        else
                return;

        kt = ktime_sub(ktime_get(), mw->mw_lock_start);
        usec = ktime_to_us(kt);

        stats->ls_gets++;
        stats->ls_total += ktime_to_ns(kt);
        /* overflow */
        if (unlikely(stats->ls_gets == 0)) {
                stats->ls_gets++;
                stats->ls_total = ktime_to_ns(kt);
        }

        if (stats->ls_max < usec)
                stats->ls_max = usec;

        if (ret)
                stats->ls_fail++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
        lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
        mw->mw_lock_start = ktime_get();
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
                           int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *res,
                                       enum ocfs2_lock_type type,
                                       struct ocfs2_lock_res_ops *ops,
                                       void *priv)
{
        res->l_type          = type;
        res->l_ops           = ops;
        res->l_priv          = priv;

        res->l_level         = DLM_LOCK_IV;
        res->l_requested     = DLM_LOCK_IV;
        res->l_blocking      = DLM_LOCK_IV;
        res->l_action        = OCFS2_AST_INVALID;
        res->l_unlock_action = OCFS2_UNLOCK_INVALID;

        res->l_flags         = OCFS2_LOCK_INITIALIZED;

        ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

        ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        if (type != OCFS2_LOCK_TYPE_OPEN)
                lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
                                 &lockdep_keys[type], 0);
        else
                res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
        /* This also clears out the lock status block */
        memset(res, 0, sizeof(struct ocfs2_lock_res));
        spin_lock_init(&res->l_lock);
        init_waitqueue_head(&res->l_event);
        INIT_LIST_HEAD(&res->l_blocked_list);
        INIT_LIST_HEAD(&res->l_mask_waiters);
        INIT_LIST_HEAD(&res->l_holders);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                               enum ocfs2_lock_type type,
                               unsigned int generation,
                               struct inode *inode)
{
        struct ocfs2_lock_res_ops *ops;

        switch(type) {
                case OCFS2_LOCK_TYPE_RW:
                        ops = &ocfs2_inode_rw_lops;
                        break;
                case OCFS2_LOCK_TYPE_META:
                        ops = &ocfs2_inode_inode_lops;
                        break;
                case OCFS2_LOCK_TYPE_OPEN:
                        ops = &ocfs2_inode_open_lops;
                        break;
                default:
                        mlog_bug_on_msg(1, "type: %d\n", type);
                        ops = NULL; /* thanks, gcc */
                        break;
        }

        ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
                              generation, res->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
        struct inode *inode = ocfs2_lock_res_inode(lockres);

        return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_mem_dqinfo *info = lockres->l_priv;

        return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_file_private *fp = lockres->l_priv;

        return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
        __be64 inode_blkno_be;

        memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
               sizeof(__be64));

        return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_dentry_lock *dl = lockres->l_priv;

        return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
                                u64 parent, struct inode *inode)
{
        int len;
        u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
        __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
        struct ocfs2_lock_res *lockres = &dl->dl_lockres;

        ocfs2_lock_res_init_once(lockres);

        /*
         * Unfortunately, the standard lock naming scheme won't work
         * here because we have two 16 byte values to use. Instead,
         * we'll stuff the inode number as a binary value. We still
         * want error prints to show something without garbling the
         * display, so drop a null byte in there before the inode
         * number. A future version of OCFS2 will likely use all
         * binary lock names. The stringified names have been a
         * tremendous aid in debugging, but now that the debugfs
         * interface exists, we can mangle things there if need be.
         *
         * NOTE: We also drop the standard "pad" value (the total lock
         * name size stays the same though - the last part is all
         * zeros due to the memset in ocfs2_lock_res_init_once()).
         */
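        /*
         * Pictorially, the name built below is:
         *
         *      [type char][16 hex chars of parent blkno][\0]
         *          [8 bytes of big-endian inode blkno, raw binary]
         *
         * with the binary block number starting at byte offset
         * OCFS2_DENTRY_LOCK_INO_START.
         */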
        len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
                       "%c%016llx",
                       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
                       (long long)parent);

        BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

        memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
               sizeof(__be64));

        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                                   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
                                   dl);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
                                      struct ocfs2_super *osb)
{
        /* Superblock lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
                              0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
                                   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
                                       struct ocfs2_super *osb)
{
        /* Rename lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
                                   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
                                         struct ocfs2_super *osb)
{
        /* nfs_sync lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
                                   &ocfs2_nfs_sync_lops, osb);
}

void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
{
        struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

        /* Only one trimfs thread is allowed to work at a time. */
        mutex_lock(&osb->obs_trim_fs_mutex);

        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
        ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
                                   &ocfs2_trim_fs_lops, osb);
}

void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
{
        struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

        ocfs2_simple_drop_lockres(osb, lockres);
        ocfs2_lock_res_free(lockres);

        mutex_unlock(&osb->obs_trim_fs_mutex);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
                                            struct ocfs2_super *osb)
{
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
                                   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
                              struct ocfs2_file_private *fp)
{
        struct inode *inode = fp->fp_file->f_mapping->host;
        struct ocfs2_inode_info *oi = OCFS2_I(inode);

        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
                              inode->i_generation, lockres->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                                   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
                                   fp);
        lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
                               struct ocfs2_mem_dqinfo *info)
{
        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
                              0, lockres->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
                                   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
                                   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
                                  struct ocfs2_super *osb, u64 ref_blkno,
                                  unsigned int generation)
{
        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
                              generation, lockres->l_name);
        ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
                                   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
        if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
                return;

        ocfs2_remove_lockres_tracking(res);

        mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
                        "Lockres %s is on the blocked list\n",
                        res->l_name);
        mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
                        "Lockres %s has mask waiters pending\n",
                        res->l_name);
        mlog_bug_on_msg(spin_is_locked(&res->l_lock),
                        "Lockres %s is locked\n",
                        res->l_name);
        mlog_bug_on_msg(res->l_ro_holders,
                        "Lockres %s has %u ro holders\n",
                        res->l_name, res->l_ro_holders);
        mlog_bug_on_msg(res->l_ex_holders,
                        "Lockres %s has %u ex holders\n",
                        res->l_name, res->l_ex_holders);

        /* Need to clear out the lock status block for the dlm */
        memset(&res->l_lksb, 0, sizeof(res->l_lksb));

        res->l_flags = 0UL;
}

/*
 * Keep a list of processes with an interest in a lockres.
 * Note: this is now only used to check for recursive cluster locking.
 */
static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
                                   struct ocfs2_lock_holder *oh)
{
        INIT_LIST_HEAD(&oh->oh_list);
        oh->oh_owner_pid = get_pid(task_pid(current));

        spin_lock(&lockres->l_lock);
        list_add_tail(&oh->oh_list, &lockres->l_holders);
        spin_unlock(&lockres->l_lock);
}

static struct ocfs2_lock_holder *
ocfs2_pid_holder(struct ocfs2_lock_res *lockres,
                struct pid *pid)
{
        struct ocfs2_lock_holder *oh;

        spin_lock(&lockres->l_lock);
        list_for_each_entry(oh, &lockres->l_holders, oh_list) {
                if (oh->oh_owner_pid == pid) {
                        spin_unlock(&lockres->l_lock);
                        return oh;
                }
        }
        spin_unlock(&lockres->l_lock);
        return NULL;
}

static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
                                       struct ocfs2_lock_holder *oh)
{
        spin_lock(&lockres->l_lock);
        list_del(&oh->oh_list);
        spin_unlock(&lockres->l_lock);

        put_pid(oh->oh_owner_pid);
}
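
/*
 * Illustrative sketch only (the real callers live elsewhere in this
 * file; "my_oh" is a hypothetical stack variable): a locking path that
 * wants to detect recursive cluster locking in the same task would do
 * roughly:
 *
 *      oh = ocfs2_pid_holder(lockres, task_pid(current));
 *      if (!oh) {
 *              // first acquisition in this task: take the cluster
 *              // lock, then record ourselves
 *              ocfs2_add_holder(lockres, &my_oh);
 *      }
 *      // else: already held by this task, skip the cluster lock
 */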

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
                                     int level)
{
        BUG_ON(!lockres);

        switch(level) {
        case DLM_LOCK_EX:
                lockres->l_ex_holders++;
                break;
        case DLM_LOCK_PR:
                lockres->l_ro_holders++;
                break;
        default:
                BUG();
        }
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
                                     int level)
{
        BUG_ON(!lockres);

        switch(level) {
        case DLM_LOCK_EX:
                BUG_ON(!lockres->l_ex_holders);
                lockres->l_ex_holders--;
                break;
        case DLM_LOCK_PR:
                BUG_ON(!lockres->l_ro_holders);
                lockres->l_ro_holders--;
                break;
        default:
                BUG();
        }
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
        int new_level = DLM_LOCK_EX;

        if (level == DLM_LOCK_EX)
                new_level = DLM_LOCK_NL;
        else if (level == DLM_LOCK_PR)
                new_level = DLM_LOCK_PR;
        return new_level;
}
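
/*
 * That is, with only EX, PR and NL in play:
 *
 *      held/blocking level     highest compatible level
 *      EX                      NL
 *      PR                      PR
 *      NL                      EX
 */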

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
                              unsigned long newflags)
{
        struct ocfs2_mask_waiter *mw, *tmp;

        assert_spin_locked(&lockres->l_lock);

        lockres->l_flags = newflags;

        list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
                if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
                        continue;

                list_del_init(&mw->mw_item);
                mw->mw_status = 0;
                complete(&mw->mw_complete);
        }
}
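
/*
 * For example, a mask waiter queued with mw_mask = OCFS2_LOCK_BUSY and
 * mw_goal = 0 is completed by lockres_set_flags() as soon as the BUSY
 * bit drops out of l_flags.
 */
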
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
        lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
                                unsigned long clear)
{
        lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
        BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

        lockres->l_level = lockres->l_requested;
        if (lockres->l_level <=
            ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
                lockres->l_blocking = DLM_LOCK_NL;
                lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
        }
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

        /* Convert from RO to EX doesn't really need anything as our
         * information is already up to date. Convert from NL to
         * *anything* however should mark ourselves as needing an
         * update */
        if (lockres->l_level == DLM_LOCK_NL &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

        lockres->l_level = lockres->l_requested;

        /*
         * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
         * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
         * downconverting the lock before the upconvert has fully completed.
         * Do not prevent the dc thread from downconverting if NONBLOCK lock
         * had already returned.
         */
        if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
                lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
        else
                lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);

        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
        BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

        if (lockres->l_requested > DLM_LOCK_NL &&
            !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

        lockres->l_level = lockres->l_requested;
        lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                                     int level)
{
        int needs_downconvert = 0;

        assert_spin_locked(&lockres->l_lock);

        if (level > lockres->l_blocking) {
                /* only schedule a downconvert if we haven't already scheduled
                 * one that goes low enough to satisfy the level we're
                 * blocking.  this also catches the case where we get
                 * duplicate BASTs */
                if (ocfs2_highest_compat_lock_level(level) <
                    ocfs2_highest_compat_lock_level(lockres->l_blocking))
                        needs_downconvert = 1;

                lockres->l_blocking = level;
        }

        mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
             lockres->l_name, level, lockres->l_level, lockres->l_blocking,
             needs_downconvert);

        if (needs_downconvert)
                lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
        mlog(0, "needs_downconvert = %d\n", needs_downconvert);
        return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()               ocfs2_downconvert_thread()
 *     clear PENDING                     ocfs2_unblock_lock()
 *                                        take_l_lock
 *                                        !BUSY
 *                                        ocfs2_prepare_downconvert()
 *                                         set BUSY
 *                                         set PENDING
 *                                        drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *                      <window>
 *                                        ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */
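
/*
 * To make the example concrete: if lockres_set_pending() handed
 * ocfs2_cluster_lock() generation N, the ast's successful clear bumps
 * l_pending_gen to N + 1, and the downconvert thread's
 * lockres_set_pending() then returns N + 1.  When ocfs2_cluster_lock()
 * finally calls lockres_clear_pending() with the stale N, the
 * generations no longer match and the call is a no-op, so the
 * downconvert thread's PENDING survives.
 */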

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
                                    unsigned int generation,
                                    struct ocfs2_super *osb)
{
        assert_spin_locked(&lockres->l_lock);

        /*
         * The ast and locking functions can race us here.  The winner
         * will clear pending, the loser will not.
         */
        if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
            (lockres->l_pending_gen != generation))
                return;

        lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
        lockres->l_pending_gen++;

        /*
         * The downconvert thread may have skipped us because we
         * were PENDING.  Wake it up.
         */
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
                ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
                                  unsigned int generation,
                                  struct ocfs2_super *osb)
{
        unsigned long flags;

        spin_lock_irqsave(&lockres->l_lock, flags);
        __lockres_clear_pending(lockres, generation, osb);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
        assert_spin_locked(&lockres->l_lock);
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

        lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

        return lockres->l_pending_gen;
}

static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
        struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        int needs_downconvert;
        unsigned long flags;

        BUG_ON(level <= DLM_LOCK_NL);

        mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
             "type %s\n", lockres->l_name, level, lockres->l_level,
             ocfs2_lock_type_string(lockres->l_type));

        /*
         * We can skip the bast for locks which don't enable caching -
         * they'll be dropped at the earliest possible time anyway.
         */
        if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
                return;

        spin_lock_irqsave(&lockres->l_lock, flags);
        needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
        if (needs_downconvert)
                ocfs2_schedule_blocked_lock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        wake_up(&lockres->l_event);

        ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
        struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
        int status;

        spin_lock_irqsave(&lockres->l_lock, flags);

        status = ocfs2_dlm_lock_status(&lockres->l_lksb);

        if (status == -EAGAIN) {
                lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
                goto out;
        }

        if (status) {
                mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
                     lockres->l_name, status);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                return;
        }

        mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
             "level %d => %d\n", lockres->l_name, lockres->l_action,
             lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

        switch(lockres->l_action) {
        case OCFS2_AST_ATTACH:
                ocfs2_generic_handle_attach_action(lockres);
                lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
                break;
        case OCFS2_AST_CONVERT:
                ocfs2_generic_handle_convert_action(lockres);
                break;
        case OCFS2_AST_DOWNCONVERT:
                ocfs2_generic_handle_downconvert_action(lockres);
                break;
        default:
                mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
                     "flags 0x%lx, unlock: %u\n",
                     lockres->l_name, lockres->l_action, lockres->l_flags,
                     lockres->l_unlock_action);
                BUG();
        }
out:
        /* set it to something invalid so if we get called again we
         * can catch it. */
        lockres->l_action = OCFS2_AST_INVALID;

        /* Did we try to cancel this lock?  Clear that state */
        if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
                lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

        /*
         * We may have beaten the locking functions here.  We certainly
         * know that dlm_lock() has been called :-)
         * Because we can't have two lock calls in flight at once, we
         * can use lockres->l_pending_gen.
         */
        __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

        wake_up(&lockres->l_event);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
        struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        unsigned long flags;

        mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
             lockres->l_name, lockres->l_unlock_action);

        spin_lock_irqsave(&lockres->l_lock, flags);
        if (error) {
                mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
                     "unlock_action %d\n", error, lockres->l_name,
                     lockres->l_unlock_action);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                return;
        }

        switch(lockres->l_unlock_action) {
        case OCFS2_UNLOCK_CANCEL_CONVERT:
                mlog(0, "Cancel convert success for %s\n", lockres->l_name);
                lockres->l_action = OCFS2_AST_INVALID;
                /* Downconvert thread may have requeued this lock, we
                 * need to wake it. */
                if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
                        ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
                break;
        case OCFS2_UNLOCK_DROP_LOCK:
                lockres->l_level = DLM_LOCK_IV;
                break;
        default:
                BUG();
        }

        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
        wake_up(&lockres->l_event);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
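/*
 * For instance, under these rules a node speaking protocol 1.4 may
 * join a domain whose other nodes speak 1.2: the majors match and
 * 1.4 >= 1.2, so the joining node simply runs at 1.2.  A node
 * speaking 2.0 could not join that domain at all.
 */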
static struct ocfs2_locking_protocol lproto = {
        .lp_max_version = {
                .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
                .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
        },
        .lp_lock_ast            = ocfs2_locking_ast,
        .lp_blocking_ast        = ocfs2_blocking_ast,
        .lp_unlock_ast          = ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
        ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert)
{
        unsigned long flags;

        spin_lock_irqsave(&lockres->l_lock, flags);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
        lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
        if (convert)
                lockres->l_action = OCFS2_AST_INVALID;
        else
                lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        wake_up(&lockres->l_event);
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             u32 dlm_flags)
{
        int ret = 0;
        unsigned long flags;
        unsigned int gen;

        mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
             dlm_flags);

        spin_lock_irqsave(&lockres->l_lock, flags);
        if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
            (lockres->l_flags & OCFS2_LOCK_BUSY)) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                goto bail;
        }

        lockres->l_action = OCFS2_AST_ATTACH;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        gen = lockres_set_pending(lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        ret = ocfs2_dlm_lock(osb->cconn,
                             level,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
                             OCFS2_LOCK_ID_MAX_LEN - 1);
        lockres_clear_pending(lockres, gen, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 1);
        }

        mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
        return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
                                        int flag)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&lockres->l_lock, flags);
        ret = lockres->l_flags & flag;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

        return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
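
/*
 * E.g. if another node is blocked on EX, we will drop to NL, so only a
 * wanted level of NL can continue; if it is blocked on PR, we drop to
 * PR and a wanted level of PR (or NL) is still compatible.
 */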
1375
1376 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
1377 {
1378         INIT_LIST_HEAD(&mw->mw_item);
1379         init_completion(&mw->mw_complete);
1380         ocfs2_init_start_time(mw);
1381 }
1382
1383 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
1384 {
1385         wait_for_completion(&mw->mw_complete);
1386         /* Re-arm the completion in case we want to wait on it again */
1387         reinit_completion(&mw->mw_complete);
1388         return mw->mw_status;
1389 }
1390
1391 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
1392                                     struct ocfs2_mask_waiter *mw,
1393                                     unsigned long mask,
1394                                     unsigned long goal)
1395 {
1396         BUG_ON(!list_empty(&mw->mw_item));
1397
1398         assert_spin_locked(&lockres->l_lock);
1399
1400         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
1401         mw->mw_mask = mask;
1402         mw->mw_goal = goal;
1403 }
1404
1405 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1406  * if the mask still hadn't reached its goal */
1407 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1408                                       struct ocfs2_mask_waiter *mw)
1409 {
1410         int ret = 0;
1411
1412         assert_spin_locked(&lockres->l_lock);
1413         if (!list_empty(&mw->mw_item)) {
1414                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1415                         ret = -EBUSY;
1416
1417                 list_del_init(&mw->mw_item);
1418                 init_completion(&mw->mw_complete);
1419         }
1420
1421         return ret;
1422 }
1423
1424 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1425                                       struct ocfs2_mask_waiter *mw)
1426 {
1427         unsigned long flags;
1428         int ret = 0;
1429
1430         spin_lock_irqsave(&lockres->l_lock, flags);
1431         ret = __lockres_remove_mask_waiter(lockres, mw);
1432         spin_unlock_irqrestore(&lockres->l_lock, flags);
1433
1434         return ret;
1435
1436 }
1437
1438 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1439                                              struct ocfs2_lock_res *lockres)
1440 {
1441         int ret;
1442
1443         ret = wait_for_completion_interruptible(&mw->mw_complete);
1444         if (ret)
1445                 lockres_remove_mask_waiter(lockres, mw);
1446         else
1447                 ret = mw->mw_status;
1448         /* Re-arm the completion in case we want to wait on it again */
1449         reinit_completion(&mw->mw_complete);
1450         return ret;
1451 }
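
/*
 * The mask waiter helpers above are always used in the same pattern:
 * queue the waiter under l_lock, drop the lock, then sleep until the
 * masked bits of l_flags reach the goal. A minimal sketch of a caller
 * waiting for OCFS2_LOCK_BUSY to clear (ocfs2_file_lock() below is a
 * real user of this pattern):
 *
 *	struct ocfs2_mask_waiter mw;
 *	unsigned long flags;
 *	int ret;
 *
 *	ocfs2_init_mask_waiter(&mw);
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *	ret = ocfs2_wait_for_mask(&mw);
 */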
1452
1453 static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1454                                 struct ocfs2_lock_res *lockres,
1455                                 int level,
1456                                 u32 lkm_flags,
1457                                 int arg_flags,
1458                                 int l_subclass,
1459                                 unsigned long caller_ip)
1460 {
1461         struct ocfs2_mask_waiter mw;
1462         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1463         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1464         unsigned long flags;
1465         unsigned int gen;
1466         int noqueue_attempted = 0;
1467         int dlm_locked = 0;
1468         int kick_dc = 0;
1469
1470         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
1471                 mlog_errno(-EINVAL);
1472                 return -EINVAL;
1473         }
1474
1475         ocfs2_init_mask_waiter(&mw);
1476
1477         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1478                 lkm_flags |= DLM_LKF_VALBLK;
1479
1480 again:
1481         wait = 0;
1482
1483         spin_lock_irqsave(&lockres->l_lock, flags);
1484
1485         if (catch_signals && signal_pending(current)) {
1486                 ret = -ERESTARTSYS;
1487                 goto unlock;
1488         }
1489
1490         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1491                         "Cluster lock called on freeing lockres %s! flags "
1492                         "0x%lx\n", lockres->l_name, lockres->l_flags);
1493
1494         /* We only compare against the currently granted level
1495          * here. If the lock is blocked waiting on a downconvert,
1496          * we'll get caught below. */
1497         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1498             level > lockres->l_level) {
1499                 /* is someone sitting in dlm_lock? If so, wait on
1500                  * them. */
1501                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1502                 wait = 1;
1503                 goto unlock;
1504         }
1505
1506         if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
1507                 /*
1508                  * We've upconverted. If the lock now has a level we can
1509                  * work with, we take it. If, however, the lock is not at the
1510                  * required level, we go thru the full cycle. One way this could
1511                  * happen is if a process requesting an upconvert to PR is
1512                  * closely followed by another requesting upconvert to an EX.
1513                  * If the process requesting EX lands here, we want it to
1514                  * continue attempting to upconvert and let the process
1515                  * requesting PR take the lock.
1516                  * If multiple processes request upconvert to PR, the first one
1517                  * here will take the lock. The others will have to go thru the
1518                  * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
1519                  * downconvert request.
1520                  */
1521                 if (level <= lockres->l_level)
1522                         goto update_holders;
1523         }
1524
1525         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1526             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1527                 /* the lock is currently blocked on behalf of
1528                  * another node */
1529                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1530                 wait = 1;
1531                 goto unlock;
1532         }
1533
1534         if (level > lockres->l_level) {
1535                 if (noqueue_attempted > 0) {
1536                         ret = -EAGAIN;
1537                         goto unlock;
1538                 }
1539                 if (lkm_flags & DLM_LKF_NOQUEUE)
1540                         noqueue_attempted = 1;
1541
1542                 if (lockres->l_action != OCFS2_AST_INVALID)
1543                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
1544                              lockres->l_name, lockres->l_action);
1545
1546                 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1547                         lockres->l_action = OCFS2_AST_ATTACH;
1548                         lkm_flags &= ~DLM_LKF_CONVERT;
1549                 } else {
1550                         lockres->l_action = OCFS2_AST_CONVERT;
1551                         lkm_flags |= DLM_LKF_CONVERT;
1552                 }
1553
1554                 lockres->l_requested = level;
1555                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1556                 gen = lockres_set_pending(lockres);
1557                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1558
1559                 BUG_ON(level == DLM_LOCK_IV);
1560                 BUG_ON(level == DLM_LOCK_NL);
1561
1562                 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
1563                      lockres->l_name, lockres->l_level, level);
1564
1565                 /* call dlm_lock to upgrade lock now */
1566                 ret = ocfs2_dlm_lock(osb->cconn,
1567                                      level,
1568                                      &lockres->l_lksb,
1569                                      lkm_flags,
1570                                      lockres->l_name,
1571                                      OCFS2_LOCK_ID_MAX_LEN - 1);
1572                 lockres_clear_pending(lockres, gen, osb);
1573                 if (ret) {
1574                         if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1575                             (ret != -EAGAIN)) {
1576                                 ocfs2_log_dlm_error("ocfs2_dlm_lock",
1577                                                     ret, lockres);
1578                         }
1579                         ocfs2_recover_from_dlm_error(lockres, 1);
1580                         goto out;
1581                 }
1582                 dlm_locked = 1;
1583
1584                 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
1585                      lockres->l_name);
1586
1587                 /* At this point we've gone inside the dlm and need to
1588                  * complete our work regardless. */
1589                 catch_signals = 0;
1590
1591                 /* wait for busy to clear and carry on */
1592                 goto again;
1593         }
1594
1595 update_holders:
1596         /* Ok, if we get here then we're good to go. */
1597         ocfs2_inc_holders(lockres, level);
1598
1599         ret = 0;
1600 unlock:
1601         lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1602
1603         /* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
1604         kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);
1605
1606         spin_unlock_irqrestore(&lockres->l_lock, flags);
1607         if (kick_dc)
1608                 ocfs2_wake_downconvert_thread(osb);
1609 out:
1610         /*
1611          * This is helping work around a lock inversion between the page lock
1612          * and dlm locks.  One path holds the page lock while calling aops
1613  * which block acquiring dlm locks.  The downconvert thread holds dlm
1614  * locks while acquiring page locks when downconverting data locks.
1615          * This block is helping an aop path notice the inversion and back
1616          * off to unlock its page lock before trying the dlm lock again.
1617          */
1618         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1619             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1620                 wait = 0;
1621                 spin_lock_irqsave(&lockres->l_lock, flags);
1622                 if (__lockres_remove_mask_waiter(lockres, &mw)) {
1623                         if (dlm_locked)
1624                                 lockres_or_flags(lockres,
1625                                         OCFS2_LOCK_NONBLOCK_FINISHED);
1626                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1627                         ret = -EAGAIN;
1628                 } else {
1629                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1630                         goto again;
1631                 }
1632         }
1633         if (wait) {
1634                 ret = ocfs2_wait_for_mask(&mw);
1635                 if (ret == 0)
1636                         goto again;
1637                 mlog_errno(ret);
1638         }
1639         ocfs2_update_lock_stats(lockres, level, &mw, ret);
1640
1641 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1642         if (!ret && lockres->l_lockdep_map.key != NULL) {
1643                 if (level == DLM_LOCK_PR)
1644                         rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
1645                                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1646                                 caller_ip);
1647                 else
1648                         rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
1649                                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1650                                 caller_ip);
1651         }
1652 #endif
1653         return ret;
1654 }
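
/*
 * A minimal sketch of how OCFS2_LOCK_NONBLOCK changes the contract
 * above (hypothetical caller; ocfs2_inode_lock_with_page() below is a
 * real user). Instead of queueing behind a busy or blocked lockres,
 * the attempt fails fast so the caller can drop conflicting resources
 * (e.g. a page lock) and retry:
 *
 *	ret = __ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0,
 *				   OCFS2_LOCK_NONBLOCK, 0, _RET_IP_);
 *	if (ret == -EAGAIN)
 *		... back off, release the page lock, retry later ...
 */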
1655
1656 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1657                                      struct ocfs2_lock_res *lockres,
1658                                      int level,
1659                                      u32 lkm_flags,
1660                                      int arg_flags)
1661 {
1662         return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1663                                     0, _RET_IP_);
1664 }
1665
1666
1667 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1668                                    struct ocfs2_lock_res *lockres,
1669                                    int level,
1670                                    unsigned long caller_ip)
1671 {
1672         unsigned long flags;
1673
1674         spin_lock_irqsave(&lockres->l_lock, flags);
1675         ocfs2_dec_holders(lockres, level);
1676         ocfs2_downconvert_on_unlock(osb, lockres);
1677         spin_unlock_irqrestore(&lockres->l_lock, flags);
1678 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1679         if (lockres->l_lockdep_map.key != NULL)
1680                 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
1681 #endif
1682 }
1683
1684 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1685                                  struct ocfs2_lock_res *lockres,
1686                                  int ex,
1687                                  int local)
1688 {
1689         int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1690         unsigned long flags;
1691         u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1692
1693         spin_lock_irqsave(&lockres->l_lock, flags);
1694         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1695         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1696         spin_unlock_irqrestore(&lockres->l_lock, flags);
1697
1698         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1699 }
1700
1701 /* Grants us an EX lock on the data and metadata resources, skipping
1702  * the normal cluster directory lookup. Use this ONLY on newly created
1703  * inodes which other nodes can't possibly see, and which haven't been
1704  * hashed in the inode hash yet. This can give us a good performance
1705  * increase as it'll skip the network broadcast normally associated
1706  * with creating a new lock resource. */
1707 int ocfs2_create_new_inode_locks(struct inode *inode)
1708 {
1709         int ret;
1710         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1711
1712         BUG_ON(!ocfs2_inode_is_new(inode));
1713
1714         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1715
1716         /* Note that we don't increment any of the holder counts, nor
1717          * do we add anything to a journal handle. Since this is
1718          * supposed to be a new inode which the cluster doesn't know
1719          * about yet, there is no need to.  As far as the LVB handling
1720          * is concerned, this is basically like acquiring an EX lock
1721          * on a resource which has an invalid one -- we'll set it
1722          * valid when we release the EX. */
1723
1724         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1725         if (ret) {
1726                 mlog_errno(ret);
1727                 goto bail;
1728         }
1729
1730         /*
1731          * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1732          * don't use a generation in their lock names.
1733          */
1734         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1735         if (ret) {
1736                 mlog_errno(ret);
1737                 goto bail;
1738         }
1739
1740         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1741         if (ret)
1742                 mlog_errno(ret);
1743
1744 bail:
1745         return ret;
1746 }
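
/*
 * A minimal usage sketch (hypothetical inode-allocation path; error
 * handling beyond logging elided). Because no holder counts are
 * incremented, there is no matching unlock call:
 *
 *	status = ocfs2_create_new_inode_locks(inode);
 *	if (status < 0)
 *		mlog_errno(status);
 *	... proceed with journal and dcache setup ...
 */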
1747
1748 int ocfs2_rw_lock(struct inode *inode, int write)
1749 {
1750         int status, level;
1751         struct ocfs2_lock_res *lockres;
1752         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1753
1754         mlog(0, "inode %llu take %s RW lock\n",
1755              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1756              write ? "EXMODE" : "PRMODE");
1757
1758         if (ocfs2_mount_local(osb))
1759                 return 0;
1760
1761         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1762
1763         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1764
1765         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1766         if (status < 0)
1767                 mlog_errno(status);
1768
1769         return status;
1770 }
1771
1772 int ocfs2_try_rw_lock(struct inode *inode, int write)
1773 {
1774         int status, level;
1775         struct ocfs2_lock_res *lockres;
1776         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1777
1778         mlog(0, "inode %llu try to take %s RW lock\n",
1779              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1780              write ? "EXMODE" : "PRMODE");
1781
1782         if (ocfs2_mount_local(osb))
1783                 return 0;
1784
1785         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1786
1787         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1788
1789         status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1790         return status;
1791 }
1792
1793 void ocfs2_rw_unlock(struct inode *inode, int write)
1794 {
1795         int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1796         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1797         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1798
1799         mlog(0, "inode %llu drop %s RW lock\n",
1800              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1801              write ? "EXMODE" : "PRMODE");
1802
1803         if (!ocfs2_mount_local(osb))
1804                 ocfs2_cluster_unlock(osb, lockres, level);
1805 }
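
/*
 * A minimal sketch of the intended pairing (hypothetical write path;
 * do_write() is a placeholder). On local mounts both calls are no-ops,
 * so callers need no special casing:
 *
 *	status = ocfs2_rw_lock(inode, 1);	EX for a writer
 *	if (status < 0)
 *		return status;
 *	status = do_write(inode, ...);
 *	ocfs2_rw_unlock(inode, 1);		drop at the same level
 */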
1806
1807 /*
1808  * ocfs2_open_lock() always takes a PR mode lock.
1809  */
1810 int ocfs2_open_lock(struct inode *inode)
1811 {
1812         int status = 0;
1813         struct ocfs2_lock_res *lockres;
1814         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1815
1816         mlog(0, "inode %llu take PRMODE open lock\n",
1817              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1818
1819         if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
1820                 goto out;
1821
1822         lockres = &OCFS2_I(inode)->ip_open_lockres;
1823
1824         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0);
1825         if (status < 0)
1826                 mlog_errno(status);
1827
1828 out:
1829         return status;
1830 }
1831
1832 int ocfs2_try_open_lock(struct inode *inode, int write)
1833 {
1834         int status = 0, level;
1835         struct ocfs2_lock_res *lockres;
1836         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1837
1838         mlog(0, "inode %llu try to take %s open lock\n",
1839              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1840              write ? "EXMODE" : "PRMODE");
1841
1842         if (ocfs2_is_hard_readonly(osb)) {
1843                 if (write)
1844                         status = -EROFS;
1845                 goto out;
1846         }
1847
1848         if (ocfs2_mount_local(osb))
1849                 goto out;
1850
1851         lockres = &OCFS2_I(inode)->ip_open_lockres;
1852
1853         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1854
1855         /*
1856          * The file system may already be holding a PRMODE/EXMODE open lock.
1857          * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1858          * other nodes and the -EAGAIN will indicate to the caller that
1859          * this inode is still in use.
1860          */
1861         status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1862
1863 out:
1864         return status;
1865 }
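
/*
 * A minimal sketch of the "is this inode still in use?" check that the
 * trylock above enables (hypothetical delete path):
 *
 *	status = ocfs2_try_open_lock(inode, 1);
 *	if (status == -EAGAIN)
 *		... some node still holds the open lock, so the inode
 *		    cannot be wiped yet ...
 *	else if (status == 0)
 *		... we hold the EX open lock; no other opens exist ...
 */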
1866
1867 /*
1868  * ocfs2_open_unlock() unlocks PR and EX mode open locks.
1869  */
1870 void ocfs2_open_unlock(struct inode *inode)
1871 {
1872         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1873         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1874
1875         mlog(0, "inode %llu drop open lock\n",
1876              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1877
1878         if (ocfs2_mount_local(osb))
1879                 goto out;
1880
1881         if (lockres->l_ro_holders)
1882                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR);
1883         if (lockres->l_ex_holders)
1884                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
1885
1886 out:
1887         return;
1888 }
1889
1890 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1891                                      int level)
1892 {
1893         int ret;
1894         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1895         unsigned long flags;
1896         struct ocfs2_mask_waiter mw;
1897
1898         ocfs2_init_mask_waiter(&mw);
1899
1900 retry_cancel:
1901         spin_lock_irqsave(&lockres->l_lock, flags);
1902         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1903                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
1904                 if (ret) {
1905                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1906                         ret = ocfs2_cancel_convert(osb, lockres);
1907                         if (ret < 0) {
1908                                 mlog_errno(ret);
1909                                 goto out;
1910                         }
1911                         goto retry_cancel;
1912                 }
1913                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1914                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1915
1916                 ocfs2_wait_for_mask(&mw);
1917                 goto retry_cancel;
1918         }
1919
1920         ret = -ERESTARTSYS;
1921         /*
1922          * We may still have gotten the lock, in which case there's no
1923          * point to restarting the syscall.
1924          */
1925         if (lockres->l_level == level)
1926                 ret = 0;
1927
1928         mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1929              lockres->l_flags, lockres->l_level, lockres->l_action);
1930
1931         spin_unlock_irqrestore(&lockres->l_lock, flags);
1932
1933 out:
1934         return ret;
1935 }
1936
1937 /*
1938  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1939  * flock() calls. The locking approach this requires is sufficiently
1940  * different from all other cluster lock types that we implement a
1941  * separate path to the "low-level" dlm calls. In particular:
1942  *
1943  * - No optimization of lock levels is done - we take exactly
1944  *   what's been requested.
1945  *
1946  * - No lock caching is employed. We immediately downconvert to
1947  *   no-lock at unlock time. This also means flock locks never go on
1948  *   the blocking list.
1949  *
1950  * - Since userspace can trivially deadlock itself with flock, we make
1951  *   sure to allow cancellation of a misbehaving application's flock()
1952  *   request.
1953  *
1954  * - Access to any flock lockres doesn't require concurrency, so we
1955  *   can simplify the code by requiring the caller to guarantee
1956  *   serialization of dlmglue flock calls.
1957  */
1958 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1959 {
1960         int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1961         unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1962         unsigned long flags;
1963         struct ocfs2_file_private *fp = file->private_data;
1964         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1965         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1966         struct ocfs2_mask_waiter mw;
1967
1968         ocfs2_init_mask_waiter(&mw);
1969
1970         if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1971             (lockres->l_level > DLM_LOCK_NL)) {
1972                 mlog(ML_ERROR,
1973                      "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1974                      "level: %u\n", lockres->l_name, lockres->l_flags,
1975                      lockres->l_level);
1976                 return -EINVAL;
1977         }
1978
1979         spin_lock_irqsave(&lockres->l_lock, flags);
1980         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1981                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1982                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1983
1984                 /*
1985                  * Get the lock at NLMODE to start - that way we
1986                  * can cancel the upconvert request if need be.
1987                  */
1988                 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1989                 if (ret < 0) {
1990                         mlog_errno(ret);
1991                         goto out;
1992                 }
1993
1994                 ret = ocfs2_wait_for_mask(&mw);
1995                 if (ret) {
1996                         mlog_errno(ret);
1997                         goto out;
1998                 }
1999                 spin_lock_irqsave(&lockres->l_lock, flags);
2000         }
2001
2002         lockres->l_action = OCFS2_AST_CONVERT;
2003         lkm_flags |= DLM_LKF_CONVERT;
2004         lockres->l_requested = level;
2005         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2006
2007         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2008         spin_unlock_irqrestore(&lockres->l_lock, flags);
2009
2010         ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
2011                              lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
2012         if (ret) {
2013                 if (!trylock || (ret != -EAGAIN)) {
2014                         ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2015                         ret = -EINVAL;
2016                 }
2017
2018                 ocfs2_recover_from_dlm_error(lockres, 1);
2019                 lockres_remove_mask_waiter(lockres, &mw);
2020                 goto out;
2021         }
2022
2023         ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
2024         if (ret == -ERESTARTSYS) {
2025                 /*
2026                  * Userspace can deadlock itself with
2027                  * flock(). Current behavior locally is to allow the
2028                  * deadlock, but abort the system call if a signal is
2029                  * received. We follow this example, otherwise a
2030                  * poorly written program could sit in kernel until
2031                  * reboot.
2032                  *
2033                  * Handling this is a bit more complicated for Ocfs2
2034                  * though. We can't exit this function with an
2035                  * outstanding lock request, so a cancel convert is
2036                  * required. We intentionally overwrite 'ret' - if the
2037                  * cancel fails and the lock was granted, it's easier
2038                  * to just bubble success back up to the user.
2039                  */
2040                 ret = ocfs2_flock_handle_signal(lockres, level);
2041         } else if (!ret && (level > lockres->l_level)) {
2042                 /* Trylock failed asynchronously */
2043                 BUG_ON(!trylock);
2044                 ret = -EAGAIN;
2045         }
2046
2047 out:
2048
2049         mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
2050              lockres->l_name, ex, trylock, ret);
2051         return ret;
2052 }
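
/*
 * A minimal sketch of a caller (hypothetical; the real flock entry
 * point lives outside this file and must serialize its calls per the
 * comment above):
 *
 *	int trylock = !can_sleep;	caller asked not to block
 *
 *	ret = ocfs2_file_lock(file, ex, trylock);
 *	if (ret == -EAGAIN)
 *		... trylock failed, a conflicting holder exists ...
 *
 *	ocfs2_file_unlock(file);	later, on unlock or close
 */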
2053
2054 void ocfs2_file_unlock(struct file *file)
2055 {
2056         int ret;
2057         unsigned int gen;
2058         unsigned long flags;
2059         struct ocfs2_file_private *fp = file->private_data;
2060         struct ocfs2_lock_res *lockres = &fp->fp_flock;
2061         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2062         struct ocfs2_mask_waiter mw;
2063
2064         ocfs2_init_mask_waiter(&mw);
2065
2066         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2067                 return;
2068
2069         if (lockres->l_level == DLM_LOCK_NL)
2070                 return;
2071
2072         mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2073              lockres->l_name, lockres->l_flags, lockres->l_level,
2074              lockres->l_action);
2075
2076         spin_lock_irqsave(&lockres->l_lock, flags);
2077         /*
2078          * Fake a blocking ast for the downconvert code.
2079          */
2080         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2081         lockres->l_blocking = DLM_LOCK_EX;
2082
2083         gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2084         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2085         spin_unlock_irqrestore(&lockres->l_lock, flags);
2086
2087         ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2088         if (ret) {
2089                 mlog_errno(ret);
2090                 return;
2091         }
2092
2093         ret = ocfs2_wait_for_mask(&mw);
2094         if (ret)
2095                 mlog_errno(ret);
2096 }
2097
2098 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2099                                         struct ocfs2_lock_res *lockres)
2100 {
2101         int kick = 0;
2102
2103         /* If we know that another node is waiting on our lock, kick
2104          * the downconvert thread pre-emptively when we reach a release
2105          * condition. */
2106         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2107                 switch(lockres->l_blocking) {
2108                 case DLM_LOCK_EX:
2109                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2110                                 kick = 1;
2111                         break;
2112                 case DLM_LOCK_PR:
2113                         if (!lockres->l_ex_holders)
2114                                 kick = 1;
2115                         break;
2116                 default:
2117                         BUG();
2118                 }
2119         }
2120
2121         if (kick)
2122                 ocfs2_wake_downconvert_thread(osb);
2123 }
2124
2125 #define OCFS2_SEC_BITS   34
2126 #define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
2127 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
2128
2129 /* LVB only has room for 64 bits of time here so we pack it for
2130  * now. */
2131 static u64 ocfs2_pack_timespec(struct timespec64 *spec)
2132 {
2133         u64 res;
2134         u64 sec = clamp_t(time64_t, spec->tv_sec, 0, 0x3ffffffffull);
2135         u32 nsec = spec->tv_nsec;
2136
2137         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2138
2139         return res;
2140 }
2141
2142 /* Call this with the lockres locked. I am reasonably sure we don't
2143  * need ip_lock in this function as anyone who would be changing those
2144  * values is supposed to be blocked in ocfs2_inode_lock right now. */
2145 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2146 {
2147         struct ocfs2_inode_info *oi = OCFS2_I(inode);
2148         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2149         struct ocfs2_meta_lvb *lvb;
2150
2151         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2152
2153         /*
2154          * Invalidate the LVB of a deleted inode - this way other
2155          * nodes are forced to go to disk and discover the new inode
2156          * status.
2157          */
2158         if (oi->ip_flags & OCFS2_INODE_DELETED) {
2159                 lvb->lvb_version = 0;
2160                 goto out;
2161         }
2162
2163         lvb->lvb_version   = OCFS2_LVB_VERSION;
2164         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
2165         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2166         lvb->lvb_iuid      = cpu_to_be32(i_uid_read(inode));
2167         lvb->lvb_igid      = cpu_to_be32(i_gid_read(inode));
2168         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
2169         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
2170         lvb->lvb_iatime_packed  =
2171                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
2172         lvb->lvb_ictime_packed =
2173                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2174         lvb->lvb_imtime_packed =
2175                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2176         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2177         lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2178         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2179
2180 out:
2181         mlog_meta_lvb(0, lockres);
2182 }
2183
2184 static void ocfs2_unpack_timespec(struct timespec64 *spec,
2185                                   u64 packed_time)
2186 {
2187         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2188         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2189 }
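
/*
 * A worked example of the packing above: with OCFS2_SEC_BITS == 34,
 * seconds occupy the top 34 bits and nanoseconds the low 30
 * (OCFS2_SEC_SHIFT == 30; 999999999 < 2^30, so tv_nsec always fits):
 *
 *	spec = { .tv_sec = 5, .tv_nsec = 7 }
 *	packed = (5 << 30) | 7 = 0x140000007
 *	unpack: tv_sec = packed >> 30 = 5, tv_nsec = packed & 0x3fffffff = 7
 *
 * Seconds are clamped to 34 bits, so times beyond roughly the year
 * 2514 saturate instead of wrapping.
 */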
2190
2191 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2192 {
2193         struct ocfs2_inode_info *oi = OCFS2_I(inode);
2194         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2195         struct ocfs2_meta_lvb *lvb;
2196
2197         mlog_meta_lvb(0, lockres);
2198
2199         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2200
2201         /* We're safe here without the lockres lock... */
2202         spin_lock(&oi->ip_lock);
2203         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2204         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2205
2206         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2207         oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2208         ocfs2_set_inode_flags(inode);
2209
2210         /* fast-symlinks are a special case */
2211         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2212                 inode->i_blocks = 0;
2213         else
2214                 inode->i_blocks = ocfs2_inode_sector_count(inode);
2215
2216         i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid));
2217         i_gid_write(inode, be32_to_cpu(lvb->lvb_igid));
2218         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2219         set_nlink(inode, be16_to_cpu(lvb->lvb_inlink));
2220         ocfs2_unpack_timespec(&inode->i_atime,
2221                               be64_to_cpu(lvb->lvb_iatime_packed));
2222         ocfs2_unpack_timespec(&inode->i_mtime,
2223                               be64_to_cpu(lvb->lvb_imtime_packed));
2224         ocfs2_unpack_timespec(&inode->i_ctime,
2225                               be64_to_cpu(lvb->lvb_ictime_packed));
2226         spin_unlock(&oi->ip_lock);
2227 }
2228
2229 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2230                                               struct ocfs2_lock_res *lockres)
2231 {
2232         struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2233
2234         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2235             && lvb->lvb_version == OCFS2_LVB_VERSION
2236             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2237                 return 1;
2238         return 0;
2239 }
2240
2241 /* Determine whether a lock resource needs to be refreshed, and
2242  * arbitrate who gets to refresh it.
2243  *
2244  *   0 means no refresh needed.
2245  *
2246  *   > 0 means you need to refresh this and you MUST call
2247  *   ocfs2_complete_lock_res_refresh afterwards. */
2248 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2249 {
2250         unsigned long flags;
2251         int status = 0;
2252
2253 refresh_check:
2254         spin_lock_irqsave(&lockres->l_lock, flags);
2255         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2256                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2257                 goto bail;
2258         }
2259
2260         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2261                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2262
2263                 ocfs2_wait_on_refreshing_lock(lockres);
2264                 goto refresh_check;
2265         }
2266
2267         /* Ok, I'll be the one to refresh this lock. */
2268         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2269         spin_unlock_irqrestore(&lockres->l_lock, flags);
2270
2271         status = 1;
2272 bail:
2273         mlog(0, "status %d\n", status);
2274         return status;
2275 }
2276
2277 /* If status is non-zero, I'll mark it as not being in refresh
2278  * anymore, but I won't clear the needs refresh flag. */
2279 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2280                                                    int status)
2281 {
2282         unsigned long flags;
2283
2284         spin_lock_irqsave(&lockres->l_lock, flags);
2285         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2286         if (!status)
2287                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2288         spin_unlock_irqrestore(&lockres->l_lock, flags);
2289
2290         wake_up(&lockres->l_event);
2291 }
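
/*
 * The two functions above form a small protocol and every user follows
 * the same shape (ocfs2_super_lock() below is a real example):
 *
 *	status = ocfs2_should_refresh_lock_res(lockres);
 *	if (status) {
 *		status = ... re-read the state the lock protects ...;
 *		ocfs2_complete_lock_res_refresh(lockres, status);
 *	}
 *
 * Completing with a non-zero status leaves OCFS2_LOCK_NEEDS_REFRESH
 * set, so the next locker will retry the refresh.
 */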
2292
2293 /* May or may not return a bh, depending on whether we went to disk. */
2294 static int ocfs2_inode_lock_update(struct inode *inode,
2295                                   struct buffer_head **bh)
2296 {
2297         int status = 0;
2298         struct ocfs2_inode_info *oi = OCFS2_I(inode);
2299         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2300         struct ocfs2_dinode *fe;
2301         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2302
2303         if (ocfs2_mount_local(osb))
2304                 goto bail;
2305
2306         spin_lock(&oi->ip_lock);
2307         if (oi->ip_flags & OCFS2_INODE_DELETED) {
2308                 mlog(0, "Orphaned inode %llu was deleted while we "
2309                      "were waiting on a lock. ip_flags = 0x%x\n",
2310                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
2311                 spin_unlock(&oi->ip_lock);
2312                 status = -ENOENT;
2313                 goto bail;
2314         }
2315         spin_unlock(&oi->ip_lock);
2316
2317         if (!ocfs2_should_refresh_lock_res(lockres))
2318                 goto bail;
2319
2320         /* This will discard any caching information we might have had
2321          * for the inode metadata. */
2322         ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2323
2324         ocfs2_extent_map_trunc(inode, 0);
2325
2326         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2327                 mlog(0, "Trusting LVB on inode %llu\n",
2328                      (unsigned long long)oi->ip_blkno);
2329                 ocfs2_refresh_inode_from_lvb(inode);
2330         } else {
2331                 /* Boo, we have to go to disk. */
2332                 /* read bh, cast, ocfs2_refresh_inode */
2333                 status = ocfs2_read_inode_block(inode, bh);
2334                 if (status < 0) {
2335                         mlog_errno(status);
2336                         goto bail_refresh;
2337                 }
2338                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
2339
2340                 /* This is a good chance to make sure we're not
2341                  * locking an invalid object.  ocfs2_read_inode_block()
2342                  * already checked that the inode block is sane.
2343                  *
2344                  * We bug on a stale inode here because we checked
2345                  * above whether it was wiped from disk. The wiping
2346                  * node provides a guarantee that we receive that
2347                  * message and can mark the inode before dropping any
2348                  * locks associated with it. */
2349                 mlog_bug_on_msg(inode->i_generation !=
2350                                 le32_to_cpu(fe->i_generation),
2351                                 "Invalid dinode %llu disk generation: %u "
2352                                 "inode->i_generation: %u\n",
2353                                 (unsigned long long)oi->ip_blkno,
2354                                 le32_to_cpu(fe->i_generation),
2355                                 inode->i_generation);
2356                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2357                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2358                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
2359                                 (unsigned long long)oi->ip_blkno,
2360                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
2361                                 le32_to_cpu(fe->i_flags));
2362
2363                 ocfs2_refresh_inode(inode, fe);
2364                 ocfs2_track_lock_refresh(lockres);
2365         }
2366
2367         status = 0;
2368 bail_refresh:
2369         ocfs2_complete_lock_res_refresh(lockres, status);
2370 bail:
2371         return status;
2372 }
2373
2374 static int ocfs2_assign_bh(struct inode *inode,
2375                            struct buffer_head **ret_bh,
2376                            struct buffer_head *passed_bh)
2377 {
2378         int status;
2379
2380         if (passed_bh) {
2381                 /* Ok, the update went to disk for us, use the
2382                  * returned bh. */
2383                 *ret_bh = passed_bh;
2384                 get_bh(*ret_bh);
2385
2386                 return 0;
2387         }
2388
2389         status = ocfs2_read_inode_block(inode, ret_bh);
2390         if (status < 0)
2391                 mlog_errno(status);
2392
2393         return status;
2394 }
2395
2396 /*
2397  * Returns < 0 on error, 0 on success.  On success the caller holds the
2398  * inode cluster lock at the requested level until ocfs2_inode_unlock().
2399  */
2400 int ocfs2_inode_lock_full_nested(struct inode *inode,
2401                                  struct buffer_head **ret_bh,
2402                                  int ex,
2403                                  int arg_flags,
2404                                  int subclass)
2405 {
2406         int status, level, acquired;
2407         u32 dlm_flags;
2408         struct ocfs2_lock_res *lockres = NULL;
2409         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2410         struct buffer_head *local_bh = NULL;
2411
2412         mlog(0, "inode %llu, take %s META lock\n",
2413              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2414              ex ? "EXMODE" : "PRMODE");
2415
2416         status = 0;
2417         acquired = 0;
2418         /* We'll allow faking a read-only metadata lock for
2419          * read-only devices. */
2420         if (ocfs2_is_hard_readonly(osb)) {
2421                 if (ex)
2422                         status = -EROFS;
2423                 goto getbh;
2424         }
2425
2426         if ((arg_flags & OCFS2_META_LOCK_GETBH) ||
2427             ocfs2_mount_local(osb))
2428                 goto update;
2429
2430         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2431                 ocfs2_wait_for_recovery(osb);
2432
2433         lockres = &OCFS2_I(inode)->ip_inode_lockres;
2434         level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2435         dlm_flags = 0;
2436         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2437                 dlm_flags |= DLM_LKF_NOQUEUE;
2438
2439         status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2440                                       arg_flags, subclass, _RET_IP_);
2441         if (status < 0) {
2442                 if (status != -EAGAIN)
2443                         mlog_errno(status);
2444                 goto bail;
2445         }
2446
2447         /* Notify the error cleanup path to drop the cluster lock. */
2448         acquired = 1;
2449
2450         /* We wait twice because a node may have died while we were in
2451          * the lower dlm layers. The second time though, we've
2452          * committed to owning this lock so we don't allow signals to
2453          * abort the operation. */
2454         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2455                 ocfs2_wait_for_recovery(osb);
2456
2457 update:
2458         /*
2459          * We only see this flag if we're being called from
2460          * ocfs2_read_locked_inode(). It means we're locking an inode
2461          * which hasn't been populated yet, so clear the refresh flag
2462          * and let the caller handle it.
2463          */
2464         if (inode->i_state & I_NEW) {
2465                 status = 0;
2466                 if (lockres)
2467                         ocfs2_complete_lock_res_refresh(lockres, 0);
2468                 goto bail;
2469         }
2470
2471         /* This is fun. The caller may want a bh back, or it may
2472          * not. ocfs2_inode_lock_update definitely wants one in, but
2473          * may or may not read one, depending on what's in the
2474          * LVB. The result of all of this is that we've *only* gone to
2475          * disk if we have to, so the complexity is worthwhile. */
2476         status = ocfs2_inode_lock_update(inode, &local_bh);
2477         if (status < 0) {
2478                 if (status != -ENOENT)
2479                         mlog_errno(status);
2480                 goto bail;
2481         }
2482 getbh:
2483         if (ret_bh) {
2484                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2485                 if (status < 0) {
2486                         mlog_errno(status);
2487                         goto bail;
2488                 }
2489         }
2490
2491 bail:
2492         if (status < 0) {
2493                 if (ret_bh && (*ret_bh)) {
2494                         brelse(*ret_bh);
2495                         *ret_bh = NULL;
2496                 }
2497                 if (acquired)
2498                         ocfs2_inode_unlock(inode, ex);
2499         }
2500
2501         if (local_bh)
2502                 brelse(local_bh);
2503
2504         return status;
2505 }
2506
2507 /*
2508  * This is working around a lock inversion between tasks acquiring DLM
2509  * locks while holding a page lock and the downconvert thread which
2510  * blocks dlm lock acquiry while acquiring page locks.
2511  * blocks dlm lock acquisition while acquiring page locks.
2512  * ** These _with_page variantes are only intended to be called from aop
2513  * methods that hold page locks and return a very specific *positive* error
2514  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2515  *
2516  * The DLM is called such that it returns -EAGAIN if it would have
2517  * blocked waiting for the downconvert thread.  In that case we unlock
2518  * our page so the downconvert thread can make progress.  Once we've
2519  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2520  * that called us can bubble that back up into the VFS who will then
2521  * immediately retry the aop call.
2522  */
2523 int ocfs2_inode_lock_with_page(struct inode *inode,
2524                               struct buffer_head **ret_bh,
2525                               int ex,
2526                               struct page *page)
2527 {
2528         int ret;
2529
2530         ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2531         if (ret == -EAGAIN) {
2532                 unlock_page(page);
2533                 /*
2534                  * If we can't get inode lock immediately, we should not return
2535                  * directly here, since this will lead to a softlockup problem.
2536                  * The method is to get a blocking lock and immediately unlock
2537                  * before returning, this can avoid CPU resource waste due to
2538                  * lots of retries, and benefits fairness in getting lock.
2539                  */
2540                 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2541                         ocfs2_inode_unlock(inode, ex);
2542                 ret = AOP_TRUNCATED_PAGE;
2543         }
2544
2545         return ret;
2546 }
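
/*
 * A minimal sketch of the intended aop-side pattern (hypothetical
 * readpage-style caller; the page is locked on entry, and on
 * AOP_TRUNCATED_PAGE it has already been unlocked for us):
 *
 *	ret = ocfs2_inode_lock_with_page(inode, NULL, 0, page);
 *	if (ret != 0)
 *		return ret;
 *	... do the read under the cluster lock ...
 *	ocfs2_inode_unlock(inode, 0);
 */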
2547
2548 int ocfs2_inode_lock_atime(struct inode *inode,
2549                           struct vfsmount *vfsmnt,
2550                           int *level, int wait)
2551 {
2552         int ret;
2553
2554         if (wait)
2555                 ret = ocfs2_inode_lock(inode, NULL, 0);
2556         else
2557                 ret = ocfs2_try_inode_lock(inode, NULL, 0);
2558
2559         if (ret < 0) {
2560                 if (ret != -EAGAIN)
2561                         mlog_errno(ret);
2562                 return ret;
2563         }
2564
2565         /*
2566          * If we should update atime, we will get EX lock,
2567          * otherwise we just get PR lock.
2568          */
2569         if (ocfs2_should_update_atime(inode, vfsmnt)) {
2570                 struct buffer_head *bh = NULL;
2571
2572                 ocfs2_inode_unlock(inode, 0);
2573                 if (wait)
2574                         ret = ocfs2_inode_lock(inode, &bh, 1);
2575                 else
2576                         ret = ocfs2_try_inode_lock(inode, &bh, 1);
2577
2578                 if (ret < 0) {
2579                         if (ret != -EAGAIN)
2580                                 mlog_errno(ret);
2581                         return ret;
2582                 }
2583                 *level = 1;
2584                 if (ocfs2_should_update_atime(inode, vfsmnt))
2585                         ocfs2_update_inode_atime(inode, bh);
2586                 if (bh)
2587                         brelse(bh);
2588         } else
2589                 *level = 0;
2590
2591         return ret;
2592 }
2593
2594 void ocfs2_inode_unlock(struct inode *inode,
2595                        int ex)
2596 {
2597         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2598         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2599         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2600
2601         mlog(0, "inode %llu drop %s META lock\n",
2602              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2603              ex ? "EXMODE" : "PRMODE");
2604
2605         if (!ocfs2_is_hard_readonly(osb) &&
2606             !ocfs2_mount_local(osb))
2607                 ocfs2_cluster_unlock(osb, lockres, level);
2608 }
2609
2610 /*
2611  * These _tracker variants are introduced to deal with the recursive cluster
2612  * locking issue. The idea is to keep track of a lock holder on the stack of
2613  * the current process. If there's a lock holder on the stack, we know the
2614  * task context is already protected by cluster locking. Currently, they're
2615  * used in some VFS entry routines.
2616  *
2617  * Return < 0 on error; == 0 if there was no lock holder on the stack
2618  * before this call; == 1 if this call would be recursive locking.
2619  * Return -EINVAL if the attempt would cause an upgrade, which is forbidden.
2620  *
2621  * When taking lock levels into account, we face several different situations.
2622  *
2623  * 1. no lock is held
2624  *    In this case, just lock the inode as requested and return 0
2625  *
2626  * 2. We are holding a lock
2627  *    For this situation, things diverge into several cases:
2628  *
2629  *    wanted     holding             what to do
2630  *    ex                ex          see 2.1 below
2631  *    ex                pr          see 2.2 below
2632  *    pr                ex          see 2.1 below
2633  *    pr                pr          see 2.1 below
2634  *
2635  *    2.1 The lock level that is being held is compatible
2636  *    with the wanted level, so no lock action will be taken.
2637  *
2638  *    2.2 Otherwise, an upgrade is needed, but it is forbidden.
2639  *
2640  * The reason an upgrade within a process is forbidden is that
2641  * a lock upgrade may cause deadlock. The following illustrates
2642  * how it happens.
2643  *
2644  *         thread on node1                             thread on node2
2645  * ocfs2_inode_lock_tracker(ex=0)
2646  *
2647  *                                <======   ocfs2_inode_lock_tracker(ex=1)
2648  *
2649  * ocfs2_inode_lock_tracker(ex=1)
2650  */
2651 int ocfs2_inode_lock_tracker(struct inode *inode,
2652                              struct buffer_head **ret_bh,
2653                              int ex,
2654                              struct ocfs2_lock_holder *oh)
2655 {
2656         int status = 0;
2657         struct ocfs2_lock_res *lockres;
2658         struct ocfs2_lock_holder *tmp_oh;
2659         struct pid *pid = task_pid(current);
2660
2661
2662         lockres = &OCFS2_I(inode)->ip_inode_lockres;
2663         tmp_oh = ocfs2_pid_holder(lockres, pid);
2664
2665         if (!tmp_oh) {
2666                 /*
2667                  * This corresponds to the case 1.
2668                  * We haven't got any lock before.
2669                  */
2670                 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0);
2671                 if (status < 0) {
2672                         if (status != -ENOENT)
2673                                 mlog_errno(status);
2674                         return status;
2675                 }
2676
2677                 oh->oh_ex = ex;
2678                 ocfs2_add_holder(lockres, oh);
2679                 return 0;
2680         }
2681
2682         if (unlikely(ex && !tmp_oh->oh_ex)) {
2683                 /*
2684                  * case 2.2: an upgrade may cause deadlock, so forbid it.
2685                  */
2686                 mlog(ML_ERROR, "Recursive locking is not permitted to "
2687                      "upgrade to EX level from PR level.\n");
2688                 dump_stack();
2689                 return -EINVAL;
2690         }
2691
2692         /*
2693          * case 2.1: the OCFS2_META_LOCK_GETBH flag makes ocfs2_inode_lock_full()
2694          * ignore the lock level and just update the buffer head.
2695          */
2696         if (ret_bh) {
2697                 status = ocfs2_inode_lock_full(inode, ret_bh, ex,
2698                                                OCFS2_META_LOCK_GETBH);
2699                 if (status < 0) {
2700                         if (status != -ENOENT)
2701                                 mlog_errno(status);
2702                         return status;
2703                 }
2704         }
2705         return tmp_oh ? 1 : 0;
2706 }
2707
2708 void ocfs2_inode_unlock_tracker(struct inode *inode,
2709                                 int ex,
2710                                 struct ocfs2_lock_holder *oh,
2711                                 int had_lock)
2712 {
2713         struct ocfs2_lock_res *lockres;
2714
2715         lockres = &OCFS2_I(inode)->ip_inode_lockres;
2716         /* had_lock means that the current process already took the cluster
2717          * lock previously.
2718          * If had_lock is 1, we have nothing to do here.
2719          * If had_lock is 0, we will release the lock.
2720          */
2721         if (!had_lock) {
2722                 ocfs2_inode_unlock(inode, oh->oh_ex);
2723                 ocfs2_remove_holder(lockres, oh);
2724         }
2725 }
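
/*
 * A minimal sketch of the tracker pair in a VFS entry point
 * (hypothetical caller; oh lives on the stack, as described above):
 *
 *	struct ocfs2_lock_holder oh;
 *	int had_lock;
 *
 *	had_lock = ocfs2_inode_lock_tracker(inode, NULL, ex, &oh);
 *	if (had_lock < 0)
 *		return had_lock;
 *	... protected section; re-entry through another tracker
 *	    user in the same task is now safe ...
 *	ocfs2_inode_unlock_tracker(inode, ex, &oh, had_lock);
 */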
2726
2727 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2728 {
2729         struct ocfs2_lock_res *lockres;
2730         struct ocfs2_orphan_scan_lvb *lvb;
2731         int status = 0;
2732
2733         if (ocfs2_is_hard_readonly(osb))
2734                 return -EROFS;
2735
2736         if (ocfs2_mount_local(osb))
2737                 return 0;
2738
2739         lockres = &osb->osb_orphan_scan.os_lockres;
2740         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2741         if (status < 0)
2742                 return status;
2743
2744         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2745         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2746             lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2747                 *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2748         else
2749                 *seqno = osb->osb_orphan_scan.os_seqno + 1;
2750
2751         return status;
2752 }
2753
2754 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2755 {
2756         struct ocfs2_lock_res *lockres;
2757         struct ocfs2_orphan_scan_lvb *lvb;
2758
2759         if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2760                 lockres = &osb->osb_orphan_scan.os_lockres;
2761                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2762                 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2763                 lvb->lvb_os_seqno = cpu_to_be32(seqno);
2764                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2765         }
2766 }
2767
2768 int ocfs2_super_lock(struct ocfs2_super *osb,
2769                      int ex)
2770 {
2771         int status = 0;
2772         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2773         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2774
2775         if (ocfs2_is_hard_readonly(osb))
2776                 return -EROFS;
2777
2778         if (ocfs2_mount_local(osb))
2779                 goto bail;
2780
2781         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2782         if (status < 0) {
2783                 mlog_errno(status);
2784                 goto bail;
2785         }
2786
2787         /* The super block lock path is really in the best position to
2788          * know when resources covered by the lock need to be
2789          * refreshed, so we do it here. Of course, making sense of
2790          * everything is up to the caller :) */
2791         status = ocfs2_should_refresh_lock_res(lockres);
2792         if (status) {
2793                 status = ocfs2_refresh_slot_info(osb);
2794
2795                 ocfs2_complete_lock_res_refresh(lockres, status);
2796
2797                 if (status < 0) {
2798                         ocfs2_cluster_unlock(osb, lockres, level);
2799                         mlog_errno(status);
2800                 }
2801                 ocfs2_track_lock_refresh(lockres);
2802         }
2803 bail:
2804         return status;
2805 }
2806
2807 void ocfs2_super_unlock(struct ocfs2_super *osb,
2808                         int ex)
2809 {
2810         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2811         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2812
2813         if (!ocfs2_mount_local(osb))
2814                 ocfs2_cluster_unlock(osb, lockres, level);
2815 }
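
/*
 * Minimal sketch of a superblock lock user. The slot-info refresh
 * happens inside ocfs2_super_lock() itself, so a caller only sees a
 * held PR or EX lock. Hypothetical caller, abbreviated error handling.
 */
#if 0
static int ocfs2_example_under_super_lock(struct ocfs2_super *osb)
{
        int status;

        status = ocfs2_super_lock(osb, 1);      /* 1 => DLM_LOCK_EX */
        if (status < 0)
                return status;

        /* ... operate on resources covered by the superblock lock ... */

        ocfs2_super_unlock(osb, 1);
        return 0;
}
#endif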
2816
2817 int ocfs2_rename_lock(struct ocfs2_super *osb)
2818 {
2819         int status;
2820         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2821
2822         if (ocfs2_is_hard_readonly(osb))
2823                 return -EROFS;
2824
2825         if (ocfs2_mount_local(osb))
2826                 return 0;
2827
2828         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2829         if (status < 0)
2830                 mlog_errno(status);
2831
2832         return status;
2833 }
2834
2835 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2836 {
2837         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2838
2839         if (!ocfs2_mount_local(osb))
2840                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2841 }
2842
2843 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2844 {
2845         int status;
2846         struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2847
2848         if (ocfs2_is_hard_readonly(osb))
2849                 return -EROFS;
2850
2851         if (ocfs2_mount_local(osb))
2852                 return 0;
2853
2854         status = ocfs2_cluster_lock(osb, lockres,
2855                                     ex ? DLM_LOCK_EX : DLM_LOCK_PR, 0, 0);
2856         if (status < 0)
2857                 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2858
2859         return status;
2860 }
2861
2862 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2863 {
2864         struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2865
2866         if (!ocfs2_mount_local(osb))
2867                 ocfs2_cluster_unlock(osb, lockres,
2868                                      ex ? DLM_LOCK_EX : DLM_LOCK_PR);
2869 }
2870
2871 int ocfs2_trim_fs_lock(struct ocfs2_super *osb,
2872                        struct ocfs2_trim_fs_info *info, int trylock)
2873 {
2874         int status;
2875         struct ocfs2_trim_fs_lvb *lvb;
2876         struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2877
2878         if (info)
2879                 info->tf_valid = 0;
2880
2881         if (ocfs2_is_hard_readonly(osb))
2882                 return -EROFS;
2883
2884         if (ocfs2_mount_local(osb))
2885                 return 0;
2886
2887         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX,
2888                                     trylock ? DLM_LKF_NOQUEUE : 0, 0);
2889         if (status < 0) {
2890                 if (status != -EAGAIN)
2891                         mlog_errno(status);
2892                 return status;
2893         }
2894
2895         if (info) {
2896                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2897                 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2898                     lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) {
2899                         info->tf_valid = 1;
2900                         info->tf_success = lvb->lvb_success;
2901                         info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum);
2902                         info->tf_start = be64_to_cpu(lvb->lvb_start);
2903                         info->tf_len = be64_to_cpu(lvb->lvb_len);
2904                         info->tf_minlen = be64_to_cpu(lvb->lvb_minlen);
2905                         info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen);
2906                 }
2907         }
2908
2909         return status;
2910 }
2911
2912 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb,
2913                           struct ocfs2_trim_fs_info *info)
2914 {
2915         struct ocfs2_trim_fs_lvb *lvb;
2916         struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2917
2918         if (ocfs2_mount_local(osb))
2919                 return;
2920
2921         if (info) {
2922                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2923                 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION;
2924                 lvb->lvb_success = info->tf_success;
2925                 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum);
2926                 lvb->lvb_start = cpu_to_be64(info->tf_start);
2927                 lvb->lvb_len = cpu_to_be64(info->tf_len);
2928                 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen);
2929                 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen);
2930         }
2931
2932         ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2933 }
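
/*
 * Sketch of the trylock protocol above: attempt the lock without
 * queueing, and if another node holds it, fall back to a blocking
 * acquire and reuse its published result from the LVB rather than
 * trimming the same range twice. Illustrative only; the real policy
 * (including range matching) lives in the fstrim path.
 */
#if 0
static int ocfs2_example_trim(struct ocfs2_super *osb,
                              struct ocfs2_trim_fs_info *info)
{
        int ret;

        ret = ocfs2_trim_fs_lock(osb, info, 1);
        if (ret == -EAGAIN)     /* held elsewhere; wait for its result */
                ret = ocfs2_trim_fs_lock(osb, info, 0);
        if (ret < 0)
                return ret;

        if (info->tf_valid && info->tf_success) {
                /* ... a previous trim result can be reused ... */
        } else {
                /* ... do the trim and fill *info for other nodes ... */
        }

        ocfs2_trim_fs_unlock(osb, info);
        return 0;
}
#endif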
2934
2935 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2936 {
2937         int ret;
2938         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2939         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2940         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2941
2942         BUG_ON(!dl);
2943
2944         if (ocfs2_is_hard_readonly(osb)) {
2945                 if (ex)
2946                         return -EROFS;
2947                 return 0;
2948         }
2949
2950         if (ocfs2_mount_local(osb))
2951                 return 0;
2952
2953         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2954         if (ret < 0)
2955                 mlog_errno(ret);
2956
2957         return ret;
2958 }
2959
2960 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2961 {
2962         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2963         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2964         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2965
2966         if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
2967                 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2968 }
2969
2970 /* Reference counting of the dlm debug structure. We want this because
2971  * open references on the debug inodes can outlive the mount, so
2972  * we can't rely on the ocfs2_super to always exist. */
2973 static void ocfs2_dlm_debug_free(struct kref *kref)
2974 {
2975         struct ocfs2_dlm_debug *dlm_debug;
2976
2977         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2978
2979         kfree(dlm_debug);
2980 }
2981
2982 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2983 {
2984         if (dlm_debug)
2985                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2986 }
2987
2988 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2989 {
2990         kref_get(&debug->d_refcnt);
2991 }
2992
2993 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2994 {
2995         struct ocfs2_dlm_debug *dlm_debug;
2996
2997         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2998         if (!dlm_debug) {
2999                 mlog_errno(-ENOMEM);
3000                 goto out;
3001         }
3002
3003         kref_init(&dlm_debug->d_refcnt);
3004         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
3005         dlm_debug->d_locking_state = NULL;
3006 out:
3007         return dlm_debug;
3008 }
3009
3010 /* Access to this is arbitrated for us via seq_file->lock. */
3011 struct ocfs2_dlm_seq_priv {
3012         struct ocfs2_dlm_debug *p_dlm_debug;
3013         struct ocfs2_lock_res p_iter_res;
3014         struct ocfs2_lock_res p_tmp_res;
3015 };
3016
3017 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
3018                                                  struct ocfs2_dlm_seq_priv *priv)
3019 {
3020         struct ocfs2_lock_res *iter, *ret = NULL;
3021         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
3022
3023         assert_spin_locked(&ocfs2_dlm_tracking_lock);
3024
3025         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
3026                 /* discover the head of the list */
3027                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
3028                         mlog(0, "End of list found, %p\n", ret);
3029                         break;
3030                 }
3031
3032                 /* We track our "dummy" iteration lockres structures
3033                  * by a NULL l_ops field. */
3034                 if (iter->l_ops != NULL) {
3035                         ret = iter;
3036                         break;
3037                 }
3038         }
3039
3040         return ret;
3041 }
3042
3043 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
3044 {
3045         struct ocfs2_dlm_seq_priv *priv = m->private;
3046         struct ocfs2_lock_res *iter;
3047
3048         spin_lock(&ocfs2_dlm_tracking_lock);
3049         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
3050         if (iter) {
3051                 /* Since lockres structures have the lifetime of their container
3052                  * (which can be inodes, ocfs2_supers, etc) we want to
3053                  * copy this out to a temporary lockres while still
3054                  * under the spinlock. Obviously after this we can't
3055                  * trust any pointers on the copy returned, but that's
3056                  * ok as the information we want isn't typically held
3057                  * in them. */
3058                 priv->p_tmp_res = *iter;
3059                 iter = &priv->p_tmp_res;
3060         }
3061         spin_unlock(&ocfs2_dlm_tracking_lock);
3062
3063         return iter;
3064 }
3065
3066 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
3067 {
3068 }
3069
3070 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
3071 {
3072         struct ocfs2_dlm_seq_priv *priv = m->private;
3073         struct ocfs2_lock_res *iter = v;
3074         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
3075
3076         spin_lock(&ocfs2_dlm_tracking_lock);
3077         iter = ocfs2_dlm_next_res(iter, priv);
3078         list_del_init(&dummy->l_debug_list);
3079         if (iter) {
3080                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
3081                 priv->p_tmp_res = *iter;
3082                 iter = &priv->p_tmp_res;
3083         }
3084         spin_unlock(&ocfs2_dlm_tracking_lock);
3085
3086         return iter;
3087 }
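
/*
 * The trick above - a "dummy" element (NULL l_ops) kept on the tracked
 * list and moved forward on each ->next - is what lets iteration
 * survive lockres structures coming and going between reads. A
 * minimal, generic sketch of the same idea with hypothetical types:
 */
#if 0
struct tracked {
        struct list_head list;
        const void *ops;                /* NULL marks an iteration dummy */
};

static struct tracked *tracked_advance(struct list_head *head,
                                       struct tracked *dummy)
{
        struct tracked *iter, *found = NULL;

        /* Walk forward from the dummy's current position. */
        list_for_each_entry(iter, &dummy->list, list) {
                if (&iter->list == head)
                        break;          /* reached the real list head */
                if (iter->ops) {        /* skip other dummies */
                        found = iter;
                        break;
                }
        }
        if (found)                      /* remember our position */
                list_move(&dummy->list, &found->list);
        return found;
}
#endif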
3088
3089 /*
3090  * Version is used by debugfs.ocfs2 to determine the format being used
3091  *
3092  * New in version 2
3093  *      - Lock stats printed
3094  * New in version 3
3095  *      - Max time in lock stats is in usecs (instead of nsecs)
3096  */
3097 #define OCFS2_DLM_DEBUG_STR_VERSION 3
3098 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
3099 {
3100         int i;
3101         char *lvb;
3102         struct ocfs2_lock_res *lockres = v;
3103
3104         if (!lockres)
3105                 return -EINVAL;
3106
3107         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
3108
3109         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
3110                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
3111                            lockres->l_name,
3112                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
3113         else
3114                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
3115
3116         seq_printf(m, "%d\t"
3117                    "0x%lx\t"
3118                    "0x%x\t"
3119                    "0x%x\t"
3120                    "%u\t"
3121                    "%u\t"
3122                    "%d\t"
3123                    "%d\t",
3124                    lockres->l_level,
3125                    lockres->l_flags,
3126                    lockres->l_action,
3127                    lockres->l_unlock_action,
3128                    lockres->l_ro_holders,
3129                    lockres->l_ex_holders,
3130                    lockres->l_requested,
3131                    lockres->l_blocking);
3132
3133         /* Dump the raw LVB */
3134         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3135         for (i = 0; i < DLM_LVB_LEN; i++)
3136                 seq_printf(m, "0x%x\t", lvb[i]);
3137
3138 #ifdef CONFIG_OCFS2_FS_STATS
3139 # define lock_num_prmode(_l)            ((_l)->l_lock_prmode.ls_gets)
3140 # define lock_num_exmode(_l)            ((_l)->l_lock_exmode.ls_gets)
3141 # define lock_num_prmode_failed(_l)     ((_l)->l_lock_prmode.ls_fail)
3142 # define lock_num_exmode_failed(_l)     ((_l)->l_lock_exmode.ls_fail)
3143 # define lock_total_prmode(_l)          ((_l)->l_lock_prmode.ls_total)
3144 # define lock_total_exmode(_l)          ((_l)->l_lock_exmode.ls_total)
3145 # define lock_max_prmode(_l)            ((_l)->l_lock_prmode.ls_max)
3146 # define lock_max_exmode(_l)            ((_l)->l_lock_exmode.ls_max)
3147 # define lock_refresh(_l)               ((_l)->l_lock_refresh)
3148 #else
3149 # define lock_num_prmode(_l)            (0)
3150 # define lock_num_exmode(_l)            (0)
3151 # define lock_num_prmode_failed(_l)     (0)
3152 # define lock_num_exmode_failed(_l)     (0)
3153 # define lock_total_prmode(_l)          (0ULL)
3154 # define lock_total_exmode(_l)          (0ULL)
3155 # define lock_max_prmode(_l)            (0)
3156 # define lock_max_exmode(_l)            (0)
3157 # define lock_refresh(_l)               (0)
3158 #endif
3159         /* The following seq_printf was added in version 2 of this output */
3160         seq_printf(m, "%u\t"
3161                    "%u\t"
3162                    "%u\t"
3163                    "%u\t"
3164                    "%llu\t"
3165                    "%llu\t"
3166                    "%u\t"
3167                    "%u\t"
3168                    "%u\t",
3169                    lock_num_prmode(lockres),
3170                    lock_num_exmode(lockres),
3171                    lock_num_prmode_failed(lockres),
3172                    lock_num_exmode_failed(lockres),
3173                    lock_total_prmode(lockres),
3174                    lock_total_exmode(lockres),
3175                    lock_max_prmode(lockres),
3176                    lock_max_exmode(lockres),
3177                    lock_refresh(lockres));
3178
3179         /* End the line */
3180         seq_printf(m, "\n");
3181         return 0;
3182 }
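
/*
 * For reference, the tab-separated columns emitted above, in order:
 *
 *   version  name  level  flags  action  unlock_action  ro_holders
 *   ex_holders  requested  blocking  lvb[0..DLM_LVB_LEN-1]
 *   prmode_gets  exmode_gets  prmode_fails  exmode_fails
 *   total_prmode  total_exmode  max_prmode  max_exmode  refresh
 *
 * The stats columns read as zero when CONFIG_OCFS2_FS_STATS is off.
 */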
3183
3184 static const struct seq_operations ocfs2_dlm_seq_ops = {
3185         .start =        ocfs2_dlm_seq_start,
3186         .stop =         ocfs2_dlm_seq_stop,
3187         .next =         ocfs2_dlm_seq_next,
3188         .show =         ocfs2_dlm_seq_show,
3189 };
3190
3191 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
3192 {
3193         struct seq_file *seq = file->private_data;
3194         struct ocfs2_dlm_seq_priv *priv = seq->private;
3195         struct ocfs2_lock_res *res = &priv->p_iter_res;
3196
3197         ocfs2_remove_lockres_tracking(res);
3198         ocfs2_put_dlm_debug(priv->p_dlm_debug);
3199         return seq_release_private(inode, file);
3200 }
3201
3202 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
3203 {
3204         struct ocfs2_dlm_seq_priv *priv;
3205         struct ocfs2_super *osb;
3206
3207         priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv));
3208         if (!priv) {
3209                 mlog_errno(-ENOMEM);
3210                 return -ENOMEM;
3211         }
3212
3213         osb = inode->i_private;
3214         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
3215         priv->p_dlm_debug = osb->osb_dlm_debug;
3216         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
3217
3218         ocfs2_add_lockres_tracking(&priv->p_iter_res,
3219                                    priv->p_dlm_debug);
3220
3221         return 0;
3222 }
3223
3224 static const struct file_operations ocfs2_dlm_debug_fops = {
3225         .open =         ocfs2_dlm_debug_open,
3226         .release =      ocfs2_dlm_debug_release,
3227         .read =         seq_read,
3228         .llseek =       seq_lseek,
3229 };
3230
3231 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
3232 {
3233         int ret = 0;
3234         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3235
3236         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
3237                                                          S_IFREG|S_IRUSR,
3238                                                          osb->osb_debug_root,
3239                                                          osb,
3240                                                          &ocfs2_dlm_debug_fops);
3241         if (!dlm_debug->d_locking_state) {
3242                 ret = -EINVAL;
3243                 mlog(ML_ERROR,
3244                      "Unable to create locking state debugfs file.\n");
3245                 goto out;
3246         }
3247
3248         ocfs2_get_dlm_debug(dlm_debug);
3249 out:
3250         return ret;
3251 }
3252
3253 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
3254 {
3255         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3256
3257         if (dlm_debug) {
3258                 debugfs_remove(dlm_debug->d_locking_state);
3259                 ocfs2_put_dlm_debug(dlm_debug);
3260         }
3261 }
3262
3263 int ocfs2_dlm_init(struct ocfs2_super *osb)
3264 {
3265         int status = 0;
3266         struct ocfs2_cluster_connection *conn = NULL;
3267
3268         if (ocfs2_mount_local(osb)) {
3269                 osb->node_num = 0;
3270                 goto local;
3271         }
3272
3273         status = ocfs2_dlm_init_debug(osb);
3274         if (status < 0) {
3275                 mlog_errno(status);
3276                 goto bail;
3277         }
3278
3279         /* launch downconvert thread */
3280         osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s",
3281                         osb->uuid_str);
3282         if (IS_ERR(osb->dc_task)) {
3283                 status = PTR_ERR(osb->dc_task);
3284                 osb->dc_task = NULL;
3285                 mlog_errno(status);
3286                 goto bail;
3287         }
3288
3289         /* for now, uuid == domain */
3290         status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3291                                        osb->osb_cluster_name,
3292                                        strlen(osb->osb_cluster_name),
3293                                        osb->uuid_str,
3294                                        strlen(osb->uuid_str),
3295                                        &lproto, ocfs2_do_node_down, osb,
3296                                        &conn);
3297         if (status) {
3298                 mlog_errno(status);
3299                 goto bail;
3300         }
3301
3302         status = ocfs2_cluster_this_node(conn, &osb->node_num);
3303         if (status < 0) {
3304                 mlog_errno(status);
3305                 mlog(ML_ERROR,
3306                      "could not find this host's node number\n");
3307                 ocfs2_cluster_disconnect(conn, 0);
3308                 goto bail;
3309         }
3310
3311 local:
3312         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3313         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3314         ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
3315         ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3316
3317         osb->cconn = conn;
3318 bail:
3319         if (status < 0) {
3320                 ocfs2_dlm_shutdown_debug(osb);
3321                 if (osb->dc_task)
3322                         kthread_stop(osb->dc_task);
3323         }
3324
3325         return status;
3326 }
3327
3328 void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3329                         int hangup_pending)
3330 {
3331         ocfs2_drop_osb_locks(osb);
3332
3333         /*
3334          * Now that we have dropped all locks and ocfs2_dismount_volume()
3335          * has disabled recovery, the DLM won't be talking to us.  It's
3336          * safe to tear things down before disconnecting the cluster.
3337          */
3338
3339         if (osb->dc_task) {
3340                 kthread_stop(osb->dc_task);
3341                 osb->dc_task = NULL;
3342         }
3343
3344         ocfs2_lock_res_free(&osb->osb_super_lockres);
3345         ocfs2_lock_res_free(&osb->osb_rename_lockres);
3346         ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3347         ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3348
3349         ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3350         osb->cconn = NULL;
3351
3352         ocfs2_dlm_shutdown_debug(osb);
3353 }
3354
3355 static int ocfs2_drop_lock(struct ocfs2_super *osb,
3356                            struct ocfs2_lock_res *lockres)
3357 {
3358         int ret;
3359         unsigned long flags;
3360         u32 lkm_flags = 0;
3361
3362         /* We didn't get anywhere near actually using this lockres. */
3363         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3364                 goto out;
3365
3366         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3367                 lkm_flags |= DLM_LKF_VALBLK;
3368
3369         spin_lock_irqsave(&lockres->l_lock, flags);
3370
3371         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3372                         "lockres %s, flags 0x%lx\n",
3373                         lockres->l_name, lockres->l_flags);
3374
3375         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3376                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3377                      "%u, unlock_action = %u\n",
3378                      lockres->l_name, lockres->l_flags, lockres->l_action,
3379                      lockres->l_unlock_action);
3380
3381                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3382
3383                 /* XXX: Today we just wait on any busy
3384                  * locks... Perhaps we need to cancel converts in the
3385                  * future? */
3386                 ocfs2_wait_on_busy_lock(lockres);
3387
3388                 spin_lock_irqsave(&lockres->l_lock, flags);
3389         }
3390
3391         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3392                 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3393                     lockres->l_level == DLM_LOCK_EX &&
3394                     !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3395                         lockres->l_ops->set_lvb(lockres);
3396         }
3397
3398         if (lockres->l_flags & OCFS2_LOCK_BUSY)
3399                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3400                      lockres->l_name);
3401         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3402                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3403
3404         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3405                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3406                 goto out;
3407         }
3408
3409         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3410
3411         /* make sure we never get here while waiting for an ast to
3412          * fire. */
3413         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3414
3415         /* is this necessary? */
3416         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3417         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3418         spin_unlock_irqrestore(&lockres->l_lock, flags);
3419
3420         mlog(0, "lock %s\n", lockres->l_name);
3421
3422         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3423         if (ret) {
3424                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3425                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3426                 ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3427                 BUG();
3428         }
3429         mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3430              lockres->l_name);
3431
3432         ocfs2_wait_on_busy_lock(lockres);
3433 out:
3434         return 0;
3435 }
3436
3437 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3438                                        struct ocfs2_lock_res *lockres);
3439
3440 /* Mark the lockres as being dropped. It will no longer be
3441  * queued if blocking, but we still may have to wait on it
3442  * being dequeued from the downconvert thread before we can consider
3443  * it safe to drop.
3444  *
3445  * You can *not* attempt to call cluster_lock on this lockres anymore. */
3446 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
3447                                 struct ocfs2_lock_res *lockres)
3448 {
3449         int status;
3450         struct ocfs2_mask_waiter mw;
3451         unsigned long flags, flags2;
3452
3453         ocfs2_init_mask_waiter(&mw);
3454
3455         spin_lock_irqsave(&lockres->l_lock, flags);
3456         lockres->l_flags |= OCFS2_LOCK_FREEING;
3457         if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
3458                 /*
3459                  * We know the downconvert is queued but not in progress
3460                  * because we are the downconvert thread and are processing a
3461                  * different lock. So we can just remove the lock from the
3462                  * queue. This is not only an optimization but also a way
3463                  * to avoid the following deadlock:
3464                  *   ocfs2_dentry_post_unlock()
3465                  *     ocfs2_dentry_lock_put()
3466                  *       ocfs2_drop_dentry_lock()
3467                  *         iput()
3468                  *           ocfs2_evict_inode()
3469                  *             ocfs2_clear_inode()
3470                  *               ocfs2_mark_lockres_freeing()
3471                  *                 ... blocks waiting for OCFS2_LOCK_QUEUED
3472                  *                 since we are the downconvert thread which
3473                  *                 should clear the flag.
3474                  */
3475                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3476                 spin_lock_irqsave(&osb->dc_task_lock, flags2);
3477                 list_del_init(&lockres->l_blocked_list);
3478                 osb->blocked_lock_count--;
3479                 spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
3480                 /*
3481                  * Warn if we recurse into another post_unlock call.  Strictly
3482                  * speaking it isn't a problem but we need to be careful if
3483                  * that happens (stack overflow, deadlocks, ...) so warn if
3484                  * ocfs2 grows a path for which this can happen.
3485                  */
3486                 WARN_ON_ONCE(lockres->l_ops->post_unlock);
3487                 /* Since the lock is freeing we don't do much in the fn below */
3488                 ocfs2_process_blocked_lock(osb, lockres);
3489                 return;
3490         }
3491         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3492                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3493                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3494
3495                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3496
3497                 status = ocfs2_wait_for_mask(&mw);
3498                 if (status)
3499                         mlog_errno(status);
3500
3501                 spin_lock_irqsave(&lockres->l_lock, flags);
3502         }
3503         spin_unlock_irqrestore(&lockres->l_lock, flags);
3504 }
3505
3506 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3507                                struct ocfs2_lock_res *lockres)
3508 {
3509         int ret;
3510
3511         ocfs2_mark_lockres_freeing(osb, lockres);
3512         ret = ocfs2_drop_lock(osb, lockres);
3513         if (ret)
3514                 mlog_errno(ret);
3515 }
3516
3517 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3518 {
3519         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3520         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3521         ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3522         ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3523 }
3524
3525 int ocfs2_drop_inode_locks(struct inode *inode)
3526 {
3527         int status, err;
3528
3529         /* No need to call ocfs2_mark_lockres_freeing here -
3530          * ocfs2_clear_inode has done it for us. */
3531
3532         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3533                               &OCFS2_I(inode)->ip_open_lockres);
3534         if (err < 0)
3535                 mlog_errno(err);
3536
3537         status = err;
3538
3539         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3540                               &OCFS2_I(inode)->ip_inode_lockres);
3541         if (err < 0)
3542                 mlog_errno(err);
3543         if (err < 0 && !status)
3544                 status = err;
3545
3546         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3547                               &OCFS2_I(inode)->ip_rw_lockres);
3548         if (err < 0)
3549                 mlog_errno(err);
3550         if (err < 0 && !status)
3551                 status = err;
3552
3553         return status;
3554 }
3555
3556 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3557                                               int new_level)
3558 {
3559         assert_spin_locked(&lockres->l_lock);
3560
3561         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3562
3563         if (lockres->l_level <= new_level) {
3564                 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3565                      "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3566                      "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3567                      new_level, list_empty(&lockres->l_blocked_list),
3568                      list_empty(&lockres->l_mask_waiters), lockres->l_type,
3569                      lockres->l_flags, lockres->l_ro_holders,
3570                      lockres->l_ex_holders, lockres->l_action,
3571                      lockres->l_unlock_action, lockres->l_requested,
3572                      lockres->l_blocking, lockres->l_pending_gen);
3573                 BUG();
3574         }
3575
3576         mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3577              lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3578
3579         lockres->l_action = OCFS2_AST_DOWNCONVERT;
3580         lockres->l_requested = new_level;
3581         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3582         return lockres_set_pending(lockres);
3583 }
3584
3585 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3586                                   struct ocfs2_lock_res *lockres,
3587                                   int new_level,
3588                                   int lvb,
3589                                   unsigned int generation)
3590 {
3591         int ret;
3592         u32 dlm_flags = DLM_LKF_CONVERT;
3593
3594         mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3595              lockres->l_level, new_level);
3596
3597         /*
3598          * Regarding DLM_LKF_VALBLK, fsdlm behaves differently from o2cb:
3599          * it always expects DLM_LKF_VALBLK to be set if the LKB has an LVB,
3600          * so that we can recover correctly from node failure. Otherwise, we
3601          * may get an invalid LVB in the LKB without DLM_SBF_VALNOTVALID set.
3602          */
3603         if (ocfs2_userspace_stack(osb) &&
3604             lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3605                 lvb = 1;
3606
3607         if (lvb)
3608                 dlm_flags |= DLM_LKF_VALBLK;
3609
3610         ret = ocfs2_dlm_lock(osb->cconn,
3611                              new_level,
3612                              &lockres->l_lksb,
3613                              dlm_flags,
3614                              lockres->l_name,
3615                              OCFS2_LOCK_ID_MAX_LEN - 1);
3616         lockres_clear_pending(lockres, generation, osb);
3617         if (ret) {
3618                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3619                 ocfs2_recover_from_dlm_error(lockres, 1);
3620                 goto bail;
3621         }
3622
3623         ret = 0;
3624 bail:
3625         return ret;
3626 }
3627
3628 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3629 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3630                                         struct ocfs2_lock_res *lockres)
3631 {
3632         assert_spin_locked(&lockres->l_lock);
3633
3634         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3635                 /* If we're already trying to cancel a lock conversion
3636                  * then just drop the spinlock and allow the caller to
3637                  * requeue this lock. */
3638                 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3639                 return 0;
3640         }
3641
3642         /* were we in a convert when the bast fired? */
3643         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3644                lockres->l_action != OCFS2_AST_DOWNCONVERT);
3645         /* set things up for the unlockast to know to just
3646          * clear out the ast_action and unset busy, etc. */
3647         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3648
3649         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3650                         "lock %s, invalid flags: 0x%lx\n",
3651                         lockres->l_name, lockres->l_flags);
3652
3653         mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3654
3655         return 1;
3656 }
3657
3658 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3659                                 struct ocfs2_lock_res *lockres)
3660 {
3661         int ret;
3662
3663         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3664                                DLM_LKF_CANCEL);
3665         if (ret) {
3666                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3667                 ocfs2_recover_from_dlm_error(lockres, 0);
3668         }
3669
3670         mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3671
3672         return ret;
3673 }
3674
3675 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3676                               struct ocfs2_lock_res *lockres,
3677                               struct ocfs2_unblock_ctl *ctl)
3678 {
3679         unsigned long flags;
3680         int blocking;
3681         int new_level;
3682         int level;
3683         int ret = 0;
3684         int set_lvb = 0;
3685         unsigned int gen;
3686
3687         spin_lock_irqsave(&lockres->l_lock, flags);
3688
3689 recheck:
3690         /*
3691          * Is it still blocking? If not, we have no more work to do.
3692          */
3693         if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3694                 BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3695                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3696                 ret = 0;
3697                 goto leave;
3698         }
3699
3700         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3701                 /* XXX
3702                  * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3703                  * exists entirely for one reason - another thread has set
3704                  * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3705                  *
3706                  * If we do ocfs2_cancel_convert() before the other thread
3707                  * calls dlm_lock(), our cancel will do nothing.  We will
3708                  * get no ast, and we will have no way of knowing the
3709                  * cancel failed.  Meanwhile, the other thread will call
3710                  * into dlm_lock() and wait...forever.
3711                  *
3712                  * Why forever?  Because another node has asked for the
3713                  * lock first; that's why we're here in unblock_lock().
3714                  *
3715                  * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3716                  * set, we just requeue the unblock.  Only when the other
3717                  * thread has called dlm_lock() and cleared PENDING will
3718                  * we then cancel their request.
3719                  *
3720                  * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3721                  * at the same time they set OCFS2_LOCK_BUSY.  They must
3722                  * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3723                  */
3724                 if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3725                         mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3726                              lockres->l_name);
3727                         goto leave_requeue;
3728                 }
3729
3730                 ctl->requeue = 1;
3731                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
3732                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3733                 if (ret) {
3734                         ret = ocfs2_cancel_convert(osb, lockres);
3735                         if (ret < 0)
3736                                 mlog_errno(ret);
3737                 }
3738                 goto leave;
3739         }
3740
3741         /*
3742          * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3743          * set when the ast is received for an upconvert just before the
3744          * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3745          * on the heels of the ast, we want to delay the downconvert just
3746          * enough to allow the up requestor to do its task. Because this
3747          * lock is in the blocked queue, the lock will be downconverted
3748          * as soon as the requestor is done with the lock.
3749          */
3750         if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3751                 goto leave_requeue;
3752
3753         /*
3754          * How can we block and yet be at NL?  We were trying to upconvert
3755          * from NL and got canceled.  The code comes back here, and now
3756          * we notice and clear BLOCKING.
3757          */
3758         if (lockres->l_level == DLM_LOCK_NL) {
3759                 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3760                 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3761                 lockres->l_blocking = DLM_LOCK_NL;
3762                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3763                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3764                 goto leave;
3765         }
3766
3767         /* if we're blocking an exclusive and we have *any* holders,
3768          * then requeue. */
3769         if ((lockres->l_blocking == DLM_LOCK_EX)
3770             && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3771                 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3772                      lockres->l_name, lockres->l_ex_holders,
3773                      lockres->l_ro_holders);
3774                 goto leave_requeue;
3775         }
3776
3777         /* If it's a PR we're blocking, then only
3778          * requeue if we've got any EX holders */
3779         if (lockres->l_blocking == DLM_LOCK_PR &&
3780             lockres->l_ex_holders) {
3781                 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3782                      lockres->l_name, lockres->l_ex_holders);
3783                 goto leave_requeue;
3784         }
3785
3786         /*
3787          * Can we get a lock in this state if the holder counts are
3788          * zero? The meta data unblock code used to check this.
3789          */
3790         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3791             && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3792                 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3793                      lockres->l_name);
3794                 goto leave_requeue;
3795         }
3796
3797         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3798
3799         if (lockres->l_ops->check_downconvert
3800             && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3801                 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3802                      lockres->l_name);
3803                 goto leave_requeue;
3804         }
3805
3806         /* If we get here, then we know that there are no more
3807          * incompatible holders (and anyone asking for an incompatible
3808          * lock is blocked). We can now downconvert the lock */
3809         if (!lockres->l_ops->downconvert_worker)
3810                 goto downconvert;
3811
3812         /* Some lockres types want to do a bit of work before
3813          * downconverting a lock. Allow that here. The worker function
3814          * may sleep, so we save off a copy of what we're blocking as
3815          * it may change while we're not holding the spin lock. */
3816         blocking = lockres->l_blocking;
3817         level = lockres->l_level;
3818         spin_unlock_irqrestore(&lockres->l_lock, flags);
3819
3820         ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3821
3822         if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3823                 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3824                      lockres->l_name);
3825                 goto leave;
3826         }
3827
3828         spin_lock_irqsave(&lockres->l_lock, flags);
3829         if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3830                 /* If this changed underneath us, then we can't drop
3831                  * it just yet. */
3832                 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3833                      "Recheck\n", lockres->l_name, blocking,
3834                      lockres->l_blocking, level, lockres->l_level);
3835                 goto recheck;
3836         }
3837
3838 downconvert:
3839         ctl->requeue = 0;
3840
3841         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3842                 if (lockres->l_level == DLM_LOCK_EX)
3843                         set_lvb = 1;
3844
3845                 /*
3846                  * We only set the lvb if the lock has been fully
3847                  * refreshed - otherwise we risk setting stale
3848                  * data. In that case there's no need to actually clear
3849                  * out the lvb here as its value is still valid.
3850                  */
3851                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3852                         lockres->l_ops->set_lvb(lockres);
3853         }
3854
3855         gen = ocfs2_prepare_downconvert(lockres, new_level);
3856         spin_unlock_irqrestore(&lockres->l_lock, flags);
3857         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3858                                      gen);
3859
3860 leave:
3861         if (ret)
3862                 mlog_errno(ret);
3863         return ret;
3864
3865 leave_requeue:
3866         spin_unlock_irqrestore(&lockres->l_lock, flags);
3867         ctl->requeue = 1;
3868
3869         return 0;
3870 }
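
/*
 * Condensed sketch of the PENDING protocol described in
 * ocfs2_unblock_lock() above, seen from the lock-requesting side (the
 * real sequence lives in the lock attach/convert paths). This is an
 * abbreviated fragment, not a complete function:
 */
#if 0
        spin_lock_irqsave(&lockres->l_lock, flags);
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        /* PENDING is set under the same spinlock hold as BUSY... */
        gen = lockres_set_pending(lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb,
                             lkm_flags, lockres->l_name,
                             OCFS2_LOCK_ID_MAX_LEN - 1);

        /* ...and cleared only after dlm_lock() returns, which is when
         * a cancel from the downconvert thread becomes meaningful. */
        lockres_clear_pending(lockres, gen, osb);
#endif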
3871
3872 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3873                                      int blocking)
3874 {
3875         struct inode *inode;
3876         struct address_space *mapping;
3877         struct ocfs2_inode_info *oi;
3878
3879         inode = ocfs2_lock_res_inode(lockres);
3880         mapping = inode->i_mapping;
3881
3882         if (S_ISDIR(inode->i_mode)) {
3883                 oi = OCFS2_I(inode);
3884                 oi->ip_dir_lock_gen++;
3885                 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3886                 goto out;
3887         }
3888
3889         if (!S_ISREG(inode->i_mode))
3890                 goto out;
3891
3892         /*
3893          * We need this before the filemap_fdatawrite() so that it can
3894          * transfer the dirty bit from the PTE to the
3895          * page. Unfortunately this means that even for EX->PR
3896          * downconverts, we'll lose our mappings and have to build
3897          * them up again.
3898          */
3899         unmap_mapping_range(mapping, 0, 0, 0);
3900
3901         if (filemap_fdatawrite(mapping)) {
3902                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3903                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
3904         }
3905         sync_mapping_buffers(mapping);
3906         if (blocking == DLM_LOCK_EX) {
3907                 truncate_inode_pages(mapping, 0);
3908         } else {
3909                 /* We only need to wait on the I/O if we're not also
3910                  * truncating pages because truncate_inode_pages waits
3911                  * for us above. We don't truncate pages if we're
3912                  * blocking anything < EXMODE because we want to keep
3913                  * them around in that case. */
3914                 filemap_fdatawait(mapping);
3915         }
3916
3917         forget_all_cached_acls(inode);
3918
3919 out:
3920         return UNBLOCK_CONTINUE;
3921 }
3922
3923 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3924                                  struct ocfs2_lock_res *lockres,
3925                                  int new_level)
3926 {
3927         int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3928
3929         BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3930         BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3931
3932         if (checkpointed)
3933                 return 1;
3934
3935         ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3936         return 0;
3937 }
3938
3939 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3940                                         int new_level)
3941 {
3942         struct inode *inode = ocfs2_lock_res_inode(lockres);
3943
3944         return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3945 }
3946
3947 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3948 {
3949         struct inode *inode = ocfs2_lock_res_inode(lockres);
3950
3951         __ocfs2_stuff_meta_lvb(inode);
3952 }
3953
3954 /*
3955  * Does the final reference drop on our dentry lock. Right now this
3956  * happens in the downconvert thread, but we could choose to simplify the
3957  * dlmglue API and push these off to the ocfs2_wq in the future.
3958  */
3959 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3960                                      struct ocfs2_lock_res *lockres)
3961 {
3962         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3963         ocfs2_dentry_lock_put(osb, dl);
3964 }
3965
3966 /*
3967  * d_delete() matching dentries before the lock downconvert.
3968  *
3969  * At this point, any process waiting to destroy the
3970  * dentry_lock due to last ref count is stopped by the
3971  * OCFS2_LOCK_QUEUED flag.
3972  *
3973  * We have two potential problems
3974  *
3975  * 1) If we do the last reference drop on our dentry_lock (via dput)
3976  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3977  *    the downconvert to finish. Instead we take an elevated
3978  *    reference and push the drop until after we've completed our
3979  *    unblock processing.
3980  *
3981  * 2) There might be another process with a final reference,
3982  *    waiting on us to finish processing. If this is the case, we
3983  *    detect it and exit out - there are no more dentries anyway.
3984  */
3985 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3986                                        int blocking)
3987 {
3988         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3989         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3990         struct dentry *dentry;
3991         unsigned long flags;
3992         int extra_ref = 0;
3993
3994         /*
3995          * This node is blocking another node from getting a read
3996          * lock. This happens when we've renamed within a
3997          * directory. We've forced the other nodes to d_delete(), but
3998          * we never actually dropped our lock because it's still
3999          * valid. The downconvert code will retain a PR for this node,
4000          * so there's no further work to do.
4001          */
4002         if (blocking == DLM_LOCK_PR)
4003                 return UNBLOCK_CONTINUE;
4004
4005         /*
4006          * Mark this inode as potentially orphaned. The code in
4007          * ocfs2_delete_inode() will figure out whether it actually
4008          * needs to be freed or not.
4009          */
4010         spin_lock(&oi->ip_lock);
4011         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
4012         spin_unlock(&oi->ip_lock);
4013
4014         /*
4015          * Yuck. We need to make sure, however, that the check of
4016          * OCFS2_LOCK_FREEING and the extra reference are atomic with
4017          * respect to a reference decrement or the setting of that
4018          * flag.
4019          */
4020         spin_lock_irqsave(&lockres->l_lock, flags);
4021         spin_lock(&dentry_attach_lock);
4022         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
4023             && dl->dl_count) {
4024                 dl->dl_count++;
4025                 extra_ref = 1;
4026         }
4027         spin_unlock(&dentry_attach_lock);
4028         spin_unlock_irqrestore(&lockres->l_lock, flags);
4029
4030         mlog(0, "extra_ref = %d\n", extra_ref);
4031
4032         /*
4033          * We have a process waiting on us in ocfs2_dentry_iput(),
4034          * which means we can't have any more outstanding
4035          * aliases. There's no need to do any more work.
4036          */
4037         if (!extra_ref)
4038                 return UNBLOCK_CONTINUE;
4039
4040         spin_lock(&dentry_attach_lock);
4041         while (1) {
4042                 dentry = ocfs2_find_local_alias(dl->dl_inode,
4043                                                 dl->dl_parent_blkno, 1);
4044                 if (!dentry)
4045                         break;
4046                 spin_unlock(&dentry_attach_lock);
4047
4048                 if (S_ISDIR(dl->dl_inode->i_mode))
4049                         shrink_dcache_parent(dentry);
4050
4051                 mlog(0, "d_delete(%pd);\n", dentry);
4052
4053                 /*
4054                  * The following dcache calls may do an
4055                  * iput(). Normally we don't want that from the
4056                  * downconverting thread, but in this case it's ok
4057                  * because the requesting node already has an
4058                  * exclusive lock on the inode, so it can't be queued
4059                  * for a downconvert.
4060                  */
4061                 d_delete(dentry);
4062                 dput(dentry);
4063
4064                 spin_lock(&dentry_attach_lock);
4065         }
4066         spin_unlock(&dentry_attach_lock);
4067
4068         /*
4069          * If we are the last holder of this dentry lock, there is no
4070          * reason to downconvert so skip straight to the unlock.
4071          */
4072         if (dl->dl_count == 1)
4073                 return UNBLOCK_STOP_POST;
4074
4075         return UNBLOCK_CONTINUE_POST;
4076 }
4077
4078 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
4079                                             int new_level)
4080 {
4081         struct ocfs2_refcount_tree *tree =
4082                                 ocfs2_lock_res_refcount_tree(lockres);
4083
4084         return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
4085 }
4086
4087 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
4088                                          int blocking)
4089 {
4090         struct ocfs2_refcount_tree *tree =
4091                                 ocfs2_lock_res_refcount_tree(lockres);
4092
4093         ocfs2_metadata_cache_purge(&tree->rf_ci);
4094
4095         return UNBLOCK_CONTINUE;
4096 }
4097
4098 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
4099 {
4100         struct ocfs2_qinfo_lvb *lvb;
4101         struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
4102         struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4103                                             oinfo->dqi_gi.dqi_type);
4104
4105         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4106         lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
4107         lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
4108         lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
4109         lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
4110         lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
4111         lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
4112         lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
4113 }
4114
4115 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4116 {
4117         struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4118         struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4119         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4120
4121         if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
4122                 ocfs2_cluster_unlock(osb, lockres, level);
4123 }
4124
4125 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
4126 {
4127         struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4128                                             oinfo->dqi_gi.dqi_type);
4129         struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4130         struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4131         struct buffer_head *bh = NULL;
4132         struct ocfs2_global_disk_dqinfo *gdinfo;
4133         int status = 0;
4134
4135         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
4136             lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
4137                 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
4138                 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
4139                 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
4140                 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
4141                 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
4142                 oinfo->dqi_gi.dqi_free_entry =
4143                                         be32_to_cpu(lvb->lvb_free_entry);
4144         } else {
4145                 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
4146                                                      oinfo->dqi_giblk, &bh);
4147                 if (status) {
4148                         mlog_errno(status);
4149                         goto bail;
4150                 }
4151                 gdinfo = (struct ocfs2_global_disk_dqinfo *)
4152                                         (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
4153                 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
4154                 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
4155                 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
4156                 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
4157                 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
4158                 oinfo->dqi_gi.dqi_free_entry =
4159                                         le32_to_cpu(gdinfo->dqi_free_entry);
4160                 brelse(bh);
4161                 ocfs2_track_lock_refresh(lockres);
4162         }
4163
4164 bail:
4165         return status;
4166 }
4167
4168 /* Lock quota info; this function expects at least a shared lock on the
4169  * quota file so that we can safely refresh quota info from disk. */
4170 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4171 {
4172         struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4173         struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4174         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4175         int status = 0;
4176
4177         /* On RO devices, locking really isn't needed... */
4178         if (ocfs2_is_hard_readonly(osb)) {
4179                 if (ex)
4180                         status = -EROFS;
4181                 goto bail;
4182         }
4183         if (ocfs2_mount_local(osb))
4184                 goto bail;
4185
4186         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4187         if (status < 0) {
4188                 mlog_errno(status);
4189                 goto bail;
4190         }
4191         if (!ocfs2_should_refresh_lock_res(lockres))
4192                 goto bail;
4193         /* OK, we have the lock but we need to refresh the quota info */
4194         status = ocfs2_refresh_qinfo(oinfo);
4195         if (status)
4196                 ocfs2_qinfo_unlock(oinfo, ex);
4197         ocfs2_complete_lock_res_refresh(lockres, status);
4198 bail:
4199         return status;
4200 }
4201
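/*
 * Illustrative sketch, not part of the original file: a hypothetical
 * caller pairing ocfs2_qinfo_lock() with ocfs2_qinfo_unlock(). A shared
 * (PR) lock is enough to read the refreshed quota info; pass ex = 1 only
 * when the info will be modified. The helper name below is made up for
 * exposition.
 *
 *      static int example_read_bgrace(struct ocfs2_mem_dqinfo *oinfo,
 *                                     unsigned int *bgrace)
 *      {
 *              struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
 *                                                  oinfo->dqi_gi.dqi_type);
 *              int status;
 *
 *              status = ocfs2_qinfo_lock(oinfo, 0);
 *              if (status < 0)
 *                      return status;
 *              *bgrace = info->dqi_bgrace;
 *              ocfs2_qinfo_unlock(oinfo, 0);
 *              return 0;
 *      }
 */
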
int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
        int status;
        int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
        struct ocfs2_super *osb = lockres->l_priv;

        if (ocfs2_is_hard_readonly(osb))
                return -EROFS;

        if (ocfs2_mount_local(osb))
                return 0;

        status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
        if (status < 0)
                mlog_errno(status);

        return status;
}

void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
        int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
        struct ocfs2_super *osb = lockres->l_priv;

        if (!ocfs2_mount_local(osb))
                ocfs2_cluster_unlock(osb, lockres, level);
}

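/*
 * Illustrative sketch, not part of the original file: a hypothetical
 * caller bracketing a refcount tree update with ocfs2_refcount_lock()
 * and ocfs2_refcount_unlock(). Both example_* names are invented here;
 * real callers go through the refcount tree code in refcounttree.c.
 *
 *      static int example_refcount_update(struct ocfs2_refcount_tree *rf)
 *      {
 *              int status;
 *
 *              status = ocfs2_refcount_lock(rf, 1);
 *              if (status < 0)
 *                      return status;
 *              status = example_do_update(rf);
 *              ocfs2_refcount_unlock(rf, 1);
 *              return status;
 *      }
 */
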
static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *lockres)
{
        int status;
        struct ocfs2_unblock_ctl ctl = {0, 0,};
        unsigned long flags;

        /* Our reference to the lockres in this function can be
         * considered valid until we remove the OCFS2_LOCK_QUEUED
         * flag. */

        BUG_ON(!lockres);
        BUG_ON(!lockres->l_ops);

        mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);

        /* Detect whether a lock has been marked as going away while
         * the downconvert thread was processing other things. A lock can
         * still be marked with OCFS2_LOCK_FREEING after this check,
         * but short-circuiting here still saves us some work. */
        spin_lock_irqsave(&lockres->l_lock, flags);
        if (lockres->l_flags & OCFS2_LOCK_FREEING)
                goto unqueue;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        status = ocfs2_unblock_lock(osb, lockres, &ctl);
        if (status < 0)
                mlog_errno(status);

        spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
        if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue)
                lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
        else
                ocfs2_schedule_blocked_lock(osb, lockres);

        mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
             ctl.requeue ? "yes" : "no");
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        if (ctl.unblock_action != UNBLOCK_CONTINUE
            && lockres->l_ops->post_unlock)
                lockres->l_ops->post_unlock(osb, lockres);
}

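/*
 * Queue a lock resource for the downconvert thread. The caller must hold
 * lockres->l_lock. A lockres that is already queued is left alone - the
 * list_empty() check below keeps it from being added to the blocked list
 * twice.
 */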
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres)
{
        unsigned long flags;

        assert_spin_locked(&lockres->l_lock);

        if (lockres->l_flags & OCFS2_LOCK_FREEING) {
                /* Do not schedule a lock for downconvert when it's on
                 * the way to destruction - any nodes wanting access
                 * to the resource will get it soon. */
                mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
                     lockres->l_name, lockres->l_flags);
                return;
        }

        lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        if (list_empty(&lockres->l_blocked_list)) {
                list_add_tail(&lockres->l_blocked_list,
                              &osb->blocked_lock_list);
                osb->blocked_lock_count++;
        }
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}

static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
{
        unsigned long processed;
        unsigned long flags;
        struct ocfs2_lock_res *lockres;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        /* grab this early so we know to try again if a state change and
         * wakeup happen part-way through our work */
        osb->dc_work_sequence = osb->dc_wake_sequence;

        processed = osb->blocked_lock_count;
        /*
         * blocked lock processing in this loop might call iput which can
         * remove items off osb->blocked_lock_list. Downconvert up to
         * 'processed' number of locks, but stop short if we had some
         * removed in ocfs2_mark_lockres_freeing when downconverting.
         */
        while (processed && !list_empty(&osb->blocked_lock_list)) {
                lockres = list_entry(osb->blocked_lock_list.next,
                                     struct ocfs2_lock_res, l_blocked_list);
                list_del_init(&lockres->l_blocked_list);
                osb->blocked_lock_count--;
                spin_unlock_irqrestore(&osb->dc_task_lock, flags);

                BUG_ON(!processed);
                processed--;

                ocfs2_process_blocked_lock(osb, lockres);

                spin_lock_irqsave(&osb->dc_task_lock, flags);
        }
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}

static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
{
        int empty = 0;
        unsigned long flags;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        if (list_empty(&osb->blocked_lock_list))
                empty = 1;
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);

        return empty;
}

static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
{
        int should_wake = 0;
        unsigned long flags;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        if (osb->dc_work_sequence != osb->dc_wake_sequence)
                should_wake = 1;
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);

        return should_wake;
}

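/*
 * Main loop of the downconvert thread. The thread is presumably created
 * with kthread_run() during cluster lock setup, with its task struct kept
 * in osb->dc_task (cleared below on exit); it sleeps until
 * ocfs2_wake_downconvert_thread() bumps dc_wake_sequence or someone calls
 * kthread_stop(), and only exits once the blocked lock list is drained.
 */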
static int ocfs2_downconvert_thread(void *arg)
{
        int status = 0;
        struct ocfs2_super *osb = arg;

        /* only quit once we've been asked to stop and there is no more
         * work available */
        while (!(kthread_should_stop() &&
                ocfs2_downconvert_thread_lists_empty(osb))) {

                wait_event_interruptible(osb->dc_event,
                                         ocfs2_downconvert_thread_should_wake(osb) ||
                                         kthread_should_stop());

                mlog(0, "downconvert_thread: awoken\n");

                ocfs2_downconvert_thread_do_work(osb);
        }

        osb->dc_task = NULL;
        return status;
}

void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
{
        unsigned long flags;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        /* make sure the downconvert thread gets a swipe at whatever
         * changes the caller may have made to the blocked lock lists */
        osb->dc_wake_sequence++;
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);
        wake_up(&osb->dc_event);
}
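
/*
 * The two sequence counters touched above implement a classic lost-wakeup
 * guard: the waker bumps dc_wake_sequence under dc_task_lock, while the
 * worker copies it to dc_work_sequence before scanning and the thread's
 * wait condition re-checks the pair. A minimal standalone sketch of the
 * same pattern, with hypothetical names, assuming only a spinlock and a
 * wait queue:
 *
 *      static DEFINE_SPINLOCK(seq_lock);
 *      static DECLARE_WAIT_QUEUE_HEAD(seq_event);
 *      static unsigned long wake_seq, work_seq;
 *
 *      static void producer_wake(void)
 *      {
 *              spin_lock(&seq_lock);
 *              wake_seq++;
 *              spin_unlock(&seq_lock);
 *              wake_up(&seq_event);
 *      }
 *
 *      static int consumer_should_wake(void)
 *      {
 *              int ret;
 *
 *              spin_lock(&seq_lock);
 *              ret = (work_seq != wake_seq);
 *              spin_unlock(&seq_lock);
 *              return ret;
 *      }
 */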