]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - net/sunrpc/xprtrdma/rpc_rdma.c
xprtrdma: Wake RPCs directly in rpcrdma_wc_send path
[linux.git] / net / sunrpc / xprtrdma / rpc_rdma.c
index 85115a2e263928f89f9d043322a2a4d13a54a66a..caf0b1950d7635215f4be572272203b7be04cd3b 100644 (file)
@@ -511,6 +511,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
        return 0;
 }
 
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+       struct rpcrdma_req *req =
+               container_of(kref, struct rpcrdma_req, rl_kref);
+       struct rpcrdma_rep *rep = req->rl_reply;
+
+       rpcrdma_complete_rqst(rep);
+       rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
 /**
  * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
@@ -520,6 +530,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
        struct ib_sge *sge;
 
+       if (!sc->sc_unmap_count)
+               return;
+
        /* The first two SGEs contain the transport header and
         * the inline buffer. These are always left mapped so
         * they can be cheaply re-used.
@@ -529,9 +542,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
                ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
                                  DMA_TO_DEVICE);
 
-       if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
-                              &sc->sc_req->rl_flags))
-               wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+       kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -666,7 +677,7 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 out:
        sc->sc_wr.num_sge += sge_no;
        if (sc->sc_unmap_count)
-               __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+               kref_get(&req->rl_kref);
        return true;
 
 out_regbuf:
@@ -699,22 +710,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
                          struct rpcrdma_req *req, u32 hdrlen,
                          struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
+       int ret;
+
+       ret = -EAGAIN;
        req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
        if (!req->rl_sendctx)
-               return -EAGAIN;
+               goto err;
        req->rl_sendctx->sc_wr.num_sge = 0;
        req->rl_sendctx->sc_unmap_count = 0;
        req->rl_sendctx->sc_req = req;
-       __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+       kref_init(&req->rl_kref);
 
+       ret = -EIO;
        if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
-               return -EIO;
-
+               goto err;
        if (rtype != rpcrdma_areadch)
                if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
-                       return -EIO;
-
+                       goto err;
        return 0;
+
+err:
+       trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+       return ret;
 }
 
 /**
@@ -867,25 +884,18 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
        if (ret)
                goto out_err;
 
-       trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
-       ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+       ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
                                        &rqst->rq_snd_buf, rtype);
        if (ret)
                goto out_err;
+
+       trace_xprtrdma_marshal(req, rtype, wtype);
        return 0;
 
 out_err:
        trace_xprtrdma_marshal_failed(rqst, ret);
-       switch (ret) {
-       case -EAGAIN:
-               xprt_wait_for_buffer_space(rqst->rq_xprt);
-               break;
-       case -ENOBUFS:
-               break;
-       default:
-               r_xprt->rx_stats.failed_marshal_count++;
-       }
+       r_xprt->rx_stats.failed_marshal_count++;
+       frwr_reset(req);
        return ret;
 }
 
@@ -1269,51 +1279,17 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
        goto out;
 }
 
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
-       /* Invalidate and unmap the data payloads before waking
-        * the waiting application. This guarantees the memory
-        * regions are properly fenced from the server before the
-        * application accesses the data. It also ensures proper
-        * send flow control: waking the next RPC waits until this
-        * RPC has relinquished all its Send Queue entries.
-        */
-       if (!list_empty(&req->rl_registered))
-               frwr_unmap_sync(r_xprt, &req->rl_registered);
-
-       /* Ensure that any DMA mapped pages associated with
-        * the Send of the RPC Call have been unmapped before
-        * allowing the RPC to complete. This protects argument
-        * memory not controlled by the RPC client from being
-        * re-used before we're done with it.
-        */
-       if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
-               r_xprt->rx_stats.reply_waits_for_send++;
-               out_of_line_wait_on_bit(&req->rl_flags,
-                                       RPCRDMA_REQ_F_TX_RESOURCES,
-                                       bit_wait,
-                                       TASK_UNINTERRUPTIBLE);
-       }
-}
-
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
+static void rpcrdma_reply_done(struct kref *kref)
 {
-       struct rpcrdma_rep *rep =
-                       container_of(work, struct rpcrdma_rep, rr_work);
-       struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
-       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+       struct rpcrdma_req *req =
+               container_of(kref, struct rpcrdma_req, rl_kref);
 
-       trace_xprtrdma_defer_cmp(rep);
-       if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
-               frwr_reminv(rep, &req->rl_registered);
-       rpcrdma_release_rqst(r_xprt, req);
-       rpcrdma_complete_rqst(rep);
+       rpcrdma_complete_rqst(req->rl_reply);
 }
 
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
@@ -1373,10 +1349,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        }
        req->rl_reply = rep;
        rep->rr_rqst = rqst;
-       clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
        trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-       queue_work(buf->rb_completion_wq, &rep->rr_work);
+
+       if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+               frwr_reminv(rep, &req->rl_registered);
+       if (!list_empty(&req->rl_registered))
+               frwr_unmap_async(r_xprt, req);
+               /* LocalInv completion will complete the RPC */
+       else
+               kref_put(&req->rl_kref, rpcrdma_reply_done);
        return;
 
 out_badversion: