1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41
42 /*
43  * verbs.c
44  *
45  * Encapsulates the major functions managing:
46  *  o adapters
47  *  o endpoints
48  *  o connections
49  *  o buffer memory
50  */
51
52 #include <linux/interrupt.h>
53 #include <linux/slab.h>
54 #include <linux/sunrpc/addr.h>
55 #include <linux/sunrpc/svc_rdma.h>
56
57 #include <asm-generic/barrier.h>
58 #include <asm/bitops.h>
59
60 #include <rdma/ib_cm.h>
61
62 #include "xprt_rdma.h"
63 #include <trace/events/rpcrdma.h>
64
65 /*
66  * Globals/Macros
67  */
68
69 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
70 # define RPCDBG_FACILITY        RPCDBG_TRANS
71 #endif
72
73 /*
74  * internal functions
75  */
76 static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
77 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
78 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
79 static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
80 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
81
82 struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
83
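/**
 * rpcrdma_alloc_wq - Create the global receive workqueue
 *
 * Returns 0 on success, or -ENOMEM if the workqueue cannot be
 * allocated.
 */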
84 int
85 rpcrdma_alloc_wq(void)
86 {
87         struct workqueue_struct *recv_wq;
88
89         recv_wq = alloc_workqueue("xprtrdma_receive",
90                                   WQ_MEM_RECLAIM | WQ_HIGHPRI,
91                                   0);
92         if (!recv_wq)
93                 return -ENOMEM;
94
95         rpcrdma_receive_wq = recv_wq;
96         return 0;
97 }
98
99 void
100 rpcrdma_destroy_wq(void)
101 {
102         struct workqueue_struct *wq;
103
104         if (rpcrdma_receive_wq) {
105                 wq = rpcrdma_receive_wq;
106                 rpcrdma_receive_wq = NULL;
107                 destroy_workqueue(wq);
108         }
109 }
110
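/* Handle asynchronous QP events reported by the RDMA provider.
 * A fatal QP error on a connected transport marks the connection
 * broken (-EIO) and wakes anyone waiting on rep_connect_wait.
 */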
111 static void
112 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
113 {
114         struct rpcrdma_ep *ep = context;
115         struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
116                                                    rx_ep);
117
118         trace_xprtrdma_qp_error(r_xprt, event);
119         pr_err("rpcrdma: %s on device %s ep %p\n",
120                ib_event_msg(event->event), event->device->name, context);
121
122         if (ep->rep_connected == 1) {
123                 ep->rep_connected = -EIO;
124                 rpcrdma_conn_func(ep);
125                 wake_up_all(&ep->rep_connect_wait);
126         }
127 }
128
129 /**
130  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
131  * @cq: completion queue (ignored)
132  * @wc: completed WR
133  *
134  */
135 static void
136 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
137 {
138         struct ib_cqe *cqe = wc->wr_cqe;
139         struct rpcrdma_sendctx *sc =
140                 container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
141
142         /* WARNING: Only wr_cqe and status are reliable at this point */
143         trace_xprtrdma_wc_send(sc, wc);
144         if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
145                 pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
146                        ib_wc_status_msg(wc->status),
147                        wc->status, wc->vendor_err);
148
149         rpcrdma_sendctx_put_locked(sc);
150 }
151
152 /**
153  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
154  * @cq: completion queue (ignored)
155  * @wc: completed WR
156  *
157  */
158 static void
159 rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
160 {
161         struct ib_cqe *cqe = wc->wr_cqe;
162         struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
163                                                rr_cqe);
164
165         /* WARNING: Only wr_cqe and status are reliable at this point */
166         trace_xprtrdma_wc_receive(wc);
167         if (wc->status != IB_WC_SUCCESS)
168                 goto out_fail;
169
170         /* status == SUCCESS means all fields in wc are trustworthy */
171         rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
172         rep->rr_wc_flags = wc->wc_flags;
173         rep->rr_inv_rkey = wc->ex.invalidate_rkey;
174
175         ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
176                                    rdmab_addr(rep->rr_rdmabuf),
177                                    wc->byte_len, DMA_FROM_DEVICE);
178
179 out_schedule:
180         rpcrdma_reply_handler(rep);
181         return;
182
183 out_fail:
184         if (wc->status != IB_WC_WR_FLUSH_ERR)
185                 pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
186                        ib_wc_status_msg(wc->status),
187                        wc->status, wc->vendor_err);
188         rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
189         goto out_schedule;
190 }
191
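/* Apply the inline buffer sizes advertised in the peer's RDMA-CM
 * private message, if one was provided; otherwise fall back to the
 * RPC-over-RDMA Version One defaults. The transport's inline
 * thresholds are only ever reduced here, never raised.
 */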
192 static void
193 rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
194                                struct rdma_conn_param *param)
195 {
196         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
197         const struct rpcrdma_connect_private *pmsg = param->private_data;
198         unsigned int rsize, wsize;
199
200         /* Default settings for RPC-over-RDMA Version One */
201         r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
202         rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
203         wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
204
205         if (pmsg &&
206             pmsg->cp_magic == rpcrdma_cmp_magic &&
207             pmsg->cp_version == RPCRDMA_CMP_VERSION) {
208                 r_xprt->rx_ia.ri_implicit_roundup = true;
209                 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
210                 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
211         }
212
213         if (rsize < cdata->inline_rsize)
214                 cdata->inline_rsize = rsize;
215         if (wsize < cdata->inline_wsize)
216                 cdata->inline_wsize = wsize;
217         dprintk("RPC:       %s: max send %u, max recv %u\n",
218                 __func__, cdata->inline_wsize, cdata->inline_rsize);
219         rpcrdma_set_max_header_sizes(r_xprt);
220 }
221
222 /**
223  * rpcrdma_cm_event_handler - Handle RDMA CM events
224  * @id: rdma_cm_id on which an event has occurred
225  * @event: details of the event
226  *
227  * Called with @id's mutex held. Returns 1 if caller should
228  * destroy @id, otherwise 0.
229  */
230 static int
231 rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
232 {
233         struct rpcrdma_xprt *xprt = id->context;
234         struct rpcrdma_ia *ia = &xprt->rx_ia;
235         struct rpcrdma_ep *ep = &xprt->rx_ep;
236         int connstate = 0;
237
238         might_sleep();
239
240         trace_xprtrdma_cm_event(xprt, event);
241         switch (event->event) {
242         case RDMA_CM_EVENT_ADDR_RESOLVED:
243         case RDMA_CM_EVENT_ROUTE_RESOLVED:
244                 ia->ri_async_rc = 0;
245                 complete(&ia->ri_done);
246                 break;
247         case RDMA_CM_EVENT_ADDR_ERROR:
248                 ia->ri_async_rc = -EPROTO;
249                 complete(&ia->ri_done);
250                 break;
251         case RDMA_CM_EVENT_ROUTE_ERROR:
252                 ia->ri_async_rc = -ENETUNREACH;
253                 complete(&ia->ri_done);
254                 break;
255         case RDMA_CM_EVENT_DEVICE_REMOVAL:
256 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
257                 pr_info("rpcrdma: removing device %s for %s:%s\n",
258                         ia->ri_device->name,
259                         rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
260 #endif
261                 set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
262                 ep->rep_connected = -ENODEV;
263                 xprt_force_disconnect(&xprt->rx_xprt);
264                 wait_for_completion(&ia->ri_remove_done);
265
266                 ia->ri_id = NULL;
267                 ia->ri_device = NULL;
268                 /* Return 1 to ensure the core destroys the id. */
269                 return 1;
270         case RDMA_CM_EVENT_ESTABLISHED:
271                 ++xprt->rx_xprt.connect_cookie;
272                 connstate = 1;
273                 rpcrdma_update_connect_private(xprt, &event->param.conn);
274                 goto connected;
275         case RDMA_CM_EVENT_CONNECT_ERROR:
276                 connstate = -ENOTCONN;
277                 goto connected;
278         case RDMA_CM_EVENT_UNREACHABLE:
279                 connstate = -ENETUNREACH;
280                 goto connected;
281         case RDMA_CM_EVENT_REJECTED:
282                 dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
283                         rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
284                         rdma_reject_msg(id, event->status));
285                 connstate = -ECONNREFUSED;
286                 if (event->status == IB_CM_REJ_STALE_CONN)
287                         connstate = -EAGAIN;
288                 goto connected;
289         case RDMA_CM_EVENT_DISCONNECTED:
290                 ++xprt->rx_xprt.connect_cookie;
291                 connstate = -ECONNABORTED;
292 connected:
293                 ep->rep_connected = connstate;
294                 rpcrdma_conn_func(ep);
295                 wake_up_all(&ep->rep_connect_wait);
296                 /*FALLTHROUGH*/
297         default:
298                 dprintk("RPC:       %s: %s:%s on %s/%s (ep 0x%p): %s\n",
299                         __func__,
300                         rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
301                         ia->ri_device->name, ia->ri_ops->ro_displayname,
302                         ep, rdma_event_msg(event->event));
303                 break;
304         }
305
306         return 0;
307 }
308
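/* Create an rdma_cm_id for this transport, then resolve the server's
 * address and route. Each resolution step waits (interruptibly, with
 * a timeout) for the CM event handler to post its result. Returns the
 * new id on success, otherwise an ERR_PTR.
 */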
309 static struct rdma_cm_id *
310 rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
311 {
312         unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
313         struct rdma_cm_id *id;
314         int rc;
315
316         trace_xprtrdma_conn_start(xprt);
317
318         init_completion(&ia->ri_done);
319         init_completion(&ia->ri_remove_done);
320
321         id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
322                             xprt, RDMA_PS_TCP, IB_QPT_RC);
323         if (IS_ERR(id)) {
324                 rc = PTR_ERR(id);
325                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
326                         __func__, rc);
327                 return id;
328         }
329
330         ia->ri_async_rc = -ETIMEDOUT;
331         rc = rdma_resolve_addr(id, NULL,
332                                (struct sockaddr *)&xprt->rx_xprt.addr,
333                                RDMA_RESOLVE_TIMEOUT);
334         if (rc) {
335                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
336                         __func__, rc);
337                 goto out;
338         }
339         rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
340         if (rc < 0) {
341                 trace_xprtrdma_conn_tout(xprt);
342                 goto out;
343         }
344
345         rc = ia->ri_async_rc;
346         if (rc)
347                 goto out;
348
349         ia->ri_async_rc = -ETIMEDOUT;
350         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
351         if (rc) {
352                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
353                         __func__, rc);
354                 goto out;
355         }
356         rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
357         if (rc < 0) {
358                 trace_xprtrdma_conn_tout(xprt);
359                 goto out;
360         }
361         rc = ia->ri_async_rc;
362         if (rc)
363                 goto out;
364
365         return id;
366
367 out:
368         rdma_destroy_id(id);
369         return ERR_PTR(rc);
370 }
371
372 /*
373  * Exported functions.
374  */
375
376 /**
377  * rpcrdma_ia_open - Open and initialize an Interface Adapter.
378  * @xprt: transport with IA to (re)initialize
379  *
380  * Returns 0 on success, negative errno if an appropriate
381  * Interface Adapter could not be found and opened.
382  */
383 int
384 rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
385 {
386         struct rpcrdma_ia *ia = &xprt->rx_ia;
387         int rc;
388
389         ia->ri_id = rpcrdma_create_id(xprt, ia);
390         if (IS_ERR(ia->ri_id)) {
391                 rc = PTR_ERR(ia->ri_id);
392                 goto out_err;
393         }
394         ia->ri_device = ia->ri_id->device;
395
396         ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
397         if (IS_ERR(ia->ri_pd)) {
398                 rc = PTR_ERR(ia->ri_pd);
399                 pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
400                 goto out_err;
401         }
402
403         switch (xprt_rdma_memreg_strategy) {
404         case RPCRDMA_FRWR:
405                 if (frwr_is_supported(ia)) {
406                         ia->ri_ops = &rpcrdma_frwr_memreg_ops;
407                         break;
408                 }
409                 /*FALLTHROUGH*/
410         case RPCRDMA_MTHCAFMR:
411                 if (fmr_is_supported(ia)) {
412                         ia->ri_ops = &rpcrdma_fmr_memreg_ops;
413                         break;
414                 }
415                 /*FALLTHROUGH*/
416         default:
417                 pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
418                        ia->ri_device->name, xprt_rdma_memreg_strategy);
419                 rc = -EINVAL;
420                 goto out_err;
421         }
422
423         return 0;
424
425 out_err:
426         rpcrdma_ia_close(ia);
427         return rc;
428 }
429
430 /**
431  * rpcrdma_ia_remove - Handle device driver unload
432  * @ia: interface adapter being removed
433  *
434  * Divest transport H/W resources associated with this adapter,
435  * but allow it to be restored later.
436  */
437 void
438 rpcrdma_ia_remove(struct rpcrdma_ia *ia)
439 {
440         struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
441                                                    rx_ia);
442         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
443         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
444         struct rpcrdma_req *req;
445         struct rpcrdma_rep *rep;
446
447         cancel_delayed_work_sync(&buf->rb_refresh_worker);
448
449         /* This is similar to rpcrdma_ep_destroy, but:
450          * - Don't cancel the connect worker.
451          * - Don't call rpcrdma_ep_disconnect, which waits
452          *   for another conn upcall, which will deadlock.
453          * - rdma_disconnect is unneeded, the underlying
454          *   connection is already gone.
455          */
456         if (ia->ri_id->qp) {
457                 ib_drain_qp(ia->ri_id->qp);
458                 rdma_destroy_qp(ia->ri_id);
459                 ia->ri_id->qp = NULL;
460         }
461         ib_free_cq(ep->rep_attr.recv_cq);
462         ep->rep_attr.recv_cq = NULL;
463         ib_free_cq(ep->rep_attr.send_cq);
464         ep->rep_attr.send_cq = NULL;
465
466         /* The ULP is responsible for ensuring all DMA
467          * mappings and MRs are gone.
468          */
469         list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
470                 rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
471         list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
472                 rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
473                 rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
474                 rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
475         }
476         rpcrdma_mrs_destroy(buf);
477         ib_dealloc_pd(ia->ri_pd);
478         ia->ri_pd = NULL;
479
480         /* Allow waiters to continue */
481         complete(&ia->ri_remove_done);
482
483         trace_xprtrdma_remove(r_xprt);
484 }
485
486 /**
487  * rpcrdma_ia_close - Clean up/close an IA.
488  * @ia: interface adapter to close
489  *
490  */
491 void
492 rpcrdma_ia_close(struct rpcrdma_ia *ia)
493 {
494         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
495                 if (ia->ri_id->qp)
496                         rdma_destroy_qp(ia->ri_id);
497                 rdma_destroy_id(ia->ri_id);
498         }
499         ia->ri_id = NULL;
500         ia->ri_device = NULL;
501
502         /* If the pd is still busy, xprtrdma missed freeing a resource */
503         if (ia->ri_pd && !IS_ERR(ia->ri_pd))
504                 ib_dealloc_pd(ia->ri_pd);
505         ia->ri_pd = NULL;
506 }
507
508 /*
509  * Create unconnected endpoint.
510  */
511 int
512 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
513                   struct rpcrdma_create_data_internal *cdata)
514 {
515         struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
516         struct ib_cq *sendcq, *recvcq;
517         unsigned int max_sge;
518         int rc;
519
520         max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
521                         RPCRDMA_MAX_SEND_SGES);
522         if (max_sge < RPCRDMA_MIN_SEND_SGES) {
523                 pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
524                 return -ENOMEM;
525         }
526         ia->ri_max_send_sges = max_sge;
527
528         rc = ia->ri_ops->ro_open(ia, ep, cdata);
529         if (rc)
530                 return rc;
531
532         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
533         ep->rep_attr.qp_context = ep;
534         ep->rep_attr.srq = NULL;
535         ep->rep_attr.cap.max_send_sge = max_sge;
536         ep->rep_attr.cap.max_recv_sge = 1;
537         ep->rep_attr.cap.max_inline_data = 0;
538         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
539         ep->rep_attr.qp_type = IB_QPT_RC;
540         ep->rep_attr.port_num = ~0;
541
542         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
543                 "iovs: send %d recv %d\n",
544                 __func__,
545                 ep->rep_attr.cap.max_send_wr,
546                 ep->rep_attr.cap.max_recv_wr,
547                 ep->rep_attr.cap.max_send_sge,
548                 ep->rep_attr.cap.max_recv_sge);
549
550         /* set trigger for requesting send completion */
551         ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
552                                    cdata->max_requests >> 2);
553         ep->rep_send_count = ep->rep_send_batch;
554         init_waitqueue_head(&ep->rep_connect_wait);
555         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
556
557         sendcq = ib_alloc_cq(ia->ri_device, NULL,
558                              ep->rep_attr.cap.max_send_wr + 1,
559                              1, IB_POLL_WORKQUEUE);
560         if (IS_ERR(sendcq)) {
561                 rc = PTR_ERR(sendcq);
562                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
563                         __func__, rc);
564                 goto out1;
565         }
566
567         recvcq = ib_alloc_cq(ia->ri_device, NULL,
568                              ep->rep_attr.cap.max_recv_wr + 1,
569                              0, IB_POLL_WORKQUEUE);
570         if (IS_ERR(recvcq)) {
571                 rc = PTR_ERR(recvcq);
572                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
573                         __func__, rc);
574                 goto out2;
575         }
576
577         ep->rep_attr.send_cq = sendcq;
578         ep->rep_attr.recv_cq = recvcq;
579
580         /* Initialize cma parameters */
581         memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
582
583         /* Prepare RDMA-CM private message */
584         pmsg->cp_magic = rpcrdma_cmp_magic;
585         pmsg->cp_version = RPCRDMA_CMP_VERSION;
586         pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
587         pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
588         pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
589         ep->rep_remote_cma.private_data = pmsg;
590         ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
591
592         /* Client offers RDMA Read but does not initiate */
593         ep->rep_remote_cma.initiator_depth = 0;
594         ep->rep_remote_cma.responder_resources =
595                 min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
596
597         /* Limit transport retries so client can detect server
598          * GID changes quickly. RPC layer handles re-establishing
599          * transport connection and retransmission.
600          */
601         ep->rep_remote_cma.retry_count = 6;
602
603         /* RPC-over-RDMA handles its own flow control. In addition,
604          * make all RNR NAKs visible so we know that RPC-over-RDMA
605          * flow control is working correctly (no NAKs should be seen).
606          */
607         ep->rep_remote_cma.flow_control = 0;
608         ep->rep_remote_cma.rnr_retry_count = 0;
609
610         return 0;
611
612 out2:
613         ib_free_cq(sendcq);
614 out1:
615         return rc;
616 }
617
618 /*
619  * rpcrdma_ep_destroy
620  *
621  * Disconnect and destroy endpoint. After this, the only
622  * valid operations on the ep are to free it (if dynamically
623  * allocated) or re-create it.
624  */
625 void
626 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
627 {
628         cancel_delayed_work_sync(&ep->rep_connect_worker);
629
630         if (ia->ri_id && ia->ri_id->qp) {
631                 rpcrdma_ep_disconnect(ep, ia);
632                 rdma_destroy_qp(ia->ri_id);
633                 ia->ri_id->qp = NULL;
634         }
635
636         if (ep->rep_attr.recv_cq)
637                 ib_free_cq(ep->rep_attr.recv_cq);
638         if (ep->rep_attr.send_cq)
639                 ib_free_cq(ep->rep_attr.send_cq);
640 }
641
642 /* Re-establish a connection after a device removal event.
643  * Unlike a normal reconnection, a fresh PD and a new set
644  * of MRs and buffers are needed.
645  */
646 static int
647 rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
648                          struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
649 {
650         int rc, err;
651
652         trace_xprtrdma_reinsert(r_xprt);
653
654         rc = -EHOSTUNREACH;
655         if (rpcrdma_ia_open(r_xprt))
656                 goto out1;
657
658         rc = -ENOMEM;
659         err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
660         if (err) {
661                 pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
662                 goto out2;
663         }
664
665         rc = -ENETUNREACH;
666         err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
667         if (err) {
668                 pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
669                 goto out3;
670         }
671
672         rpcrdma_mrs_create(r_xprt);
673         return 0;
674
675 out3:
676         rpcrdma_ep_destroy(ep, ia);
677 out2:
678         rpcrdma_ia_close(ia);
679 out1:
680         return rc;
681 }
682
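/* Re-establish a connection after a normal transport disconnect.
 * A fresh rdma_cm_id and QP are created on the same device, so the
 * existing PD, MRs, and DMA mappings remain valid.
 */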
683 static int
684 rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
685                      struct rpcrdma_ia *ia)
686 {
687         struct rdma_cm_id *id, *old;
688         int err, rc;
689
690         trace_xprtrdma_reconnect(r_xprt);
691
692         rpcrdma_ep_disconnect(ep, ia);
693
694         rc = -EHOSTUNREACH;
695         id = rpcrdma_create_id(r_xprt, ia);
696         if (IS_ERR(id))
697                 goto out;
698
699         /* As long as the new ID points to the same device as the
700          * old ID, we can reuse the transport's existing PD and all
701          * previously allocated MRs. Also, the same device means
702          * the transport's previous DMA mappings are still valid.
703          *
704          * This is a sanity check only. There should be no way these
705          * point to two different devices here.
706          */
707         old = id;
708         rc = -ENETUNREACH;
709         if (ia->ri_device != id->device) {
710                 pr_err("rpcrdma: can't reconnect on different device!\n");
711                 goto out_destroy;
712         }
713
714         err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
715         if (err) {
716                 dprintk("RPC:       %s: rdma_create_qp returned %d\n",
717                         __func__, err);
718                 goto out_destroy;
719         }
720
721         /* Atomically replace the transport's ID and QP. */
722         rc = 0;
723         old = ia->ri_id;
724         ia->ri_id = id;
725         rdma_destroy_qp(old);
726
727 out_destroy:
728         rdma_destroy_id(old);
729 out:
730         return rc;
731 }
732
733 /*
734  * Connect unconnected endpoint.
735  */
736 int
737 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
738 {
739         struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
740                                                    rx_ia);
741         int rc;
742
743 retry:
744         switch (ep->rep_connected) {
745         case 0:
746                 dprintk("RPC:       %s: connecting...\n", __func__);
747                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
748                 if (rc) {
749                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
750                                 __func__, rc);
751                         rc = -ENETUNREACH;
752                         goto out_noupdate;
753                 }
754                 break;
755         case -ENODEV:
756                 rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
757                 if (rc)
758                         goto out_noupdate;
759                 break;
760         default:
761                 rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
762                 if (rc)
763                         goto out;
764         }
765
766         ep->rep_connected = 0;
767         rpcrdma_post_recvs(r_xprt, true);
768
769         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
770         if (rc) {
771                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
772                                 __func__, rc);
773                 goto out;
774         }
775
776         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
777         if (ep->rep_connected <= 0) {
778                 if (ep->rep_connected == -EAGAIN)
779                         goto retry;
780                 rc = ep->rep_connected;
781                 goto out;
782         }
783
784         dprintk("RPC:       %s: connected\n", __func__);
785
786 out:
787         if (rc)
788                 ep->rep_connected = rc;
789
790 out_noupdate:
791         return rc;
792 }
793
794 /*
795  * rpcrdma_ep_disconnect
796  *
797  * This is separate from destroy to facilitate the ability
798  * to reconnect without recreating the endpoint.
799  *
800  * This call is not reentrant, and must not be made in parallel
801  * on the same endpoint.
802  */
803 void
804 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
805 {
806         int rc;
807
808         rc = rdma_disconnect(ia->ri_id);
809         if (!rc)
810                 /* returns without wait if not connected */
811                 wait_event_interruptible(ep->rep_connect_wait,
812                                                         ep->rep_connected != 1);
813         else
814                 ep->rep_connected = rc;
815         trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
816                                                rx_ep), rc);
817
818         ib_drain_qp(ia->ri_id->qp);
819 }
820
821 /* Fixed-size circular FIFO queue. This implementation is wait-free and
822  * lock-free.
823  *
824  * Consumer is the code path that posts Sends. This path dequeues a
825  * sendctx for use by a Send operation. Multiple consumer threads
826  * are serialized by the RPC transport lock, which allows only one
827  * ->send_request call at a time.
828  *
829  * Producer is the code path that handles Send completions. This path
830  * enqueues a sendctx that has been completed. Multiple producer
831  * threads are serialized by the ib_poll_cq() function.
832  */
833
834 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
835  * queue activity, and ib_drain_qp has flushed all remaining Send
836  * requests.
837  */
838 static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
839 {
840         unsigned long i;
841
842         for (i = 0; i <= buf->rb_sc_last; i++)
843                 kfree(buf->rb_sc_ctxs[i]);
844         kfree(buf->rb_sc_ctxs);
845 }
846
847 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
848 {
849         struct rpcrdma_sendctx *sc;
850
851         sc = kzalloc(sizeof(*sc) +
852                      ia->ri_max_send_sges * sizeof(struct ib_sge),
853                      GFP_KERNEL);
854         if (!sc)
855                 return NULL;
856
857         sc->sc_wr.wr_cqe = &sc->sc_cqe;
858         sc->sc_wr.sg_list = sc->sc_sges;
859         sc->sc_wr.opcode = IB_WR_SEND;
860         sc->sc_cqe.done = rpcrdma_wc_send;
861         return sc;
862 }
863
864 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
865 {
866         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
867         struct rpcrdma_sendctx *sc;
868         unsigned long i;
869
870         /* Maximum number of concurrent outstanding Send WRs. Capping
871          * the circular queue size stops Send Queue overflow by causing
872          * the ->send_request call to fail temporarily before too many
873          * Sends are posted.
874          */
875         i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
876         dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
877         buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
878         if (!buf->rb_sc_ctxs)
879                 return -ENOMEM;
880
881         buf->rb_sc_last = i - 1;
882         for (i = 0; i <= buf->rb_sc_last; i++) {
883                 sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
884                 if (!sc)
885                         goto out_destroy;
886
887                 sc->sc_xprt = r_xprt;
888                 buf->rb_sc_ctxs[i] = sc;
889         }
890         buf->rb_flags = 0;
891
892         return 0;
893
894 out_destroy:
895         rpcrdma_sendctxs_destroy(buf);
896         return -ENOMEM;
897 }
898
899 /* The sendctx queue is not guaranteed to have a size that is a
900  * power of two, thus the helpers in circ_buf.h cannot be used.
901  * The other option is to use modulus (%), which can be expensive.
902  */
903 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
904                                           unsigned long item)
905 {
906         return likely(item < buf->rb_sc_last) ? item + 1 : 0;
907 }
908
909 /**
910  * rpcrdma_sendctx_get_locked - Acquire a send context
911  * @buf: transport buffers from which to acquire an unused context
912  *
913  * Returns pointer to a free send completion context; or NULL if
914  * the queue is empty.
915  *
916  * Usage: Called to acquire an SGE array before preparing a Send WR.
917  *
918  * The caller serializes calls to this function (per rpcrdma_buffer),
919  * and provides an effective memory barrier that flushes the new value
920  * of rb_sc_head.
921  */
922 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
923 {
924         struct rpcrdma_xprt *r_xprt;
925         struct rpcrdma_sendctx *sc;
926         unsigned long next_head;
927
928         next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
929
930         if (next_head == READ_ONCE(buf->rb_sc_tail))
931                 goto out_emptyq;
932
933         /* ORDER: item must be accessed _before_ head is updated */
934         sc = buf->rb_sc_ctxs[next_head];
935
936         /* Releasing the lock in the caller acts as a memory
937          * barrier that flushes rb_sc_head.
938          */
939         buf->rb_sc_head = next_head;
940
941         return sc;
942
943 out_emptyq:
944         /* The queue is "empty" if there have not been enough Send
945          * completions recently. This is a sign the Send Queue is
946          * backing up. Cause the caller to pause and try again.
947          */
948         set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
949         r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
950         r_xprt->rx_stats.empty_sendctx_q++;
951         return NULL;
952 }
953
954 /**
955  * rpcrdma_sendctx_put_locked - Release a send context
956  * @sc: send context to release
957  *
958  * Usage: Called from Send completion to return a sendctx
959  * to the queue.
960  *
961  * The caller serializes calls to this function (per rpcrdma_buffer).
962  */
963 static void
964 rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
965 {
966         struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
967         unsigned long next_tail;
968
969         /* Unmap SGEs of previously completed but unsignaled
970          * Sends by walking up the queue until @sc is found.
971          */
972         next_tail = buf->rb_sc_tail;
973         do {
974                 next_tail = rpcrdma_sendctx_next(buf, next_tail);
975
976                 /* ORDER: item must be accessed _before_ tail is updated */
977                 rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
978
979         } while (buf->rb_sc_ctxs[next_tail] != sc);
980
981         /* Paired with READ_ONCE */
982         smp_store_release(&buf->rb_sc_tail, next_tail);
983
984         if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
985                 smp_mb__after_atomic();
986                 xprt_write_space(&sc->sc_xprt->rx_xprt);
987         }
988 }
989
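/* Allocate and initialize a batch of MRs, then add them to the
 * transport's free and "all" lists. Callers waiting for free MRs
 * are awakened via xprt_write_space.
 */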
990 static void
991 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
992 {
993         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
994         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
995         unsigned int count;
996         LIST_HEAD(free);
997         LIST_HEAD(all);
998
999         for (count = 0; count < ia->ri_max_segs; count++) {
1000                 struct rpcrdma_mr *mr;
1001                 int rc;
1002
1003                 mr = kzalloc(sizeof(*mr), GFP_KERNEL);
1004                 if (!mr)
1005                         break;
1006
1007                 rc = ia->ri_ops->ro_init_mr(ia, mr);
1008                 if (rc) {
1009                         kfree(mr);
1010                         break;
1011                 }
1012
1013                 mr->mr_xprt = r_xprt;
1014
1015                 list_add(&mr->mr_list, &free);
1016                 list_add(&mr->mr_all, &all);
1017         }
1018
1019         spin_lock(&buf->rb_mrlock);
1020         list_splice(&free, &buf->rb_mrs);
1021         list_splice(&all, &buf->rb_all);
1022         r_xprt->rx_stats.mrs_allocated += count;
1023         spin_unlock(&buf->rb_mrlock);
1024         trace_xprtrdma_createmrs(r_xprt, count);
1025
1026         xprt_write_space(&r_xprt->rx_xprt);
1027 }
1028
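/* Work handler that replenishes the MR free list after it has been
 * exhausted (scheduled from rpcrdma_mr_get).
 */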
1029 static void
1030 rpcrdma_mr_refresh_worker(struct work_struct *work)
1031 {
1032         struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
1033                                                   rb_refresh_worker.work);
1034         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1035                                                    rx_buf);
1036
1037         rpcrdma_mrs_create(r_xprt);
1038 }
1039
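/**
 * rpcrdma_create_req - Allocate an rpcrdma_req and its header regbuf
 * @r_xprt: controlling transport
 *
 * The new req is added to the transport's rb_allreqs list. Returns a
 * pointer to the req, or an ERR_PTR on allocation failure.
 */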
1040 struct rpcrdma_req *
1041 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1042 {
1043         struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
1044         struct rpcrdma_regbuf *rb;
1045         struct rpcrdma_req *req;
1046
1047         req = kzalloc(sizeof(*req), GFP_KERNEL);
1048         if (req == NULL)
1049                 return ERR_PTR(-ENOMEM);
1050
1051         rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
1052                                   DMA_TO_DEVICE, GFP_KERNEL);
1053         if (IS_ERR(rb)) {
1054                 kfree(req);
1055                 return ERR_PTR(-ENOMEM);
1056         }
1057         req->rl_rdmabuf = rb;
1058         xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
1059         req->rl_buffer = buffer;
1060         INIT_LIST_HEAD(&req->rl_registered);
1061
1062         spin_lock(&buffer->rb_reqslock);
1063         list_add(&req->rl_all, &buffer->rb_allreqs);
1064         spin_unlock(&buffer->rb_reqslock);
1065         return req;
1066 }
1067
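/* Allocate an rpcrdma_rep and its Receive buffer, initialize the
 * Receive WR, and add the rep to the transport's rb_recv_bufs list.
 */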
1068 static int
1069 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
1070 {
1071         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1072         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1073         struct rpcrdma_rep *rep;
1074         int rc;
1075
1076         rc = -ENOMEM;
1077         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1078         if (rep == NULL)
1079                 goto out;
1080
1081         rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
1082                                                DMA_FROM_DEVICE, GFP_KERNEL);
1083         if (IS_ERR(rep->rr_rdmabuf)) {
1084                 rc = PTR_ERR(rep->rr_rdmabuf);
1085                 goto out_free;
1086         }
1087         xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
1088                      rdmab_length(rep->rr_rdmabuf));
1089
1090         rep->rr_cqe.done = rpcrdma_wc_receive;
1091         rep->rr_rxprt = r_xprt;
1092         INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
1093         rep->rr_recv_wr.next = NULL;
1094         rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1095         rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1096         rep->rr_recv_wr.num_sge = 1;
1097         rep->rr_temp = temp;
1098
1099         spin_lock(&buf->rb_lock);
1100         list_add(&rep->rr_list, &buf->rb_recv_bufs);
1101         spin_unlock(&buf->rb_lock);
1102         return 0;
1103
1104 out_free:
1105         kfree(rep);
1106 out:
1107         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1108                 __func__, rc);
1109         return rc;
1110 }
1111
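/**
 * rpcrdma_buffer_create - Create the initial pools of transport buffers
 * @r_xprt: controlling transport
 *
 * Allocates MRs, reqs, and send contexts. Returns 0 on success; on
 * failure, everything allocated so far is released and a negative
 * errno is returned.
 */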
1112 int
1113 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1114 {
1115         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1116         int i, rc;
1117
1118         buf->rb_max_requests = r_xprt->rx_data.max_requests;
1119         buf->rb_bc_srv_max_requests = 0;
1120         spin_lock_init(&buf->rb_mrlock);
1121         spin_lock_init(&buf->rb_lock);
1122         INIT_LIST_HEAD(&buf->rb_mrs);
1123         INIT_LIST_HEAD(&buf->rb_all);
1124         INIT_DELAYED_WORK(&buf->rb_refresh_worker,
1125                           rpcrdma_mr_refresh_worker);
1126
1127         rpcrdma_mrs_create(r_xprt);
1128
1129         INIT_LIST_HEAD(&buf->rb_send_bufs);
1130         INIT_LIST_HEAD(&buf->rb_allreqs);
1131         spin_lock_init(&buf->rb_reqslock);
1132         for (i = 0; i < buf->rb_max_requests; i++) {
1133                 struct rpcrdma_req *req;
1134
1135                 req = rpcrdma_create_req(r_xprt);
1136                 if (IS_ERR(req)) {
1137                         dprintk("RPC:       %s: request buffer %d alloc"
1138                                 " failed\n", __func__, i);
1139                         rc = PTR_ERR(req);
1140                         goto out;
1141                 }
1142                 list_add(&req->rl_list, &buf->rb_send_bufs);
1143         }
1144
1145         buf->rb_credits = 1;
1146         buf->rb_posted_receives = 0;
1147         INIT_LIST_HEAD(&buf->rb_recv_bufs);
1148
1149         rc = rpcrdma_sendctxs_create(r_xprt);
1150         if (rc)
1151                 goto out;
1152
1153         return 0;
1154 out:
1155         rpcrdma_buffer_destroy(buf);
1156         return rc;
1157 }
1158
1159 static void
1160 rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
1161 {
1162         rpcrdma_free_regbuf(rep->rr_rdmabuf);
1163         kfree(rep);
1164 }
1165
1166 void
1167 rpcrdma_destroy_req(struct rpcrdma_req *req)
1168 {
1169         rpcrdma_free_regbuf(req->rl_recvbuf);
1170         rpcrdma_free_regbuf(req->rl_sendbuf);
1171         rpcrdma_free_regbuf(req->rl_rdmabuf);
1172         kfree(req);
1173 }
1174
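/* Release every MR on the transport's "all" list back to the
 * device, dropping rb_mrlock around each ro_release_mr call.
 */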
1175 static void
1176 rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
1177 {
1178         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1179                                                    rx_buf);
1180         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1181         struct rpcrdma_mr *mr;
1182         unsigned int count;
1183
1184         count = 0;
1185         spin_lock(&buf->rb_mrlock);
1186         while (!list_empty(&buf->rb_all)) {
1187                 mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
1188                 list_del(&mr->mr_all);
1189
1190                 spin_unlock(&buf->rb_mrlock);
1191
1192                 /* Ensure MR is not on any rl_registered list */
1193                 if (!list_empty(&mr->mr_list))
1194                         list_del(&mr->mr_list);
1195
1196                 ia->ri_ops->ro_release_mr(mr);
1197                 count++;
1198                 spin_lock(&buf->rb_mrlock);
1199         }
1200         spin_unlock(&buf->rb_mrlock);
1201         r_xprt->rx_stats.mrs_allocated = 0;
1202
1203         dprintk("RPC:       %s: released %u MRs\n", __func__, count);
1204 }
1205
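/**
 * rpcrdma_buffer_destroy - Release all transport buffer resources
 * @buf: buffer pool to release
 *
 * Frees send contexts, reps, reqs, and MRs. Callers must ensure no
 * buffer activity is in progress.
 */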
1206 void
1207 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1208 {
1209         cancel_delayed_work_sync(&buf->rb_refresh_worker);
1210
1211         rpcrdma_sendctxs_destroy(buf);
1212
1213         while (!list_empty(&buf->rb_recv_bufs)) {
1214                 struct rpcrdma_rep *rep;
1215
1216                 rep = list_first_entry(&buf->rb_recv_bufs,
1217                                        struct rpcrdma_rep, rr_list);
1218                 list_del(&rep->rr_list);
1219                 rpcrdma_destroy_rep(rep);
1220         }
1221
1222         spin_lock(&buf->rb_reqslock);
1223         while (!list_empty(&buf->rb_allreqs)) {
1224                 struct rpcrdma_req *req;
1225
1226                 req = list_first_entry(&buf->rb_allreqs,
1227                                        struct rpcrdma_req, rl_all);
1228                 list_del(&req->rl_all);
1229
1230                 spin_unlock(&buf->rb_reqslock);
1231                 rpcrdma_destroy_req(req);
1232                 spin_lock(&buf->rb_reqslock);
1233         }
1234         spin_unlock(&buf->rb_reqslock);
1235
1236         rpcrdma_mrs_destroy(buf);
1237 }
1238
1239 /**
1240  * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1241  * @r_xprt: controlling transport
1242  *
1243  * Returns an initialized rpcrdma_mr or NULL if no free
1244  * rpcrdma_mr objects are available.
1245  */
1246 struct rpcrdma_mr *
1247 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1248 {
1249         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1250         struct rpcrdma_mr *mr = NULL;
1251
1252         spin_lock(&buf->rb_mrlock);
1253         if (!list_empty(&buf->rb_mrs))
1254                 mr = rpcrdma_mr_pop(&buf->rb_mrs);
1255         spin_unlock(&buf->rb_mrlock);
1256
1257         if (!mr)
1258                 goto out_nomrs;
1259         return mr;
1260
1261 out_nomrs:
1262         trace_xprtrdma_nomrs(r_xprt);
1263         if (r_xprt->rx_ep.rep_connected != -ENODEV)
1264                 schedule_delayed_work(&buf->rb_refresh_worker, 0);
1265
1266         /* Allow the reply handler and refresh worker to run */
1267         cond_resched();
1268
1269         return NULL;
1270 }
1271
1272 static void
1273 __rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
1274 {
1275         spin_lock(&buf->rb_mrlock);
1276         rpcrdma_mr_push(mr, &buf->rb_mrs);
1277         spin_unlock(&buf->rb_mrlock);
1278 }
1279
1280 /**
1281  * rpcrdma_mr_put - Release an rpcrdma_mr object
1282  * @mr: object to release
1283  *
1284  */
1285 void
1286 rpcrdma_mr_put(struct rpcrdma_mr *mr)
1287 {
1288         __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
1289 }
1290
1291 /**
1292  * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
1293  * @mr: object to release
1294  *
1295  */
1296 void
1297 rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
1298 {
1299         struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1300
1301         trace_xprtrdma_mr_unmap(mr);
1302         ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
1303                         mr->mr_sg, mr->mr_nents, mr->mr_dir);
1304         __rpcrdma_mr_put(&r_xprt->rx_buf, mr);
1305 }
1306
1307 /**
1308  * rpcrdma_buffer_get - Get a request buffer
1309  * @buffers: Buffer pool from which to obtain a buffer
1310  *
1311  * Returns a fresh rpcrdma_req, or NULL if none are available.
1312  */
1313 struct rpcrdma_req *
1314 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1315 {
1316         struct rpcrdma_req *req;
1317
1318         spin_lock(&buffers->rb_lock);
1319         req = list_first_entry_or_null(&buffers->rb_send_bufs,
1320                                        struct rpcrdma_req, rl_list);
1321         if (req)
1322                 list_del_init(&req->rl_list);
1323         spin_unlock(&buffers->rb_lock);
1324         return req;
1325 }
1326
1327 /**
1328  * rpcrdma_buffer_put - Put request/reply buffers back into pool
1329  * @req: object to return
1330  *
1331  */
1332 void
1333 rpcrdma_buffer_put(struct rpcrdma_req *req)
1334 {
1335         struct rpcrdma_buffer *buffers = req->rl_buffer;
1336         struct rpcrdma_rep *rep = req->rl_reply;
1337
1338         req->rl_reply = NULL;
1339
1340         spin_lock(&buffers->rb_lock);
1341         list_add(&req->rl_list, &buffers->rb_send_bufs);
1342         if (rep) {
1343                 if (!rep->rr_temp) {
1344                         list_add(&rep->rr_list, &buffers->rb_recv_bufs);
1345                         rep = NULL;
1346                 }
1347         }
1348         spin_unlock(&buffers->rb_lock);
1349         if (rep)
1350                 rpcrdma_destroy_rep(rep);
1351 }
1352
1353 /*
1354  * Put reply buffers back into pool when not attached to
1355  * request. This happens in error conditions.
1356  */
1357 void
1358 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1359 {
1360         struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1361
1362         if (!rep->rr_temp) {
1363                 spin_lock(&buffers->rb_lock);
1364                 list_add(&rep->rr_list, &buffers->rb_recv_bufs);
1365                 spin_unlock(&buffers->rb_lock);
1366         } else {
1367                 rpcrdma_destroy_rep(rep);
1368         }
1369 }
1370
1371 /**
1372  * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
1373  * @size: size of buffer to be allocated, in bytes
1374  * @direction: direction of data movement
1375  * @flags: GFP flags
1376  *
1377  * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
1378  * can be persistently DMA-mapped for I/O.
1379  *
1380  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1381  * receiving the payload of RDMA RECV operations. During Long Calls
1382  * or Replies they may be registered externally via ro_map.
1383  */
1384 struct rpcrdma_regbuf *
1385 rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
1386                      gfp_t flags)
1387 {
1388         struct rpcrdma_regbuf *rb;
1389
1390         rb = kmalloc(sizeof(*rb) + size, flags);
1391         if (rb == NULL)
1392                 return ERR_PTR(-ENOMEM);
1393
1394         rb->rg_device = NULL;
1395         rb->rg_direction = direction;
1396         rb->rg_iov.length = size;
1397
1398         return rb;
1399 }
1400
1401 /**
1402  * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
1403  * @ia: controlling rpcrdma_ia
1404  * @rb: regbuf to be mapped
1405  */
1406 bool
1407 __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1408 {
1409         struct ib_device *device = ia->ri_device;
1410
1411         if (rb->rg_direction == DMA_NONE)
1412                 return false;
1413
1414         rb->rg_iov.addr = ib_dma_map_single(device,
1415                                             (void *)rb->rg_base,
1416                                             rdmab_length(rb),
1417                                             rb->rg_direction);
1418         if (ib_dma_mapping_error(device, rdmab_addr(rb)))
1419                 return false;
1420
1421         rb->rg_device = device;
1422         rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
1423         return true;
1424 }
1425
1426 static void
1427 rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
1428 {
1429         if (!rb)
1430                 return;
1431
1432         if (!rpcrdma_regbuf_is_mapped(rb))
1433                 return;
1434
1435         ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
1436                             rdmab_length(rb), rb->rg_direction);
1437         rb->rg_device = NULL;
1438 }
1439
1440 /**
1441  * rpcrdma_free_regbuf - deregister and free registered buffer
1442  * @rb: regbuf to be deregistered and freed
1443  */
1444 void
1445 rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
1446 {
1447         rpcrdma_dma_unmap_regbuf(rb);
1448         kfree(rb);
1449 }
1450
1451 /*
1452  * Post a Send work request for @req. Receives are posted separately
1453  * by rpcrdma_post_recvs. Send completions are requested only once per
1454  * rep_send_batch posts, or when @req holds TX resources to release.
1455  */
1456 int
1457 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1458                 struct rpcrdma_ep *ep,
1459                 struct rpcrdma_req *req)
1460 {
1461         struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
1462         int rc;
1463
1464         if (!ep->rep_send_count ||
1465             test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1466                 send_wr->send_flags |= IB_SEND_SIGNALED;
1467                 ep->rep_send_count = ep->rep_send_batch;
1468         } else {
1469                 send_wr->send_flags &= ~IB_SEND_SIGNALED;
1470                 --ep->rep_send_count;
1471         }
1472
1473         rc = ia->ri_ops->ro_send(ia, req);
1474         trace_xprtrdma_post_send(req, rc);
1475         if (rc)
1476                 return -ENOTCONN;
1477         return 0;
1478 }
1479
1480 /**
1481  * rpcrdma_post_recvs - Maybe post some Receive buffers
1482  * @r_xprt: controlling transport
1483  * @temp: when true, allocate temp rpcrdma_rep objects
1484  *
1485  */
1486 void
1487 rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1488 {
1489         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1490         struct ib_recv_wr *wr, *bad_wr;
1491         int needed, count, rc;
1492
1493         needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
1494         if (buf->rb_posted_receives > needed)
1495                 return;
1496         needed -= buf->rb_posted_receives;
1497
1498         count = 0;
1499         wr = NULL;
1500         while (needed) {
1501                 struct rpcrdma_regbuf *rb;
1502                 struct rpcrdma_rep *rep;
1503
1504                 spin_lock(&buf->rb_lock);
1505                 rep = list_first_entry_or_null(&buf->rb_recv_bufs,
1506                                                struct rpcrdma_rep, rr_list);
1507                 if (likely(rep))
1508                         list_del(&rep->rr_list);
1509                 spin_unlock(&buf->rb_lock);
1510                 if (!rep) {
1511                         if (rpcrdma_create_rep(r_xprt, temp))
1512                                 break;
1513                         continue;
1514                 }
1515
1516                 rb = rep->rr_rdmabuf;
1517                 if (!rpcrdma_regbuf_is_mapped(rb)) {
1518                         if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
1519                                 rpcrdma_recv_buffer_put(rep);
1520                                 break;
1521                         }
1522                 }
1523
1524                 trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
1525                 rep->rr_recv_wr.next = wr;
1526                 wr = &rep->rr_recv_wr;
1527                 ++count;
1528                 --needed;
1529         }
1530         if (!count)
1531                 return;
1532
1533         rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
1534                           (const struct ib_recv_wr **)&bad_wr);
1535         if (rc) {
1536                 for (wr = bad_wr; wr; wr = wr->next) {
1537                         struct rpcrdma_rep *rep;
1538
1539                         rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1540                         rpcrdma_recv_buffer_put(rep);
1541                         --count;
1542                 }
1543         }
1544         buf->rb_posted_receives += count;
1545         trace_xprtrdma_post_recvs(r_xprt, count, rc);
1546 }