diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3a1a31c3d7919de56624c824f8573302395ddd64..77c7dd7f05e8be8a9793eccee4ba77d4aa6ca15b 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
 /*
  * internal functions
  */
-static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+                                      struct rpcrdma_sendctx *sc);
 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
-static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
-static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
 static struct rpcrdma_regbuf *
 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
                     gfp_t flags);
@@ -125,7 +125,7 @@ rpcrdma_qp_event_handler(struct ib_event *event, void *context)
 
 /**
  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq:        completion queue (ignored)
+ * @cq:        completion queue
  * @wc:        completed WR
  *
  */
@@ -138,7 +138,7 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 
        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_send(sc, wc);
-       rpcrdma_sendctx_put_locked(sc);
+       rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
 }
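
The handler can no longer reach the transport through sc->sc_xprt (that back pointer is removed below); the transport now rides in cq->cq_context, which ib_alloc_cq_any() stores from its second argument (see the sendcq hunk later in this patch). A minimal sketch of the pattern, with my_xprt, my_wc_handler, and my_cq_setup as hypothetical names:

    /* Sketch only: thread a per-CQ private pointer to the completion
     * handler. ib_alloc_cq_any() saves its second argument in
     * cq->cq_context for retrieval here.
     */
    static void my_wc_handler(struct ib_cq *cq, struct ib_wc *wc)
    {
            struct my_xprt *xprt = cq->cq_context;

            /* ... handle the completion using xprt ... */
    }

    static struct ib_cq *my_cq_setup(struct ib_device *device,
                                     struct my_xprt *xprt, int nr_cqe)
    {
            /* second argument becomes cq->cq_context */
            return ib_alloc_cq_any(device, xprt, nr_cqe,
                                   IB_POLL_WORKQUEUE);
    }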
 
 /**
@@ -177,11 +177,11 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
        rpcrdma_recv_buffer_put(rep);
 }
 
-static void
-rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
-                              struct rdma_conn_param *param)
+static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
+                                     struct rdma_conn_param *param)
 {
        const struct rpcrdma_connect_private *pmsg = param->private_data;
+       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        unsigned int rsize, wsize;
 
        /* Default settings for RPC-over-RDMA Version One */
@@ -197,13 +197,11 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
                wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
        }
 
-       if (rsize < r_xprt->rx_ep.rep_inline_recv)
-               r_xprt->rx_ep.rep_inline_recv = rsize;
-       if (wsize < r_xprt->rx_ep.rep_inline_send)
-               r_xprt->rx_ep.rep_inline_send = wsize;
-       dprintk("RPC:       %s: max send %u, max recv %u\n", __func__,
-               r_xprt->rx_ep.rep_inline_send,
-               r_xprt->rx_ep.rep_inline_recv);
+       if (rsize < ep->rep_inline_recv)
+               ep->rep_inline_recv = rsize;
+       if (wsize < ep->rep_inline_send)
+               ep->rep_inline_send = wsize;
+
        rpcrdma_set_max_header_sizes(r_xprt);
 }
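
The rsize/wsize values clamped above come from rpcrdma_decode_buffer_size(), which unpacks the one-byte inline-threshold fields of the CM private message. A hedged sketch of that helper, quoted from memory of rpc_rdma.h, with a worked value:

    /* Assumed encoding: cp_send_size and cp_recv_size are u8 values
     * in 1KB units, biased by one, so a single byte spans 1KB..256KB.
     *
     *   val = 3  ->  (3 + 1) << 10  =  4096 bytes
     */
    static inline unsigned int rpcrdma_decode_buffer_size(u8 val)
    {
            return ((unsigned int)val + 1) << 10;
    }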
 
@@ -257,7 +255,8 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
        case RDMA_CM_EVENT_ESTABLISHED:
                ++xprt->connect_cookie;
                ep->rep_connected = 1;
-               rpcrdma_update_connect_private(r_xprt, &event->param.conn);
+               rpcrdma_update_cm_private(r_xprt, &event->param.conn);
+               trace_xprtrdma_inline_thresh(r_xprt);
                wake_up_all(&ep->rep_connect_wait);
                break;
        case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -297,8 +296,6 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
        struct rdma_cm_id *id;
        int rc;
 
-       trace_xprtrdma_conn_start(xprt);
-
        init_completion(&ia->ri_done);
        init_completion(&ia->ri_remove_done);
 
@@ -314,10 +311,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
        if (rc)
                goto out;
        rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
-       if (rc < 0) {
-               trace_xprtrdma_conn_tout(xprt);
+       if (rc < 0)
                goto out;
-       }
 
        rc = ia->ri_async_rc;
        if (rc)
@@ -328,10 +323,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
        if (rc)
                goto out;
        rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
-       if (rc < 0) {
-               trace_xprtrdma_conn_tout(xprt);
+       if (rc < 0)
                goto out;
-       }
        rc = ia->ri_async_rc;
        if (rc)
                goto out;
@@ -408,8 +401,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
 
-       cancel_work_sync(&buf->rb_refresh_worker);
-
        /* This is similar to rpcrdma_ep_destroy, but:
         * - Don't cancel the connect worker.
         * - Don't call rpcrdma_ep_disconnect, which waits
@@ -436,7 +427,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
                rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
                rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
        }
-       rpcrdma_mrs_destroy(buf);
+       rpcrdma_mrs_destroy(r_xprt);
        ib_dealloc_pd(ia->ri_pd);
        ia->ri_pd = NULL;
 
@@ -521,7 +512,7 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
        init_waitqueue_head(&ep->rep_connect_wait);
        ep->rep_receive_count = 0;
 
-       sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
+       sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
                                 ep->rep_attr.cap.max_send_wr + 1,
                                 IB_POLL_WORKQUEUE);
        if (IS_ERR(sendcq)) {
@@ -629,8 +620,6 @@ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
                pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
                goto out3;
        }
-
-       rpcrdma_mrs_create(r_xprt);
        return 0;
 
 out3:
@@ -648,8 +637,6 @@ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
        struct rdma_cm_id *id, *old;
        int err, rc;
 
-       trace_xprtrdma_reconnect(r_xprt);
-
        rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
 
        rc = -EHOSTUNREACH;
@@ -704,7 +691,6 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
        switch (ep->rep_connected) {
        case 0:
-               dprintk("RPC:       %s: connecting...\n", __func__);
                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
                if (rc) {
                        rc = -ENETUNREACH;
@@ -742,13 +728,14 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
                goto out;
        }
 
-       dprintk("RPC:       %s: connected\n", __func__);
+       rpcrdma_mrs_create(r_xprt);
 
 out:
        if (rc)
                ep->rep_connected = rc;
 
 out_noupdate:
+       trace_xprtrdma_connect(r_xprt, rc);
        return rc;
 }
 
@@ -757,11 +744,8 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
  * @ep: endpoint to disconnect
  * @ia: associated interface adapter
  *
- * This is separate from destroy to facilitate the ability
- * to reconnect without recreating the endpoint.
- *
- * This call is not reentrant, and must not be made in parallel
- * on the same endpoint.
+ * Caller serializes. Either the transport send lock is held,
+ * or we're being called to destroy the transport.
  */
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
@@ -781,6 +765,7 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
        rpcrdma_xprt_drain(r_xprt);
        rpcrdma_reqs_reset(r_xprt);
+       rpcrdma_mrs_destroy(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -818,9 +803,6 @@ static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
        if (!sc)
                return NULL;
 
-       sc->sc_wr.wr_cqe = &sc->sc_cqe;
-       sc->sc_wr.sg_list = sc->sc_sges;
-       sc->sc_wr.opcode = IB_WR_SEND;
        sc->sc_cqe.done = rpcrdma_wc_send;
        return sc;
 }
@@ -848,7 +830,6 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
                if (!sc)
                        return -ENOMEM;
 
-               sc->sc_xprt = r_xprt;
                buf->rb_sc_ctxs[i] = sc;
        }
 
@@ -911,6 +892,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 
 /**
  * rpcrdma_sendctx_put_locked - Release a send context
+ * @r_xprt: controlling transport instance
  * @sc: send context to release
  *
 * Usage: Called from Send completion to return a sendctx
This line replaces the original content at 231. Wait, this must be a replace, not insert.
@@ -918,10 +900,10 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
  *
  * The caller serializes calls to this function (per transport).
  */
-static void
-rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+                                      struct rpcrdma_sendctx *sc)
 {
-       struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        unsigned long next_tail;
 
        /* Unmap SGEs of previously completed but unsignaled
@@ -939,7 +921,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
        /* Paired with READ_ONCE */
        smp_store_release(&buf->rb_sc_tail, next_tail);
 
-       xprt_write_space(&sc->sc_xprt->rx_xprt);
+       xprt_write_space(&r_xprt->rx_xprt);
 }
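
rpcrdma_sendctx_get_locked() advances rb_sc_head under the transport send lock, while this path advances rb_sc_tail from the Send completion handler (serialized per transport), so the FIFO needs no spinlock of its own. A miniature of the same single-producer/single-consumer scheme; every name here is illustrative, and where the patch pairs smp_store_release() with READ_ONCE(), the sketch uses the slightly stronger smp_load_acquire() for generality:

    /* smp_store_release() publishes an index only after the slot it
     * covers has been written; smp_load_acquire() on the other side
     * orders the slot access after the index read. RING_SIZE is a
     * power of two so wrap-around is a mask.
     */
    struct spsc_ring {
            unsigned long head;             /* producer-owned */
            unsigned long tail;             /* consumer-owned */
            void *slots[RING_SIZE];
    };

    static bool ring_put(struct spsc_ring *r, void *item)
    {
            unsigned long next = (r->head + 1) & (RING_SIZE - 1);

            if (next == smp_load_acquire(&r->tail))
                    return false;                   /* full */
            r->slots[r->head] = item;
            smp_store_release(&r->head, next);      /* publish item */
            return true;
    }

    static void *ring_get(struct spsc_ring *r)
    {
            void *item;

            if (r->tail == smp_load_acquire(&r->head))
                    return NULL;                    /* empty */
            item = r->slots[r->tail];
            smp_store_release(&r->tail,
                              (r->tail + 1) & (RING_SIZE - 1));
            return item;
    }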
 
 static void
@@ -966,7 +948,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
                mr->mr_xprt = r_xprt;
 
                spin_lock(&buf->rb_lock);
-               list_add(&mr->mr_list, &buf->rb_mrs);
+               rpcrdma_mr_push(mr, &buf->rb_mrs);
                list_add(&mr->mr_all, &buf->rb_all_mrs);
                spin_unlock(&buf->rb_lock);
        }
@@ -987,6 +969,28 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
        xprt_write_space(&r_xprt->rx_xprt);
 }
 
+/**
+ * rpcrdma_mrs_refresh - Wake the MR refresh worker
+ * @r_xprt: controlling transport instance
+ *
+ */
+void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+       /* If there is no underlying device, there is no point
+        * in waking the refresh worker.
+        */
+       if (ep->rep_connected != -ENODEV) {
+               /* The work is scheduled on a WQ_MEM_RECLAIM
+                * workqueue in order to prevent MR allocation
+                * from recursing into NFS during direct reclaim.
+                */
+               queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
+       }
+}
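
Per the comment above, the refresh work rides on a WQ_MEM_RECLAIM workqueue (xprtiod_workqueue, created by the RPC client core). Such a queue keeps a pre-allocated rescuer thread, so the work still runs when direct reclaim is in progress and new workers cannot be spawned. A generic sketch of that guarantee, with an illustrative queue name rather than the client's actual setup code:

    static struct workqueue_struct *refresh_wq;

    static int example_wq_setup(void)
    {
            /* WQ_MEM_RECLAIM reserves a rescuer thread up front */
            refresh_wq = alloc_workqueue("mr_refresh_example",
                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
            return refresh_wq ? 0 : -ENOMEM;
    }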
+
 /**
  * rpcrdma_req_create - Allocate an rpcrdma_req object
  * @r_xprt: controlling r_xprt
@@ -1146,8 +1150,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        INIT_LIST_HEAD(&buf->rb_all_mrs);
        INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
 
-       rpcrdma_mrs_create(r_xprt);
-
        INIT_LIST_HEAD(&buf->rb_send_bufs);
        INIT_LIST_HEAD(&buf->rb_allreqs);
 
@@ -1155,7 +1157,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
 
-               req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
+               req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
                                         GFP_KERNEL);
                if (!req)
                        goto out;
@@ -1178,15 +1180,24 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
  * @req: unused object to be destroyed
  *
- * This function assumes that the caller prevents concurrent device
- * unload and transport tear-down.
+ * Relies on the caller holding the transport send lock so that
+ * req->rl_all can safely be removed from buf->rb_allreqs.
  */
 void rpcrdma_req_destroy(struct rpcrdma_req *req)
 {
+       struct rpcrdma_mr *mr;
+
        list_del(&req->rl_all);
 
-       while (!list_empty(&req->rl_free_mrs))
-               rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
+       while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
+               struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
+
+               spin_lock(&buf->rb_lock);
+               list_del(&mr->mr_all);
+               spin_unlock(&buf->rb_lock);
+
+               frwr_release_mr(mr);
+       }
 
        rpcrdma_regbuf_free(req->rl_recvbuf);
        rpcrdma_regbuf_free(req->rl_sendbuf);
@@ -1194,28 +1205,33 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req)
        kfree(req);
 }
 
-static void
-rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
+/**
+ * rpcrdma_mrs_destroy - Release all of a transport's MRs
+ * @r_xprt: controlling transport instance
+ *
+ * Relies on the caller holding the transport send lock so that
+ * mr->mr_list can safely be removed from req->rl_free_mrs.
+ */
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
 {
-       struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
-                                                  rx_buf);
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_mr *mr;
-       unsigned int count;
 
-       count = 0;
+       cancel_work_sync(&buf->rb_refresh_worker);
+
        spin_lock(&buf->rb_lock);
        while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
                                              struct rpcrdma_mr,
                                              mr_all)) != NULL) {
+               list_del(&mr->mr_list);
                list_del(&mr->mr_all);
                spin_unlock(&buf->rb_lock);
 
                frwr_release_mr(mr);
-               count++;
+
                spin_lock(&buf->rb_lock);
        }
        spin_unlock(&buf->rb_lock);
-       r_xprt->rx_stats.mrs_allocated = 0;
 }
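
The lock juggling above is forced by context: frwr_release_mr() ultimately deregisters the MR with the device, a sleepable operation, while rb_lock is a spinlock. Reduced to its shape, with struct item and blocking_release() as placeholders:

    /* Detach one entry under the lock, drop the lock for the
     * blocking release, then retake it to find the next entry.
     */
    static void release_all(struct list_head *head, spinlock_t *lock)
    {
            struct item *it;

            spin_lock(lock);
            while ((it = list_first_entry_or_null(head,
                                                  struct item, node))) {
                    list_del(&it->node);
                    spin_unlock(lock);

                    blocking_release(it);   /* may sleep */

                    spin_lock(lock);
            }
            spin_unlock(lock);
    }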
 
 /**
@@ -1229,8 +1245,6 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
 void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
-       cancel_work_sync(&buf->rb_refresh_worker);
-
        rpcrdma_sendctxs_destroy(buf);
        rpcrdma_reps_destroy(buf);
 
@@ -1242,8 +1256,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
                list_del(&req->rl_list);
                rpcrdma_req_destroy(req);
        }
-
-       rpcrdma_mrs_destroy(buf);
 }
 
 /**
@@ -1284,17 +1296,6 @@ void rpcrdma_mr_put(struct rpcrdma_mr *mr)
        rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
 }
 
-static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
-{
-       struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-
-       mr->mr_req = NULL;
-       spin_lock(&buf->rb_lock);
-       rpcrdma_mr_push(mr, &buf->rb_mrs);
-       spin_unlock(&buf->rb_lock);
-}
-
 /**
  * rpcrdma_buffer_get - Get a request buffer
  * @buffers: Buffer pool from which to obtain a buffer
@@ -1457,7 +1458,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
 {
-       struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
+       struct ib_send_wr *send_wr = &req->rl_wr;
        int rc;
 
        if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {