]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - net/rds/ib_recv.c
Merge tag 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/dledford/rdma
[linux.git] / net / rds / ib_recv.c
index 0ceb4c60d2a3aa657c09dc4228cd02174765d9cc..f43831e4186a3543af1b8cd8beba803c66a233d3 100644 (file)
@@ -297,7 +297,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic
 }
 
 static int rds_ib_recv_refill_one(struct rds_connection *conn,
-                                 struct rds_ib_recv_work *recv, int prefill)
+                                 struct rds_ib_recv_work *recv, gfp_t gfp)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct ib_sge *sge;
@@ -305,7 +305,7 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
        gfp_t slab_mask = GFP_NOWAIT;
        gfp_t page_mask = GFP_NOWAIT;
 
-       if (prefill) {
+       if (gfp & __GFP_WAIT) {
                slab_mask = GFP_KERNEL;
                page_mask = GFP_HIGHUSER;
        }
@@ -347,6 +347,24 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
        return ret;
 }
 
+static int acquire_refill(struct rds_connection *conn)
+{
+       return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0;
+}
+
+static void release_refill(struct rds_connection *conn)
+{
+       clear_bit(RDS_RECV_REFILL, &conn->c_flags);
+
+       /* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+        * hot path and finding waiters is very rare.  We don't want to walk
+        * the system-wide hashed waitqueue buckets in the fast path only to
+        * almost never find waiters.
+        */
+       if (waitqueue_active(&conn->c_waitq))
+               wake_up_all(&conn->c_waitq);
+}
+
 /*
  * This tries to allocate and post unused work requests after making sure that
  * they have all the allocations they need to queue received fragments into
@@ -354,15 +372,23 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
  *
  * -1 is returned if posting fails due to temporary resource exhaustion.
  */
-void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
+void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_ib_recv_work *recv;
        struct ib_recv_wr *failed_wr;
        unsigned int posted = 0;
        int ret = 0;
+       bool can_wait = !!(gfp & __GFP_WAIT);
        u32 pos;
 
+       /* the goal here is to just make sure that someone, somewhere
+        * is posting buffers.  If we can't get the refill lock,
+        * let them do their thing
+        */
+       if (!acquire_refill(conn))
+               return;
+
        while ((prefill || rds_conn_up(conn)) &&
               rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
                if (pos >= ic->i_recv_ring.w_nr) {
@@ -372,7 +398,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
                }
 
                recv = &ic->i_recvs[pos];
-               ret = rds_ib_recv_refill_one(conn, recv, prefill);
+               ret = rds_ib_recv_refill_one(conn, recv, gfp);
                if (ret) {
                        break;
                }
@@ -402,6 +428,24 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
 
        if (ret)
                rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
+
+       release_refill(conn);
+
+       /* if we're called from the softirq handler, we'll be GFP_NOWAIT.
+        * in this case the ring being low is going to lead to more interrupts
+        * and we can safely let the softirq code take care of it unless the
+        * ring is completely empty.
+        *
+        * if we're called from krdsd, we'll be GFP_KERNEL.  In this case
+        * we might have raced with the softirq code while we had the refill
+        * lock held.  Use rds_ib_ring_low() instead of ring_empty to decide
+        * if we should requeue.
+        */
+       if (rds_conn_up(conn) &&
+           ((can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
+           rds_ib_ring_empty(&ic->i_recv_ring))) {
+               queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+       }
 }
 
 /*
@@ -982,10 +1026,17 @@ static inline void rds_poll_cq(struct rds_ib_connection *ic,
                }
 
                /*
-                * It's very important that we only free this ring entry if we've truly
-                * freed the resources allocated to the entry.  The refilling path can
-                * leak if we don't.
+                * rds_ib_process_recv() doesn't always consume the frag, and
+                * we might not have called it at all if the wc didn't indicate
+                * success. We already unmapped the frag's pages, though, and
+                * the following rds_ib_ring_free() call tells the refill path
+                * that it will not find an allocated frag here. Make sure we
+                * keep that promise by freeing a frag that's still on the ring.
                 */
+               if (recv->r_frag) {
+                       rds_ib_frag_free(ic, recv->r_frag);
+                       recv->r_frag = NULL;
+               }
                rds_ib_ring_free(&ic->i_recv_ring, 1);
        }
 }
@@ -1016,7 +1067,7 @@ void rds_ib_recv_tasklet_fn(unsigned long data)
                rds_ib_stats_inc(s_ib_rx_ring_empty);
 
        if (rds_ib_ring_low(&ic->i_recv_ring))
-               rds_ib_recv_refill(conn, 0);
+               rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
 }
 
 int rds_ib_recv(struct rds_connection *conn)
@@ -1025,8 +1076,10 @@ int rds_ib_recv(struct rds_connection *conn)
        int ret = 0;
 
        rdsdebug("conn %p\n", conn);
-       if (rds_conn_up(conn))
+       if (rds_conn_up(conn)) {
                rds_ib_attempt_ack(ic);
+               rds_ib_recv_refill(conn, 0, GFP_KERNEL);
+       }
 
        return ret;
 }
@@ -1049,9 +1102,10 @@ int rds_ib_recv_init(void)
        rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
                                        sizeof(struct rds_page_frag),
                                        0, SLAB_HWCACHE_ALIGN, NULL);
-       if (!rds_ib_frag_slab)
+       if (!rds_ib_frag_slab) {
                kmem_cache_destroy(rds_ib_incoming_slab);
-       else
+               rds_ib_incoming_slab = NULL;
+       } else
                ret = 0;
 out:
        return ret;