xprtrdma: RPC completion should wait for Send completion
author		Chuck Lever <chuck.lever@oracle.com>
		Fri, 20 Oct 2017 14:48:36 +0000 (10:48 -0400)
committer	Anna Schumaker <Anna.Schumaker@Netapp.com>
		Fri, 17 Nov 2017 18:47:57 +0000 (13:47 -0500)
When an RPC Call includes a file data payload, that payload can come
from pages in the page cache or from a user buffer (for direct I/O).

If the payload can fit inline, xprtrdma includes it in the Send
using a scatter-gather technique. xprtrdma mustn't allow the RPC
consumer to re-use the memory where that payload resides before the
Send completes. Otherwise, the new contents of that memory would be
exposed by an HCA retransmit of the Send operation.
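
As a rough illustration of the scatter-gather technique (this is not
the xprtrdma code itself; the function and variable names below are
hypothetical, and the real logic lives in rpcrdma_prepare_msg_sges()),
a payload page might be DMA-mapped and attached to a Send work request
as an additional SGE like this:

	#include <rdma/ib_verbs.h>

	/* Sketch only: attach one payload page to a Send WR as an extra SGE.
	 * The page must not be modified or re-used by the caller until the
	 * Send completes.
	 */
	static int post_inline_payload(struct ib_qp *qp, struct ib_device *dev,
				       u32 lkey, struct ib_sge *sges, int hdr_sges,
				       struct page *page, unsigned int off,
				       unsigned int len)
	{
		struct ib_send_wr wr = { 0 }, *bad_wr;
		struct ib_sge *sge = &sges[hdr_sges];

		/* Map the payload page for device access */
		sge->addr = ib_dma_map_page(dev, page, off, len, DMA_TO_DEVICE);
		if (ib_dma_mapping_error(dev, sge->addr))
			return -EIO;
		sge->length = len;
		sge->lkey = lkey;

		wr.opcode = IB_WR_SEND;
		wr.sg_list = sges;
		wr.num_sge = hdr_sges + 1;
		/* Request a completion so the caller learns when the payload
		 * memory may be unmapped and handed back to the RPC consumer.
		 * A real caller would also set wr.wr_cqe (or wr_id) to identify
		 * that completion.
		 */
		wr.send_flags = IB_SEND_SIGNALED;

		return ib_post_send(qp, &wr, &bad_wr);
	}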

So, block RPC completion on Send completion, but only in the case
where a separate file data payload is part of the Send. This
prevents re-use of that memory while it is still part of a Send
operation, without imposing an undue cost on the other cases.
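
The synchronization is the standard Linux wait-on-bit pattern. A
minimal sketch of that pattern follows (hypothetical names; the patch
itself uses the RPCRDMA_REQ_F_TX_RESOURCES bit in rl_flags and
out_of_line_wait_on_bit()):

	#include <linux/bitops.h>
	#include <linux/sched.h>
	#include <linux/wait_bit.h>

	/* REQ_F_TX_IN_FLIGHT and struct demo_req are illustrative only. */
	enum { REQ_F_TX_IN_FLIGHT = 0 };

	struct demo_req {
		unsigned long flags;
	};

	/* Send path: mark the request while its payload SGEs are mapped. */
	static void demo_send_posted(struct demo_req *req)
	{
		set_bit(REQ_F_TX_IN_FLIGHT, &req->flags);
	}

	/* Send completion handler: unmap the SGEs, then clear the bit and
	 * wake any reply handler sleeping on it.
	 */
	static void demo_send_done(struct demo_req *req)
	{
		clear_bit(REQ_F_TX_IN_FLIGHT, &req->flags);
		smp_mb__after_atomic();	/* order the clear before the wake-up */
		wake_up_bit(&req->flags, REQ_F_TX_IN_FLIGHT);
	}

	/* Reply path: block RPC completion until the Send has completed,
	 * but only when the flag was set for this request.
	 */
	static void demo_complete_rqst(struct demo_req *req)
	{
		if (test_bit(REQ_F_TX_IN_FLIGHT, &req->flags))
			wait_on_bit(&req->flags, REQ_F_TX_IN_FLIGHT,
				    TASK_UNINTERRUPTIBLE);
	}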

Waiting is avoided in the common case because typically the Send
will have completed long before the RPC Reply arrives.

These days, an RPC timeout will trigger a disconnect, which tears
down the QP. The disconnect flushes all waiting Sends. This bounds
the amount of time the reply handler has to wait for a Send
completion.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 853dede38900aaaced5dc5ab83cb59197ced5e72..4fdeaac6ebe6a3100fd0d7fcd2bc16183efbdfb8 100644 (file)
@@ -534,6 +534,11 @@ rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
        for (count = sc->sc_unmap_count; count; ++sge, --count)
                ib_dma_unmap_page(ia->ri_device,
                                  sge->addr, sge->length, DMA_TO_DEVICE);
+
+       if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
+               smp_mb__after_atomic();
+               wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+       }
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -667,6 +672,8 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 
 out:
        sc->sc_wr.num_sge += sge_no;
+       if (sc->sc_unmap_count)
+               __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
        return true;
 
 out_regbuf:
@@ -704,6 +711,8 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
                return -ENOBUFS;
        req->rl_sendctx->sc_wr.num_sge = 0;
        req->rl_sendctx->sc_unmap_count = 0;
+       req->rl_sendctx->sc_req = req;
+       __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
 
        if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
                return -EIO;
@@ -1305,6 +1314,20 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        if (!list_empty(&req->rl_registered))
                r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
                                                    &req->rl_registered);
+
+       /* Ensure that any DMA mapped pages associated with
+        * the Send of the RPC Call have been unmapped before
+        * allowing the RPC to complete. This protects argument
+        * memory not controlled by the RPC client from being
+        * re-used before we're done with it.
+        */
+       if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+               r_xprt->rx_stats.reply_waits_for_send++;
+               out_of_line_wait_on_bit(&req->rl_flags,
+                                       RPCRDMA_REQ_F_TX_RESOURCES,
+                                       bit_wait,
+                                       TASK_UNINTERRUPTIBLE);
+       }
 }
 
 /* Reply handling runs in the poll worker thread. Anything that
@@ -1384,7 +1407,8 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
                __func__, rep, req, be32_to_cpu(rep->rr_xid));
 
-       if (list_empty(&req->rl_registered))
+       if (list_empty(&req->rl_registered) &&
+           !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
                rpcrdma_complete_rqst(rep);
        else
                queue_work(rpcrdma_receive_wq, &rep->rr_work);
index 35aefe201848c70cdb800344afa8d5a7f52d6e8f..9fdd11e4758c8cbb9f9772d6dc0dd09546135359 100644 (file)
@@ -789,12 +789,13 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
                   r_xprt->rx_stats.failed_marshal_count,
                   r_xprt->rx_stats.bad_reply_count,
                   r_xprt->rx_stats.nomsg_call_count);
-       seq_printf(seq, "%lu %lu %lu %lu %lu\n",
+       seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
                   r_xprt->rx_stats.mrs_recovered,
                   r_xprt->rx_stats.mrs_orphaned,
                   r_xprt->rx_stats.mrs_allocated,
                   r_xprt->rx_stats.local_inv_needed,
-                  r_xprt->rx_stats.empty_sendctx_q);
+                  r_xprt->rx_stats.empty_sendctx_q,
+                  r_xprt->rx_stats.reply_waits_for_send);
 }
 
 static int
index bab63adf070ba1db63911e0e1774e14371a5e374..9a824fe8ffc27d8a859efb01535750342a019f50 100644 (file)
@@ -1526,7 +1526,8 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
        dprintk("RPC:       %s: posting %d s/g entries\n",
                __func__, send_wr->num_sge);
 
-       if (!ep->rep_send_count) {
+       if (!ep->rep_send_count ||
+           test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
                send_wr->send_flags |= IB_SEND_SIGNALED;
                ep->rep_send_count = ep->rep_send_batch;
        } else {
index c260475baa367e91bd1b8c35cf2f350b895e14d6..bccd5d8b93840870946fd8376d1f59110120c1bc 100644 (file)
@@ -236,11 +236,13 @@ struct rpcrdma_rep {
 
 /* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
  */
+struct rpcrdma_req;
 struct rpcrdma_xprt;
 struct rpcrdma_sendctx {
        struct ib_send_wr       sc_wr;
        struct ib_cqe           sc_cqe;
        struct rpcrdma_xprt     *sc_xprt;
+       struct rpcrdma_req      *sc_req;
        unsigned int            sc_unmap_count;
        struct ib_sge           sc_sges[];
 };
@@ -387,6 +389,7 @@ struct rpcrdma_req {
 enum {
        RPCRDMA_REQ_F_BACKCHANNEL = 0,
        RPCRDMA_REQ_F_PENDING,
+       RPCRDMA_REQ_F_TX_RESOURCES,
 };
 
 static inline void
@@ -492,6 +495,7 @@ struct rpcrdma_stats {
        /* accessed when receiving a reply */
        unsigned long long      total_rdma_reply;
        unsigned long long      fixup_copy_count;
+       unsigned long           reply_waits_for_send;
        unsigned long           local_inv_needed;
        unsigned long           nomsg_call_count;
        unsigned long           bcall_count;