asedeno.scripts.mit.edu Git - linux.git/blobdiff - net/tipc/link.c
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[linux.git] / net / tipc / link.c
index 131aa2f0fd27c46e14f024b317dd65c786b0bea4..6053489c8063633ef7f206977c5ca27985edf5ff 100644 (file)
@@ -151,6 +151,7 @@ struct tipc_link {
        /* Failover/synch */
        u16 drop_point;
        struct sk_buff *failover_reasm_skb;
+       struct sk_buff_head failover_deferdq;
 
        /* Max packet negotiation */
        u16 mtu;
@@ -209,6 +210,7 @@ enum {
 };
 
 #define TIPC_BC_RETR_LIM msecs_to_jiffies(10)   /* [ms] */
+#define TIPC_UC_RETR_TIME (jiffies + msecs_to_jiffies(1))
 
 /*
  * Interval between NACKs when packets arrive out of order
@@ -246,6 +248,10 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
 static void tipc_link_build_bc_init_msg(struct tipc_link *l,
                                        struct sk_buff_head *xmitq);
 static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data);
+static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+                                     struct tipc_gap_ack_blks *ga,
+                                     struct sk_buff_head *xmitq);
 
 /*
  *  Simple non-static link routines (i.e. referenced outside this file)
@@ -493,6 +499,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
        __skb_queue_head_init(&l->transmq);
        __skb_queue_head_init(&l->backlogq);
        __skb_queue_head_init(&l->deferdq);
+       __skb_queue_head_init(&l->failover_deferdq);
        skb_queue_head_init(&l->wakeupq);
        skb_queue_head_init(l->inputq);
        return true;
@@ -885,6 +892,7 @@ void tipc_link_reset(struct tipc_link *l)
        __skb_queue_purge(&l->transmq);
        __skb_queue_purge(&l->deferdq);
        __skb_queue_purge(&l->backlogq);
+       __skb_queue_purge(&l->failover_deferdq);
        l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
        l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
        l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
@@ -1156,34 +1164,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
  * Consumes buffer
  */
 static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
-                          struct sk_buff_head *inputq)
+                          struct sk_buff_head *inputq,
+                          struct sk_buff **reasm_skb)
 {
        struct tipc_msg *hdr = buf_msg(skb);
-       struct sk_buff **reasm_skb = &l->reasm_buf;
        struct sk_buff *iskb;
        struct sk_buff_head tmpq;
        int usr = msg_user(hdr);
-       int rc = 0;
        int pos = 0;
-       int ipos = 0;
-
-       if (unlikely(usr == TUNNEL_PROTOCOL)) {
-               if (msg_type(hdr) == SYNCH_MSG) {
-                       __skb_queue_purge(&l->deferdq);
-                       goto drop;
-               }
-               if (!tipc_msg_extract(skb, &iskb, &ipos))
-                       return rc;
-               kfree_skb(skb);
-               skb = iskb;
-               hdr = buf_msg(skb);
-               if (less(msg_seqno(hdr), l->drop_point))
-                       goto drop;
-               if (tipc_data_input(l, skb, inputq))
-                       return rc;
-               usr = msg_user(hdr);
-               reasm_skb = &l->failover_reasm_skb;
-       }
 
        if (usr == MSG_BUNDLER) {
                skb_queue_head_init(&tmpq);
@@ -1208,11 +1196,66 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
                tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
                tipc_bcast_unlock(l->net);
        }
-drop:
+
        kfree_skb(skb);
        return 0;
 }
 
