]> asedeno.scripts.mit.edu Git - linux.git/blob - net/sunrpc/xprtrdma/frwr_ops.c
xprtrdma: Combine rpcrdma_mr_put and rpcrdma_mr_unmap_and_put
[linux.git] / net / sunrpc / xprtrdma / frwr_ops.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  */
6
7 /* Lightweight memory registration using Fast Registration Work
8  * Requests (FRWR).
9  *
10  * FRWR features ordered asynchronous registration and invalidation
11  * of arbitrarily-sized memory regions. This is the fastest and safest
12  * but most complex memory registration mode.
13  */
14
15 /* Normal operation
16  *
17  * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
18  * Work Request (frwr_map). When the RDMA operation is finished, this
19  * Memory Region is invalidated using a LOCAL_INV Work Request
20  * (frwr_unmap_async and frwr_unmap_sync).
21  *
22  * Typically FAST_REG Work Requests are not signaled, and neither are
23  * RDMA Send Work Requests (with the exception of signaling occasionally
24  * to prevent provider work queue overflows). This greatly reduces HCA
25  * interrupt workload.
26  */
27
28 /* Transport recovery
29  *
30  * frwr_map and frwr_unmap_* cannot run at the same time the transport
31  * connect worker is running. The connect worker holds the transport
32  * send lock, just as ->send_request does. This prevents frwr_map and
33  * the connect worker from running concurrently. When a connection is
34  * closed, the Receive completion queue is drained before the allowing
35  * the connect worker to get control. This prevents frwr_unmap and the
36  * connect worker from running concurrently.
37  *
38  * When the underlying transport disconnects, MRs that are in flight
39  * are flushed and are likely unusable. Thus all flushed MRs are
40  * destroyed. New MRs are created on demand.
41  */
42
43 #include <linux/sunrpc/rpc_rdma.h>
44 #include <linux/sunrpc/svc_rdma.h>
45
46 #include "xprt_rdma.h"
47 #include <trace/events/rpcrdma.h>
48
49 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
50 # define RPCDBG_FACILITY        RPCDBG_TRANS
51 #endif
52
53 /**
54  * frwr_is_supported - Check if device supports FRWR
55  * @device: interface adapter to check
56  *
57  * Returns true if device supports FRWR, otherwise false
58  */
59 bool frwr_is_supported(struct ib_device *device)
60 {
61         struct ib_device_attr *attrs = &device->attrs;
62
63         if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
64                 goto out_not_supported;
65         if (attrs->max_fast_reg_page_list_len == 0)
66                 goto out_not_supported;
67         return true;
68
69 out_not_supported:
70         pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
71                 device->name);
72         return false;
73 }
74
75 /**
76  * frwr_release_mr - Destroy one MR
77  * @mr: MR allocated by frwr_init_mr
78  *
79  */
80 void frwr_release_mr(struct rpcrdma_mr *mr)
81 {
82         int rc;
83
84         rc = ib_dereg_mr(mr->frwr.fr_mr);
85         if (rc)
86                 trace_xprtrdma_frwr_dereg(mr, rc);
87         kfree(mr->mr_sg);
88         kfree(mr);
89 }
90
91 /* MRs are dynamically allocated, so simply clean up and release the MR.
92  * A replacement MR will subsequently be allocated on demand.
93  */
94 static void
95 frwr_mr_recycle_worker(struct work_struct *work)
96 {
97         struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
98         struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
99
100         trace_xprtrdma_mr_recycle(mr);
101
102         if (mr->mr_dir != DMA_NONE) {
103                 trace_xprtrdma_mr_unmap(mr);
104                 ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
105                                 mr->mr_sg, mr->mr_nents, mr->mr_dir);
106                 mr->mr_dir = DMA_NONE;
107         }
108
109         spin_lock(&r_xprt->rx_buf.rb_mrlock);
110         list_del(&mr->mr_all);
111         r_xprt->rx_stats.mrs_recycled++;
112         spin_unlock(&r_xprt->rx_buf.rb_mrlock);
113
114         frwr_release_mr(mr);
115 }
116
117 /* frwr_reset - Place MRs back on the free list
118  * @req: request to reset
119  *
120  * Used after a failed marshal. For FRWR, this means the MRs
121  * don't have to be fully released and recreated.
122  *
123  * NB: This is safe only as long as none of @req's MRs are
124  * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
125  * Work Request.
126  */
127 void frwr_reset(struct rpcrdma_req *req)
128 {
129         struct rpcrdma_mr *mr;
130
131         while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
132                 rpcrdma_mr_put(mr);
133 }
134
135 /**
136  * frwr_init_mr - Initialize one MR
137  * @ia: interface adapter
138  * @mr: generic MR to prepare for FRWR
139  *
140  * Returns zero if successful. Otherwise a negative errno
141  * is returned.
142  */
143 int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
144 {
145         unsigned int depth = ia->ri_max_frwr_depth;
146         struct scatterlist *sg;
147         struct ib_mr *frmr;
148         int rc;
149
150         frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
151         if (IS_ERR(frmr))
152                 goto out_mr_err;
153
154         sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL);
155         if (!sg)
156                 goto out_list_err;
157
158         mr->frwr.fr_mr = frmr;
159         mr->mr_dir = DMA_NONE;
160         INIT_LIST_HEAD(&mr->mr_list);
161         INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
162         init_completion(&mr->frwr.fr_linv_done);
163
164         sg_init_table(sg, depth);
165         mr->mr_sg = sg;
166         return 0;
167
168 out_mr_err:
169         rc = PTR_ERR(frmr);
170         trace_xprtrdma_frwr_alloc(mr, rc);
171         return rc;
172
173 out_list_err:
174         dprintk("RPC:       %s: sg allocation failure\n",
175                 __func__);
176         ib_dereg_mr(frmr);
177         return -ENOMEM;
178 }
179
180 /**
181  * frwr_open - Prepare an endpoint for use with FRWR
182  * @ia: interface adapter this endpoint will use
183  * @ep: endpoint to prepare
184  *
185  * On success, sets:
186  *      ep->rep_attr.cap.max_send_wr
187  *      ep->rep_attr.cap.max_recv_wr
188  *      ep->rep_max_requests
189  *      ia->ri_max_segs
190  *
191  * And these FRWR-related fields:
192  *      ia->ri_max_frwr_depth
193  *      ia->ri_mrtype
194  *
195  * On failure, a negative errno is returned.
196  */
197 int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
198 {
199         struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
200         int max_qp_wr, depth, delta;
201
202         ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
203         if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
204                 ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
205
206         /* Quirk: Some devices advertise a large max_fast_reg_page_list_len
207          * capability, but perform optimally when the MRs are not larger
208          * than a page.
209          */
210         if (attrs->max_sge_rd > 1)
211                 ia->ri_max_frwr_depth = attrs->max_sge_rd;
212         else
213                 ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
214         if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
215                 ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
216         dprintk("RPC:       %s: max FR page list depth = %u\n",
217                 __func__, ia->ri_max_frwr_depth);
218
219         /* Add room for frwr register and invalidate WRs.
220          * 1. FRWR reg WR for head
221          * 2. FRWR invalidate WR for head
222          * 3. N FRWR reg WRs for pagelist
223          * 4. N FRWR invalidate WRs for pagelist
224          * 5. FRWR reg WR for tail
225          * 6. FRWR invalidate WR for tail
226          * 7. The RDMA_SEND WR
227          */
228         depth = 7;
229
230         /* Calculate N if the device max FRWR depth is smaller than
231          * RPCRDMA_MAX_DATA_SEGS.
232          */
233         if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
234                 delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
235                 do {
236                         depth += 2; /* FRWR reg + invalidate */
237                         delta -= ia->ri_max_frwr_depth;
238                 } while (delta > 0);
239         }
240
241         max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
242         max_qp_wr -= RPCRDMA_BACKWARD_WRS;
243         max_qp_wr -= 1;
244         if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
245                 return -ENOMEM;
246         if (ep->rep_max_requests > max_qp_wr)
247                 ep->rep_max_requests = max_qp_wr;
248         ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
249         if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
250                 ep->rep_max_requests = max_qp_wr / depth;
251                 if (!ep->rep_max_requests)
252                         return -EINVAL;
253                 ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
254         }
255         ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
256         ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
257         ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
258         ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
259         ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
260
261         ia->ri_max_segs =
262                 DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
263         /* Reply chunks require segments for head and tail buffers */
264         ia->ri_max_segs += 2;
265         if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
266                 ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
267         return 0;
268 }
269
270 /**
271  * frwr_maxpages - Compute size of largest payload
272  * @r_xprt: transport
273  *
274  * Returns maximum size of an RPC message, in pages.
275  *
276  * FRWR mode conveys a list of pages per chunk segment. The
277  * maximum length of that list is the FRWR page list depth.
278  */
279 size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
280 {
281         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
282
283         return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
284                      (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
285 }
286
287 /**
288  * frwr_map - Register a memory region
289  * @r_xprt: controlling transport
290  * @seg: memory region co-ordinates
291  * @nsegs: number of segments remaining
292  * @writing: true when RDMA Write will be used
293  * @xid: XID of RPC using the registered memory
294  * @out: initialized MR
295  *
296  * Prepare a REG_MR Work Request to register a memory region
297  * for remote access via RDMA READ or RDMA WRITE.
298  *
299  * Returns the next segment or a negative errno pointer.
300  * On success, the prepared MR is planted in @out.
301  */
302 struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
303                                 struct rpcrdma_mr_seg *seg,
304                                 int nsegs, bool writing, __be32 xid,
305                                 struct rpcrdma_mr **out)
306 {
307         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
308         bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
309         struct rpcrdma_mr *mr;
310         struct ib_mr *ibmr;
311         struct ib_reg_wr *reg_wr;
312         int i, n;
313         u8 key;
314
315         mr = rpcrdma_mr_get(r_xprt);
316         if (!mr)
317                 goto out_getmr_err;
318
319         if (nsegs > ia->ri_max_frwr_depth)
320                 nsegs = ia->ri_max_frwr_depth;
321         for (i = 0; i < nsegs;) {
322                 if (seg->mr_page)
323                         sg_set_page(&mr->mr_sg[i],
324                                     seg->mr_page,
325                                     seg->mr_len,
326                                     offset_in_page(seg->mr_offset));
327                 else
328                         sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
329                                    seg->mr_len);
330
331                 ++seg;
332                 ++i;
333                 if (holes_ok)
334                         continue;
335                 if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
336                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
337                         break;
338         }
339         mr->mr_dir = rpcrdma_data_dir(writing);
340
341         mr->mr_nents =
342                 ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
343         if (!mr->mr_nents)
344                 goto out_dmamap_err;
345
346         ibmr = mr->frwr.fr_mr;
347         n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
348         if (unlikely(n != mr->mr_nents))
349                 goto out_mapmr_err;
350
351         ibmr->iova &= 0x00000000ffffffff;
352         ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
353         key = (u8)(ibmr->rkey & 0x000000FF);
354         ib_update_fast_reg_key(ibmr, ++key);
355
356         reg_wr = &mr->frwr.fr_regwr;
357         reg_wr->mr = ibmr;
358         reg_wr->key = ibmr->rkey;
359         reg_wr->access = writing ?
360                          IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
361                          IB_ACCESS_REMOTE_READ;
362
363         mr->mr_handle = ibmr->rkey;
364         mr->mr_length = ibmr->length;
365         mr->mr_offset = ibmr->iova;
366         trace_xprtrdma_mr_map(mr);
367
368         *out = mr;
369         return seg;
370
371 out_getmr_err:
372         xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
373         return ERR_PTR(-EAGAIN);
374
375 out_dmamap_err:
376         mr->mr_dir = DMA_NONE;
377         trace_xprtrdma_frwr_sgerr(mr, i);
378         rpcrdma_mr_put(mr);
379         return ERR_PTR(-EIO);
380
381 out_mapmr_err:
382         trace_xprtrdma_frwr_maperr(mr, n);
383         rpcrdma_mr_recycle(mr);
384         return ERR_PTR(-EIO);
385 }
386
387 /**
388  * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
389  * @cq: completion queue (ignored)
390  * @wc: completed WR
391  *
392  */
393 static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
394 {
395         struct ib_cqe *cqe = wc->wr_cqe;
396         struct rpcrdma_frwr *frwr =
397                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
398
399         /* WARNING: Only wr_cqe and status are reliable at this point */
400         trace_xprtrdma_wc_fastreg(wc, frwr);
401         /* The MR will get recycled when the associated req is retransmitted */
402 }
403
404 /**
405  * frwr_send - post Send WR containing the RPC Call message
406  * @ia: interface adapter
407  * @req: Prepared RPC Call
408  *
409  * For FRWR, chain any FastReg WRs to the Send WR. Only a
410  * single ib_post_send call is needed to register memory
411  * and then post the Send WR.
412  *
413  * Returns the result of ib_post_send.
414  */
415 int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
416 {
417         struct ib_send_wr *post_wr;
418         struct rpcrdma_mr *mr;
419
420         post_wr = &req->rl_sendctx->sc_wr;
421         list_for_each_entry(mr, &req->rl_registered, mr_list) {
422                 struct rpcrdma_frwr *frwr;
423
424                 frwr = &mr->frwr;
425
426                 frwr->fr_cqe.done = frwr_wc_fastreg;
427                 frwr->fr_regwr.wr.next = post_wr;
428                 frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
429                 frwr->fr_regwr.wr.num_sge = 0;
430                 frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
431                 frwr->fr_regwr.wr.send_flags = 0;
432
433                 post_wr = &frwr->fr_regwr.wr;
434         }
435
436         /* If ib_post_send fails, the next ->send_request for
437          * @req will queue these MRs for recovery.
438          */
439         return ib_post_send(ia->ri_id->qp, post_wr, NULL);
440 }
441
442 /**
443  * frwr_reminv - handle a remotely invalidated mr on the @mrs list
444  * @rep: Received reply
445  * @mrs: list of MRs to check
446  *
447  */
448 void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
449 {
450         struct rpcrdma_mr *mr;
451
452         list_for_each_entry(mr, mrs, mr_list)
453                 if (mr->mr_handle == rep->rr_inv_rkey) {
454                         list_del_init(&mr->mr_list);
455                         trace_xprtrdma_mr_remoteinv(mr);
456                         rpcrdma_mr_put(mr);
457                         break;  /* only one invalidated MR per RPC */
458                 }
459 }
460
461 static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
462 {
463         if (wc->status != IB_WC_SUCCESS)
464                 rpcrdma_mr_recycle(mr);
465         else
466                 rpcrdma_mr_put(mr);
467 }
468
469 /**
470  * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
471  * @cq: completion queue (ignored)
472  * @wc: completed WR
473  *
474  */
475 static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
476 {
477         struct ib_cqe *cqe = wc->wr_cqe;
478         struct rpcrdma_frwr *frwr =
479                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
480         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
481
482         /* WARNING: Only wr_cqe and status are reliable at this point */
483         trace_xprtrdma_wc_li(wc, frwr);
484         __frwr_release_mr(wc, mr);
485 }
486
487 /**
488  * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
489  * @cq: completion queue (ignored)
490  * @wc: completed WR
491  *
492  * Awaken anyone waiting for an MR to finish being fenced.
493  */
494 static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
495 {
496         struct ib_cqe *cqe = wc->wr_cqe;
497         struct rpcrdma_frwr *frwr =
498                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
499         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
500
501         /* WARNING: Only wr_cqe and status are reliable at this point */
502         trace_xprtrdma_wc_li_wake(wc, frwr);
503         complete(&frwr->fr_linv_done);
504         __frwr_release_mr(wc, mr);
505 }
506
507 /**
508  * frwr_unmap_sync - invalidate memory regions that were registered for @req
509  * @r_xprt: controlling transport instance
510  * @req: rpcrdma_req with a non-empty list of MRs to process
511  *
512  * Sleeps until it is safe for the host CPU to access the previously mapped
513  * memory regions. This guarantees that registered MRs are properly fenced
514  * from the server before the RPC consumer accesses the data in them. It
515  * also ensures proper Send flow control: waking the next RPC waits until
516  * this RPC has relinquished all its Send Queue entries.
517  */
518 void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
519 {
520         struct ib_send_wr *first, **prev, *last;
521         const struct ib_send_wr *bad_wr;
522         struct rpcrdma_frwr *frwr;
523         struct rpcrdma_mr *mr;
524         int rc;
525
526         /* ORDER: Invalidate all of the MRs first
527          *
528          * Chain the LOCAL_INV Work Requests and post them with
529          * a single ib_post_send() call.
530          */
531         frwr = NULL;
532         prev = &first;
533         while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
534
535                 trace_xprtrdma_mr_localinv(mr);
536                 r_xprt->rx_stats.local_inv_needed++;
537
538                 frwr = &mr->frwr;
539                 frwr->fr_cqe.done = frwr_wc_localinv;
540                 last = &frwr->fr_invwr;
541                 last->next = NULL;
542                 last->wr_cqe = &frwr->fr_cqe;
543                 last->sg_list = NULL;
544                 last->num_sge = 0;
545                 last->opcode = IB_WR_LOCAL_INV;
546                 last->send_flags = IB_SEND_SIGNALED;
547                 last->ex.invalidate_rkey = mr->mr_handle;
548
549                 *prev = last;
550                 prev = &last->next;
551         }
552
553         /* Strong send queue ordering guarantees that when the
554          * last WR in the chain completes, all WRs in the chain
555          * are complete.
556          */
557         frwr->fr_cqe.done = frwr_wc_localinv_wake;
558         reinit_completion(&frwr->fr_linv_done);
559
560         /* Transport disconnect drains the receive CQ before it
561          * replaces the QP. The RPC reply handler won't call us
562          * unless ri_id->qp is a valid pointer.
563          */
564         bad_wr = NULL;
565         rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
566         trace_xprtrdma_post_send(req, rc);
567
568         /* The final LOCAL_INV WR in the chain is supposed to
569          * do the wake. If it was never posted, the wake will
570          * not happen, so don't wait in that case.
571          */
572         if (bad_wr != first)
573                 wait_for_completion(&frwr->fr_linv_done);
574         if (!rc)
575                 return;
576
577         /* Recycle MRs in the LOCAL_INV chain that did not get posted.
578          */
579         while (bad_wr) {
580                 frwr = container_of(bad_wr, struct rpcrdma_frwr,
581                                     fr_invwr);
582                 mr = container_of(frwr, struct rpcrdma_mr, frwr);
583                 bad_wr = bad_wr->next;
584
585                 list_del_init(&mr->mr_list);
586                 rpcrdma_mr_recycle(mr);
587         }
588 }
589
590 /**
591  * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
592  * @cq: completion queue (ignored)
593  * @wc: completed WR
594  *
595  */
596 static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
597 {
598         struct ib_cqe *cqe = wc->wr_cqe;
599         struct rpcrdma_frwr *frwr =
600                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
601         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
602
603         /* WARNING: Only wr_cqe and status are reliable at this point */
604         trace_xprtrdma_wc_li_done(wc, frwr);
605         rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
606         __frwr_release_mr(wc, mr);
607 }
608
609 /**
610  * frwr_unmap_async - invalidate memory regions that were registered for @req
611  * @r_xprt: controlling transport instance
612  * @req: rpcrdma_req with a non-empty list of MRs to process
613  *
614  * This guarantees that registered MRs are properly fenced from the
615  * server before the RPC consumer accesses the data in them. It also
616  * ensures proper Send flow control: waking the next RPC waits until
617  * this RPC has relinquished all its Send Queue entries.
618  */
619 void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
620 {
621         struct ib_send_wr *first, *last, **prev;
622         const struct ib_send_wr *bad_wr;
623         struct rpcrdma_frwr *frwr;
624         struct rpcrdma_mr *mr;
625         int rc;
626
627         /* Chain the LOCAL_INV Work Requests and post them with
628          * a single ib_post_send() call.
629          */
630         frwr = NULL;
631         prev = &first;
632         while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
633
634                 trace_xprtrdma_mr_localinv(mr);
635                 r_xprt->rx_stats.local_inv_needed++;
636
637                 frwr = &mr->frwr;
638                 frwr->fr_cqe.done = frwr_wc_localinv;
639                 frwr->fr_req = req;
640                 last = &frwr->fr_invwr;
641                 last->next = NULL;
642                 last->wr_cqe = &frwr->fr_cqe;
643                 last->sg_list = NULL;
644                 last->num_sge = 0;
645                 last->opcode = IB_WR_LOCAL_INV;
646                 last->send_flags = IB_SEND_SIGNALED;
647                 last->ex.invalidate_rkey = mr->mr_handle;
648
649                 *prev = last;
650                 prev = &last->next;
651         }
652
653         /* Strong send queue ordering guarantees that when the
654          * last WR in the chain completes, all WRs in the chain
655          * are complete. The last completion will wake up the
656          * RPC waiter.
657          */
658         frwr->fr_cqe.done = frwr_wc_localinv_done;
659
660         /* Transport disconnect drains the receive CQ before it
661          * replaces the QP. The RPC reply handler won't call us
662          * unless ri_id->qp is a valid pointer.
663          */
664         bad_wr = NULL;
665         rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
666         trace_xprtrdma_post_send(req, rc);
667         if (!rc)
668                 return;
669
670         /* Recycle MRs in the LOCAL_INV chain that did not get posted.
671          */
672         while (bad_wr) {
673                 frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
674                 mr = container_of(frwr, struct rpcrdma_mr, frwr);
675                 bad_wr = bad_wr->next;
676
677                 rpcrdma_mr_recycle(mr);
678         }
679
680         /* The final LOCAL_INV WR in the chain is supposed to
681          * do the wake. If it was never posted, the wake will
682          * not happen, so wake here in that case.
683          */
684         rpcrdma_complete_rqst(req->rl_reply);
685 }