/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static u32 xdr_padsize(u32 len)
{
	return (len & 3) ? (4 - (len & 3)) : 0;
}

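/* A worked example of the padding rule above (illustration only):
 * XDR rounds data up to a 4-byte boundary, so xdr_padsize(5) == 3,
 * xdr_padsize(8) == 0, and xdr_padsize(13) == 3.
 */
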
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
		     struct xdr_buf *xdr,
		     struct svc_rdma_req_map *vec,
		     bool write_chunk_present)
{
	int sge_no;
	u32 sge_bytes;
	u32 page_bytes;
	u32 page_off;
	int page_no;

	if (xdr->len !=
	    (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
		pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
		return -EIO;
	}

	/* Skip the first sge; it is reserved for the RPCRDMA header */
	sge_no = 1;

	/* Head SGE */
	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
	sge_no++;

	/* pages SGE */
	page_no = 0;
	page_bytes = xdr->page_len;
	page_off = xdr->page_base;
	while (page_bytes) {
		vec->sge[sge_no].iov_base =
			page_address(xdr->pages[page_no]) + page_off;
		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
		page_bytes -= sge_bytes;
		vec->sge[sge_no].iov_len = sge_bytes;

		sge_no++;
		page_no++;
		page_off = 0; /* reset for next time through loop */
	}

	/* Tail SGE */
	if (xdr->tail[0].iov_len) {
		unsigned char *base = xdr->tail[0].iov_base;
		size_t len = xdr->tail[0].iov_len;
		u32 xdr_pad = xdr_padsize(xdr->page_len);

		if (write_chunk_present && xdr_pad) {
			base += xdr_pad;
			len -= xdr_pad;
		}

		if (len) {
			vec->sge[sge_no].iov_base = base;
			vec->sge[sge_no].iov_len = len;
			sge_no++;
		}
	}

	dprintk("svcrdma: %s: sge_no %d page_no %d "
		"page_base %u page_len %u head_len %zu tail_len %zu\n",
		__func__, sge_no, page_no, xdr->page_base, xdr->page_len,
		xdr->head[0].iov_len, xdr->tail[0].iov_len);

	vec->count = sge_no;
	return 0;
}

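/* Illustration (hypothetical sizes, assuming PAGE_SIZE == 4096): an
 * xdr_buf with head.iov_len == 128, page_len == 6000 (page_base == 0),
 * and tail.iov_len == 4 would be mapped by svc_rdma_map_xdr() above as
 *
 *	vec->sge[0]	(reserved for the RPC-over-RDMA header)
 *	vec->sge[1]	head,	 128 bytes
 *	vec->sge[2]	page 0,	4096 bytes
 *	vec->sge[3]	page 1,	1904 bytes
 *	vec->sge[4]	tail,	   4 bytes
 *
 * giving vec->count == 5.
 */
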
static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
			      struct xdr_buf *xdr,
			      u32 xdr_off, size_t len, int dir)
{
	struct page *page;
	dma_addr_t dma_addr;
	if (xdr_off < xdr->head[0].iov_len) {
		/* This offset is in the head */
		xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
		page = virt_to_page(xdr->head[0].iov_base);
	} else {
		xdr_off -= xdr->head[0].iov_len;
		if (xdr_off < xdr->page_len) {
			/* This offset is in the page list */
			xdr_off += xdr->page_base;
			page = xdr->pages[xdr_off >> PAGE_SHIFT];
			xdr_off &= ~PAGE_MASK;
		} else {
			/* This offset is in the tail */
			xdr_off -= xdr->page_len;
			xdr_off += (unsigned long)
				xdr->tail[0].iov_base & ~PAGE_MASK;
			page = virt_to_page(xdr->tail[0].iov_base);
		}
	}
	dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
				   min_t(size_t, PAGE_SIZE, len), dir);
	return dma_addr;
}

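/* Note on dma_map_xdr() above: 'xdr_off' is a byte offset into the
 * whole xdr_buf (head, then page list, then tail).  Continuing the
 * hypothetical layout sketched earlier, with head.iov_len == 128 an
 * xdr_off of 200 resolves to offset 72 into the first entry of
 * xdr->pages[].  A single call maps at most PAGE_SIZE bytes, so the
 * callers below issue one call per SGE they build.
 */
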
/* Parse the RPC Call's transport header.
 */
static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
				      struct rpcrdma_write_array **write,
				      struct rpcrdma_write_array **reply)
{
	__be32 *p;

	p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];

	/* Read list */
	while (*p++ != xdr_zero)
		p += 5;

	/* Write list */
	if (*p != xdr_zero) {
		*write = (struct rpcrdma_write_array *)p;
		while (*p++ != xdr_zero)
			p += 1 + be32_to_cpu(*p) * 4;
	} else {
		*write = NULL;
		p++;
	}

	/* Reply chunk */
	if (*p != xdr_zero)
		*reply = (struct rpcrdma_write_array *)p;
	else
		*reply = NULL;
}

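/* For reference, the chunk lists walked by svc_rdma_get_write_arrays()
 * above are encoded (RPC-over-RDMA Version One) roughly as follows,
 * with counts given in 32-bit XDR words:
 *
 *	Read list:	zero or more entries of
 *			{ discrim(1), position(1), handle(1), length(1),
 *			  offset(2) }, terminated by a zero word
 *	Write list:	zero or more write arrays of
 *			{ discrim(1), nchunks(1),
 *			  nchunks * { handle(1), length(1), offset(2) } },
 *			terminated by a zero word
 *	Reply chunk:	a zero word, or one such write array
 *
 * which is why the parser strides by 5 for read segments and by
 * 1 + nchunks * 4 for write arrays.
 */
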
/* RPC-over-RDMA Version One private extension: Remote Invalidation.
 * Responder's choice: requester signals it can handle Send With
 * Invalidate, and responder chooses one rkey to invalidate.
 *
 * Find a candidate rkey to invalidate when sending a reply.  Picks the
 * first rkey it finds in the chunk lists.
 *
 * Returns zero if RPC's chunk lists are empty.
 */
static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp,
				 struct rpcrdma_write_array *wr_ary,
				 struct rpcrdma_write_array *rp_ary)
{
	struct rpcrdma_read_chunk *rd_ary;
	struct rpcrdma_segment *arg_ch;

	rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0];
	if (rd_ary->rc_discrim != xdr_zero)
		return be32_to_cpu(rd_ary->rc_target.rs_handle);

	if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) {
		arg_ch = &wr_ary->wc_array[0].wc_target;
		return be32_to_cpu(arg_ch->rs_handle);
	}

	if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) {
		arg_ch = &rp_ary->wc_array[0].wc_target;
		return be32_to_cpu(arg_ch->rs_handle);
	}

	return 0;
}

static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
				 struct svc_rdma_op_ctxt *ctxt,
				 unsigned int sge_no,
				 struct page *page,
				 unsigned int offset,
				 unsigned int len)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	dma_addr_t dma_addr;

	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(dev, dma_addr))
		return -EIO;

	ctxt->sge[sge_no].addr = dma_addr;
	ctxt->sge[sge_no].length = len;
	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
	svc_rdma_count_mappings(rdma, ctxt);
	return 0;
}

/**
 * svc_rdma_map_reply_hdr - DMA map the transport header buffer
 * @rdma: controlling transport
 * @ctxt: op_ctxt for the Send WR
 * @rdma_resp: buffer containing transport header
 * @len: length of transport header
 *
 * Returns:
 *	%0 if the header is DMA mapped,
 *	%-EIO if DMA mapping failed.
 */
int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
			   struct svc_rdma_op_ctxt *ctxt,
			   __be32 *rdma_resp,
			   unsigned int len)
{
	ctxt->direction = DMA_TO_DEVICE;
	ctxt->pages[0] = virt_to_page(rdma_resp);
	ctxt->count = 1;
	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
}

/* Assumptions:
 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
 */
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
		      u32 rmr, u64 to,
		      u32 xdr_off, int write_len,
		      struct svc_rdma_req_map *vec)
{
	struct ib_rdma_wr write_wr;
	struct ib_sge *sge;
	int xdr_sge_no;
	int sge_no;
	int sge_bytes;
	int sge_off;
	int bc;
	struct svc_rdma_op_ctxt *ctxt;

	if (vec->count > RPCSVC_MAXPAGES) {
		pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
		return -EIO;
	}

	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
		rmr, (unsigned long long)to, xdr_off,
		write_len, vec->sge, vec->count);

	ctxt = svc_rdma_get_context(xprt);
	ctxt->direction = DMA_TO_DEVICE;
	sge = ctxt->sge;

	/* Find the SGE associated with xdr_off */
	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
	     xdr_sge_no++) {
		if (vec->sge[xdr_sge_no].iov_len > bc)
			break;
		bc -= vec->sge[xdr_sge_no].iov_len;
	}

	sge_off = bc;
	bc = write_len;
	sge_no = 0;

	/* Copy the remaining SGE */
	while (bc != 0) {
		sge_bytes = min_t(size_t,
			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
		sge[sge_no].length = sge_bytes;
		sge[sge_no].addr =
			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
				    sge_bytes, DMA_TO_DEVICE);
		xdr_off += sge_bytes;
		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
					 sge[sge_no].addr))
			goto err;
		svc_rdma_count_mappings(xprt, ctxt);
		sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count++;
		sge_off = 0;
		sge_no++;
		xdr_sge_no++;
		if (xdr_sge_no > vec->count) {
			pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
			goto err;
		}
		bc -= sge_bytes;
		if (sge_no == xprt->sc_max_sge)
			break;
	}

	/* Prepare WRITE WR */
	memset(&write_wr, 0, sizeof write_wr);
	ctxt->cqe.done = svc_rdma_wc_write;
	write_wr.wr.wr_cqe = &ctxt->cqe;
	write_wr.wr.sg_list = &sge[0];
	write_wr.wr.num_sge = sge_no;
	write_wr.wr.opcode = IB_WR_RDMA_WRITE;
	write_wr.wr.send_flags = IB_SEND_SIGNALED;
	write_wr.rkey = rmr;
	write_wr.remote_addr = to;

	/* Post It */
	atomic_inc(&rdma_stat_write);
	if (svc_rdma_send(xprt, &write_wr.wr))
		goto err;
	return write_len - bc;
 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
	return -EIO;
}

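/* Note on send_write() above: it returns the number of bytes actually
 * posted, which can be less than 'write_len' when the SGE count reaches
 * sc_max_sge.  The callers below therefore loop, advancing the RDMA
 * offset and XDR offset by the returned value until the chunk has been
 * fully written.
 */
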
noinline
static int send_write_chunks(struct svcxprt_rdma *xprt,
			     struct rpcrdma_write_array *wr_ary,
			     struct rpcrdma_msg *rdma_resp,
			     struct svc_rqst *rqstp,
			     struct svc_rdma_req_map *vec)
{
	u32 xfer_len = rqstp->rq_res.page_len;
	int write_len;
	u32 xdr_off;
	int chunk_off;
	int chunk_no;
	int nchunks;
	struct rpcrdma_write_array *res_ary;
	int ret;

	res_ary = (struct rpcrdma_write_array *)
		&rdma_resp->rm_body.rm_chunks[1];

	/* Write chunks start at the pagelist */
	nchunks = be32_to_cpu(wr_ary->wc_nchunks);
	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
	     xfer_len && chunk_no < nchunks;
	     chunk_no++) {
		struct rpcrdma_segment *arg_ch;
		u64 rs_offset;

		arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
		write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));

		/* Prepare the response chunk given the length actually
		 * written */
		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
						arg_ch->rs_handle,
						arg_ch->rs_offset,
						write_len);
		chunk_off = 0;
		while (write_len) {
			ret = send_write(xprt, rqstp,
					 be32_to_cpu(arg_ch->rs_handle),
					 rs_offset + chunk_off,
					 xdr_off,
					 write_len,
					 vec);
			if (ret <= 0)
				goto out_err;
			chunk_off += ret;
			xdr_off += ret;
			xfer_len -= ret;
			write_len -= ret;
		}
	}
	/* Update the req with the number of chunks actually used */
	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

	return rqstp->rq_res.page_len;

out_err:
	pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
	return -EIO;
}

noinline
static int send_reply_chunks(struct svcxprt_rdma *xprt,
			     struct rpcrdma_write_array *rp_ary,
			     struct rpcrdma_msg *rdma_resp,
			     struct svc_rqst *rqstp,
			     struct svc_rdma_req_map *vec)
{
	u32 xfer_len = rqstp->rq_res.len;
	int write_len;
	u32 xdr_off;
	int chunk_no;
	int chunk_off;
	int nchunks;
	struct rpcrdma_segment *ch;
	struct rpcrdma_write_array *res_ary;
	int ret;

	/* XXX: need to fix when reply lists occur with read-list and/or
	 * write-list */
	res_ary = (struct rpcrdma_write_array *)
		&rdma_resp->rm_body.rm_chunks[2];

	/* xdr offset starts at RPC message */
	nchunks = be32_to_cpu(rp_ary->wc_nchunks);
	for (xdr_off = 0, chunk_no = 0;
	     xfer_len && chunk_no < nchunks;
	     chunk_no++) {
		u64 rs_offset;
		ch = &rp_ary->wc_array[chunk_no].wc_target;
		write_len = min(xfer_len, be32_to_cpu(ch->rs_length));

		/* Prepare the reply chunk given the length actually
		 * written */
		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
						ch->rs_handle, ch->rs_offset,
						write_len);
		chunk_off = 0;
		while (write_len) {
			ret = send_write(xprt, rqstp,
					 be32_to_cpu(ch->rs_handle),
					 rs_offset + chunk_off,
					 xdr_off,
					 write_len,
					 vec);
			if (ret <= 0)
				goto out_err;
			chunk_off += ret;
			xdr_off += ret;
			xfer_len -= ret;
			write_len -= ret;
		}
	}
	/* Update the req with the number of chunks actually used */
	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);

	return rqstp->rq_res.len;

out_err:
	pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
	return -EIO;
}

/**
 * svc_rdma_post_send_wr - Set up and post one Send Work Request
 * @rdma: controlling transport
 * @ctxt: op_ctxt for transmitting the Send WR
 * @num_sge: number of SGEs to send
 * @inv_rkey: R_key argument to Send With Invalidate, or zero
 *
 * Returns:
 *	%0 if the Send* was posted successfully,
 *	%-ENOTCONN if the connection was lost or dropped,
 *	%-EINVAL if there was a problem with the Send we built,
 *	%-ENOMEM if ib_post_send failed.
 */
int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
			  struct svc_rdma_op_ctxt *ctxt, int num_sge,
			  u32 inv_rkey)
{
	struct ib_send_wr *send_wr = &ctxt->send_wr;

	dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge);

	send_wr->next = NULL;
	ctxt->cqe.done = svc_rdma_wc_send;
	send_wr->wr_cqe = &ctxt->cqe;
	send_wr->sg_list = ctxt->sge;
	send_wr->num_sge = num_sge;
	send_wr->send_flags = IB_SEND_SIGNALED;
	if (inv_rkey) {
		send_wr->opcode = IB_WR_SEND_WITH_INV;
		send_wr->ex.invalidate_rkey = inv_rkey;
	} else {
		send_wr->opcode = IB_WR_SEND;
	}

	return svc_rdma_send(rdma, send_wr);
}

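/* Typical usage of svc_rdma_post_send_wr(), sketched from the error
 * path in svc_rdma_send_error() below (header-only reply, num_sge == 1,
 * no Remote Invalidation):
 *
 *	ctxt = svc_rdma_get_context(rdma);
 *	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, len);
 *	if (!ret)
 *		ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0);
 *
 * On a posting failure the ctxt is unmapped and released with
 * svc_rdma_unmap_dma() and svc_rdma_put_context(), as done in
 * svc_rdma_send_error().  Passing a non-zero 'inv_rkey' instead issues
 * a Send With Invalidate, as send_reply() does when the requester
 * supports Remote Invalidation.
 */
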
/* This function prepares the portion of the RPCRDMA message to be
 * sent in the RDMA_SEND. It is called after any data sent via RDMA
 * has already been transmitted. There are three cases:
 * - The RPCRDMA header, RPC header, and payload are all sent in a
 *   single RDMA_SEND. This is the "inline" case.
 * - The RPCRDMA header and some portion of the RPC header and data
 *   are sent via this RDMA_SEND and another portion of the data is
 *   sent via RDMA.
 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
 *   header and data are all transmitted via RDMA.
 * In all three cases, this function prepares the RPCRDMA header in
 * sge[0], and the 'byte_count' argument indicates how much of the
 * XDR to include in this RDMA_SEND. NB: The offset of the payload
 * to send is zero in the XDR.
 */
static int send_reply(struct svcxprt_rdma *rdma,
		      struct svc_rqst *rqstp,
		      struct page *page,
		      struct rpcrdma_msg *rdma_resp,
		      struct svc_rdma_req_map *vec,
		      int byte_count,
		      u32 inv_rkey)
{
	struct svc_rdma_op_ctxt *ctxt;
	u32 xdr_off;
	int sge_no;
	int sge_bytes;
	int page_no;
	int pages;
	int ret = -EIO;

	/* Prepare the context */
	ctxt = svc_rdma_get_context(rdma);
	ctxt->direction = DMA_TO_DEVICE;
	ctxt->pages[0] = page;
	ctxt->count = 1;

	/* Prepare the SGE for the RPCRDMA Header */
	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
	ctxt->sge[0].length =
	    svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
	ctxt->sge[0].addr =
	    ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
			    ctxt->sge[0].length, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
		goto err;
	svc_rdma_count_mappings(rdma, ctxt);

	ctxt->direction = DMA_TO_DEVICE;

	/* Map the payload indicated by 'byte_count' */
	xdr_off = 0;
	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
		byte_count -= sge_bytes;
		ctxt->sge[sge_no].addr =
			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
				    sge_bytes, DMA_TO_DEVICE);
		xdr_off += sge_bytes;
		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
					 ctxt->sge[sge_no].addr))
			goto err;
		svc_rdma_count_mappings(rdma, ctxt);
		ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
		ctxt->sge[sge_no].length = sge_bytes;
	}
	if (byte_count != 0) {
		pr_err("svcrdma: Could not map %d bytes\n", byte_count);
		goto err;
	}

	/* Save all respages in the ctxt and remove them from the
	 * respages array. They are our pages until the I/O
	 * completes.
	 */
	pages = rqstp->rq_next_page - rqstp->rq_respages;
	for (page_no = 0; page_no < pages; page_no++) {
		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
		ctxt->count++;
		rqstp->rq_respages[page_no] = NULL;
	}
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	if (sge_no > rdma->sc_max_sge) {
		pr_err("svcrdma: Too many sges (%d)\n", sge_no);
		goto err;
	}

	ret = svc_rdma_post_send_wr(rdma, ctxt, sge_no, inv_rkey);
	if (ret)
		goto err;

	return 0;

 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return ret;
}

void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

int svc_rdma_sendto(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct rpcrdma_msg *rdma_argp;
	struct rpcrdma_msg *rdma_resp;
	struct rpcrdma_write_array *wr_ary, *rp_ary;
	int ret;
	int inline_bytes;
	struct page *res_page;
	struct svc_rdma_req_map *vec;
	u32 inv_rkey;
	__be32 *p;

	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

	/* Get the RDMA request header. The receive logic always
	 * places this at the start of page 0.
	 */
	rdma_argp = page_address(rqstp->rq_pages[0]);
	svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);

	inv_rkey = 0;
	if (rdma->sc_snd_w_inv)
		inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary);

	/* Build a req vec for the XDR */
	vec = svc_rdma_get_req_map(rdma);
	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
	if (ret)
		goto err0;
	inline_bytes = rqstp->rq_res.len;

	/* Create the RDMA response header. xprt->xpt_mutex,
	 * acquired in svc_send(), serializes RPC replies. The
	 * code path below that inserts the credit grant value
	 * into each transport header runs only inside this
	 * critical section.
	 */
	ret = -ENOMEM;
	res_page = alloc_page(GFP_KERNEL);
	if (!res_page)
		goto err0;
	rdma_resp = page_address(res_page);

	p = &rdma_resp->rm_xid;
	*p++ = rdma_argp->rm_xid;
	*p++ = rdma_argp->rm_vers;
	*p++ = rdma->sc_fc_credits;
	*p++ = rp_ary ? rdma_nomsg : rdma_msg;

	/* Start with empty chunks */
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p   = xdr_zero;

	/* Send any write-chunk data and build resp write-list */
	if (wr_ary) {
		ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
		if (ret < 0)
			goto err1;
		inline_bytes -= ret + xdr_padsize(ret);
	}

	/* Send any reply-list data and update resp reply-list */
	if (rp_ary) {
		ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
		if (ret < 0)
			goto err1;
		inline_bytes -= ret;
	}

	/* Post a fresh Receive buffer _before_ sending the reply */
	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
	if (ret)
		goto err1;

	ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
			 inline_bytes, inv_rkey);
	if (ret < 0)
		goto err0;

	svc_rdma_put_req_map(rdma, vec);
	dprintk("svcrdma: send_reply returns %d\n", ret);
	return ret;

 err1:
	put_page(res_page);
 err0:
	svc_rdma_put_req_map(rdma, vec);
	pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
	       ret);
	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
	return -ENOTCONN;
}

void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
			 int status)
{
	struct page *p;
	struct svc_rdma_op_ctxt *ctxt;
	enum rpcrdma_errcode err;
	__be32 *va;
	int length;
	int ret;

	ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
	if (ret)
		return;

	p = alloc_page(GFP_KERNEL);
	if (!p)
		return;
	va = page_address(p);

	/* XDR encode an error reply */
	err = ERR_CHUNK;
	if (status == -EPROTONOSUPPORT)
		err = ERR_VERS;
	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

	/* Map transport header; no RPC message payload */
	ctxt = svc_rdma_get_context(xprt);
	ret = svc_rdma_map_reply_hdr(xprt, ctxt, &rmsgp->rm_xid, length);
	if (ret) {
		dprintk("svcrdma: Error %d mapping send for protocol error\n",
			ret);
		return;
	}

	ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
	if (ret) {
		dprintk("svcrdma: Error %d posting send for protocol error\n",
			ret);
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
	}
}