]> asedeno.scripts.mit.edu Git - linux.git/blob - net/tipc/socket.c
tipc: remove probing_intv from tipc_sock
[linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include "core.h"
39 #include "name_table.h"
40 #include "node.h"
41 #include "link.h"
42 #include "name_distr.h"
43 #include "socket.h"
44 #include "bcast.h"
45 #include "netlink.h"
46
47 #define SS_LISTENING            -1      /* socket is listening */
48 #define SS_READY                -2      /* socket is connectionless */
49
50 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
51 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
52 #define TIPC_FWD_MSG            1
53 #define TIPC_CONN_OK            0
54 #define TIPC_CONN_PROBING       1
55 #define TIPC_MAX_PORT           0xffffffff
56 #define TIPC_MIN_PORT           1
57
58 /**
59  * struct tipc_sock - TIPC socket structure
60  * @sk: socket - interacts with 'port' and with user via the socket API
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @publications: list of publications for port
68  * @pub_count: total # of publications port has made during its lifetime
69  * @probing_state:
70  * @conn_timeout: the time we can wait for an unresponded setup request
71  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
72  * @link_cong: non-zero if owner must sleep because of link congestion
73  * @sent_unacked: # messages sent by socket, and not yet acked by peer
74  * @rcv_unacked: # messages read by user, but not yet acked back to peer
75  * @peer: 'connected' peer for dgram/rdm
76  * @node: hash table node
77  * @rcu: rcu struct for tipc_sock
78  */
79 struct tipc_sock {
80         struct sock sk;
81         u32 conn_type;
82         u32 conn_instance;
83         int published;
84         u32 max_pkt;
85         u32 portid;
86         struct tipc_msg phdr;
87         struct list_head sock_list;
88         struct list_head publications;
89         u32 pub_count;
90         u32 probing_state;
91         uint conn_timeout;
92         atomic_t dupl_rcvcnt;
93         bool link_cong;
94         u16 snt_unacked;
95         u16 snd_win;
96         u16 peer_caps;
97         u16 rcv_unacked;
98         u16 rcv_win;
99         struct sockaddr_tipc peer;
100         struct rhash_head node;
101         struct rcu_head rcu;
102 };
103
104 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
105 static void tipc_data_ready(struct sock *sk);
106 static void tipc_write_space(struct sock *sk);
107 static void tipc_sock_destruct(struct sock *sk);
108 static int tipc_release(struct socket *sock);
109 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
110 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
111 static void tipc_sk_timeout(unsigned long data);
112 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
113                            struct tipc_name_seq const *seq);
114 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
115                             struct tipc_name_seq const *seq);
116 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
117 static int tipc_sk_insert(struct tipc_sock *tsk);
118 static void tipc_sk_remove(struct tipc_sock *tsk);
119 static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
120                               size_t dsz);
121 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
122
123 static const struct proto_ops packet_ops;
124 static const struct proto_ops stream_ops;
125 static const struct proto_ops msg_ops;
126 static struct proto tipc_proto;
127
128 static const struct rhashtable_params tsk_rht_params;
129
130 /*
131  * Revised TIPC socket locking policy:
132  *
133  * Most socket operations take the standard socket lock when they start
134  * and hold it until they finish (or until they need to sleep).  Acquiring
135  * this lock grants the owner exclusive access to the fields of the socket
136  * data structures, with the exception of the backlog queue.  A few socket
137  * operations can be done without taking the socket lock because they only
138  * read socket information that never changes during the life of the socket.
139  *
140  * Socket operations may acquire the lock for the associated TIPC port if they
141  * need to perform an operation on the port.  If any routine needs to acquire
142  * both the socket lock and the port lock it must take the socket lock first
143  * to avoid the risk of deadlock.
144  *
145  * The dispatcher handling incoming messages cannot grab the socket lock in
146  * the standard fashion, since invoked it runs at the BH level and cannot block.
147  * Instead, it checks to see if the socket lock is currently owned by someone,
148  * and either handles the message itself or adds it to the socket's backlog
149  * queue; in the latter case the queued message is processed once the process
150  * owning the socket lock releases it.
151  *
152  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
153  * the problem of a blocked socket operation preventing any other operations
154  * from occurring.  However, applications must be careful if they have
155  * multiple threads trying to send (or receive) on the same socket, as these
156  * operations might interfere with each other.  For example, doing a connect
157  * and a receive at the same time might allow the receive to consume the
158  * ACK message meant for the connect.  While additional work could be done
159  * to try and overcome this, it doesn't seem to be worthwhile at the present.
160  *
161  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
162  * that another operation that must be performed in a non-blocking manner is
163  * not delayed for very long because the lock has already been taken.
164  *
165  * NOTE: This code assumes that certain fields of a port/socket pair are
166  * constant over its lifetime; such fields can be examined without taking
167  * the socket lock and/or port lock, and do not need to be re-read even
168  * after resuming processing after waiting.  These fields include:
169  *   - socket type
170  *   - pointer to socket sk structure (aka tipc_sock structure)
171  *   - pointer to port structure
172  *   - port reference
173  */
174
175 static u32 tsk_own_node(struct tipc_sock *tsk)
176 {
177         return msg_prevnode(&tsk->phdr);
178 }
179
180 static u32 tsk_peer_node(struct tipc_sock *tsk)
181 {
182         return msg_destnode(&tsk->phdr);
183 }
184
185 static u32 tsk_peer_port(struct tipc_sock *tsk)
186 {
187         return msg_destport(&tsk->phdr);
188 }
189
190 static  bool tsk_unreliable(struct tipc_sock *tsk)
191 {
192         return msg_src_droppable(&tsk->phdr) != 0;
193 }
194
195 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
196 {
197         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
198 }
199
200 static bool tsk_unreturnable(struct tipc_sock *tsk)
201 {
202         return msg_dest_droppable(&tsk->phdr) != 0;
203 }
204
205 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
206 {
207         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
208 }
209
210 static int tsk_importance(struct tipc_sock *tsk)
211 {
212         return msg_importance(&tsk->phdr);
213 }
214
215 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
216 {
217         if (imp > TIPC_CRITICAL_IMPORTANCE)
218                 return -EINVAL;
219         msg_set_importance(&tsk->phdr, (u32)imp);
220         return 0;
221 }
222
223 static struct tipc_sock *tipc_sk(const struct sock *sk)
224 {
225         return container_of(sk, struct tipc_sock, sk);
226 }
227
228 static bool tsk_conn_cong(struct tipc_sock *tsk)
229 {
230         return tsk->snt_unacked >= tsk->snd_win;
231 }
232
233 /* tsk_blocks(): translate a buffer size in bytes to number of
234  * advertisable blocks, taking into account the ratio truesize(len)/len
235  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
236  */
237 static u16 tsk_adv_blocks(int len)
238 {
239         return len / FLOWCTL_BLK_SZ / 4;
240 }
241
242 /* tsk_inc(): increment counter for sent or received data
243  * - If block based flow control is not supported by peer we
244  *   fall back to message based ditto, incrementing the counter
245  */
246 static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
247 {
248         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
249                 return ((msglen / FLOWCTL_BLK_SZ) + 1);
250         return 1;
251 }
252
253 /**
254  * tsk_advance_rx_queue - discard first buffer in socket receive queue
255  *
256  * Caller must hold socket lock
257  */
258 static void tsk_advance_rx_queue(struct sock *sk)
259 {
260         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
261 }
262
263 /* tipc_sk_respond() : send response message back to sender
264  */
265 static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
266 {
267         u32 selector;
268         u32 dnode;
269         u32 onode = tipc_own_addr(sock_net(sk));
270
271         if (!tipc_msg_reverse(onode, &skb, err))
272                 return;
273
274         dnode = msg_destnode(buf_msg(skb));
275         selector = msg_origport(buf_msg(skb));
276         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
277 }
278
279 /**
280  * tsk_rej_rx_queue - reject all buffers in socket receive queue
281  *
282  * Caller must hold socket lock
283  */
284 static void tsk_rej_rx_queue(struct sock *sk)
285 {
286         struct sk_buff *skb;
287
288         while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
289                 tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
290 }
291
292 static bool tipc_sk_connected(struct sock *sk)
293 {
294         return sk->sk_socket->state == SS_CONNECTED;
295 }
296
297 /* tsk_peer_msg - verify if message was sent by connected port's peer
298  *
299  * Handles cases where the node's network address has changed from
300  * the default of <0.0.0> to its configured setting.
301  */
302 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
303 {
304         struct sock *sk = &tsk->sk;
305         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
306         u32 peer_port = tsk_peer_port(tsk);
307         u32 orig_node;
308         u32 peer_node;
309
310         if (unlikely(!tipc_sk_connected(sk)))
311                 return false;
312
313         if (unlikely(msg_origport(msg) != peer_port))
314                 return false;
315
316         orig_node = msg_orignode(msg);
317         peer_node = tsk_peer_node(tsk);
318
319         if (likely(orig_node == peer_node))
320                 return true;
321
322         if (!orig_node && (peer_node == tn->own_addr))
323                 return true;
324
325         if (!peer_node && (orig_node == tn->own_addr))
326                 return true;
327
328         return false;
329 }
330
331 /**
332  * tipc_sk_create - create a TIPC socket
333  * @net: network namespace (must be default network)
334  * @sock: pre-allocated socket structure
335  * @protocol: protocol indicator (must be 0)
336  * @kern: caused by kernel or by userspace?
337  *
338  * This routine creates additional data structures used by the TIPC socket,
339  * initializes them, and links them together.
340  *
341  * Returns 0 on success, errno otherwise
342  */
343 static int tipc_sk_create(struct net *net, struct socket *sock,
344                           int protocol, int kern)
345 {
346         struct tipc_net *tn;
347         const struct proto_ops *ops;
348         socket_state state;
349         struct sock *sk;
350         struct tipc_sock *tsk;
351         struct tipc_msg *msg;
352
353         /* Validate arguments */
354         if (unlikely(protocol != 0))
355                 return -EPROTONOSUPPORT;
356
357         switch (sock->type) {
358         case SOCK_STREAM:
359                 ops = &stream_ops;
360                 state = SS_UNCONNECTED;
361                 break;
362         case SOCK_SEQPACKET:
363                 ops = &packet_ops;
364                 state = SS_UNCONNECTED;
365                 break;
366         case SOCK_DGRAM:
367         case SOCK_RDM:
368                 ops = &msg_ops;
369                 state = SS_READY;
370                 break;
371         default:
372                 return -EPROTOTYPE;
373         }
374
375         /* Allocate socket's protocol area */
376         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
377         if (sk == NULL)
378                 return -ENOMEM;
379
380         tsk = tipc_sk(sk);
381         tsk->max_pkt = MAX_PKT_DEFAULT;
382         INIT_LIST_HEAD(&tsk->publications);
383         msg = &tsk->phdr;
384         tn = net_generic(sock_net(sk), tipc_net_id);
385         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
386                       NAMED_H_SIZE, 0);
387
388         /* Finish initializing socket data structures */
389         sock->ops = ops;
390         sock->state = state;
391         sock_init_data(sock, sk);
392         if (tipc_sk_insert(tsk)) {
393                 pr_warn("Socket create failed; port number exhausted\n");
394                 return -EINVAL;
395         }
396         msg_set_origport(msg, tsk->portid);
397         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
398         sk->sk_backlog_rcv = tipc_backlog_rcv;
399         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
400         sk->sk_data_ready = tipc_data_ready;
401         sk->sk_write_space = tipc_write_space;
402         sk->sk_destruct = tipc_sock_destruct;
403         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
404         atomic_set(&tsk->dupl_rcvcnt, 0);
405
406         /* Start out with safe limits until we receive an advertised window */
407         tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
408         tsk->rcv_win = tsk->snd_win;
409
410         if (sock->state == SS_READY) {
411                 tsk_set_unreturnable(tsk, true);
412                 if (sock->type == SOCK_DGRAM)
413                         tsk_set_unreliable(tsk, true);
414         }
415         return 0;
416 }
417
418 static void tipc_sk_callback(struct rcu_head *head)
419 {
420         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
421
422         sock_put(&tsk->sk);
423 }
424
425 /**
426  * tipc_release - destroy a TIPC socket
427  * @sock: socket to destroy
428  *
429  * This routine cleans up any messages that are still queued on the socket.
430  * For DGRAM and RDM socket types, all queued messages are rejected.
431  * For SEQPACKET and STREAM socket types, the first message is rejected
432  * and any others are discarded.  (If the first message on a STREAM socket
433  * is partially-read, it is discarded and the next one is rejected instead.)
434  *
435  * NOTE: Rejected messages are not necessarily returned to the sender!  They
436  * are returned or discarded according to the "destination droppable" setting
437  * specified for the message by the sender.
438  *
439  * Returns 0 on success, errno otherwise
440  */
441 static int tipc_release(struct socket *sock)
442 {
443         struct sock *sk = sock->sk;
444         struct net *net;
445         struct tipc_sock *tsk;
446         struct sk_buff *skb;
447         u32 dnode;
448
449         /*
450          * Exit if socket isn't fully initialized (occurs when a failed accept()
451          * releases a pre-allocated child socket that was never used)
452          */
453         if (sk == NULL)
454                 return 0;
455
456         net = sock_net(sk);
457         tsk = tipc_sk(sk);
458         lock_sock(sk);
459
460         /*
461          * Reject all unreceived messages, except on an active connection
462          * (which disconnects locally & sends a 'FIN+' to peer)
463          */
464         dnode = tsk_peer_node(tsk);
465         while (sock->state != SS_DISCONNECTING) {
466                 skb = __skb_dequeue(&sk->sk_receive_queue);
467                 if (skb == NULL)
468                         break;
469                 if (TIPC_SKB_CB(skb)->bytes_read)
470                         kfree_skb(skb);
471                 else {
472                         if ((sock->state == SS_CONNECTING) ||
473                             (sock->state == SS_CONNECTED)) {
474                                 sock->state = SS_DISCONNECTING;
475                                 tipc_node_remove_conn(net, dnode, tsk->portid);
476                         }
477                         tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
478                 }
479         }
480
481         tipc_sk_withdraw(tsk, 0, NULL);
482         sk_stop_timer(sk, &sk->sk_timer);
483         tipc_sk_remove(tsk);
484         if (tipc_sk_connected(sk)) {
485                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
486                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
487                                       tsk_own_node(tsk), tsk_peer_port(tsk),
488                                       tsk->portid, TIPC_ERR_NO_PORT);
489                 if (skb)
490                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
491                 tipc_node_remove_conn(net, dnode, tsk->portid);
492         }
493
494         /* Reject any messages that accumulated in backlog queue */
495         sock->state = SS_DISCONNECTING;
496         release_sock(sk);
497
498         call_rcu(&tsk->rcu, tipc_sk_callback);
499         sock->sk = NULL;
500
501         return 0;
502 }
503
504 /**
505  * tipc_bind - associate or disassocate TIPC name(s) with a socket
506  * @sock: socket structure
507  * @uaddr: socket address describing name(s) and desired operation
508  * @uaddr_len: size of socket address data structure
509  *
510  * Name and name sequence binding is indicated using a positive scope value;
511  * a negative scope value unbinds the specified name.  Specifying no name
512  * (i.e. a socket address length of 0) unbinds all names from the socket.
513  *
514  * Returns 0 on success, errno otherwise
515  *
516  * NOTE: This routine doesn't need to take the socket lock since it doesn't
517  *       access any non-constant socket information.
518  */
519 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
520                      int uaddr_len)
521 {
522         struct sock *sk = sock->sk;
523         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
524         struct tipc_sock *tsk = tipc_sk(sk);
525         int res = -EINVAL;
526
527         lock_sock(sk);
528         if (unlikely(!uaddr_len)) {
529                 res = tipc_sk_withdraw(tsk, 0, NULL);
530                 goto exit;
531         }
532
533         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
534                 res = -EINVAL;
535                 goto exit;
536         }
537         if (addr->family != AF_TIPC) {
538                 res = -EAFNOSUPPORT;
539                 goto exit;
540         }
541
542         if (addr->addrtype == TIPC_ADDR_NAME)
543                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
544         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
545                 res = -EAFNOSUPPORT;
546                 goto exit;
547         }
548
549         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
550             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
551             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
552                 res = -EACCES;
553                 goto exit;
554         }
555
556         res = (addr->scope > 0) ?
557                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
558                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
559 exit:
560         release_sock(sk);
561         return res;
562 }
563
564 /**
565  * tipc_getname - get port ID of socket or peer socket
566  * @sock: socket structure
567  * @uaddr: area for returned socket address
568  * @uaddr_len: area for returned length of socket address
569  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
570  *
571  * Returns 0 on success, errno otherwise
572  *
573  * NOTE: This routine doesn't need to take the socket lock since it only
574  *       accesses socket information that is unchanging (or which changes in
575  *       a completely predictable manner).
576  */
577 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
578                         int *uaddr_len, int peer)
579 {
580         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
581         struct tipc_sock *tsk = tipc_sk(sock->sk);
582         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
583
584         memset(addr, 0, sizeof(*addr));
585         if (peer) {
586                 if ((sock->state != SS_CONNECTED) &&
587                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
588                         return -ENOTCONN;
589                 addr->addr.id.ref = tsk_peer_port(tsk);
590                 addr->addr.id.node = tsk_peer_node(tsk);
591         } else {
592                 addr->addr.id.ref = tsk->portid;
593                 addr->addr.id.node = tn->own_addr;
594         }
595
596         *uaddr_len = sizeof(*addr);
597         addr->addrtype = TIPC_ADDR_ID;
598         addr->family = AF_TIPC;
599         addr->scope = 0;
600         addr->addr.name.domain = 0;
601
602         return 0;
603 }
604
605 /**
606  * tipc_poll - read and possibly block on pollmask
607  * @file: file structure associated with the socket
608  * @sock: socket for which to calculate the poll bits
609  * @wait: ???
610  *
611  * Returns pollmask value
612  *
613  * COMMENTARY:
614  * It appears that the usual socket locking mechanisms are not useful here
615  * since the pollmask info is potentially out-of-date the moment this routine
616  * exits.  TCP and other protocols seem to rely on higher level poll routines
617  * to handle any preventable race conditions, so TIPC will do the same ...
618  *
619  * TIPC sets the returned events as follows:
620  *
621  * socket state         flags set
622  * ------------         ---------
623  * unconnected          no read flags
624  *                      POLLOUT if port is not congested
625  *
626  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
627  *                      no write flags
628  *
629  * connected            POLLIN/POLLRDNORM if data in rx queue
630  *                      POLLOUT if port is not congested
631  *
632  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
633  *                      no write flags
634  *
635  * listening            POLLIN if SYN in rx queue
636  *                      no write flags
637  *
638  * ready                POLLIN/POLLRDNORM if data in rx queue
639  * [connectionless]     POLLOUT (since port cannot be congested)
640  *
641  * IMPORTANT: The fact that a read or write operation is indicated does NOT
642  * imply that the operation will succeed, merely that it should be performed
643  * and will not block.
644  */
645 static unsigned int tipc_poll(struct file *file, struct socket *sock,
646                               poll_table *wait)
647 {
648         struct sock *sk = sock->sk;
649         struct tipc_sock *tsk = tipc_sk(sk);
650         u32 mask = 0;
651
652         sock_poll_wait(file, sk_sleep(sk), wait);
653
654         switch ((int)sock->state) {
655         case SS_UNCONNECTED:
656                 if (!tsk->link_cong)
657                         mask |= POLLOUT;
658                 break;
659         case SS_READY:
660         case SS_CONNECTED:
661                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
662                         mask |= POLLOUT;
663                 /* fall thru' */
664         case SS_CONNECTING:
665         case SS_LISTENING:
666                 if (!skb_queue_empty(&sk->sk_receive_queue))
667                         mask |= (POLLIN | POLLRDNORM);
668                 break;
669         case SS_DISCONNECTING:
670                 mask = (POLLIN | POLLRDNORM | POLLHUP);
671                 break;
672         }
673
674         return mask;
675 }
676
677 /**
678  * tipc_sendmcast - send multicast message
679  * @sock: socket structure
680  * @seq: destination address
681  * @msg: message to send
682  * @dsz: total length of message data
683  * @timeo: timeout to wait for wakeup
684  *
685  * Called from function tipc_sendmsg(), which has done all sanity checks
686  * Returns the number of bytes sent on success, or errno
687  */
688 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
689                           struct msghdr *msg, size_t dsz, long timeo)
690 {
691         struct sock *sk = sock->sk;
692         struct tipc_sock *tsk = tipc_sk(sk);
693         struct net *net = sock_net(sk);
694         struct tipc_msg *mhdr = &tsk->phdr;
695         struct sk_buff_head pktchain;
696         struct iov_iter save = msg->msg_iter;
697         uint mtu;
698         int rc;
699
700         if (!timeo && tsk->link_cong)
701                 return -ELINKCONG;
702
703         msg_set_type(mhdr, TIPC_MCAST_MSG);
704         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
705         msg_set_destport(mhdr, 0);
706         msg_set_destnode(mhdr, 0);
707         msg_set_nametype(mhdr, seq->type);
708         msg_set_namelower(mhdr, seq->lower);
709         msg_set_nameupper(mhdr, seq->upper);
710         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
711
712         skb_queue_head_init(&pktchain);
713
714 new_mtu:
715         mtu = tipc_bcast_get_mtu(net);
716         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
717         if (unlikely(rc < 0))
718                 return rc;
719
720         do {
721                 rc = tipc_bcast_xmit(net, &pktchain);
722                 if (likely(!rc))
723                         return dsz;
724
725                 if (rc == -ELINKCONG) {
726                         tsk->link_cong = 1;
727                         rc = tipc_wait_for_sndmsg(sock, &timeo);
728                         if (!rc)
729                                 continue;
730                 }
731                 __skb_queue_purge(&pktchain);
732                 if (rc == -EMSGSIZE) {
733                         msg->msg_iter = save;
734                         goto new_mtu;
735                 }
736                 break;
737         } while (1);
738         return rc;
739 }
740
741 /**
742  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
743  * @arrvq: queue with arriving messages, to be cloned after destination lookup
744  * @inputq: queue with cloned messages, delivered to socket after dest lookup
745  *
746  * Multi-threaded: parallel calls with reference to same queues may occur
747  */
748 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
749                        struct sk_buff_head *inputq)
750 {
751         struct tipc_msg *msg;
752         struct tipc_plist dports;
753         u32 portid;
754         u32 scope = TIPC_CLUSTER_SCOPE;
755         struct sk_buff_head tmpq;
756         uint hsz;
757         struct sk_buff *skb, *_skb;
758
759         __skb_queue_head_init(&tmpq);
760         tipc_plist_init(&dports);
761
762         skb = tipc_skb_peek(arrvq, &inputq->lock);
763         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
764                 msg = buf_msg(skb);
765                 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
766
767                 if (in_own_node(net, msg_orignode(msg)))
768                         scope = TIPC_NODE_SCOPE;
769
770                 /* Create destination port list and message clones: */
771                 tipc_nametbl_mc_translate(net,
772                                           msg_nametype(msg), msg_namelower(msg),
773                                           msg_nameupper(msg), scope, &dports);
774                 portid = tipc_plist_pop(&dports);
775                 for (; portid; portid = tipc_plist_pop(&dports)) {
776                         _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
777                         if (_skb) {
778                                 msg_set_destport(buf_msg(_skb), portid);
779                                 __skb_queue_tail(&tmpq, _skb);
780                                 continue;
781                         }
782                         pr_warn("Failed to clone mcast rcv buffer\n");
783                 }
784                 /* Append to inputq if not already done by other thread */
785                 spin_lock_bh(&inputq->lock);
786                 if (skb_peek(arrvq) == skb) {
787                         skb_queue_splice_tail_init(&tmpq, inputq);
788                         kfree_skb(__skb_dequeue(arrvq));
789                 }
790                 spin_unlock_bh(&inputq->lock);
791                 __skb_queue_purge(&tmpq);
792                 kfree_skb(skb);
793         }
794         tipc_sk_rcv(net, inputq);
795 }
796
797 /**
798  * tipc_sk_proto_rcv - receive a connection mng protocol message
799  * @tsk: receiving socket
800  * @skb: pointer to message buffer.
801  */
802 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
803                               struct sk_buff_head *xmitq)
804 {
805         struct sock *sk = &tsk->sk;
806         u32 onode = tsk_own_node(tsk);
807         struct tipc_msg *hdr = buf_msg(skb);
808         int mtyp = msg_type(hdr);
809         bool conn_cong;
810
811         /* Ignore if connection cannot be validated: */
812         if (!tsk_peer_msg(tsk, hdr))
813                 goto exit;
814
815         tsk->probing_state = TIPC_CONN_OK;
816
817         if (mtyp == CONN_PROBE) {
818                 msg_set_type(hdr, CONN_PROBE_REPLY);
819                 if (tipc_msg_reverse(onode, &skb, TIPC_OK))
820                         __skb_queue_tail(xmitq, skb);
821                 return;
822         } else if (mtyp == CONN_ACK) {
823                 conn_cong = tsk_conn_cong(tsk);
824                 tsk->snt_unacked -= msg_conn_ack(hdr);
825                 if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
826                         tsk->snd_win = msg_adv_win(hdr);
827                 if (conn_cong)
828                         sk->sk_write_space(sk);
829         } else if (mtyp != CONN_PROBE_REPLY) {
830                 pr_warn("Received unknown CONN_PROTO msg\n");
831         }
832 exit:
833         kfree_skb(skb);
834 }
835
836 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
837 {
838         struct sock *sk = sock->sk;
839         struct tipc_sock *tsk = tipc_sk(sk);
840         DEFINE_WAIT(wait);
841         int done;
842
843         do {
844                 int err = sock_error(sk);
845                 if (err)
846                         return err;
847                 if (sock->state == SS_DISCONNECTING)
848                         return -EPIPE;
849                 if (!*timeo_p)
850                         return -EAGAIN;
851                 if (signal_pending(current))
852                         return sock_intr_errno(*timeo_p);
853
854                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
855                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
856                 finish_wait(sk_sleep(sk), &wait);
857         } while (!done);
858         return 0;
859 }
860
861 /**
862  * tipc_sendmsg - send message in connectionless manner
863  * @sock: socket structure
864  * @m: message to send
865  * @dsz: amount of user data to be sent
866  *
867  * Message must have an destination specified explicitly.
868  * Used for SOCK_RDM and SOCK_DGRAM messages,
869  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
870  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
871  *
872  * Returns the number of bytes sent on success, or errno otherwise
873  */
874 static int tipc_sendmsg(struct socket *sock,
875                         struct msghdr *m, size_t dsz)
876 {
877         struct sock *sk = sock->sk;
878         int ret;
879
880         lock_sock(sk);
881         ret = __tipc_sendmsg(sock, m, dsz);
882         release_sock(sk);
883
884         return ret;
885 }
886
887 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
888 {
889         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
890         struct sock *sk = sock->sk;
891         struct tipc_sock *tsk = tipc_sk(sk);
892         struct net *net = sock_net(sk);
893         struct tipc_msg *mhdr = &tsk->phdr;
894         u32 dnode, dport;
895         struct sk_buff_head pktchain;
896         struct sk_buff *skb;
897         struct tipc_name_seq *seq;
898         struct iov_iter save;
899         u32 mtu;
900         long timeo;
901         int rc;
902
903         if (dsz > TIPC_MAX_USER_MSG_SIZE)
904                 return -EMSGSIZE;
905         if (unlikely(!dest)) {
906                 if (sock->state == SS_READY && tsk->peer.family == AF_TIPC)
907                         dest = &tsk->peer;
908                 else
909                         return -EDESTADDRREQ;
910         } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
911                    dest->family != AF_TIPC) {
912                 return -EINVAL;
913         }
914         if (unlikely(sock->state != SS_READY)) {
915                 if (sock->state == SS_LISTENING)
916                         return -EPIPE;
917                 if (sock->state != SS_UNCONNECTED)
918                         return -EISCONN;
919                 if (tsk->published)
920                         return -EOPNOTSUPP;
921                 if (dest->addrtype == TIPC_ADDR_NAME) {
922                         tsk->conn_type = dest->addr.name.name.type;
923                         tsk->conn_instance = dest->addr.name.name.instance;
924                 }
925         }
926         seq = &dest->addr.nameseq;
927         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
928
929         if (dest->addrtype == TIPC_ADDR_MCAST) {
930                 return tipc_sendmcast(sock, seq, m, dsz, timeo);
931         } else if (dest->addrtype == TIPC_ADDR_NAME) {
932                 u32 type = dest->addr.name.name.type;
933                 u32 inst = dest->addr.name.name.instance;
934                 u32 domain = dest->addr.name.domain;
935
936                 dnode = domain;
937                 msg_set_type(mhdr, TIPC_NAMED_MSG);
938                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
939                 msg_set_nametype(mhdr, type);
940                 msg_set_nameinst(mhdr, inst);
941                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
942                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
943                 msg_set_destnode(mhdr, dnode);
944                 msg_set_destport(mhdr, dport);
945                 if (unlikely(!dport && !dnode))
946                         return -EHOSTUNREACH;
947         } else if (dest->addrtype == TIPC_ADDR_ID) {
948                 dnode = dest->addr.id.node;
949                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
950                 msg_set_lookup_scope(mhdr, 0);
951                 msg_set_destnode(mhdr, dnode);
952                 msg_set_destport(mhdr, dest->addr.id.ref);
953                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
954         }
955
956         skb_queue_head_init(&pktchain);
957         save = m->msg_iter;
958 new_mtu:
959         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
960         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
961         if (rc < 0)
962                 return rc;
963
964         do {
965                 skb = skb_peek(&pktchain);
966                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
967                 rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
968                 if (likely(!rc)) {
969                         if (sock->state != SS_READY)
970                                 sock->state = SS_CONNECTING;
971                         return dsz;
972                 }
973                 if (rc == -ELINKCONG) {
974                         tsk->link_cong = 1;
975                         rc = tipc_wait_for_sndmsg(sock, &timeo);
976                         if (!rc)
977                                 continue;
978                 }
979                 __skb_queue_purge(&pktchain);
980                 if (rc == -EMSGSIZE) {
981                         m->msg_iter = save;
982                         goto new_mtu;
983                 }
984                 break;
985         } while (1);
986
987         return rc;
988 }
989
990 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
991 {
992         struct sock *sk = sock->sk;
993         struct tipc_sock *tsk = tipc_sk(sk);
994         DEFINE_WAIT(wait);
995         int done;
996
997         do {
998                 int err = sock_error(sk);
999                 if (err)
1000                         return err;
1001                 if (sock->state == SS_DISCONNECTING)
1002                         return -EPIPE;
1003                 else if (sock->state != SS_CONNECTED)
1004                         return -ENOTCONN;
1005                 if (!*timeo_p)
1006                         return -EAGAIN;
1007                 if (signal_pending(current))
1008                         return sock_intr_errno(*timeo_p);
1009
1010                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1011                 done = sk_wait_event(sk, timeo_p,
1012                                      (!tsk->link_cong &&
1013                                       !tsk_conn_cong(tsk)) ||
1014                                       !tipc_sk_connected(sk));
1015                 finish_wait(sk_sleep(sk), &wait);
1016         } while (!done);
1017         return 0;
1018 }
1019
1020 /**
1021  * tipc_send_stream - send stream-oriented data
1022  * @sock: socket structure
1023  * @m: data to send
1024  * @dsz: total length of data to be transmitted
1025  *
1026  * Used for SOCK_STREAM data.
1027  *
1028  * Returns the number of bytes sent on success (or partial success),
1029  * or errno if no data sent
1030  */
1031 static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1032 {
1033         struct sock *sk = sock->sk;
1034         int ret;
1035
1036         lock_sock(sk);
1037         ret = __tipc_send_stream(sock, m, dsz);
1038         release_sock(sk);
1039
1040         return ret;
1041 }
1042
1043 static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1044 {
1045         struct sock *sk = sock->sk;
1046         struct net *net = sock_net(sk);
1047         struct tipc_sock *tsk = tipc_sk(sk);
1048         struct tipc_msg *mhdr = &tsk->phdr;
1049         struct sk_buff_head pktchain;
1050         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1051         u32 portid = tsk->portid;
1052         int rc = -EINVAL;
1053         long timeo;
1054         u32 dnode;
1055         uint mtu, send, sent = 0;
1056         struct iov_iter save;
1057         int hlen = MIN_H_SIZE;
1058
1059         /* Handle implied connection establishment */
1060         if (unlikely(dest)) {
1061                 rc = __tipc_sendmsg(sock, m, dsz);
1062                 hlen = msg_hdr_sz(mhdr);
1063                 if (dsz && (dsz == rc))
1064                         tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1065                 return rc;
1066         }
1067         if (dsz > (uint)INT_MAX)
1068                 return -EMSGSIZE;
1069
1070         if (unlikely(sock->state != SS_CONNECTED)) {
1071                 if (sock->state == SS_DISCONNECTING)
1072                         return -EPIPE;
1073                 else
1074                         return -ENOTCONN;
1075         }
1076
1077         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1078         if (!timeo && tsk->link_cong)
1079                 return -ELINKCONG;
1080
1081         dnode = tsk_peer_node(tsk);
1082         skb_queue_head_init(&pktchain);
1083
1084 next:
1085         save = m->msg_iter;
1086         mtu = tsk->max_pkt;
1087         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1088         rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
1089         if (unlikely(rc < 0))
1090                 return rc;
1091
1092         do {
1093                 if (likely(!tsk_conn_cong(tsk))) {
1094                         rc = tipc_node_xmit(net, &pktchain, dnode, portid);
1095                         if (likely(!rc)) {
1096                                 tsk->snt_unacked += tsk_inc(tsk, send + hlen);
1097                                 sent += send;
1098                                 if (sent == dsz)
1099                                         return dsz;
1100                                 goto next;
1101                         }
1102                         if (rc == -EMSGSIZE) {
1103                                 __skb_queue_purge(&pktchain);
1104                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1105                                                                  portid);
1106                                 m->msg_iter = save;
1107                                 goto next;
1108                         }
1109                         if (rc != -ELINKCONG)
1110                                 break;
1111
1112                         tsk->link_cong = 1;
1113                 }
1114                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1115         } while (!rc);
1116
1117         __skb_queue_purge(&pktchain);
1118         return sent ? sent : rc;
1119 }
1120
1121 /**
1122  * tipc_send_packet - send a connection-oriented message
1123  * @sock: socket structure
1124  * @m: message to send
1125  * @dsz: length of data to be transmitted
1126  *
1127  * Used for SOCK_SEQPACKET messages.
1128  *
1129  * Returns the number of bytes sent on success, or errno otherwise
1130  */
1131 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1132 {
1133         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1134                 return -EMSGSIZE;
1135
1136         return tipc_send_stream(sock, m, dsz);
1137 }
1138
1139 /* tipc_sk_finish_conn - complete the setup of a connection
1140  */
1141 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1142                                 u32 peer_node)
1143 {
1144         struct sock *sk = &tsk->sk;
1145         struct net *net = sock_net(sk);
1146         struct tipc_msg *msg = &tsk->phdr;
1147
1148         msg_set_destnode(msg, peer_node);
1149         msg_set_destport(msg, peer_port);
1150         msg_set_type(msg, TIPC_CONN_MSG);
1151         msg_set_lookup_scope(msg, 0);
1152         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1153
1154         tsk->probing_state = TIPC_CONN_OK;
1155         sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL);
1156         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1157         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1158         tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1159         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1160                 return;
1161
1162         /* Fall back to message based flow control */
1163         tsk->rcv_win = FLOWCTL_MSG_WIN;
1164         tsk->snd_win = FLOWCTL_MSG_WIN;
1165 }
1166
1167 /**
1168  * set_orig_addr - capture sender's address for received message
1169  * @m: descriptor for message info
1170  * @msg: received message header
1171  *
1172  * Note: Address is not captured if not requested by receiver.
1173  */
1174 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1175 {
1176         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1177
1178         if (addr) {
1179                 addr->family = AF_TIPC;
1180                 addr->addrtype = TIPC_ADDR_ID;
1181                 memset(&addr->addr, 0, sizeof(addr->addr));
1182                 addr->addr.id.ref = msg_origport(msg);
1183                 addr->addr.id.node = msg_orignode(msg);
1184                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1185                 addr->scope = 0;                /* could leave uninitialized */
1186                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1187         }
1188 }
1189
1190 /**
1191  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1192  * @m: descriptor for message info
1193  * @msg: received message header
1194  * @tsk: TIPC port associated with message
1195  *
1196  * Note: Ancillary data is not captured if not requested by receiver.
1197  *
1198  * Returns 0 if successful, otherwise errno
1199  */
1200 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1201                                  struct tipc_sock *tsk)
1202 {
1203         u32 anc_data[3];
1204         u32 err;
1205         u32 dest_type;
1206         int has_name;
1207         int res;
1208
1209         if (likely(m->msg_controllen == 0))
1210                 return 0;
1211
1212         /* Optionally capture errored message object(s) */
1213         err = msg ? msg_errcode(msg) : 0;
1214         if (unlikely(err)) {
1215                 anc_data[0] = err;
1216                 anc_data[1] = msg_data_sz(msg);
1217                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1218                 if (res)
1219                         return res;
1220                 if (anc_data[1]) {
1221                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1222                                        msg_data(msg));
1223                         if (res)
1224                                 return res;
1225                 }
1226         }
1227
1228         /* Optionally capture message destination object */
1229         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1230         switch (dest_type) {
1231         case TIPC_NAMED_MSG:
1232                 has_name = 1;
1233                 anc_data[0] = msg_nametype(msg);
1234                 anc_data[1] = msg_namelower(msg);
1235                 anc_data[2] = msg_namelower(msg);
1236                 break;
1237         case TIPC_MCAST_MSG:
1238                 has_name = 1;
1239                 anc_data[0] = msg_nametype(msg);
1240                 anc_data[1] = msg_namelower(msg);
1241                 anc_data[2] = msg_nameupper(msg);
1242                 break;
1243         case TIPC_CONN_MSG:
1244                 has_name = (tsk->conn_type != 0);
1245                 anc_data[0] = tsk->conn_type;
1246                 anc_data[1] = tsk->conn_instance;
1247                 anc_data[2] = tsk->conn_instance;
1248                 break;
1249         default:
1250                 has_name = 0;
1251         }
1252         if (has_name) {
1253                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1254                 if (res)
1255                         return res;
1256         }
1257
1258         return 0;
1259 }
1260
1261 static void tipc_sk_send_ack(struct tipc_sock *tsk)
1262 {
1263         struct sock *sk = &tsk->sk;
1264         struct net *net = sock_net(sk);
1265         struct sk_buff *skb = NULL;
1266         struct tipc_msg *msg;
1267         u32 peer_port = tsk_peer_port(tsk);
1268         u32 dnode = tsk_peer_node(tsk);
1269
1270         if (!tipc_sk_connected(sk))
1271                 return;
1272         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1273                               dnode, tsk_own_node(tsk), peer_port,
1274                               tsk->portid, TIPC_OK);
1275         if (!skb)
1276                 return;
1277         msg = buf_msg(skb);
1278         msg_set_conn_ack(msg, tsk->rcv_unacked);
1279         tsk->rcv_unacked = 0;
1280
1281         /* Adjust to and advertize the correct window limit */
1282         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1283                 tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1284                 msg_set_adv_win(msg, tsk->rcv_win);
1285         }
1286         tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1287 }
1288
1289 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1290 {
1291         struct sock *sk = sock->sk;
1292         DEFINE_WAIT(wait);
1293         long timeo = *timeop;
1294         int err;
1295
1296         for (;;) {
1297                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1298                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1299                         if (sock->state == SS_DISCONNECTING) {
1300                                 err = -ENOTCONN;
1301                                 break;
1302                         }
1303                         release_sock(sk);
1304                         timeo = schedule_timeout(timeo);
1305                         lock_sock(sk);
1306                 }
1307                 err = 0;
1308                 if (!skb_queue_empty(&sk->sk_receive_queue))
1309                         break;
1310                 err = -EAGAIN;
1311                 if (!timeo)
1312                         break;
1313                 err = sock_intr_errno(timeo);
1314                 if (signal_pending(current))
1315                         break;
1316         }
1317         finish_wait(sk_sleep(sk), &wait);
1318         *timeop = timeo;
1319         return err;
1320 }
1321
1322 /**
1323  * tipc_recvmsg - receive packet-oriented message
1324  * @m: descriptor for message info
1325  * @buf_len: total size of user buffer area
1326  * @flags: receive flags
1327  *
1328  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1329  * If the complete message doesn't fit in user area, truncate it.
1330  *
1331  * Returns size of returned message data, errno otherwise
1332  */
1333 static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1334                         int flags)
1335 {
1336         struct sock *sk = sock->sk;
1337         struct tipc_sock *tsk = tipc_sk(sk);
1338         struct sk_buff *buf;
1339         struct tipc_msg *msg;
1340         long timeo;
1341         unsigned int sz;
1342         u32 err;
1343         int res, hlen;
1344
1345         /* Catch invalid receive requests */
1346         if (unlikely(!buf_len))
1347                 return -EINVAL;
1348
1349         lock_sock(sk);
1350
1351         if (unlikely(sock->state == SS_UNCONNECTED)) {
1352                 res = -ENOTCONN;
1353                 goto exit;
1354         }
1355
1356         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1357 restart:
1358
1359         /* Look for a message in receive queue; wait if necessary */
1360         res = tipc_wait_for_rcvmsg(sock, &timeo);
1361         if (res)
1362                 goto exit;
1363
1364         /* Look at first message in receive queue */
1365         buf = skb_peek(&sk->sk_receive_queue);
1366         msg = buf_msg(buf);
1367         sz = msg_data_sz(msg);
1368         hlen = msg_hdr_sz(msg);
1369         err = msg_errcode(msg);
1370
1371         /* Discard an empty non-errored message & try again */
1372         if ((!sz) && (!err)) {
1373                 tsk_advance_rx_queue(sk);
1374                 goto restart;
1375         }
1376
1377         /* Capture sender's address (optional) */
1378         set_orig_addr(m, msg);
1379
1380         /* Capture ancillary data (optional) */
1381         res = tipc_sk_anc_data_recv(m, msg, tsk);
1382         if (res)
1383                 goto exit;
1384
1385         /* Capture message data (if valid) & compute return value (always) */
1386         if (!err) {
1387                 if (unlikely(buf_len < sz)) {
1388                         sz = buf_len;
1389                         m->msg_flags |= MSG_TRUNC;
1390                 }
1391                 res = skb_copy_datagram_msg(buf, hlen, m, sz);
1392                 if (res)
1393                         goto exit;
1394                 res = sz;
1395         } else {
1396                 if ((sock->state == SS_READY) ||
1397                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1398                         res = 0;
1399                 else
1400                         res = -ECONNRESET;
1401         }
1402
1403         if (unlikely(flags & MSG_PEEK))
1404                 goto exit;
1405
1406         if (likely(sock->state != SS_READY)) {
1407                 tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1408                 if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1409                         tipc_sk_send_ack(tsk);
1410         }
1411         tsk_advance_rx_queue(sk);
1412 exit:
1413         release_sock(sk);
1414         return res;
1415 }
1416
1417 /**
1418  * tipc_recv_stream - receive stream-oriented data
1419  * @m: descriptor for message info
1420  * @buf_len: total size of user buffer area
1421  * @flags: receive flags
1422  *
1423  * Used for SOCK_STREAM messages only.  If not enough data is available
1424  * will optionally wait for more; never truncates data.
1425  *
1426  * Returns size of returned message data, errno otherwise
1427  */
1428 static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1429                             size_t buf_len, int flags)
1430 {
1431         struct sock *sk = sock->sk;
1432         struct tipc_sock *tsk = tipc_sk(sk);
1433         struct sk_buff *buf;
1434         struct tipc_msg *msg;
1435         long timeo;
1436         unsigned int sz;
1437         int target;
1438         int sz_copied = 0;
1439         u32 err;
1440         int res = 0, hlen;
1441
1442         /* Catch invalid receive attempts */
1443         if (unlikely(!buf_len))
1444                 return -EINVAL;
1445
1446         lock_sock(sk);
1447
1448         if (unlikely(sock->state == SS_UNCONNECTED)) {
1449                 res = -ENOTCONN;
1450                 goto exit;
1451         }
1452
1453         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1454         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1455
1456 restart:
1457         /* Look for a message in receive queue; wait if necessary */
1458         res = tipc_wait_for_rcvmsg(sock, &timeo);
1459         if (res)
1460                 goto exit;
1461
1462         /* Look at first message in receive queue */
1463         buf = skb_peek(&sk->sk_receive_queue);
1464         msg = buf_msg(buf);
1465         sz = msg_data_sz(msg);
1466         hlen = msg_hdr_sz(msg);
1467         err = msg_errcode(msg);
1468
1469         /* Discard an empty non-errored message & try again */
1470         if ((!sz) && (!err)) {
1471                 tsk_advance_rx_queue(sk);
1472                 goto restart;
1473         }
1474
1475         /* Optionally capture sender's address & ancillary data of first msg */
1476         if (sz_copied == 0) {
1477                 set_orig_addr(m, msg);
1478                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1479                 if (res)
1480                         goto exit;
1481         }
1482
1483         /* Capture message data (if valid) & compute return value (always) */
1484         if (!err) {
1485                 u32 offset = TIPC_SKB_CB(buf)->bytes_read;
1486                 u32 needed;
1487                 int sz_to_copy;
1488
1489                 sz -= offset;
1490                 needed = (buf_len - sz_copied);
1491                 sz_to_copy = min(sz, needed);
1492
1493                 res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
1494                 if (res)
1495                         goto exit;
1496
1497                 sz_copied += sz_to_copy;
1498
1499                 if (sz_to_copy < sz) {
1500                         if (!(flags & MSG_PEEK))
1501                                 TIPC_SKB_CB(buf)->bytes_read =
1502                                         offset + sz_to_copy;
1503                         goto exit;
1504                 }
1505         } else {
1506                 if (sz_copied != 0)
1507                         goto exit; /* can't add error msg to valid data */
1508
1509                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1510                         res = 0;
1511                 else
1512                         res = -ECONNRESET;
1513         }
1514
1515         if (unlikely(flags & MSG_PEEK))
1516                 goto exit;
1517
1518         tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1519         if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1520                 tipc_sk_send_ack(tsk);
1521         tsk_advance_rx_queue(sk);
1522
1523         /* Loop around if more data is required */
1524         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1525             (!skb_queue_empty(&sk->sk_receive_queue) ||
1526             (sz_copied < target)) &&    /* and more is ready or required */
1527             (!err))                     /* and haven't reached a FIN */
1528                 goto restart;
1529
1530 exit:
1531         release_sock(sk);
1532         return sz_copied ? sz_copied : res;
1533 }
1534
1535 /**
1536  * tipc_write_space - wake up thread if port congestion is released
1537  * @sk: socket
1538  */
1539 static void tipc_write_space(struct sock *sk)
1540 {
1541         struct socket_wq *wq;
1542
1543         rcu_read_lock();
1544         wq = rcu_dereference(sk->sk_wq);
1545         if (skwq_has_sleeper(wq))
1546                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1547                                                 POLLWRNORM | POLLWRBAND);
1548         rcu_read_unlock();
1549 }
1550
1551 /**
1552  * tipc_data_ready - wake up threads to indicate messages have been received
1553  * @sk: socket
1554  * @len: the length of messages
1555  */
1556 static void tipc_data_ready(struct sock *sk)
1557 {
1558         struct socket_wq *wq;
1559
1560         rcu_read_lock();
1561         wq = rcu_dereference(sk->sk_wq);
1562         if (skwq_has_sleeper(wq))
1563                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1564                                                 POLLRDNORM | POLLRDBAND);
1565         rcu_read_unlock();
1566 }
1567
1568 static void tipc_sock_destruct(struct sock *sk)
1569 {
1570         __skb_queue_purge(&sk->sk_receive_queue);
1571 }
1572
1573 /**
1574  * filter_connect - Handle all incoming messages for a connection-based socket
1575  * @tsk: TIPC socket
1576  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1577  *
1578  * Returns true if everything ok, false otherwise
1579  */
1580 static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1581 {
1582         struct sock *sk = &tsk->sk;
1583         struct net *net = sock_net(sk);
1584         struct socket *sock = sk->sk_socket;
1585         struct tipc_msg *hdr = buf_msg(skb);
1586
1587         if (unlikely(msg_mcast(hdr)))
1588                 return false;
1589
1590         switch ((int)sock->state) {
1591         case SS_CONNECTED:
1592
1593                 /* Accept only connection-based messages sent by peer */
1594                 if (unlikely(!tsk_peer_msg(tsk, hdr)))
1595                         return false;
1596
1597                 if (unlikely(msg_errcode(hdr))) {
1598                         sock->state = SS_DISCONNECTING;
1599                         /* Let timer expire on it's own */
1600                         tipc_node_remove_conn(net, tsk_peer_node(tsk),
1601                                               tsk->portid);
1602                         sk->sk_state_change(sk);
1603                 }
1604                 return true;
1605
1606         case SS_CONNECTING:
1607
1608                 /* Accept only ACK or NACK message */
1609                 if (unlikely(!msg_connected(hdr)))
1610                         return false;
1611
1612                 if (unlikely(msg_errcode(hdr))) {
1613                         sock->state = SS_DISCONNECTING;
1614                         sk->sk_err = ECONNREFUSED;
1615                         return true;
1616                 }
1617
1618                 if (unlikely(!msg_isdata(hdr))) {
1619                         sock->state = SS_DISCONNECTING;
1620                         sk->sk_err = EINVAL;
1621                         return true;
1622                 }
1623
1624                 tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
1625                 msg_set_importance(&tsk->phdr, msg_importance(hdr));
1626                 sock->state = SS_CONNECTED;
1627
1628                 /* If 'ACK+' message, add to socket receive queue */
1629                 if (msg_data_sz(hdr))
1630                         return true;
1631
1632                 /* If empty 'ACK-' message, wake up sleeping connect() */
1633                 if (waitqueue_active(sk_sleep(sk)))
1634                         wake_up_interruptible(sk_sleep(sk));
1635
1636                 /* 'ACK-' message is neither accepted nor rejected: */
1637                 msg_set_dest_droppable(hdr, 1);
1638                 return false;
1639
1640         case SS_LISTENING:
1641         case SS_UNCONNECTED:
1642
1643                 /* Accept only SYN message */
1644                 if (!msg_connected(hdr) && !(msg_errcode(hdr)))
1645                         return true;
1646                 break;
1647         case SS_DISCONNECTING:
1648                 break;
1649         default:
1650                 pr_err("Unknown socket state %u\n", sock->state);
1651         }
1652         return false;
1653 }
1654
1655 /**
1656  * rcvbuf_limit - get proper overload limit of socket receive queue
1657  * @sk: socket
1658  * @skb: message
1659  *
1660  * For connection oriented messages, irrespective of importance,
1661  * default queue limit is 2 MB.
1662  *
1663  * For connectionless messages, queue limits are based on message
1664  * importance as follows:
1665  *
1666  * TIPC_LOW_IMPORTANCE       (2 MB)
1667  * TIPC_MEDIUM_IMPORTANCE    (4 MB)
1668  * TIPC_HIGH_IMPORTANCE      (8 MB)
1669  * TIPC_CRITICAL_IMPORTANCE  (16 MB)
1670  *
1671  * Returns overload limit according to corresponding message importance
1672  */
1673 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1674 {
1675         struct tipc_sock *tsk = tipc_sk(sk);
1676         struct tipc_msg *hdr = buf_msg(skb);
1677
1678         if (unlikely(!msg_connected(hdr)))
1679                 return sk->sk_rcvbuf << msg_importance(hdr);
1680
1681         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
1682                 return sk->sk_rcvbuf;
1683
1684         return FLOWCTL_MSG_LIM;
1685 }
1686
1687 /**
1688  * filter_rcv - validate incoming message
1689  * @sk: socket
1690  * @skb: pointer to message.
1691  *
1692  * Enqueues message on receive queue if acceptable; optionally handles
1693  * disconnect indication for a connected socket.
1694  *
1695  * Called with socket lock already taken
1696  *
1697  * Returns true if message was added to socket receive queue, otherwise false
1698  */
1699 static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1700                        struct sk_buff_head *xmitq)
1701 {
1702         struct socket *sock = sk->sk_socket;
1703         struct tipc_sock *tsk = tipc_sk(sk);
1704         struct tipc_msg *hdr = buf_msg(skb);
1705         unsigned int limit = rcvbuf_limit(sk, skb);
1706         int err = TIPC_OK;
1707         int usr = msg_user(hdr);
1708
1709         if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1710                 tipc_sk_proto_rcv(tsk, skb, xmitq);
1711                 return false;
1712         }
1713
1714         if (unlikely(usr == SOCK_WAKEUP)) {
1715                 kfree_skb(skb);
1716                 tsk->link_cong = 0;
1717                 sk->sk_write_space(sk);
1718                 return false;
1719         }
1720
1721         /* Drop if illegal message type */
1722         if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
1723                 kfree_skb(skb);
1724                 return false;
1725         }
1726
1727         /* Reject if wrong message type for current socket state */
1728         if (unlikely(sock->state == SS_READY)) {
1729                 if (msg_connected(hdr)) {
1730                         err = TIPC_ERR_NO_PORT;
1731                         goto reject;
1732                 }
1733         } else if (unlikely(!filter_connect(tsk, skb))) {
1734                 err = TIPC_ERR_NO_PORT;
1735                 goto reject;
1736         }
1737
1738         /* Reject message if there isn't room to queue it */
1739         if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
1740                 err = TIPC_ERR_OVERLOAD;
1741                 goto reject;
1742         }
1743
1744         /* Enqueue message */
1745         TIPC_SKB_CB(skb)->bytes_read = 0;
1746         __skb_queue_tail(&sk->sk_receive_queue, skb);
1747         skb_set_owner_r(skb, sk);
1748
1749         sk->sk_data_ready(sk);
1750         return true;
1751
1752 reject:
1753         if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1754                 __skb_queue_tail(xmitq, skb);
1755         return false;
1756 }
1757
1758 /**
1759  * tipc_backlog_rcv - handle incoming message from backlog queue
1760  * @sk: socket
1761  * @skb: message
1762  *
1763  * Caller must hold socket lock
1764  *
1765  * Returns 0
1766  */
1767 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1768 {
1769         unsigned int truesize = skb->truesize;
1770         struct sk_buff_head xmitq;
1771         u32 dnode, selector;
1772
1773         __skb_queue_head_init(&xmitq);
1774
1775         if (likely(filter_rcv(sk, skb, &xmitq))) {
1776                 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1777                 return 0;
1778         }
1779
1780         if (skb_queue_empty(&xmitq))
1781                 return 0;
1782
1783         /* Send response/rejected message */
1784         skb = __skb_dequeue(&xmitq);
1785         dnode = msg_destnode(buf_msg(skb));
1786         selector = msg_origport(buf_msg(skb));
1787         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1788         return 0;
1789 }
1790
1791 /**
1792  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1793  *                   inputq and try adding them to socket or backlog queue
1794  * @inputq: list of incoming buffers with potentially different destinations
1795  * @sk: socket where the buffers should be enqueued
1796  * @dport: port number for the socket
1797  *
1798  * Caller must hold socket lock
1799  */
1800 static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1801                             u32 dport, struct sk_buff_head *xmitq)
1802 {
1803         unsigned long time_limit = jiffies + 2;
1804         struct sk_buff *skb;
1805         unsigned int lim;
1806         atomic_t *dcnt;
1807         u32 onode;
1808
1809         while (skb_queue_len(inputq)) {
1810                 if (unlikely(time_after_eq(jiffies, time_limit)))
1811                         return;
1812
1813                 skb = tipc_skb_dequeue(inputq, dport);
1814                 if (unlikely(!skb))
1815                         return;
1816
1817                 /* Add message directly to receive queue if possible */
1818                 if (!sock_owned_by_user(sk)) {
1819                         filter_rcv(sk, skb, xmitq);
1820                         continue;
1821                 }
1822
1823                 /* Try backlog, compensating for double-counted bytes */
1824                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1825                 if (!sk->sk_backlog.len)
1826                         atomic_set(dcnt, 0);
1827                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1828                 if (likely(!sk_add_backlog(sk, skb, lim)))
1829                         continue;
1830
1831                 /* Overload => reject message back to sender */
1832                 onode = tipc_own_addr(sock_net(sk));
1833                 if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
1834                         __skb_queue_tail(xmitq, skb);
1835                 break;
1836         }
1837 }
1838
1839 /**
1840  * tipc_sk_rcv - handle a chain of incoming buffers
1841  * @inputq: buffer list containing the buffers
1842  * Consumes all buffers in list until inputq is empty
1843  * Note: may be called in multiple threads referring to the same queue
1844  */
1845 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1846 {
1847         struct sk_buff_head xmitq;
1848         u32 dnode, dport = 0;
1849         int err;
1850         struct tipc_sock *tsk;
1851         struct sock *sk;
1852         struct sk_buff *skb;
1853
1854         __skb_queue_head_init(&xmitq);
1855         while (skb_queue_len(inputq)) {
1856                 dport = tipc_skb_peek_port(inputq, dport);
1857                 tsk = tipc_sk_lookup(net, dport);
1858
1859                 if (likely(tsk)) {
1860                         sk = &tsk->sk;
1861                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1862                                 tipc_sk_enqueue(inputq, sk, dport, &xmitq);
1863                                 spin_unlock_bh(&sk->sk_lock.slock);
1864                         }
1865                         /* Send pending response/rejected messages, if any */
1866                         while ((skb = __skb_dequeue(&xmitq))) {
1867                                 dnode = msg_destnode(buf_msg(skb));
1868                                 tipc_node_xmit_skb(net, skb, dnode, dport);
1869                         }
1870                         sock_put(sk);
1871                         continue;
1872                 }
1873
1874                 /* No destination socket => dequeue skb if still there */
1875                 skb = tipc_skb_dequeue(inputq, dport);
1876                 if (!skb)
1877                         return;
1878
1879                 /* Try secondary lookup if unresolved named message */
1880                 err = TIPC_ERR_NO_PORT;
1881                 if (tipc_msg_lookup_dest(net, skb, &err))
1882                         goto xmit;
1883
1884                 /* Prepare for message rejection */
1885                 if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1886                         continue;
1887 xmit:
1888                 dnode = msg_destnode(buf_msg(skb));
1889                 tipc_node_xmit_skb(net, skb, dnode, dport);
1890         }
1891 }
1892
1893 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1894 {
1895         struct sock *sk = sock->sk;
1896         DEFINE_WAIT(wait);
1897         int done;
1898
1899         do {
1900                 int err = sock_error(sk);
1901                 if (err)
1902                         return err;
1903                 if (!*timeo_p)
1904                         return -ETIMEDOUT;
1905                 if (signal_pending(current))
1906                         return sock_intr_errno(*timeo_p);
1907
1908                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1909                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1910                 finish_wait(sk_sleep(sk), &wait);
1911         } while (!done);
1912         return 0;
1913 }
1914
1915 /**
1916  * tipc_connect - establish a connection to another TIPC port
1917  * @sock: socket structure
1918  * @dest: socket address for destination port
1919  * @destlen: size of socket address data structure
1920  * @flags: file-related flags associated with socket
1921  *
1922  * Returns 0 on success, errno otherwise
1923  */
1924 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1925                         int destlen, int flags)
1926 {
1927         struct sock *sk = sock->sk;
1928         struct tipc_sock *tsk = tipc_sk(sk);
1929         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1930         struct msghdr m = {NULL,};
1931         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1932         socket_state previous;
1933         int res = 0;
1934
1935         lock_sock(sk);
1936
1937         /* DGRAM/RDM connect(), just save the destaddr */
1938         if (sock->state == SS_READY) {
1939                 if (dst->family == AF_UNSPEC) {
1940                         memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1941                 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1942                         res = -EINVAL;
1943                 } else {
1944                         memcpy(&tsk->peer, dest, destlen);
1945                 }
1946                 goto exit;
1947         }
1948
1949         /*
1950          * Reject connection attempt using multicast address
1951          *
1952          * Note: send_msg() validates the rest of the address fields,
1953          *       so there's no need to do it here
1954          */
1955         if (dst->addrtype == TIPC_ADDR_MCAST) {
1956                 res = -EINVAL;
1957                 goto exit;
1958         }
1959
1960         previous = sock->state;
1961         switch (sock->state) {
1962         case SS_UNCONNECTED:
1963                 /* Send a 'SYN-' to destination */
1964                 m.msg_name = dest;
1965                 m.msg_namelen = destlen;
1966
1967                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1968                  * indicate send_msg() is never blocked.
1969                  */
1970                 if (!timeout)
1971                         m.msg_flags = MSG_DONTWAIT;
1972
1973                 res = __tipc_sendmsg(sock, &m, 0);
1974                 if ((res < 0) && (res != -EWOULDBLOCK))
1975                         goto exit;
1976
1977                 /* Just entered SS_CONNECTING state; the only
1978                  * difference is that return value in non-blocking
1979                  * case is EINPROGRESS, rather than EALREADY.
1980                  */
1981                 res = -EINPROGRESS;
1982         case SS_CONNECTING:
1983                 if (previous == SS_CONNECTING)
1984                         res = -EALREADY;
1985                 if (!timeout)
1986                         goto exit;
1987                 timeout = msecs_to_jiffies(timeout);
1988                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1989                 res = tipc_wait_for_connect(sock, &timeout);
1990                 break;
1991         case SS_CONNECTED:
1992                 res = -EISCONN;
1993                 break;
1994         default:
1995                 res = -EINVAL;
1996                 break;
1997         }
1998 exit:
1999         release_sock(sk);
2000         return res;
2001 }
2002
2003 /**
2004  * tipc_listen - allow socket to listen for incoming connections
2005  * @sock: socket structure
2006  * @len: (unused)
2007  *
2008  * Returns 0 on success, errno otherwise
2009  */
2010 static int tipc_listen(struct socket *sock, int len)
2011 {
2012         struct sock *sk = sock->sk;
2013         int res;
2014
2015         lock_sock(sk);
2016
2017         if (sock->state != SS_UNCONNECTED)
2018                 res = -EINVAL;
2019         else {
2020                 sock->state = SS_LISTENING;
2021                 res = 0;
2022         }
2023
2024         release_sock(sk);
2025         return res;
2026 }
2027
2028 static int tipc_wait_for_accept(struct socket *sock, long timeo)
2029 {
2030         struct sock *sk = sock->sk;
2031         DEFINE_WAIT(wait);
2032         int err;
2033
2034         /* True wake-one mechanism for incoming connections: only
2035          * one process gets woken up, not the 'whole herd'.
2036          * Since we do not 'race & poll' for established sockets
2037          * anymore, the common case will execute the loop only once.
2038         */
2039         for (;;) {
2040                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
2041                                           TASK_INTERRUPTIBLE);
2042                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
2043                         release_sock(sk);
2044                         timeo = schedule_timeout(timeo);
2045                         lock_sock(sk);
2046                 }
2047                 err = 0;
2048                 if (!skb_queue_empty(&sk->sk_receive_queue))
2049                         break;
2050                 err = -EINVAL;
2051                 if (sock->state != SS_LISTENING)
2052                         break;
2053                 err = -EAGAIN;
2054                 if (!timeo)
2055                         break;
2056                 err = sock_intr_errno(timeo);
2057                 if (signal_pending(current))
2058                         break;
2059         }
2060         finish_wait(sk_sleep(sk), &wait);
2061         return err;
2062 }
2063
2064 /**
2065  * tipc_accept - wait for connection request
2066  * @sock: listening socket
2067  * @newsock: new socket that is to be connected
2068  * @flags: file-related flags associated with socket
2069  *
2070  * Returns 0 on success, errno otherwise
2071  */
2072 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2073 {
2074         struct sock *new_sk, *sk = sock->sk;
2075         struct sk_buff *buf;
2076         struct tipc_sock *new_tsock;
2077         struct tipc_msg *msg;
2078         long timeo;
2079         int res;
2080
2081         lock_sock(sk);
2082
2083         if (sock->state != SS_LISTENING) {
2084                 res = -EINVAL;
2085                 goto exit;
2086         }
2087         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2088         res = tipc_wait_for_accept(sock, timeo);
2089         if (res)
2090                 goto exit;
2091
2092         buf = skb_peek(&sk->sk_receive_queue);
2093
2094         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 0);
2095         if (res)
2096                 goto exit;
2097         security_sk_clone(sock->sk, new_sock->sk);
2098
2099         new_sk = new_sock->sk;
2100         new_tsock = tipc_sk(new_sk);
2101         msg = buf_msg(buf);
2102
2103         /* we lock on new_sk; but lockdep sees the lock on sk */
2104         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2105
2106         /*
2107          * Reject any stray messages received by new socket
2108          * before the socket lock was taken (very, very unlikely)
2109          */
2110         tsk_rej_rx_queue(new_sk);
2111
2112         /* Connect new socket to it's peer */
2113         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2114         new_sock->state = SS_CONNECTED;
2115
2116         tsk_set_importance(new_tsock, msg_importance(msg));
2117         if (msg_named(msg)) {
2118                 new_tsock->conn_type = msg_nametype(msg);
2119                 new_tsock->conn_instance = msg_nameinst(msg);
2120         }
2121
2122         /*
2123          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2124          * Respond to 'SYN+' by queuing it on new socket.
2125          */
2126         if (!msg_data_sz(msg)) {
2127                 struct msghdr m = {NULL,};
2128
2129                 tsk_advance_rx_queue(sk);
2130                 __tipc_send_stream(new_sock, &m, 0);
2131         } else {
2132                 __skb_dequeue(&sk->sk_receive_queue);
2133                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2134                 skb_set_owner_r(buf, new_sk);
2135         }
2136         release_sock(new_sk);
2137 exit:
2138         release_sock(sk);
2139         return res;
2140 }
2141
2142 /**
2143  * tipc_shutdown - shutdown socket connection
2144  * @sock: socket structure
2145  * @how: direction to close (must be SHUT_RDWR)
2146  *
2147  * Terminates connection (if necessary), then purges socket's receive queue.
2148  *
2149  * Returns 0 on success, errno otherwise
2150  */
2151 static int tipc_shutdown(struct socket *sock, int how)
2152 {
2153         struct sock *sk = sock->sk;
2154         struct net *net = sock_net(sk);
2155         struct tipc_sock *tsk = tipc_sk(sk);
2156         struct sk_buff *skb;
2157         u32 dnode = tsk_peer_node(tsk);
2158         u32 dport = tsk_peer_port(tsk);
2159         u32 onode = tipc_own_addr(net);
2160         u32 oport = tsk->portid;
2161         int res;
2162
2163         if (how != SHUT_RDWR)
2164                 return -EINVAL;
2165
2166         lock_sock(sk);
2167
2168         switch (sock->state) {
2169         case SS_CONNECTING:
2170         case SS_CONNECTED:
2171
2172 restart:
2173                 dnode = tsk_peer_node(tsk);
2174
2175                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2176                 skb = __skb_dequeue(&sk->sk_receive_queue);
2177                 if (skb) {
2178                         if (TIPC_SKB_CB(skb)->bytes_read) {
2179                                 kfree_skb(skb);
2180                                 goto restart;
2181                         }
2182                         tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
2183                 } else {
2184                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2185                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2186                                               0, dnode, onode, dport, oport,
2187                                               TIPC_CONN_SHUTDOWN);
2188                         if (skb)
2189                                 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2190                 }
2191                 sock->state = SS_DISCONNECTING;
2192                 tipc_node_remove_conn(net, dnode, tsk->portid);
2193                 /* fall through */
2194
2195         case SS_DISCONNECTING:
2196
2197                 /* Discard any unreceived messages */
2198                 __skb_queue_purge(&sk->sk_receive_queue);
2199
2200                 /* Wake up anyone sleeping in poll */
2201                 sk->sk_state_change(sk);
2202                 res = 0;
2203                 break;
2204
2205         default:
2206                 res = -ENOTCONN;
2207         }
2208
2209         release_sock(sk);
2210         return res;
2211 }
2212
2213 static void tipc_sk_timeout(unsigned long data)
2214 {
2215         struct tipc_sock *tsk = (struct tipc_sock *)data;
2216         struct sock *sk = &tsk->sk;
2217         struct sk_buff *skb = NULL;
2218         u32 peer_port, peer_node;
2219         u32 own_node = tsk_own_node(tsk);
2220
2221         bh_lock_sock(sk);
2222         if (!tipc_sk_connected(sk)) {
2223                 bh_unlock_sock(sk);
2224                 goto exit;
2225         }
2226         peer_port = tsk_peer_port(tsk);
2227         peer_node = tsk_peer_node(tsk);
2228
2229         if (tsk->probing_state == TIPC_CONN_PROBING) {
2230                 if (!sock_owned_by_user(sk)) {
2231                         sk->sk_socket->state = SS_DISCONNECTING;
2232                         tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
2233                                               tsk_peer_port(tsk));
2234                         sk->sk_state_change(sk);
2235                 } else {
2236                         /* Try again later */
2237                         sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2238                 }
2239
2240                 bh_unlock_sock(sk);
2241                 goto exit;
2242         }
2243
2244         skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2245                               INT_H_SIZE, 0, peer_node, own_node,
2246                               peer_port, tsk->portid, TIPC_OK);
2247         tsk->probing_state = TIPC_CONN_PROBING;
2248         sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL);
2249         bh_unlock_sock(sk);
2250         if (skb)
2251                 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2252 exit:
2253         sock_put(sk);
2254 }
2255
2256 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2257                            struct tipc_name_seq const *seq)
2258 {
2259         struct sock *sk = &tsk->sk;
2260         struct net *net = sock_net(sk);
2261         struct publication *publ;
2262         u32 key;
2263
2264         if (tipc_sk_connected(sk))
2265                 return -EINVAL;
2266         key = tsk->portid + tsk->pub_count + 1;
2267         if (key == tsk->portid)
2268                 return -EADDRINUSE;
2269
2270         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2271                                     scope, tsk->portid, key);
2272         if (unlikely(!publ))
2273                 return -EINVAL;
2274
2275         list_add(&publ->pport_list, &tsk->publications);
2276         tsk->pub_count++;
2277         tsk->published = 1;
2278         return 0;
2279 }
2280
2281 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2282                             struct tipc_name_seq const *seq)
2283 {
2284         struct net *net = sock_net(&tsk->sk);
2285         struct publication *publ;
2286         struct publication *safe;
2287         int rc = -EINVAL;
2288
2289         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2290                 if (seq) {
2291                         if (publ->scope != scope)
2292                                 continue;
2293                         if (publ->type != seq->type)
2294                                 continue;
2295                         if (publ->lower != seq->lower)
2296                                 continue;
2297                         if (publ->upper != seq->upper)
2298                                 break;
2299                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2300                                               publ->ref, publ->key);
2301                         rc = 0;
2302                         break;
2303                 }
2304                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2305                                       publ->ref, publ->key);
2306                 rc = 0;
2307         }
2308         if (list_empty(&tsk->publications))
2309                 tsk->published = 0;
2310         return rc;
2311 }
2312
2313 /* tipc_sk_reinit: set non-zero address in all existing sockets
2314  *                 when we go from standalone to network mode.
2315  */
2316 void tipc_sk_reinit(struct net *net)
2317 {
2318         struct tipc_net *tn = net_generic(net, tipc_net_id);
2319         const struct bucket_table *tbl;
2320         struct rhash_head *pos;
2321         struct tipc_sock *tsk;
2322         struct tipc_msg *msg;
2323         int i;
2324
2325         rcu_read_lock();
2326         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2327         for (i = 0; i < tbl->size; i++) {
2328                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2329                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2330                         msg = &tsk->phdr;
2331                         msg_set_prevnode(msg, tn->own_addr);
2332                         msg_set_orignode(msg, tn->own_addr);
2333                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2334                 }
2335         }
2336         rcu_read_unlock();
2337 }
2338
2339 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2340 {
2341         struct tipc_net *tn = net_generic(net, tipc_net_id);
2342         struct tipc_sock *tsk;
2343
2344         rcu_read_lock();
2345         tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2346         if (tsk)
2347                 sock_hold(&tsk->sk);
2348         rcu_read_unlock();
2349
2350         return tsk;
2351 }
2352
2353 static int tipc_sk_insert(struct tipc_sock *tsk)
2354 {
2355         struct sock *sk = &tsk->sk;
2356         struct net *net = sock_net(sk);
2357         struct tipc_net *tn = net_generic(net, tipc_net_id);
2358         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2359         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2360
2361         while (remaining--) {
2362                 portid++;
2363                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2364                         portid = TIPC_MIN_PORT;
2365                 tsk->portid = portid;
2366                 sock_hold(&tsk->sk);
2367                 if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2368                                                    tsk_rht_params))
2369                         return 0;
2370                 sock_put(&tsk->sk);
2371         }
2372
2373         return -1;
2374 }
2375
2376 static void tipc_sk_remove(struct tipc_sock *tsk)
2377 {
2378         struct sock *sk = &tsk->sk;
2379         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2380
2381         if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2382                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2383                 __sock_put(sk);
2384         }
2385 }
2386
2387 static const struct rhashtable_params tsk_rht_params = {
2388         .nelem_hint = 192,
2389         .head_offset = offsetof(struct tipc_sock, node),
2390         .key_offset = offsetof(struct tipc_sock, portid),
2391         .key_len = sizeof(u32), /* portid */
2392         .max_size = 1048576,
2393         .min_size = 256,
2394         .automatic_shrinking = true,
2395 };
2396
2397 int tipc_sk_rht_init(struct net *net)
2398 {
2399         struct tipc_net *tn = net_generic(net, tipc_net_id);
2400
2401         return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2402 }
2403
2404 void tipc_sk_rht_destroy(struct net *net)
2405 {
2406         struct tipc_net *tn = net_generic(net, tipc_net_id);
2407
2408         /* Wait for socket readers to complete */
2409         synchronize_net();
2410
2411         rhashtable_destroy(&tn->sk_rht);
2412 }
2413
2414 /**
2415  * tipc_setsockopt - set socket option
2416  * @sock: socket structure
2417  * @lvl: option level
2418  * @opt: option identifier
2419  * @ov: pointer to new option value
2420  * @ol: length of option value
2421  *
2422  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2423  * (to ease compatibility).
2424  *
2425  * Returns 0 on success, errno otherwise
2426  */
2427 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2428                            char __user *ov, unsigned int ol)
2429 {
2430         struct sock *sk = sock->sk;
2431         struct tipc_sock *tsk = tipc_sk(sk);
2432         u32 value;
2433         int res;
2434
2435         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2436                 return 0;
2437         if (lvl != SOL_TIPC)
2438                 return -ENOPROTOOPT;
2439         if (ol < sizeof(value))
2440                 return -EINVAL;
2441         res = get_user(value, (u32 __user *)ov);
2442         if (res)
2443                 return res;
2444
2445         lock_sock(sk);
2446
2447         switch (opt) {
2448         case TIPC_IMPORTANCE:
2449                 res = tsk_set_importance(tsk, value);
2450                 break;
2451         case TIPC_SRC_DROPPABLE:
2452                 if (sock->type != SOCK_STREAM)
2453                         tsk_set_unreliable(tsk, value);
2454                 else
2455                         res = -ENOPROTOOPT;
2456                 break;
2457         case TIPC_DEST_DROPPABLE:
2458                 tsk_set_unreturnable(tsk, value);
2459                 break;
2460         case TIPC_CONN_TIMEOUT:
2461                 tipc_sk(sk)->conn_timeout = value;
2462                 /* no need to set "res", since already 0 at this point */
2463                 break;
2464         default:
2465                 res = -EINVAL;
2466         }
2467
2468         release_sock(sk);
2469
2470         return res;
2471 }
2472
2473 /**
2474  * tipc_getsockopt - get socket option
2475  * @sock: socket structure
2476  * @lvl: option level
2477  * @opt: option identifier
2478  * @ov: receptacle for option value
2479  * @ol: receptacle for length of option value
2480  *
2481  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2482  * (to ease compatibility).
2483  *
2484  * Returns 0 on success, errno otherwise
2485  */
2486 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2487                            char __user *ov, int __user *ol)
2488 {
2489         struct sock *sk = sock->sk;
2490         struct tipc_sock *tsk = tipc_sk(sk);
2491         int len;
2492         u32 value;
2493         int res;
2494
2495         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2496                 return put_user(0, ol);
2497         if (lvl != SOL_TIPC)
2498                 return -ENOPROTOOPT;
2499         res = get_user(len, ol);
2500         if (res)
2501                 return res;
2502
2503         lock_sock(sk);
2504
2505         switch (opt) {
2506         case TIPC_IMPORTANCE:
2507                 value = tsk_importance(tsk);
2508                 break;
2509         case TIPC_SRC_DROPPABLE:
2510                 value = tsk_unreliable(tsk);
2511                 break;
2512         case TIPC_DEST_DROPPABLE:
2513                 value = tsk_unreturnable(tsk);
2514                 break;
2515         case TIPC_CONN_TIMEOUT:
2516                 value = tsk->conn_timeout;
2517                 /* no need to set "res", since already 0 at this point */
2518                 break;
2519         case TIPC_NODE_RECVQ_DEPTH:
2520                 value = 0; /* was tipc_queue_size, now obsolete */
2521                 break;
2522         case TIPC_SOCK_RECVQ_DEPTH:
2523                 value = skb_queue_len(&sk->sk_receive_queue);
2524                 break;
2525         default:
2526                 res = -EINVAL;
2527         }
2528
2529         release_sock(sk);
2530
2531         if (res)
2532                 return res;     /* "get" failed */
2533
2534         if (len < sizeof(value))
2535                 return -EINVAL;
2536
2537         if (copy_to_user(ov, &value, sizeof(value)))
2538                 return -EFAULT;
2539
2540         return put_user(sizeof(value), ol);
2541 }
2542
2543 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2544 {
2545         struct sock *sk = sock->sk;
2546         struct tipc_sioc_ln_req lnr;
2547         void __user *argp = (void __user *)arg;
2548
2549         switch (cmd) {
2550         case SIOCGETLINKNAME:
2551                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2552                         return -EFAULT;
2553                 if (!tipc_node_get_linkname(sock_net(sk),
2554                                             lnr.bearer_id & 0xffff, lnr.peer,
2555                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2556                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2557                                 return -EFAULT;
2558                         return 0;
2559                 }
2560                 return -EADDRNOTAVAIL;
2561         default:
2562                 return -ENOIOCTLCMD;
2563         }
2564 }
2565
2566 /* Protocol switches for the various types of TIPC sockets */
2567
2568 static const struct proto_ops msg_ops = {
2569         .owner          = THIS_MODULE,
2570         .family         = AF_TIPC,
2571         .release        = tipc_release,
2572         .bind           = tipc_bind,
2573         .connect        = tipc_connect,
2574         .socketpair     = sock_no_socketpair,
2575         .accept         = sock_no_accept,
2576         .getname        = tipc_getname,
2577         .poll           = tipc_poll,
2578         .ioctl          = tipc_ioctl,
2579         .listen         = sock_no_listen,
2580         .shutdown       = tipc_shutdown,
2581         .setsockopt     = tipc_setsockopt,
2582         .getsockopt     = tipc_getsockopt,
2583         .sendmsg        = tipc_sendmsg,
2584         .recvmsg        = tipc_recvmsg,
2585         .mmap           = sock_no_mmap,
2586         .sendpage       = sock_no_sendpage
2587 };
2588
2589 static const struct proto_ops packet_ops = {
2590         .owner          = THIS_MODULE,
2591         .family         = AF_TIPC,
2592         .release        = tipc_release,
2593         .bind           = tipc_bind,
2594         .connect        = tipc_connect,
2595         .socketpair     = sock_no_socketpair,
2596         .accept         = tipc_accept,
2597         .getname        = tipc_getname,
2598         .poll           = tipc_poll,
2599         .ioctl          = tipc_ioctl,
2600         .listen         = tipc_listen,
2601         .shutdown       = tipc_shutdown,
2602         .setsockopt     = tipc_setsockopt,
2603         .getsockopt     = tipc_getsockopt,
2604         .sendmsg        = tipc_send_packet,
2605         .recvmsg        = tipc_recvmsg,
2606         .mmap           = sock_no_mmap,
2607         .sendpage       = sock_no_sendpage
2608 };
2609
2610 static const struct proto_ops stream_ops = {
2611         .owner          = THIS_MODULE,
2612         .family         = AF_TIPC,
2613         .release        = tipc_release,
2614         .bind           = tipc_bind,
2615         .connect        = tipc_connect,
2616         .socketpair     = sock_no_socketpair,
2617         .accept         = tipc_accept,
2618         .getname        = tipc_getname,
2619         .poll           = tipc_poll,
2620         .ioctl          = tipc_ioctl,
2621         .listen         = tipc_listen,
2622         .shutdown       = tipc_shutdown,
2623         .setsockopt     = tipc_setsockopt,
2624         .getsockopt     = tipc_getsockopt,
2625         .sendmsg        = tipc_send_stream,
2626         .recvmsg        = tipc_recv_stream,
2627         .mmap           = sock_no_mmap,
2628         .sendpage       = sock_no_sendpage
2629 };
2630
2631 static const struct net_proto_family tipc_family_ops = {
2632         .owner          = THIS_MODULE,
2633         .family         = AF_TIPC,
2634         .create         = tipc_sk_create
2635 };
2636
2637 static struct proto tipc_proto = {
2638         .name           = "TIPC",
2639         .owner          = THIS_MODULE,
2640         .obj_size       = sizeof(struct tipc_sock),
2641         .sysctl_rmem    = sysctl_tipc_rmem
2642 };
2643
2644 /**
2645  * tipc_socket_init - initialize TIPC socket interface
2646  *
2647  * Returns 0 on success, errno otherwise
2648  */
2649 int tipc_socket_init(void)
2650 {
2651         int res;
2652
2653         res = proto_register(&tipc_proto, 1);
2654         if (res) {
2655                 pr_err("Failed to register TIPC protocol type\n");
2656                 goto out;
2657         }
2658
2659         res = sock_register(&tipc_family_ops);
2660         if (res) {
2661                 pr_err("Failed to register TIPC socket type\n");
2662                 proto_unregister(&tipc_proto);
2663                 goto out;
2664         }
2665  out:
2666         return res;
2667 }
2668
2669 /**
2670  * tipc_socket_stop - stop TIPC socket interface
2671  */
2672 void tipc_socket_stop(void)
2673 {
2674         sock_unregister(tipc_family_ops.family);
2675         proto_unregister(&tipc_proto);
2676 }
2677
2678 /* Caller should hold socket lock for the passed tipc socket. */
2679 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2680 {
2681         u32 peer_node;
2682         u32 peer_port;
2683         struct nlattr *nest;
2684
2685         peer_node = tsk_peer_node(tsk);
2686         peer_port = tsk_peer_port(tsk);
2687
2688         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2689
2690         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2691                 goto msg_full;
2692         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2693                 goto msg_full;
2694
2695         if (tsk->conn_type != 0) {
2696                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2697                         goto msg_full;
2698                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2699                         goto msg_full;
2700                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2701                         goto msg_full;
2702         }
2703         nla_nest_end(skb, nest);
2704
2705         return 0;
2706
2707 msg_full:
2708         nla_nest_cancel(skb, nest);
2709
2710         return -EMSGSIZE;
2711 }
2712
2713 /* Caller should hold socket lock for the passed tipc socket. */
2714 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2715                             struct tipc_sock *tsk)
2716 {
2717         int err;
2718         void *hdr;
2719         struct nlattr *attrs;
2720         struct net *net = sock_net(skb->sk);
2721         struct tipc_net *tn = net_generic(net, tipc_net_id);
2722         struct sock *sk = &tsk->sk;
2723
2724         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2725                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2726         if (!hdr)
2727                 goto msg_cancel;
2728
2729         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2730         if (!attrs)
2731                 goto genlmsg_cancel;
2732         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2733                 goto attr_msg_cancel;
2734         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2735                 goto attr_msg_cancel;
2736
2737         if (tipc_sk_connected(sk)) {
2738                 err = __tipc_nl_add_sk_con(skb, tsk);
2739                 if (err)
2740                         goto attr_msg_cancel;
2741         } else if (!list_empty(&tsk->publications)) {
2742                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2743                         goto attr_msg_cancel;
2744         }
2745         nla_nest_end(skb, attrs);
2746         genlmsg_end(skb, hdr);
2747
2748         return 0;
2749
2750 attr_msg_cancel:
2751         nla_nest_cancel(skb, attrs);
2752 genlmsg_cancel:
2753         genlmsg_cancel(skb, hdr);
2754 msg_cancel:
2755         return -EMSGSIZE;
2756 }
2757
2758 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2759 {
2760         int err;
2761         struct tipc_sock *tsk;
2762         const struct bucket_table *tbl;
2763         struct rhash_head *pos;
2764         struct net *net = sock_net(skb->sk);
2765         struct tipc_net *tn = net_generic(net, tipc_net_id);
2766         u32 tbl_id = cb->args[0];
2767         u32 prev_portid = cb->args[1];
2768
2769         rcu_read_lock();
2770         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2771         for (; tbl_id < tbl->size; tbl_id++) {
2772                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2773                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2774                         if (prev_portid && prev_portid != tsk->portid) {
2775                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2776                                 continue;
2777                         }
2778
2779                         err = __tipc_nl_add_sk(skb, cb, tsk);
2780                         if (err) {
2781                                 prev_portid = tsk->portid;
2782                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2783                                 goto out;
2784                         }
2785                         prev_portid = 0;
2786                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2787                 }
2788         }
2789 out:
2790         rcu_read_unlock();
2791         cb->args[0] = tbl_id;
2792         cb->args[1] = prev_portid;
2793
2794         return skb->len;
2795 }
2796
2797 /* Caller should hold socket lock for the passed tipc socket. */
2798 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2799                                  struct netlink_callback *cb,
2800                                  struct publication *publ)
2801 {
2802         void *hdr;
2803         struct nlattr *attrs;
2804
2805         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2806                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2807         if (!hdr)
2808                 goto msg_cancel;
2809
2810         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2811         if (!attrs)
2812                 goto genlmsg_cancel;
2813
2814         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2815                 goto attr_msg_cancel;
2816         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2817                 goto attr_msg_cancel;
2818         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2819                 goto attr_msg_cancel;
2820         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2821                 goto attr_msg_cancel;
2822
2823         nla_nest_end(skb, attrs);
2824         genlmsg_end(skb, hdr);
2825
2826         return 0;
2827
2828 attr_msg_cancel:
2829         nla_nest_cancel(skb, attrs);
2830 genlmsg_cancel:
2831         genlmsg_cancel(skb, hdr);
2832 msg_cancel:
2833         return -EMSGSIZE;
2834 }
2835
2836 /* Caller should hold socket lock for the passed tipc socket. */
2837 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2838                                   struct netlink_callback *cb,
2839                                   struct tipc_sock *tsk, u32 *last_publ)
2840 {
2841         int err;
2842         struct publication *p;
2843
2844         if (*last_publ) {
2845                 list_for_each_entry(p, &tsk->publications, pport_list) {
2846                         if (p->key == *last_publ)
2847                                 break;
2848                 }
2849                 if (p->key != *last_publ) {
2850                         /* We never set seq or call nl_dump_check_consistent()
2851                          * this means that setting prev_seq here will cause the
2852                          * consistence check to fail in the netlink callback
2853                          * handler. Resulting in the last NLMSG_DONE message
2854                          * having the NLM_F_DUMP_INTR flag set.
2855                          */
2856                         cb->prev_seq = 1;
2857                         *last_publ = 0;
2858                         return -EPIPE;
2859                 }
2860         } else {
2861                 p = list_first_entry(&tsk->publications, struct publication,
2862                                      pport_list);
2863         }
2864
2865         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2866                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2867                 if (err) {
2868                         *last_publ = p->key;
2869                         return err;
2870                 }
2871         }
2872         *last_publ = 0;
2873
2874         return 0;
2875 }
2876
2877 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2878 {
2879         int err;
2880         u32 tsk_portid = cb->args[0];
2881         u32 last_publ = cb->args[1];
2882         u32 done = cb->args[2];
2883         struct net *net = sock_net(skb->sk);
2884         struct tipc_sock *tsk;
2885
2886         if (!tsk_portid) {
2887                 struct nlattr **attrs;
2888                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2889
2890                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2891                 if (err)
2892                         return err;
2893
2894                 if (!attrs[TIPC_NLA_SOCK])
2895                         return -EINVAL;
2896
2897                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2898                                        attrs[TIPC_NLA_SOCK],
2899                                        tipc_nl_sock_policy);
2900                 if (err)
2901                         return err;
2902
2903                 if (!sock[TIPC_NLA_SOCK_REF])
2904                         return -EINVAL;
2905
2906                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2907         }
2908
2909         if (done)
2910                 return 0;
2911
2912         tsk = tipc_sk_lookup(net, tsk_portid);
2913         if (!tsk)
2914                 return -EINVAL;
2915
2916         lock_sock(&tsk->sk);
2917         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2918         if (!err)
2919                 done = 1;
2920         release_sock(&tsk->sk);
2921         sock_put(&tsk->sk);
2922
2923         cb->args[0] = tsk_portid;
2924         cb->args[1] = last_publ;
2925         cb->args[2] = done;
2926
2927         return skb->len;
2928 }