locking/rwsem: Enable time-based spinning on reader-owned rwsem
kernel/locking/rwsem.c
1 // SPDX-License-Identifier: GPL-2.0
2 /* kernel/locking/rwsem.c: R/W semaphores, public implementation
3  *
4  * Written by David Howells (dhowells@redhat.com).
5  * Derived from asm-i386/semaphore.h
6  *
7  * Writer lock-stealing by Alex Shi <alex.shi@intel.com>
8  * and Michel Lespinasse <walken@google.com>
9  *
10  * Optimistic spinning by Tim Chen <tim.c.chen@intel.com>
11  * and Davidlohr Bueso <davidlohr@hp.com>. Based on mutexes.
12  *
13  * Rwsem count bit fields re-definition and rwsem rearchitecture by
14  * Waiman Long <longman@redhat.com> and
15  * Peter Zijlstra <peterz@infradead.org>.
16  */
17
18 #include <linux/types.h>
19 #include <linux/kernel.h>
20 #include <linux/sched.h>
21 #include <linux/sched/rt.h>
22 #include <linux/sched/task.h>
23 #include <linux/sched/debug.h>
24 #include <linux/sched/wake_q.h>
25 #include <linux/sched/signal.h>
26 #include <linux/sched/clock.h>
27 #include <linux/export.h>
28 #include <linux/rwsem.h>
29 #include <linux/atomic.h>
30
31 #include "rwsem.h"
32 #include "lock_events.h"
33
34 /*
35  * The least significant 3 bits of the owner value have the following
36  * meanings when set.
37  *  - Bit 0: RWSEM_READER_OWNED - The rwsem is owned by readers
38  *  - Bit 1: RWSEM_RD_NONSPINNABLE - Readers cannot spin on this lock.
39  *  - Bit 2: RWSEM_WR_NONSPINNABLE - Writers cannot spin on this lock.
40  *
41  * When the rwsem is either owned by an anonymous writer, or it is
42  * reader-owned, but a spinning writer has timed out, both nonspinnable
43  * bits will be set to disable optimistic spinning by readers and writers.
44  * In the latter case, the last unlocking reader should then check the
45  * writer nonspinnable bit and clear it, giving writers, but not readers,
46  * preference to acquire the lock via optimistic spinning. A similar
47  * action is also done in the reader slowpath.
48  *
49  * When a writer acquires a rwsem, it puts its task_struct pointer
50  * into the owner field. It is cleared after an unlock.
51  *
52  * When a reader acquires a rwsem, it will also put its task_struct
53  * pointer into the owner field with the RWSEM_READER_OWNED bit set.
54  * On unlock, the owner field will largely be left untouched. So
55  * for a free or reader-owned rwsem, the owner value may contain
56  * information about the last reader that acquired the rwsem.
57  *
58  * That information may be helpful in debugging cases where the system
59  * seems to hang on a reader-owned rwsem, especially if only one reader
60  * is involved. Ideally we would like to track all the readers that own
61  * a rwsem, but the overhead is simply too big.
62  */
63 #define RWSEM_READER_OWNED      (1UL << 0)
64 #define RWSEM_RD_NONSPINNABLE   (1UL << 1)
65 #define RWSEM_WR_NONSPINNABLE   (1UL << 2)
66 #define RWSEM_NONSPINNABLE      (RWSEM_RD_NONSPINNABLE | RWSEM_WR_NONSPINNABLE)
67 #define RWSEM_OWNER_FLAGS_MASK  (RWSEM_READER_OWNED | RWSEM_NONSPINNABLE)
68
69 #ifdef CONFIG_DEBUG_RWSEMS
70 # define DEBUG_RWSEMS_WARN_ON(c, sem)   do {                    \
71         if (!debug_locks_silent &&                              \
72             WARN_ONCE(c, "DEBUG_RWSEMS_WARN_ON(%s): count = 0x%lx, owner = 0x%lx, curr 0x%lx, list %sempty\n",\
73                 #c, atomic_long_read(&(sem)->count),            \
74                 atomic_long_read(&(sem)->owner), (long)current, \
75                 list_empty(&(sem)->wait_list) ? "" : "not "))   \
76                         debug_locks_off();                      \
77         } while (0)
78 #else
79 # define DEBUG_RWSEMS_WARN_ON(c, sem)
80 #endif
81
82 /*
83  * The definition of the atomic counter in the semaphore:
84  *
85  * Bit  0   - writer locked bit
86  * Bit  1   - waiters present bit
87  * Bit  2   - lock handoff bit
88  * Bits 3-7 - reserved
89  * Bits 8-X - reader count: 24 bits on 32-bit, 56 bits on 64-bit kernels
90  *
91  * atomic_long_fetch_add() is used to obtain reader lock, whereas
92  * atomic_long_cmpxchg() will be used to obtain writer lock.
93  *
94  * There are three places where the lock handoff bit may be set or cleared.
95  * 1) rwsem_mark_wake() for readers.
96  * 2) rwsem_try_write_lock() for writers.
97  * 3) Error path of rwsem_down_write_slowpath().
98  *
99  * For all the above cases, wait_lock will be held. A writer must also
100  * be the first one in the wait_list to be eligible for setting the handoff
101  * bit. So concurrent setting/clearing of handoff bit is not possible.
102  */
103 #define RWSEM_WRITER_LOCKED     (1UL << 0)
104 #define RWSEM_FLAG_WAITERS      (1UL << 1)
105 #define RWSEM_FLAG_HANDOFF      (1UL << 2)
106
107 #define RWSEM_READER_SHIFT      8
108 #define RWSEM_READER_BIAS       (1UL << RWSEM_READER_SHIFT)
109 #define RWSEM_READER_MASK       (~(RWSEM_READER_BIAS - 1))
110 #define RWSEM_WRITER_MASK       RWSEM_WRITER_LOCKED
111 #define RWSEM_LOCK_MASK         (RWSEM_WRITER_MASK|RWSEM_READER_MASK)
112 #define RWSEM_READ_FAILED_MASK  (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
113                                  RWSEM_FLAG_HANDOFF)
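
/*
 * For illustration (values shown assume a 64-bit kernel), some possible
 * count values:
 *
 *   0x0000000000000000 - unlocked (RWSEM_UNLOCKED_VALUE)
 *   0x0000000000000300 - 3 readers hold the lock (3 * RWSEM_READER_BIAS)
 *   0x0000000000000001 - a writer holds the lock (RWSEM_WRITER_LOCKED)
 *   0x0000000000000003 - a writer holds the lock and waiters are queued
 *                        (RWSEM_WRITER_LOCKED | RWSEM_FLAG_WAITERS)
 *   0x0000000000000106 - 1 reader holds the lock, waiters are queued and a
 *                        handoff has been requested (RWSEM_READER_BIAS |
 *                        RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS)
 */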
114
115 /*
116  * All writes to owner are protected by WRITE_ONCE() to make sure that
117  * store tearing can't happen as optimistic spinners may read and use
118  * the owner value concurrently without the lock. Reads from owner, however,
119  * may not need READ_ONCE() as long as the pointer value is only used
120  * for comparison and isn't being dereferenced.
121  */
122 static inline void rwsem_set_owner(struct rw_semaphore *sem)
123 {
124         atomic_long_set(&sem->owner, (long)current);
125 }
126
127 static inline void rwsem_clear_owner(struct rw_semaphore *sem)
128 {
129         atomic_long_set(&sem->owner, 0);
130 }
131
132 /*
133  * Test the flags in the owner field.
134  */
135 static inline bool rwsem_test_oflags(struct rw_semaphore *sem, long flags)
136 {
137         return atomic_long_read(&sem->owner) & flags;
138 }
139
140 /*
141  * The task_struct pointer of the last owning reader will be left in
142  * the owner field.
143  *
144  * Note that the owner value just indicates that the task has owned the
145  * rwsem previously; it may not be the real owner or one of the real owners
146  * anymore when that field is examined, so take it with a grain of salt.
147  */
148 static inline void __rwsem_set_reader_owned(struct rw_semaphore *sem,
149                                             struct task_struct *owner)
150 {
151         unsigned long val = (unsigned long)owner | RWSEM_READER_OWNED;
152
153         atomic_long_set(&sem->owner, val);
154 }
155
156 static inline void rwsem_set_reader_owned(struct rw_semaphore *sem)
157 {
158         __rwsem_set_reader_owned(sem, current);
159 }
160
161 /*
162  * Return true if the rwsem is owned by a reader.
163  */
164 static inline bool is_rwsem_reader_owned(struct rw_semaphore *sem)
165 {
166 #ifdef CONFIG_DEBUG_RWSEMS
167         /*
168          * Check the count to see if it is write-locked.
169          */
170         long count = atomic_long_read(&sem->count);
171
172         if (count & RWSEM_WRITER_MASK)
173                 return false;
174 #endif
175         return rwsem_test_oflags(sem, RWSEM_READER_OWNED);
176 }
177
178 #ifdef CONFIG_DEBUG_RWSEMS
179 /*
180  * With CONFIG_DEBUG_RWSEMS configured, this makes sure that if there
181  * is a task pointer in the owner field of a reader-owned rwsem, it is the
182  * real owner or one of the real owners. The only exception is when the
183  * unlock is done by up_read_non_owner().
184  */
185 static inline void rwsem_clear_reader_owned(struct rw_semaphore *sem)
186 {
187         unsigned long val = atomic_long_read(&sem->owner);
188
189         while ((val & ~RWSEM_OWNER_FLAGS_MASK) == (unsigned long)current) {
190                 if (atomic_long_try_cmpxchg(&sem->owner, &val,
191                                             val & RWSEM_OWNER_FLAGS_MASK))
192                         return;
193         }
194 }
195 #else
196 static inline void rwsem_clear_reader_owned(struct rw_semaphore *sem)
197 {
198 }
199 #endif
200
201 /*
202  * Set the RWSEM_NONSPINNABLE bits if the RWSEM_READER_OWNED flag
203  * remains set. Otherwise, the operation will be aborted.
204  */
205 static inline void rwsem_set_nonspinnable(struct rw_semaphore *sem)
206 {
207         unsigned long owner = atomic_long_read(&sem->owner);
208
209         do {
210                 if (!(owner & RWSEM_READER_OWNED))
211                         break;
212                 if (owner & RWSEM_NONSPINNABLE)
213                         break;
214         } while (!atomic_long_try_cmpxchg(&sem->owner, &owner,
215                                           owner | RWSEM_NONSPINNABLE));
216 }
217
218 /*
219  * Return just the real task structure pointer of the owner
220  */
221 static inline struct task_struct *rwsem_owner(struct rw_semaphore *sem)
222 {
223         return (struct task_struct *)
224                 (atomic_long_read(&sem->owner) & ~RWSEM_OWNER_FLAGS_MASK);
225 }
226
227 /*
228  * Return the real task structure pointer of the owner and the embedded
229  * flags in the owner. pflags must be non-NULL.
230  */
231 static inline struct task_struct *
232 rwsem_owner_flags(struct rw_semaphore *sem, unsigned long *pflags)
233 {
234         unsigned long owner = atomic_long_read(&sem->owner);
235
236         *pflags = owner & RWSEM_OWNER_FLAGS_MASK;
237         return (struct task_struct *)(owner & ~RWSEM_OWNER_FLAGS_MASK);
238 }
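
/*
 * For illustration: after a task T acquires the lock for read, the owner
 * field holds ((unsigned long)T | RWSEM_READER_OWNED), so rwsem_owner()
 * returns T and rwsem_owner_flags() reports RWSEM_READER_OWNED in *pflags.
 * Right after a write lock is taken, the owner field is just the task
 * pointer with no flag bits set.
 */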
239
240 /*
241  * Guide to the rw_semaphore's count field.
242  *
243  * When the RWSEM_WRITER_LOCKED bit in count is set, the lock is owned
244  * by a writer.
245  *
246  * The lock is owned by readers when
247  * (1) the RWSEM_WRITER_LOCKED isn't set in count,
248  * (2) some of the reader bits are set in count, and
249  * (3) the owner field has the RWSEM_READER_OWNED bit set.
250  *
251  * Having some reader bits set is not enough to guarantee a reader-owned
252  * lock as the readers may be in the process of backing out from the count
253  * and a writer has just released the lock. So another writer may steal
254  * the lock immediately after that.
255  */
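
/*
 * A rough sketch of the reader-owned test described above. The helper name
 * below is purely illustrative and not part of this file; the real check
 * used here is is_rwsem_reader_owned().
 *
 *	static inline bool reader_owned_sketch(struct rw_semaphore *sem)
 *	{
 *		long count = atomic_long_read(&sem->count);
 *
 *		return !(count & RWSEM_WRITER_LOCKED) &&
 *		       (count & RWSEM_READER_MASK) &&
 *		       (atomic_long_read(&sem->owner) & RWSEM_READER_OWNED);
 *	}
 */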
256
257 /*
258  * Initialize an rwsem:
259  */
260 void __init_rwsem(struct rw_semaphore *sem, const char *name,
261                   struct lock_class_key *key)
262 {
263 #ifdef CONFIG_DEBUG_LOCK_ALLOC
264         /*
265          * Make sure we are not reinitializing a held semaphore:
266          */
267         debug_check_no_locks_freed((void *)sem, sizeof(*sem));
268         lockdep_init_map(&sem->dep_map, name, key, 0);
269 #endif
270         atomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE);
271         raw_spin_lock_init(&sem->wait_lock);
272         INIT_LIST_HEAD(&sem->wait_list);
273         atomic_long_set(&sem->owner, 0L);
274 #ifdef CONFIG_RWSEM_SPIN_ON_OWNER
275         osq_lock_init(&sem->osq);
276 #endif
277 }
278 EXPORT_SYMBOL(__init_rwsem);
279
280 enum rwsem_waiter_type {
281         RWSEM_WAITING_FOR_WRITE,
282         RWSEM_WAITING_FOR_READ
283 };
284
285 struct rwsem_waiter {
286         struct list_head list;
287         struct task_struct *task;
288         enum rwsem_waiter_type type;
289         unsigned long timeout;
290 };
291 #define rwsem_first_waiter(sem) \
292         list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
293
294 enum rwsem_wake_type {
295         RWSEM_WAKE_ANY,         /* Wake whatever's at head of wait list */
296         RWSEM_WAKE_READERS,     /* Wake readers only */
297         RWSEM_WAKE_READ_OWNED   /* Waker thread holds the read lock */
298 };
299
300 enum writer_wait_state {
301         WRITER_NOT_FIRST,       /* Writer is not first in wait list */
302         WRITER_FIRST,           /* Writer is first in wait list     */
303         WRITER_HANDOFF          /* Writer is first & handoff needed */
304 };
305
306 /*
307  * The typical HZ value is either 250 or 1000. So set the minimum waiting
308  * time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
309  * queue before initiating the handoff protocol.
310  */
311 #define RWSEM_WAIT_TIMEOUT      DIV_ROUND_UP(HZ, 250)
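
/*
 * For illustration: DIV_ROUND_UP(HZ, 250) evaluates to 4 jiffies (4ms)
 * with HZ=1000, 1 jiffy (4ms) with HZ=250 and 1 jiffy (10ms) with HZ=100,
 * so a waiter sits in the wait queue for at least roughly 4ms before it
 * may request a handoff.
 */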
312
313 /*
314  * Magic number to batch-wakeup waiting readers, even when writers are
315  * also present in the queue. This both limits the amount of work the
316  * waking thread must do and also prevents any potential counter overflow,
317  * however unlikely.
318  */
319 #define MAX_READERS_WAKEUP      0x100
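
/*
 * For illustration: waking a full batch adjusts the count by at most about
 * MAX_READERS_WAKEUP * RWSEM_READER_BIAS = 0x100 * 0x100 = 0x10000 in a
 * single atomic update, which is far below the 24-bit reader count
 * capacity available even on 32-bit kernels.
 */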
320
321 /*
322  * handle the lock release when there are processes blocked on it that can now run
323  * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
324  *   have been set.
325  * - there must be someone on the queue
326  * - the wait_lock must be held by the caller
327  * - tasks are marked for wakeup, the caller must later invoke wake_up_q()
328  *   to actually wakeup the blocked task(s) and drop the reference count,
329  *   preferably when the wait_lock is released
330  * - woken process blocks are discarded from the list after having task zeroed
331  * - writers are only marked woken if downgrading is false
332  */
333 static void rwsem_mark_wake(struct rw_semaphore *sem,
334                             enum rwsem_wake_type wake_type,
335                             struct wake_q_head *wake_q)
336 {
337         struct rwsem_waiter *waiter, *tmp;
338         long oldcount, woken = 0, adjustment = 0;
339         struct list_head wlist;
340
341         lockdep_assert_held(&sem->wait_lock);
342
343         /*
344          * Take a peek at the queue head waiter such that we can determine
345          * the wakeup(s) to perform.
346          */
347         waiter = rwsem_first_waiter(sem);
348
349         if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
350                 if (wake_type == RWSEM_WAKE_ANY) {
351                         /*
352                          * Mark writer at the front of the queue for wakeup.
353                          * Until the task is actually awoken later by
354                          * the caller, other writers are able to steal it.
355                          * Readers, on the other hand, will block as they
356                          * will notice the queued writer.
357                          */
358                         wake_q_add(wake_q, waiter->task);
359                         lockevent_inc(rwsem_wake_writer);
360                 }
361
362                 return;
363         }
364
365         /*
366          * Writers might steal the lock before we grant it to the next reader.
367          * We prefer to do the first reader grant before counting readers
368          * so we can bail out early if a writer stole the lock.
369          */
370         if (wake_type != RWSEM_WAKE_READ_OWNED) {
371                 adjustment = RWSEM_READER_BIAS;
372                 oldcount = atomic_long_fetch_add(adjustment, &sem->count);
373                 if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
374                         /*
375                          * When we've been waiting "too" long (for writers
376                          * to give up the lock), request a HANDOFF to
377                          * force the issue.
378                          */
379                         if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
380                             time_after(jiffies, waiter->timeout)) {
381                                 adjustment -= RWSEM_FLAG_HANDOFF;
382                                 lockevent_inc(rwsem_rlock_handoff);
383                         }
384
385                         atomic_long_add(-adjustment, &sem->count);
386                         return;
387                 }
388                 /*
389                  * Set it to reader-owned to give spinners an early
390                  * indication that readers now have the lock.
391                  */
392                 __rwsem_set_reader_owned(sem, waiter->task);
393         }
394
395         /*
396          * Grant up to MAX_READERS_WAKEUP read locks to all the readers in the
397          * queue. We know that the number woken will be at least 1, as we accounted
398          * for above. Note we increment the 'active part' of the count by the
399          * number of readers before waking any processes up.
400          *
401          * This is an adaptation of the phase-fair R/W locks where at the
402          * reader phase (first waiter is a reader), all readers are eligible
403          * to acquire the lock at the same time irrespective of their order
404          * in the queue. The writers acquire the lock according to their
405          * order in the queue.
406          *
407          * We have to do wakeup in 2 passes to prevent the possibility that
408          * the reader count may be decremented before it is incremented. This
409          * is because the to-be-woken waiter may not have slept yet. So it
410          * may see waiter->task cleared, finish its critical section and
411          * do an unlock before the reader count is incremented.
412          *
413          * 1) Collect the read-waiters in a separate list, count them and
414          *    fully increment the reader count in rwsem.
415          * 2) For each waiter in the new list, clear waiter->task and
416          *    put them into wake_q to be woken up later.
417          */
418         INIT_LIST_HEAD(&wlist);
419         list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) {
420                 if (waiter->type == RWSEM_WAITING_FOR_WRITE)
421                         continue;
422
423                 woken++;
424                 list_move_tail(&waiter->list, &wlist);
425
426                 /*
427                  * Limit # of readers that can be woken up per wakeup call.
428                  */
429                 if (woken >= MAX_READERS_WAKEUP)
430                         break;
431         }
432
433         adjustment = woken * RWSEM_READER_BIAS - adjustment;
434         lockevent_cond_inc(rwsem_wake_reader, woken);
435         if (list_empty(&sem->wait_list)) {
436                 /* hit end of list above */
437                 adjustment -= RWSEM_FLAG_WAITERS;
438         }
439
440         /*
441          * When we've woken a reader, we no longer need to force writers
442          * to give up the lock and we can clear HANDOFF.
443          */
444         if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
445                 adjustment -= RWSEM_FLAG_HANDOFF;
446
447         if (adjustment)
448                 atomic_long_add(adjustment, &sem->count);
449
450         /* 2nd pass */
451         list_for_each_entry_safe(waiter, tmp, &wlist, list) {
452                 struct task_struct *tsk;
453
454                 tsk = waiter->task;
455                 get_task_struct(tsk);
456
457                 /*
458                  * Ensure calling get_task_struct() before setting the reader
459                  * waiter to nil such that rwsem_down_read_slowpath() cannot
460                  * race with do_exit() by always holding a reference count
461                  * to the task to wakeup.
462                  */
463                 smp_store_release(&waiter->task, NULL);
464                 /*
465                  * Ensure issuing the wakeup (either by us or someone else)
466                  * after setting the reader waiter to nil.
467                  */
468                 wake_q_add_safe(wake_q, tsk);
469         }
470 }
471
472 /*
473  * This function must be called with the sem->wait_lock held to prevent
474  * race conditions between checking the rwsem wait list and setting the
475  * sem->count accordingly.
476  *
477  * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
478  * bit is set or the lock is acquired with handoff bit cleared.
479  */
480 static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
481                                         enum writer_wait_state wstate)
482 {
483         long count, new;
484
485         lockdep_assert_held(&sem->wait_lock);
486
487         count = atomic_long_read(&sem->count);
488         do {
489                 bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
490
491                 if (has_handoff && wstate == WRITER_NOT_FIRST)
492                         return false;
493
494                 new = count;
495
496                 if (count & RWSEM_LOCK_MASK) {
497                         if (has_handoff || (wstate != WRITER_HANDOFF))
498                                 return false;
499
500                         new |= RWSEM_FLAG_HANDOFF;
501                 } else {
502                         new |= RWSEM_WRITER_LOCKED;
503                         new &= ~RWSEM_FLAG_HANDOFF;
504
505                         if (list_is_singular(&sem->wait_list))
506                                 new &= ~RWSEM_FLAG_WAITERS;
507                 }
508         } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
509
510         /*
511          * We have either acquired the lock with handoff bit cleared or
512          * set the handoff bit.
513          */
514         if (new & RWSEM_FLAG_HANDOFF)
515                 return false;
516
517         rwsem_set_owner(sem);
518         return true;
519 }
520
521 #ifdef CONFIG_RWSEM_SPIN_ON_OWNER
522 /*
523  * Try to acquire read lock before the reader is put on wait queue.
524  * Lock acquisition isn't allowed if the rwsem is locked or a writer handoff
525  * is ongoing.
526  */
527 static inline bool rwsem_try_read_lock_unqueued(struct rw_semaphore *sem)
528 {
529         long count = atomic_long_read(&sem->count);
530
531         if (count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))
532                 return false;
533
534         count = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
535         if (!(count & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
536                 rwsem_set_reader_owned(sem);
537                 lockevent_inc(rwsem_opt_rlock);
538                 return true;
539         }
540
541         /* Back out the change */
542         atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
543         return false;
544 }
545
546 /*
547  * Try to acquire write lock before the writer has been put on wait queue.
548  */
549 static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
550 {
551         long count = atomic_long_read(&sem->count);
552
553         while (!(count & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))) {
554                 if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
555                                         count | RWSEM_WRITER_LOCKED)) {
556                         rwsem_set_owner(sem);
557                         lockevent_inc(rwsem_opt_wlock);
558                         return true;
559                 }
560         }
561         return false;
562 }
563
564 static inline bool owner_on_cpu(struct task_struct *owner)
565 {
566         /*
567          * To guard against the lock holder preemption issue, skip spinning
568          * if the task is not on a cpu or its cpu is preempted.
569          */
570         return owner->on_cpu && !vcpu_is_preempted(task_cpu(owner));
571 }
572
573 static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem,
574                                            unsigned long nonspinnable)
575 {
576         struct task_struct *owner;
577         unsigned long flags;
578         bool ret = true;
579
580         BUILD_BUG_ON(!(RWSEM_OWNER_UNKNOWN & RWSEM_NONSPINNABLE));
581
582         if (need_resched()) {
583                 lockevent_inc(rwsem_opt_fail);
584                 return false;
585         }
586
587         preempt_disable();
588         rcu_read_lock();
589         owner = rwsem_owner_flags(sem, &flags);
590         if ((flags & nonspinnable) || (owner && !owner_on_cpu(owner)))
591                 ret = false;
592         rcu_read_unlock();
593         preempt_enable();
594
595         lockevent_cond_inc(rwsem_opt_fail, !ret);
596         return ret;
597 }
598
599 /*
600  * The rwsem_spin_on_owner() function returns the following 4 values
601  * depending on the lock owner state.
602  *   OWNER_NULL  : owner is currently NULL
603  *   OWNER_WRITER: when owner changes and is a writer
604  *   OWNER_READER: when owner changes and the new owner may be a reader.
605  *   OWNER_NONSPINNABLE:
606  *                 when optimistic spinning has to stop because either the
607  *                 owner stops running, is unknown, or its timeslice has
608  *                 been used up.
609  */
610 enum owner_state {
611         OWNER_NULL              = 1 << 0,
612         OWNER_WRITER            = 1 << 1,
613         OWNER_READER            = 1 << 2,
614         OWNER_NONSPINNABLE      = 1 << 3,
615 };
616 #define OWNER_SPINNABLE         (OWNER_NULL | OWNER_WRITER | OWNER_READER)
617
618 static inline enum owner_state
619 rwsem_owner_state(struct task_struct *owner, unsigned long flags, unsigned long nonspinnable)
620 {
621         if (flags & nonspinnable)
622                 return OWNER_NONSPINNABLE;
623
624         if (flags & RWSEM_READER_OWNED)
625                 return OWNER_READER;
626
627         return owner ? OWNER_WRITER : OWNER_NULL;
628 }
629
630 static noinline enum owner_state
631 rwsem_spin_on_owner(struct rw_semaphore *sem, unsigned long nonspinnable)
632 {
633         struct task_struct *new, *owner;
634         unsigned long flags, new_flags;
635         enum owner_state state;
636
637         owner = rwsem_owner_flags(sem, &flags);
638         state = rwsem_owner_state(owner, flags, nonspinnable);
639         if (state != OWNER_WRITER)
640                 return state;
641
642         rcu_read_lock();
643         for (;;) {
644                 if (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF) {
645                         state = OWNER_NONSPINNABLE;
646                         break;
647                 }
648
649                 new = rwsem_owner_flags(sem, &new_flags);
650                 if ((new != owner) || (new_flags != flags)) {
651                         state = rwsem_owner_state(new, new_flags, nonspinnable);
652                         break;
653                 }
654
655                 /*
656                  * Ensure we emit the owner->on_cpu dereference _after_
657                  * checking that sem->owner still matches owner. If that fails,
658                  * owner might point to free()d memory; if it still matches,
659                  * the rcu_read_lock() ensures the memory stays valid.
660                  */
661                 barrier();
662
663                 if (need_resched() || !owner_on_cpu(owner)) {
664                         state = OWNER_NONSPINNABLE;
665                         break;
666                 }
667
668                 cpu_relax();
669         }
670         rcu_read_unlock();
671
672         return state;
673 }
674
675 /*
676  * Calculate reader-owned rwsem spinning threshold for writer
677  *
678  * The more readers own the rwsem, the longer it will take for them to
679  * wind down and free the rwsem. So the empirical formula used to
680  * determine the actual spinning time limit here is:
681  *
682  *   Spinning threshold = (10 + nr_readers/2)us
683  *
684  * The limit is capped to a maximum of 25us (30 readers). This is just
685  * a heuristic and is subject to change in the future.
686  */
687 static inline u64 rwsem_rspin_threshold(struct rw_semaphore *sem)
688 {
689         long count = atomic_long_read(&sem->count);
690         int readers = count >> RWSEM_READER_SHIFT;
691         u64 delta;
692
693         if (readers > 30)
694                 readers = 30;
695         delta = (20 + readers) * NSEC_PER_USEC / 2;
696
697         return sched_clock() + delta;
698 }
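
/*
 * For illustration: with 8 readers holding the lock, delta is
 * (20 + 8) * NSEC_PER_USEC / 2 = 14000ns, i.e. the 10us base plus
 * 8/2 = 4us for a 14us spinning window; with 30 or more readers the
 * window is capped at (20 + 30)/2 = 25us.
 */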
699
700 static bool rwsem_optimistic_spin(struct rw_semaphore *sem, bool wlock)
701 {
702         bool taken = false;
703         int prev_owner_state = OWNER_NULL;
704         int loop = 0;
705         u64 rspin_threshold = 0;
706         unsigned long nonspinnable = wlock ? RWSEM_WR_NONSPINNABLE
707                                            : RWSEM_RD_NONSPINNABLE;
708
709         preempt_disable();
710
711         /* sem->wait_lock should not be held when doing optimistic spinning */
712         if (!osq_lock(&sem->osq))
713                 goto done;
714
715         /*
716          * Optimistically spin on the owner field and attempt to acquire the
717          * lock whenever the owner changes. Spinning will be stopped when:
718          *  1) the owning writer isn't running; or
719          *  2) readers own the lock and spinning time has exceeded limit.
720          */
721         for (;;) {
722                 enum owner_state owner_state;
723
724                 owner_state = rwsem_spin_on_owner(sem, nonspinnable);
725                 if (!(owner_state & OWNER_SPINNABLE))
726                         break;
727
728                 /*
729                  * Try to acquire the lock
730                  */
731                 taken = wlock ? rwsem_try_write_lock_unqueued(sem)
732                               : rwsem_try_read_lock_unqueued(sem);
733
734                 if (taken)
735                         break;
736
737                 /*
738                  * Time-based reader-owned rwsem optimistic spinning
739                  */
740                 if (wlock && (owner_state == OWNER_READER)) {
741                         /*
742                          * Re-initialize rspin_threshold every time when
743                          * the owner state changes from non-reader to reader.
744                          * This allows a writer to steal the lock in between
745                          * 2 reader phases and have the threshold reset at
746                          * the beginning of the 2nd reader phase.
747                          */
748                         if (prev_owner_state != OWNER_READER) {
749                                 if (rwsem_test_oflags(sem, nonspinnable))
750                                         break;
751                                 rspin_threshold = rwsem_rspin_threshold(sem);
752                                 loop = 0;
753                         }
754
755                         /*
756                          * Check the time threshold only once every 16
757                          * iterations to avoid calling sched_clock() too
758                          * frequently, which would lengthen the average latency
759                          * between the time the lock becomes free and the time
760                          * the spinner is ready to do a trylock.
761                          */
762                         else if (!(++loop & 0xf) && (sched_clock() > rspin_threshold)) {
763                                 rwsem_set_nonspinnable(sem);
764                                 lockevent_inc(rwsem_opt_nospin);
765                                 break;
766                         }
767                 }
768
769                 /*
770                  * An RT task cannot do optimistic spinning if it cannot
771                  * be sure the lock holder is running or live-lock may
772                  * happen if the current task and the lock holder happen
773                  * to run on the same CPU. However, aborting optimistic
774                  * spinning when a NULL owner is detected may miss some
775                  * opportunities where spinning could continue without causing
776                  * a problem.
777                  *
778                  * There are 2 possible cases where an RT task may be able
779                  * to continue spinning.
780                  *
781                  * 1) The lock owner is in the process of releasing the
782                  *    lock, sem->owner is cleared but the lock has not
783                  *    been released yet.
784                  * 2) The lock was free and the owner cleared, but another
785                  *    task just comes in and acquires the lock before
786                  *    we try to get it. The new owner may be a spinnable
787                  *    writer.
788                  *
789                  * To take advantage of the two scenarios listed above, the RT
790                  * task is made to retry one more time to see if it can
791                  * acquire the lock or continue spinning on the new owning
792                  * writer. Of course, if the time lag is long enough or the
793                  * new owner is not a writer or spinnable, the RT task will
794                  * quit spinning.
795                  *
796                  * If the owner is a writer, the need_resched() check is
797                  * done inside rwsem_spin_on_owner(). If the owner is not
798                  * a writer, need_resched() check needs to be done here.
799                  */
800                 if (owner_state != OWNER_WRITER) {
801                         if (need_resched())
802                                 break;
803                         if (rt_task(current) &&
804                            (prev_owner_state != OWNER_WRITER))
805                                 break;
806                 }
807                 prev_owner_state = owner_state;
808
809                 /*
810                  * The cpu_relax() call is a compiler barrier which forces
811                  * everything in this loop to be re-loaded. We don't need
812                  * memory barriers as we'll eventually observe the right
813                  * values at the cost of a few extra spins.
814                  */
815                 cpu_relax();
816         }
817         osq_unlock(&sem->osq);
818 done:
819         preempt_enable();
820         lockevent_cond_inc(rwsem_opt_fail, !taken);
821         return taken;
822 }
823
824 /*
825  * Clear the owner's RWSEM_WR_NONSPINNABLE bit if it is set. This should
826  * only be called when the reader count reaches 0.
827  *
828  * This gives writers a better chance to acquire the rwsem before readers
829  * when the rwsem has been held by readers for a relatively long period
830  * of time. A race can happen where an optimistic spinner has just stolen
831  * the rwsem and set the owner, but just clearing the
832  * RWSEM_WR_NONSPINNABLE bit will do no harm anyway.
833  */
834 static inline void clear_wr_nonspinnable(struct rw_semaphore *sem)
835 {
836         if (rwsem_test_oflags(sem, RWSEM_WR_NONSPINNABLE))
837                 atomic_long_andnot(RWSEM_WR_NONSPINNABLE, &sem->owner);
838 }
839 #else
840 static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem,
841                                            unsigned long nonspinnable)
842 {
843         return false;
844 }
845
846 static inline bool rwsem_optimistic_spin(struct rw_semaphore *sem, bool wlock)
847 {
848         return false;
849 }
850
851 static inline void clear_wr_nonspinnable(struct rw_semaphore *sem) { }
852 #endif
853
854 /*
855  * Wait for the read lock to be granted
856  */
857 static struct rw_semaphore __sched *
858 rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
859 {
860         long count, adjustment = -RWSEM_READER_BIAS;
861         bool wake = false;
862         struct rwsem_waiter waiter;
863         DEFINE_WAKE_Q(wake_q);
864
865         if (!rwsem_can_spin_on_owner(sem, RWSEM_RD_NONSPINNABLE))
866                 goto queue;
867
868         /*
869          * Undo read bias from down_read() and do optimistic spinning.
870          */
871         atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
872         adjustment = 0;
873         if (rwsem_optimistic_spin(sem, false)) {
874                 /*
875                  * Wake up other readers in the wait list if the front
876                  * waiter is a reader.
877                  */
878                 if ((atomic_long_read(&sem->count) & RWSEM_FLAG_WAITERS)) {
879                         raw_spin_lock_irq(&sem->wait_lock);
880                         if (!list_empty(&sem->wait_list))
881                                 rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED,
882                                                 &wake_q);
883                         raw_spin_unlock_irq(&sem->wait_lock);
884                         wake_up_q(&wake_q);
885                 }
886                 return sem;
887         }
888
889 queue:
890         waiter.task = current;
891         waiter.type = RWSEM_WAITING_FOR_READ;
892         waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
893
894         raw_spin_lock_irq(&sem->wait_lock);
895         if (list_empty(&sem->wait_list)) {
896                 /*
897                  * In case the wait queue is empty and the lock isn't owned
898                  * by a writer or has the handoff bit set, this reader can
899                  * exit the slowpath and return immediately as its
900                  * RWSEM_READER_BIAS has already been set in the count.
901                  */
902                 if (adjustment && !(atomic_long_read(&sem->count) &
903                      (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
904                         raw_spin_unlock_irq(&sem->wait_lock);
905                         rwsem_set_reader_owned(sem);
906                         lockevent_inc(rwsem_rlock_fast);
907                         return sem;
908                 }
909                 adjustment += RWSEM_FLAG_WAITERS;
910         }
911         list_add_tail(&waiter.list, &sem->wait_list);
912
913         /* we're now waiting on the lock, but no longer actively locking */
914         if (adjustment)
915                 count = atomic_long_add_return(adjustment, &sem->count);
916         else
917                 count = atomic_long_read(&sem->count);
918
919         /*
920          * If there are no active locks, wake the front queued process(es).
921          *
922          * If there are no writers and we are first in the queue,
923          * wake our own waiter to join the existing active readers!
924          */
925         if (!(count & RWSEM_LOCK_MASK)) {
926                 clear_wr_nonspinnable(sem);
927                 wake = true;
928         }
929         if (wake || (!(count & RWSEM_WRITER_MASK) &&
930                     (adjustment & RWSEM_FLAG_WAITERS)))
931                 rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
932
933         raw_spin_unlock_irq(&sem->wait_lock);
934         wake_up_q(&wake_q);
935
936         /* wait to be given the lock */
937         while (true) {
938                 set_current_state(state);
939                 if (!waiter.task)
940                         break;
941                 if (signal_pending_state(state, current)) {
942                         raw_spin_lock_irq(&sem->wait_lock);
943                         if (waiter.task)
944                                 goto out_nolock;
945                         raw_spin_unlock_irq(&sem->wait_lock);
946                         break;
947                 }
948                 schedule();
949                 lockevent_inc(rwsem_sleep_reader);
950         }
951
952         __set_current_state(TASK_RUNNING);
953         lockevent_inc(rwsem_rlock);
954         return sem;
955 out_nolock:
956         list_del(&waiter.list);
957         if (list_empty(&sem->wait_list)) {
958                 atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
959                                    &sem->count);
960         }
961         raw_spin_unlock_irq(&sem->wait_lock);
962         __set_current_state(TASK_RUNNING);
963         lockevent_inc(rwsem_rlock_fail);
964         return ERR_PTR(-EINTR);
965 }
966
967 /*
968  * Wait until we successfully acquire the write lock
969  */
970 static struct rw_semaphore *
971 rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
972 {
973         long count;
974         enum writer_wait_state wstate;
975         struct rwsem_waiter waiter;
976         struct rw_semaphore *ret = sem;
977         DEFINE_WAKE_Q(wake_q);
978
979         /* do optimistic spinning and steal lock if possible */
980         if (rwsem_can_spin_on_owner(sem, RWSEM_WR_NONSPINNABLE) &&
981             rwsem_optimistic_spin(sem, true))
982                 return sem;
983
984         /*
985          * Optimistic spinning failed, proceed to the slowpath
986          * and block until we can acquire the sem.
987          */
988         waiter.task = current;
989         waiter.type = RWSEM_WAITING_FOR_WRITE;
990         waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
991
992         raw_spin_lock_irq(&sem->wait_lock);
993
994         /* account for this before adding a new element to the list */
995         wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
996
997         list_add_tail(&waiter.list, &sem->wait_list);
998
999         /* we're now waiting on the lock */
1000         if (wstate == WRITER_NOT_FIRST) {
1001                 count = atomic_long_read(&sem->count);
1002
1003                 /*
1004                  * If there were already threads queued before us and:
1005                  *  1) there are no active locks, wake the front
1006                  *     queued process(es) as the handoff bit might be set.
1007                  *  2) there are no active writers and some readers, the lock
1008                  *     must be read owned; so we try to wake any read lock
1009                  *     waiters that were queued ahead of us.
1010                  */
1011                 if (count & RWSEM_WRITER_MASK)
1012                         goto wait;
1013
1014                 rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)
1015                                         ? RWSEM_WAKE_READERS
1016                                         : RWSEM_WAKE_ANY, &wake_q);
1017
1018                 if (!wake_q_empty(&wake_q)) {
1019                         /*
1020                          * We want to minimize wait_lock hold time especially
1021                          * when a large number of readers are to be woken up.
1022                          */
1023                         raw_spin_unlock_irq(&sem->wait_lock);
1024                         wake_up_q(&wake_q);
1025                         wake_q_init(&wake_q);   /* Used again, reinit */
1026                         raw_spin_lock_irq(&sem->wait_lock);
1027                 }
1028         } else {
1029                 atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
1030         }
1031
1032 wait:
1033         /* wait until we successfully acquire the lock */
1034         set_current_state(state);
1035         while (true) {
1036                 if (rwsem_try_write_lock(sem, wstate))
1037                         break;
1038
1039                 raw_spin_unlock_irq(&sem->wait_lock);
1040
1041                 /* Block until there are no active lockers. */
1042                 for (;;) {
1043                         if (signal_pending_state(state, current))
1044                                 goto out_nolock;
1045
1046                         schedule();
1047                         lockevent_inc(rwsem_sleep_writer);
1048                         set_current_state(state);
1049                         /*
1050                          * If HANDOFF bit is set, unconditionally do
1051                          * a trylock.
1052                          */
1053                         if (wstate == WRITER_HANDOFF)
1054                                 break;
1055
1056                         if ((wstate == WRITER_NOT_FIRST) &&
1057                             (rwsem_first_waiter(sem) == &waiter))
1058                                 wstate = WRITER_FIRST;
1059
1060                         count = atomic_long_read(&sem->count);
1061                         if (!(count & RWSEM_LOCK_MASK))
1062                                 break;
1063
1064                         /*
1065                          * The setting of the handoff bit is deferred
1066                          * until rwsem_try_write_lock() is called.
1067                          */
1068                         if ((wstate == WRITER_FIRST) && (rt_task(current) ||
1069                             time_after(jiffies, waiter.timeout))) {
1070                                 wstate = WRITER_HANDOFF;
1071                                 lockevent_inc(rwsem_wlock_handoff);
1072                                 break;
1073                         }
1074                 }
1075
1076                 raw_spin_lock_irq(&sem->wait_lock);
1077         }
1078         __set_current_state(TASK_RUNNING);
1079         list_del(&waiter.list);
1080         raw_spin_unlock_irq(&sem->wait_lock);
1081         lockevent_inc(rwsem_wlock);
1082
1083         return ret;
1084
1085 out_nolock:
1086         __set_current_state(TASK_RUNNING);
1087         raw_spin_lock_irq(&sem->wait_lock);
1088         list_del(&waiter.list);
1089
1090         if (unlikely(wstate == WRITER_HANDOFF))
1091                 atomic_long_add(-RWSEM_FLAG_HANDOFF,  &sem->count);
1092
1093         if (list_empty(&sem->wait_list))
1094                 atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
1095         else
1096                 rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
1097         raw_spin_unlock_irq(&sem->wait_lock);
1098         wake_up_q(&wake_q);
1099         lockevent_inc(rwsem_wlock_fail);
1100
1101         return ERR_PTR(-EINTR);
1102 }
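
/*
 * Summary of the writer wait-state transitions used above: a writer starts
 * as WRITER_NOT_FIRST or WRITER_FIRST depending on whether the wait list
 * was empty when it queued, is promoted to WRITER_FIRST when it reaches
 * the head of the list, and moves to WRITER_HANDOFF once it is first and
 * either is an RT task or has waited longer than RWSEM_WAIT_TIMEOUT. The
 * handoff bit itself is only set later, in rwsem_try_write_lock().
 */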
1103
1104 /*
1105  * handle waking up a waiter on the semaphore
1106  * - up_read/up_write has decremented the active part of count if we come here
1107  */
1108 static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count)
1109 {
1110         unsigned long flags;
1111         DEFINE_WAKE_Q(wake_q);
1112
1113         raw_spin_lock_irqsave(&sem->wait_lock, flags);
1114
1115         if (!list_empty(&sem->wait_list))
1116                 rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
1117
1118         raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
1119         wake_up_q(&wake_q);
1120
1121         return sem;
1122 }
1123
1124 /*
1125  * downgrade a write lock into a read lock
1126  * - caller atomically converted its write hold into a read hold and saw
1127  *   the RWSEM_FLAG_WAITERS bit set in count
1127  * - just wake up any readers at the front of the queue
1128  */
1129 static struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem)
1130 {
1131         unsigned long flags;
1132         DEFINE_WAKE_Q(wake_q);
1133
1134         raw_spin_lock_irqsave(&sem->wait_lock, flags);
1135
1136         if (!list_empty(&sem->wait_list))
1137                 rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q);
1138
1139         raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
1140         wake_up_q(&wake_q);
1141
1142         return sem;
1143 }
1144
1145 /*
1146  * lock for reading
1147  */
1148 inline void __down_read(struct rw_semaphore *sem)
1149 {
1150         if (unlikely(atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
1151                         &sem->count) & RWSEM_READ_FAILED_MASK)) {
1152                 rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE);
1153                 DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
1154         } else {
1155                 rwsem_set_reader_owned(sem);
1156         }
1157 }
1158
1159 static inline int __down_read_killable(struct rw_semaphore *sem)
1160 {
1161         if (unlikely(atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
1162                         &sem->count) & RWSEM_READ_FAILED_MASK)) {
1163                 if (IS_ERR(rwsem_down_read_slowpath(sem, TASK_KILLABLE)))
1164                         return -EINTR;
1165                 DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
1166         } else {
1167                 rwsem_set_reader_owned(sem);
1168         }
1169         return 0;
1170 }
1171
1172 static inline int __down_read_trylock(struct rw_semaphore *sem)
1173 {
1174         /*
1175          * Optimize for the case when the rwsem is not locked at all.
1176          */
1177         long tmp = RWSEM_UNLOCKED_VALUE;
1178
1179         do {
1180                 if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
1181                                         tmp + RWSEM_READER_BIAS)) {
1182                         rwsem_set_reader_owned(sem);
1183                         return 1;
1184                 }
1185         } while (!(tmp & RWSEM_READ_FAILED_MASK));
1186         return 0;
1187 }
1188
1189 /*
1190  * lock for writing
1191  */
1192 static inline void __down_write(struct rw_semaphore *sem)
1193 {
1194         long tmp = RWSEM_UNLOCKED_VALUE;
1195
1196         if (unlikely(!atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
1197                                                       RWSEM_WRITER_LOCKED)))
1198                 rwsem_down_write_slowpath(sem, TASK_UNINTERRUPTIBLE);
1199         rwsem_set_owner(sem);
1200 }
1201
1202 static inline int __down_write_killable(struct rw_semaphore *sem)
1203 {
1204         long tmp = RWSEM_UNLOCKED_VALUE;
1205
1206         if (unlikely(!atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
1207                                                       RWSEM_WRITER_LOCKED))) {
1208                 if (IS_ERR(rwsem_down_write_slowpath(sem, TASK_KILLABLE)))
1209                         return -EINTR;
1210         }
1211         rwsem_set_owner(sem);
1212         return 0;
1213 }
1214
1215 static inline int __down_write_trylock(struct rw_semaphore *sem)
1216 {
1217         long tmp = RWSEM_UNLOCKED_VALUE;
1218
1219         if (atomic_long_try_cmpxchg_acquire(&sem->count, &tmp,
1220                                             RWSEM_WRITER_LOCKED)) {
1221                 rwsem_set_owner(sem);
1222                 return true;
1223         }
1224         return false;
1225 }
1226
1227 /*
1228  * unlock after reading
1229  */
1230 inline void __up_read(struct rw_semaphore *sem)
1231 {
1232         long tmp;
1233
1234         DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
1235         rwsem_clear_reader_owned(sem);
1236         tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);
1237         if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) ==
1238                       RWSEM_FLAG_WAITERS)) {
1239                 clear_wr_nonspinnable(sem);
1240                 rwsem_wake(sem, tmp);
1241         }
1242 }
1243
1244 /*
1245  * unlock after writing
1246  */
1247 static inline void __up_write(struct rw_semaphore *sem)
1248 {
1249         long tmp;
1250
1251         /*
1252          * sem->owner may differ from current if the ownership is transferred
1253          * to an anonymous writer by setting the RWSEM_NONSPINNABLE bits.
1254          */
1255         DEBUG_RWSEMS_WARN_ON((rwsem_owner(sem) != current) &&
1256                             !rwsem_test_oflags(sem, RWSEM_NONSPINNABLE), sem);
1257         rwsem_clear_owner(sem);
1258         tmp = atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED, &sem->count);
1259         if (unlikely(tmp & RWSEM_FLAG_WAITERS))
1260                 rwsem_wake(sem, tmp);
1261 }
1262
1263 /*
1264  * downgrade write lock to read lock
1265  */
1266 static inline void __downgrade_write(struct rw_semaphore *sem)
1267 {
1268         long tmp;
1269
1270         /*
1271          * When downgrading from exclusive to shared ownership,
1272          * anything inside the write-locked region cannot leak
1273          * into the read side. In contrast, anything in the
1274          * read-locked region is ok to be re-ordered into the
1275          * write side. As such, rely on RELEASE semantics.
1276          */
1277         DEBUG_RWSEMS_WARN_ON(rwsem_owner(sem) != current, sem);
1278         tmp = atomic_long_fetch_add_release(
1279                 -RWSEM_WRITER_LOCKED+RWSEM_READER_BIAS, &sem->count);
1280         rwsem_set_reader_owned(sem);
1281         if (tmp & RWSEM_FLAG_WAITERS)
1282                 rwsem_downgrade_wake(sem);
1283 }
1284
1285 /*
1286  * lock for reading
1287  */
1288 void __sched down_read(struct rw_semaphore *sem)
1289 {
1290         might_sleep();
1291         rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
1292
1293         LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
1294 }
1295 EXPORT_SYMBOL(down_read);
1296
1297 int __sched down_read_killable(struct rw_semaphore *sem)
1298 {
1299         might_sleep();
1300         rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
1301
1302         if (LOCK_CONTENDED_RETURN(sem, __down_read_trylock, __down_read_killable)) {
1303                 rwsem_release(&sem->dep_map, 1, _RET_IP_);
1304                 return -EINTR;
1305         }
1306
1307         return 0;
1308 }
1309 EXPORT_SYMBOL(down_read_killable);
1310
1311 /*
1312  * trylock for reading -- returns 1 if successful, 0 if contention
1313  */
1314 int down_read_trylock(struct rw_semaphore *sem)
1315 {
1316         int ret = __down_read_trylock(sem);
1317
1318         if (ret == 1)
1319                 rwsem_acquire_read(&sem->dep_map, 0, 1, _RET_IP_);
1320         return ret;
1321 }
1322 EXPORT_SYMBOL(down_read_trylock);
1323
1324 /*
1325  * lock for writing
1326  */
1327 void __sched down_write(struct rw_semaphore *sem)
1328 {
1329         might_sleep();
1330         rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);
1331         LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
1332 }
1333 EXPORT_SYMBOL(down_write);
1334
1335 /*
1336  * lock for writing
1337  */
1338 int __sched down_write_killable(struct rw_semaphore *sem)
1339 {
1340         might_sleep();
1341         rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_);
1342
1343         if (LOCK_CONTENDED_RETURN(sem, __down_write_trylock,
1344                                   __down_write_killable)) {
1345                 rwsem_release(&sem->dep_map, 1, _RET_IP_);
1346                 return -EINTR;
1347         }
1348
1349         return 0;
1350 }
1351 EXPORT_SYMBOL(down_write_killable);
1352
1353 /*
1354  * trylock for writing -- returns 1 if successful, 0 if contention
1355  */
1356 int down_write_trylock(struct rw_semaphore *sem)
1357 {
1358         int ret = __down_write_trylock(sem);
1359
1360         if (ret == 1)
1361                 rwsem_acquire(&sem->dep_map, 0, 1, _RET_IP_);
1362
1363         return ret;
1364 }
1365 EXPORT_SYMBOL(down_write_trylock);
1366
1367 /*
1368  * release a read lock
1369  */
1370 void up_read(struct rw_semaphore *sem)
1371 {
1372         rwsem_release(&sem->dep_map, 1, _RET_IP_);
1373         __up_read(sem);
1374 }
1375 EXPORT_SYMBOL(up_read);
1376
1377 /*
1378  * release a write lock
1379  */
1380 void up_write(struct rw_semaphore *sem)
1381 {
1382         rwsem_release(&sem->dep_map, 1, _RET_IP_);
1383         __up_write(sem);
1384 }
1385 EXPORT_SYMBOL(up_write);
1386
1387 /*
1388  * downgrade write lock to read lock
1389  */
1390 void downgrade_write(struct rw_semaphore *sem)
1391 {
1392         lock_downgrade(&sem->dep_map, _RET_IP_);
1393         __downgrade_write(sem);
1394 }
1395 EXPORT_SYMBOL(downgrade_write);
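
/*
 * Illustrative usage of the API above; the rwsem and data names are made
 * up for this example and do not exist elsewhere in the kernel:
 *
 *	static DECLARE_RWSEM(example_rwsem);
 *	static int example_data;
 *
 *	void example_reader(int *out)
 *	{
 *		down_read(&example_rwsem);
 *		*out = example_data;		// shared with other readers
 *		up_read(&example_rwsem);
 *	}
 *
 *	void example_writer(int val)
 *	{
 *		down_write(&example_rwsem);
 *		example_data = val;		// exclusive access
 *		downgrade_write(&example_rwsem);
 *		// now read-locked; new readers may run concurrently
 *		up_read(&example_rwsem);
 *	}
 */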
1396
1397 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1398
1399 void down_read_nested(struct rw_semaphore *sem, int subclass)
1400 {
1401         might_sleep();
1402         rwsem_acquire_read(&sem->dep_map, subclass, 0, _RET_IP_);
1403         LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
1404 }
1405 EXPORT_SYMBOL(down_read_nested);
1406
1407 void _down_write_nest_lock(struct rw_semaphore *sem, struct lockdep_map *nest)
1408 {
1409         might_sleep();
1410         rwsem_acquire_nest(&sem->dep_map, 0, 0, nest, _RET_IP_);
1411         LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
1412 }
1413 EXPORT_SYMBOL(_down_write_nest_lock);
1414
1415 void down_read_non_owner(struct rw_semaphore *sem)
1416 {
1417         might_sleep();
1418         __down_read(sem);
1419         __rwsem_set_reader_owned(sem, NULL);
1420 }
1421 EXPORT_SYMBOL(down_read_non_owner);
1422
1423 void down_write_nested(struct rw_semaphore *sem, int subclass)
1424 {
1425         might_sleep();
1426         rwsem_acquire(&sem->dep_map, subclass, 0, _RET_IP_);
1427         LOCK_CONTENDED(sem, __down_write_trylock, __down_write);
1428 }
1429 EXPORT_SYMBOL(down_write_nested);
1430
1431 int __sched down_write_killable_nested(struct rw_semaphore *sem, int subclass)
1432 {
1433         might_sleep();
1434         rwsem_acquire(&sem->dep_map, subclass, 0, _RET_IP_);
1435
1436         if (LOCK_CONTENDED_RETURN(sem, __down_write_trylock,
1437                                   __down_write_killable)) {
1438                 rwsem_release(&sem->dep_map, 1, _RET_IP_);
1439                 return -EINTR;
1440         }
1441
1442         return 0;
1443 }
1444 EXPORT_SYMBOL(down_write_killable_nested);
1445
1446 void up_read_non_owner(struct rw_semaphore *sem)
1447 {
1448         DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
1449         __up_read(sem);
1450 }
1451 EXPORT_SYMBOL(up_read_non_owner);
1452
1453 #endif