diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3a907537e2cf31cf471d7cb92dfbe85ebcc9d706..77c7dd7f05e8be8a9793eccee4ba77d4aa6ca15b 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
 /*
  * internal functions
  */
-static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+                                      struct rpcrdma_sendctx *sc);
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
-static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
-static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
 static struct rpcrdma_regbuf *
 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
                     gfp_t flags);
 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
-static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
 /* Wait for outstanding transport work to finish. ib_drain_qp
  * handles the drains in the wrong order for us, so open code
@@ -125,7 +125,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context)
 
 /**
  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq:        completion queue (ignored)
+ * @cq:        completion queue
  * @wc:        completed WR
  *
  */
@@ -138,7 +138,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 
        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_send(sc, wc);
-       rpcrdma_sendctx_put_locked(sc);
+       rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
 }
 
 /**
@@ -170,7 +170,6 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
                                   rdmab_addr(rep->rr_rdmabuf),
                                   wc->byte_len, DMA_FROM_DEVICE);
 
-       rpcrdma_post_recvs(r_xprt, false);
        rpcrdma_reply_handler(rep);
        return;
 
@@ -178,11 +177,11 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
        rpcrdma_recv_buffer_put(rep);
 }
 
-static void
-rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
-                              struct rdma_conn_param *param)
+static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
+                                     struct rdma_conn_param *param)
 {
        const struct rpcrdma_connect_private *pmsg = param->private_data;
+       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        unsigned int rsize, wsize;
 
        /* Default settings for RPC-over-RDMA Version One */
@@ -198,13 +197,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
                wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
        }
 
-       if (rsize < r_xprt->rx_ep.rep_inline_recv)
-               r_xprt->rx_ep.rep_inline_recv = rsize;
-       if (wsize < r_xprt->rx_ep.rep_inline_send)
-               r_xprt->rx_ep.rep_inline_send = wsize;
-       dprintk("RPC:       %s: max send %u, max recv %u\n", __func__,
-               r_xprt->rx_ep.rep_inline_send,
-               r_xprt->rx_ep.rep_inline_recv);
+       if (rsize < ep->rep_inline_recv)
+               ep->rep_inline_recv = rsize;
+       if (wsize < ep->rep_inline_send)
+               ep->rep_inline_send = wsize;
+
        rpcrdma_set_max_header_sizes(r_xprt);
 }
 
@@ -258,7 +255,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
        case RDMA_CM_EVENT_ESTABLISHED:
                ++xprt->connect_cookie;
                ep->rep_connected = 1;
-               rpcrdma_update_connect_private(r_xprt, &event->param.conn);
+               rpcrdma_update_cm_private(r_xprt, &event->param.conn);
+               trace_xprtrdma_inline_thresh(r_xprt);
                wake_up_all(&ep->rep_connect_wait);
                break;
        case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -298,8 +296,6 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
        struct rdma_cm_id *id;
        int rc;
 
-       trace_xprtrdma_conn_start(xprt);
-
        init_completion(&ia->ri_done);
        init_completion(&ia->ri_remove_done);
 
@@ -315,10 +311,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
        if (rc)
                goto out;
        rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
-       if (rc < 0) {
-               trace_xprtrdma_conn_tout(xprt);
+       if (rc < 0)
                goto out;
-       }
 
        rc = ia->ri_async_rc;
        if (rc)
@@ -329,10 +323,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
        if (rc)
                goto out;
        rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
-       if (rc < 0) {
-               trace_xprtrdma_conn_tout(xprt);
+       if (rc < 0)
                goto out;
-       }
        rc = ia->ri_async_rc;
        if (rc)
                goto out;
@@ -409,8 +401,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
 
-       cancel_work_sync(&buf->rb_refresh_worker);
-
        /* This is similar to rpcrdma_ep_destroy, but:
         * - Don't cancel the connect worker.
         * - Don't call rpcrdma_ep_disconnect, which waits
@@ -437,7 +427,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
                rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
                rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
        }
-       rpcrdma_mrs_destroy(buf);
+       rpcrdma_mrs_destroy(r_xprt);
        ib_dealloc_pd(ia->ri_pd);
        ia->ri_pd = NULL;
 
@@ -522,7 +512,7 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
        init_waitqueue_head(&ep->rep_connect_wait);
        ep->rep_receive_count = 0;
 
-       sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+       sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
                                 ep->rep_attr.cap.max_send_wr + 1,
                                 IB_POLL_WORKQUEUE);
        if (IS_ERR(sendcq)) {
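
A brief illustration (assumed names, not taken from this patch) of the cq_context idiom the hunks above combine: the pointer passed as the second argument to ib_alloc_cq_any() is stored in cq->cq_context, so a completion handler can recover the owning transport from the CQ itself rather than keeping a back-pointer in every send context.

	#include <rdma/ib_verbs.h>

	struct demo_xprt {
		/* per-transport state */
	};

	/* Completion handler: the transport comes from cq->cq_context; the
	 * per-WR context comes from wc->wr_cqe (via container_of in real code).
	 */
	static void demo_wc_send(struct ib_cq *cq, struct ib_wc *wc)
	{
		struct demo_xprt *xprt = cq->cq_context;

		(void)xprt;
		(void)wc;
	}

	/* At CQ creation time, stash the transport pointer in the CQ. */
	static struct ib_cq *demo_alloc_send_cq(struct ib_device *device,
						struct demo_xprt *xprt, int depth)
	{
		return ib_alloc_cq_any(device, xprt, depth, IB_POLL_WORKQUEUE);
	}
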
@@ -630,8 +620,6 @@ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
                pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
                goto out3;
        }
-
-       rpcrdma_mrs_create(r_xprt);
        return 0;
 
 out3:
@@ -649,8 +637,6 @@ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
        struct rdma_cm_id *id, *old;
        int err, rc;
 
-       trace_xprtrdma_reconnect(r_xprt);
-
        rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
 
        rc = -EHOSTUNREACH;
@@ -705,7 +691,6 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
        switch (ep->rep_connected) {
        case 0:
-               dprintk("RPC:       %s: connecting...\n", __func__);
                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
                if (rc) {
                        rc = -ENETUNREACH;
@@ -726,6 +711,7 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        ep->rep_connected = 0;
        xprt_clear_connected(xprt);
 
+       rpcrdma_reset_cwnd(r_xprt);
        rpcrdma_post_recvs(r_xprt, true);
 
        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
@@ -742,13 +728,14 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
                goto out;
        }
 
-       dprintk("RPC:       %s: connected\n", __func__);
+       rpcrdma_mrs_create(r_xprt);
 
 out:
        if (rc)
                ep->rep_connected = rc;
 
 out_noupdate:
+       trace_xprtrdma_connect(r_xprt, rc);
        return rc;
 }
 
@@ -757,11 +744,8 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
  * @ep: endpoint to disconnect
  * @ia: associated interface adapter
  *
- * This is separate from destroy to facilitate the ability
- * to reconnect without recreating the endpoint.
- *
- * This call is not reentrant, and must not be made in parallel
- * on the same endpoint.
+ * Caller serializes. Either the transport send lock is held,
+ * or we're being called to destroy the transport.
  */
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
@@ -780,6 +764,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        trace_xprtrdma_disconnect(r_xprt, rc);
 
        rpcrdma_xprt_drain(r_xprt);
+       rpcrdma_reqs_reset(r_xprt);
+       rpcrdma_mrs_destroy(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -817,9 +803,6 @@ static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
        if (!sc)
                return NULL;
 
-       sc->sc_wr.wr_cqe = &sc->sc_cqe;
-       sc->sc_wr.sg_list = sc->sc_sges;
-       sc->sc_wr.opcode = IB_WR_SEND;
        sc->sc_cqe.done = rpcrdma_wc_send;
        return sc;
 }
@@ -847,7 +830,6 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
                if (!sc)
                        return -ENOMEM;
 
-               sc->sc_xprt = r_xprt;
                buf->rb_sc_ctxs[i] = sc;
        }
 
@@ -910,6 +892,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 
 /**
  * rpcrdma_sendctx_put_locked - Release a send context
+ * @r_xprt: controlling transport instance
  * @sc: send context to release
  *
  * Usage: Called from Send completion to return a sendctxt
@@ -917,10 +900,10 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
  *
  * The caller serializes calls to this function (per transport).
  */
-static void
-rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+                                      struct rpcrdma_sendctx *sc)
 {
-       struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        unsigned long next_tail;
 
        /* Unmap SGEs of previously completed but unsignaled
@@ -938,7 +921,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
        /* Paired with READ_ONCE */
        smp_store_release(&buf->rb_sc_tail, next_tail);
 
-       xprt_write_space(&sc->sc_xprt->rx_xprt);
+       xprt_write_space(&r_xprt->rx_xprt);
 }
 
 static void
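
For context, a minimal userspace sketch (illustrative names only, using C11 atomics in place of the kernel's READ_ONCE/smp_store_release) of the wait-free circular queue discipline used above: the completion side publishes a new tail with a release store, which pairs with the tail read on the allocation side, and each side owns exactly one index.

	#include <stdatomic.h>
	#include <stddef.h>

	struct ctx_ring {
		_Atomic unsigned long head;	/* advanced only by the sender (serialized) */
		_Atomic unsigned long tail;	/* advanced only by the completion handler */
		unsigned long size;		/* power of two */
		void **slots;
	};

	/* Sender side: callers are serialized, as for rpcrdma_sendctx_get_locked(). */
	static void *ring_get(struct ctx_ring *r)
	{
		unsigned long head = atomic_load_explicit(&r->head, memory_order_relaxed);
		unsigned long next = (head + 1) & (r->size - 1);

		/* Acquire pairs with the release store in ring_put(). */
		if (next == atomic_load_explicit(&r->tail, memory_order_acquire))
			return NULL;	/* no free context available */
		atomic_store_explicit(&r->head, next, memory_order_relaxed);
		return r->slots[next];
	}

	/* Completion side: callers are serialized, as for rpcrdma_sendctx_put_locked(). */
	static void ring_put(struct ctx_ring *r, unsigned long next_tail)
	{
		/* Publish the new tail; pairs with the acquire load in ring_get(). */
		atomic_store_explicit(&r->tail, next_tail, memory_order_release);
	}
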
@@ -965,7 +948,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
                mr->mr_xprt = r_xprt;
 
                spin_lock(&buf->rb_lock);
-               list_add(&mr->mr_list, &buf->rb_mrs);
+               rpcrdma_mr_push(mr, &buf->rb_mrs);
                list_add(&mr->mr_all, &buf->rb_all_mrs);
                spin_unlock(&buf->rb_lock);
        }
@@ -986,6 +969,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
        xprt_write_space(&r_xprt->rx_xprt);
 }
 
+/**
+ * rpcrdma_mrs_refresh - Wake the MR refresh worker
+ * @r_xprt: controlling transport instance
+ *
+ */
+void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+       /* If there is no underlying device, it's no use to
+        * wake the refresh worker.
+        */
+       if (ep->rep_connected != -ENODEV) {
+               /* The work is scheduled on a WQ_MEM_RECLAIM
+                * workqueue in order to prevent MR allocation
+                * from recursing into NFS during direct reclaim.
+                */
+               queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
+       }
+}
+
 /**
  * rpcrdma_req_create - Allocate an rpcrdma_req object
  * @r_xprt: controlling r_xprt
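
The comment in rpcrdma_mrs_refresh() above reflects a general rule: work needed to make forward progress during memory reclaim must run on a WQ_MEM_RECLAIM workqueue (which keeps a rescuer thread available) so the allocation path cannot recurse back into the filesystem under reclaim. A minimal sketch of that pattern, with made-up names (the patch itself simply reuses the existing xprtiod_workqueue):

	#include <linux/errno.h>
	#include <linux/workqueue.h>

	static struct workqueue_struct *demo_wq;
	static struct work_struct demo_refresh_work;

	static void demo_refresh(struct work_struct *work)
	{
		/* Replenish resources here, outside the reclaim context
		 * that noticed the shortage. */
	}

	static int demo_setup(void)
	{
		demo_wq = alloc_workqueue("demo_refresh", WQ_MEM_RECLAIM, 0);
		if (!demo_wq)
			return -ENOMEM;
		INIT_WORK(&demo_refresh_work, demo_refresh);
		return 0;
	}

	static void demo_kick(void)
	{
		queue_work(demo_wq, &demo_refresh_work);
	}
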
@@ -1042,6 +1047,26 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
        return NULL;
 }
 
+/**
+ * rpcrdma_reqs_reset - Reset all reqs owned by a transport
+ * @r_xprt: controlling transport instance
+ *
+ * ASSUMPTION: the rb_allreqs list is stable for the duration,
+ * and thus can be walked without holding rb_lock. Eg. the
+ * caller is holding the transport send lock to exclude
+ * device removal or disconnection.
+ */
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_req *req;
+
+       list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
+               /* Credits are valid only for one connection */
+               req->rl_slot.rq_cong = 0;
+       }
+}
+
 static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
                                              bool temp)
 {
@@ -1125,8 +1150,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        INIT_LIST_HEAD(&buf->rb_all_mrs);
        INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
 
-       rpcrdma_mrs_create(r_xprt);
-
        INIT_LIST_HEAD(&buf->rb_send_bufs);
        INIT_LIST_HEAD(&buf->rb_allreqs);
 
@@ -1134,14 +1157,13 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
 
-               req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
+               req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
                                         GFP_KERNEL);
                if (!req)
                        goto out;
                list_add(&req->rl_list, &buf->rb_send_bufs);
        }
 
-       buf->rb_credits = 1;
        init_llist_head(&buf->rb_free_reps);
 
        rc = rpcrdma_sendctxs_create(r_xprt);
@@ -1158,15 +1180,24 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
  * @req: unused object to be destroyed
  *
- * This function assumes that the caller prevents concurrent device
- * unload and transport tear-down.
+ * Relies on caller holding the transport send lock to protect
+ * removing req->rl_all from buf->rb_all_reqs safely.
  */
 void rpcrdma_req_destroy(struct rpcrdma_req *req)
 {
+       struct rpcrdma_mr *mr;
+
        list_del(&req->rl_all);
 
-       while (!list_empty(&req->rl_free_mrs))
-               rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+       while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
+               struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
+
+               spin_lock(&buf->rb_lock);
+               list_del(&mr->mr_all);
+               spin_unlock(&buf->rb_lock);
+
+               frwr_release_mr(mr);
+       }
 
        rpcrdma_regbuf_free(req->rl_recvbuf);
        rpcrdma_regbuf_free(req->rl_sendbuf);
@@ -1174,28 +1205,33 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req)
        kfree(req);
 }
 
-static void
-rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
+/**
+ * rpcrdma_mrs_destroy - Release all of a transport's MRs
+ * @r_xprt: controlling transport instance
+ *
+ * Relies on caller holding the transport send lock to protect
+ * removing mr->mr_list from req->rl_free_mrs safely.
+ */
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
 {
-       struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
-                                                  rx_buf);
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_mr *mr;
-       unsigned int count;
 
-       count = 0;
+       cancel_work_sync(&buf->rb_refresh_worker);
+
        spin_lock(&buf->rb_lock);
        while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
                                              struct rpcrdma_mr,
                                              mr_all)) != NULL) {
+               list_del(&mr->mr_list);
                list_del(&mr->mr_all);
                spin_unlock(&buf->rb_lock);
 
                frwr_release_mr(mr);
-               count++;
+
                spin_lock(&buf->rb_lock);
        }
        spin_unlock(&buf->rb_lock);
-       r_xprt->rx_stats.mrs_allocated = 0;
 }
 
 /**
@@ -1209,8 +1245,6 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
 void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
-       cancel_work_sync(&buf->rb_refresh_worker);
-
        rpcrdma_sendctxs_destroy(buf);
        rpcrdma_reps_destroy(buf);
 
@@ -1222,8 +1256,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
                list_del(&req->rl_list);
                rpcrdma_req_destroy(req);
        }
-
-       rpcrdma_mrs_destroy(buf);
 }
 
 /**
@@ -1264,17 +1296,6 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
        rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
 }
 
-static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
-{
-       struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-
-       mr->mr_req = NULL;
-       spin_lock(&buf->rb_lock);
-       rpcrdma_mr_push(mr, &buf->rb_mrs);
-       spin_unlock(&buf->rb_lock);
-}
-
 /**
  * rpcrdma_buffer_get - Get a request buffer
  * @buffers: Buffer pool from which to obtain a buffer
@@ -1437,7 +1458,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
 {
-       struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
+       struct ib_send_wr *send_wr = &req->rl_wr;
        int rc;
 
        if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
@@ -1455,8 +1476,13 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
        return 0;
 }
 
-static void
-rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+/**
+ * rpcrdma_post_recvs - Refill the Receive Queue
+ * @r_xprt: controlling transport instance
+ * @temp: mark Receive buffers to be deleted after use
+ *
+ */
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;