]> asedeno.scripts.mit.edu Git - linux.git/blob - net/sunrpc/xprtrdma/frwr_ops.c
xprtrdma: Fix calculation of ri_max_segs again
[linux.git] / net / sunrpc / xprtrdma / frwr_ops.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  */
6
7 /* Lightweight memory registration using Fast Registration Work
8  * Requests (FRWR).
9  *
10  * FRWR features ordered asynchronous registration and invalidation
11  * of arbitrarily-sized memory regions. This is the fastest and safest
12  * but most complex memory registration mode.
13  */
14
15 /* Normal operation
16  *
17  * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
18  * Work Request (frwr_map). When the RDMA operation is finished, this
19  * Memory Region is invalidated using a LOCAL_INV Work Request
20  * (frwr_unmap_async and frwr_unmap_sync).
21  *
22  * Typically FAST_REG Work Requests are not signaled, and neither are
23  * RDMA Send Work Requests (with the exception of signaling occasionally
24  * to prevent provider work queue overflows). This greatly reduces HCA
25  * interrupt workload.
26  */
27
28 /* Transport recovery
29  *
30  * frwr_map and frwr_unmap_* cannot run at the same time the transport
31  * connect worker is running. The connect worker holds the transport
32  * send lock, just as ->send_request does. This prevents frwr_map and
33  * the connect worker from running concurrently. When a connection is
34  * closed, the Receive completion queue is drained before the allowing
35  * the connect worker to get control. This prevents frwr_unmap and the
36  * connect worker from running concurrently.
37  *
38  * When the underlying transport disconnects, MRs that are in flight
39  * are flushed and are likely unusable. Thus all flushed MRs are
40  * destroyed. New MRs are created on demand.
41  */
42
43 #include <linux/sunrpc/rpc_rdma.h>
44 #include <linux/sunrpc/svc_rdma.h>
45
46 #include "xprt_rdma.h"
47 #include <trace/events/rpcrdma.h>
48
49 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
50 # define RPCDBG_FACILITY        RPCDBG_TRANS
51 #endif
52
53 /**
54  * frwr_is_supported - Check if device supports FRWR
55  * @device: interface adapter to check
56  *
57  * Returns true if device supports FRWR, otherwise false
58  */
59 bool frwr_is_supported(struct ib_device *device)
60 {
61         struct ib_device_attr *attrs = &device->attrs;
62
63         if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
64                 goto out_not_supported;
65         if (attrs->max_fast_reg_page_list_len == 0)
66                 goto out_not_supported;
67         return true;
68
69 out_not_supported:
70         pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
71                 device->name);
72         return false;
73 }
74
75 /**
76  * frwr_release_mr - Destroy one MR
77  * @mr: MR allocated by frwr_init_mr
78  *
79  */
80 void frwr_release_mr(struct rpcrdma_mr *mr)
81 {
82         int rc;
83
84         rc = ib_dereg_mr(mr->frwr.fr_mr);
85         if (rc)
86                 trace_xprtrdma_frwr_dereg(mr, rc);
87         kfree(mr->mr_sg);
88         kfree(mr);
89 }
90
91 /* MRs are dynamically allocated, so simply clean up and release the MR.
92  * A replacement MR will subsequently be allocated on demand.
93  */
94 static void
95 frwr_mr_recycle_worker(struct work_struct *work)
96 {
97         struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
98         struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
99
100         trace_xprtrdma_mr_recycle(mr);
101
102         if (mr->mr_dir != DMA_NONE) {
103                 trace_xprtrdma_mr_unmap(mr);
104                 ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
105                                 mr->mr_sg, mr->mr_nents, mr->mr_dir);
106                 mr->mr_dir = DMA_NONE;
107         }
108
109         spin_lock(&r_xprt->rx_buf.rb_mrlock);
110         list_del(&mr->mr_all);
111         r_xprt->rx_stats.mrs_recycled++;
112         spin_unlock(&r_xprt->rx_buf.rb_mrlock);
113
114         frwr_release_mr(mr);
115 }
116
117 /* frwr_reset - Place MRs back on the free list
118  * @req: request to reset
119  *
120  * Used after a failed marshal. For FRWR, this means the MRs
121  * don't have to be fully released and recreated.
122  *
123  * NB: This is safe only as long as none of @req's MRs are
124  * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
125  * Work Request.
126  */
127 void frwr_reset(struct rpcrdma_req *req)
128 {
129         while (!list_empty(&req->rl_registered)) {
130                 struct rpcrdma_mr *mr;
131
132                 mr = rpcrdma_mr_pop(&req->rl_registered);
133                 rpcrdma_mr_unmap_and_put(mr);
134         }
135 }
136
137 /**
138  * frwr_init_mr - Initialize one MR
139  * @ia: interface adapter
140  * @mr: generic MR to prepare for FRWR
141  *
142  * Returns zero if successful. Otherwise a negative errno
143  * is returned.
144  */
145 int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
146 {
147         unsigned int depth = ia->ri_max_frwr_depth;
148         struct scatterlist *sg;
149         struct ib_mr *frmr;
150         int rc;
151
152         frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
153         if (IS_ERR(frmr))
154                 goto out_mr_err;
155
156         sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL);
157         if (!sg)
158                 goto out_list_err;
159
160         mr->frwr.fr_mr = frmr;
161         mr->mr_dir = DMA_NONE;
162         INIT_LIST_HEAD(&mr->mr_list);
163         INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
164         init_completion(&mr->frwr.fr_linv_done);
165
166         sg_init_table(sg, depth);
167         mr->mr_sg = sg;
168         return 0;
169
170 out_mr_err:
171         rc = PTR_ERR(frmr);
172         trace_xprtrdma_frwr_alloc(mr, rc);
173         return rc;
174
175 out_list_err:
176         dprintk("RPC:       %s: sg allocation failure\n",
177                 __func__);
178         ib_dereg_mr(frmr);
179         return -ENOMEM;
180 }
181
182 /**
183  * frwr_open - Prepare an endpoint for use with FRWR
184  * @ia: interface adapter this endpoint will use
185  * @ep: endpoint to prepare
186  *
187  * On success, sets:
188  *      ep->rep_attr.cap.max_send_wr
189  *      ep->rep_attr.cap.max_recv_wr
190  *      ep->rep_max_requests
191  *      ia->ri_max_segs
192  *
193  * And these FRWR-related fields:
194  *      ia->ri_max_frwr_depth
195  *      ia->ri_mrtype
196  *
197  * On failure, a negative errno is returned.
198  */
199 int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
200 {
201         struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
202         int max_qp_wr, depth, delta;
203
204         ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
205         if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
206                 ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
207
208         /* Quirk: Some devices advertise a large max_fast_reg_page_list_len
209          * capability, but perform optimally when the MRs are not larger
210          * than a page.
211          */
212         if (attrs->max_sge_rd > 1)
213                 ia->ri_max_frwr_depth = attrs->max_sge_rd;
214         else
215                 ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
216         if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
217                 ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
218         dprintk("RPC:       %s: max FR page list depth = %u\n",
219                 __func__, ia->ri_max_frwr_depth);
220
221         /* Add room for frwr register and invalidate WRs.
222          * 1. FRWR reg WR for head
223          * 2. FRWR invalidate WR for head
224          * 3. N FRWR reg WRs for pagelist
225          * 4. N FRWR invalidate WRs for pagelist
226          * 5. FRWR reg WR for tail
227          * 6. FRWR invalidate WR for tail
228          * 7. The RDMA_SEND WR
229          */
230         depth = 7;
231
232         /* Calculate N if the device max FRWR depth is smaller than
233          * RPCRDMA_MAX_DATA_SEGS.
234          */
235         if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
236                 delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
237                 do {
238                         depth += 2; /* FRWR reg + invalidate */
239                         delta -= ia->ri_max_frwr_depth;
240                 } while (delta > 0);
241         }
242
243         max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
244         max_qp_wr -= RPCRDMA_BACKWARD_WRS;
245         max_qp_wr -= 1;
246         if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
247                 return -ENOMEM;
248         if (ep->rep_max_requests > max_qp_wr)
249                 ep->rep_max_requests = max_qp_wr;
250         ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
251         if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
252                 ep->rep_max_requests = max_qp_wr / depth;
253                 if (!ep->rep_max_requests)
254                         return -EINVAL;
255                 ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
256         }
257         ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
258         ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
259         ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
260         ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
261         ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
262
263         ia->ri_max_segs =
264                 DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
265         /* Reply chunks require segments for head and tail buffers */
266         ia->ri_max_segs += 2;
267         if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
268                 ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
269         return 0;
270 }
271
272 /**
273  * frwr_maxpages - Compute size of largest payload
274  * @r_xprt: transport
275  *
276  * Returns maximum size of an RPC message, in pages.
277  *
278  * FRWR mode conveys a list of pages per chunk segment. The
279  * maximum length of that list is the FRWR page list depth.
280  */
281 size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
282 {
283         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
284
285         return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
286                      (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
287 }
288
289 /**
290  * frwr_map - Register a memory region
291  * @r_xprt: controlling transport
292  * @seg: memory region co-ordinates
293  * @nsegs: number of segments remaining
294  * @writing: true when RDMA Write will be used
295  * @xid: XID of RPC using the registered memory
296  * @out: initialized MR
297  *
298  * Prepare a REG_MR Work Request to register a memory region
299  * for remote access via RDMA READ or RDMA WRITE.
300  *
301  * Returns the next segment or a negative errno pointer.
302  * On success, the prepared MR is planted in @out.
303  */
304 struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
305                                 struct rpcrdma_mr_seg *seg,
306                                 int nsegs, bool writing, __be32 xid,
307                                 struct rpcrdma_mr **out)
308 {
309         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
310         bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
311         struct rpcrdma_mr *mr;
312         struct ib_mr *ibmr;
313         struct ib_reg_wr *reg_wr;
314         int i, n;
315         u8 key;
316
317         mr = rpcrdma_mr_get(r_xprt);
318         if (!mr)
319                 goto out_getmr_err;
320
321         if (nsegs > ia->ri_max_frwr_depth)
322                 nsegs = ia->ri_max_frwr_depth;
323         for (i = 0; i < nsegs;) {
324                 if (seg->mr_page)
325                         sg_set_page(&mr->mr_sg[i],
326                                     seg->mr_page,
327                                     seg->mr_len,
328                                     offset_in_page(seg->mr_offset));
329                 else
330                         sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
331                                    seg->mr_len);
332
333                 ++seg;
334                 ++i;
335                 if (holes_ok)
336                         continue;
337                 if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
338                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
339                         break;
340         }
341         mr->mr_dir = rpcrdma_data_dir(writing);
342
343         mr->mr_nents =
344                 ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
345         if (!mr->mr_nents)
346                 goto out_dmamap_err;
347
348         ibmr = mr->frwr.fr_mr;
349         n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
350         if (unlikely(n != mr->mr_nents))
351                 goto out_mapmr_err;
352
353         ibmr->iova &= 0x00000000ffffffff;
354         ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
355         key = (u8)(ibmr->rkey & 0x000000FF);
356         ib_update_fast_reg_key(ibmr, ++key);
357
358         reg_wr = &mr->frwr.fr_regwr;
359         reg_wr->mr = ibmr;
360         reg_wr->key = ibmr->rkey;
361         reg_wr->access = writing ?
362                          IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
363                          IB_ACCESS_REMOTE_READ;
364
365         mr->mr_handle = ibmr->rkey;
366         mr->mr_length = ibmr->length;
367         mr->mr_offset = ibmr->iova;
368         trace_xprtrdma_mr_map(mr);
369
370         *out = mr;
371         return seg;
372
373 out_getmr_err:
374         xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
375         return ERR_PTR(-EAGAIN);
376
377 out_dmamap_err:
378         mr->mr_dir = DMA_NONE;
379         trace_xprtrdma_frwr_sgerr(mr, i);
380         rpcrdma_mr_put(mr);
381         return ERR_PTR(-EIO);
382
383 out_mapmr_err:
384         trace_xprtrdma_frwr_maperr(mr, n);
385         rpcrdma_mr_recycle(mr);
386         return ERR_PTR(-EIO);
387 }
388
389 /**
390  * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
391  * @cq: completion queue (ignored)
392  * @wc: completed WR
393  *
394  */
395 static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
396 {
397         struct ib_cqe *cqe = wc->wr_cqe;
398         struct rpcrdma_frwr *frwr =
399                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
400
401         /* WARNING: Only wr_cqe and status are reliable at this point */
402         trace_xprtrdma_wc_fastreg(wc, frwr);
403         /* The MR will get recycled when the associated req is retransmitted */
404 }
405
406 /**
407  * frwr_send - post Send WR containing the RPC Call message
408  * @ia: interface adapter
409  * @req: Prepared RPC Call
410  *
411  * For FRWR, chain any FastReg WRs to the Send WR. Only a
412  * single ib_post_send call is needed to register memory
413  * and then post the Send WR.
414  *
415  * Returns the result of ib_post_send.
416  */
417 int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
418 {
419         struct ib_send_wr *post_wr;
420         struct rpcrdma_mr *mr;
421
422         post_wr = &req->rl_sendctx->sc_wr;
423         list_for_each_entry(mr, &req->rl_registered, mr_list) {
424                 struct rpcrdma_frwr *frwr;
425
426                 frwr = &mr->frwr;
427
428                 frwr->fr_cqe.done = frwr_wc_fastreg;
429                 frwr->fr_regwr.wr.next = post_wr;
430                 frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
431                 frwr->fr_regwr.wr.num_sge = 0;
432                 frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
433                 frwr->fr_regwr.wr.send_flags = 0;
434
435                 post_wr = &frwr->fr_regwr.wr;
436         }
437
438         /* If ib_post_send fails, the next ->send_request for
439          * @req will queue these MRs for recovery.
440          */
441         return ib_post_send(ia->ri_id->qp, post_wr, NULL);
442 }
443
444 /**
445  * frwr_reminv - handle a remotely invalidated mr on the @mrs list
446  * @rep: Received reply
447  * @mrs: list of MRs to check
448  *
449  */
450 void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
451 {
452         struct rpcrdma_mr *mr;
453
454         list_for_each_entry(mr, mrs, mr_list)
455                 if (mr->mr_handle == rep->rr_inv_rkey) {
456                         list_del_init(&mr->mr_list);
457                         trace_xprtrdma_mr_remoteinv(mr);
458                         rpcrdma_mr_unmap_and_put(mr);
459                         break;  /* only one invalidated MR per RPC */
460                 }
461 }
462
463 static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
464 {
465         if (wc->status != IB_WC_SUCCESS)
466                 rpcrdma_mr_recycle(mr);
467         else
468                 rpcrdma_mr_unmap_and_put(mr);
469 }
470
471 /**
472  * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
473  * @cq: completion queue (ignored)
474  * @wc: completed WR
475  *
476  */
477 static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
478 {
479         struct ib_cqe *cqe = wc->wr_cqe;
480         struct rpcrdma_frwr *frwr =
481                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
482         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
483
484         /* WARNING: Only wr_cqe and status are reliable at this point */
485         trace_xprtrdma_wc_li(wc, frwr);
486         __frwr_release_mr(wc, mr);
487 }
488
489 /**
490  * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
491  * @cq: completion queue (ignored)
492  * @wc: completed WR
493  *
494  * Awaken anyone waiting for an MR to finish being fenced.
495  */
496 static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
497 {
498         struct ib_cqe *cqe = wc->wr_cqe;
499         struct rpcrdma_frwr *frwr =
500                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
501         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
502
503         /* WARNING: Only wr_cqe and status are reliable at this point */
504         trace_xprtrdma_wc_li_wake(wc, frwr);
505         complete(&frwr->fr_linv_done);
506         __frwr_release_mr(wc, mr);
507 }
508
509 /**
510  * frwr_unmap_sync - invalidate memory regions that were registered for @req
511  * @r_xprt: controlling transport instance
512  * @req: rpcrdma_req with a non-empty list of MRs to process
513  *
514  * Sleeps until it is safe for the host CPU to access the previously mapped
515  * memory regions. This guarantees that registered MRs are properly fenced
516  * from the server before the RPC consumer accesses the data in them. It
517  * also ensures proper Send flow control: waking the next RPC waits until
518  * this RPC has relinquished all its Send Queue entries.
519  */
520 void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
521 {
522         struct ib_send_wr *first, **prev, *last;
523         const struct ib_send_wr *bad_wr;
524         struct rpcrdma_frwr *frwr;
525         struct rpcrdma_mr *mr;
526         int rc;
527
528         /* ORDER: Invalidate all of the MRs first
529          *
530          * Chain the LOCAL_INV Work Requests and post them with
531          * a single ib_post_send() call.
532          */
533         frwr = NULL;
534         prev = &first;
535         while (!list_empty(&req->rl_registered)) {
536                 mr = rpcrdma_mr_pop(&req->rl_registered);
537
538                 trace_xprtrdma_mr_localinv(mr);
539                 r_xprt->rx_stats.local_inv_needed++;
540
541                 frwr = &mr->frwr;
542                 frwr->fr_cqe.done = frwr_wc_localinv;
543                 last = &frwr->fr_invwr;
544                 last->next = NULL;
545                 last->wr_cqe = &frwr->fr_cqe;
546                 last->sg_list = NULL;
547                 last->num_sge = 0;
548                 last->opcode = IB_WR_LOCAL_INV;
549                 last->send_flags = IB_SEND_SIGNALED;
550                 last->ex.invalidate_rkey = mr->mr_handle;
551
552                 *prev = last;
553                 prev = &last->next;
554         }
555
556         /* Strong send queue ordering guarantees that when the
557          * last WR in the chain completes, all WRs in the chain
558          * are complete.
559          */
560         frwr->fr_cqe.done = frwr_wc_localinv_wake;
561         reinit_completion(&frwr->fr_linv_done);
562
563         /* Transport disconnect drains the receive CQ before it
564          * replaces the QP. The RPC reply handler won't call us
565          * unless ri_id->qp is a valid pointer.
566          */
567         bad_wr = NULL;
568         rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
569         trace_xprtrdma_post_send(req, rc);
570
571         /* The final LOCAL_INV WR in the chain is supposed to
572          * do the wake. If it was never posted, the wake will
573          * not happen, so don't wait in that case.
574          */
575         if (bad_wr != first)
576                 wait_for_completion(&frwr->fr_linv_done);
577         if (!rc)
578                 return;
579
580         /* Recycle MRs in the LOCAL_INV chain that did not get posted.
581          */
582         while (bad_wr) {
583                 frwr = container_of(bad_wr, struct rpcrdma_frwr,
584                                     fr_invwr);
585                 mr = container_of(frwr, struct rpcrdma_mr, frwr);
586                 bad_wr = bad_wr->next;
587
588                 list_del_init(&mr->mr_list);
589                 rpcrdma_mr_recycle(mr);
590         }
591 }
592
593 /**
594  * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
595  * @cq: completion queue (ignored)
596  * @wc: completed WR
597  *
598  */
599 static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
600 {
601         struct ib_cqe *cqe = wc->wr_cqe;
602         struct rpcrdma_frwr *frwr =
603                 container_of(cqe, struct rpcrdma_frwr, fr_cqe);
604         struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
605
606         /* WARNING: Only wr_cqe and status are reliable at this point */
607         trace_xprtrdma_wc_li_done(wc, frwr);
608         rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
609         __frwr_release_mr(wc, mr);
610 }
611
612 /**
613  * frwr_unmap_async - invalidate memory regions that were registered for @req
614  * @r_xprt: controlling transport instance
615  * @req: rpcrdma_req with a non-empty list of MRs to process
616  *
617  * This guarantees that registered MRs are properly fenced from the
618  * server before the RPC consumer accesses the data in them. It also
619  * ensures proper Send flow control: waking the next RPC waits until
620  * this RPC has relinquished all its Send Queue entries.
621  */
622 void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
623 {
624         struct ib_send_wr *first, *last, **prev;
625         const struct ib_send_wr *bad_wr;
626         struct rpcrdma_frwr *frwr;
627         struct rpcrdma_mr *mr;
628         int rc;
629
630         /* Chain the LOCAL_INV Work Requests and post them with
631          * a single ib_post_send() call.
632          */
633         frwr = NULL;
634         prev = &first;
635         while (!list_empty(&req->rl_registered)) {
636                 mr = rpcrdma_mr_pop(&req->rl_registered);
637
638                 trace_xprtrdma_mr_localinv(mr);
639                 r_xprt->rx_stats.local_inv_needed++;
640
641                 frwr = &mr->frwr;
642                 frwr->fr_cqe.done = frwr_wc_localinv;
643                 frwr->fr_req = req;
644                 last = &frwr->fr_invwr;
645                 last->next = NULL;
646                 last->wr_cqe = &frwr->fr_cqe;
647                 last->sg_list = NULL;
648                 last->num_sge = 0;
649                 last->opcode = IB_WR_LOCAL_INV;
650                 last->send_flags = IB_SEND_SIGNALED;
651                 last->ex.invalidate_rkey = mr->mr_handle;
652
653                 *prev = last;
654                 prev = &last->next;
655         }
656
657         /* Strong send queue ordering guarantees that when the
658          * last WR in the chain completes, all WRs in the chain
659          * are complete. The last completion will wake up the
660          * RPC waiter.
661          */
662         frwr->fr_cqe.done = frwr_wc_localinv_done;
663
664         /* Transport disconnect drains the receive CQ before it
665          * replaces the QP. The RPC reply handler won't call us
666          * unless ri_id->qp is a valid pointer.
667          */
668         bad_wr = NULL;
669         rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
670         trace_xprtrdma_post_send(req, rc);
671         if (!rc)
672                 return;
673
674         /* Recycle MRs in the LOCAL_INV chain that did not get posted.
675          */
676         while (bad_wr) {
677                 frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
678                 mr = container_of(frwr, struct rpcrdma_mr, frwr);
679                 bad_wr = bad_wr->next;
680
681                 rpcrdma_mr_recycle(mr);
682         }
683
684         /* The final LOCAL_INV WR in the chain is supposed to
685          * do the wake. If it was never posted, the wake will
686          * not happen, so wake here in that case.
687          */
688         rpcrdma_complete_rqst(req->rl_reply);
689 }