1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41
42 /*
43  * verbs.c
44  *
45  * Encapsulates the major functions managing:
46  *  o adapters
47  *  o endpoints
48  *  o connections
49  *  o buffer memory
50  */
51
52 #include <linux/interrupt.h>
53 #include <linux/slab.h>
54 #include <linux/sunrpc/addr.h>
55 #include <linux/sunrpc/svc_rdma.h>
56 #include <linux/log2.h>
57
58 #include <asm-generic/barrier.h>
59 #include <asm/bitops.h>
60
61 #include <rdma/ib_cm.h>
62
63 #include "xprt_rdma.h"
64 #include <trace/events/rpcrdma.h>
65
66 /*
67  * Globals/Macros
68  */
69
70 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
71 # define RPCDBG_FACILITY        RPCDBG_TRANS
72 #endif
73
74 /*
75  * internal functions
76  */
77 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
78 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
79 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
80                                        struct rpcrdma_sendctx *sc);
81 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
82 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
83 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
84 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
85 static struct rpcrdma_regbuf *
86 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
87                      gfp_t flags);
88 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
89 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
90
91 /* Wait for outstanding transport work to finish. ib_drain_qp
92  * handles the drains in the wrong order for us, so open code
93  * them here.
94  */
95 static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
96 {
97         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
98
99         /* Flush Receives, then wait for deferred Reply work
100          * to complete.
101          */
102         ib_drain_rq(ia->ri_id->qp);
103
104         /* Deferred Reply processing might have scheduled
105          * local invalidations.
106          */
107         ib_drain_sq(ia->ri_id->qp);
108 }
109
110 /**
111  * rpcrdma_qp_event_handler - Handle one QP event (error notification)
112  * @event: details of the event
113  * @context: ep that owns QP where event occurred
114  *
115  * Called from the RDMA provider (device driver) possibly in an interrupt
116  * context.
117  */
118 static void
119 rpcrdma_qp_event_handler(struct ib_event *event, void *context)
120 {
121         struct rpcrdma_ep *ep = context;
122         struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
123                                                    rx_ep);
124
125         trace_xprtrdma_qp_event(r_xprt, event);
126 }
127
128 /**
129  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
130  * @cq: completion queue
131  * @wc: completed WR
132  *
133  */
134 static void
135 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
136 {
137         struct ib_cqe *cqe = wc->wr_cqe;
138         struct rpcrdma_sendctx *sc =
139                 container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
140
141         /* WARNING: Only wr_cqe and status are reliable at this point */
142         trace_xprtrdma_wc_send(sc, wc);
143         rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
144 }
145
146 /**
147  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
148  * @cq: completion queue (ignored)
149  * @wc: completed WR
150  *
151  */
152 static void
153 rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
154 {
155         struct ib_cqe *cqe = wc->wr_cqe;
156         struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
157                                                rr_cqe);
158         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
159
160         /* WARNING: Only wr_cqe and status are reliable at this point */
161         trace_xprtrdma_wc_receive(wc);
162         --r_xprt->rx_ep.rep_receive_count;
163         if (wc->status != IB_WC_SUCCESS)
164                 goto out_flushed;
165
166         /* status == SUCCESS means all fields in wc are trustworthy */
167         rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
168         rep->rr_wc_flags = wc->wc_flags;
169         rep->rr_inv_rkey = wc->ex.invalidate_rkey;
170
171         ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
172                                    rdmab_addr(rep->rr_rdmabuf),
173                                    wc->byte_len, DMA_FROM_DEVICE);
174
175         rpcrdma_reply_handler(rep);
176         return;
177
178 out_flushed:
179         rpcrdma_recv_buffer_put(rep);
180 }
181
182 static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
183                                       struct rdma_conn_param *param)
184 {
185         const struct rpcrdma_connect_private *pmsg = param->private_data;
186         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
187         unsigned int rsize, wsize;
188
189         /* Default settings for RPC-over-RDMA Version One */
190         r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
191         rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
192         wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
193
194         if (pmsg &&
195             pmsg->cp_magic == rpcrdma_cmp_magic &&
196             pmsg->cp_version == RPCRDMA_CMP_VERSION) {
197                 r_xprt->rx_ia.ri_implicit_roundup = true;
198                 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
199                 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
200         }
201
202         if (rsize < ep->rep_inline_recv)
203                 ep->rep_inline_recv = rsize;
204         if (wsize < ep->rep_inline_send)
205                 ep->rep_inline_send = wsize;
206
207         rpcrdma_set_max_header_sizes(r_xprt);
208 }
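
/* Worked example (hypothetical sizes): if the peer's private message
 * advertises a cp_send_size that decodes to 1024 and a cp_recv_size
 * that decodes to 2048 while both local thresholds start at 4096,
 * the clamping above leaves rep_inline_recv at 1024 (never expect
 * more inline data than the peer will send) and rep_inline_send at
 * 2048 (never send more inline data than the peer can receive).
 * Without a valid private message, both thresholds are clamped to
 * at most RPCRDMA_V1_DEF_INLINE_SIZE.
 */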
209
210 /**
211  * rpcrdma_cm_event_handler - Handle RDMA CM events
212  * @id: rdma_cm_id on which an event has occurred
213  * @event: details of the event
214  *
215  * Called with @id's mutex held. Returns 1 if caller should
216  * destroy @id, otherwise 0.
217  */
218 static int
219 rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
220 {
221         struct rpcrdma_xprt *r_xprt = id->context;
222         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
223         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
224         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
225
226         might_sleep();
227
228         trace_xprtrdma_cm_event(r_xprt, event);
229         switch (event->event) {
230         case RDMA_CM_EVENT_ADDR_RESOLVED:
231         case RDMA_CM_EVENT_ROUTE_RESOLVED:
232                 ia->ri_async_rc = 0;
233                 complete(&ia->ri_done);
234                 return 0;
235         case RDMA_CM_EVENT_ADDR_ERROR:
236                 ia->ri_async_rc = -EPROTO;
237                 complete(&ia->ri_done);
238                 return 0;
239         case RDMA_CM_EVENT_ROUTE_ERROR:
240                 ia->ri_async_rc = -ENETUNREACH;
241                 complete(&ia->ri_done);
242                 return 0;
243         case RDMA_CM_EVENT_DEVICE_REMOVAL:
244 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
245                 pr_info("rpcrdma: removing device %s for %s:%s\n",
246                         ia->ri_id->device->name,
247                         rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
248 #endif
249                 init_completion(&ia->ri_remove_done);
250                 set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
251                 ep->rep_connected = -ENODEV;
252                 xprt_force_disconnect(xprt);
253                 wait_for_completion(&ia->ri_remove_done);
254
255                 ia->ri_id = NULL;
256                 /* Return 1 to ensure the core destroys the id. */
257                 return 1;
258         case RDMA_CM_EVENT_ESTABLISHED:
259                 ++xprt->connect_cookie;
260                 ep->rep_connected = 1;
261                 rpcrdma_update_cm_private(r_xprt, &event->param.conn);
262                 trace_xprtrdma_inline_thresh(r_xprt);
263                 wake_up_all(&ep->rep_connect_wait);
264                 break;
265         case RDMA_CM_EVENT_CONNECT_ERROR:
266                 ep->rep_connected = -ENOTCONN;
267                 goto disconnected;
268         case RDMA_CM_EVENT_UNREACHABLE:
269                 ep->rep_connected = -ENETUNREACH;
270                 goto disconnected;
271         case RDMA_CM_EVENT_REJECTED:
272                 dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
273                         rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
274                         rdma_reject_msg(id, event->status));
275                 ep->rep_connected = -ECONNREFUSED;
276                 if (event->status == IB_CM_REJ_STALE_CONN)
277                         ep->rep_connected = -EAGAIN;
278                 goto disconnected;
279         case RDMA_CM_EVENT_DISCONNECTED:
280                 ep->rep_connected = -ECONNABORTED;
281 disconnected:
282                 xprt_force_disconnect(xprt);
283                 wake_up_all(&ep->rep_connect_wait);
284                 break;
285         default:
286                 break;
287         }
288
289         dprintk("RPC:       %s: %s:%s on %s/frwr: %s\n", __func__,
290                 rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
291                 ia->ri_id->device->name, rdma_event_msg(event->event));
292         return 0;
293 }
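
/* Summary of the ep->rep_connected values set above, consumed by
 * rpcrdma_ep_connect(), rpcrdma_ep_disconnect(), and
 * rpcrdma_mrs_refresh() below:
 *
 *	 1		connection established
 *	 0		not connected / connect in progress
 *	-ENODEV		underlying device was removed
 *	-EAGAIN		connection rejected as stale; retry immediately
 *	-ECONNREFUSED	connection rejected by the peer
 *	-ENOTCONN	connect error reported by the CM
 *	-ENETUNREACH	peer or route unreachable
 *	-ECONNABORTED	established connection was lost
 */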
294
295 static struct rdma_cm_id *
296 rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
297 {
298         unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
299         struct rdma_cm_id *id;
300         int rc;
301
302         init_completion(&ia->ri_done);
303
304         id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
305                             xprt, RDMA_PS_TCP, IB_QPT_RC);
306         if (IS_ERR(id))
307                 return id;
308
309         ia->ri_async_rc = -ETIMEDOUT;
310         rc = rdma_resolve_addr(id, NULL,
311                                (struct sockaddr *)&xprt->rx_xprt.addr,
312                                RDMA_RESOLVE_TIMEOUT);
313         if (rc)
314                 goto out;
315         rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
316         if (rc < 0)
317                 goto out;
318
319         rc = ia->ri_async_rc;
320         if (rc)
321                 goto out;
322
323         ia->ri_async_rc = -ETIMEDOUT;
324         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
325         if (rc)
326                 goto out;
327         rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
328         if (rc < 0)
329                 goto out;
330         rc = ia->ri_async_rc;
331         if (rc)
332                 goto out;
333
334         return id;
335
336 out:
337         rdma_destroy_id(id);
338         return ERR_PTR(rc);
339 }
340
341 /*
342  * Exported functions.
343  */
344
345 /**
346  * rpcrdma_ia_open - Open and initialize an Interface Adapter.
347  * @xprt: transport with IA to (re)initialize
348  *
349  * Returns 0 on success, negative errno if an appropriate
350  * Interface Adapter could not be found and opened.
351  */
352 int
353 rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
354 {
355         struct rpcrdma_ia *ia = &xprt->rx_ia;
356         int rc;
357
358         ia->ri_id = rpcrdma_create_id(xprt, ia);
359         if (IS_ERR(ia->ri_id)) {
360                 rc = PTR_ERR(ia->ri_id);
361                 goto out_err;
362         }
363
364         ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
365         if (IS_ERR(ia->ri_pd)) {
366                 rc = PTR_ERR(ia->ri_pd);
367                 pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
368                 goto out_err;
369         }
370
371         switch (xprt_rdma_memreg_strategy) {
372         case RPCRDMA_FRWR:
373                 if (frwr_is_supported(ia->ri_id->device))
374                         break;
375                 /*FALLTHROUGH*/
376         default:
377                 pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
378                        ia->ri_id->device->name, xprt_rdma_memreg_strategy);
379                 rc = -EINVAL;
380                 goto out_err;
381         }
382
383         return 0;
384
385 out_err:
386         rpcrdma_ia_close(ia);
387         return rc;
388 }
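
/* Setup sketch: the transport code in transport.c is expected to drive
 * the exported functions in this file roughly in this order (exact
 * sequencing lives outside this file):
 *
 *	rpcrdma_ia_open(r_xprt);	// resolve address, allocate the PD
 *	rpcrdma_ep_create(r_xprt);	// allocate CQs, set up CM parameters
 *	rpcrdma_buffer_create(r_xprt);	// allocate reqs, reps, and MRs
 *
 * then rpcrdma_ep_connect() for each connect attempt, with
 * rpcrdma_ep_disconnect(), rpcrdma_buffer_destroy(), rpcrdma_ep_destroy(),
 * and rpcrdma_ia_close() used on the way down.
 */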
389
390 /**
391  * rpcrdma_ia_remove - Handle device driver unload
392  * @ia: interface adapter being removed
393  *
394  * Divest transport H/W resources associated with this adapter,
395  * but allow it to be restored later.
396  */
397 void
398 rpcrdma_ia_remove(struct rpcrdma_ia *ia)
399 {
400         struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
401                                                    rx_ia);
402         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
403         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
404         struct rpcrdma_req *req;
405
406         /* This is similar to rpcrdma_ep_destroy, but:
407          * - Don't cancel the connect worker.
408          * - Don't call rpcrdma_ep_disconnect, which waits
409          *   for another conn upcall, which will deadlock.
410          * - rdma_disconnect is unneeded, the underlying
411          *   connection is already gone.
412          */
413         if (ia->ri_id->qp) {
414                 rpcrdma_xprt_drain(r_xprt);
415                 rdma_destroy_qp(ia->ri_id);
416                 ia->ri_id->qp = NULL;
417         }
418         ib_free_cq(ep->rep_attr.recv_cq);
419         ep->rep_attr.recv_cq = NULL;
420         ib_free_cq(ep->rep_attr.send_cq);
421         ep->rep_attr.send_cq = NULL;
422
423         /* The ULP is responsible for ensuring all DMA
424          * mappings and MRs are gone.
425          */
426         rpcrdma_reps_unmap(r_xprt);
427         list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
428                 rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
429                 rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
430                 rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
431         }
432         rpcrdma_mrs_destroy(r_xprt);
433         rpcrdma_sendctxs_destroy(r_xprt);
434         ib_dealloc_pd(ia->ri_pd);
435         ia->ri_pd = NULL;
436
437         /* Allow waiters to continue */
438         complete(&ia->ri_remove_done);
439
440         trace_xprtrdma_remove(r_xprt);
441 }
442
443 /**
444  * rpcrdma_ia_close - Clean up/close an IA.
445  * @ia: interface adapter to close
446  *
447  */
448 void
449 rpcrdma_ia_close(struct rpcrdma_ia *ia)
450 {
451         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
452                 if (ia->ri_id->qp)
453                         rdma_destroy_qp(ia->ri_id);
454                 rdma_destroy_id(ia->ri_id);
455         }
456         ia->ri_id = NULL;
457
458         /* If the pd is still busy, xprtrdma missed freeing a resource */
459         if (ia->ri_pd && !IS_ERR(ia->ri_pd))
460                 ib_dealloc_pd(ia->ri_pd);
461         ia->ri_pd = NULL;
462 }
463
464 /**
465  * rpcrdma_ep_create - Create unconnected endpoint
466  * @r_xprt: transport to instantiate
467  *
468  * Returns zero on success, or a negative errno.
469  */
470 int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
471 {
472         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
473         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
474         struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
475         struct ib_cq *sendcq, *recvcq;
476         int rc;
477
478         ep->rep_max_requests = xprt_rdma_slot_table_entries;
479         ep->rep_inline_send = xprt_rdma_max_inline_write;
480         ep->rep_inline_recv = xprt_rdma_max_inline_read;
481
482         rc = frwr_open(ia, ep);
483         if (rc)
484                 return rc;
485
486         ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
487         ep->rep_attr.qp_context = ep;
488         ep->rep_attr.srq = NULL;
489         ep->rep_attr.cap.max_inline_data = 0;
490         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
491         ep->rep_attr.qp_type = IB_QPT_RC;
492         ep->rep_attr.port_num = ~0;
493
494         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
495                 "iovs: send %d recv %d\n",
496                 __func__,
497                 ep->rep_attr.cap.max_send_wr,
498                 ep->rep_attr.cap.max_recv_wr,
499                 ep->rep_attr.cap.max_send_sge,
500                 ep->rep_attr.cap.max_recv_sge);
501
502         ep->rep_send_batch = ep->rep_max_requests >> 3;
503         ep->rep_send_count = ep->rep_send_batch;
504         init_waitqueue_head(&ep->rep_connect_wait);
505         ep->rep_receive_count = 0;
506
507         sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
508                                  ep->rep_attr.cap.max_send_wr + 1,
509                                  IB_POLL_WORKQUEUE);
510         if (IS_ERR(sendcq)) {
511                 rc = PTR_ERR(sendcq);
512                 goto out1;
513         }
514
515         recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
516                                  ep->rep_attr.cap.max_recv_wr + 1,
517                                  IB_POLL_WORKQUEUE);
518         if (IS_ERR(recvcq)) {
519                 rc = PTR_ERR(recvcq);
520                 goto out2;
521         }
522
523         ep->rep_attr.send_cq = sendcq;
524         ep->rep_attr.recv_cq = recvcq;
525
526         /* Initialize cma parameters */
527         memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
528
529         /* Prepare RDMA-CM private message */
530         pmsg->cp_magic = rpcrdma_cmp_magic;
531         pmsg->cp_version = RPCRDMA_CMP_VERSION;
532         pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
533         pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
534         pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
535         ep->rep_remote_cma.private_data = pmsg;
536         ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
537
538         /* Client offers RDMA Read but does not initiate */
539         ep->rep_remote_cma.initiator_depth = 0;
540         ep->rep_remote_cma.responder_resources =
541                 min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
542
543         /* Limit transport retries so client can detect server
544          * GID changes quickly. RPC layer handles re-establishing
545          * transport connection and retransmission.
546          */
547         ep->rep_remote_cma.retry_count = 6;
548
549         /* RPC-over-RDMA handles its own flow control. In addition,
550          * make all RNR NAKs visible so we know that RPC-over-RDMA
551          * flow control is working correctly (no NAKs should be seen).
552          */
553         ep->rep_remote_cma.flow_control = 0;
554         ep->rep_remote_cma.rnr_retry_count = 0;
555
556         return 0;
557
558 out2:
559         ib_free_cq(sendcq);
560 out1:
561         return rc;
562 }
563
564 /**
565  * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
566  * @r_xprt: transport instance to shut down
567  *
568  */
569 void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
570 {
571         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
572         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
573
574         if (ia->ri_id && ia->ri_id->qp) {
575                 rpcrdma_ep_disconnect(ep, ia);
576                 rdma_destroy_qp(ia->ri_id);
577                 ia->ri_id->qp = NULL;
578         }
579
580         if (ep->rep_attr.recv_cq)
581                 ib_free_cq(ep->rep_attr.recv_cq);
582         if (ep->rep_attr.send_cq)
583                 ib_free_cq(ep->rep_attr.send_cq);
584 }
585
586 /* Re-establish a connection after a device removal event.
587  * Unlike a normal reconnection, a fresh PD and a new set
 588  * of MRs and buffers are needed.
589  */
590 static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
591                                     struct ib_qp_init_attr *qp_init_attr)
592 {
593         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
594         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
595         int rc, err;
596
597         trace_xprtrdma_reinsert(r_xprt);
598
599         rc = -EHOSTUNREACH;
600         if (rpcrdma_ia_open(r_xprt))
601                 goto out1;
602
603         rc = -ENOMEM;
604         err = rpcrdma_ep_create(r_xprt);
605         if (err) {
606                 pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
607                 goto out2;
608         }
609         memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr));
610
611         rc = -ENETUNREACH;
612         err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
613         if (err) {
614                 pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
615                 goto out3;
616         }
617         return 0;
618
619 out3:
620         rpcrdma_ep_destroy(r_xprt);
621 out2:
622         rpcrdma_ia_close(ia);
623 out1:
624         return rc;
625 }
626
627 static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
628                                 struct ib_qp_init_attr *qp_init_attr)
629 {
630         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
631         struct rdma_cm_id *id, *old;
632         int err, rc;
633
634         rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
635
636         rc = -EHOSTUNREACH;
637         id = rpcrdma_create_id(r_xprt, ia);
638         if (IS_ERR(id))
639                 goto out;
640
641         /* As long as the new ID points to the same device as the
642          * old ID, we can reuse the transport's existing PD and all
643          * previously allocated MRs. Also, the same device means
644          * the transport's previous DMA mappings are still valid.
645          *
646          * This is a sanity check only. There should be no way these
647          * point to two different devices here.
648          */
649         old = id;
650         rc = -ENETUNREACH;
651         if (ia->ri_id->device != id->device) {
652                 pr_err("rpcrdma: can't reconnect on different device!\n");
653                 goto out_destroy;
654         }
655
656         err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
657         if (err)
658                 goto out_destroy;
659
660         /* Atomically replace the transport's ID and QP. */
661         rc = 0;
662         old = ia->ri_id;
663         ia->ri_id = id;
664         rdma_destroy_qp(old);
665
666 out_destroy:
667         rdma_destroy_id(old);
668 out:
669         return rc;
670 }
671
672 /*
673  * Connect unconnected endpoint.
674  */
675 int
676 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
677 {
678         struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
679                                                    rx_ia);
680         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
681         struct ib_qp_init_attr qp_init_attr;
682         int rc;
683
684 retry:
685         memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
686         switch (ep->rep_connected) {
687         case 0:
688                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
689                 if (rc) {
690                         rc = -ENETUNREACH;
691                         goto out_noupdate;
692                 }
693                 break;
694         case -ENODEV:
695                 rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
696                 if (rc)
697                         goto out_noupdate;
698                 break;
699         default:
700                 rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
701                 if (rc)
702                         goto out;
703         }
704
705         ep->rep_connected = 0;
706         xprt_clear_connected(xprt);
707
708         rpcrdma_reset_cwnd(r_xprt);
709         rpcrdma_post_recvs(r_xprt, true);
710
711         rc = rpcrdma_sendctxs_create(r_xprt);
712         if (rc)
713                 goto out;
714
715         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
716         if (rc)
717                 goto out;
718
719         if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
720                 xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
721         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
722         if (ep->rep_connected <= 0) {
723                 if (ep->rep_connected == -EAGAIN)
724                         goto retry;
725                 rc = ep->rep_connected;
726                 goto out;
727         }
728
729         rpcrdma_mrs_create(r_xprt);
730
731 out:
732         if (rc)
733                 ep->rep_connected = rc;
734
735 out_noupdate:
736         trace_xprtrdma_connect(r_xprt, rc);
737         return rc;
738 }
739
740 /**
741  * rpcrdma_ep_disconnect - Disconnect underlying transport
742  * @ep: endpoint to disconnect
743  * @ia: associated interface adapter
744  *
745  * Caller serializes. Either the transport send lock is held,
746  * or we're being called to destroy the transport.
747  */
748 void
749 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
750 {
751         struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
752                                                    rx_ep);
753         int rc;
754
755         /* returns without wait if ID is not connected */
756         rc = rdma_disconnect(ia->ri_id);
757         if (!rc)
758                 wait_event_interruptible(ep->rep_connect_wait,
759                                                         ep->rep_connected != 1);
760         else
761                 ep->rep_connected = rc;
762         trace_xprtrdma_disconnect(r_xprt, rc);
763
764         rpcrdma_xprt_drain(r_xprt);
765         rpcrdma_reqs_reset(r_xprt);
766         rpcrdma_mrs_destroy(r_xprt);
767         rpcrdma_sendctxs_destroy(r_xprt);
768 }
769
770 /* Fixed-size circular FIFO queue. This implementation is wait-free and
771  * lock-free.
772  *
773  * Consumer is the code path that posts Sends. This path dequeues a
774  * sendctx for use by a Send operation. Multiple consumer threads
775  * are serialized by the RPC transport lock, which allows only one
776  * ->send_request call at a time.
777  *
778  * Producer is the code path that handles Send completions. This path
779  * enqueues a sendctx that has been completed. Multiple producer
780  * threads are serialized by the ib_poll_cq() function.
781  */
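
/* Illustrative state, assuming a queue of five slots (rb_sc_last == 4),
 * each slot permanently holding one sendctx pointer:
 *
 *	slots:	[0] [1] [2] [3] [4]
 *	head == 2: slot 2 holds the most recently acquired sendctx
 *	tail == 0: slot 0 holds the most recently completed sendctx
 *
 * rpcrdma_sendctx_get_locked() next hands out slot 3, then slot 4, and
 * then reports an empty queue: advancing head onto tail (next(4) == 0)
 * would reuse a sendctx whose Send has not yet completed. Each Send
 * completion advances tail, making those slots available again.
 */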
782
783 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
784  * queue activity, and rpcrdma_xprt_drain has flushed all remaining
785  * Send requests.
786  */
787 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
788 {
789         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
790         unsigned long i;
791
792         if (!buf->rb_sc_ctxs)
793                 return;
794         for (i = 0; i <= buf->rb_sc_last; i++)
795                 kfree(buf->rb_sc_ctxs[i]);
796         kfree(buf->rb_sc_ctxs);
797         buf->rb_sc_ctxs = NULL;
798 }
799
800 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
801 {
802         struct rpcrdma_sendctx *sc;
803
804         sc = kzalloc(struct_size(sc, sc_sges, ep->rep_attr.cap.max_send_sge),
805                      GFP_KERNEL);
806         if (!sc)
807                 return NULL;
808
809         sc->sc_cqe.done = rpcrdma_wc_send;
810         return sc;
811 }
812
813 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
814 {
815         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
816         struct rpcrdma_sendctx *sc;
817         unsigned long i;
818
819         /* Maximum number of concurrent outstanding Send WRs. Capping
820          * the circular queue size stops Send Queue overflow by causing
821          * the ->send_request call to fail temporarily before too many
822          * Sends are posted.
823          */
824         i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
825         buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
826         if (!buf->rb_sc_ctxs)
827                 return -ENOMEM;
828
829         buf->rb_sc_last = i - 1;
830         for (i = 0; i <= buf->rb_sc_last; i++) {
831                 sc = rpcrdma_sendctx_create(&r_xprt->rx_ep);
832                 if (!sc)
833                         return -ENOMEM;
834
835                 buf->rb_sc_ctxs[i] = sc;
836         }
837
838         buf->rb_sc_head = 0;
839         buf->rb_sc_tail = 0;
840         return 0;
841 }
842
843 /* The sendctx queue is not guaranteed to have a size that is a
844  * power of two, thus the helpers in circ_buf.h cannot be used.
845  * The other option is to use modulus (%), which can be expensive.
846  */
847 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
848                                           unsigned long item)
849 {
850         return likely(item < buf->rb_sc_last) ? item + 1 : 0;
851 }
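
/* For example, with rb_sc_last == 4 the index sequence produced is
 * 0, 1, 2, 3, 4, 0, 1, ... -- equivalent to (item + 1) % 5, but with
 * no division for a queue size that need not be a power of two.
 */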
852
853 /**
854  * rpcrdma_sendctx_get_locked - Acquire a send context
855  * @r_xprt: controlling transport instance
856  *
857  * Returns pointer to a free send completion context; or NULL if
858  * the queue is empty.
859  *
860  * Usage: Called to acquire an SGE array before preparing a Send WR.
861  *
862  * The caller serializes calls to this function (per transport), and
863  * provides an effective memory barrier that flushes the new value
864  * of rb_sc_head.
865  */
866 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
867 {
868         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
869         struct rpcrdma_sendctx *sc;
870         unsigned long next_head;
871
872         next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
873
874         if (next_head == READ_ONCE(buf->rb_sc_tail))
875                 goto out_emptyq;
876
877         /* ORDER: item must be accessed _before_ head is updated */
878         sc = buf->rb_sc_ctxs[next_head];
879
880         /* Releasing the lock in the caller acts as a memory
881          * barrier that flushes rb_sc_head.
882          */
883         buf->rb_sc_head = next_head;
884
885         return sc;
886
887 out_emptyq:
888         /* The queue is "empty" if there have not been enough Send
889          * completions recently. This is a sign the Send Queue is
890          * backing up. Cause the caller to pause and try again.
891          */
892         xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
893         r_xprt->rx_stats.empty_sendctx_q++;
894         return NULL;
895 }
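
/* Usage sketch of the contract (the real marshaling code lives in
 * rpc_rdma.c): a sender acquires a context, fills its SGE array, and
 * points the Send WR's wr_cqe at sc_cqe so rpcrdma_wc_send() can hand
 * the context back via rpcrdma_sendctx_put_locked():
 *
 *	sc = rpcrdma_sendctx_get_locked(r_xprt);
 *	if (!sc)
 *		return -EAGAIN;		// SQ backed up; pause and retry
 *	... fill sc->sc_sges ...
 *	req->rl_wr.wr_cqe = &sc->sc_cqe;
 *	req->rl_wr.sg_list = sc->sc_sges;
 */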
896
897 /**
898  * rpcrdma_sendctx_put_locked - Release a send context
899  * @r_xprt: controlling transport instance
900  * @sc: send context to release
901  *
 902  * Usage: Called from Send completion to return a sendctx
903  * to the queue.
904  *
905  * The caller serializes calls to this function (per transport).
906  */
907 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
908                                        struct rpcrdma_sendctx *sc)
909 {
910         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
911         unsigned long next_tail;
912
913         /* Unmap SGEs of previously completed but unsignaled
914          * Sends by walking up the queue until @sc is found.
915          */
916         next_tail = buf->rb_sc_tail;
917         do {
918                 next_tail = rpcrdma_sendctx_next(buf, next_tail);
919
920                 /* ORDER: item must be accessed _before_ tail is updated */
921                 rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
922
923         } while (buf->rb_sc_ctxs[next_tail] != sc);
924
925         /* Paired with READ_ONCE */
926         smp_store_release(&buf->rb_sc_tail, next_tail);
927
928         xprt_write_space(&r_xprt->rx_xprt);
929 }
930
931 static void
932 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
933 {
934         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
935         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
936         unsigned int count;
937
938         for (count = 0; count < ia->ri_max_segs; count++) {
939                 struct rpcrdma_mr *mr;
940                 int rc;
941
942                 mr = kzalloc(sizeof(*mr), GFP_NOFS);
943                 if (!mr)
944                         break;
945
946                 rc = frwr_init_mr(ia, mr);
947                 if (rc) {
948                         kfree(mr);
949                         break;
950                 }
951
952                 mr->mr_xprt = r_xprt;
953
954                 spin_lock(&buf->rb_lock);
955                 rpcrdma_mr_push(mr, &buf->rb_mrs);
956                 list_add(&mr->mr_all, &buf->rb_all_mrs);
957                 spin_unlock(&buf->rb_lock);
958         }
959
960         r_xprt->rx_stats.mrs_allocated += count;
961         trace_xprtrdma_createmrs(r_xprt, count);
962 }
963
964 static void
965 rpcrdma_mr_refresh_worker(struct work_struct *work)
966 {
967         struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
968                                                   rb_refresh_worker);
969         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
970                                                    rx_buf);
971
972         rpcrdma_mrs_create(r_xprt);
973         xprt_write_space(&r_xprt->rx_xprt);
974 }
975
976 /**
977  * rpcrdma_mrs_refresh - Wake the MR refresh worker
978  * @r_xprt: controlling transport instance
979  *
980  */
981 void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
982 {
983         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
984         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
985
986         /* If there is no underlying device, it's no use to
987          * wake the refresh worker.
988          */
989         if (ep->rep_connected != -ENODEV) {
990                 /* The work is scheduled on a WQ_MEM_RECLAIM
991                  * workqueue in order to prevent MR allocation
992                  * from recursing into NFS during direct reclaim.
993                  */
994                 queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
995         }
996 }
997
998 /**
999  * rpcrdma_req_create - Allocate an rpcrdma_req object
1000  * @r_xprt: controlling r_xprt
1001  * @size: initial size, in bytes, of send and receive buffers
1002  * @flags: GFP flags passed to memory allocators
1003  *
1004  * Returns an allocated and fully initialized rpcrdma_req or NULL.
1005  */
1006 struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
1007                                        gfp_t flags)
1008 {
1009         struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
1010         struct rpcrdma_regbuf *rb;
1011         struct rpcrdma_req *req;
1012         size_t maxhdrsize;
1013
1014         req = kzalloc(sizeof(*req), flags);
1015         if (req == NULL)
1016                 goto out1;
1017
1018         /* Compute maximum header buffer size in bytes */
1019         maxhdrsize = rpcrdma_fixed_maxsz + 3 +
1020                      r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
1021         maxhdrsize *= sizeof(__be32);
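	/* Hypothetical example: if the fixed header were 7 XDR words and
	 * the device supported 8 read segments of 4 words each, this is
	 * (7 + 3 + 8 * 4) * 4 == 168 bytes, rounded up below to a
	 * 256-byte DMA-mapped header buffer.
	 */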
1022         rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
1023                                   DMA_TO_DEVICE, flags);
1024         if (!rb)
1025                 goto out2;
1026         req->rl_rdmabuf = rb;
1027         xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
1028
1029         req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
1030         if (!req->rl_sendbuf)
1031                 goto out3;
1032
1033         req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
1034         if (!req->rl_recvbuf)
1035                 goto out4;
1036
1037         INIT_LIST_HEAD(&req->rl_free_mrs);
1038         INIT_LIST_HEAD(&req->rl_registered);
1039         spin_lock(&buffer->rb_lock);
1040         list_add(&req->rl_all, &buffer->rb_allreqs);
1041         spin_unlock(&buffer->rb_lock);
1042         return req;
1043
1044 out4:
1045         rpcrdma_regbuf_free(req->rl_sendbuf);
1046 out3:
1047         rpcrdma_regbuf_free(req->rl_rdmabuf);
1048 out2:
1049         kfree(req);
1050 out1:
1051         return NULL;
1052 }
1053
1054 /**
1055  * rpcrdma_reqs_reset - Reset all reqs owned by a transport
1056  * @r_xprt: controlling transport instance
1057  *
1058  * ASSUMPTION: the rb_allreqs list is stable for the duration,
1059  * and thus can be walked without holding rb_lock. E.g., the
1060  * caller is holding the transport send lock to exclude
1061  * device removal or disconnection.
1062  */
1063 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
1064 {
1065         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1066         struct rpcrdma_req *req;
1067
1068         list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
1069                 /* Credits are valid only for one connection */
1070                 req->rl_slot.rq_cong = 0;
1071         }
1072 }
1073
1074 static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
1075                                               bool temp)
1076 {
1077         struct rpcrdma_rep *rep;
1078
1079         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1080         if (rep == NULL)
1081                 goto out;
1082
1083         rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
1084                                                DMA_FROM_DEVICE, GFP_KERNEL);
1085         if (!rep->rr_rdmabuf)
1086                 goto out_free;
1087
1088         xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
1089                      rdmab_length(rep->rr_rdmabuf));
1090         rep->rr_cqe.done = rpcrdma_wc_receive;
1091         rep->rr_rxprt = r_xprt;
1092         rep->rr_recv_wr.next = NULL;
1093         rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1094         rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1095         rep->rr_recv_wr.num_sge = 1;
1096         rep->rr_temp = temp;
1097         list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
1098         return rep;
1099
1100 out_free:
1101         kfree(rep);
1102 out:
1103         return NULL;
1104 }
1105
1106 static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
1107 {
1108         list_del(&rep->rr_all);
1109         rpcrdma_regbuf_free(rep->rr_rdmabuf);
1110         kfree(rep);
1111 }
1112
1113 static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
1114 {
1115         struct llist_node *node;
1116
1117         /* Calls to llist_del_first are required to be serialized */
1118         node = llist_del_first(&buf->rb_free_reps);
1119         if (!node)
1120                 return NULL;
1121         return llist_entry(node, struct rpcrdma_rep, rr_node);
1122 }
1123
1124 static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
1125                             struct rpcrdma_rep *rep)
1126 {
1127         llist_add(&rep->rr_node, &buf->rb_free_reps);
1128 }
1129
1130 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1131 {
1132         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1133         struct rpcrdma_rep *rep;
1134
1135         list_for_each_entry(rep, &buf->rb_all_reps, rr_all)
1136                 rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1137 }
1138
1139 static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1140 {
1141         struct rpcrdma_rep *rep;
1142
1143         while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
1144                 rpcrdma_rep_destroy(rep);
1145 }
1146
1147 /**
1148  * rpcrdma_buffer_create - Create initial set of req/rep objects
1149  * @r_xprt: transport instance to (re)initialize
1150  *
1151  * Returns zero on success, otherwise a negative errno.
1152  */
1153 int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1154 {
1155         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1156         int i, rc;
1157
1158         buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
1159         buf->rb_bc_srv_max_requests = 0;
1160         spin_lock_init(&buf->rb_lock);
1161         INIT_LIST_HEAD(&buf->rb_mrs);
1162         INIT_LIST_HEAD(&buf->rb_all_mrs);
1163         INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1164
1165         INIT_LIST_HEAD(&buf->rb_send_bufs);
1166         INIT_LIST_HEAD(&buf->rb_allreqs);
1167         INIT_LIST_HEAD(&buf->rb_all_reps);
1168
1169         rc = -ENOMEM;
1170         for (i = 0; i < buf->rb_max_requests; i++) {
1171                 struct rpcrdma_req *req;
1172
1173                 req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
1174                                          GFP_KERNEL);
1175                 if (!req)
1176                         goto out;
1177                 list_add(&req->rl_list, &buf->rb_send_bufs);
1178         }
1179
1180         init_llist_head(&buf->rb_free_reps);
1181
1182         return 0;
1183 out:
1184         rpcrdma_buffer_destroy(buf);
1185         return rc;
1186 }
1187
1188 /**
1189  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1190  * @req: unused object to be destroyed
1191  *
1192  * Relies on caller holding the transport send lock to protect
1193  * removing req->rl_all from buf->rb_all_reqs safely.
1194  */
1195 void rpcrdma_req_destroy(struct rpcrdma_req *req)
1196 {
1197         struct rpcrdma_mr *mr;
1198
1199         list_del(&req->rl_all);
1200
1201         while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1202                 struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1203
1204                 spin_lock(&buf->rb_lock);
1205                 list_del(&mr->mr_all);
1206                 spin_unlock(&buf->rb_lock);
1207
1208                 frwr_release_mr(mr);
1209         }
1210
1211         rpcrdma_regbuf_free(req->rl_recvbuf);
1212         rpcrdma_regbuf_free(req->rl_sendbuf);
1213         rpcrdma_regbuf_free(req->rl_rdmabuf);
1214         kfree(req);
1215 }
1216
1217 /**
1218  * rpcrdma_mrs_destroy - Release all of a transport's MRs
1219  * @r_xprt: controlling transport instance
1220  *
1221  * Relies on caller holding the transport send lock to protect
1222  * removing mr->mr_list from req->rl_free_mrs safely.
1223  */
1224 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
1225 {
1226         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1227         struct rpcrdma_mr *mr;
1228
1229         cancel_work_sync(&buf->rb_refresh_worker);
1230
1231         spin_lock(&buf->rb_lock);
1232         while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1233                                               struct rpcrdma_mr,
1234                                               mr_all)) != NULL) {
1235                 list_del(&mr->mr_list);
1236                 list_del(&mr->mr_all);
1237                 spin_unlock(&buf->rb_lock);
1238
1239                 frwr_release_mr(mr);
1240
1241                 spin_lock(&buf->rb_lock);
1242         }
1243         spin_unlock(&buf->rb_lock);
1244 }
1245
1246 /**
1247  * rpcrdma_buffer_destroy - Release all hw resources
1248  * @buf: root control block for resources
1249  *
1250  * ORDERING: relies on a prior rpcrdma_xprt_drain:
1251  * - No more Send or Receive completions can occur
1252  * - All MRs, reps, and reqs are returned to their free lists
1253  */
1254 void
1255 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1256 {
1257         rpcrdma_reps_destroy(buf);
1258
1259         while (!list_empty(&buf->rb_send_bufs)) {
1260                 struct rpcrdma_req *req;
1261
1262                 req = list_first_entry(&buf->rb_send_bufs,
1263                                        struct rpcrdma_req, rl_list);
1264                 list_del(&req->rl_list);
1265                 rpcrdma_req_destroy(req);
1266         }
1267 }
1268
1269 /**
1270  * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1271  * @r_xprt: controlling transport
1272  *
1273  * Returns an initialized rpcrdma_mr or NULL if no free
1274  * rpcrdma_mr objects are available.
1275  */
1276 struct rpcrdma_mr *
1277 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1278 {
1279         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1280         struct rpcrdma_mr *mr;
1281
1282         spin_lock(&buf->rb_lock);
1283         mr = rpcrdma_mr_pop(&buf->rb_mrs);
1284         spin_unlock(&buf->rb_lock);
1285         return mr;
1286 }
1287
1288 /**
1289  * rpcrdma_mr_put - DMA unmap an MR and release it
1290  * @mr: MR to release
1291  *
1292  */
1293 void rpcrdma_mr_put(struct rpcrdma_mr *mr)
1294 {
1295         struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1296
1297         if (mr->mr_dir != DMA_NONE) {
1298                 trace_xprtrdma_mr_unmap(mr);
1299                 ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
1300                                 mr->mr_sg, mr->mr_nents, mr->mr_dir);
1301                 mr->mr_dir = DMA_NONE;
1302         }
1303
1304         rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
1305 }
1306
1307 /**
1308  * rpcrdma_buffer_get - Get a request buffer
1309  * @buffers: Buffer pool from which to obtain a buffer
1310  *
1311  * Returns a fresh rpcrdma_req, or NULL if none are available.
1312  */
1313 struct rpcrdma_req *
1314 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1315 {
1316         struct rpcrdma_req *req;
1317
1318         spin_lock(&buffers->rb_lock);
1319         req = list_first_entry_or_null(&buffers->rb_send_bufs,
1320                                        struct rpcrdma_req, rl_list);
1321         if (req)
1322                 list_del_init(&req->rl_list);
1323         spin_unlock(&buffers->rb_lock);
1324         return req;
1325 }
1326
1327 /**
1328  * rpcrdma_buffer_put - Put request/reply buffers back into pool
1329  * @buffers: buffer pool
1330  * @req: object to return
1331  *
1332  */
1333 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1334 {
1335         if (req->rl_reply)
1336                 rpcrdma_rep_put(buffers, req->rl_reply);
1337         req->rl_reply = NULL;
1338
1339         spin_lock(&buffers->rb_lock);
1340         list_add(&req->rl_list, &buffers->rb_send_bufs);
1341         spin_unlock(&buffers->rb_lock);
1342 }
1343
1344 /**
1345  * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
1346  * @rep: rep to release
1347  *
1348  * Used after error conditions.
1349  */
1350 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1351 {
1352         rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
1353 }
1354
1355 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
1356  *
1357  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1358  * receiving the payload of RDMA RECV operations. During Long Calls
1359  * or Replies they may be registered externally via frwr_map.
1360  */
1361 static struct rpcrdma_regbuf *
1362 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
1363                      gfp_t flags)
1364 {
1365         struct rpcrdma_regbuf *rb;
1366
1367         rb = kmalloc(sizeof(*rb), flags);
1368         if (!rb)
1369                 return NULL;
1370         rb->rg_data = kmalloc(size, flags);
1371         if (!rb->rg_data) {
1372                 kfree(rb);
1373                 return NULL;
1374         }
1375
1376         rb->rg_device = NULL;
1377         rb->rg_direction = direction;
1378         rb->rg_iov.length = size;
1379         return rb;
1380 }
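
/* Lifecycle sketch (helper names as used in this file): a regbuf is
 * allocated, DMA-mapped on first use, referenced by an SGE through its
 * rg_iov, and eventually unmapped and freed:
 *
 *	rb = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, GFP_KERNEL);
 *	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 *		... mapping failed; the buffer cannot be posted ...
 *	sge->addr   = rdmab_addr(rb);
 *	sge->length = rdmab_length(rb);
 *	sge->lkey   = rb->rg_iov.lkey;
 *	...
 *	rpcrdma_regbuf_free(rb);	// unmaps first if still mapped
 */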
1381
1382 /**
1383  * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1384  * @rb: regbuf to reallocate
1385  * @size: size of buffer to be allocated, in bytes
1386  * @flags: GFP flags
1387  *
1388  * Returns true if reallocation was successful. If false is
1389  * returned, @rb is left untouched.
1390  */
1391 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1392 {
1393         void *buf;
1394
1395         buf = kmalloc(size, flags);
1396         if (!buf)
1397                 return false;
1398
1399         rpcrdma_regbuf_dma_unmap(rb);
1400         kfree(rb->rg_data);
1401
1402         rb->rg_data = buf;
1403         rb->rg_iov.length = size;
1404         return true;
1405 }
1406
1407 /**
1408  * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1409  * @r_xprt: controlling transport instance
1410  * @rb: regbuf to be mapped
1411  *
1412  * Returns true if the buffer is now DMA mapped to @r_xprt's device
1413  */
1414 bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1415                               struct rpcrdma_regbuf *rb)
1416 {
1417         struct ib_device *device = r_xprt->rx_ia.ri_id->device;
1418
1419         if (rb->rg_direction == DMA_NONE)
1420                 return false;
1421
1422         rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1423                                             rdmab_length(rb), rb->rg_direction);
1424         if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1425                 trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1426                 return false;
1427         }
1428
1429         rb->rg_device = device;
1430         rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
1431         return true;
1432 }
1433
1434 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1435 {
1436         if (!rb)
1437                 return;
1438
1439         if (!rpcrdma_regbuf_is_mapped(rb))
1440                 return;
1441
1442         ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1443                             rb->rg_direction);
1444         rb->rg_device = NULL;
1445 }
1446
1447 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1448 {
1449         rpcrdma_regbuf_dma_unmap(rb);
1450         if (rb)
1451                 kfree(rb->rg_data);
1452         kfree(rb);
1453 }
1454
1455 /**
1456  * rpcrdma_ep_post - Post WRs to a transport's Send Queue
1457  * @ia: transport's device information
1458  * @ep: transport's RDMA endpoint information
1459  * @req: rpcrdma_req containing the Send WR to post
1460  *
1461  * Returns 0 if the post was successful, otherwise -ENOTCONN
1462  * is returned.
1463  */
1464 int
1465 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1466                 struct rpcrdma_ep *ep,
1467                 struct rpcrdma_req *req)
1468 {
1469         struct ib_send_wr *send_wr = &req->rl_wr;
1470         int rc;
1471
1472         if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
1473                 send_wr->send_flags |= IB_SEND_SIGNALED;
1474                 ep->rep_send_count = ep->rep_send_batch;
1475         } else {
1476                 send_wr->send_flags &= ~IB_SEND_SIGNALED;
1477                 --ep->rep_send_count;
1478         }
1479
1480         rc = frwr_send(ia, req);
1481         trace_xprtrdma_post_send(req, rc);
1482         if (rc)
1483                 return -ENOTCONN;
1484         return 0;
1485 }
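
/* Example of the signaling batch above: with rep_max_requests of, say,
 * 128, rep_send_batch is 128 >> 3 == 16 (see rpcrdma_ep_create), so
 * roughly one Send in every sixteen is posted with IB_SEND_SIGNALED.
 * A Send whose rl_kref is still shared (kref_read() > 1) is always
 * signaled.
 */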
1486
1487 /**
1488  * rpcrdma_post_recvs - Refill the Receive Queue
1489  * @r_xprt: controlling transport instance
1490  * @temp: mark Receive buffers to be deleted after use
1491  *
1492  */
1493 void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1494 {
1495         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1496         struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1497         struct ib_recv_wr *i, *wr, *bad_wr;
1498         struct rpcrdma_rep *rep;
1499         int needed, count, rc;
1500
1501         rc = 0;
1502         count = 0;
1503
1504         needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
1505         if (likely(ep->rep_receive_count > needed))
1506                 goto out;
1507         needed -= ep->rep_receive_count;
1508         if (!temp)
1509                 needed += RPCRDMA_MAX_RECV_BATCH;
1510
1511         /* fast path: all needed reps can be found on the free list */
1512         wr = NULL;
1513         while (needed) {
1514                 rep = rpcrdma_rep_get_locked(buf);
1515                 if (rep && rep->rr_temp) {
1516                         rpcrdma_rep_destroy(rep);
1517                         continue;
1518                 }
1519                 if (!rep)
1520                         rep = rpcrdma_rep_create(r_xprt, temp);
1521                 if (!rep)
1522                         break;
1523
1524                 rep->rr_recv_wr.next = wr;
1525                 wr = &rep->rr_recv_wr;
1526                 --needed;
1527         }
1528         if (!wr)
1529                 goto out;
1530
1531         for (i = wr; i; i = i->next) {
1532                 rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1533
1534                 if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
1535                         goto release_wrs;
1536
1537                 trace_xprtrdma_post_recv(rep);
1538                 ++count;
1539         }
1540
1541         rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
1542                           (const struct ib_recv_wr **)&bad_wr);
1543 out:
1544         trace_xprtrdma_post_recvs(r_xprt, count, rc);
1545         if (rc) {
1546                 for (wr = bad_wr; wr;) {
1547                         struct rpcrdma_rep *rep;
1548
1549                         rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1550                         wr = wr->next;
1551                         rpcrdma_recv_buffer_put(rep);
1552                         --count;
1553                 }
1554         }
1555         ep->rep_receive_count += count;
1556         return;
1557
1558 release_wrs:
1559         for (i = wr; i;) {
1560                 rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1561                 i = i->next;
1562                 rpcrdma_recv_buffer_put(rep);
1563         }
1564 }