+/* tipc_link_tnl_rcv() - receive TUNNEL_PROTOCOL message, drop or process the
+ *                      inner message along with the ones in the old link's
+ *                      deferdq
+ * @l: tunnel link
+ * @skb: TUNNEL_PROTOCOL message
+ * @inputq: queue to put messages ready for delivery
+ *
+ * A SYNCH_MSG is simply freed. For any other message type the inner message
+ * is extracted and its seqno is compared against @l->drop_point:
+ *  - seqno below drop_point: the inner message is freed;
+ *  - seqno above drop_point: the inner message is inserted, sorted by seqno,
+ *    into @l->failover_deferdq;
+ *  - seqno equal to drop_point: drop_point is incremented and the message is
+ *    handed to tipc_data_input() or, when that returns false, to
+ *    tipc_link_input() with @l->failover_reasm_skb as reassembly buffer.
+ * After every iteration the loop asks __tipc_skb_dequeue() for the next
+ * message from failover_deferdq (presumably only returned when its seqno
+ * equals the current drop_point - confirm against that helper). The loop
+ * stops early as soon as rc becomes non-zero.
+ *
+ * Returns the OR-ed return codes of tipc_link_input(), 0 otherwise.
+ */
+static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
+                            struct sk_buff_head *inputq)
+{
+       struct sk_buff **reasm_skb = &l->failover_reasm_skb;
+       struct sk_buff_head *fdefq = &l->failover_deferdq;
+       struct tipc_msg *hdr = buf_msg(skb);
+       struct sk_buff *iskb;
+       int ipos = 0;
+       int rc = 0;
+       u16 seqno;
+
+       /* SYNCH_MSG: free the buffer without further processing */
+       if (msg_type(hdr) == SYNCH_MSG)
+               goto drop;
+
+       /* FAILOVER_MSG: pull out the inner message; on failure only a
+        * rate-limited warning is logged and @skb is not freed here
+        * (NOTE(review): presumably tipc_msg_extract() consumed it - confirm)
+        */
+       if (!tipc_msg_extract(skb, &iskb, &ipos)) {
+               pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
+                                   skb_queue_len(fdefq));
+               return rc;
+       }
+
+       do {
+               seqno = buf_seqno(iskb);
+
+               if (unlikely(less(seqno, l->drop_point))) {
+                       kfree_skb(iskb);
+                       continue;
+               }
+
+               if (unlikely(seqno != l->drop_point)) {
+                       __tipc_skb_queue_sorted(fdefq, seqno, iskb);
+                       continue;
+               }
+
+               l->drop_point++;
+
+               if (!tipc_data_input(l, iskb, inputq))
+                       rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
+               if (unlikely(rc))
+                       break;
+       } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
+
+drop:
+       kfree_skb(skb);
+       return rc;
+}
+
 static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
 {
        bool released = false;
@@ -1228,6 +1271,106 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
        return released;
 }
 
+/* tipc_build_gap_ack_blks - build Gap ACK blocks
+ * @l: tipc link whose deferdq holds the packets received out of sequence
+ * @data: data buffer in which the Gap ACK blocks record is written
+ *
+ * Walks @l->deferdq and writes one block for every hole found between two
+ * deferred packets: 'ack' is the last seqno before the hole (expect - 1) and
+ * 'gap' is the number of missing packets (seqno - expect). A final block with
+ * 'ack' set to the last walked seqno and 'gap' 0 closes the record. All 16-bit
+ * fields are stored with htons().
+ *
+ * Special cases:
+ *  - empty deferdq: no block is written, only 'len' and 'gack_cnt' (0);
+ *  - MAX_GAP_ACK_BLKS blocks reached: the walk stops, an info message is
+ *    logged and no closing block is appended;
+ *  - a packet with a seqno below the expected one: a warning is logged and
+ *    the packet is skipped.
+ *
+ * NOTE(review): @data must have room for the blocks written; this function
+ * does not check it - confirm the caller's allocation.
+ *
+ * returns the size of the record written to @data, in host byte order, as
+ * computed by tipc_gap_ack_blks_sz() for the number of blocks built
+ */
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data)
+{
+       struct sk_buff *skb = skb_peek(&l->deferdq);
+       struct tipc_gap_ack_blks *ga = data;
+       u16 len, expect, seqno = 0;
+       u8 n = 0;
+
+       if (!skb)
+               goto exit;
+
+       expect = buf_seqno(skb);
+       skb_queue_walk(&l->deferdq, skb) {
+               seqno = buf_seqno(skb);
+               if (unlikely(more(seqno, expect))) {
+                       ga->gacks[n].ack = htons(expect - 1);
+                       ga->gacks[n].gap = htons(seqno - expect);
+                       if (++n >= MAX_GAP_ACK_BLKS) {
+                               pr_info_ratelimited("Too few Gap ACK blocks!\n");
+                               goto exit;
+                       }
+               } else if (unlikely(less(seqno, expect))) {
+                       pr_warn("Unexpected skb in deferdq!\n");
+                       continue;
+               }
+               expect = seqno + 1;
+       }
+
+       /* last block: ack the last walked seqno, no gap follows */
+       ga->gacks[n].ack = htons(seqno);
+       ga->gacks[n].gap = 0;
+       n++;
+
+exit:
+       len = tipc_gap_ack_blks_sz(n);
+       ga->len = htons(len);
+       ga->gack_cnt = n;
+       return len;
+}
+
+/* tipc_link_advance_transmq - advance TIPC link transmq queue by releasing
+ *                            acked packets, also doing retransmissions if
+ *                            gaps found
+ * @l: tipc link with transmq queue to be advanced
+ * @acked: seqno of last packet acked by peer without any gaps before
+ * @gap: # of gap packets
+ * @ga: buffer pointer to Gap ACK blocks from peer, may be NULL
+ * @xmitq: queue for accumulating the retransmitted packets if any
+ *
+ * Every packet in @l->transmq is classified against the current ack/gap pair:
+ * released when its seqno is <= acked, retransmitted when it is
+ * <= acked + gap, otherwise the next pair is loaded from @ga and the same
+ * packet is classified again. The walk ends when the queue is exhausted or
+ * when a packet lies beyond the last available pair.
+ *
+ * The ack and broadcast ack values stamped into retransmitted copies are
+ * sampled once, before the walk, from @l->rcv_nxt and
+ * @l->bc_rcvlink->rcv_nxt.
+ *
+ * NOTE(review): the block index is limited only by @ga->gack_cnt, a value
+ * taken from the peer's message - confirm that the caller bounds it.
+ */
+static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+                                     struct tipc_gap_ack_blks *ga,
+                                     struct sk_buff_head *xmitq)
+{
+       struct sk_buff *skb, *_skb, *tmp;
+       struct tipc_msg *hdr;
+       u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+       u16 ack = l->rcv_nxt - 1;
+       u16 seqno;
+       u16 n = 0;
+
+       skb_queue_walk_safe(&l->transmq, skb, tmp) {
+               seqno = buf_seqno(skb);
+
+next_gap_ack:
+               if (less_eq(seqno, acked)) {
+                       /* release skb: covered by the current ack */
+                       __skb_unlink(skb, &l->transmq);
+                       kfree_skb(skb);
+               } else if (less_eq(seqno, acked + gap)) {
+                       /* retransmit skb: it falls into the current gap. Skip
+                        * it while its retransmission timestamp lies in the
+                        * future; the stamp is renewed before the copy, so a
+                        * failed copy is not retried until it expires either
+                        */
+                       if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
+                               continue;
+                       TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
+
+                       _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+                       if (!_skb)
+                               continue;
+                       hdr = buf_msg(_skb);
+                       msg_set_ack(hdr, ack);
+                       msg_set_bcast_ack(hdr, bc_ack);
+                       _skb->priority = TC_PRIO_CONTROL;
+                       __skb_queue_tail(xmitq, _skb);
+                       l->stats.retransmitted++;
+               } else {
+                       /* retry with Gap ACK blocks if any: load the next
+                        * ack/gap pair and re-evaluate the same skb; stop
+                        * walking when no block is left
+                        */
+                       if (!ga || n >= ga->gack_cnt)
+                               break;
+                       acked = ntohs(ga->gacks[n].ack);
+                       gap = ntohs(ga->gacks[n].gap);
+                       n++;
+                       goto next_gap_ack;
+               }
+       }
+}
+
 /* tipc_link_build_state_msg: prepare link state message for transmission
  *
  * Note that sending of broadcast ack is coordinated among nodes, to reduce
@@ -1282,6 +1425,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
                                    struct sk_buff_head *xmitq)
 {
        u32 def_cnt = ++l->stats.deferred_recv;
+       u32 defq_len = skb_queue_len(&l->deferdq);
        int match1, match2;
 
        if (link_is_bc_rcvlink(l)) {
@@ -1292,7 +1436,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
                return 0;
        }
 
-       if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
+       if (defq_len >= 3 && !((defq_len - 3) % 16))
                tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq);
        return 0;
 }
@@ -1306,29 +1450,29 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                  struct sk_buff_head *xmitq)
 {
        struct sk_buff_head *defq = &l->deferdq;
-       struct tipc_msg *hdr;
+       struct tipc_msg *hdr = buf_msg(skb);
        u16 seqno, rcv_nxt, win_lim;
        int rc = 0;
 
+       /* Verify and update link state */
+       if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
+               return tipc_link_proto_rcv(l, skb, xmitq);
+
+       /* Don't send probe at next timeout expiration */
+       l->silent_intv_cnt = 0;
+
        do {
                hdr = buf_msg(skb);
                seqno = msg_seqno(hdr);
                rcv_nxt = l->rcv_nxt;
                win_lim = rcv_nxt + TIPC_MAX_LINK_WIN;
 
-               /* Verify and update link state */
-               if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
-                       return tipc_link_proto_rcv(l, skb, xmitq);
-
                if (unlikely(!link_is_up(l))) {
                        if (l->state == LINK_ESTABLISHING)
                                rc = TIPC_LINK_UP_EVT;
                        goto drop;
                }
 
-               /* Don't send probe at next timeout expiration */
-               l->silent_intv_cnt = 0;
-
                /* Drop if outside receive window */
                if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) {
                        l->stats.duplicates++;
@@ -1353,13 +1497,16 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                /* Deliver packet */
                l->rcv_nxt++;
                l->stats.recv_pkts++;
-               if (!tipc_data_input(l, skb, l->inputq))
-                       rc |= tipc_link_input(l, skb, l->inputq);
+
+               if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL))
+                       rc |= tipc_link_tnl_rcv(l, skb, l->inputq);
+               else if (!tipc_data_input(l, skb, l->inputq))
+                       rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf);
                if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
                        rc |= tipc_link_build_state_msg(l, xmitq);
                if (unlikely(rc & ~TIPC_LINK_SND_STATE))
                        break;
-       } while ((skb = __skb_dequeue(defq)));
+       } while ((skb = __tipc_skb_dequeue(defq, l->rcv_nxt)));
 
        return rc;
 drop:
@@ -1380,6 +1527,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
        struct tipc_mon_state *mstate = &l->mon_state;
        int dlen = 0;
        void *data;
+       u16 glen = 0;
 
        /* Don't send protocol message during reset or link failover */
        if (tipc_link_is_blocked(l))
@@ -1392,8 +1540,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
                rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
 
        skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE,
-                             tipc_max_domain_size, l->addr,
-                             tipc_own_addr(l->net), 0, 0, 0);
+                             tipc_max_domain_size + MAX_GAP_ACK_BLKS_SZ,
+                             l->addr, tipc_own_addr(l->net), 0, 0, 0);
        if (!skb)
                return;
 
@@ -1420,9 +1568,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
                msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
                msg_set_probe(hdr, probe);
                msg_set_is_keepalive(hdr, probe || probe_reply);
-               tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
-               msg_set_size(hdr, INT_H_SIZE + dlen);
-               skb_trim(skb, INT_H_SIZE + dlen);
+               if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
+                       glen = tipc_build_gap_ack_blks(l, data);
+               tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id);
+               msg_set_size(hdr, INT_H_SIZE + glen + dlen);
+               skb_trim(skb, INT_H_SIZE + glen + dlen);
                l->stats.sent_states++;
                l->rcv_unacked = 0;
        } else {
@@ -1481,6 +1631,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
 void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                           int mtyp, struct sk_buff_head *xmitq)
 {
+       struct sk_buff_head *fdefq = &tnl->failover_deferdq;
        struct sk_buff *skb, *tnlskb;
        struct tipc_msg *hdr, tnlhdr;
        struct sk_buff_head *queue = &l->transmq;
@@ -1508,7 +1659,11 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
        /* Initialize reusable tunnel packet header */
        tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
                      mtyp, INT_H_SIZE, l->addr);
-       pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq);
+       if (mtyp == SYNCH_MSG)
+               pktcnt = l->snd_nxt - buf_seqno(skb_peek(&l->transmq));
+       else
+               pktcnt = skb_queue_len(&l->transmq);
+       pktcnt += skb_queue_len(&l->backlogq);
        msg_set_msgcnt(&tnlhdr, pktcnt);
        msg_set_bearer_id(&tnlhdr, l->peer_bearer_id);
 tnl:
@@ -1539,6 +1694,14 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                tnl->drop_point = l->rcv_nxt;
                tnl->failover_reasm_skb = l->reasm_buf;
                l->reasm_buf = NULL;
+
+               /* Failover the link's deferdq */
+               if (unlikely(!skb_queue_empty(fdefq))) {
+                       pr_warn("Link failover deferdq not empty: %d!\n",
+                               skb_queue_len(fdefq));
+                       __skb_queue_purge(fdefq);
+               }
+               skb_queue_splice_init(&l->deferdq, fdefq);
        }
 }
 
@@ -1592,6 +1755,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                               struct sk_buff_head *xmitq)
 {
        struct tipc_msg *hdr = buf_msg(skb);
+       struct tipc_gap_ack_blks *ga = NULL;
        u16 rcvgap = 0;
        u16 ack = msg_ack(hdr);
        u16 gap = msg_seq_gap(hdr);
@@ -1602,6 +1766,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
        u16 dlen = msg_data_sz(hdr);
        int mtyp = msg_type(hdr);
        bool reply = msg_probe(hdr);
+       u16 glen = 0;
        void *data;
        char *if_name;
        int rc = 0;
@@ -1699,7 +1864,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                                rc = TIPC_LINK_UP_EVT;
                        break;
                }
-               tipc_mon_rcv(l->net, data, dlen, l->addr,
+
+               /* Receive Gap ACK blocks from peer if any */
+               if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
+                       ga = (struct tipc_gap_ack_blks *)data;
+                       glen = ntohs(ga->len);
+                       /* sanity check: if failed, ignore Gap ACK blocks */
+                       if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt))
+                               ga = NULL;
+               }
+
+               tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr,
                             &l->mon_state, l->bearer_id);
 
                /* Send NACK if peer has sent pkts we haven't received yet */
@@ -1708,13 +1883,12 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                if (rcvgap || reply)
                        tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
                                                  rcvgap, 0, 0, xmitq);
-               tipc_link_release_pkts(l, ack);
+
+               tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
 
                /* If NACK, retransmit will now start at right position */
-               if (gap) {
-                       rc = tipc_link_retrans(l, l, ack + 1, ack + gap, xmitq);
+               if (gap)
                        l->stats.recv_nacks++;
-               }
 
                tipc_link_advance_backlog(l, xmitq);
                if (unlikely(!skb_queue_empty(&l->wakeupq)))
@@ -2199,6 +2373,8 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
        struct nlattr *attrs;
        struct nlattr *prop;
        struct tipc_net *tn = net_generic(net, tipc_net_id);
+       u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
+       u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
        struct tipc_link *bcl = tn->bcl;
 
        if (!bcl)
@@ -2235,6 +2411,12 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
                goto attr_msg_full;
        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
                goto prop_msg_full;
+       if (nla_put_u32(msg->skb, TIPC_NLA_PROP_BROADCAST, bc_mode))
+               goto prop_msg_full;
+       if (bc_mode & BCLINK_MODE_SEL)
+               if (nla_put_u32(msg->skb, TIPC_NLA_PROP_BROADCAST_RATIO,
+                               bc_ratio))
+                       goto prop_msg_full;
        nla_nest_end(msg->skb, prop);
 
        err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);