locking/rwsem: Implement lock handoff to prevent lock starvation
diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 8d0f2acfe13d52402f50b00769710693e72f4eef..decda9fb8c6d376fd2b6d0412db5c399a360d05d 100644
@@ -10,8 +10,9 @@
  * Optimistic spinning by Tim Chen <tim.c.chen@intel.com>
  * and Davidlohr Bueso <davidlohr@hp.com>. Based on mutexes.
  *
- * Rwsem count bit fields re-definition and rwsem rearchitecture
- * by Waiman Long <longman@redhat.com>.
+ * Rwsem count bit fields re-definition and rwsem rearchitecture by
+ * Waiman Long <longman@redhat.com> and
+ * Peter Zijlstra <peterz@infradead.org>.
  */
 
 #include <linux/types.h>
  *
  * Bit  0   - writer locked bit
  * Bit  1   - waiters present bit
- * Bits 2-7 - reserved
+ * Bit  2   - lock handoff bit
+ * Bits 3-7 - reserved
  * Bits 8-X - 24-bit (32-bit) or 56-bit reader count
  *
  * atomic_long_fetch_add() is used to obtain reader lock, whereas
  * atomic_long_cmpxchg() will be used to obtain writer lock.
+ *
+ * There are three places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers.
+ * 2) rwsem_try_write_lock() for writers.
+ * 3) Error path of rwsem_down_write_slowpath().
+ *
+ * For all the above cases, wait_lock will be held. A writer must also
+ * be the first one in the wait_list to be eligible for setting the handoff
+ * bit. So concurrent setting/clearing of handoff bit is not possible.
  */
 #define RWSEM_WRITER_LOCKED    (1UL << 0)
 #define RWSEM_FLAG_WAITERS     (1UL << 1)
+#define RWSEM_FLAG_HANDOFF     (1UL << 2)
+
 #define RWSEM_READER_SHIFT     8
 #define RWSEM_READER_BIAS      (1UL << RWSEM_READER_SHIFT)
 #define RWSEM_READER_MASK      (~(RWSEM_READER_BIAS - 1))
 #define RWSEM_WRITER_MASK      RWSEM_WRITER_LOCKED
 #define RWSEM_LOCK_MASK                (RWSEM_WRITER_MASK|RWSEM_READER_MASK)
-#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS)
+#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
+                                RWSEM_FLAG_HANDOFF)
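
To make the count layout described above concrete, here is a small standalone userspace sketch (illustrative only, not kernel code) that mirrors the 64-bit flag and mask definitions and decodes a few example count values into their writer, waiters, handoff and reader-count fields:

        #include <stdio.h>

        /* Constants mirrored from the definitions above (64-bit case). */
        #define RWSEM_WRITER_LOCKED     (1UL << 0)
        #define RWSEM_FLAG_WAITERS      (1UL << 1)
        #define RWSEM_FLAG_HANDOFF      (1UL << 2)
        #define RWSEM_READER_SHIFT      8
        #define RWSEM_READER_BIAS       (1UL << RWSEM_READER_SHIFT)
        #define RWSEM_READER_MASK       (~(RWSEM_READER_BIAS - 1))

        /* Purely illustrative decode helper, not part of the kernel API. */
        static void decode_count(unsigned long count)
        {
                printf("count=%#lx: writer=%d waiters=%d handoff=%d readers=%lu\n",
                       count,
                       (int)(count & RWSEM_WRITER_LOCKED),
                       (int)!!(count & RWSEM_FLAG_WAITERS),
                       (int)!!(count & RWSEM_FLAG_HANDOFF),
                       (count & RWSEM_READER_MASK) >> RWSEM_READER_SHIFT);
        }

        int main(void)
        {
                decode_count(RWSEM_WRITER_LOCKED);                        /* writer holds the lock */
                decode_count(3 * RWSEM_READER_BIAS | RWSEM_FLAG_WAITERS); /* 3 readers, queue non-empty */
                decode_count(RWSEM_READER_BIAS | RWSEM_FLAG_HANDOFF);     /* 1 reader, handoff pending */
                return 0;
        }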
 
 /*
  * All writes to owner are protected by WRITE_ONCE() to make sure that
@@ -216,7 +230,10 @@ struct rwsem_waiter {
        struct list_head list;
        struct task_struct *task;
        enum rwsem_waiter_type type;
+       unsigned long timeout;
 };
+#define rwsem_first_waiter(sem) \
+       list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
 
 enum rwsem_wake_type {
        RWSEM_WAKE_ANY,         /* Wake whatever's at head of wait list */
@@ -224,6 +241,19 @@ enum rwsem_wake_type {
        RWSEM_WAKE_READ_OWNED   /* Waker thread holds the read lock */
 };
 
+enum writer_wait_state {
+       WRITER_NOT_FIRST,       /* Writer is not first in wait list */
+       WRITER_FIRST,           /* Writer is first in wait list     */
+       WRITER_HANDOFF          /* Writer is first & handoff needed */
+};
+
+/*
+ * The typical HZ value is either 250 or 1000. So set the minimum waiting
+ * time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
+ * queue before initiating the handoff protocol.
+ */
+#define RWSEM_WAIT_TIMEOUT     DIV_ROUND_UP(HZ, 250)
+
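
As a quick check of the arithmetic above (illustrative values only), the resulting minimum wait before a handoff may be requested is:

        HZ = 1000:  DIV_ROUND_UP(1000, 250) = 4 jiffies = 4 ms
        HZ =  250:  DIV_ROUND_UP( 250, 250) = 1 jiffy   = 4 ms
        HZ =  100:  DIV_ROUND_UP( 100, 250) = 1 jiffy   = 10 ms

so the timeout is never shorter than about 4ms and rounds up to one full jiffy on coarser tick rates.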
 /*
  * handle the lock release when processes blocked on it can now run
  * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -244,11 +274,13 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
        long oldcount, woken = 0, adjustment = 0;
        struct list_head wlist;
 
+       lockdep_assert_held(&sem->wait_lock);
+
        /*
         * Take a peek at the queue head waiter such that we can determine
         * the wakeup(s) to perform.
         */
-       waiter = list_first_entry(&sem->wait_list, struct rwsem_waiter, list);
+       waiter = rwsem_first_waiter(sem);
 
        if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
                if (wake_type == RWSEM_WAKE_ANY) {
@@ -275,7 +307,18 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
                adjustment = RWSEM_READER_BIAS;
                oldcount = atomic_long_fetch_add(adjustment, &sem->count);
                if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
-                       atomic_long_sub(adjustment, &sem->count);
+                       /*
+                        * When we've been waiting "too" long (for writers
+                        * to give up the lock), request a HANDOFF to
+                        * force the issue.
+                        */
+                       if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
+                           time_after(jiffies, waiter->timeout)) {
+                               adjustment -= RWSEM_FLAG_HANDOFF;
+                               lockevent_inc(rwsem_rlock_handoff);
+                       }
+
+                       atomic_long_add(-adjustment, &sem->count);
                        return;
                }
                /*
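
The back-out above has a subtle twist: the waker speculatively grants the front reader its RWSEM_READER_BIAS with a fetch_add, and if a writer turns out to still hold the lock it removes that bias again. When the front waiter has also waited past its timeout, the handoff request is folded into the very same update, because adding -(RWSEM_READER_BIAS - RWSEM_FLAG_HANDOFF) both drops the bias and sets the handoff bit (the bit is known to be clear). A minimal userspace model of that arithmetic, using C11 atomics and hypothetical names, might be:

        #include <stdatomic.h>
        #include <stdbool.h>

        #define RWSEM_WRITER_LOCKED     (1UL << 0)
        #define RWSEM_FLAG_HANDOFF      (1UL << 2)
        #define RWSEM_READER_BIAS       (1UL << 8)

        /*
         * Illustrative userspace model only.  'timed_out' stands in for
         * time_after(jiffies, waiter->timeout) on the front waiter.
         */
        static bool grant_read_or_request_handoff(atomic_ulong *count, bool timed_out)
        {
                unsigned long adjustment = RWSEM_READER_BIAS;
                unsigned long old = atomic_fetch_add(count, adjustment);

                if (!(old & RWSEM_WRITER_LOCKED))
                        return true;            /* read lock granted */

                /*
                 * A writer still holds the lock: back the bias out again and,
                 * if the waiter has waited long enough and the handoff bit is
                 * not already set, fold the handoff request into the same
                 * subtraction (the bit is clear, so "-(bias - bit)" sets it).
                 */
                if (timed_out && !(old & RWSEM_FLAG_HANDOFF))
                        adjustment -= RWSEM_FLAG_HANDOFF;

                atomic_fetch_sub(count, adjustment);
                return false;
        }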
@@ -317,6 +360,13 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
                adjustment -= RWSEM_FLAG_WAITERS;
        }
 
+       /*
+        * When we've woken a reader, we no longer need to force writers
+        * to give up the lock and we can clear HANDOFF.
+        */
+       if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
+               adjustment -= RWSEM_FLAG_HANDOFF;
+
        if (adjustment)
                atomic_long_add(adjustment, &sem->count);
 
@@ -346,23 +396,48 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
  * This function must be called with the sem->wait_lock held to prevent
  * race conditions between checking the rwsem wait list and setting the
  * sem->count accordingly.
+ *
+ * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
+ * bit is set or the lock is acquired with handoff bit cleared.
  */
-static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem)
+static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem,
+                                       enum writer_wait_state wstate)
 {
        long new;
 
-       if (count & RWSEM_LOCK_MASK)
-               return false;
+       lockdep_assert_held(&sem->wait_lock);
 
-       new = count + RWSEM_WRITER_LOCKED -
-            (list_is_singular(&sem->wait_list) ? RWSEM_FLAG_WAITERS : 0);
+       do {
+               bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
 
-       if (atomic_long_try_cmpxchg_acquire(&sem->count, &count, new)) {
-               rwsem_set_owner(sem);
-               return true;
-       }
+               if (has_handoff && wstate == WRITER_NOT_FIRST)
+                       return false;
 
-       return false;
+               new = count;
+
+               if (count & RWSEM_LOCK_MASK) {
+                       if (has_handoff || (wstate != WRITER_HANDOFF))
+                               return false;
+
+                       new |= RWSEM_FLAG_HANDOFF;
+               } else {
+                       new |= RWSEM_WRITER_LOCKED;
+                       new &= ~RWSEM_FLAG_HANDOFF;
+
+                       if (list_is_singular(&sem->wait_list))
+                               new &= ~RWSEM_FLAG_WAITERS;
+               }
+       } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
+
+       /*
+        * We have either acquired the lock with handoff bit cleared or
+        * set the handoff bit.
+        */
+       if (new & RWSEM_FLAG_HANDOFF)
+               return false;
+
+       rwsem_set_owner(sem);
+       return true;
 }
 
 #ifdef CONFIG_RWSEM_SPIN_ON_OWNER
@@ -373,9 +448,9 @@ static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
 {
        long count = atomic_long_read(&sem->count);
 
-       while (!(count & RWSEM_LOCK_MASK)) {
+       while (!(count & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))) {
                if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
-                                       count + RWSEM_WRITER_LOCKED)) {
+                                       count | RWSEM_WRITER_LOCKED)) {
                        rwsem_set_owner(sem);
                        lockevent_inc(rwsem_opt_wlock);
                        return true;
@@ -456,6 +531,11 @@ static noinline enum owner_state rwsem_spin_on_owner(struct rw_semaphore *sem)
 
        rcu_read_lock();
        for (;;) {
+               if (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF) {
+                       state = OWNER_NONSPINNABLE;
+                       break;
+               }
+
                tmp = READ_ONCE(sem->owner);
                if (tmp != owner) {
                        state = rwsem_owner_state((unsigned long)tmp);
@@ -553,16 +633,18 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
 
        waiter.task = current;
        waiter.type = RWSEM_WAITING_FOR_READ;
+       waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
 
        raw_spin_lock_irq(&sem->wait_lock);
        if (list_empty(&sem->wait_list)) {
                /*
                 * In case the wait queue is empty and the lock isn't owned
-                * by a writer, this reader can exit the slowpath and return
-                * immediately as its RWSEM_READER_BIAS has already been
-                * set in the count.
+                * by a writer or has the handoff bit set, this reader can
+                * exit the slowpath and return immediately as its
+                * RWSEM_READER_BIAS has already been set in the count.
                 */
-               if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
+               if (!(atomic_long_read(&sem->count) &
+                    (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
                        raw_spin_unlock_irq(&sem->wait_lock);
                        rwsem_set_reader_owned(sem);
                        lockevent_inc(rwsem_rlock_fast);
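
As a concrete illustration of the fast exit above: a reader has already added RWSEM_READER_BIAS on its way into the slowpath, so if it finds the wait list empty and reads count == RWSEM_READER_BIAS (0x100), there is no writer, no pending handoff and no other waiter; the bias it already holds is a valid read lock and it can return immediately. Any writer or handoff bit in the count forces it to queue instead, since the handoff bit means a starved waiter is about to be given the lock.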
@@ -609,8 +691,10 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
        return sem;
 out_nolock:
        list_del(&waiter.list);
-       if (list_empty(&sem->wait_list))
-               atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
+       if (list_empty(&sem->wait_list)) {
+               atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
+                                  &sem->count);
+       }
        raw_spin_unlock_irq(&sem->wait_lock);
        __set_current_state(TASK_RUNNING);
        lockevent_inc(rwsem_rlock_fail);
@@ -624,7 +708,7 @@ static struct rw_semaphore *
 rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 {
        long count;
-       bool waiting = true; /* any queued threads before us */
+       enum writer_wait_state wstate;
        struct rwsem_waiter waiter;
        struct rw_semaphore *ret = sem;
        DEFINE_WAKE_Q(wake_q);
@@ -639,66 +723,95 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
         */
        waiter.task = current;
        waiter.type = RWSEM_WAITING_FOR_WRITE;
+       waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
 
        raw_spin_lock_irq(&sem->wait_lock);
 
        /* account for this before adding a new element to the list */
-       if (list_empty(&sem->wait_list))
-               waiting = false;
+       wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
 
        list_add_tail(&waiter.list, &sem->wait_list);
 
        /* we're now waiting on the lock */
-       if (waiting) {
+       if (wstate == WRITER_NOT_FIRST) {
                count = atomic_long_read(&sem->count);
 
                /*
-                * If there were already threads queued before us and there are
-                * no active writers and some readers, the lock must be read
-                * owned; so we try to wake any read locks that were queued ahead
-                * of us.
+                * If there were already threads queued before us and:
+                *  1) there are no active locks, wake the front
+                *     queued process(es) as the handoff bit might be set.
+                *  2) there are no active writers and some readers, the lock
+                *     must be read owned; so we try to wake any read lock
+                *     waiters that were queued ahead of us.
                 */
-               if (!(count & RWSEM_WRITER_MASK) &&
-                    (count & RWSEM_READER_MASK)) {
-                       rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q);
-                       /*
-                        * The wakeup is normally called _after_ the wait_lock
-                        * is released, but given that we are proactively waking
-                        * readers we can deal with the wake_q overhead as it is
-                        * similar to releasing and taking the wait_lock again
-                        * for attempting rwsem_try_write_lock().
-                        */
-                       wake_up_q(&wake_q);
+               if (count & RWSEM_WRITER_MASK)
+                       goto wait;
 
-                       /*
-                        * Reinitialize wake_q after use.
-                        */
-                       wake_q_init(&wake_q);
-               }
+               rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)
+                                       ? RWSEM_WAKE_READERS
+                                       : RWSEM_WAKE_ANY, &wake_q);
 
+               /*
+                * The wakeup is normally called _after_ the wait_lock
+                * is released, but given that we are proactively waking
+                * readers we can deal with the wake_q overhead as it is
+                * similar to releasing and taking the wait_lock again
+                * for attempting rwsem_try_write_lock().
+                */
+               wake_up_q(&wake_q);
+
+               /* We need wake_q again below, reinitialize */
+               wake_q_init(&wake_q);
        } else {
                count = atomic_long_add_return(RWSEM_FLAG_WAITERS, &sem->count);
        }
 
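
To make the wake choice above concrete (illustrative count values): if the queuing writer sees count == 2*RWSEM_READER_BIAS | RWSEM_FLAG_WAITERS, there is no writer but two readers hold the lock, so the earlier waiters are woken with RWSEM_WAKE_READERS; if it sees count == RWSEM_FLAG_WAITERS | RWSEM_FLAG_HANDOFF, nobody holds the lock at all, so RWSEM_WAKE_ANY is used and the front waiter, which may be a handoff-designated writer, gets to run. The wake_q is drained with wake_up_q() while we are still queued (the comment above explains why that is acceptable here) and must be re-initialized with wake_q_init() before the main wait loop can reuse it.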
+wait:
        /* wait until we successfully acquire the lock */
        set_current_state(state);
        while (true) {
-               if (rwsem_try_write_lock(count, sem))
+               if (rwsem_try_write_lock(count, sem, wstate))
                        break;
+
                raw_spin_unlock_irq(&sem->wait_lock);
 
                /* Block until there are no active lockers. */
-               do {
+               for (;;) {
                        if (signal_pending_state(state, current))
                                goto out_nolock;
 
                        schedule();
                        lockevent_inc(rwsem_sleep_writer);
                        set_current_state(state);
+                       /*
+                        * If HANDOFF bit is set, unconditionally do
+                        * a trylock.
+                        */
+                       if (wstate == WRITER_HANDOFF)
+                               break;
+
+                       if ((wstate == WRITER_NOT_FIRST) &&
+                           (rwsem_first_waiter(sem) == &waiter))
+                               wstate = WRITER_FIRST;
+
                        count = atomic_long_read(&sem->count);
-               } while (count & RWSEM_LOCK_MASK);
+                       if (!(count & RWSEM_LOCK_MASK))
+                               break;
+
+                       /*
+                        * The setting of the handoff bit is deferred
+                        * until rwsem_try_write_lock() is called.
+                        */
+                       if ((wstate == WRITER_FIRST) && (rt_task(current) ||
+                           time_after(jiffies, waiter.timeout))) {
+                               wstate = WRITER_HANDOFF;
+                               lockevent_inc(rwsem_wlock_handoff);
+                               break;
+                       }
+               }
 
                raw_spin_lock_irq(&sem->wait_lock);
+               count = atomic_long_read(&sem->count);
        }
        __set_current_state(TASK_RUNNING);
        list_del(&waiter.list);
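
The sleep loop above is effectively a small state machine over writer_wait_state. A condensed sketch of just the state transitions follows (illustrative only; 'is_first', 'rt' and 'timed_out' stand in for rwsem_first_waiter(sem) == &waiter, rt_task(current) and time_after(jiffies, waiter.timeout), and the count checks that break out of the loop early are ignored):

        #include <stdbool.h>

        enum writer_wait_state { WRITER_NOT_FIRST, WRITER_FIRST, WRITER_HANDOFF };

        /*
         * Illustrative only: the wstate transitions performed after each
         * wakeup in the loop above, before the next rwsem_try_write_lock().
         */
        static enum writer_wait_state
        next_wstate(enum writer_wait_state wstate, bool is_first, bool rt, bool timed_out)
        {
                if (wstate == WRITER_HANDOFF)
                        return WRITER_HANDOFF;  /* sticky: retry the trylock unconditionally */

                if (wstate == WRITER_NOT_FIRST && is_first)
                        wstate = WRITER_FIRST;  /* we have reached the head of the wait list */

                if (wstate == WRITER_FIRST && (rt || timed_out))
                        wstate = WRITER_HANDOFF;        /* request handoff on the next trylock */

                return wstate;
        }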
@@ -711,6 +824,10 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
        __set_current_state(TASK_RUNNING);
        raw_spin_lock_irq(&sem->wait_lock);
        list_del(&waiter.list);
+
+       if (unlikely(wstate == WRITER_HANDOFF))
+               atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
+
        if (list_empty(&sem->wait_list))
                atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
        else
@@ -726,7 +843,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
  * handle waking up a waiter on the semaphore
  * - up_read/up_write has decremented the active part of count if we come here
  */
-static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
+static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count)
 {
        unsigned long flags;
        DEFINE_WAKE_Q(wake_q);
@@ -859,7 +976,7 @@ inline void __up_read(struct rw_semaphore *sem)
        tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);
        if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) ==
                      RWSEM_FLAG_WAITERS))
-               rwsem_wake(sem);
+               rwsem_wake(sem, tmp);
 }
 
 /*
@@ -873,7 +990,7 @@ static inline void __up_write(struct rw_semaphore *sem)
        rwsem_clear_owner(sem);
        tmp = atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED, &sem->count);
        if (unlikely(tmp & RWSEM_FLAG_WAITERS))
-               rwsem_wake(sem);
+               rwsem_wake(sem, tmp);
 }
 
 /*