]> asedeno.scripts.mit.edu Git - linux.git/commitdiff
xprtrdma: Use an llist to manage free rpcrdma_reps
authorChuck Lever <chuck.lever@oracle.com>
Mon, 19 Aug 2019 22:48:43 +0000 (18:48 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Wed, 21 Aug 2019 15:45:27 +0000 (11:45 -0400)
rpcrdma_rep objects are removed from their free list by only a
single thread: the Receive completion handler. Thus that free list
can be converted to an llist, where a single-threaded consumer and
a multi-threaded producer (rpcrdma_buffer_put) can both access the
llist without the need for any serialization.

This eliminates spin lock contention between the Receive completion
handler and rpcrdma_buffer_get, and makes the rep consumer wait-
free.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 52444c4d1be2e9aa20f69c4b13d24f82ef3987cf..db90083ed35b605cfc2941f4fe5b9fe0e7c77362 100644 (file)
@@ -75,6 +75,7 @@
  * internal functions
  */
 static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
 static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
@@ -407,7 +408,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
-       struct rpcrdma_rep *rep;
 
        cancel_work_sync(&buf->rb_refresh_worker);
 
@@ -431,8 +431,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
        /* The ULP is responsible for ensuring all DMA
         * mappings and MRs are gone.
         */
-       list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
-               rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
+       rpcrdma_reps_destroy(buf);
        list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
                rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
                rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
@@ -1071,6 +1070,40 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
        return NULL;
 }
 
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+       rpcrdma_regbuf_free(rep->rr_rdmabuf);
+       kfree(rep);
+}
+
+static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+{
+       struct llist_node *node;
+
+       /* Calls to llist_del_first are required to be serialized */
+       node = llist_del_first(&buf->rb_free_reps);
+       if (!node)
+               return NULL;
+       return llist_entry(node, struct rpcrdma_rep, rr_node);
+}
+
+static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+                           struct rpcrdma_rep *rep)
+{
+       if (!rep->rr_temp)
+               llist_add(&rep->rr_node, &buf->rb_free_reps);
+       else
+               rpcrdma_rep_destroy(rep);
+}
+
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_rep *rep;
+
+       while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+               rpcrdma_rep_destroy(rep);
+}
+
 /**
  * rpcrdma_buffer_create - Create initial set of req/rep objects
  * @r_xprt: transport instance to (re)initialize
@@ -1106,7 +1139,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        }
 
        buf->rb_credits = 1;
-       INIT_LIST_HEAD(&buf->rb_recv_bufs);
+       init_llist_head(&buf->rb_free_reps);
 
        rc = rpcrdma_sendctxs_create(r_xprt);
        if (rc)
@@ -1118,12 +1151,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        return rc;
 }
 
-static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
-{
-       rpcrdma_regbuf_free(rep->rr_rdmabuf);
-       kfree(rep);
-}
-
 /**
  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
  * @req: unused object to be destroyed
@@ -1182,15 +1209,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
        cancel_work_sync(&buf->rb_refresh_worker);
 
        rpcrdma_sendctxs_destroy(buf);
-
-       while (!list_empty(&buf->rb_recv_bufs)) {
-               struct rpcrdma_rep *rep;
-
-               rep = list_first_entry(&buf->rb_recv_bufs,
-                                      struct rpcrdma_rep, rr_list);
-               list_del(&rep->rr_list);
-               rpcrdma_rep_destroy(rep);
-       }
+       rpcrdma_reps_destroy(buf);
 
        while (!list_empty(&buf->rb_send_bufs)) {
                struct rpcrdma_req *req;
@@ -1281,39 +1300,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
  */
 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 {
-       struct rpcrdma_rep *rep = req->rl_reply;
-
+       if (req->rl_reply)
+               rpcrdma_rep_put(buffers, req->rl_reply);
        req->rl_reply = NULL;
 
        spin_lock(&buffers->rb_lock);
        list_add(&req->rl_list, &buffers->rb_send_bufs);
-       if (rep) {
-               if (!rep->rr_temp) {
-                       list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-                       rep = NULL;
-               }
-       }
        spin_unlock(&buffers->rb_lock);
-       if (rep)
-               rpcrdma_rep_destroy(rep);
 }
 
-/*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
+/**
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
+ *
+ * Used after error conditions.
  */
-void
-rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 {
-       struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
-       if (!rep->rr_temp) {
-               spin_lock(&buffers->rb_lock);
-               list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-               spin_unlock(&buffers->rb_lock);
-       } else {
-               rpcrdma_rep_destroy(rep);
-       }
+       rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
 }
 
 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
@@ -1469,22 +1473,10 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 
        /* fast path: all needed reps can be found on the free list */
        wr = NULL;
-       spin_lock(&buf->rb_lock);
        while (needed) {
-               rep = list_first_entry_or_null(&buf->rb_recv_bufs,
-                                              struct rpcrdma_rep, rr_list);
+               rep = rpcrdma_rep_get_locked(buf);
                if (!rep)
-                       break;
-
-               list_del(&rep->rr_list);
-               rep->rr_recv_wr.next = wr;
-               wr = &rep->rr_recv_wr;
-               --needed;
-       }
-       spin_unlock(&buf->rb_lock);
-
-       while (needed) {
-               rep = rpcrdma_rep_create(r_xprt, temp);
+                       rep = rpcrdma_rep_create(r_xprt, temp);
                if (!rep)
                        break;
 
index 200d075bbe313ebf645d32a62ec29e004a08527e..bd1befa83d243e2cc882e3655ebc3b271dcbb212 100644 (file)
@@ -47,6 +47,7 @@
 #include <linux/atomic.h>              /* atomic_t, etc */
 #include <linux/kref.h>                        /* struct kref */
 #include <linux/workqueue.h>           /* struct work_struct */
+#include <linux/llist.h>
 
 #include <rdma/rdma_cm.h>              /* RDMA connection api */
 #include <rdma/ib_verbs.h>             /* RDMA verbs api */
@@ -200,7 +201,7 @@ struct rpcrdma_rep {
        struct rpc_rqst         *rr_rqst;
        struct xdr_buf          rr_hdrbuf;
        struct xdr_stream       rr_stream;
-       struct list_head        rr_list;
+       struct llist_node       rr_node;
        struct ib_recv_wr       rr_recv_wr;
 };
 
@@ -362,7 +363,6 @@ rpcrdma_mr_pop(struct list_head *list)
 struct rpcrdma_buffer {
        spinlock_t              rb_lock;
        struct list_head        rb_send_bufs;
-       struct list_head        rb_recv_bufs;
        struct list_head        rb_mrs;
 
        unsigned long           rb_sc_head;
@@ -373,6 +373,8 @@ struct rpcrdma_buffer {
        struct list_head        rb_allreqs;
        struct list_head        rb_all_mrs;
 
+       struct llist_head       rb_free_reps;
+
        u32                     rb_max_requests;
        u32                     rb_credits;     /* most recent credit grant */