xprtrdma: Fix MR list handling
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b10aa16557f00d104bae3b757e83adccbfe68228..82361e7bbb51ca4512e07ec606be0ffef6179c2c 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -53,6 +53,7 @@
 #include <linux/slab.h>
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/svc_rdma.h>
+#include <linux/log2.h>
 
 #include <asm-generic/barrier.h>
 #include <asm/bitops.h>
@@ -74,6 +75,8 @@
  * internal functions
  */
 static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
 static struct rpcrdma_regbuf *
@@ -81,7 +84,6 @@ rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
                     gfp_t flags);
 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
-static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
 /* Wait for outstanding transport work to finish. ib_drain_qp
  * handles the drains in the wrong order for us, so open code
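The drain that this comment introduces is open-coded just below. A rough sketch of its shape, assuming the rpcrdma types used throughout this diff (the RQ is flushed first so that deferred Reply work finishes before the SQ drains):

    static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
    {
            struct rdma_cm_id *id = r_xprt->rx_ia.ri_id;

            /* Flush Receives, then wait for deferred Reply work
             * to complete.
             */
            ib_drain_rq(id->qp);

            /* Deferred Reply processing might have scheduled
             * local invalidations of registered MRs.
             */
            ib_drain_sq(id->qp);
    }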
@@ -167,7 +169,6 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
                                   rdmab_addr(rep->rr_rdmabuf),
                                   wc->byte_len, DMA_FROM_DEVICE);
 
-       rpcrdma_post_recvs(r_xprt, false);
        rpcrdma_reply_handler(rep);
        return;
 
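With rpcrdma_post_recvs() no longer called from the completion handler, Receives are replenished from the Reply path instead. A sketch of the assumed replacement call site (in rpc_rdma.c, outside this diff):

    /* rpcrdma_reply_handler() refills the Receive Queue once per
     * Reply, off the completion handler's hot path:
     */
    rpcrdma_post_recvs(r_xprt, false);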
@@ -405,9 +406,8 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
-       struct rpcrdma_rep *rep;
 
-       cancel_delayed_work_sync(&buf->rb_refresh_worker);
+       cancel_work_sync(&buf->rb_refresh_worker);
 
        /* This is similar to rpcrdma_ep_destroy, but:
         * - Don't cancel the connect worker.
@@ -429,8 +429,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
        /* The ULP is responsible for ensuring all DMA
         * mappings and MRs are gone.
         */
-       list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
-               rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
+       rpcrdma_reps_destroy(buf);
        list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
                rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
                rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
@@ -604,10 +603,10 @@ void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
  * Unlike a normal reconnection, a fresh PD and a new set
  * of MRs and buffers are needed.
  */
-static int
-rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
-                        struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
+                                   struct ib_qp_init_attr *qp_init_attr)
 {
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        int rc, err;
 
        trace_xprtrdma_reinsert(r_xprt);
@@ -624,7 +623,7 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
        }
 
        rc = -ENETUNREACH;
-       err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+       err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
        if (err) {
                pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
                goto out3;
@@ -641,16 +640,16 @@ rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
        return rc;
 }
 
-static int
-rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
-                    struct rpcrdma_ia *ia)
+static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
+                               struct ib_qp_init_attr *qp_init_attr)
 {
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rdma_cm_id *id, *old;
        int err, rc;
 
        trace_xprtrdma_reconnect(r_xprt);
 
-       rpcrdma_ep_disconnect(ep, ia);
+       rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
 
        rc = -EHOSTUNREACH;
        id = rpcrdma_create_id(r_xprt, ia);
@@ -672,7 +671,7 @@ rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
                goto out_destroy;
        }
 
-       err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+       err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
        if (err)
                goto out_destroy;
 
@@ -697,25 +696,27 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
                                                   rx_ia);
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+       struct ib_qp_init_attr qp_init_attr;
        int rc;
 
 retry:
+       memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
        switch (ep->rep_connected) {
        case 0:
                dprintk("RPC:       %s: connecting...\n", __func__);
-               rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+               rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
                if (rc) {
                        rc = -ENETUNREACH;
                        goto out_noupdate;
                }
                break;
        case -ENODEV:
-               rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
+               rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
                if (rc)
                        goto out_noupdate;
                break;
        default:
-               rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
+               rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
                if (rc)
                        goto out;
        }
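Note the scratch copy taken at the top of the retry loop: the verbs provider may write adjusted capability values back into the ib_qp_init_attr it is handed, and those adjustments must not accumulate in ep->rep_attr across reconnects. The pattern in isolation:

    struct ib_qp_init_attr qp_init_attr;

    /* Work on a copy so device adjustments to the requested
     * caps never leak back into the transport's saved attrs.
     */
    memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
    rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);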
@@ -723,12 +724,15 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        ep->rep_connected = 0;
        xprt_clear_connected(xprt);
 
+       rpcrdma_reset_cwnd(r_xprt);
        rpcrdma_post_recvs(r_xprt, true);
 
        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
        if (rc)
                goto out;
 
+       if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+               xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
        if (ep->rep_connected <= 0) {
                if (ep->rep_connected == -EAGAIN)
@@ -775,6 +779,7 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        trace_xprtrdma_disconnect(r_xprt, rc);
 
        rpcrdma_xprt_drain(r_xprt);
+       rpcrdma_reqs_reset(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
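The sendctx queue this comment introduces is a single-producer, single-consumer ring: the sender advances the head under the transport send lock, while the Send completion handler advances the tail. A minimal sketch of the wrap helper, assuming rb_sc_last holds the index of the final slot:

    static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
                                              unsigned long id)
    {
            /* Wrap to slot 0 after the last slot */
            return likely(id < buf->rb_sc_last) ? id + 1 : 0;
    }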
@@ -942,14 +947,12 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        unsigned int count;
-       LIST_HEAD(free);
-       LIST_HEAD(all);
 
        for (count = 0; count < ia->ri_max_segs; count++) {
                struct rpcrdma_mr *mr;
                int rc;
 
-               mr = kzalloc(sizeof(*mr), GFP_KERNEL);
+               mr = kzalloc(sizeof(*mr), GFP_NOFS);
                if (!mr)
                        break;
 
@@ -961,15 +964,13 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
 
                mr->mr_xprt = r_xprt;
 
-               list_add(&mr->mr_list, &free);
-               list_add(&mr->mr_all, &all);
+               spin_lock(&buf->rb_lock);
+               rpcrdma_mr_push(mr, &buf->rb_mrs);
+               list_add(&mr->mr_all, &buf->rb_all_mrs);
+               spin_unlock(&buf->rb_lock);
        }
 
-       spin_lock(&buf->rb_mrlock);
-       list_splice(&free, &buf->rb_mrs);
-       list_splice(&all, &buf->rb_all);
        r_xprt->rx_stats.mrs_allocated += count;
-       spin_unlock(&buf->rb_mrlock);
        trace_xprtrdma_createmrs(r_xprt, count);
 }
 
@@ -977,7 +978,7 @@ static void
 rpcrdma_mr_refresh_worker(struct work_struct *work)
 {
        struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
-                                                 rb_refresh_worker.work);
+                                                 rb_refresh_worker);
        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
                                                   rx_buf);
 
@@ -999,12 +1000,18 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
+       size_t maxhdrsize;
 
        req = kzalloc(sizeof(*req), flags);
        if (req == NULL)
                goto out1;
 
-       rb = rpcrdma_regbuf_alloc(RPCRDMA_HDRBUF_SIZE, DMA_TO_DEVICE, flags);
+       /* Compute maximum header buffer size in bytes */
+       maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+                    r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
+       maxhdrsize *= sizeof(__be32);
+       rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+                                 DMA_TO_DEVICE, flags);
        if (!rb)
                goto out2;
        req->rl_rdmabuf = rb;
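A worked example of the new header sizing, under assumed values rpcrdma_fixed_maxsz = 4, rpcrdma_readchunk_maxsz = 6, and ri_max_segs = 8:

    /* maxhdrsize  = 4 + 3 + 8 * 6     -> 55 XDR words
     * maxhdrsize *= sizeof(__be32)    -> 220 bytes
     * __roundup_pow_of_two(220)       -> 256-byte header buffer
     */

This is why <linux/log2.h> is added at the top of the file.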
@@ -1018,6 +1025,7 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
        if (!req->rl_recvbuf)
                goto out4;
 
+       INIT_LIST_HEAD(&req->rl_free_mrs);
        INIT_LIST_HEAD(&req->rl_registered);
        spin_lock(&buffer->rb_lock);
        list_add(&req->rl_all, &buffer->rb_allreqs);
@@ -1034,6 +1042,26 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
        return NULL;
 }
 
+/**
+ * rpcrdma_reqs_reset - Reset all reqs owned by a transport
+ * @r_xprt: controlling transport instance
+ *
+ * ASSUMPTION: the rb_allreqs list is stable for the duration,
+ * and thus can be walked without holding rb_lock. E.g., the
+ * caller is holding the transport send lock to exclude
+ * device removal or disconnection.
+ */
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_req *req;
+
+       list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
+               /* Credits are valid only for one connection */
+               req->rl_slot.rq_cong = 0;
+       }
+}
+
 static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
                                              bool temp)
 {
@@ -1065,6 +1093,40 @@ static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
        return NULL;
 }
 
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+       rpcrdma_regbuf_free(rep->rr_rdmabuf);
+       kfree(rep);
+}
+
+static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
+{
+       struct llist_node *node;
+
+       /* Calls to llist_del_first are required to be serialized */
+       node = llist_del_first(&buf->rb_free_reps);
+       if (!node)
+               return NULL;
+       return llist_entry(node, struct rpcrdma_rep, rr_node);
+}
+
+static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
+                           struct rpcrdma_rep *rep)
+{
+       if (!rep->rr_temp)
+               llist_add(&rep->rr_node, &buf->rb_free_reps);
+       else
+               rpcrdma_rep_destroy(rep);
+}
+
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_rep *rep;
+
+       while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
+               rpcrdma_rep_destroy(rep);
+}
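The rep free list becomes a lock-free llist instead of a list_head guarded by rb_lock. The llist contract (linux/llist.h) is that llist_add() may run concurrently from any context, but llist_del_first() callers must serialize among themselves, which is what the _locked suffix above signals:

    struct llist_node *node;

    llist_add(&rep->rr_node, &buf->rb_free_reps);  /* any context */
    node = llist_del_first(&buf->rb_free_reps);    /* callers serialize */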
+
 /**
  * rpcrdma_buffer_create - Create initial set of req/rep objects
  * @r_xprt: transport instance to (re)initialize
@@ -1078,12 +1140,10 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 
        buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
        buf->rb_bc_srv_max_requests = 0;
-       spin_lock_init(&buf->rb_mrlock);
        spin_lock_init(&buf->rb_lock);
        INIT_LIST_HEAD(&buf->rb_mrs);
-       INIT_LIST_HEAD(&buf->rb_all);
-       INIT_DELAYED_WORK(&buf->rb_refresh_worker,
-                         rpcrdma_mr_refresh_worker);
+       INIT_LIST_HEAD(&buf->rb_all_mrs);
+       INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
 
        rpcrdma_mrs_create(r_xprt);
 
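The refresh worker also changes from a delayed work item to a plain work_struct here, matching the cancel_work_sync() conversions elsewhere in this diff: MR refresh is driven on demand rather than on a timer. A sketch of the assumed kick, from the MR allocation path (not shown in this diff):

    /* No MR available: ask the worker to replenish rb_mrs. */
    schedule_work(&buf->rb_refresh_worker);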
@@ -1101,8 +1161,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
                list_add(&req->rl_list, &buf->rb_send_bufs);
        }
 
-       buf->rb_credits = 1;
-       INIT_LIST_HEAD(&buf->rb_recv_bufs);
+       init_llist_head(&buf->rb_free_reps);
 
        rc = rpcrdma_sendctxs_create(r_xprt);
        if (rc)
@@ -1114,12 +1173,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        return rc;
 }
 
-static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
-{
-       rpcrdma_regbuf_free(rep->rr_rdmabuf);
-       kfree(rep);
-}
-
 /**
  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
  * @req: unused object to be destroyed
@@ -1127,45 +1180,54 @@ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
  * This function assumes that the caller prevents concurrent device
  * unload and transport tear-down.
  */
-void
-rpcrdma_req_destroy(struct rpcrdma_req *req)
+void rpcrdma_req_destroy(struct rpcrdma_req *req)
 {
+       struct rpcrdma_mr *mr;
+
        list_del(&req->rl_all);
 
+       while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
+               struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
+
+               spin_lock(&buf->rb_lock);
+               list_del(&mr->mr_all);
+               spin_unlock(&buf->rb_lock);
+
+               frwr_release_mr(mr);
+       }
+
        rpcrdma_regbuf_free(req->rl_recvbuf);
        rpcrdma_regbuf_free(req->rl_sendbuf);
        rpcrdma_regbuf_free(req->rl_rdmabuf);
        kfree(req);
 }
 
-static void
-rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
+/**
+ * rpcrdma_mrs_destroy - Release all of a transport's MRs
+ * @buf: controlling buffer instance
+ *
+ * Relies on the caller holding the transport send lock so that
+ * mr->mr_list can safely be removed from req->rl_free_mrs.
+ */
+static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
 {
        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
                                                   rx_buf);
        struct rpcrdma_mr *mr;
-       unsigned int count;
 
-       count = 0;
-       spin_lock(&buf->rb_mrlock);
-       while (!list_empty(&buf->rb_all)) {
-               mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
+       spin_lock(&buf->rb_lock);
+       while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
+                                             struct rpcrdma_mr,
+                                             mr_all)) != NULL) {
+               list_del(&mr->mr_list);
                list_del(&mr->mr_all);
-
-               spin_unlock(&buf->rb_mrlock);
-
-               /* Ensure MW is not on any rl_registered list */
-               if (!list_empty(&mr->mr_list))
-                       list_del(&mr->mr_list);
+               spin_unlock(&buf->rb_lock);
 
                frwr_release_mr(mr);
-               count++;
-               spin_lock(&buf->rb_mrlock);
+               spin_lock(&buf->rb_lock);
        }
-       spin_unlock(&buf->rb_mrlock);
+       spin_unlock(&buf->rb_lock);
        r_xprt->rx_stats.mrs_allocated = 0;
-
-       dprintk("RPC:       %s: released %u MRs\n", __func__, count);
 }
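Taken together with rpcrdma_mr_get() and rpcrdma_mr_put() below, the MR flow after this series looks roughly like this (the marshaling side is not part of this diff):

    /* rpcrdma_mrs_create()  - MR pushed on buf->rb_mrs and tracked
     *                         on buf->rb_all_mrs, under rb_lock
     * rpcrdma_mr_get()      - MR popped from buf->rb_mrs under rb_lock
     * rpcrdma_mr_put()      - MR DMA-unmapped, then cached on its
     *                         req's rl_free_mrs for quick reuse
     * rpcrdma_req_destroy() - cached MRs popped from rl_free_mrs and
     *                         released via frwr_release_mr()
     */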
 
 /**
@@ -1179,18 +1241,10 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
 void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
-       cancel_delayed_work_sync(&buf->rb_refresh_worker);
+       cancel_work_sync(&buf->rb_refresh_worker);
 
        rpcrdma_sendctxs_destroy(buf);
-
-       while (!list_empty(&buf->rb_recv_bufs)) {
-               struct rpcrdma_rep *rep;
-
-               rep = list_first_entry(&buf->rb_recv_bufs,
-                                      struct rpcrdma_rep, rr_list);
-               list_del(&rep->rr_list);
-               rpcrdma_rep_destroy(rep);
-       }
+       rpcrdma_reps_destroy(buf);
 
        while (!list_empty(&buf->rb_send_bufs)) {
                struct rpcrdma_req *req;
@@ -1215,54 +1269,20 @@ struct rpcrdma_mr *
 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-       struct rpcrdma_mr *mr = NULL;
-
-       spin_lock(&buf->rb_mrlock);
-       if (!list_empty(&buf->rb_mrs))
-               mr = rpcrdma_mr_pop(&buf->rb_mrs);
-       spin_unlock(&buf->rb_mrlock);
+       struct rpcrdma_mr *mr;
 
-       if (!mr)
-               goto out_nomrs;
+       spin_lock(&buf->rb_lock);
+       mr = rpcrdma_mr_pop(&buf->rb_mrs);
+       spin_unlock(&buf->rb_lock);
        return mr;
-
-out_nomrs:
-       trace_xprtrdma_nomrs(r_xprt);
-       if (r_xprt->rx_ep.rep_connected != -ENODEV)
-               schedule_delayed_work(&buf->rb_refresh_worker, 0);
-
-       /* Allow the reply handler and refresh worker to run */
-       cond_resched();
-
-       return NULL;
-}
-
-static void
-__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
-{
-       spin_lock(&buf->rb_mrlock);
-       rpcrdma_mr_push(mr, &buf->rb_mrs);
-       spin_unlock(&buf->rb_mrlock);
-}
-
-/**
- * rpcrdma_mr_put - Release an rpcrdma_mr object
- * @mr: object to release
- *
- */
-void
-rpcrdma_mr_put(struct rpcrdma_mr *mr)
-{
-       __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
 }
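The explicit empty-list check that used to live here is gone because the helper itself returns NULL for an empty list. The assumed shape of that helper (declared in xprt_rdma.h, not shown in this diff):

    static inline struct rpcrdma_mr *
    rpcrdma_mr_pop(struct list_head *list)
    {
            struct rpcrdma_mr *mr;

            mr = list_first_entry_or_null(list, struct rpcrdma_mr, mr_list);
            if (mr)
                    list_del_init(&mr->mr_list);
            return mr;
    }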
 
 /**
- * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
- * @mr: object to release
+ * rpcrdma_mr_put - DMA unmap an MR and release it
+ * @mr: MR to release
  *
  */
-void
-rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+void rpcrdma_mr_put(struct rpcrdma_mr *mr)
 {
        struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
 
@@ -1272,7 +1292,8 @@ rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
                                mr->mr_sg, mr->mr_nents, mr->mr_dir);
                mr->mr_dir = DMA_NONE;
        }
-       __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+
+       rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
 }
 
 /**
@@ -1303,39 +1324,24 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
  */
 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 {
-       struct rpcrdma_rep *rep = req->rl_reply;
-
+       if (req->rl_reply)
+               rpcrdma_rep_put(buffers, req->rl_reply);
        req->rl_reply = NULL;
 
        spin_lock(&buffers->rb_lock);
        list_add(&req->rl_list, &buffers->rb_send_bufs);
-       if (rep) {
-               if (!rep->rr_temp) {
-                       list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-                       rep = NULL;
-               }
-       }
        spin_unlock(&buffers->rb_lock);
-       if (rep)
-               rpcrdma_rep_destroy(rep);
 }
 
-/*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
+/**
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
+ *
+ * Used after error conditions.
  */
-void
-rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 {
-       struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
-       if (!rep->rr_temp) {
-               spin_lock(&buffers->rb_lock);
-               list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-               spin_unlock(&buffers->rb_lock);
-       } else {
-               rpcrdma_rep_destroy(rep);
-       }
+       rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
 }
 
 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
@@ -1470,8 +1476,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
        return 0;
 }
 
-static void
-rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+/**
+ * rpcrdma_post_recvs - Refill the Receive Queue
+ * @r_xprt: controlling transport instance
+ * @temp: mark Receive buffers to be deleted after use
+ *
+ */
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
@@ -1483,7 +1494,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
        count = 0;
 
        needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
-       if (ep->rep_receive_count > needed)
+       if (likely(ep->rep_receive_count > needed))
                goto out;
        needed -= ep->rep_receive_count;
        if (!temp)
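A worked example of the refill arithmetic above, under assumed values rb_credits = 32, rb_bc_srv_max_requests = 0, and rep_receive_count = 25:

    /* needed = 32 + (0 << 1)   -> 32
     * likely(25 > 32) is false -> no early exit
     * needed -= 25             -> post 7 more Receives, plus a
     *                             batch allowance when !temp
     */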
@@ -1491,22 +1502,10 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 
        /* fast path: all needed reps can be found on the free list */
        wr = NULL;
-       spin_lock(&buf->rb_lock);
        while (needed) {
-               rep = list_first_entry_or_null(&buf->rb_recv_bufs,
-                                              struct rpcrdma_rep, rr_list);
+               rep = rpcrdma_rep_get_locked(buf);
                if (!rep)
-                       break;
-
-               list_del(&rep->rr_list);
-               rep->rr_recv_wr.next = wr;
-               wr = &rep->rr_recv_wr;
-               --needed;
-       }
-       spin_unlock(&buf->rb_lock);
-
-       while (needed) {
-               rep = rpcrdma_rep_create(r_xprt, temp);
+                       rep = rpcrdma_rep_create(r_xprt, temp);
                if (!rep)
                        break;
 
@@ -1523,7 +1522,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
                if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
                        goto release_wrs;
 
-               trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+               trace_xprtrdma_post_recv(rep);
                ++count;
        }