]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - net/tipc/socket.c
Merge tag 'arm64-upstream' of git://git.kernel.org/pub/scm/linux/kernel/git/arm64/linux
[linux.git] / net / tipc / socket.c
index 4b92b196cfa668c9ee59b9dae4e83bb7b5b5ba3d..41688da233aba37f92416d01a59a9e6b68a74e31 100644 (file)
@@ -75,6 +75,7 @@ struct sockaddr_pair {
  * @conn_instance: TIPC instance used when connection was established
  * @published: non-zero if port has one or more associated names
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
+ * @maxnagle: maximum size of msg which can be subject to nagle
  * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
  * #cong_links: list of congested links
@@ -97,6 +98,7 @@ struct tipc_sock {
        u32 conn_instance;
        int published;
        u32 max_pkt;
+       u32 maxnagle;
        u32 portid;
        struct tipc_msg phdr;
        struct list_head cong_links;
@@ -116,6 +118,10 @@ struct tipc_sock {
        struct tipc_mc_method mc_method;
        struct rcu_head rcu;
        struct tipc_group *group;
+       u32 oneway;
+       u16 snd_backlog;
+       bool expect_ack;
+       bool nodelay;
        bool group_is_open;
 };
 
@@ -137,6 +143,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
        return 1;
 }
 
+/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
+ */
+static void tsk_set_nagle(struct tipc_sock *tsk)
+{
+       struct sock *sk = &tsk->sk;
+
+       tsk->maxnagle = 0;
+       if (sk->sk_type != SOCK_STREAM)
+               return;
+       if (tsk->nodelay)
+               return;
+       if (!(tsk->peer_caps & TIPC_NAGLE))
+               return;
+       /* Limit node local buffer size to avoid receive queue overflow */
+       if (tsk->max_pkt == MAX_MSG_SIZE)
+               tsk->maxnagle = 1500;
+       else
+               tsk->maxnagle = tsk->max_pkt;
+}
+
 /**
  * tsk_advance_rx_queue - discard first buffer in socket receive queue
  *
@@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 
        tsk = tipc_sk(sk);
        tsk->max_pkt = MAX_PKT_DEFAULT;
+       tsk->maxnagle = 0;
        INIT_LIST_HEAD(&tsk->publications);
        INIT_LIST_HEAD(&tsk->cong_links);
        msg = &tsk->phdr;
@@ -504,7 +532,7 @@ static void __tipc_shutdown(struct socket *sock, int error)
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
-       long timeout = CONN_TIMEOUT_DEFAULT;
+       long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
        u32 dnode = tsk_peer_node(tsk);
        struct sk_buff *skb;
 
@@ -512,7 +540,9 @@ static void __tipc_shutdown(struct socket *sock, int error)
        tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
                                            !tsk_conn_cong(tsk)));
 
-       /* Remove any pending SYN message */
+       /* Push out delayed messages if in Nagle mode */
+       tipc_sk_push_backlog(tsk);
+       /* Remove pending SYN */
        __skb_queue_purge(&sk->sk_write_queue);
 
        /* Reject all unreceived messages, except on an active connection
@@ -854,7 +884,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 
        /* Build message as chain of buffers */
        __skb_queue_head_init(&pkts);
-       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+       mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
        rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
        if (unlikely(rc != dlen))
                return rc;
@@ -1208,6 +1238,32 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
        tipc_sk_rcv(net, inputq);
 }
 
+/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
+ *                         when socket is in Nagle mode
+ */
+static void tipc_sk_push_backlog(struct tipc_sock *tsk)
+{
+       struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+       struct net *net = sock_net(&tsk->sk);
+       u32 dnode = tsk_peer_node(tsk);
+       struct sk_buff *skb = skb_peek(txq);
+       int rc;
+
+       if (!skb || tsk->cong_link_cnt)
+               return;
+
+       /* Do not send SYN again after congestion */
+       if (msg_is_syn(buf_msg(skb)))
+               return;
+
+       tsk->snt_unacked += tsk->snd_backlog;
+       tsk->snd_backlog = 0;
+       tsk->expect_ack = true;
+       rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
+       if (rc == -ELINKCONG)
+               tsk->cong_link_cnt = 1;
+}
+
 /**
  * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
@@ -1221,7 +1277,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
        u32 onode = tsk_own_node(tsk);
        struct sock *sk = &tsk->sk;
        int mtyp = msg_type(hdr);
-       bool conn_cong;
+       bool was_cong;
 
        /* Ignore if connection cannot be validated: */
        if (!tsk_peer_msg(tsk, hdr)) {
@@ -1254,11 +1310,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
                        __skb_queue_tail(xmitq, skb);
                return;
        } else if (mtyp == CONN_ACK) {
-               conn_cong = tsk_conn_cong(tsk);
+               was_cong = tsk_conn_cong(tsk);
+               tsk->expect_ack = false;
+               tipc_sk_push_backlog(tsk);
                tsk->snt_unacked -= msg_conn_ack(hdr);
                if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
                        tsk->snd_win = msg_adv_win(hdr);
-               if (conn_cong)
+               if (was_cong && !tsk_conn_cong(tsk))
                        sk->sk_write_space(sk);
        } else if (mtyp != CONN_PROBE_REPLY) {
                pr_warn("Received unknown CONN_PROTO msg\n");
@@ -1388,12 +1446,14 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
                return rc;
 
        __skb_queue_head_init(&pkts);
-       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+       mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
        rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
        if (unlikely(rc != dlen))
                return rc;
-       if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue)))
+       if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
+               __skb_queue_purge(&pkts);
                return -ENOMEM;
