return 0;
}
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
+ struct rpcrdma_rep *rep = req->rl_reply;
+
+ rpcrdma_complete_rqst(rep);
+ rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
/**
* rpcrdma_sendctx_unmap - DMA-unmap Send buffer
* @sc: sendctx containing SGEs to unmap
{
struct ib_sge *sge;
+ if (!sc->sc_unmap_count)
+ return;
+
/* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so
* they can be cheaply re-used.
ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
DMA_TO_DEVICE);
- if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
- &sc->sc_req->rl_flags))
- wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+ kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
out:
sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count)
- __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_get(&req->rl_kref);
return true;
out_regbuf:
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
+ int ret;
+
+ ret = -EAGAIN;
req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
- return -EAGAIN;
+ goto err;
req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
- __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_init(&req->rl_kref);
+ ret = -EIO;
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
- return -EIO;
-
+ goto err;
if (rtype != rpcrdma_areadch)
if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
- return -EIO;
-
+ goto err;
return 0;
+
+err:
+ trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+ return ret;
}
/**
if (ret)
goto out_err;
- trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
- ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+ ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
&rqst->rq_snd_buf, rtype);
if (ret)
goto out_err;
+
+ trace_xprtrdma_marshal(req, rtype, wtype);
return 0;
out_err:
trace_xprtrdma_marshal_failed(rqst, ret);
- switch (ret) {
- case -EAGAIN:
- xprt_wait_for_buffer_space(rqst->rq_xprt);
- break;
- case -ENOBUFS:
- break;
- default:
- r_xprt->rx_stats.failed_marshal_count++;
- }
+ r_xprt->rx_stats.failed_marshal_count++;
+ frwr_reset(req);
return ret;
}
goto out;
}
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
- /* Invalidate and unmap the data payloads before waking
- * the waiting application. This guarantees the memory
- * regions are properly fenced from the server before the
- * application accesses the data. It also ensures proper
- * send flow control: waking the next RPC waits until this
- * RPC has relinquished all its Send Queue entries.
- */
- if (!list_empty(&req->rl_registered))
- frwr_unmap_sync(r_xprt, &req->rl_registered);
-
- /* Ensure that any DMA mapped pages associated with
- * the Send of the RPC Call have been unmapped before
- * allowing the RPC to complete. This protects argument
- * memory not controlled by the RPC client from being
- * re-used before we're done with it.
- */
- if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
- r_xprt->rx_stats.reply_waits_for_send++;
- out_of_line_wait_on_bit(&req->rl_flags,
- RPCRDMA_REQ_F_TX_RESOURCES,
- bit_wait,
- TASK_UNINTERRUPTIBLE);
- }
-}
-
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
+static void rpcrdma_reply_done(struct kref *kref)
{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
- struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
- trace_xprtrdma_defer_cmp(rep);
- if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
- frwr_reminv(rep, &req->rl_registered);
- rpcrdma_release_rqst(r_xprt, req);
- rpcrdma_complete_rqst(rep);
+ rpcrdma_complete_rqst(req->rl_reply);
}
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
*
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
}
req->rl_reply = rep;
rep->rr_rqst = rqst;
- clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
- queue_work(buf->rb_completion_wq, &rep->rr_work);
+
+ if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+ frwr_reminv(rep, &req->rl_registered);
+ if (!list_empty(&req->rl_registered))
+ frwr_unmap_async(r_xprt, req);
+ /* LocalInv completion will complete the RPC */
+ else
+ kref_put(&req->rl_kref, rpcrdma_reply_done);
return;
out_badversion: