1 /*
2  *  linux/net/sunrpc/xprt.c
3  *
4  *  This is a generic RPC call interface supporting congestion avoidance,
5  *  and asynchronous calls.
6  *
7  *  The interface works like this:
8  *
9  *  -   When a process places a call, it allocates a request slot if
10  *      one is available. Otherwise, it sleeps on the backlog queue
11  *      (xprt_reserve).
12  *  -   Next, the caller puts together the RPC message, stuffs it into
13  *      the request struct, and calls xprt_transmit().
14  *  -   xprt_transmit sends the message and installs the caller on the
15  *      transport's wait list. At the same time, if a reply is expected,
16  *      it installs a timer that is run after the packet's timeout has
17  *      expired.
18  *  -   When a packet arrives, the data_ready handler walks the list of
19  *      pending requests for that transport. If a matching XID is found, the
20  *      caller is woken up, and the timer removed.
21  *  -   When no reply arrives within the timeout interval, the timer is
22  *      fired by the kernel and runs xprt_timer(). It either adjusts the
23  *      timeout values (minor timeout) or wakes up the caller with a status
24  *      of -ETIMEDOUT.
25  *  -   When the caller receives a notification from RPC that a reply arrived,
26  *      it should release the RPC slot, and process the reply.
27  *      If the call timed out, it may choose to retry the operation by
28  *      adjusting the initial timeout value, and simply calling rpc_call
29  *      again.
30  *
31  *  Support for async RPC is done through a set of RPC-specific scheduling
32  *  primitives that `transparently' work for processes as well as async
33  *  tasks that rely on callbacks.
34  *
35  *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
36  *
37  *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
38  */
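/*
 *  A rough sketch of how a single synchronous call maps onto the helpers
 *  below; the real sequencing lives in the RPC client state machine in
 *  net/sunrpc/clnt.c, and error handling, congestion control and
 *  retransmission are omitted here:
 *
 *	xprt_reserve(task);			(allocate a request slot)
 *	xprt_connect(task);			(connect the transport if needed)
 *	xprt_request_enqueue_receive(task);
 *	xprt_request_enqueue_transmit(task);
 *	if (xprt_prepare_transmit(task)) {	(take the transport write lock)
 *		xprt_transmit(task);
 *		xprt_end_transmit(task);
 *	}
 *	xprt_request_wait_receive(task);	(sleep until the reply arrives)
 *
 *  The transport's data_ready path then calls xprt_lookup_rqst() and
 *  xprt_complete_rqst() to hand the reply back to the sleeping task.
 */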
39
40 #include <linux/module.h>
41
42 #include <linux/types.h>
43 #include <linux/interrupt.h>
44 #include <linux/workqueue.h>
45 #include <linux/net.h>
46 #include <linux/ktime.h>
47
48 #include <linux/sunrpc/clnt.h>
49 #include <linux/sunrpc/metrics.h>
50 #include <linux/sunrpc/bc_xprt.h>
51 #include <linux/rcupdate.h>
52 #include <linux/sched/mm.h>
53
54 #include <trace/events/sunrpc.h>
55
56 #include "sunrpc.h"
57
58 /*
59  * Local variables
60  */
61
62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63 # define RPCDBG_FACILITY        RPCDBG_XPRT
64 #endif
65
66 /*
67  * Local functions
68  */
69 static void      xprt_init(struct rpc_xprt *xprt, struct net *net);
70 static __be32   xprt_alloc_xid(struct rpc_xprt *xprt);
71 static void      xprt_destroy(struct rpc_xprt *xprt);
72
73 static DEFINE_SPINLOCK(xprt_list_lock);
74 static LIST_HEAD(xprt_list);
75
76 static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
77 {
78         unsigned long timeout = jiffies + req->rq_timeout;
79
80         if (time_before(timeout, req->rq_majortimeo))
81                 return timeout;
82         return req->rq_majortimeo;
83 }
84
85 /**
86  * xprt_register_transport - register a transport implementation
87  * @transport: transport to register
88  *
89  * If a transport implementation is loaded as a kernel module, it can
90  * call this interface to make itself known to the RPC client.
91  *
92  * Returns:
93  * 0:           transport successfully registered
94  * -EEXIST:     transport already registered
95  * -EINVAL:     transport module being unloaded
96  */
97 int xprt_register_transport(struct xprt_class *transport)
98 {
99         struct xprt_class *t;
100         int result;
101
102         result = -EEXIST;
103         spin_lock(&xprt_list_lock);
104         list_for_each_entry(t, &xprt_list, list) {
105                 /* don't register the same transport class twice */
106                 if (t->ident == transport->ident)
107                         goto out;
108         }
109
110         list_add_tail(&transport->list, &xprt_list);
111         printk(KERN_INFO "RPC: Registered %s transport module.\n",
112                transport->name);
113         result = 0;
114
115 out:
116         spin_unlock(&xprt_list_lock);
117         return result;
118 }
119 EXPORT_SYMBOL_GPL(xprt_register_transport);
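/*
 * A transport module typically registers itself from its module init
 * routine.  Minimal sketch; the "foo" transport, XPRT_TRANSPORT_FOO and
 * xs_setup_foo() are placeholders, not existing symbols:
 *
 *	static struct xprt_class xprt_foo_class = {
 *		.list	= LIST_HEAD_INIT(xprt_foo_class.list),
 *		.name	= "foo",
 *		.owner	= THIS_MODULE,
 *		.ident	= XPRT_TRANSPORT_FOO,
 *		.setup	= xs_setup_foo,
 *	};
 *
 *	static int __init xprt_foo_init(void)
 *	{
 *		return xprt_register_transport(&xprt_foo_class);
 *	}
 *
 * The matching module exit path calls xprt_unregister_transport().
 */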
120
121 /**
122  * xprt_unregister_transport - unregister a transport implementation
123  * @transport: transport to unregister
124  *
125  * Returns:
126  * 0:           transport successfully unregistered
127  * -ENOENT:     transport never registered
128  */
129 int xprt_unregister_transport(struct xprt_class *transport)
130 {
131         struct xprt_class *t;
132         int result;
133
134         result = 0;
135         spin_lock(&xprt_list_lock);
136         list_for_each_entry(t, &xprt_list, list) {
137                 if (t == transport) {
138                         printk(KERN_INFO
139                                 "RPC: Unregistered %s transport module.\n",
140                                 transport->name);
141                         list_del_init(&transport->list);
142                         goto out;
143                 }
144         }
145         result = -ENOENT;
146
147 out:
148         spin_unlock(&xprt_list_lock);
149         return result;
150 }
151 EXPORT_SYMBOL_GPL(xprt_unregister_transport);
152
153 /**
154  * xprt_load_transport - load a transport implementation
155  * @transport_name: transport to load
156  *
157  * Returns:
158  * 0:           transport successfully loaded
159  * -ENOENT:     transport module not available
160  */
161 int xprt_load_transport(const char *transport_name)
162 {
163         struct xprt_class *t;
164         int result;
165
166         result = 0;
167         spin_lock(&xprt_list_lock);
168         list_for_each_entry(t, &xprt_list, list) {
169                 if (strcmp(t->name, transport_name) == 0) {
170                         spin_unlock(&xprt_list_lock);
171                         goto out;
172                 }
173         }
174         spin_unlock(&xprt_list_lock);
175         result = request_module("xprt%s", transport_name);
176 out:
177         return result;
178 }
179 EXPORT_SYMBOL_GPL(xprt_load_transport);
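/*
 * Note the "xprt%s" template above: xprt_load_transport("rdma"), for
 * example, ends up in request_module("xprtrdma"), so a transport module is
 * expected to provide a matching module alias of that form (e.g.
 * MODULE_ALIAS("xprtrdma") for a transport named "rdma").
 */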
180
181 static void xprt_clear_locked(struct rpc_xprt *xprt)
182 {
183         xprt->snd_task = NULL;
184         if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
185                 smp_mb__before_atomic();
186                 clear_bit(XPRT_LOCKED, &xprt->state);
187                 smp_mb__after_atomic();
188         } else
189                 queue_work(xprtiod_workqueue, &xprt->task_cleanup);
190 }
191
192 /**
193  * xprt_reserve_xprt - serialize write access to transports
194  * @task: task that is requesting access to the transport
195  * @xprt: pointer to the target transport
196  *
197  * This prevents mixing the payload of separate requests, and prevents
198  * transport connects from colliding with writes.  No congestion control
199  * is provided.
200  */
201 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
202 {
203         struct rpc_rqst *req = task->tk_rqstp;
204
205         if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
206                 if (task == xprt->snd_task)
207                         return 1;
208                 goto out_sleep;
209         }
210         if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
211                 goto out_unlock;
212         xprt->snd_task = task;
213
214         return 1;
215
216 out_unlock:
217         xprt_clear_locked(xprt);
218 out_sleep:
219         dprintk("RPC: %5u failed to lock transport %p\n",
220                         task->tk_pid, xprt);
221         task->tk_status = -EAGAIN;
222         if (RPC_IS_SOFT(task))
223                 rpc_sleep_on_timeout(&xprt->sending, task, NULL,
224                                 xprt_request_timeout(req));
225         else
226                 rpc_sleep_on(&xprt->sending, task, NULL);
227         return 0;
228 }
229 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
230
231 static bool
232 xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
233 {
234         return test_bit(XPRT_CWND_WAIT, &xprt->state);
235 }
236
237 static void
238 xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
239 {
240         if (!list_empty(&xprt->xmit_queue)) {
241                 /* Peek at head of queue to see if it can make progress */
242                 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
243                                         rq_xmit)->rq_cong)
244                         return;
245         }
246         set_bit(XPRT_CWND_WAIT, &xprt->state);
247 }
248
249 static void
250 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
251 {
252         if (!RPCXPRT_CONGESTED(xprt))
253                 clear_bit(XPRT_CWND_WAIT, &xprt->state);
254 }
255
256 /*
257  * xprt_reserve_xprt_cong - serialize write access to transports
258  * @task: task that is requesting access to the transport
259  *
260  * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
261  * integrated into the decision of whether a request is allowed to be
262  * woken up and given access to the transport.
263  * Note that the lock is only granted if we know there are free slots.
264  */
265 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
266 {
267         struct rpc_rqst *req = task->tk_rqstp;
268
269         if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
270                 if (task == xprt->snd_task)
271                         return 1;
272                 goto out_sleep;
273         }
274         if (req == NULL) {
275                 xprt->snd_task = task;
276                 return 1;
277         }
278         if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
279                 goto out_unlock;
280         if (!xprt_need_congestion_window_wait(xprt)) {
281                 xprt->snd_task = task;
282                 return 1;
283         }
284 out_unlock:
285         xprt_clear_locked(xprt);
286 out_sleep:
287         dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
288         task->tk_status = -EAGAIN;
289         if (RPC_IS_SOFT(task))
290                 rpc_sleep_on_timeout(&xprt->sending, task, NULL,
291                                 xprt_request_timeout(req));
292         else
293                 rpc_sleep_on(&xprt->sending, task, NULL);
294         return 0;
295 }
296 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
297
298 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
299 {
300         int retval;
301
302         if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
303                 return 1;
304         spin_lock_bh(&xprt->transport_lock);
305         retval = xprt->ops->reserve_xprt(xprt, task);
306         spin_unlock_bh(&xprt->transport_lock);
307         return retval;
308 }
309
310 static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
311 {
312         struct rpc_xprt *xprt = data;
313
314         xprt->snd_task = task;
315         return true;
316 }
317
318 static void __xprt_lock_write_next(struct rpc_xprt *xprt)
319 {
320         if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
321                 return;
322         if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
323                 goto out_unlock;
324         if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
325                                 __xprt_lock_write_func, xprt))
326                 return;
327 out_unlock:
328         xprt_clear_locked(xprt);
329 }
330
331 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
332 {
333         if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
334                 return;
335         if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
336                 goto out_unlock;
337         if (xprt_need_congestion_window_wait(xprt))
338                 goto out_unlock;
339         if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
340                                 __xprt_lock_write_func, xprt))
341                 return;
342 out_unlock:
343         xprt_clear_locked(xprt);
344 }
345
346 /**
347  * xprt_release_xprt - allow other requests to use a transport
348  * @xprt: transport with other tasks potentially waiting
349  * @task: task that is releasing access to the transport
350  *
351  * Note that "task" can be NULL.  No congestion control is provided.
352  */
353 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
354 {
355         if (xprt->snd_task == task) {
356                 xprt_clear_locked(xprt);
357                 __xprt_lock_write_next(xprt);
358         }
359 }
360 EXPORT_SYMBOL_GPL(xprt_release_xprt);
361
362 /**
363  * xprt_release_xprt_cong - allow other requests to use a transport
364  * @xprt: transport with other tasks potentially waiting
365  * @task: task that is releasing access to the transport
366  *
367  * Note that "task" can be NULL.  Another task is awoken to use the
368  * transport if the transport's congestion window allows it.
369  */
370 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
371 {
372         if (xprt->snd_task == task) {
373                 xprt_clear_locked(xprt);
374                 __xprt_lock_write_next_cong(xprt);
375         }
376 }
377 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
378
379 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
380 {
381         if (xprt->snd_task != task)
382                 return;
383         spin_lock_bh(&xprt->transport_lock);
384         xprt->ops->release_xprt(xprt, task);
385         spin_unlock_bh(&xprt->transport_lock);
386 }
387
388 /*
389  * Van Jacobson congestion avoidance. Check if the congestion window
390  * overflowed. Put the task to sleep if this is the case.
391  */
392 static int
393 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
394 {
395         if (req->rq_cong)
396                 return 1;
397         dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
398                         req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
399         if (RPCXPRT_CONGESTED(xprt)) {
400                 xprt_set_congestion_window_wait(xprt);
401                 return 0;
402         }
403         req->rq_cong = 1;
404         xprt->cong += RPC_CWNDSCALE;
405         return 1;
406 }
407
408 /*
409  * Adjust the congestion window, and wake up the next task
410  * that has been sleeping due to congestion
411  */
412 static void
413 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
414 {
415         if (!req->rq_cong)
416                 return;
417         req->rq_cong = 0;
418         xprt->cong -= RPC_CWNDSCALE;
419         xprt_test_and_clear_congestion_window_wait(xprt);
420         __xprt_lock_write_next_cong(xprt);
421 }
422
423 /**
424  * xprt_request_get_cong - Request congestion control credits
425  * @xprt: pointer to transport
426  * @req: pointer to RPC request
427  *
428  * Useful for transports that require congestion control.
429  */
430 bool
431 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
432 {
433         bool ret = false;
434
435         if (req->rq_cong)
436                 return true;
437         spin_lock_bh(&xprt->transport_lock);
438         ret = __xprt_get_cong(xprt, req) != 0;
439         spin_unlock_bh(&xprt->transport_lock);
440         return ret;
441 }
442 EXPORT_SYMBOL_GPL(xprt_request_get_cong);
443
444 /**
445  * xprt_release_rqst_cong - housekeeping when request is complete
446  * @task: RPC request that recently completed
447  *
448  * Useful for transports that require congestion control.
449  */
450 void xprt_release_rqst_cong(struct rpc_task *task)
451 {
452         struct rpc_rqst *req = task->tk_rqstp;
453
454         __xprt_put_cong(req->rq_xprt, req);
455 }
456 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
457
458 /*
459  * Clear the congestion window wait flag and wake up the next
460  * entry on xprt->sending
461  */
462 static void
463 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
464 {
465         if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
466                 spin_lock_bh(&xprt->transport_lock);
467                 __xprt_lock_write_next_cong(xprt);
468                 spin_unlock_bh(&xprt->transport_lock);
469         }
470 }
471
472 /**
473  * xprt_adjust_cwnd - adjust transport congestion window
474  * @xprt: pointer to xprt
475  * @task: recently completed RPC request used to adjust window
476  * @result: result code of completed RPC request
477  *
478  * The transport code maintains an estimate on the maximum number of out-
479  * standing RPC requests, using a smoothed version of the congestion
480  * avoidance implemented in 44BSD. This is basically the Van Jacobson
481  * congestion algorithm: If a retransmit occurs, the congestion window is
482  * halved; otherwise, it is incremented by 1/cwnd when
483  *
484  *      -       a reply is received and
485  *      -       a full number of requests are outstanding and
486  *      -       the congestion window hasn't been updated recently.
487  */
488 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
489 {
490         struct rpc_rqst *req = task->tk_rqstp;
491         unsigned long cwnd = xprt->cwnd;
492
493         if (result >= 0 && cwnd <= xprt->cong) {
494                 /* The (cwnd >> 1) term makes sure
495                  * the result gets rounded properly. */
496                 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
497                 if (cwnd > RPC_MAXCWND(xprt))
498                         cwnd = RPC_MAXCWND(xprt);
499                 __xprt_lock_write_next_cong(xprt);
500         } else if (result == -ETIMEDOUT) {
501                 cwnd >>= 1;
502                 if (cwnd < RPC_CWNDSCALE)
503                         cwnd = RPC_CWNDSCALE;
504         }
505         dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
506                         xprt->cong, xprt->cwnd, cwnd);
507         xprt->cwnd = cwnd;
508         __xprt_put_cong(xprt, req);
509 }
510 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
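/*
 * Worked example of the additive increase above, assuming the usual
 * RPC_CWNDSCALE of 256 (one slot of credit): with cwnd = 512 (two slots)
 * and a successful reply while the window is full,
 *
 *	cwnd += (256 * 256 + (512 >> 1)) / 512 = 128
 *
 * i.e. the window grows by half a slot, so it takes about
 * cwnd/RPC_CWNDSCALE good replies to admit one more concurrent request.
 * A timeout halves cwnd instead, but never below a single slot.
 */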
511
512 /**
513  * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
514  * @xprt: transport with waiting tasks
515  * @status: result code to plant in each task before waking it
516  *
517  */
518 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
519 {
520         if (status < 0)
521                 rpc_wake_up_status(&xprt->pending, status);
522         else
523                 rpc_wake_up(&xprt->pending);
524 }
525 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
526
527 /**
528  * xprt_wait_for_buffer_space - wait for transport output buffer to clear
529  * @xprt: transport
530  *
531  * Note that waiters only set a timer in the RPC_IS_SOFT() case (see
532  * xprt_reserve_xprt), since we don't in general want to force a socket
533  * disconnection due to an incomplete RPC call transmission.
534  */
535 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
536 {
537         set_bit(XPRT_WRITE_SPACE, &xprt->state);
538 }
539 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
540
541 static bool
542 xprt_clear_write_space_locked(struct rpc_xprt *xprt)
543 {
544         if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
545                 __xprt_lock_write_next(xprt);
546                 dprintk("RPC:       write space: waking waiting task on "
547                                 "xprt %p\n", xprt);
548                 return true;
549         }
550         return false;
551 }
552
553 /**
554  * xprt_write_space - wake the task waiting for transport output buffer space
555  * @xprt: transport with waiting tasks
556  *
557  * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
558  */
559 bool xprt_write_space(struct rpc_xprt *xprt)
560 {
561         bool ret;
562
563         if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
564                 return false;
565         spin_lock_bh(&xprt->transport_lock);
566         ret = xprt_clear_write_space_locked(xprt);
567         spin_unlock_bh(&xprt->transport_lock);
568         return ret;
569 }
570 EXPORT_SYMBOL_GPL(xprt_write_space);
571
572 static void xprt_reset_majortimeo(struct rpc_rqst *req)
573 {
574         const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
575
576         req->rq_majortimeo = req->rq_timeout;
577         if (to->to_exponential)
578                 req->rq_majortimeo <<= to->to_retries;
579         else
580                 req->rq_majortimeo += to->to_increment * to->to_retries;
581         if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
582                 req->rq_majortimeo = to->to_maxval;
583         req->rq_majortimeo += jiffies;
584 }
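/*
 * Example of the calculation above (values are illustrative, not
 * defaults): with rq_timeout = 10 * HZ, to_exponential set and
 * to_retries = 3, the major timeout lands 10s << 3 = 80 seconds from now
 * (subject to the to_maxval clamp); with to_exponential clear and
 * to_increment = 5 * HZ it would instead be 10s + 3 * 5s = 25 seconds.
 */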
585
586 /**
587  * xprt_adjust_timeout - adjust timeout values for next retransmit
588  * @req: RPC request containing parameters to use for the adjustment
589  *
590  */
591 int xprt_adjust_timeout(struct rpc_rqst *req)
592 {
593         struct rpc_xprt *xprt = req->rq_xprt;
594         const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
595         int status = 0;
596
597         if (time_before(jiffies, req->rq_majortimeo)) {
598                 if (to->to_exponential)
599                         req->rq_timeout <<= 1;
600                 else
601                         req->rq_timeout += to->to_increment;
602                 if (to->to_maxval && req->rq_timeout >= to->to_maxval)
603                         req->rq_timeout = to->to_maxval;
604                 req->rq_retries++;
605         } else {
606                 req->rq_timeout = to->to_initval;
607                 req->rq_retries = 0;
608                 xprt_reset_majortimeo(req);
609                 /* Reset the RTT counters == "slow start" */
610                 spin_lock_bh(&xprt->transport_lock);
611                 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
612                 spin_unlock_bh(&xprt->transport_lock);
613                 status = -ETIMEDOUT;
614         }
615
616         if (req->rq_timeout == 0) {
617                 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
618                 req->rq_timeout = 5 * HZ;
619         }
620         return status;
621 }
622
623 static void xprt_autoclose(struct work_struct *work)
624 {
625         struct rpc_xprt *xprt =
626                 container_of(work, struct rpc_xprt, task_cleanup);
627         unsigned int pflags = memalloc_nofs_save();
628
629         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
630         xprt->ops->close(xprt);
631         xprt_release_write(xprt, NULL);
632         wake_up_bit(&xprt->state, XPRT_LOCKED);
633         memalloc_nofs_restore(pflags);
634 }
635
636 /**
637  * xprt_disconnect_done - mark a transport as disconnected
638  * @xprt: transport to flag for disconnect
639  *
640  */
641 void xprt_disconnect_done(struct rpc_xprt *xprt)
642 {
643         dprintk("RPC:       disconnected transport %p\n", xprt);
644         spin_lock_bh(&xprt->transport_lock);
645         xprt_clear_connected(xprt);
646         xprt_clear_write_space_locked(xprt);
647         xprt_wake_pending_tasks(xprt, -ENOTCONN);
648         spin_unlock_bh(&xprt->transport_lock);
649 }
650 EXPORT_SYMBOL_GPL(xprt_disconnect_done);
651
652 /**
653  * xprt_force_disconnect - force a transport to disconnect
654  * @xprt: transport to disconnect
655  *
656  */
657 void xprt_force_disconnect(struct rpc_xprt *xprt)
658 {
659         /* Don't race with the test_bit() in xprt_clear_locked() */
660         spin_lock_bh(&xprt->transport_lock);
661         set_bit(XPRT_CLOSE_WAIT, &xprt->state);
662         /* Try to schedule an autoclose RPC call */
663         if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
664                 queue_work(xprtiod_workqueue, &xprt->task_cleanup);
665         else if (xprt->snd_task)
666                 rpc_wake_up_queued_task_set_status(&xprt->pending,
667                                 xprt->snd_task, -ENOTCONN);
668         spin_unlock_bh(&xprt->transport_lock);
669 }
670 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
671
672 static unsigned int
673 xprt_connect_cookie(struct rpc_xprt *xprt)
674 {
675         return READ_ONCE(xprt->connect_cookie);
676 }
677
678 static bool
679 xprt_request_retransmit_after_disconnect(struct rpc_task *task)
680 {
681         struct rpc_rqst *req = task->tk_rqstp;
682         struct rpc_xprt *xprt = req->rq_xprt;
683
684         return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
685                 !xprt_connected(xprt);
686 }
687
688 /**
689  * xprt_conditional_disconnect - force a transport to disconnect
690  * @xprt: transport to disconnect
691  * @cookie: 'connection cookie'
692  *
693  * This attempts to break the connection if and only if 'cookie' matches
694  * the current transport 'connection cookie'. It ensures that we don't
695  * try to break the connection more than once when we need to retransmit
696  * a batch of RPC requests.
697  *
698  */
699 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
700 {
701         /* Don't race with the test_bit() in xprt_clear_locked() */
702         spin_lock_bh(&xprt->transport_lock);
703         if (cookie != xprt->connect_cookie)
704                 goto out;
705         if (test_bit(XPRT_CLOSING, &xprt->state))
706                 goto out;
707         set_bit(XPRT_CLOSE_WAIT, &xprt->state);
708         /* Try to schedule an autoclose RPC call */
709         if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
710                 queue_work(xprtiod_workqueue, &xprt->task_cleanup);
711         xprt_wake_pending_tasks(xprt, -EAGAIN);
712 out:
713         spin_unlock_bh(&xprt->transport_lock);
714 }
715
716 static bool
717 xprt_has_timer(const struct rpc_xprt *xprt)
718 {
719         return xprt->idle_timeout != 0;
720 }
721
722 static void
723 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
724         __must_hold(&xprt->transport_lock)
725 {
726         if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
727                 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
728 }
729
730 static void
731 xprt_init_autodisconnect(struct timer_list *t)
732 {
733         struct rpc_xprt *xprt = from_timer(xprt, t, timer);
734
735         spin_lock(&xprt->transport_lock);
736         if (!RB_EMPTY_ROOT(&xprt->recv_queue))
737                 goto out_abort;
738         /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
739         xprt->last_used = jiffies;
740         if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
741                 goto out_abort;
742         spin_unlock(&xprt->transport_lock);
743         queue_work(xprtiod_workqueue, &xprt->task_cleanup);
744         return;
745 out_abort:
746         spin_unlock(&xprt->transport_lock);
747 }
748
749 bool xprt_lock_connect(struct rpc_xprt *xprt,
750                 struct rpc_task *task,
751                 void *cookie)
752 {
753         bool ret = false;
754
755         spin_lock_bh(&xprt->transport_lock);
756         if (!test_bit(XPRT_LOCKED, &xprt->state))
757                 goto out;
758         if (xprt->snd_task != task)
759                 goto out;
760         xprt->snd_task = cookie;
761         ret = true;
762 out:
763         spin_unlock_bh(&xprt->transport_lock);
764         return ret;
765 }
766
767 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
768 {
769         spin_lock_bh(&xprt->transport_lock);
770         if (xprt->snd_task != cookie)
771                 goto out;
772         if (!test_bit(XPRT_LOCKED, &xprt->state))
773                 goto out;
774         xprt->snd_task = NULL;
775         xprt->ops->release_xprt(xprt, NULL);
776         xprt_schedule_autodisconnect(xprt);
777 out:
778         spin_unlock_bh(&xprt->transport_lock);
779         wake_up_bit(&xprt->state, XPRT_LOCKED);
780 }
781
782 /**
783  * xprt_connect - schedule a transport connect operation
784  * @task: RPC task that is requesting the connect
785  *
786  */
787 void xprt_connect(struct rpc_task *task)
788 {
789         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
790
791         dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
792                         xprt, (xprt_connected(xprt) ? "is" : "is not"));
793
794         if (!xprt_bound(xprt)) {
795                 task->tk_status = -EAGAIN;
796                 return;
797         }
798         if (!xprt_lock_write(xprt, task))
799                 return;
800
801         if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
802                 xprt->ops->close(xprt);
803
804         if (!xprt_connected(xprt)) {
805                 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
806                 rpc_sleep_on_timeout(&xprt->pending, task, NULL,
807                                 xprt_request_timeout(task->tk_rqstp));
808
809                 if (test_bit(XPRT_CLOSING, &xprt->state))
810                         return;
811                 if (xprt_test_and_set_connecting(xprt))
812                         return;
813                 /* Race breaker */
814                 if (!xprt_connected(xprt)) {
815                         xprt->stat.connect_start = jiffies;
816                         xprt->ops->connect(xprt, task);
817                 } else {
818                         xprt_clear_connecting(xprt);
819                         task->tk_status = 0;
820                         rpc_wake_up_queued_task(&xprt->pending, task);
821                 }
822         }
823         xprt_release_write(xprt, task);
824 }
825
826 enum xprt_xid_rb_cmp {
827         XID_RB_EQUAL,
828         XID_RB_LEFT,
829         XID_RB_RIGHT,
830 };
831 static enum xprt_xid_rb_cmp
832 xprt_xid_cmp(__be32 xid1, __be32 xid2)
833 {
834         if (xid1 == xid2)
835                 return XID_RB_EQUAL;
836         if ((__force u32)xid1 < (__force u32)xid2)
837                 return XID_RB_LEFT;
838         return XID_RB_RIGHT;
839 }
840
841 static struct rpc_rqst *
842 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
843 {
844         struct rb_node *n = xprt->recv_queue.rb_node;
845         struct rpc_rqst *req;
846
847         while (n != NULL) {
848                 req = rb_entry(n, struct rpc_rqst, rq_recv);
849                 switch (xprt_xid_cmp(xid, req->rq_xid)) {
850                 case XID_RB_LEFT:
851                         n = n->rb_left;
852                         break;
853                 case XID_RB_RIGHT:
854                         n = n->rb_right;
855                         break;
856                 case XID_RB_EQUAL:
857                         return req;
858                 }
859         }
860         return NULL;
861 }
862
863 static void
864 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
865 {
866         struct rb_node **p = &xprt->recv_queue.rb_node;
867         struct rb_node *n = NULL;
868         struct rpc_rqst *req;
869
870         while (*p != NULL) {
871                 n = *p;
872                 req = rb_entry(n, struct rpc_rqst, rq_recv);
873                 switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
874                 case XID_RB_LEFT:
875                         p = &n->rb_left;
876                         break;
877                 case XID_RB_RIGHT:
878                         p = &n->rb_right;
879                         break;
880                 case XID_RB_EQUAL:
881                         WARN_ON_ONCE(new != req);
882                         return;
883                 }
884         }
885         rb_link_node(&new->rq_recv, n, p);
886         rb_insert_color(&new->rq_recv, &xprt->recv_queue);
887 }
888
889 static void
890 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
891 {
892         rb_erase(&req->rq_recv, &xprt->recv_queue);
893 }
894
895 /**
896  * xprt_lookup_rqst - find an RPC request corresponding to an XID
897  * @xprt: transport on which the original request was transmitted
898  * @xid: RPC XID of incoming reply
899  *
900  * Caller holds xprt->queue_lock.
901  */
902 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
903 {
904         struct rpc_rqst *entry;
905
906         entry = xprt_request_rb_find(xprt, xid);
907         if (entry != NULL) {
908                 trace_xprt_lookup_rqst(xprt, xid, 0);
909                 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
910                 return entry;
911         }
912
913         dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
914                         ntohl(xid));
915         trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
916         xprt->stat.bad_xids++;
917         return NULL;
918 }
919 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
920
921 static bool
922 xprt_is_pinned_rqst(struct rpc_rqst *req)
923 {
924         return atomic_read(&req->rq_pin) != 0;
925 }
926
927 /**
928  * xprt_pin_rqst - Pin a request on the transport receive list
929  * @req: Request to pin
930  *
931  * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
932  * so should be holding the xprt receive lock.
933  */
934 void xprt_pin_rqst(struct rpc_rqst *req)
935 {
936         atomic_inc(&req->rq_pin);
937 }
938 EXPORT_SYMBOL_GPL(xprt_pin_rqst);
939
940 /**
941  * xprt_unpin_rqst - Unpin a request on the transport receive list
942  * @req: Request to unpin
943  *
944  * Caller should be holding the xprt receive lock.
945  */
946 void xprt_unpin_rqst(struct rpc_rqst *req)
947 {
948         if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
949                 atomic_dec(&req->rq_pin);
950                 return;
951         }
952         if (atomic_dec_and_test(&req->rq_pin))
953                 wake_up_var(&req->rq_pin);
954 }
955 EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
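/*
 * A transport's reply handler is expected to use these helpers roughly as
 * follows (sketch only; the real callers live in the individual transport
 * implementations):
 *
 *	spin_lock(&xprt->queue_lock);
 *	req = xprt_lookup_rqst(xprt, xid);
 *	if (!req)
 *		goto out_unlock;
 *	xprt_pin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 *
 *	... copy the reply into req->rq_rcv_buf (may sleep) ...
 *
 *	spin_lock(&xprt->queue_lock);
 *	xprt_complete_rqst(req->rq_task, copied);
 *	xprt_unpin_rqst(req);
 * out_unlock:
 *	spin_unlock(&xprt->queue_lock);
 */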
956
957 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
958 {
959         wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
960 }
961
962 static bool
963 xprt_request_data_received(struct rpc_task *task)
964 {
965         return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
966                 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
967 }
968
969 static bool
970 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
971 {
972         return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
973                 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
974 }
975
976 /**
977  * xprt_request_enqueue_receive - Add a request to the receive queue
978  * @task: RPC task
979  *
980  */
981 void
982 xprt_request_enqueue_receive(struct rpc_task *task)
983 {
984         struct rpc_rqst *req = task->tk_rqstp;
985         struct rpc_xprt *xprt = req->rq_xprt;
986
987         if (!xprt_request_need_enqueue_receive(task, req))
988                 return;
989         spin_lock(&xprt->queue_lock);
990
991         /* Update the softirq receive buffer */
992         memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
993                         sizeof(req->rq_private_buf));
994
995         /* Add request to the receive list */
996         xprt_request_rb_insert(xprt, req);
997         set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
998         spin_unlock(&xprt->queue_lock);
999
1000         xprt_reset_majortimeo(req);
1001         /* Turn off autodisconnect */
1002         del_singleshot_timer_sync(&xprt->timer);
1003 }
1004
1005 /**
1006  * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
1007  * @task: RPC task
1008  *
1009  * Caller must hold xprt->queue_lock.
1010  */
1011 static void
1012 xprt_request_dequeue_receive_locked(struct rpc_task *task)
1013 {
1014         struct rpc_rqst *req = task->tk_rqstp;
1015
1016         if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1017                 xprt_request_rb_remove(req->rq_xprt, req);
1018 }
1019
1020 /**
1021  * xprt_update_rtt - Update RPC RTT statistics
1022  * @task: RPC request that recently completed
1023  *
1024  * Caller holds xprt->queue_lock.
1025  */
1026 void xprt_update_rtt(struct rpc_task *task)
1027 {
1028         struct rpc_rqst *req = task->tk_rqstp;
1029         struct rpc_rtt *rtt = task->tk_client->cl_rtt;
1030         unsigned int timer = task->tk_msg.rpc_proc->p_timer;
1031         long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
1032
1033         if (timer) {
1034                 if (req->rq_ntrans == 1)
1035                         rpc_update_rtt(rtt, timer, m);
1036                 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
1037         }
1038 }
1039 EXPORT_SYMBOL_GPL(xprt_update_rtt);
1040
1041 /**
1042  * xprt_complete_rqst - called when reply processing is complete
1043  * @task: RPC request that recently completed
1044  * @copied: actual number of bytes received from the transport
1045  *
1046  * Caller holds xprt->queue_lock.
1047  */
1048 void xprt_complete_rqst(struct rpc_task *task, int copied)
1049 {
1050         struct rpc_rqst *req = task->tk_rqstp;
1051         struct rpc_xprt *xprt = req->rq_xprt;
1052
1053         dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
1054                         task->tk_pid, ntohl(req->rq_xid), copied);
1055         trace_xprt_complete_rqst(xprt, req->rq_xid, copied);
1056
1057         xprt->stat.recvs++;
1058
1059         req->rq_private_buf.len = copied;
1060         /* Ensure all writes are done before we update */
1061         /* req->rq_reply_bytes_recvd */
1062         smp_wmb();
1063         req->rq_reply_bytes_recvd = copied;
1064         xprt_request_dequeue_receive_locked(task);
1065         rpc_wake_up_queued_task(&xprt->pending, task);
1066 }
1067 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
1068
1069 static void xprt_timer(struct rpc_task *task)
1070 {
1071         struct rpc_rqst *req = task->tk_rqstp;
1072         struct rpc_xprt *xprt = req->rq_xprt;
1073
1074         if (task->tk_status != -ETIMEDOUT)
1075                 return;
1076
1077         trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
1078         if (!req->rq_reply_bytes_recvd) {
1079                 if (xprt->ops->timer)
1080                         xprt->ops->timer(xprt, task);
1081         } else
1082                 task->tk_status = 0;
1083 }
1084
1085 /**
1086  * xprt_wait_for_reply_request_def - wait for reply
1087  * @task: pointer to rpc_task
1088  *
1089  * Set a request's retransmit timeout based on the transport's
1090  * default timeout parameters.  Used by transports that don't adjust
1091  * the retransmit timeout based on round-trip time estimation,
1092  * and put the task to sleep on the pending queue.
1093  */
1094 void xprt_wait_for_reply_request_def(struct rpc_task *task)
1095 {
1096         struct rpc_rqst *req = task->tk_rqstp;
1097
1098         rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1099                         xprt_request_timeout(req));
1100 }
1101 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
1102
1103 /**
1104  * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
1105  * @task: pointer to rpc_task
1106  *
1107  * Set a request's retransmit timeout using the RTT estimator,
1108  * and put the task to sleep on the pending queue.
1109  */
1110 void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
1111 {
1112         int timer = task->tk_msg.rpc_proc->p_timer;
1113         struct rpc_clnt *clnt = task->tk_client;
1114         struct rpc_rtt *rtt = clnt->cl_rtt;
1115         struct rpc_rqst *req = task->tk_rqstp;
1116         unsigned long max_timeout = clnt->cl_timeout->to_maxval;
1117         unsigned long timeout;
1118
1119         timeout = rpc_calc_rto(rtt, timer);
1120         timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
1121         if (timeout > max_timeout || timeout == 0)
1122                 timeout = max_timeout;
1123         rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1124                         jiffies + timeout);
1125 }
1126 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
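/*
 * Example of the backoff above (numbers illustrative): if the estimator
 * currently yields rpc_calc_rto() == 2 * HZ and rpc_ntimeo() == 1, and the
 * request has already been retried once (rq_retries == 1), the task sleeps
 * for 2s << (1 + 1) = 8 seconds, capped at the client's to_maxval.
 */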
1127
1128 /**
1129  * xprt_request_wait_receive - wait for the reply to an RPC request
1130  * @task: RPC task about to send a request
1131  *
1132  */
1133 void xprt_request_wait_receive(struct rpc_task *task)
1134 {
1135         struct rpc_rqst *req = task->tk_rqstp;
1136         struct rpc_xprt *xprt = req->rq_xprt;
1137
1138         if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1139                 return;
1140         /*
1141          * Sleep on the pending queue if we're expecting a reply.
1142          * The spinlock ensures atomicity between the test of
1143          * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
1144          */
1145         spin_lock(&xprt->queue_lock);
1146         if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
1147                 xprt->ops->wait_for_reply_request(task);
1148                 /*
1149                  * Send an extra queue wakeup call if the
1150                  * connection was dropped in case the call to
1151                  * rpc_sleep_on() raced.
1152                  */
1153                 if (xprt_request_retransmit_after_disconnect(task))
1154                         rpc_wake_up_queued_task_set_status(&xprt->pending,
1155                                         task, -ENOTCONN);
1156         }
1157         spin_unlock(&xprt->queue_lock);
1158 }
1159
1160 static bool
1161 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
1162 {
1163         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1164 }
1165
1166 /**
1167  * xprt_request_enqueue_transmit - queue a task for transmission
1168  * @task: pointer to rpc_task
1169  *
1170  * Add a task to the transmission queue.
1171  */
1172 void
1173 xprt_request_enqueue_transmit(struct rpc_task *task)
1174 {
1175         struct rpc_rqst *pos, *req = task->tk_rqstp;
1176         struct rpc_xprt *xprt = req->rq_xprt;
1177
1178         if (xprt_request_need_enqueue_transmit(task, req)) {
1179                 req->rq_bytes_sent = 0;
1180                 spin_lock(&xprt->queue_lock);
1181                 /*
1182                  * Requests that carry congestion control credits are added
1183                  * to the head of the list to avoid starvation issues.
1184                  */
1185                 if (req->rq_cong) {
1186                         xprt_clear_congestion_window_wait(xprt);
1187                         list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1188                                 if (pos->rq_cong)
1189                                         continue;
1190                                 /* Note: req is added _before_ pos */
1191                                 list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1192                                 INIT_LIST_HEAD(&req->rq_xmit2);
1193                                 trace_xprt_enq_xmit(task, 1);
1194                                 goto out;
1195                         }
1196                 } else if (RPC_IS_SWAPPER(task)) {
1197                         list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1198                                 if (pos->rq_cong || pos->rq_bytes_sent)
1199                                         continue;
1200                                 if (RPC_IS_SWAPPER(pos->rq_task))
1201                                         continue;
1202                                 /* Note: req is added _before_ pos */
1203                                 list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1204                                 INIT_LIST_HEAD(&req->rq_xmit2);
1205                                 trace_xprt_enq_xmit(task, 2);
1206                                 goto out;
1207                         }
1208                 } else if (!req->rq_seqno) {
1209                         list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1210                                 if (pos->rq_task->tk_owner != task->tk_owner)
1211                                         continue;
1212                                 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1213                                 INIT_LIST_HEAD(&req->rq_xmit);
1214                                 trace_xprt_enq_xmit(task, 3);
1215                                 goto out;
1216                         }
1217                 }
1218                 list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
1219                 INIT_LIST_HEAD(&req->rq_xmit2);
1220                 trace_xprt_enq_xmit(task, 4);
1221 out:
1222                 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1223                 spin_unlock(&xprt->queue_lock);
1224         }
1225 }
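/*
 * Resulting layout of the transmit queue (sketch): xprt->xmit_queue links
 * one request per "group" through rq_xmit; further requests that share a
 * group (same tk_owner, no RPCSEC_GSS sequence number yet) hang off the
 * group head through its rq_xmit2 list and are promoted onto the main
 * list when the head is dequeued, e.g.:
 *
 *	xmit_queue --> reqA --> reqC --> reqD
 *	                |
 *	             rq_xmit2: reqB
 */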
1226
1227 /**
1228  * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
1229  * @task: pointer to rpc_task
1230  *
1231  * Remove a task from the transmission queue
1232  * Caller must hold xprt->queue_lock
1233  */
1234 static void
1235 xprt_request_dequeue_transmit_locked(struct rpc_task *task)
1236 {
1237         struct rpc_rqst *req = task->tk_rqstp;
1238
1239         if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1240                 return;
1241         if (!list_empty(&req->rq_xmit)) {
1242                 list_del(&req->rq_xmit);
1243                 if (!list_empty(&req->rq_xmit2)) {
1244                         struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
1245                                         struct rpc_rqst, rq_xmit2);
1246                         list_del(&req->rq_xmit2);
1247                         list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
1248                 }
1249         } else
1250                 list_del(&req->rq_xmit2);
1251 }
1252
1253 /**
1254  * xprt_request_dequeue_transmit - remove a task from the transmission queue
1255  * @task: pointer to rpc_task
1256  *
1257  * Remove a task from the transmission queue
1258  */
1259 static void
1260 xprt_request_dequeue_transmit(struct rpc_task *task)
1261 {
1262         struct rpc_rqst *req = task->tk_rqstp;
1263         struct rpc_xprt *xprt = req->rq_xprt;
1264
1265         spin_lock(&xprt->queue_lock);
1266         xprt_request_dequeue_transmit_locked(task);
1267         spin_unlock(&xprt->queue_lock);
1268 }
1269
1270 /**
1271  * xprt_request_prepare - prepare an encoded request for transport
1272  * @req: pointer to rpc_rqst
1273  *
1274  * Calls into the transport layer to do whatever is needed to prepare
1275  * the request for transmission or receive.
1276  */
1277 void
1278 xprt_request_prepare(struct rpc_rqst *req)
1279 {
1280         struct rpc_xprt *xprt = req->rq_xprt;
1281
1282         if (xprt->ops->prepare_request)
1283                 xprt->ops->prepare_request(req);
1284 }
1285
1286 /**
1287  * xprt_request_need_retransmit - Test if a task needs retransmission
1288  * @task: pointer to rpc_task
1289  *
1290  * Test for whether a connection breakage requires the task to retransmit
1291  */
1292 bool
1293 xprt_request_need_retransmit(struct rpc_task *task)
1294 {
1295         return xprt_request_retransmit_after_disconnect(task);
1296 }
1297
1298 /**
1299  * xprt_prepare_transmit - reserve the transport before sending a request
1300  * @task: RPC task about to send a request
1301  *
1302  */
1303 bool xprt_prepare_transmit(struct rpc_task *task)
1304 {
1305         struct rpc_rqst *req = task->tk_rqstp;
1306         struct rpc_xprt *xprt = req->rq_xprt;
1307
1308         dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
1309
1310         if (!xprt_lock_write(xprt, task)) {
1311                 /* Race breaker: someone may have transmitted us */
1312                 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1313                         rpc_wake_up_queued_task_set_status(&xprt->sending,
1314                                         task, 0);
1315                 return false;
1316
1317         }
1318         return true;
1319 }
1320
1321 void xprt_end_transmit(struct rpc_task *task)
1322 {
1323         xprt_release_write(task->tk_rqstp->rq_xprt, task);
1324 }
1325
1326 /**
1327  * xprt_request_transmit - send an RPC request on a transport
1328  * @req: pointer to request to transmit
1329  * @snd_task: RPC task that owns the transport lock
1330  *
1331  * This performs the transmission of a single request.
1332  * Note that if the request is not the same as snd_task, then it
1333  * does need to be pinned.
1334  * Returns '0' on success.
1335  */
1336 static int
1337 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
1338 {
1339         struct rpc_xprt *xprt = req->rq_xprt;
1340         struct rpc_task *task = req->rq_task;
1341         unsigned int connect_cookie;
1342         int is_retrans = RPC_WAS_SENT(task);
1343         int status;
1344
1345         if (!req->rq_bytes_sent) {
1346                 if (xprt_request_data_received(task)) {
1347                         status = 0;
1348                         goto out_dequeue;
1349                 }
1350                 /* Verify that our message lies in the RPCSEC_GSS window */
1351                 if (rpcauth_xmit_need_reencode(task)) {
1352                         status = -EBADMSG;
1353                         goto out_dequeue;
1354                 }
1355                 if (task->tk_ops->rpc_call_prepare_transmit) {
1356                         task->tk_ops->rpc_call_prepare_transmit(task,
1357                                         task->tk_calldata);
1358                         status = task->tk_status;
1359                         if (status < 0)
1360                                 goto out_dequeue;
1361                 }
1362                 if (RPC_SIGNALLED(task)) {
1363                         status = -ERESTARTSYS;
1364                         goto out_dequeue;
1365                 }
1366         }
1367
1368         /*
1369          * Update req->rq_ntrans before transmitting to avoid races with
1370          * xprt_update_rtt(), which needs to know that it is recording a
1371          * reply to the first transmission.
1372          */
1373         req->rq_ntrans++;
1374
1375         connect_cookie = xprt->connect_cookie;
1376         status = xprt->ops->send_request(req);
1377         if (status != 0) {
1378                 req->rq_ntrans--;
1379                 trace_xprt_transmit(req, status);
1380                 return status;
1381         }
1382
1383         if (is_retrans)
1384                 task->tk_client->cl_stats->rpcretrans++;
1385
1386         xprt_inject_disconnect(xprt);
1387
1388         task->tk_flags |= RPC_TASK_SENT;
1389         spin_lock_bh(&xprt->transport_lock);
1390
1391         xprt->stat.sends++;
1392         xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
1393         xprt->stat.bklog_u += xprt->backlog.qlen;
1394         xprt->stat.sending_u += xprt->sending.qlen;
1395         xprt->stat.pending_u += xprt->pending.qlen;
1396         spin_unlock_bh(&xprt->transport_lock);
1397
1398         req->rq_connect_cookie = connect_cookie;
1399 out_dequeue:
1400         trace_xprt_transmit(req, status);
1401         xprt_request_dequeue_transmit(task);
1402         rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1403         return status;
1404 }
1405
1406 /**
1407  * xprt_transmit - send an RPC request on a transport
1408  * @task: controlling RPC task
1409  *
1410  * Attempts to drain the transmit queue. On exit, either the transport
1411  * signalled an error that needs to be handled before transmission can
1412  * resume, or @task finished transmitting, and detected that it already
1413  * received a reply.
1414  */
1415 void
1416 xprt_transmit(struct rpc_task *task)
1417 {
1418         struct rpc_rqst *next, *req = task->tk_rqstp;
1419         struct rpc_xprt *xprt = req->rq_xprt;
1420         int status;
1421
1422         spin_lock(&xprt->queue_lock);
1423         while (!list_empty(&xprt->xmit_queue)) {
1424                 next = list_first_entry(&xprt->xmit_queue,
1425                                 struct rpc_rqst, rq_xmit);
1426                 xprt_pin_rqst(next);
1427                 spin_unlock(&xprt->queue_lock);
1428                 status = xprt_request_transmit(next, task);
1429                 if (status == -EBADMSG && next != req)
1430                         status = 0;
1431                 cond_resched();
1432                 spin_lock(&xprt->queue_lock);
1433                 xprt_unpin_rqst(next);
1434                 if (status == 0) {
1435                         if (!xprt_request_data_received(task) ||
1436                             test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1437                                 continue;
1438                 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1439                         task->tk_status = status;
1440                 break;
1441         }
1442         spin_unlock(&xprt->queue_lock);
1443 }
1444
1445 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
1446 {
1447         set_bit(XPRT_CONGESTED, &xprt->state);
1448         rpc_sleep_on(&xprt->backlog, task, NULL);
1449 }
1450
1451 static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
1452 {
1453         if (rpc_wake_up_next(&xprt->backlog) == NULL)
1454                 clear_bit(XPRT_CONGESTED, &xprt->state);
1455 }
1456
1457 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
1458 {
1459         bool ret = false;
1460
1461         if (!test_bit(XPRT_CONGESTED, &xprt->state))
1462                 goto out;
1463         spin_lock(&xprt->reserve_lock);
1464         if (test_bit(XPRT_CONGESTED, &xprt->state)) {
1465                 rpc_sleep_on(&xprt->backlog, task, NULL);
1466                 ret = true;
1467         }
1468         spin_unlock(&xprt->reserve_lock);
1469 out:
1470         return ret;
1471 }
1472
1473 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
1474 {
1475         struct rpc_rqst *req = ERR_PTR(-EAGAIN);
1476
1477         if (xprt->num_reqs >= xprt->max_reqs)
1478                 goto out;
1479         ++xprt->num_reqs;
1480         spin_unlock(&xprt->reserve_lock);
1481         req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
1482         spin_lock(&xprt->reserve_lock);
1483         if (req != NULL)
1484                 goto out;
1485         --xprt->num_reqs;
1486         req = ERR_PTR(-ENOMEM);
1487 out:
1488         return req;
1489 }
1490
1491 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1492 {
1493         if (xprt->num_reqs > xprt->min_reqs) {
1494                 --xprt->num_reqs;
1495                 kfree(req);
1496                 return true;
1497         }
1498         return false;
1499 }
1500
1501 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1502 {
1503         struct rpc_rqst *req;
1504
1505         spin_lock(&xprt->reserve_lock);
1506         if (!list_empty(&xprt->free)) {
1507                 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
1508                 list_del(&req->rq_list);
1509                 goto out_init_req;
1510         }
1511         req = xprt_dynamic_alloc_slot(xprt);
1512         if (!IS_ERR(req))
1513                 goto out_init_req;
1514         switch (PTR_ERR(req)) {
1515         case -ENOMEM:
1516                 dprintk("RPC:       dynamic allocation of request slot "
1517                                 "failed! Retrying\n");
1518                 task->tk_status = -ENOMEM;
1519                 break;
1520         case -EAGAIN:
1521                 xprt_add_backlog(xprt, task);
1522                 dprintk("RPC:       waiting for request slot\n");
1523                 /* fall through */
1524         default:
1525                 task->tk_status = -EAGAIN;
1526         }
1527         spin_unlock(&xprt->reserve_lock);
1528         return;
1529 out_init_req:
1530         xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
1531                                      xprt->num_reqs);
1532         spin_unlock(&xprt->reserve_lock);
1533
1534         task->tk_status = 0;
1535         task->tk_rqstp = req;
1536 }
1537 EXPORT_SYMBOL_GPL(xprt_alloc_slot);
1538
1539 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1540 {
1541         spin_lock(&xprt->reserve_lock);
1542         if (!xprt_dynamic_free_slot(xprt, req)) {
1543                 memset(req, 0, sizeof(*req));   /* mark unused */
1544                 list_add(&req->rq_list, &xprt->free);
1545         }
1546         xprt_wake_up_backlog(xprt);
1547         spin_unlock(&xprt->reserve_lock);
1548 }
1549 EXPORT_SYMBOL_GPL(xprt_free_slot);
1550
1551 static void xprt_free_all_slots(struct rpc_xprt *xprt)
1552 {
1553         struct rpc_rqst *req;
1554         while (!list_empty(&xprt->free)) {
1555                 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
1556                 list_del(&req->rq_list);
1557                 kfree(req);
1558         }
1559 }
1560
1561 struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
1562                 unsigned int num_prealloc,
1563                 unsigned int max_alloc)
1564 {
1565         struct rpc_xprt *xprt;
1566         struct rpc_rqst *req;
1567         int i;
1568
1569         xprt = kzalloc(size, GFP_KERNEL);
1570         if (xprt == NULL)
1571                 goto out;
1572
1573         xprt_init(xprt, net);
1574
1575         for (i = 0; i < num_prealloc; i++) {
1576                 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
1577                 if (!req)
1578                         goto out_free;
1579                 list_add(&req->rq_list, &xprt->free);
1580         }
1581         if (max_alloc > num_prealloc)
1582                 xprt->max_reqs = max_alloc;
1583         else
1584                 xprt->max_reqs = num_prealloc;
1585         xprt->min_reqs = num_prealloc;
1586         xprt->num_reqs = num_prealloc;
1587
1588         return xprt;
1589
1590 out_free:
1591         xprt_free(xprt);
1592 out:
1593         return NULL;
1594 }
1595 EXPORT_SYMBOL_GPL(xprt_alloc);
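/*
 * Illustrative sketch (hypothetical names): a transport usually embeds
 * struct rpc_xprt as the first member of its own private structure, so
 * that the pointer returned by xprt_alloc() doubles as a pointer to that
 * structure, and passes the larger size together with the number of slots
 * to preallocate and the ceiling for dynamic slot allocation:
 *
 *	struct example_xprt {
 *		struct rpc_xprt	xprt;
 *		... transport-private state ...
 *	};
 *
 *	xprt = xprt_alloc(args->net, sizeof(struct example_xprt),
 *			  EXAMPLE_MIN_SLOTS, EXAMPLE_MAX_SLOTS);
 *	if (xprt == NULL)
 *		return ERR_PTR(-ENOMEM);
 *
 * min_reqs and num_reqs start out at the preallocated count; max_reqs is
 * the larger of the two slot arguments, which is the limit enforced by
 * xprt_dynamic_alloc_slot() above.
 */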
1596
1597 void xprt_free(struct rpc_xprt *xprt)
1598 {
1599         put_net(xprt->xprt_net);
1600         xprt_free_all_slots(xprt);
1601         kfree_rcu(xprt, rcu);
1602 }
1603 EXPORT_SYMBOL_GPL(xprt_free);
1604
1605 static void
1606 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
1607 {
1608         req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
1609 }
1610
1611 static __be32
1612 xprt_alloc_xid(struct rpc_xprt *xprt)
1613 {
1614         __be32 xid;
1615
1616         spin_lock(&xprt->reserve_lock);
1617         xid = (__force __be32)xprt->xid++;
1618         spin_unlock(&xprt->reserve_lock);
1619         return xid;
1620 }
1621
1622 static void
1623 xprt_init_xid(struct rpc_xprt *xprt)
1624 {
1625         xprt->xid = prandom_u32();
1626 }
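/*
 * XIDs are therefore per-transport: each transport starts from an
 * independent random value at creation time and hands out the next
 * sequential value for every new request via xprt_alloc_xid(); the XID
 * is what the receive path uses to match a reply to its pending request.
 */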
1627
1628 static void
1629 xprt_request_init(struct rpc_task *task)
1630 {
1631         struct rpc_xprt *xprt = task->tk_xprt;
1632         struct rpc_rqst *req = task->tk_rqstp;
1633
1634         req->rq_timeout = task->tk_client->cl_timeout->to_initval;
1635         req->rq_task    = task;
1636         req->rq_xprt    = xprt;
1637         req->rq_buffer  = NULL;
1638         req->rq_xid     = xprt_alloc_xid(xprt);
1639         xprt_init_connect_cookie(req, xprt);
1640         req->rq_snd_buf.len = 0;
1641         req->rq_snd_buf.buflen = 0;
1642         req->rq_rcv_buf.len = 0;
1643         req->rq_rcv_buf.buflen = 0;
1644         req->rq_snd_buf.bvec = NULL;
1645         req->rq_rcv_buf.bvec = NULL;
1646         req->rq_release_snd_buf = NULL;
1647         xprt_reset_majortimeo(req);
1648         dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
1649                         req, ntohl(req->rq_xid));
1650 }
1651
1652 static void
1653 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
1654 {
1655         xprt->ops->alloc_slot(xprt, task);
1656         if (task->tk_rqstp != NULL)
1657                 xprt_request_init(task);
1658 }
1659
1660 /**
1661  * xprt_reserve - allocate an RPC request slot
1662  * @task: RPC task requesting a slot allocation
1663  *
1664  * If the transport is marked as being congested, or if no more
1665  * slots are available, place the task on the transport's
1666  * backlog queue.
1667  */
1668 void xprt_reserve(struct rpc_task *task)
1669 {
1670         struct rpc_xprt *xprt = task->tk_xprt;
1671
1672         task->tk_status = 0;
1673         if (task->tk_rqstp != NULL)
1674                 return;
1675
1676         task->tk_status = -EAGAIN;
1677         if (!xprt_throttle_congested(xprt, task))
1678                 xprt_do_reserve(xprt, task);
1679 }
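/*
 * On return the task either owns a slot (tk_rqstp is set and tk_status is
 * 0) or carries an error in tk_status: -EAGAIN means it was queued on the
 * transport's backlog and should retry after being woken, while -ENOMEM
 * means the dynamic slot allocation failed and the caller should back off
 * before retrying.  The rpc_task state machine in clnt.c acts on these
 * status codes.
 */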
1680
1681 /**
1682  * xprt_retry_reserve - allocate an RPC request slot
1683  * @task: RPC task requesting a slot allocation
1684  *
1685  * If no more slots are available, place the task on the transport's
1686  * backlog queue.
1687  * Note that the only difference from xprt_reserve is that this
1688  * function ignores the XPRT_CONGESTED flag.
1689  */
1690 void xprt_retry_reserve(struct rpc_task *task)
1691 {
1692         struct rpc_xprt *xprt = task->tk_xprt;
1693
1694         task->tk_status = 0;
1695         if (task->tk_rqstp != NULL)
1696                 return;
1697
1698         task->tk_status = -EAGAIN;
1699         xprt_do_reserve(xprt, task);
1700 }
1701
1702 static void
1703 xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
1704 {
1705         struct rpc_xprt *xprt = req->rq_xprt;
1706
1707         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
1708             test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
1709             xprt_is_pinned_rqst(req)) {
1710                 spin_lock(&xprt->queue_lock);
1711                 xprt_request_dequeue_transmit_locked(task);
1712                 xprt_request_dequeue_receive_locked(task);
1713                 while (xprt_is_pinned_rqst(req)) {
1714                         set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1715                         spin_unlock(&xprt->queue_lock);
1716                         xprt_wait_on_pinned_rqst(req);
1717                         spin_lock(&xprt->queue_lock);
1718                         clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1719                 }
1720                 spin_unlock(&xprt->queue_lock);
1721         }
1722 }
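/*
 * A request may still be "pinned" at this point if the transport's receive
 * path is busy copying reply data into it.  The loop above drops queue_lock,
 * waits in xprt_wait_on_pinned_rqst() until the transport unpins the
 * request, and re-takes the lock, so the rqst is quiescent before
 * xprt_release() goes on to free it.
 */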
1723
1724 /**
1725  * xprt_release - release an RPC request slot
1726  * @task: task which is finished with the slot
1727  *
1728  */
1729 void xprt_release(struct rpc_task *task)
1730 {
1731         struct rpc_xprt *xprt;
1732         struct rpc_rqst *req = task->tk_rqstp;
1733
1734         if (req == NULL) {
1735                 if (task->tk_client) {
1736                         xprt = task->tk_xprt;
1737                         xprt_release_write(xprt, task);
1738                 }
1739                 return;
1740         }
1741
1742         xprt = req->rq_xprt;
1743         if (task->tk_ops->rpc_count_stats != NULL)
1744                 task->tk_ops->rpc_count_stats(task, task->tk_calldata);
1745         else if (task->tk_client)
1746                 rpc_count_iostats(task, task->tk_client->cl_metrics);
1747         xprt_request_dequeue_all(task, req);
1748         spin_lock_bh(&xprt->transport_lock);
1749         xprt->ops->release_xprt(xprt, task);
1750         if (xprt->ops->release_request)
1751                 xprt->ops->release_request(task);
1752         xprt->last_used = jiffies;
1753         xprt_schedule_autodisconnect(xprt);
1754         spin_unlock_bh(&xprt->transport_lock);
1755         if (req->rq_buffer)
1756                 xprt->ops->buf_free(task);
1757         xprt_inject_disconnect(xprt);
1758         xdr_free_bvec(&req->rq_rcv_buf);
1759         xdr_free_bvec(&req->rq_snd_buf);
1760         if (req->rq_cred != NULL)
1761                 put_rpccred(req->rq_cred);
1762         task->tk_rqstp = NULL;
1763         if (req->rq_release_snd_buf)
1764                 req->rq_release_snd_buf(req);
1765
1766         dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
1767         if (likely(!bc_prealloc(req)))
1768                 xprt->ops->free_slot(xprt, req);
1769         else
1770                 xprt_free_bc_request(req);
1771 }
1772
1773 #ifdef CONFIG_SUNRPC_BACKCHANNEL
1774 void
1775 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
1776 {
1777         struct xdr_buf *xbufp = &req->rq_snd_buf;
1778
1779         task->tk_rqstp = req;
1780         req->rq_task = task;
1781         xprt_init_connect_cookie(req, req->rq_xprt);
1782         /*
1783          * Set up the xdr_buf length.
1784          * This also indicates that the buffer has already been XDR encoded.
1785          */
1786         xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1787                 xbufp->tail[0].iov_len;
1788 }
1789 #endif
1790
1791 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
1792 {
1793         kref_init(&xprt->kref);
1794
1795         spin_lock_init(&xprt->transport_lock);
1796         spin_lock_init(&xprt->reserve_lock);
1797         spin_lock_init(&xprt->queue_lock);
1798
1799         INIT_LIST_HEAD(&xprt->free);
1800         xprt->recv_queue = RB_ROOT;
1801         INIT_LIST_HEAD(&xprt->xmit_queue);
1802 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1803         spin_lock_init(&xprt->bc_pa_lock);
1804         INIT_LIST_HEAD(&xprt->bc_pa_list);
1805 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1806         INIT_LIST_HEAD(&xprt->xprt_switch);
1807
1808         xprt->last_used = jiffies;
1809         xprt->cwnd = RPC_INITCWND;
1810         xprt->bind_index = 0;
1811
1812         rpc_init_wait_queue(&xprt->binding, "xprt_binding");
1813         rpc_init_wait_queue(&xprt->pending, "xprt_pending");
1814         rpc_init_wait_queue(&xprt->sending, "xprt_sending");
1815         rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
1816
1817         xprt_init_xid(xprt);
1818
1819         xprt->xprt_net = get_net(net);
1820 }
1821
1822 /**
1823  * xprt_create_transport - create an RPC transport
1824  * @args: rpc transport creation arguments
1825  *
1826  */
1827 struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
1828 {
1829         struct rpc_xprt *xprt;
1830         struct xprt_class *t;
1831
1832         spin_lock(&xprt_list_lock);
1833         list_for_each_entry(t, &xprt_list, list) {
1834                 if (t->ident == args->ident) {
1835                         spin_unlock(&xprt_list_lock);
1836                         goto found;
1837                 }
1838         }
1839         spin_unlock(&xprt_list_lock);
1840         dprintk("RPC: transport (%d) not supported\n", args->ident);
1841         return ERR_PTR(-EIO);
1842
1843 found:
1844         xprt = t->setup(args);
1845         if (IS_ERR(xprt)) {
1846                 dprintk("RPC:       xprt_create_transport: failed, %ld\n",
1847                                 -PTR_ERR(xprt));
1848                 goto out;
1849         }
1850         if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
1851                 xprt->idle_timeout = 0;
1852         INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
1853         if (xprt_has_timer(xprt))
1854                 timer_setup(&xprt->timer,
1855                                 xprt_init_autodisconnect,
1856                                 TIMER_DEFERRABLE);
1857         else
1858                 timer_setup(&xprt->timer, NULL, 0);
1859
1860         if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
1861                 xprt_destroy(xprt);
1862                 return ERR_PTR(-EINVAL);
1863         }
1864         xprt->servername = kstrdup(args->servername, GFP_KERNEL);
1865         if (xprt->servername == NULL) {
1866                 xprt_destroy(xprt);
1867                 return ERR_PTR(-ENOMEM);
1868         }
1869
1870         rpc_xprt_debugfs_register(xprt);
1871
1872         dprintk("RPC:       created transport %p with %u slots\n", xprt,
1873                         xprt->max_reqs);
1874 out:
1875         return xprt;
1876 }
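/*
 * Illustrative sketch (hypothetical names): transport modules describe
 * themselves with a struct xprt_class whose ->ident is matched against
 * args->ident above and whose ->setup() builds the rpc_xprt.  The ident
 * value and setup routine below are placeholders:
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= EXAMPLE_TRANSPORT_IDENT,
 *		.setup	= example_setup,
 *	};
 *
 * Registering such a class with xprt_register_transport() adds it to
 * xprt_list, the list searched at the top of this function.
 */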
1877
1878 static void xprt_destroy_cb(struct work_struct *work)
1879 {
1880         struct rpc_xprt *xprt =
1881                 container_of(work, struct rpc_xprt, task_cleanup);
1882
1883         rpc_xprt_debugfs_unregister(xprt);
1884         rpc_destroy_wait_queue(&xprt->binding);
1885         rpc_destroy_wait_queue(&xprt->pending);
1886         rpc_destroy_wait_queue(&xprt->sending);
1887         rpc_destroy_wait_queue(&xprt->backlog);
1888         kfree(xprt->servername);
1889         /*
1890          * Tear down transport state and free the rpc_xprt
1891          */
1892         xprt->ops->destroy(xprt);
1893 }
1894
1895 /**
1896  * xprt_destroy - destroy an RPC transport, killing off all requests.
1897  * @xprt: transport to destroy
1898  *
1899  */
1900 static void xprt_destroy(struct rpc_xprt *xprt)
1901 {
1902         dprintk("RPC:       destroying transport %p\n", xprt);
1903
1904         /*
1905          * Exclude transport connect/disconnect handlers and autoclose
1906          */
1907         wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
1908
1909         del_timer_sync(&xprt->timer);
1910
1911         /*
1912          * Destroy sockets etc. from the system workqueue so they can
1913          * safely flush any receive work still running on rpciod.
1914          */
1915         INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
1916         schedule_work(&xprt->task_cleanup);
1917 }
1918
1919 static void xprt_destroy_kref(struct kref *kref)
1920 {
1921         xprt_destroy(container_of(kref, struct rpc_xprt, kref));
1922 }
1923
1924 /**
1925  * xprt_get - return a reference to an RPC transport.
1926  * @xprt: pointer to the transport
1927  *
1928  */
1929 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
1930 {
1931         if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
1932                 return xprt;
1933         return NULL;
1934 }
1935 EXPORT_SYMBOL_GPL(xprt_get);
1936
1937 /**
1938  * xprt_put - release a reference to an RPC transport.
1939  * @xprt: pointer to the transport
1940  *
1941  */
1942 void xprt_put(struct rpc_xprt *xprt)
1943 {
1944         if (xprt != NULL)
1945                 kref_put(&xprt->kref, xprt_destroy_kref);
1946 }
1947 EXPORT_SYMBOL_GPL(xprt_put);