+       }
 
        trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
@@ -1437,15 +1497,15 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
        struct sock *sk = sock->sk;
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
        long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+       struct sk_buff_head *txq = &sk->sk_write_queue;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *hdr = &tsk->phdr;
        struct net *net = sock_net(sk);
-       struct sk_buff_head pkts;
        u32 dnode = tsk_peer_node(tsk);
+       int maxnagle = tsk->maxnagle;
+       int maxpkt = tsk->max_pkt;
        int send, sent = 0;
-       int rc = 0;
-
-       __skb_queue_head_init(&pkts);
+       int blocks, rc = 0;
 
        if (unlikely(dlen > INT_MAX))
                return -EMSGSIZE;
@@ -1467,21 +1527,35 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
                                         tipc_sk_connected(sk)));
                if (unlikely(rc))
                        break;
-
                send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
-               rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
-               if (unlikely(rc != send))
-                       break;
-
-               trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+               blocks = tsk->snd_backlog;
+               if (tsk->oneway++ >= 4 && send <= maxnagle) {
+                       rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
+                       if (unlikely(rc < 0))
+                               break;
+                       blocks += rc;
+                       if (blocks <= 64 && tsk->expect_ack) {
+                               tsk->snd_backlog = blocks;
+                               sent += send;
+                               break;
+                       }
+                       tsk->expect_ack = true;
+               } else {
+                       rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
+                       if (unlikely(rc != send))
+                               break;
+                       blocks += tsk_inc(tsk, send + MIN_H_SIZE);
+               }
+               trace_tipc_sk_sendstream(sk, skb_peek(txq),
                                         TIPC_DUMP_SK_SNDQ, " ");
-               rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+               rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
                if (unlikely(rc == -ELINKCONG)) {
                        tsk->cong_link_cnt = 1;
                        rc = 0;
                }
                if (likely(!rc)) {
-                       tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+                       tsk->snt_unacked += blocks;
+                       tsk->snd_backlog = 0;
                        sent += send;
                }
        } while (sent < dlen && !rc);
@@ -1526,8 +1600,9 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
        sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
        tipc_set_sk_state(sk, TIPC_ESTABLISHED);
        tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
-       tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
+       tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
        tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+       tsk_set_nagle(tsk);
        __skb_queue_purge(&sk->sk_write_queue);
        if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
                return;
@@ -1848,6 +1923,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
        bool peek = flags & MSG_PEEK;
        int offset, required, copy, copied = 0;
        int hlen, dlen, err, rc;
+       bool ack = false;
        long timeout;
 
        /* Catch invalid receive attempts */
@@ -1892,6 +1968,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
 
                /* Copy data if msg ok, otherwise return error/partial data */
                if (likely(!err)) {
+                       ack = msg_ack_required(hdr);
                        offset = skb_cb->bytes_read;
                        copy = min_t(int, dlen - offset, buflen - copied);
                        rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
@@ -1919,7 +1996,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
 
                /* Send connection flow control advertisement when applicable */
                tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
-               if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
+               if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
                        tipc_sk_send_ack(tsk);
 
                /* Exit if all requested data or FIN/error received */
@@ -1990,6 +2067,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
                smp_wmb();
                tsk->cong_link_cnt--;
                wakeup = true;
+               tipc_sk_push_backlog(tsk);
                break;
        case GROUP_PROTOCOL:
                tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2029,6 +2107,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 
        if (unlikely(msg_mcast(hdr)))
                return false;
+       tsk->oneway = 0;
 
        switch (sk->sk_state) {
        case TIPC_CONNECTING:
@@ -2074,6 +2153,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
                        return true;
                return false;
        case TIPC_ESTABLISHED:
+               if (!skb_queue_empty(&sk->sk_write_queue))
+                       tipc_sk_push_backlog(tsk);
                /* Accept only connection-based messages sent by peer */
                if (likely(con_msg && !err && pport == oport && pnode == onode))
                        return true;
@@ -2681,6 +2762,7 @@ static void tipc_sk_timeout(struct timer_list *t)
        if (sock_owned_by_user(sk)) {
                sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
                bh_unlock_sock(sk);
+               sock_put(sk);
                return;
        }
 
@@ -2804,7 +2886,7 @@ static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
        struct tipc_sock *tsk;
 
        rcu_read_lock();
-       tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
+       tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
        if (tsk)
                sock_hold(&tsk->sk);
        rcu_read_unlock();
@@ -2959,6 +3041,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
        case TIPC_SRC_DROPPABLE:
        case TIPC_DEST_DROPPABLE:
        case TIPC_CONN_TIMEOUT:
+       case TIPC_NODELAY:
                if (ol < sizeof(value))
                        return -EINVAL;
                if (get_user(value, (u32 __user *)ov))
@@ -3007,6 +3090,10 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
        case TIPC_GROUP_LEAVE:
                res = tipc_sk_leave(tsk);
                break;
+       case TIPC_NODELAY:
+               tsk->nodelay = !!value;
+               tsk_set_nagle(tsk);
+               break;
        default:
                res = -EINVAL;
        }
@@ -3588,13 +3675,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
        struct tipc_sock *tsk;
 
        if (!tsk_portid) {
-               struct nlattr **attrs;
+               struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
                struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
 
-               err = tipc_nlmsg_parse(cb->nlh, &attrs);
-               if (err)
-                       return err;
-
                if (!attrs[TIPC_NLA_SOCK])
                        return -EINVAL;