]> asedeno.scripts.mit.edu Git - linux.git/commitdiff
xprtrdma: Wake RPCs directly in rpcrdma_wc_send path
authorChuck Lever <chuck.lever@oracle.com>
Wed, 19 Jun 2019 14:33:15 +0000 (10:33 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Tue, 9 Jul 2019 14:30:25 +0000 (10:30 -0400)
Eliminate a context switch in the path that handles RPC wake-ups
when a Receive completion has to wait for a Send completion.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 33b6e6a03f68b341975eba2bad7f6a2fdc3d5a2c..caf0b1950d7635215f4be572272203b7be04cd3b 100644 (file)
@@ -511,6 +511,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
        return 0;
 }
 
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+       struct rpcrdma_req *req =
+               container_of(kref, struct rpcrdma_req, rl_kref);
+       struct rpcrdma_rep *rep = req->rl_reply;
+
+       rpcrdma_complete_rqst(rep);
+       rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
 /**
  * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
@@ -520,6 +530,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
        struct ib_sge *sge;
 
+       if (!sc->sc_unmap_count)
+               return;
+
        /* The first two SGEs contain the transport header and
         * the inline buffer. These are always left mapped so
         * they can be cheaply re-used.
@@ -529,9 +542,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
                ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
                                  DMA_TO_DEVICE);
 
-       if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
-                              &sc->sc_req->rl_flags))
-               wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+       kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -666,7 +677,7 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 out:
        sc->sc_wr.num_sge += sge_no;
        if (sc->sc_unmap_count)
-               __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+               kref_get(&req->rl_kref);
        return true;
 
 out_regbuf:
@@ -708,7 +719,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
        req->rl_sendctx->sc_wr.num_sge = 0;
        req->rl_sendctx->sc_unmap_count = 0;
        req->rl_sendctx->sc_req = req;
-       __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+       kref_init(&req->rl_kref);
 
        ret = -EIO;
        if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
@@ -1268,36 +1279,12 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
        goto out;
 }
 
-/* Ensure that any DMA mapped pages associated with
- * the Send of the RPC Call have been unmapped before
- * allowing the RPC to complete. This protects argument
- * memory not controlled by the RPC client from being
- * re-used before we're done with it.
- */
-static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt,
-                              struct rpcrdma_req *req)
+static void rpcrdma_reply_done(struct kref *kref)
 {
-       if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
-               r_xprt->rx_stats.reply_waits_for_send++;
-               out_of_line_wait_on_bit(&req->rl_flags,
-                                       RPCRDMA_REQ_F_TX_RESOURCES,
-                                       bit_wait,
-                                       TASK_UNINTERRUPTIBLE);
-       }
-}
+       struct rpcrdma_req *req =
+               container_of(kref, struct rpcrdma_req, rl_kref);
 
-/**
- * rpcrdma_release_rqst - Release hardware resources
- * @r_xprt: controlling transport instance
- * @req: request with resources to release
- *
- */
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
-       if (!list_empty(&req->rl_registered))
-               frwr_unmap_sync(r_xprt, req);
-
-       rpcrdma_release_tx(r_xprt, req);
+       rpcrdma_complete_rqst(req->rl_reply);
 }
 
 /**
@@ -1367,13 +1354,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 
        if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
                frwr_reminv(rep, &req->rl_registered);
-       if (!list_empty(&req->rl_registered)) {
+       if (!list_empty(&req->rl_registered))
                frwr_unmap_async(r_xprt, req);
                /* LocalInv completion will complete the RPC */
-       } else {
-               rpcrdma_release_tx(r_xprt, req);
-               rpcrdma_complete_rqst(rep);
-       }
+       else
+               kref_put(&req->rl_kref, rpcrdma_reply_done);
        return;
 
 out_badversion:
index f84375ddbb4d85195108e6923e00ceb7a1a4cb66..9575f1d8db0739808a7a5bbd7733f5112ecc4ed0 100644 (file)
@@ -618,8 +618,16 @@ xprt_rdma_free(struct rpc_task *task)
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 
-       rpcrdma_release_rqst(r_xprt, req);
        trace_xprtrdma_op_free(task, req);
+
+       if (!list_empty(&req->rl_registered))
+               frwr_unmap_sync(r_xprt, req);
+
+       /* XXX: If the RPC is completing because of a signal and
+        * not because a reply was received, we ought to ensure
+        * that the Send completion has fired, so that memory
+        * involved with the Send is not still visible to the NIC.
+        */
 }
 
 /**
index c50a4b295bd71b0feca4f50e5b45a65dacb847f6..4e22cc244149a2ed58171b1e3603c680ddb4d56b 100644 (file)
@@ -1462,8 +1462,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
        struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
        int rc;
 
-       if (!ep->rep_send_count ||
-           test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+       if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
                send_wr->send_flags |= IB_SEND_SIGNALED;
                ep->rep_send_count = ep->rep_send_batch;
        } else {
index e465221c9c96adf6e257a8cbe774934e8c27bc3b..5475f0dff22a6e5fc940386a4c016d1ccd12b387 100644 (file)
@@ -44,7 +44,8 @@
 
 #include <linux/wait.h>                /* wait_queue_head_t, etc */
 #include <linux/spinlock.h>            /* spinlock_t, etc */
-#include <linux/atomic.h>                      /* atomic_t, etc */
+#include <linux/atomic.h>              /* atomic_t, etc */
+#include <linux/kref.h>                        /* struct kref */
 #include <linux/workqueue.h>           /* struct work_struct */
 
 #include <rdma/rdma_cm.h>              /* RDMA connection api */
@@ -329,17 +330,12 @@ struct rpcrdma_req {
        struct rpcrdma_regbuf   *rl_recvbuf;    /* rq_rcv_buf */
 
        struct list_head        rl_all;
-       unsigned long           rl_flags;
+       struct kref             rl_kref;
 
        struct list_head        rl_registered;  /* registered segments */
        struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
 };
 
-/* rl_flags */
-enum {
-       RPCRDMA_REQ_F_TX_RESOURCES,
-};
-
 static inline struct rpcrdma_req *
 rpcr_to_rdmar(const struct rpc_rqst *rqst)
 {
@@ -584,8 +580,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
 void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
-                         struct rpcrdma_req *req);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
 {