]> asedeno.scripts.mit.edu Git - linux.git/blob - net/tipc/socket.c
c543ae6cbf65ff121a0f0c6738cc71f9f778702b
[linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include "core.h"
39 #include "name_table.h"
40 #include "node.h"
41 #include "link.h"
42 #include "name_distr.h"
43 #include "socket.h"
44 #include "bcast.h"
45 #include "netlink.h"
46
47 #define SS_LISTENING            -1      /* socket is listening */
48 #define SS_READY                -2      /* socket is connectionless */
49
50 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
51 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
52 #define TIPC_FWD_MSG            1
53 #define TIPC_CONN_OK            0
54 #define TIPC_CONN_PROBING       1
55 #define TIPC_MAX_PORT           0xffffffff
56 #define TIPC_MIN_PORT           1
57
58 /**
59  * struct tipc_sock - TIPC socket structure
60  * @sk: socket - interacts with 'port' and with user via the socket API
61  * @connected: non-zero if port is currently connected to a peer port
62  * @conn_type: TIPC type used when connection was established
63  * @conn_instance: TIPC instance used when connection was established
64  * @published: non-zero if port has one or more associated names
65  * @max_pkt: maximum packet size "hint" used when building messages sent by port
66  * @portid: unique port identity in TIPC socket hash table
67  * @phdr: preformatted message header used when sending messages
68  * @port_list: adjacent ports in TIPC's global list of ports
69  * @publications: list of publications for port
70  * @pub_count: total # of publications port has made during its lifetime
71  * @probing_state:
72  * @probing_intv:
73  * @conn_timeout: the time we can wait for an unresponded setup request
74  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
75  * @link_cong: non-zero if owner must sleep because of link congestion
76  * @sent_unacked: # messages sent by socket, and not yet acked by peer
77  * @rcv_unacked: # messages read by user, but not yet acked back to peer
78  * @remote: 'connected' peer for dgram/rdm
79  * @node: hash table node
80  * @rcu: rcu struct for tipc_sock
81  */
82 struct tipc_sock {
83         struct sock sk;
84         int connected;
85         u32 conn_type;
86         u32 conn_instance;
87         int published;
88         u32 max_pkt;
89         u32 portid;
90         struct tipc_msg phdr;
91         struct list_head sock_list;
92         struct list_head publications;
93         u32 pub_count;
94         u32 probing_state;
95         unsigned long probing_intv;
96         uint conn_timeout;
97         atomic_t dupl_rcvcnt;
98         bool link_cong;
99         u16 snt_unacked;
100         u16 snd_win;
101         u16 peer_caps;
102         u16 rcv_unacked;
103         u16 rcv_win;
104         struct sockaddr_tipc remote;
105         struct rhash_head node;
106         struct rcu_head rcu;
107 };
108
109 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
110 static void tipc_data_ready(struct sock *sk);
111 static void tipc_write_space(struct sock *sk);
112 static void tipc_sock_destruct(struct sock *sk);
113 static int tipc_release(struct socket *sock);
114 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
115 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
116 static void tipc_sk_timeout(unsigned long data);
117 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
118                            struct tipc_name_seq const *seq);
119 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
120                             struct tipc_name_seq const *seq);
121 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
122 static int tipc_sk_insert(struct tipc_sock *tsk);
123 static void tipc_sk_remove(struct tipc_sock *tsk);
124 static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
125                               size_t dsz);
126 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
127
128 static const struct proto_ops packet_ops;
129 static const struct proto_ops stream_ops;
130 static const struct proto_ops msg_ops;
131 static struct proto tipc_proto;
132
133 static const struct rhashtable_params tsk_rht_params;
134
135 /*
136  * Revised TIPC socket locking policy:
137  *
138  * Most socket operations take the standard socket lock when they start
139  * and hold it until they finish (or until they need to sleep).  Acquiring
140  * this lock grants the owner exclusive access to the fields of the socket
141  * data structures, with the exception of the backlog queue.  A few socket
142  * operations can be done without taking the socket lock because they only
143  * read socket information that never changes during the life of the socket.
144  *
145  * Socket operations may acquire the lock for the associated TIPC port if they
146  * need to perform an operation on the port.  If any routine needs to acquire
147  * both the socket lock and the port lock it must take the socket lock first
148  * to avoid the risk of deadlock.
149  *
150  * The dispatcher handling incoming messages cannot grab the socket lock in
151  * the standard fashion, since invoked it runs at the BH level and cannot block.
152  * Instead, it checks to see if the socket lock is currently owned by someone,
153  * and either handles the message itself or adds it to the socket's backlog
154  * queue; in the latter case the queued message is processed once the process
155  * owning the socket lock releases it.
156  *
157  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
158  * the problem of a blocked socket operation preventing any other operations
159  * from occurring.  However, applications must be careful if they have
160  * multiple threads trying to send (or receive) on the same socket, as these
161  * operations might interfere with each other.  For example, doing a connect
162  * and a receive at the same time might allow the receive to consume the
163  * ACK message meant for the connect.  While additional work could be done
164  * to try and overcome this, it doesn't seem to be worthwhile at the present.
165  *
166  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
167  * that another operation that must be performed in a non-blocking manner is
168  * not delayed for very long because the lock has already been taken.
169  *
170  * NOTE: This code assumes that certain fields of a port/socket pair are
171  * constant over its lifetime; such fields can be examined without taking
172  * the socket lock and/or port lock, and do not need to be re-read even
173  * after resuming processing after waiting.  These fields include:
174  *   - socket type
175  *   - pointer to socket sk structure (aka tipc_sock structure)
176  *   - pointer to port structure
177  *   - port reference
178  */
179
180 static u32 tsk_own_node(struct tipc_sock *tsk)
181 {
182         return msg_prevnode(&tsk->phdr);
183 }
184
185 static u32 tsk_peer_node(struct tipc_sock *tsk)
186 {
187         return msg_destnode(&tsk->phdr);
188 }
189
190 static u32 tsk_peer_port(struct tipc_sock *tsk)
191 {
192         return msg_destport(&tsk->phdr);
193 }
194
195 static  bool tsk_unreliable(struct tipc_sock *tsk)
196 {
197         return msg_src_droppable(&tsk->phdr) != 0;
198 }
199
200 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
201 {
202         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
203 }
204
205 static bool tsk_unreturnable(struct tipc_sock *tsk)
206 {
207         return msg_dest_droppable(&tsk->phdr) != 0;
208 }
209
210 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
211 {
212         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
213 }
214
215 static int tsk_importance(struct tipc_sock *tsk)
216 {
217         return msg_importance(&tsk->phdr);
218 }
219
220 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
221 {
222         if (imp > TIPC_CRITICAL_IMPORTANCE)
223                 return -EINVAL;
224         msg_set_importance(&tsk->phdr, (u32)imp);
225         return 0;
226 }
227
228 static struct tipc_sock *tipc_sk(const struct sock *sk)
229 {
230         return container_of(sk, struct tipc_sock, sk);
231 }
232
233 static bool tsk_conn_cong(struct tipc_sock *tsk)
234 {
235         return tsk->snt_unacked >= tsk->snd_win;
236 }
237
238 /* tsk_blocks(): translate a buffer size in bytes to number of
239  * advertisable blocks, taking into account the ratio truesize(len)/len
240  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
241  */
242 static u16 tsk_adv_blocks(int len)
243 {
244         return len / FLOWCTL_BLK_SZ / 4;
245 }
246
247 /* tsk_inc(): increment counter for sent or received data
248  * - If block based flow control is not supported by peer we
249  *   fall back to message based ditto, incrementing the counter
250  */
251 static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
252 {
253         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
254                 return ((msglen / FLOWCTL_BLK_SZ) + 1);
255         return 1;
256 }
257
258 /**
259  * tsk_advance_rx_queue - discard first buffer in socket receive queue
260  *
261  * Caller must hold socket lock
262  */
263 static void tsk_advance_rx_queue(struct sock *sk)
264 {
265         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
266 }
267
268 /* tipc_sk_respond() : send response message back to sender
269  */
270 static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
271 {
272         u32 selector;
273         u32 dnode;
274         u32 onode = tipc_own_addr(sock_net(sk));
275
276         if (!tipc_msg_reverse(onode, &skb, err))
277                 return;
278
279         dnode = msg_destnode(buf_msg(skb));
280         selector = msg_origport(buf_msg(skb));
281         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
282 }
283
284 /**
285  * tsk_rej_rx_queue - reject all buffers in socket receive queue
286  *
287  * Caller must hold socket lock
288  */
289 static void tsk_rej_rx_queue(struct sock *sk)
290 {
291         struct sk_buff *skb;
292
293         while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
294                 tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
295 }
296
297 /* tsk_peer_msg - verify if message was sent by connected port's peer
298  *
299  * Handles cases where the node's network address has changed from
300  * the default of <0.0.0> to its configured setting.
301  */
302 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
303 {
304         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
305         u32 peer_port = tsk_peer_port(tsk);
306         u32 orig_node;
307         u32 peer_node;
308
309         if (unlikely(!tsk->connected))
310                 return false;
311
312         if (unlikely(msg_origport(msg) != peer_port))
313                 return false;
314
315         orig_node = msg_orignode(msg);
316         peer_node = tsk_peer_node(tsk);
317
318         if (likely(orig_node == peer_node))
319                 return true;
320
321         if (!orig_node && (peer_node == tn->own_addr))
322                 return true;
323
324         if (!peer_node && (orig_node == tn->own_addr))
325                 return true;
326
327         return false;
328 }
329
330 /**
331  * tipc_sk_create - create a TIPC socket
332  * @net: network namespace (must be default network)
333  * @sock: pre-allocated socket structure
334  * @protocol: protocol indicator (must be 0)
335  * @kern: caused by kernel or by userspace?
336  *
337  * This routine creates additional data structures used by the TIPC socket,
338  * initializes them, and links them together.
339  *
340  * Returns 0 on success, errno otherwise
341  */
342 static int tipc_sk_create(struct net *net, struct socket *sock,
343                           int protocol, int kern)
344 {
345         struct tipc_net *tn;
346         const struct proto_ops *ops;
347         socket_state state;
348         struct sock *sk;
349         struct tipc_sock *tsk;
350         struct tipc_msg *msg;
351
352         /* Validate arguments */
353         if (unlikely(protocol != 0))
354                 return -EPROTONOSUPPORT;
355
356         switch (sock->type) {
357         case SOCK_STREAM:
358                 ops = &stream_ops;
359                 state = SS_UNCONNECTED;
360                 break;
361         case SOCK_SEQPACKET:
362                 ops = &packet_ops;
363                 state = SS_UNCONNECTED;
364                 break;
365         case SOCK_DGRAM:
366         case SOCK_RDM:
367                 ops = &msg_ops;
368                 state = SS_READY;
369                 break;
370         default:
371                 return -EPROTOTYPE;
372         }
373
374         /* Allocate socket's protocol area */
375         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
376         if (sk == NULL)
377                 return -ENOMEM;
378
379         tsk = tipc_sk(sk);
380         tsk->max_pkt = MAX_PKT_DEFAULT;
381         INIT_LIST_HEAD(&tsk->publications);
382         msg = &tsk->phdr;
383         tn = net_generic(sock_net(sk), tipc_net_id);
384         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
385                       NAMED_H_SIZE, 0);
386
387         /* Finish initializing socket data structures */
388         sock->ops = ops;
389         sock->state = state;
390         sock_init_data(sock, sk);
391         if (tipc_sk_insert(tsk)) {
392                 pr_warn("Socket create failed; port number exhausted\n");
393                 return -EINVAL;
394         }
395         msg_set_origport(msg, tsk->portid);
396         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
397         sk->sk_backlog_rcv = tipc_backlog_rcv;
398         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
399         sk->sk_data_ready = tipc_data_ready;
400         sk->sk_write_space = tipc_write_space;
401         sk->sk_destruct = tipc_sock_destruct;
402         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
403         atomic_set(&tsk->dupl_rcvcnt, 0);
404
405         /* Start out with safe limits until we receive an advertised window */
406         tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
407         tsk->rcv_win = tsk->snd_win;
408
409         if (sock->state == SS_READY) {
410                 tsk_set_unreturnable(tsk, true);
411                 if (sock->type == SOCK_DGRAM)
412                         tsk_set_unreliable(tsk, true);
413         }
414         return 0;
415 }
416
417 static void tipc_sk_callback(struct rcu_head *head)
418 {
419         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
420
421         sock_put(&tsk->sk);
422 }
423
424 /**
425  * tipc_release - destroy a TIPC socket
426  * @sock: socket to destroy
427  *
428  * This routine cleans up any messages that are still queued on the socket.
429  * For DGRAM and RDM socket types, all queued messages are rejected.
430  * For SEQPACKET and STREAM socket types, the first message is rejected
431  * and any others are discarded.  (If the first message on a STREAM socket
432  * is partially-read, it is discarded and the next one is rejected instead.)
433  *
434  * NOTE: Rejected messages are not necessarily returned to the sender!  They
435  * are returned or discarded according to the "destination droppable" setting
436  * specified for the message by the sender.
437  *
438  * Returns 0 on success, errno otherwise
439  */
440 static int tipc_release(struct socket *sock)
441 {
442         struct sock *sk = sock->sk;
443         struct net *net;
444         struct tipc_sock *tsk;
445         struct sk_buff *skb;
446         u32 dnode;
447
448         /*
449          * Exit if socket isn't fully initialized (occurs when a failed accept()
450          * releases a pre-allocated child socket that was never used)
451          */
452         if (sk == NULL)
453                 return 0;
454
455         net = sock_net(sk);
456         tsk = tipc_sk(sk);
457         lock_sock(sk);
458
459         /*
460          * Reject all unreceived messages, except on an active connection
461          * (which disconnects locally & sends a 'FIN+' to peer)
462          */
463         dnode = tsk_peer_node(tsk);
464         while (sock->state != SS_DISCONNECTING) {
465                 skb = __skb_dequeue(&sk->sk_receive_queue);
466                 if (skb == NULL)
467                         break;
468                 if (TIPC_SKB_CB(skb)->bytes_read)
469                         kfree_skb(skb);
470                 else {
471                         if ((sock->state == SS_CONNECTING) ||
472                             (sock->state == SS_CONNECTED)) {
473                                 sock->state = SS_DISCONNECTING;
474                                 tsk->connected = 0;
475                                 tipc_node_remove_conn(net, dnode, tsk->portid);
476                         }
477                         tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
478                 }
479         }
480
481         tipc_sk_withdraw(tsk, 0, NULL);
482         sk_stop_timer(sk, &sk->sk_timer);
483         tipc_sk_remove(tsk);
484         if (tsk->connected) {
485                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
486                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
487                                       tsk_own_node(tsk), tsk_peer_port(tsk),
488                                       tsk->portid, TIPC_ERR_NO_PORT);
489                 if (skb)
490                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
491                 tipc_node_remove_conn(net, dnode, tsk->portid);
492         }
493
494         /* Reject any messages that accumulated in backlog queue */
495         sock->state = SS_DISCONNECTING;
496         release_sock(sk);
497
498         call_rcu(&tsk->rcu, tipc_sk_callback);
499         sock->sk = NULL;
500
501         return 0;
502 }
503
504 /**
505  * tipc_bind - associate or disassocate TIPC name(s) with a socket
506  * @sock: socket structure
507  * @uaddr: socket address describing name(s) and desired operation
508  * @uaddr_len: size of socket address data structure
509  *
510  * Name and name sequence binding is indicated using a positive scope value;
511  * a negative scope value unbinds the specified name.  Specifying no name
512  * (i.e. a socket address length of 0) unbinds all names from the socket.
513  *
514  * Returns 0 on success, errno otherwise
515  *
516  * NOTE: This routine doesn't need to take the socket lock since it doesn't
517  *       access any non-constant socket information.
518  */
519 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
520                      int uaddr_len)
521 {
522         struct sock *sk = sock->sk;
523         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
524         struct tipc_sock *tsk = tipc_sk(sk);
525         int res = -EINVAL;
526
527         lock_sock(sk);
528         if (unlikely(!uaddr_len)) {
529                 res = tipc_sk_withdraw(tsk, 0, NULL);
530                 goto exit;
531         }
532
533         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
534                 res = -EINVAL;
535                 goto exit;
536         }
537         if (addr->family != AF_TIPC) {
538                 res = -EAFNOSUPPORT;
539                 goto exit;
540         }
541
542         if (addr->addrtype == TIPC_ADDR_NAME)
543                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
544         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
545                 res = -EAFNOSUPPORT;
546                 goto exit;
547         }
548
549         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
550             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
551             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
552                 res = -EACCES;
553                 goto exit;
554         }
555
556         res = (addr->scope > 0) ?
557                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
558                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
559 exit:
560         release_sock(sk);
561         return res;
562 }
563
564 /**
565  * tipc_getname - get port ID of socket or peer socket
566  * @sock: socket structure
567  * @uaddr: area for returned socket address
568  * @uaddr_len: area for returned length of socket address
569  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
570  *
571  * Returns 0 on success, errno otherwise
572  *
573  * NOTE: This routine doesn't need to take the socket lock since it only
574  *       accesses socket information that is unchanging (or which changes in
575  *       a completely predictable manner).
576  */
577 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
578                         int *uaddr_len, int peer)
579 {
580         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
581         struct tipc_sock *tsk = tipc_sk(sock->sk);
582         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
583
584         memset(addr, 0, sizeof(*addr));
585         if (peer) {
586                 if ((sock->state != SS_CONNECTED) &&
587                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
588                         return -ENOTCONN;
589                 addr->addr.id.ref = tsk_peer_port(tsk);
590                 addr->addr.id.node = tsk_peer_node(tsk);
591         } else {
592                 addr->addr.id.ref = tsk->portid;
593                 addr->addr.id.node = tn->own_addr;
594         }
595
596         *uaddr_len = sizeof(*addr);
597         addr->addrtype = TIPC_ADDR_ID;
598         addr->family = AF_TIPC;
599         addr->scope = 0;
600         addr->addr.name.domain = 0;
601
602         return 0;
603 }
604
605 /**
606  * tipc_poll - read and possibly block on pollmask
607  * @file: file structure associated with the socket
608  * @sock: socket for which to calculate the poll bits
609  * @wait: ???
610  *
611  * Returns pollmask value
612  *
613  * COMMENTARY:
614  * It appears that the usual socket locking mechanisms are not useful here
615  * since the pollmask info is potentially out-of-date the moment this routine
616  * exits.  TCP and other protocols seem to rely on higher level poll routines
617  * to handle any preventable race conditions, so TIPC will do the same ...
618  *
619  * TIPC sets the returned events as follows:
620  *
621  * socket state         flags set
622  * ------------         ---------
623  * unconnected          no read flags
624  *                      POLLOUT if port is not congested
625  *
626  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
627  *                      no write flags
628  *
629  * connected            POLLIN/POLLRDNORM if data in rx queue
630  *                      POLLOUT if port is not congested
631  *
632  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
633  *                      no write flags
634  *
635  * listening            POLLIN if SYN in rx queue
636  *                      no write flags
637  *
638  * ready                POLLIN/POLLRDNORM if data in rx queue
639  * [connectionless]     POLLOUT (since port cannot be congested)
640  *
641  * IMPORTANT: The fact that a read or write operation is indicated does NOT
642  * imply that the operation will succeed, merely that it should be performed
643  * and will not block.
644  */
645 static unsigned int tipc_poll(struct file *file, struct socket *sock,
646                               poll_table *wait)
647 {
648         struct sock *sk = sock->sk;
649         struct tipc_sock *tsk = tipc_sk(sk);
650         u32 mask = 0;
651
652         sock_poll_wait(file, sk_sleep(sk), wait);
653
654         switch ((int)sock->state) {
655         case SS_UNCONNECTED:
656                 if (!tsk->link_cong)
657                         mask |= POLLOUT;
658                 break;
659         case SS_READY:
660         case SS_CONNECTED:
661                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
662                         mask |= POLLOUT;
663                 /* fall thru' */
664         case SS_CONNECTING:
665         case SS_LISTENING:
666                 if (!skb_queue_empty(&sk->sk_receive_queue))
667                         mask |= (POLLIN | POLLRDNORM);
668                 break;
669         case SS_DISCONNECTING:
670                 mask = (POLLIN | POLLRDNORM | POLLHUP);
671                 break;
672         }
673
674         return mask;
675 }
676
677 /**
678  * tipc_sendmcast - send multicast message
679  * @sock: socket structure
680  * @seq: destination address
681  * @msg: message to send
682  * @dsz: total length of message data
683  * @timeo: timeout to wait for wakeup
684  *
685  * Called from function tipc_sendmsg(), which has done all sanity checks
686  * Returns the number of bytes sent on success, or errno
687  */
688 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
689                           struct msghdr *msg, size_t dsz, long timeo)
690 {
691         struct sock *sk = sock->sk;
692         struct tipc_sock *tsk = tipc_sk(sk);
693         struct net *net = sock_net(sk);
694         struct tipc_msg *mhdr = &tsk->phdr;
695         struct sk_buff_head pktchain;
696         struct iov_iter save = msg->msg_iter;
697         uint mtu;
698         int rc;
699
700         if (!timeo && tsk->link_cong)
701                 return -ELINKCONG;
702
703         msg_set_type(mhdr, TIPC_MCAST_MSG);
704         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
705         msg_set_destport(mhdr, 0);
706         msg_set_destnode(mhdr, 0);
707         msg_set_nametype(mhdr, seq->type);
708         msg_set_namelower(mhdr, seq->lower);
709         msg_set_nameupper(mhdr, seq->upper);
710         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
711
712         skb_queue_head_init(&pktchain);
713
714 new_mtu:
715         mtu = tipc_bcast_get_mtu(net);
716         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
717         if (unlikely(rc < 0))
718                 return rc;
719
720         do {
721                 rc = tipc_bcast_xmit(net, &pktchain);
722                 if (likely(!rc))
723                         return dsz;
724
725                 if (rc == -ELINKCONG) {
726                         tsk->link_cong = 1;
727                         rc = tipc_wait_for_sndmsg(sock, &timeo);
728                         if (!rc)
729                                 continue;
730                 }
731                 __skb_queue_purge(&pktchain);
732                 if (rc == -EMSGSIZE) {
733                         msg->msg_iter = save;
734                         goto new_mtu;
735                 }
736                 break;
737         } while (1);
738         return rc;
739 }
740
741 /**
742  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
743  * @arrvq: queue with arriving messages, to be cloned after destination lookup
744  * @inputq: queue with cloned messages, delivered to socket after dest lookup
745  *
746  * Multi-threaded: parallel calls with reference to same queues may occur
747  */
748 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
749                        struct sk_buff_head *inputq)
750 {
751         struct tipc_msg *msg;
752         struct tipc_plist dports;
753         u32 portid;
754         u32 scope = TIPC_CLUSTER_SCOPE;
755         struct sk_buff_head tmpq;
756         uint hsz;
757         struct sk_buff *skb, *_skb;
758
759         __skb_queue_head_init(&tmpq);
760         tipc_plist_init(&dports);
761
762         skb = tipc_skb_peek(arrvq, &inputq->lock);
763         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
764                 msg = buf_msg(skb);
765                 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
766
767                 if (in_own_node(net, msg_orignode(msg)))
768                         scope = TIPC_NODE_SCOPE;
769
770                 /* Create destination port list and message clones: */
771                 tipc_nametbl_mc_translate(net,
772                                           msg_nametype(msg), msg_namelower(msg),
773                                           msg_nameupper(msg), scope, &dports);
774                 portid = tipc_plist_pop(&dports);
775                 for (; portid; portid = tipc_plist_pop(&dports)) {
776                         _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
777                         if (_skb) {
778                                 msg_set_destport(buf_msg(_skb), portid);
779                                 __skb_queue_tail(&tmpq, _skb);
780                                 continue;
781                         }
782                         pr_warn("Failed to clone mcast rcv buffer\n");
783                 }
784                 /* Append to inputq if not already done by other thread */
785                 spin_lock_bh(&inputq->lock);
786                 if (skb_peek(arrvq) == skb) {
787                         skb_queue_splice_tail_init(&tmpq, inputq);
788                         kfree_skb(__skb_dequeue(arrvq));
789                 }
790                 spin_unlock_bh(&inputq->lock);
791                 __skb_queue_purge(&tmpq);
792                 kfree_skb(skb);
793         }
794         tipc_sk_rcv(net, inputq);
795 }
796
797 /**
798  * tipc_sk_proto_rcv - receive a connection mng protocol message
799  * @tsk: receiving socket
800  * @skb: pointer to message buffer.
801  */
802 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
803                               struct sk_buff_head *xmitq)
804 {
805         struct sock *sk = &tsk->sk;
806         u32 onode = tsk_own_node(tsk);
807         struct tipc_msg *hdr = buf_msg(skb);
808         int mtyp = msg_type(hdr);
809         bool conn_cong;
810
811         /* Ignore if connection cannot be validated: */
812         if (!tsk_peer_msg(tsk, hdr))
813                 goto exit;
814
815         tsk->probing_state = TIPC_CONN_OK;
816
817         if (mtyp == CONN_PROBE) {
818                 msg_set_type(hdr, CONN_PROBE_REPLY);
819                 if (tipc_msg_reverse(onode, &skb, TIPC_OK))
820                         __skb_queue_tail(xmitq, skb);
821                 return;
822         } else if (mtyp == CONN_ACK) {
823                 conn_cong = tsk_conn_cong(tsk);
824                 tsk->snt_unacked -= msg_conn_ack(hdr);
825                 if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
826                         tsk->snd_win = msg_adv_win(hdr);
827                 if (conn_cong)
828                         sk->sk_write_space(sk);
829         } else if (mtyp != CONN_PROBE_REPLY) {
830                 pr_warn("Received unknown CONN_PROTO msg\n");
831         }
832 exit:
833         kfree_skb(skb);
834 }
835
836 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
837 {
838         struct sock *sk = sock->sk;
839         struct tipc_sock *tsk = tipc_sk(sk);
840         DEFINE_WAIT(wait);
841         int done;
842
843         do {
844                 int err = sock_error(sk);
845                 if (err)
846                         return err;
847                 if (sock->state == SS_DISCONNECTING)
848                         return -EPIPE;
849                 if (!*timeo_p)
850                         return -EAGAIN;
851                 if (signal_pending(current))
852                         return sock_intr_errno(*timeo_p);
853
854                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
855                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
856                 finish_wait(sk_sleep(sk), &wait);
857         } while (!done);
858         return 0;
859 }
860
861 /**
862  * tipc_sendmsg - send message in connectionless manner
863  * @sock: socket structure
864  * @m: message to send
865  * @dsz: amount of user data to be sent
866  *
867  * Message must have an destination specified explicitly.
868  * Used for SOCK_RDM and SOCK_DGRAM messages,
869  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
870  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
871  *
872  * Returns the number of bytes sent on success, or errno otherwise
873  */
874 static int tipc_sendmsg(struct socket *sock,
875                         struct msghdr *m, size_t dsz)
876 {
877         struct sock *sk = sock->sk;
878         int ret;
879
880         lock_sock(sk);
881         ret = __tipc_sendmsg(sock, m, dsz);
882         release_sock(sk);
883
884         return ret;
885 }
886
887 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
888 {
889         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
890         struct sock *sk = sock->sk;
891         struct tipc_sock *tsk = tipc_sk(sk);
892         struct net *net = sock_net(sk);
893         struct tipc_msg *mhdr = &tsk->phdr;
894         u32 dnode, dport;
895         struct sk_buff_head pktchain;
896         struct sk_buff *skb;
897         struct tipc_name_seq *seq;
898         struct iov_iter save;
899         u32 mtu;
900         long timeo;
901         int rc;
902
903         if (dsz > TIPC_MAX_USER_MSG_SIZE)
904                 return -EMSGSIZE;
905         if (unlikely(!dest)) {
906                 if (tsk->connected && sock->state == SS_READY)
907                         dest = &tsk->remote;
908                 else
909                         return -EDESTADDRREQ;
910         } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
911                    dest->family != AF_TIPC) {
912                 return -EINVAL;
913         }
914         if (unlikely(sock->state != SS_READY)) {
915                 if (sock->state == SS_LISTENING)
916                         return -EPIPE;
917                 if (sock->state != SS_UNCONNECTED)
918                         return -EISCONN;
919                 if (tsk->published)
920                         return -EOPNOTSUPP;
921                 if (dest->addrtype == TIPC_ADDR_NAME) {
922                         tsk->conn_type = dest->addr.name.name.type;
923                         tsk->conn_instance = dest->addr.name.name.instance;
924                 }
925         }
926         seq = &dest->addr.nameseq;
927         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
928
929         if (dest->addrtype == TIPC_ADDR_MCAST) {
930                 return tipc_sendmcast(sock, seq, m, dsz, timeo);
931         } else if (dest->addrtype == TIPC_ADDR_NAME) {
932                 u32 type = dest->addr.name.name.type;
933                 u32 inst = dest->addr.name.name.instance;
934                 u32 domain = dest->addr.name.domain;
935
936                 dnode = domain;
937                 msg_set_type(mhdr, TIPC_NAMED_MSG);
938                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
939                 msg_set_nametype(mhdr, type);
940                 msg_set_nameinst(mhdr, inst);
941                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
942                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
943                 msg_set_destnode(mhdr, dnode);
944                 msg_set_destport(mhdr, dport);
945                 if (unlikely(!dport && !dnode))
946                         return -EHOSTUNREACH;
947         } else if (dest->addrtype == TIPC_ADDR_ID) {
948                 dnode = dest->addr.id.node;
949                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
950                 msg_set_lookup_scope(mhdr, 0);
951                 msg_set_destnode(mhdr, dnode);
952                 msg_set_destport(mhdr, dest->addr.id.ref);
953                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
954         }
955
956         skb_queue_head_init(&pktchain);
957         save = m->msg_iter;
958 new_mtu:
959         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
960         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
961         if (rc < 0)
962                 return rc;
963
964         do {
965                 skb = skb_peek(&pktchain);
966                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
967                 rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
968                 if (likely(!rc)) {
969                         if (sock->state != SS_READY)
970                                 sock->state = SS_CONNECTING;
971                         return dsz;
972                 }
973                 if (rc == -ELINKCONG) {
974                         tsk->link_cong = 1;
975                         rc = tipc_wait_for_sndmsg(sock, &timeo);
976                         if (!rc)
977                                 continue;
978                 }
979                 __skb_queue_purge(&pktchain);
980                 if (rc == -EMSGSIZE) {
981                         m->msg_iter = save;
982                         goto new_mtu;
983                 }
984                 break;
985         } while (1);
986
987         return rc;
988 }
989
990 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
991 {
992         struct sock *sk = sock->sk;
993         struct tipc_sock *tsk = tipc_sk(sk);
994         DEFINE_WAIT(wait);
995         int done;
996
997         do {
998                 int err = sock_error(sk);
999                 if (err)
1000                         return err;
1001                 if (sock->state == SS_DISCONNECTING)
1002                         return -EPIPE;
1003                 else if (sock->state != SS_CONNECTED)
1004                         return -ENOTCONN;
1005                 if (!*timeo_p)
1006                         return -EAGAIN;
1007                 if (signal_pending(current))
1008                         return sock_intr_errno(*timeo_p);
1009
1010                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1011                 done = sk_wait_event(sk, timeo_p,
1012                                      (!tsk->link_cong &&
1013                                       !tsk_conn_cong(tsk)) ||
1014                                      !tsk->connected);
1015                 finish_wait(sk_sleep(sk), &wait);
1016         } while (!done);
1017         return 0;
1018 }
1019
1020 /**
1021  * tipc_send_stream - send stream-oriented data
1022  * @sock: socket structure
1023  * @m: data to send
1024  * @dsz: total length of data to be transmitted
1025  *
1026  * Used for SOCK_STREAM data.
1027  *
1028  * Returns the number of bytes sent on success (or partial success),
1029  * or errno if no data sent
1030  */
1031 static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1032 {
1033         struct sock *sk = sock->sk;
1034         int ret;
1035
1036         lock_sock(sk);
1037         ret = __tipc_send_stream(sock, m, dsz);
1038         release_sock(sk);
1039
1040         return ret;
1041 }
1042
1043 static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1044 {
1045         struct sock *sk = sock->sk;
1046         struct net *net = sock_net(sk);
1047         struct tipc_sock *tsk = tipc_sk(sk);
1048         struct tipc_msg *mhdr = &tsk->phdr;
1049         struct sk_buff_head pktchain;
1050         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1051         u32 portid = tsk->portid;
1052         int rc = -EINVAL;
1053         long timeo;
1054         u32 dnode;
1055         uint mtu, send, sent = 0;
1056         struct iov_iter save;
1057         int hlen = MIN_H_SIZE;
1058
1059         /* Handle implied connection establishment */
1060         if (unlikely(dest)) {
1061                 rc = __tipc_sendmsg(sock, m, dsz);
1062                 hlen = msg_hdr_sz(mhdr);
1063                 if (dsz && (dsz == rc))
1064                         tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1065                 return rc;
1066         }
1067         if (dsz > (uint)INT_MAX)
1068                 return -EMSGSIZE;
1069
1070         if (unlikely(sock->state != SS_CONNECTED)) {
1071                 if (sock->state == SS_DISCONNECTING)
1072                         return -EPIPE;
1073                 else
1074                         return -ENOTCONN;
1075         }
1076
1077         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1078         if (!timeo && tsk->link_cong)
1079                 return -ELINKCONG;
1080
1081         dnode = tsk_peer_node(tsk);
1082         skb_queue_head_init(&pktchain);
1083
1084 next:
1085         save = m->msg_iter;
1086         mtu = tsk->max_pkt;
1087         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1088         rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
1089         if (unlikely(rc < 0))
1090                 return rc;
1091
1092         do {
1093                 if (likely(!tsk_conn_cong(tsk))) {
1094                         rc = tipc_node_xmit(net, &pktchain, dnode, portid);
1095                         if (likely(!rc)) {
1096                                 tsk->snt_unacked += tsk_inc(tsk, send + hlen);
1097                                 sent += send;
1098                                 if (sent == dsz)
1099                                         return dsz;
1100                                 goto next;
1101                         }
1102                         if (rc == -EMSGSIZE) {
1103                                 __skb_queue_purge(&pktchain);
1104                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1105                                                                  portid);
1106                                 m->msg_iter = save;
1107                                 goto next;
1108                         }
1109                         if (rc != -ELINKCONG)
1110                                 break;
1111
1112                         tsk->link_cong = 1;
1113                 }
1114                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1115         } while (!rc);
1116
1117         __skb_queue_purge(&pktchain);
1118         return sent ? sent : rc;
1119 }
1120
1121 /**
1122  * tipc_send_packet - send a connection-oriented message
1123  * @sock: socket structure
1124  * @m: message to send
1125  * @dsz: length of data to be transmitted
1126  *
1127  * Used for SOCK_SEQPACKET messages.
1128  *
1129  * Returns the number of bytes sent on success, or errno otherwise
1130  */
1131 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1132 {
1133         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1134                 return -EMSGSIZE;
1135
1136         return tipc_send_stream(sock, m, dsz);
1137 }
1138
1139 /* tipc_sk_finish_conn - complete the setup of a connection
1140  */
1141 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1142                                 u32 peer_node)
1143 {
1144         struct sock *sk = &tsk->sk;
1145         struct net *net = sock_net(sk);
1146         struct tipc_msg *msg = &tsk->phdr;
1147
1148         msg_set_destnode(msg, peer_node);
1149         msg_set_destport(msg, peer_port);
1150         msg_set_type(msg, TIPC_CONN_MSG);
1151         msg_set_lookup_scope(msg, 0);
1152         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1153
1154         tsk->probing_intv = CONN_PROBING_INTERVAL;
1155         tsk->probing_state = TIPC_CONN_OK;
1156         tsk->connected = 1;
1157         sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1158         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1159         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1160         tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1161         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1162                 return;
1163
1164         /* Fall back to message based flow control */
1165         tsk->rcv_win = FLOWCTL_MSG_WIN;
1166         tsk->snd_win = FLOWCTL_MSG_WIN;
1167 }
1168
1169 /**
1170  * set_orig_addr - capture sender's address for received message
1171  * @m: descriptor for message info
1172  * @msg: received message header
1173  *
1174  * Note: Address is not captured if not requested by receiver.
1175  */
1176 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1177 {
1178         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1179
1180         if (addr) {
1181                 addr->family = AF_TIPC;
1182                 addr->addrtype = TIPC_ADDR_ID;
1183                 memset(&addr->addr, 0, sizeof(addr->addr));
1184                 addr->addr.id.ref = msg_origport(msg);
1185                 addr->addr.id.node = msg_orignode(msg);
1186                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1187                 addr->scope = 0;                /* could leave uninitialized */
1188                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1189         }
1190 }
1191
1192 /**
1193  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1194  * @m: descriptor for message info
1195  * @msg: received message header
1196  * @tsk: TIPC port associated with message
1197  *
1198  * Note: Ancillary data is not captured if not requested by receiver.
1199  *
1200  * Returns 0 if successful, otherwise errno
1201  */
1202 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1203                                  struct tipc_sock *tsk)
1204 {
1205         u32 anc_data[3];
1206         u32 err;
1207         u32 dest_type;
1208         int has_name;
1209         int res;
1210
1211         if (likely(m->msg_controllen == 0))
1212                 return 0;
1213
1214         /* Optionally capture errored message object(s) */
1215         err = msg ? msg_errcode(msg) : 0;
1216         if (unlikely(err)) {
1217                 anc_data[0] = err;
1218                 anc_data[1] = msg_data_sz(msg);
1219                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1220                 if (res)
1221                         return res;
1222                 if (anc_data[1]) {
1223                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1224                                        msg_data(msg));
1225                         if (res)
1226                                 return res;
1227                 }
1228         }
1229
1230         /* Optionally capture message destination object */
1231         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1232         switch (dest_type) {
1233         case TIPC_NAMED_MSG:
1234                 has_name = 1;
1235                 anc_data[0] = msg_nametype(msg);
1236                 anc_data[1] = msg_namelower(msg);
1237                 anc_data[2] = msg_namelower(msg);
1238                 break;
1239         case TIPC_MCAST_MSG:
1240                 has_name = 1;
1241                 anc_data[0] = msg_nametype(msg);
1242                 anc_data[1] = msg_namelower(msg);
1243                 anc_data[2] = msg_nameupper(msg);
1244                 break;
1245         case TIPC_CONN_MSG:
1246                 has_name = (tsk->conn_type != 0);
1247                 anc_data[0] = tsk->conn_type;
1248                 anc_data[1] = tsk->conn_instance;
1249                 anc_data[2] = tsk->conn_instance;
1250                 break;
1251         default:
1252                 has_name = 0;
1253         }
1254         if (has_name) {
1255                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1256                 if (res)
1257                         return res;
1258         }
1259
1260         return 0;
1261 }
1262
1263 static void tipc_sk_send_ack(struct tipc_sock *tsk)
1264 {
1265         struct net *net = sock_net(&tsk->sk);
1266         struct sk_buff *skb = NULL;
1267         struct tipc_msg *msg;
1268         u32 peer_port = tsk_peer_port(tsk);
1269         u32 dnode = tsk_peer_node(tsk);
1270
1271         if (!tsk->connected)
1272                 return;
1273         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1274                               dnode, tsk_own_node(tsk), peer_port,
1275                               tsk->portid, TIPC_OK);
1276         if (!skb)
1277                 return;
1278         msg = buf_msg(skb);
1279         msg_set_conn_ack(msg, tsk->rcv_unacked);
1280         tsk->rcv_unacked = 0;
1281
1282         /* Adjust to and advertize the correct window limit */
1283         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1284                 tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1285                 msg_set_adv_win(msg, tsk->rcv_win);
1286         }
1287         tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1288 }
1289
1290 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1291 {
1292         struct sock *sk = sock->sk;
1293         DEFINE_WAIT(wait);
1294         long timeo = *timeop;
1295         int err;
1296
1297         for (;;) {
1298                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1299                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1300                         if (sock->state == SS_DISCONNECTING) {
1301                                 err = -ENOTCONN;
1302                                 break;
1303                         }
1304                         release_sock(sk);
1305                         timeo = schedule_timeout(timeo);
1306                         lock_sock(sk);
1307                 }
1308                 err = 0;
1309                 if (!skb_queue_empty(&sk->sk_receive_queue))
1310                         break;
1311                 err = -EAGAIN;
1312                 if (!timeo)
1313                         break;
1314                 err = sock_intr_errno(timeo);
1315                 if (signal_pending(current))
1316                         break;
1317         }
1318         finish_wait(sk_sleep(sk), &wait);
1319         *timeop = timeo;
1320         return err;
1321 }
1322
1323 /**
1324  * tipc_recvmsg - receive packet-oriented message
1325  * @m: descriptor for message info
1326  * @buf_len: total size of user buffer area
1327  * @flags: receive flags
1328  *
1329  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1330  * If the complete message doesn't fit in user area, truncate it.
1331  *
1332  * Returns size of returned message data, errno otherwise
1333  */
1334 static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1335                         int flags)
1336 {
1337         struct sock *sk = sock->sk;
1338         struct tipc_sock *tsk = tipc_sk(sk);
1339         struct sk_buff *buf;
1340         struct tipc_msg *msg;
1341         long timeo;
1342         unsigned int sz;
1343         u32 err;
1344         int res, hlen;
1345
1346         /* Catch invalid receive requests */
1347         if (unlikely(!buf_len))
1348                 return -EINVAL;
1349
1350         lock_sock(sk);
1351
1352         if (unlikely(sock->state == SS_UNCONNECTED)) {
1353                 res = -ENOTCONN;
1354                 goto exit;
1355         }
1356
1357         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1358 restart:
1359
1360         /* Look for a message in receive queue; wait if necessary */
1361         res = tipc_wait_for_rcvmsg(sock, &timeo);
1362         if (res)
1363                 goto exit;
1364
1365         /* Look at first message in receive queue */
1366         buf = skb_peek(&sk->sk_receive_queue);
1367         msg = buf_msg(buf);
1368         sz = msg_data_sz(msg);
1369         hlen = msg_hdr_sz(msg);
1370         err = msg_errcode(msg);
1371
1372         /* Discard an empty non-errored message & try again */
1373         if ((!sz) && (!err)) {
1374                 tsk_advance_rx_queue(sk);
1375                 goto restart;
1376         }
1377
1378         /* Capture sender's address (optional) */
1379         set_orig_addr(m, msg);
1380
1381         /* Capture ancillary data (optional) */
1382         res = tipc_sk_anc_data_recv(m, msg, tsk);
1383         if (res)
1384                 goto exit;
1385
1386         /* Capture message data (if valid) & compute return value (always) */
1387         if (!err) {
1388                 if (unlikely(buf_len < sz)) {
1389                         sz = buf_len;
1390                         m->msg_flags |= MSG_TRUNC;
1391                 }
1392                 res = skb_copy_datagram_msg(buf, hlen, m, sz);
1393                 if (res)
1394                         goto exit;
1395                 res = sz;
1396         } else {
1397                 if ((sock->state == SS_READY) ||
1398                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1399                         res = 0;
1400                 else
1401                         res = -ECONNRESET;
1402         }
1403
1404         if (unlikely(flags & MSG_PEEK))
1405                 goto exit;
1406
1407         if (likely(sock->state != SS_READY)) {
1408                 tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1409                 if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1410                         tipc_sk_send_ack(tsk);
1411         }
1412         tsk_advance_rx_queue(sk);
1413 exit:
1414         release_sock(sk);
1415         return res;
1416 }
1417
1418 /**
1419  * tipc_recv_stream - receive stream-oriented data
1420  * @m: descriptor for message info
1421  * @buf_len: total size of user buffer area
1422  * @flags: receive flags
1423  *
1424  * Used for SOCK_STREAM messages only.  If not enough data is available
1425  * will optionally wait for more; never truncates data.
1426  *
1427  * Returns size of returned message data, errno otherwise
1428  */
1429 static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1430                             size_t buf_len, int flags)
1431 {
1432         struct sock *sk = sock->sk;
1433         struct tipc_sock *tsk = tipc_sk(sk);
1434         struct sk_buff *buf;
1435         struct tipc_msg *msg;
1436         long timeo;
1437         unsigned int sz;
1438         int target;
1439         int sz_copied = 0;
1440         u32 err;
1441         int res = 0, hlen;
1442
1443         /* Catch invalid receive attempts */
1444         if (unlikely(!buf_len))
1445                 return -EINVAL;
1446
1447         lock_sock(sk);
1448
1449         if (unlikely(sock->state == SS_UNCONNECTED)) {
1450                 res = -ENOTCONN;
1451                 goto exit;
1452         }
1453
1454         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1455         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1456
1457 restart:
1458         /* Look for a message in receive queue; wait if necessary */
1459         res = tipc_wait_for_rcvmsg(sock, &timeo);
1460         if (res)
1461                 goto exit;
1462
1463         /* Look at first message in receive queue */
1464         buf = skb_peek(&sk->sk_receive_queue);
1465         msg = buf_msg(buf);
1466         sz = msg_data_sz(msg);
1467         hlen = msg_hdr_sz(msg);
1468         err = msg_errcode(msg);
1469
1470         /* Discard an empty non-errored message & try again */
1471         if ((!sz) && (!err)) {
1472                 tsk_advance_rx_queue(sk);
1473                 goto restart;
1474         }
1475
1476         /* Optionally capture sender's address & ancillary data of first msg */
1477         if (sz_copied == 0) {
1478                 set_orig_addr(m, msg);
1479                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1480                 if (res)
1481                         goto exit;
1482         }
1483
1484         /* Capture message data (if valid) & compute return value (always) */
1485         if (!err) {
1486                 u32 offset = TIPC_SKB_CB(buf)->bytes_read;
1487                 u32 needed;
1488                 int sz_to_copy;
1489
1490                 sz -= offset;
1491                 needed = (buf_len - sz_copied);
1492                 sz_to_copy = min(sz, needed);
1493
1494                 res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
1495                 if (res)
1496                         goto exit;
1497
1498                 sz_copied += sz_to_copy;
1499
1500                 if (sz_to_copy < sz) {
1501                         if (!(flags & MSG_PEEK))
1502                                 TIPC_SKB_CB(buf)->bytes_read =
1503                                         offset + sz_to_copy;
1504                         goto exit;
1505                 }
1506         } else {
1507                 if (sz_copied != 0)
1508                         goto exit; /* can't add error msg to valid data */
1509
1510                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1511                         res = 0;
1512                 else
1513                         res = -ECONNRESET;
1514         }
1515
1516         if (unlikely(flags & MSG_PEEK))
1517                 goto exit;
1518
1519         tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1520         if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1521                 tipc_sk_send_ack(tsk);
1522         tsk_advance_rx_queue(sk);
1523
1524         /* Loop around if more data is required */
1525         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1526             (!skb_queue_empty(&sk->sk_receive_queue) ||
1527             (sz_copied < target)) &&    /* and more is ready or required */
1528             (!err))                     /* and haven't reached a FIN */
1529                 goto restart;
1530
1531 exit:
1532         release_sock(sk);
1533         return sz_copied ? sz_copied : res;
1534 }
1535
1536 /**
1537  * tipc_write_space - wake up thread if port congestion is released
1538  * @sk: socket
1539  */
1540 static void tipc_write_space(struct sock *sk)
1541 {
1542         struct socket_wq *wq;
1543
1544         rcu_read_lock();
1545         wq = rcu_dereference(sk->sk_wq);
1546         if (skwq_has_sleeper(wq))
1547                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1548                                                 POLLWRNORM | POLLWRBAND);
1549         rcu_read_unlock();
1550 }
1551
1552 /**
1553  * tipc_data_ready - wake up threads to indicate messages have been received
1554  * @sk: socket
1555  * @len: the length of messages
1556  */
1557 static void tipc_data_ready(struct sock *sk)
1558 {
1559         struct socket_wq *wq;
1560
1561         rcu_read_lock();
1562         wq = rcu_dereference(sk->sk_wq);
1563         if (skwq_has_sleeper(wq))
1564                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1565                                                 POLLRDNORM | POLLRDBAND);
1566         rcu_read_unlock();
1567 }
1568
1569 static void tipc_sock_destruct(struct sock *sk)
1570 {
1571         __skb_queue_purge(&sk->sk_receive_queue);
1572 }
1573
1574 /**
1575  * filter_connect - Handle all incoming messages for a connection-based socket
1576  * @tsk: TIPC socket
1577  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1578  *
1579  * Returns true if everything ok, false otherwise
1580  */
1581 static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1582 {
1583         struct sock *sk = &tsk->sk;
1584         struct net *net = sock_net(sk);
1585         struct socket *sock = sk->sk_socket;
1586         struct tipc_msg *hdr = buf_msg(skb);
1587
1588         if (unlikely(msg_mcast(hdr)))
1589                 return false;
1590
1591         switch ((int)sock->state) {
1592         case SS_CONNECTED:
1593
1594                 /* Accept only connection-based messages sent by peer */
1595                 if (unlikely(!tsk_peer_msg(tsk, hdr)))
1596                         return false;
1597
1598                 if (unlikely(msg_errcode(hdr))) {
1599                         sock->state = SS_DISCONNECTING;
1600                         tsk->connected = 0;
1601                         /* Let timer expire on it's own */
1602                         tipc_node_remove_conn(net, tsk_peer_node(tsk),
1603                                               tsk->portid);
1604                         sk->sk_state_change(sk);
1605                 }
1606                 return true;
1607
1608         case SS_CONNECTING:
1609
1610                 /* Accept only ACK or NACK message */
1611                 if (unlikely(!msg_connected(hdr)))
1612                         return false;
1613
1614                 if (unlikely(msg_errcode(hdr))) {
1615                         sock->state = SS_DISCONNECTING;
1616                         sk->sk_err = ECONNREFUSED;
1617                         return true;
1618                 }
1619
1620                 if (unlikely(!msg_isdata(hdr))) {
1621                         sock->state = SS_DISCONNECTING;
1622                         sk->sk_err = EINVAL;
1623                         return true;
1624                 }
1625
1626                 tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
1627                 msg_set_importance(&tsk->phdr, msg_importance(hdr));
1628                 sock->state = SS_CONNECTED;
1629
1630                 /* If 'ACK+' message, add to socket receive queue */
1631                 if (msg_data_sz(hdr))
1632                         return true;
1633
1634                 /* If empty 'ACK-' message, wake up sleeping connect() */
1635                 if (waitqueue_active(sk_sleep(sk)))
1636                         wake_up_interruptible(sk_sleep(sk));
1637
1638                 /* 'ACK-' message is neither accepted nor rejected: */
1639                 msg_set_dest_droppable(hdr, 1);
1640                 return false;
1641
1642         case SS_LISTENING:
1643         case SS_UNCONNECTED:
1644
1645                 /* Accept only SYN message */
1646                 if (!msg_connected(hdr) && !(msg_errcode(hdr)))
1647                         return true;
1648                 break;
1649         case SS_DISCONNECTING:
1650                 break;
1651         default:
1652                 pr_err("Unknown socket state %u\n", sock->state);
1653         }
1654         return false;
1655 }
1656
1657 /**
1658  * rcvbuf_limit - get proper overload limit of socket receive queue
1659  * @sk: socket
1660  * @skb: message
1661  *
1662  * For connection oriented messages, irrespective of importance,
1663  * default queue limit is 2 MB.
1664  *
1665  * For connectionless messages, queue limits are based on message
1666  * importance as follows:
1667  *
1668  * TIPC_LOW_IMPORTANCE       (2 MB)
1669  * TIPC_MEDIUM_IMPORTANCE    (4 MB)
1670  * TIPC_HIGH_IMPORTANCE      (8 MB)
1671  * TIPC_CRITICAL_IMPORTANCE  (16 MB)
1672  *
1673  * Returns overload limit according to corresponding message importance
1674  */
1675 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1676 {
1677         struct tipc_sock *tsk = tipc_sk(sk);
1678         struct tipc_msg *hdr = buf_msg(skb);
1679
1680         if (unlikely(!msg_connected(hdr)))
1681                 return sk->sk_rcvbuf << msg_importance(hdr);
1682
1683         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
1684                 return sk->sk_rcvbuf;
1685
1686         return FLOWCTL_MSG_LIM;
1687 }
1688
1689 /**
1690  * filter_rcv - validate incoming message
1691  * @sk: socket
1692  * @skb: pointer to message.
1693  *
1694  * Enqueues message on receive queue if acceptable; optionally handles
1695  * disconnect indication for a connected socket.
1696  *
1697  * Called with socket lock already taken
1698  *
1699  * Returns true if message was added to socket receive queue, otherwise false
1700  */
1701 static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1702                        struct sk_buff_head *xmitq)
1703 {
1704         struct socket *sock = sk->sk_socket;
1705         struct tipc_sock *tsk = tipc_sk(sk);
1706         struct tipc_msg *hdr = buf_msg(skb);
1707         unsigned int limit = rcvbuf_limit(sk, skb);
1708         int err = TIPC_OK;
1709         int usr = msg_user(hdr);
1710
1711         if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1712                 tipc_sk_proto_rcv(tsk, skb, xmitq);
1713                 return false;
1714         }
1715
1716         if (unlikely(usr == SOCK_WAKEUP)) {
1717                 kfree_skb(skb);
1718                 tsk->link_cong = 0;
1719                 sk->sk_write_space(sk);
1720                 return false;
1721         }
1722
1723         /* Drop if illegal message type */
1724         if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
1725                 kfree_skb(skb);
1726                 return false;
1727         }
1728
1729         /* Reject if wrong message type for current socket state */
1730         if (unlikely(sock->state == SS_READY)) {
1731                 if (msg_connected(hdr)) {
1732                         err = TIPC_ERR_NO_PORT;
1733                         goto reject;
1734                 }
1735         } else if (unlikely(!filter_connect(tsk, skb))) {
1736                 err = TIPC_ERR_NO_PORT;
1737                 goto reject;
1738         }
1739
1740         /* Reject message if there isn't room to queue it */
1741         if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
1742                 err = TIPC_ERR_OVERLOAD;
1743                 goto reject;
1744         }
1745
1746         /* Enqueue message */
1747         TIPC_SKB_CB(skb)->bytes_read = 0;
1748         __skb_queue_tail(&sk->sk_receive_queue, skb);
1749         skb_set_owner_r(skb, sk);
1750
1751         sk->sk_data_ready(sk);
1752         return true;
1753
1754 reject:
1755         if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1756                 __skb_queue_tail(xmitq, skb);
1757         return false;
1758 }
1759
1760 /**
1761  * tipc_backlog_rcv - handle incoming message from backlog queue
1762  * @sk: socket
1763  * @skb: message
1764  *
1765  * Caller must hold socket lock
1766  *
1767  * Returns 0
1768  */
1769 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1770 {
1771         unsigned int truesize = skb->truesize;
1772         struct sk_buff_head xmitq;
1773         u32 dnode, selector;
1774
1775         __skb_queue_head_init(&xmitq);
1776
1777         if (likely(filter_rcv(sk, skb, &xmitq))) {
1778                 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1779                 return 0;
1780         }
1781
1782         if (skb_queue_empty(&xmitq))
1783                 return 0;
1784
1785         /* Send response/rejected message */
1786         skb = __skb_dequeue(&xmitq);
1787         dnode = msg_destnode(buf_msg(skb));
1788         selector = msg_origport(buf_msg(skb));
1789         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1790         return 0;
1791 }
1792
1793 /**
1794  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1795  *                   inputq and try adding them to socket or backlog queue
1796  * @inputq: list of incoming buffers with potentially different destinations
1797  * @sk: socket where the buffers should be enqueued
1798  * @dport: port number for the socket
1799  *
1800  * Caller must hold socket lock
1801  */
1802 static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1803                             u32 dport, struct sk_buff_head *xmitq)
1804 {
1805         unsigned long time_limit = jiffies + 2;
1806         struct sk_buff *skb;
1807         unsigned int lim;
1808         atomic_t *dcnt;
1809         u32 onode;
1810
1811         while (skb_queue_len(inputq)) {
1812                 if (unlikely(time_after_eq(jiffies, time_limit)))
1813                         return;
1814
1815                 skb = tipc_skb_dequeue(inputq, dport);
1816                 if (unlikely(!skb))
1817                         return;
1818
1819                 /* Add message directly to receive queue if possible */
1820                 if (!sock_owned_by_user(sk)) {
1821                         filter_rcv(sk, skb, xmitq);
1822                         continue;
1823                 }
1824
1825                 /* Try backlog, compensating for double-counted bytes */
1826                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1827                 if (!sk->sk_backlog.len)
1828                         atomic_set(dcnt, 0);
1829                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1830                 if (likely(!sk_add_backlog(sk, skb, lim)))
1831                         continue;
1832
1833                 /* Overload => reject message back to sender */
1834                 onode = tipc_own_addr(sock_net(sk));
1835                 if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
1836                         __skb_queue_tail(xmitq, skb);
1837                 break;
1838         }
1839 }
1840
1841 /**
1842  * tipc_sk_rcv - handle a chain of incoming buffers
1843  * @inputq: buffer list containing the buffers
1844  * Consumes all buffers in list until inputq is empty
1845  * Note: may be called in multiple threads referring to the same queue
1846  */
1847 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1848 {
1849         struct sk_buff_head xmitq;
1850         u32 dnode, dport = 0;
1851         int err;
1852         struct tipc_sock *tsk;
1853         struct sock *sk;
1854         struct sk_buff *skb;
1855
1856         __skb_queue_head_init(&xmitq);
1857         while (skb_queue_len(inputq)) {
1858                 dport = tipc_skb_peek_port(inputq, dport);
1859                 tsk = tipc_sk_lookup(net, dport);
1860
1861                 if (likely(tsk)) {
1862                         sk = &tsk->sk;
1863                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1864                                 tipc_sk_enqueue(inputq, sk, dport, &xmitq);
1865                                 spin_unlock_bh(&sk->sk_lock.slock);
1866                         }
1867                         /* Send pending response/rejected messages, if any */
1868                         while ((skb = __skb_dequeue(&xmitq))) {
1869                                 dnode = msg_destnode(buf_msg(skb));
1870                                 tipc_node_xmit_skb(net, skb, dnode, dport);
1871                         }
1872                         sock_put(sk);
1873                         continue;
1874                 }
1875
1876                 /* No destination socket => dequeue skb if still there */
1877                 skb = tipc_skb_dequeue(inputq, dport);
1878                 if (!skb)
1879                         return;
1880
1881                 /* Try secondary lookup if unresolved named message */
1882                 err = TIPC_ERR_NO_PORT;
1883                 if (tipc_msg_lookup_dest(net, skb, &err))
1884                         goto xmit;
1885
1886                 /* Prepare for message rejection */
1887                 if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1888                         continue;
1889 xmit:
1890                 dnode = msg_destnode(buf_msg(skb));
1891                 tipc_node_xmit_skb(net, skb, dnode, dport);
1892         }
1893 }
1894
1895 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1896 {
1897         struct sock *sk = sock->sk;
1898         DEFINE_WAIT(wait);
1899         int done;
1900
1901         do {
1902                 int err = sock_error(sk);
1903                 if (err)
1904                         return err;
1905                 if (!*timeo_p)
1906                         return -ETIMEDOUT;
1907                 if (signal_pending(current))
1908                         return sock_intr_errno(*timeo_p);
1909
1910                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1911                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1912                 finish_wait(sk_sleep(sk), &wait);
1913         } while (!done);
1914         return 0;
1915 }
1916
1917 /**
1918  * tipc_connect - establish a connection to another TIPC port
1919  * @sock: socket structure
1920  * @dest: socket address for destination port
1921  * @destlen: size of socket address data structure
1922  * @flags: file-related flags associated with socket
1923  *
1924  * Returns 0 on success, errno otherwise
1925  */
1926 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1927                         int destlen, int flags)
1928 {
1929         struct sock *sk = sock->sk;
1930         struct tipc_sock *tsk = tipc_sk(sk);
1931         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1932         struct msghdr m = {NULL,};
1933         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1934         socket_state previous;
1935         int res = 0;
1936
1937         lock_sock(sk);
1938
1939         /* DGRAM/RDM connect(), just save the destaddr */
1940         if (sock->state == SS_READY) {
1941                 if (dst->family == AF_UNSPEC) {
1942                         memset(&tsk->remote, 0, sizeof(struct sockaddr_tipc));
1943                         tsk->connected = 0;
1944                 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1945                         res = -EINVAL;
1946                 } else {
1947                         memcpy(&tsk->remote, dest, destlen);
1948                         tsk->connected = 1;
1949                 }
1950                 goto exit;
1951         }
1952
1953         /*
1954          * Reject connection attempt using multicast address
1955          *
1956          * Note: send_msg() validates the rest of the address fields,
1957          *       so there's no need to do it here
1958          */
1959         if (dst->addrtype == TIPC_ADDR_MCAST) {
1960                 res = -EINVAL;
1961                 goto exit;
1962         }
1963
1964         previous = sock->state;
1965         switch (sock->state) {
1966         case SS_UNCONNECTED:
1967                 /* Send a 'SYN-' to destination */
1968                 m.msg_name = dest;
1969                 m.msg_namelen = destlen;
1970
1971                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1972                  * indicate send_msg() is never blocked.
1973                  */
1974                 if (!timeout)
1975                         m.msg_flags = MSG_DONTWAIT;
1976
1977                 res = __tipc_sendmsg(sock, &m, 0);
1978                 if ((res < 0) && (res != -EWOULDBLOCK))
1979                         goto exit;
1980
1981                 /* Just entered SS_CONNECTING state; the only
1982                  * difference is that return value in non-blocking
1983                  * case is EINPROGRESS, rather than EALREADY.
1984                  */
1985                 res = -EINPROGRESS;
1986         case SS_CONNECTING:
1987                 if (previous == SS_CONNECTING)
1988                         res = -EALREADY;
1989                 if (!timeout)
1990                         goto exit;
1991                 timeout = msecs_to_jiffies(timeout);
1992                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1993                 res = tipc_wait_for_connect(sock, &timeout);
1994                 break;
1995         case SS_CONNECTED:
1996                 res = -EISCONN;
1997                 break;
1998         default:
1999                 res = -EINVAL;
2000                 break;
2001         }
2002 exit:
2003         release_sock(sk);
2004         return res;
2005 }
2006
2007 /**
2008  * tipc_listen - allow socket to listen for incoming connections
2009  * @sock: socket structure
2010  * @len: (unused)
2011  *
2012  * Returns 0 on success, errno otherwise
2013  */
2014 static int tipc_listen(struct socket *sock, int len)
2015 {
2016         struct sock *sk = sock->sk;
2017         int res;
2018
2019         lock_sock(sk);
2020
2021         if (sock->state != SS_UNCONNECTED)
2022                 res = -EINVAL;
2023         else {
2024                 sock->state = SS_LISTENING;
2025                 res = 0;
2026         }
2027
2028         release_sock(sk);
2029         return res;
2030 }
2031
2032 static int tipc_wait_for_accept(struct socket *sock, long timeo)
2033 {
2034         struct sock *sk = sock->sk;
2035         DEFINE_WAIT(wait);
2036         int err;
2037
2038         /* True wake-one mechanism for incoming connections: only
2039          * one process gets woken up, not the 'whole herd'.
2040          * Since we do not 'race & poll' for established sockets
2041          * anymore, the common case will execute the loop only once.
2042         */
2043         for (;;) {
2044                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
2045                                           TASK_INTERRUPTIBLE);
2046                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
2047                         release_sock(sk);
2048                         timeo = schedule_timeout(timeo);
2049                         lock_sock(sk);
2050                 }
2051                 err = 0;
2052                 if (!skb_queue_empty(&sk->sk_receive_queue))
2053                         break;
2054                 err = -EINVAL;
2055                 if (sock->state != SS_LISTENING)
2056                         break;
2057                 err = -EAGAIN;
2058                 if (!timeo)
2059                         break;
2060                 err = sock_intr_errno(timeo);
2061                 if (signal_pending(current))
2062                         break;
2063         }
2064         finish_wait(sk_sleep(sk), &wait);
2065         return err;
2066 }
2067
2068 /**
2069  * tipc_accept - wait for connection request
2070  * @sock: listening socket
2071  * @newsock: new socket that is to be connected
2072  * @flags: file-related flags associated with socket
2073  *
2074  * Returns 0 on success, errno otherwise
2075  */
2076 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2077 {
2078         struct sock *new_sk, *sk = sock->sk;
2079         struct sk_buff *buf;
2080         struct tipc_sock *new_tsock;
2081         struct tipc_msg *msg;
2082         long timeo;
2083         int res;
2084
2085         lock_sock(sk);
2086
2087         if (sock->state != SS_LISTENING) {
2088                 res = -EINVAL;
2089                 goto exit;
2090         }
2091         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2092         res = tipc_wait_for_accept(sock, timeo);
2093         if (res)
2094                 goto exit;
2095
2096         buf = skb_peek(&sk->sk_receive_queue);
2097
2098         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 0);
2099         if (res)
2100                 goto exit;
2101         security_sk_clone(sock->sk, new_sock->sk);
2102
2103         new_sk = new_sock->sk;
2104         new_tsock = tipc_sk(new_sk);
2105         msg = buf_msg(buf);
2106
2107         /* we lock on new_sk; but lockdep sees the lock on sk */
2108         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2109
2110         /*
2111          * Reject any stray messages received by new socket
2112          * before the socket lock was taken (very, very unlikely)
2113          */
2114         tsk_rej_rx_queue(new_sk);
2115
2116         /* Connect new socket to it's peer */
2117         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2118         new_sock->state = SS_CONNECTED;
2119
2120         tsk_set_importance(new_tsock, msg_importance(msg));
2121         if (msg_named(msg)) {
2122                 new_tsock->conn_type = msg_nametype(msg);
2123                 new_tsock->conn_instance = msg_nameinst(msg);
2124         }
2125
2126         /*
2127          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2128          * Respond to 'SYN+' by queuing it on new socket.
2129          */
2130         if (!msg_data_sz(msg)) {
2131                 struct msghdr m = {NULL,};
2132
2133                 tsk_advance_rx_queue(sk);
2134                 __tipc_send_stream(new_sock, &m, 0);
2135         } else {
2136                 __skb_dequeue(&sk->sk_receive_queue);
2137                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2138                 skb_set_owner_r(buf, new_sk);
2139         }
2140         release_sock(new_sk);
2141 exit:
2142         release_sock(sk);
2143         return res;
2144 }
2145
2146 /**
2147  * tipc_shutdown - shutdown socket connection
2148  * @sock: socket structure
2149  * @how: direction to close (must be SHUT_RDWR)
2150  *
2151  * Terminates connection (if necessary), then purges socket's receive queue.
2152  *
2153  * Returns 0 on success, errno otherwise
2154  */
2155 static int tipc_shutdown(struct socket *sock, int how)
2156 {
2157         struct sock *sk = sock->sk;
2158         struct net *net = sock_net(sk);
2159         struct tipc_sock *tsk = tipc_sk(sk);
2160         struct sk_buff *skb;
2161         u32 dnode = tsk_peer_node(tsk);
2162         u32 dport = tsk_peer_port(tsk);
2163         u32 onode = tipc_own_addr(net);
2164         u32 oport = tsk->portid;
2165         int res;
2166
2167         if (how != SHUT_RDWR)
2168                 return -EINVAL;
2169
2170         lock_sock(sk);
2171
2172         switch (sock->state) {
2173         case SS_CONNECTING:
2174         case SS_CONNECTED:
2175
2176 restart:
2177                 dnode = tsk_peer_node(tsk);
2178
2179                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2180                 skb = __skb_dequeue(&sk->sk_receive_queue);
2181                 if (skb) {
2182                         if (TIPC_SKB_CB(skb)->bytes_read) {
2183                                 kfree_skb(skb);
2184                                 goto restart;
2185                         }
2186                         tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
2187                 } else {
2188                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2189                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2190                                               0, dnode, onode, dport, oport,
2191                                               TIPC_CONN_SHUTDOWN);
2192                         if (skb)
2193                                 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2194                 }
2195                 tsk->connected = 0;
2196                 sock->state = SS_DISCONNECTING;
2197                 tipc_node_remove_conn(net, dnode, tsk->portid);
2198                 /* fall through */
2199
2200         case SS_DISCONNECTING:
2201
2202                 /* Discard any unreceived messages */
2203                 __skb_queue_purge(&sk->sk_receive_queue);
2204
2205                 /* Wake up anyone sleeping in poll */
2206                 sk->sk_state_change(sk);
2207                 res = 0;
2208                 break;
2209
2210         default:
2211                 res = -ENOTCONN;
2212         }
2213
2214         release_sock(sk);
2215         return res;
2216 }
2217
2218 static void tipc_sk_timeout(unsigned long data)
2219 {
2220         struct tipc_sock *tsk = (struct tipc_sock *)data;
2221         struct sock *sk = &tsk->sk;
2222         struct sk_buff *skb = NULL;
2223         u32 peer_port, peer_node;
2224         u32 own_node = tsk_own_node(tsk);
2225
2226         bh_lock_sock(sk);
2227         if (!tsk->connected) {
2228                 bh_unlock_sock(sk);
2229                 goto exit;
2230         }
2231         peer_port = tsk_peer_port(tsk);
2232         peer_node = tsk_peer_node(tsk);
2233
2234         if (tsk->probing_state == TIPC_CONN_PROBING) {
2235                 if (!sock_owned_by_user(sk)) {
2236                         sk->sk_socket->state = SS_DISCONNECTING;
2237                         tsk->connected = 0;
2238                         tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
2239                                               tsk_peer_port(tsk));
2240                         sk->sk_state_change(sk);
2241                 } else {
2242                         /* Try again later */
2243                         sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2244                 }
2245
2246         } else {
2247                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2248                                       INT_H_SIZE, 0, peer_node, own_node,
2249                                       peer_port, tsk->portid, TIPC_OK);
2250                 tsk->probing_state = TIPC_CONN_PROBING;
2251                 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2252         }
2253         bh_unlock_sock(sk);
2254         if (skb)
2255                 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2256 exit:
2257         sock_put(sk);
2258 }
2259
2260 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2261                            struct tipc_name_seq const *seq)
2262 {
2263         struct net *net = sock_net(&tsk->sk);
2264         struct publication *publ;
2265         u32 key;
2266
2267         if (tsk->connected)
2268                 return -EINVAL;
2269         key = tsk->portid + tsk->pub_count + 1;
2270         if (key == tsk->portid)
2271                 return -EADDRINUSE;
2272
2273         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2274                                     scope, tsk->portid, key);
2275         if (unlikely(!publ))
2276                 return -EINVAL;
2277
2278         list_add(&publ->pport_list, &tsk->publications);
2279         tsk->pub_count++;
2280         tsk->published = 1;
2281         return 0;
2282 }
2283
2284 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2285                             struct tipc_name_seq const *seq)
2286 {
2287         struct net *net = sock_net(&tsk->sk);
2288         struct publication *publ;
2289         struct publication *safe;
2290         int rc = -EINVAL;
2291
2292         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2293                 if (seq) {
2294                         if (publ->scope != scope)
2295                                 continue;
2296                         if (publ->type != seq->type)
2297                                 continue;
2298                         if (publ->lower != seq->lower)
2299                                 continue;
2300                         if (publ->upper != seq->upper)
2301                                 break;
2302                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2303                                               publ->ref, publ->key);
2304                         rc = 0;
2305                         break;
2306                 }
2307                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2308                                       publ->ref, publ->key);
2309                 rc = 0;
2310         }
2311         if (list_empty(&tsk->publications))
2312                 tsk->published = 0;
2313         return rc;
2314 }
2315
2316 /* tipc_sk_reinit: set non-zero address in all existing sockets
2317  *                 when we go from standalone to network mode.
2318  */
2319 void tipc_sk_reinit(struct net *net)
2320 {
2321         struct tipc_net *tn = net_generic(net, tipc_net_id);
2322         const struct bucket_table *tbl;
2323         struct rhash_head *pos;
2324         struct tipc_sock *tsk;
2325         struct tipc_msg *msg;
2326         int i;
2327
2328         rcu_read_lock();
2329         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2330         for (i = 0; i < tbl->size; i++) {
2331                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2332                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2333                         msg = &tsk->phdr;
2334                         msg_set_prevnode(msg, tn->own_addr);
2335                         msg_set_orignode(msg, tn->own_addr);
2336                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2337                 }
2338         }
2339         rcu_read_unlock();
2340 }
2341
2342 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2343 {
2344         struct tipc_net *tn = net_generic(net, tipc_net_id);
2345         struct tipc_sock *tsk;
2346
2347         rcu_read_lock();
2348         tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2349         if (tsk)
2350                 sock_hold(&tsk->sk);
2351         rcu_read_unlock();
2352
2353         return tsk;
2354 }
2355
2356 static int tipc_sk_insert(struct tipc_sock *tsk)
2357 {
2358         struct sock *sk = &tsk->sk;
2359         struct net *net = sock_net(sk);
2360         struct tipc_net *tn = net_generic(net, tipc_net_id);
2361         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2362         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2363
2364         while (remaining--) {
2365                 portid++;
2366                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2367                         portid = TIPC_MIN_PORT;
2368                 tsk->portid = portid;
2369                 sock_hold(&tsk->sk);
2370                 if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2371                                                    tsk_rht_params))
2372                         return 0;
2373                 sock_put(&tsk->sk);
2374         }
2375
2376         return -1;
2377 }
2378
2379 static void tipc_sk_remove(struct tipc_sock *tsk)
2380 {
2381         struct sock *sk = &tsk->sk;
2382         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2383
2384         if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2385                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2386                 __sock_put(sk);
2387         }
2388 }
2389
2390 static const struct rhashtable_params tsk_rht_params = {
2391         .nelem_hint = 192,
2392         .head_offset = offsetof(struct tipc_sock, node),
2393         .key_offset = offsetof(struct tipc_sock, portid),
2394         .key_len = sizeof(u32), /* portid */
2395         .max_size = 1048576,
2396         .min_size = 256,
2397         .automatic_shrinking = true,
2398 };
2399
2400 int tipc_sk_rht_init(struct net *net)
2401 {
2402         struct tipc_net *tn = net_generic(net, tipc_net_id);
2403
2404         return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2405 }
2406
2407 void tipc_sk_rht_destroy(struct net *net)
2408 {
2409         struct tipc_net *tn = net_generic(net, tipc_net_id);
2410
2411         /* Wait for socket readers to complete */
2412         synchronize_net();
2413
2414         rhashtable_destroy(&tn->sk_rht);
2415 }
2416
2417 /**
2418  * tipc_setsockopt - set socket option
2419  * @sock: socket structure
2420  * @lvl: option level
2421  * @opt: option identifier
2422  * @ov: pointer to new option value
2423  * @ol: length of option value
2424  *
2425  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2426  * (to ease compatibility).
2427  *
2428  * Returns 0 on success, errno otherwise
2429  */
2430 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2431                            char __user *ov, unsigned int ol)
2432 {
2433         struct sock *sk = sock->sk;
2434         struct tipc_sock *tsk = tipc_sk(sk);
2435         u32 value;
2436         int res;
2437
2438         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2439                 return 0;
2440         if (lvl != SOL_TIPC)
2441                 return -ENOPROTOOPT;
2442         if (ol < sizeof(value))
2443                 return -EINVAL;
2444         res = get_user(value, (u32 __user *)ov);
2445         if (res)
2446                 return res;
2447
2448         lock_sock(sk);
2449
2450         switch (opt) {
2451         case TIPC_IMPORTANCE:
2452                 res = tsk_set_importance(tsk, value);
2453                 break;
2454         case TIPC_SRC_DROPPABLE:
2455                 if (sock->type != SOCK_STREAM)
2456                         tsk_set_unreliable(tsk, value);
2457                 else
2458                         res = -ENOPROTOOPT;
2459                 break;
2460         case TIPC_DEST_DROPPABLE:
2461                 tsk_set_unreturnable(tsk, value);
2462                 break;
2463         case TIPC_CONN_TIMEOUT:
2464                 tipc_sk(sk)->conn_timeout = value;
2465                 /* no need to set "res", since already 0 at this point */
2466                 break;
2467         default:
2468                 res = -EINVAL;
2469         }
2470
2471         release_sock(sk);
2472
2473         return res;
2474 }
2475
2476 /**
2477  * tipc_getsockopt - get socket option
2478  * @sock: socket structure
2479  * @lvl: option level
2480  * @opt: option identifier
2481  * @ov: receptacle for option value
2482  * @ol: receptacle for length of option value
2483  *
2484  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2485  * (to ease compatibility).
2486  *
2487  * Returns 0 on success, errno otherwise
2488  */
2489 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2490                            char __user *ov, int __user *ol)
2491 {
2492         struct sock *sk = sock->sk;
2493         struct tipc_sock *tsk = tipc_sk(sk);
2494         int len;
2495         u32 value;
2496         int res;
2497
2498         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2499                 return put_user(0, ol);
2500         if (lvl != SOL_TIPC)
2501                 return -ENOPROTOOPT;
2502         res = get_user(len, ol);
2503         if (res)
2504                 return res;
2505
2506         lock_sock(sk);
2507
2508         switch (opt) {
2509         case TIPC_IMPORTANCE:
2510                 value = tsk_importance(tsk);
2511                 break;
2512         case TIPC_SRC_DROPPABLE:
2513                 value = tsk_unreliable(tsk);
2514                 break;
2515         case TIPC_DEST_DROPPABLE:
2516                 value = tsk_unreturnable(tsk);
2517                 break;
2518         case TIPC_CONN_TIMEOUT:
2519                 value = tsk->conn_timeout;
2520                 /* no need to set "res", since already 0 at this point */
2521                 break;
2522         case TIPC_NODE_RECVQ_DEPTH:
2523                 value = 0; /* was tipc_queue_size, now obsolete */
2524                 break;
2525         case TIPC_SOCK_RECVQ_DEPTH:
2526                 value = skb_queue_len(&sk->sk_receive_queue);
2527                 break;
2528         default:
2529                 res = -EINVAL;
2530         }
2531
2532         release_sock(sk);
2533
2534         if (res)
2535                 return res;     /* "get" failed */
2536
2537         if (len < sizeof(value))
2538                 return -EINVAL;
2539
2540         if (copy_to_user(ov, &value, sizeof(value)))
2541                 return -EFAULT;
2542
2543         return put_user(sizeof(value), ol);
2544 }
2545
2546 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2547 {
2548         struct sock *sk = sock->sk;
2549         struct tipc_sioc_ln_req lnr;
2550         void __user *argp = (void __user *)arg;
2551
2552         switch (cmd) {
2553         case SIOCGETLINKNAME:
2554                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2555                         return -EFAULT;
2556                 if (!tipc_node_get_linkname(sock_net(sk),
2557                                             lnr.bearer_id & 0xffff, lnr.peer,
2558                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2559                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2560                                 return -EFAULT;
2561                         return 0;
2562                 }
2563                 return -EADDRNOTAVAIL;
2564         default:
2565                 return -ENOIOCTLCMD;
2566         }
2567 }
2568
2569 /* Protocol switches for the various types of TIPC sockets */
2570
2571 static const struct proto_ops msg_ops = {
2572         .owner          = THIS_MODULE,
2573         .family         = AF_TIPC,
2574         .release        = tipc_release,
2575         .bind           = tipc_bind,
2576         .connect        = tipc_connect,
2577         .socketpair     = sock_no_socketpair,
2578         .accept         = sock_no_accept,
2579         .getname        = tipc_getname,
2580         .poll           = tipc_poll,
2581         .ioctl          = tipc_ioctl,
2582         .listen         = sock_no_listen,
2583         .shutdown       = tipc_shutdown,
2584         .setsockopt     = tipc_setsockopt,
2585         .getsockopt     = tipc_getsockopt,
2586         .sendmsg        = tipc_sendmsg,
2587         .recvmsg        = tipc_recvmsg,
2588         .mmap           = sock_no_mmap,
2589         .sendpage       = sock_no_sendpage
2590 };
2591
2592 static const struct proto_ops packet_ops = {
2593         .owner          = THIS_MODULE,
2594         .family         = AF_TIPC,
2595         .release        = tipc_release,
2596         .bind           = tipc_bind,
2597         .connect        = tipc_connect,
2598         .socketpair     = sock_no_socketpair,
2599         .accept         = tipc_accept,
2600         .getname        = tipc_getname,
2601         .poll           = tipc_poll,
2602         .ioctl          = tipc_ioctl,
2603         .listen         = tipc_listen,
2604         .shutdown       = tipc_shutdown,
2605         .setsockopt     = tipc_setsockopt,
2606         .getsockopt     = tipc_getsockopt,
2607         .sendmsg        = tipc_send_packet,
2608         .recvmsg        = tipc_recvmsg,
2609         .mmap           = sock_no_mmap,
2610         .sendpage       = sock_no_sendpage
2611 };
2612
2613 static const struct proto_ops stream_ops = {
2614         .owner          = THIS_MODULE,
2615         .family         = AF_TIPC,
2616         .release        = tipc_release,
2617         .bind           = tipc_bind,
2618         .connect        = tipc_connect,
2619         .socketpair     = sock_no_socketpair,
2620         .accept         = tipc_accept,
2621         .getname        = tipc_getname,
2622         .poll           = tipc_poll,
2623         .ioctl          = tipc_ioctl,
2624         .listen         = tipc_listen,
2625         .shutdown       = tipc_shutdown,
2626         .setsockopt     = tipc_setsockopt,
2627         .getsockopt     = tipc_getsockopt,
2628         .sendmsg        = tipc_send_stream,
2629         .recvmsg        = tipc_recv_stream,
2630         .mmap           = sock_no_mmap,
2631         .sendpage       = sock_no_sendpage
2632 };
2633
2634 static const struct net_proto_family tipc_family_ops = {
2635         .owner          = THIS_MODULE,
2636         .family         = AF_TIPC,
2637         .create         = tipc_sk_create
2638 };
2639
2640 static struct proto tipc_proto = {
2641         .name           = "TIPC",
2642         .owner          = THIS_MODULE,
2643         .obj_size       = sizeof(struct tipc_sock),
2644         .sysctl_rmem    = sysctl_tipc_rmem
2645 };
2646
2647 /**
2648  * tipc_socket_init - initialize TIPC socket interface
2649  *
2650  * Returns 0 on success, errno otherwise
2651  */
2652 int tipc_socket_init(void)
2653 {
2654         int res;
2655
2656         res = proto_register(&tipc_proto, 1);
2657         if (res) {
2658                 pr_err("Failed to register TIPC protocol type\n");
2659                 goto out;
2660         }
2661
2662         res = sock_register(&tipc_family_ops);
2663         if (res) {
2664                 pr_err("Failed to register TIPC socket type\n");
2665                 proto_unregister(&tipc_proto);
2666                 goto out;
2667         }
2668  out:
2669         return res;
2670 }
2671
2672 /**
2673  * tipc_socket_stop - stop TIPC socket interface
2674  */
2675 void tipc_socket_stop(void)
2676 {
2677         sock_unregister(tipc_family_ops.family);
2678         proto_unregister(&tipc_proto);
2679 }
2680
2681 /* Caller should hold socket lock for the passed tipc socket. */
2682 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2683 {
2684         u32 peer_node;
2685         u32 peer_port;
2686         struct nlattr *nest;
2687
2688         peer_node = tsk_peer_node(tsk);
2689         peer_port = tsk_peer_port(tsk);
2690
2691         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2692
2693         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2694                 goto msg_full;
2695         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2696                 goto msg_full;
2697
2698         if (tsk->conn_type != 0) {
2699                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2700                         goto msg_full;
2701                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2702                         goto msg_full;
2703                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2704                         goto msg_full;
2705         }
2706         nla_nest_end(skb, nest);
2707
2708         return 0;
2709
2710 msg_full:
2711         nla_nest_cancel(skb, nest);
2712
2713         return -EMSGSIZE;
2714 }
2715
2716 /* Caller should hold socket lock for the passed tipc socket. */
2717 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2718                             struct tipc_sock *tsk)
2719 {
2720         int err;
2721         void *hdr;
2722         struct nlattr *attrs;
2723         struct net *net = sock_net(skb->sk);
2724         struct tipc_net *tn = net_generic(net, tipc_net_id);
2725
2726         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2727                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2728         if (!hdr)
2729                 goto msg_cancel;
2730
2731         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2732         if (!attrs)
2733                 goto genlmsg_cancel;
2734         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2735                 goto attr_msg_cancel;
2736         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2737                 goto attr_msg_cancel;
2738
2739         if (tsk->connected) {
2740                 err = __tipc_nl_add_sk_con(skb, tsk);
2741                 if (err)
2742                         goto attr_msg_cancel;
2743         } else if (!list_empty(&tsk->publications)) {
2744                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2745                         goto attr_msg_cancel;
2746         }
2747         nla_nest_end(skb, attrs);
2748         genlmsg_end(skb, hdr);
2749
2750         return 0;
2751
2752 attr_msg_cancel:
2753         nla_nest_cancel(skb, attrs);
2754 genlmsg_cancel:
2755         genlmsg_cancel(skb, hdr);
2756 msg_cancel:
2757         return -EMSGSIZE;
2758 }
2759
2760 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2761 {
2762         int err;
2763         struct tipc_sock *tsk;
2764         const struct bucket_table *tbl;
2765         struct rhash_head *pos;
2766         struct net *net = sock_net(skb->sk);
2767         struct tipc_net *tn = net_generic(net, tipc_net_id);
2768         u32 tbl_id = cb->args[0];
2769         u32 prev_portid = cb->args[1];
2770
2771         rcu_read_lock();
2772         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2773         for (; tbl_id < tbl->size; tbl_id++) {
2774                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2775                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2776                         if (prev_portid && prev_portid != tsk->portid) {
2777                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2778                                 continue;
2779                         }
2780
2781                         err = __tipc_nl_add_sk(skb, cb, tsk);
2782                         if (err) {
2783                                 prev_portid = tsk->portid;
2784                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2785                                 goto out;
2786                         }
2787                         prev_portid = 0;
2788                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2789                 }
2790         }
2791 out:
2792         rcu_read_unlock();
2793         cb->args[0] = tbl_id;
2794         cb->args[1] = prev_portid;
2795
2796         return skb->len;
2797 }
2798
2799 /* Caller should hold socket lock for the passed tipc socket. */
2800 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2801                                  struct netlink_callback *cb,
2802                                  struct publication *publ)
2803 {
2804         void *hdr;
2805         struct nlattr *attrs;
2806
2807         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2808                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2809         if (!hdr)
2810                 goto msg_cancel;
2811
2812         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2813         if (!attrs)
2814                 goto genlmsg_cancel;
2815
2816         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2817                 goto attr_msg_cancel;
2818         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2819                 goto attr_msg_cancel;
2820         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2821                 goto attr_msg_cancel;
2822         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2823                 goto attr_msg_cancel;
2824
2825         nla_nest_end(skb, attrs);
2826         genlmsg_end(skb, hdr);
2827
2828         return 0;
2829
2830 attr_msg_cancel:
2831         nla_nest_cancel(skb, attrs);
2832 genlmsg_cancel:
2833         genlmsg_cancel(skb, hdr);
2834 msg_cancel:
2835         return -EMSGSIZE;
2836 }
2837
2838 /* Caller should hold socket lock for the passed tipc socket. */
2839 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2840                                   struct netlink_callback *cb,
2841                                   struct tipc_sock *tsk, u32 *last_publ)
2842 {
2843         int err;
2844         struct publication *p;
2845
2846         if (*last_publ) {
2847                 list_for_each_entry(p, &tsk->publications, pport_list) {
2848                         if (p->key == *last_publ)
2849                                 break;
2850                 }
2851                 if (p->key != *last_publ) {
2852                         /* We never set seq or call nl_dump_check_consistent()
2853                          * this means that setting prev_seq here will cause the
2854                          * consistence check to fail in the netlink callback
2855                          * handler. Resulting in the last NLMSG_DONE message
2856                          * having the NLM_F_DUMP_INTR flag set.
2857                          */
2858                         cb->prev_seq = 1;
2859                         *last_publ = 0;
2860                         return -EPIPE;
2861                 }
2862         } else {
2863                 p = list_first_entry(&tsk->publications, struct publication,
2864                                      pport_list);
2865         }
2866
2867         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2868                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2869                 if (err) {
2870                         *last_publ = p->key;
2871                         return err;
2872                 }
2873         }
2874         *last_publ = 0;
2875
2876         return 0;
2877 }
2878
2879 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2880 {
2881         int err;
2882         u32 tsk_portid = cb->args[0];
2883         u32 last_publ = cb->args[1];
2884         u32 done = cb->args[2];
2885         struct net *net = sock_net(skb->sk);
2886         struct tipc_sock *tsk;
2887
2888         if (!tsk_portid) {
2889                 struct nlattr **attrs;
2890                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2891
2892                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2893                 if (err)
2894                         return err;
2895
2896                 if (!attrs[TIPC_NLA_SOCK])
2897                         return -EINVAL;
2898
2899                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2900                                        attrs[TIPC_NLA_SOCK],
2901                                        tipc_nl_sock_policy);
2902                 if (err)
2903                         return err;
2904
2905                 if (!sock[TIPC_NLA_SOCK_REF])
2906                         return -EINVAL;
2907
2908                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2909         }
2910
2911         if (done)
2912                 return 0;
2913
2914         tsk = tipc_sk_lookup(net, tsk_portid);
2915         if (!tsk)
2916                 return -EINVAL;
2917
2918         lock_sock(&tsk->sk);
2919         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2920         if (!err)
2921                 done = 1;
2922         release_sock(&tsk->sk);
2923         sock_put(&tsk->sk);
2924
2925         cb->args[0] = tsk_portid;
2926         cb->args[1] = last_publ;
2927         cb->args[2] = done;
2928
2929         return skb->len;
2930 }