asedeno.scripts.mit.edu Git - linux.git/blobdiff - net/tipc/link.c
Merge git://git.kernel.org/pub/scm/linux/kernel/git/netdev/net
[linux.git] / net / tipc / link.c
index 2050fd386642a13c117faced130dcc01a70f97af..6cc75ffd9e2c2741c7734c173609bc0f30ce679e 100644 (file)
@@ -106,9 +106,6 @@ struct tipc_stats {
  * @transmitq: queue for sent, non-acked messages
  * @backlogq: queue for messages waiting to be sent
  * @snt_nxt: next sequence number to use for outbound messages
- * @prev_from: sequence number of most previous retransmission request
- * @stale_cnt: counter for number of identical retransmit attempts
- * @stale_limit: time when repeated identical retransmits must force link reset
  * @ackers: # of peers that needs to ack each packet before it can be released
  * @acked: # last packet acked by a certain peer. Used for broadcast.
  * @rcv_nxt: next sequence number to expect for inbound messages
@@ -165,10 +162,7 @@ struct tipc_link {
                u16 limit;
        } backlog[5];
        u16 snd_nxt;
-       u16 prev_from;
        u16 window;
-       u16 stale_cnt;
-       unsigned long stale_limit;
 
        /* Reception */
        u16 rcv_nxt;
@@ -182,6 +176,7 @@ struct tipc_link {
 
        /* Fragmentation/reassembly */
        struct sk_buff *reasm_buf;
+       struct sk_buff *reasm_tnlmsg;
 
        /* Broadcast */
        u16 ackers;
@@ -209,7 +204,7 @@ enum {
        BC_NACK_SND_SUPPRESS,
 };
 
-#define TIPC_BC_RETR_LIM msecs_to_jiffies(10)   /* [ms] */
+#define TIPC_BC_RETR_LIM  (jiffies + msecs_to_jiffies(10))
 #define TIPC_UC_RETR_TIME (jiffies + msecs_to_jiffies(1))
 
 /*
@@ -249,9 +244,9 @@ static void tipc_link_build_bc_init_msg(struct tipc_link *l,
                                        struct sk_buff_head *xmitq);
 static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
 static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data);
-static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
-                                     struct tipc_gap_ack_blks *ga,
-                                     struct sk_buff_head *xmitq);
+static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+                                    struct tipc_gap_ack_blks *ga,
+                                    struct sk_buff_head *xmitq);
 
 /*
  *  Simple non-static link routines (i.e. referenced outside this file)
@@ -734,7 +729,7 @@ static void link_profile_stats(struct tipc_link *l)
        if (msg_user(msg) == MSG_FRAGMENTER) {
                if (msg_type(msg) != FIRST_FRAGMENT)
                        return;
-               length = msg_size(msg_get_wrapped(msg));
+               length = msg_size(msg_inner_hdr(msg));
        }
        l->stats.msg_lengths_total += length;
        l->stats.msg_length_counts++;
@@ -855,18 +850,31 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
  */
 static void link_prepare_wakeup(struct tipc_link *l)
 {
+       struct sk_buff_head *wakeupq = &l->wakeupq;
+       struct sk_buff_head *inputq = l->inputq;
        struct sk_buff *skb, *tmp;
-       int imp, i = 0;
+       struct sk_buff_head tmpq;
+       int avail[5] = {0,};
+       int imp = 0;
+
+       __skb_queue_head_init(&tmpq);
 
-       skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
+       for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
+               avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
+
+       skb_queue_walk_safe(wakeupq, skb, tmp) {
                imp = TIPC_SKB_CB(skb)->chain_imp;
-               if (l->backlog[imp].len < l->backlog[imp].limit) {
-                       skb_unlink(skb, &l->wakeupq);
-                       skb_queue_tail(l->inputq, skb);
-               } else if (i++ > 10) {
-                       break;
-               }
+               if (avail[imp] <= 0)
+                       continue;
+               avail[imp]--;
+               __skb_unlink(skb, wakeupq);
+               __skb_queue_tail(&tmpq, skb);
        }
+
+       spin_lock_bh(&inputq->lock);
+       skb_queue_splice_tail(&tmpq, inputq);
+       spin_unlock_bh(&inputq->lock);
+
 }
 
 void tipc_link_reset(struct tipc_link *l)
@@ -899,8 +907,10 @@ void tipc_link_reset(struct tipc_link *l)
        l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
        l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
        kfree_skb(l->reasm_buf);
+       kfree_skb(l->reasm_tnlmsg);
        kfree_skb(l->failover_reasm_skb);
        l->reasm_buf = NULL;
+       l->reasm_tnlmsg = NULL;
        l->failover_reasm_skb = NULL;
        l->rcv_unacked = 0;
        l->snd_nxt = 1;
@@ -910,7 +920,6 @@ void tipc_link_reset(struct tipc_link *l)
        l->acked = 0;
        l->silent_intv_cnt = 0;
        l->rst_cnt = 0;
-       l->stale_cnt = 0;
        l->bc_peer_is_up = false;
        memset(&l->mon_state, 0, sizeof(l->mon_state));
        tipc_link_reset_stats(l);
@@ -943,7 +952,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
        int rc = 0;
 
        if (unlikely(msg_size(hdr) > mtu)) {
-               skb_queue_purge(list);
+               pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
+                       skb_queue_len(list), msg_user(hdr),
+                       msg_type(hdr), msg_size(hdr), mtu);
+               __skb_queue_purge(list);
                return -EMSGSIZE;
        }
 
@@ -972,15 +984,14 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                if (likely(skb_queue_len(transmq) < maxwin)) {
                        _skb = skb_clone(skb, GFP_ATOMIC);
                        if (!_skb) {
-                               skb_queue_purge(list);
+                               __skb_queue_purge(list);
                                return -ENOBUFS;
                        }
                        __skb_dequeue(list);
                        __skb_queue_tail(transmq, skb);
                        /* next retransmit attempt */
                        if (link_is_bc_sndlink(l))
-                               TIPC_SKB_CB(skb)->nxt_retr =
-                                       jiffies + TIPC_BC_RETR_LIM;
+                               TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
                        __skb_queue_tail(xmitq, _skb);
                        TIPC_SKB_CB(skb)->ackers = l->ackers;
                        l->rcv_unacked = 0;
@@ -1030,7 +1041,7 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
                __skb_queue_tail(&l->transmq, skb);
                /* next retransmit attempt */
                if (link_is_bc_sndlink(l))
-                       TIPC_SKB_CB(skb)->nxt_retr = jiffies + TIPC_BC_RETR_LIM;
+                       TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
 
                __skb_queue_tail(xmitq, _skb);
                TIPC_SKB_CB(skb)->ackers = l->ackers;
@@ -1044,32 +1055,74 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
        l->snd_nxt = seqno;
 }
 
-static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
+/**
+ * link_retransmit_failure() - Detect repeated retransmit failures
+ * @l: tipc link sender
+ * @r: tipc link receiver (= l in case of unicast)
+ * @rc: returned code
+ *
+ * Return: true if the repeated retransmit failures happens, otherwise
+ * false
+ */
+static bool link_retransmit_failure(struct tipc_link *l, struct tipc_link *r,
+                                   int *rc)
 {
-       struct tipc_msg *hdr = buf_msg(skb);
+       struct sk_buff *skb = skb_peek(&l->transmq);
+       struct tipc_msg *hdr;
+
+       if (!skb)
+               return false;
+
+       if (!TIPC_SKB_CB(skb)->retr_cnt)
+               return false;
+
+       if (!time_after(jiffies, TIPC_SKB_CB(skb)->retr_stamp +
+                       msecs_to_jiffies(r->tolerance)))
+               return false;
+
+       hdr = buf_msg(skb);
+       if (link_is_bc_sndlink(l) && !less(r->acked, msg_seqno(hdr)))
+               return false;
 
        pr_warn("Retransmission failure on link <%s>\n", l->name);
        link_print(l, "State of link ");
        pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
                msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
-       pr_info("sqno %u, prev: %x, src: %x\n",
-               msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
+       pr_info("sqno %u, prev: %x, dest: %x\n",
+               msg_seqno(hdr), msg_prevnode(hdr), msg_destnode(hdr));
+       pr_info("retr_stamp %d, retr_cnt %d\n",
+               jiffies_to_msecs(TIPC_SKB_CB(skb)->retr_stamp),
+               TIPC_SKB_CB(skb)->retr_cnt);
+
+       trace_tipc_list_dump(&l->transmq, true, "retrans failure!");
+       trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!");
+       trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!");
+
+       if (link_is_bc_sndlink(l)) {
+               r->state = LINK_RESET;
+               *rc = TIPC_LINK_DOWN_EVT;
+       } else {
+               *rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+       }
+
+       return true;
 }
 
-/* tipc_link_retrans() - retransmit one or more packets
+/* tipc_link_bc_retrans() - retransmit zero or more packets
  * @l: the link to transmit on
  * @r: the receiving link ordering the retransmit. Same as l if unicast
  * @from: retransmit from (inclusive) this sequence number
  * @to: retransmit to (inclusive) this sequence number
  * xmitq: queue for accumulating the retransmitted packets
  */
-static int tipc_link_retrans(struct tipc_link *l, struct tipc_link *r,
-                            u16 from, u16 to, struct sk_buff_head *xmitq)
+static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
+                               u16 from, u16 to, struct sk_buff_head *xmitq)
 {
        struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
        u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
        u16 ack = l->rcv_nxt - 1;
        struct tipc_msg *hdr;
+       int rc = 0;
 
        if (!skb)
                return 0;
@@ -1077,20 +1130,9 @@ static int tipc_link_retrans(struct tipc_link *l, struct tipc_link *r,
                return 0;
 
        trace_tipc_link_retrans(r, from, to, &l->transmq);
-       /* Detect repeated retransmit failures on same packet */
-       if (r->prev_from != from) {
-               r->prev_from = from;
-               r->stale_limit = jiffies + msecs_to_jiffies(r->tolerance);
-               r->stale_cnt = 0;
-       } else if (++r->stale_cnt > 99 && time_after(jiffies, r->stale_limit)) {
-               link_retransmit_failure(l, skb);
-               trace_tipc_list_dump(&l->transmq, true, "retrans failure!");
-               trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!");
-               trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!");
-               if (link_is_bc_sndlink(l))
-                       return TIPC_LINK_DOWN_EVT;
-               return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
-       }
+
+       if (link_retransmit_failure(l, r, &rc))
+               return rc;
 
        skb_queue_walk(&l->transmq, skb) {
                hdr = buf_msg(skb);
@@ -1098,12 +1140,11 @@ static int tipc_link_retrans(struct tipc_link *l, struct tipc_link *r,
                        continue;
                if (more(msg_seqno(hdr), to))
                        break;
-               if (link_is_bc_sndlink(l)) {
-                       if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
-                               continue;
-                       TIPC_SKB_CB(skb)->nxt_retr = jiffies + TIPC_BC_RETR_LIM;
-               }
-               _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+
+               if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
+                       continue;
+               TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
+               _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC);
                if (!_skb)
                        return 0;
                hdr = buf_msg(_skb);
@@ -1112,6 +1153,10 @@ static int tipc_link_retrans(struct tipc_link *l, struct tipc_link *r,
                _skb->priority = TC_PRIO_CONTROL;
                __skb_queue_tail(xmitq, _skb);
                l->stats.retransmitted++;
+
+               /* Increase actual retrans counter & mark first time */
+               if (!TIPC_SKB_CB(skb)->retr_cnt++)
+                       TIPC_SKB_CB(skb)->retr_stamp = jiffies;
        }
        return 0;
 }
@@ -1212,6 +1257,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
                             struct sk_buff_head *inputq)
 {
        struct sk_buff **reasm_skb = &l->failover_reasm_skb;
+       struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg;
        struct sk_buff_head *fdefq = &l->failover_deferdq;
        struct tipc_msg *hdr = buf_msg(skb);
        struct sk_buff *iskb;
@@ -1219,40 +1265,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
        int rc = 0;
        u16 seqno;
 
-       /* SYNCH_MSG */
-       if (msg_type(hdr) == SYNCH_MSG)
-               goto drop;
+       if (msg_type(hdr) == SYNCH_MSG) {
+               kfree_skb(skb);
+               return 0;
+       }
 
-       /* FAILOVER_MSG */
-       if (!tipc_msg_extract(skb, &iskb, &ipos)) {
-               pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
-                                   skb_queue_len(fdefq));
-               return rc;
+       /* Not a fragment? */
+       if (likely(!msg_nof_fragms(hdr))) {
+               if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) {
+                       pr_warn_ratelimited("Unable to extract msg, defq: %d\n",
+                                           skb_queue_len(fdefq));
+                       return 0;
+               }
+               kfree_skb(skb);
+       } else {
+               /* Set fragment type for buf_append */
+               if (msg_fragm_no(hdr) == 1)
+                       msg_set_type(hdr, FIRST_FRAGMENT);
+               else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr))
+                       msg_set_type(hdr, FRAGMENT);
+               else
+                       msg_set_type(hdr, LAST_FRAGMENT);
+
+               if (!tipc_buf_append(reasm_tnlmsg, &skb)) {
+                       /* Successful but non-complete reassembly? */
+                       if (*reasm_tnlmsg || link_is_bc_rcvlink(l))
+                               return 0;
+                       pr_warn_ratelimited("Unable to reassemble tunnel msg\n");
+                       return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+               }
+               iskb = skb;
        }
 
        do {
                seqno = buf_seqno(iskb);
-
                if (unlikely(less(seqno, l->drop_point))) {
                        kfree_skb(iskb);
                        continue;
                }
-
                if (unlikely(seqno != l->drop_point)) {
                        __tipc_skb_queue_sorted(fdefq, seqno, iskb);
                        continue;
                }
 
                l->drop_point++;
-
                if (!tipc_data_input(l, iskb, inputq))
                        rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
                if (unlikely(rc))
                        break;
        } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
 
-drop:
-       kfree_skb(skb);
        return rc;
 }
 
@@ -1324,17 +1386,21 @@ static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data)
  * @gap: # of gap packets
  * @ga: buffer pointer to Gap ACK blocks from peer
  * @xmitq: queue for accumulating the retransmitted packets if any
+ *
+ * In case of a repeated retransmit failures, the call will return shortly
+ * with a returned code (e.g. TIPC_LINK_DOWN_EVT)
  */
-static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
-                                     struct tipc_gap_ack_blks *ga,
-                                     struct sk_buff_head *xmitq)
+static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+                                    struct tipc_gap_ack_blks *ga,
+                                    struct sk_buff_head *xmitq)
 {
        struct sk_buff *skb, *_skb, *tmp;
        struct tipc_msg *hdr;
        u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
        u16 ack = l->rcv_nxt - 1;
-       u16 seqno;
-       u16 n = 0;
+       bool passed = false;
+       u16 seqno, n = 0;
+       int rc = 0;
 
        skb_queue_walk_safe(&l->transmq, skb, tmp) {
                seqno = buf_seqno(skb);
@@ -1345,12 +1411,17 @@ static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
                        __skb_unlink(skb, &l->transmq);
                        kfree_skb(skb);
                } else if (less_eq(seqno, acked + gap)) {
-                       /* retransmit skb */
+                       /* First, check if repeated retrans failures occurs? */
+                       if (!passed && link_retransmit_failure(l, l, &rc))
+                               return rc;
+                       passed = true;
+
+                       /* retransmit skb if unrestricted*/
                        if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
                                continue;
                        TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
-
-                       _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+                       _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE,
+                                          GFP_ATOMIC);
                        if (!_skb)
                                continue;
                        hdr = buf_msg(_skb);
@@ -1359,6 +1430,10 @@ static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
                        _skb->priority = TC_PRIO_CONTROL;
                        __skb_queue_tail(xmitq, _skb);
                        l->stats.retransmitted++;
+
+                       /* Increase actual retrans counter & mark first time */
+                       if (!TIPC_SKB_CB(skb)->retr_cnt++)
+                               TIPC_SKB_CB(skb)->retr_stamp = jiffies;
                } else {
                        /* retry with Gap ACK blocks if any */
                        if (!ga || n >= ga->gack_cnt)
@@ -1369,6 +1444,8 @@ static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
                        goto next_gap_ack;
                }
        }
+
+       return 0;
 }
 
 /* tipc_link_build_state_msg: prepare link state message for transmission
@@ -1481,7 +1558,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 
                /* Forward queues and wake up waiting users */
                if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
-                       l->stale_cnt = 0;
                        tipc_link_advance_backlog(l, xmitq);
                        if (unlikely(!skb_queue_empty(&l->wakeupq)))
                                link_prepare_wakeup(l);
@@ -1604,7 +1680,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
        struct sk_buff *skb;
        u32 dnode = l->addr;
 
-       skb_queue_head_init(&tnlq);
+       __skb_queue_head_init(&tnlq);
        skb = tipc_msg_create(TUNNEL_PROTOCOL, FAILOVER_MSG,
                              INT_H_SIZE, BASIC_H_SIZE,
                              dnode, onode, 0, 0, 0);
@@ -1635,14 +1711,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
        struct sk_buff *skb, *tnlskb;
        struct tipc_msg *hdr, tnlhdr;
        struct sk_buff_head *queue = &l->transmq;
-       struct sk_buff_head tmpxq, tnlq;
+       struct sk_buff_head tmpxq, tnlq, frags;
        u16 pktlen, pktcnt, seqno = l->snd_nxt;
+       bool pktcnt_need_update = false;
+       u16 syncpt;
+       int rc;
 
        if (!tnl)
                return;
 
-       skb_queue_head_init(&tnlq);
-       skb_queue_head_init(&tmpxq);
+       __skb_queue_head_init(&tnlq);
+       __skb_queue_head_init(&tmpxq);
+       __skb_queue_head_init(&frags);
 
        /* At least one packet required for safe algorithm => add dummy */
        skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
@@ -1652,10 +1732,35 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                pr_warn("%sunable to create tunnel packet\n", link_co_err);
                return;
        }
-       skb_queue_tail(&tnlq, skb);
+       __skb_queue_tail(&tnlq, skb);
        tipc_link_xmit(l, &tnlq, &tmpxq);
        __skb_queue_purge(&tmpxq);
 
+       /* Link Synching:
+        * From now on, send only one single ("dummy") SYNCH message
+        * to peer. The SYNCH message does not contain any data, just
+        * a header conveying the synch point to the peer.
+        */
+       if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
+               tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG,
+                                        INT_H_SIZE, 0, l->addr,
+                                        tipc_own_addr(l->net),
+                                        0, 0, 0);
+               if (!tnlskb) {
+                       pr_warn("%sunable to create dummy SYNCH_MSG\n",
+                               link_co_err);
+                       return;
+               }
+
+               hdr = buf_msg(tnlskb);
+               syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1;
+               msg_set_syncpt(hdr, syncpt);
+               msg_set_bearer_id(hdr, l->peer_bearer_id);
+               __skb_queue_tail(&tnlq, tnlskb);
+               tipc_link_xmit(tnl, &tnlq, xmitq);
+               return;
+       }
+
        /* Initialize reusable tunnel packet header */
        tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
                      mtyp, INT_H_SIZE, l->addr);
@@ -1673,6 +1778,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                if (queue == &l->backlogq)
                        msg_set_seqno(hdr, seqno++);
                pktlen = msg_size(hdr);
+
+               /* Tunnel link MTU is not large enough? This could be
+                * due to:
+                * 1) Link MTU has just changed or set differently;
+                * 2) Or FAILOVER on the top of a SYNCH message
+                *
+                * The 2nd case should not happen if peer supports
+                * TIPC_TUNNEL_ENHANCED
+                */
+               if (pktlen > tnl->mtu - INT_H_SIZE) {
+                       if (mtyp == FAILOVER_MSG &&
+                           (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
+                               rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu,
+                                                      &frags);
+                               if (rc) {
+                                       pr_warn("%sunable to frag msg: rc %d\n",
+                                               link_co_err, rc);
+                                       return;
+                               }
+                               pktcnt += skb_queue_len(&frags) - 1;
+                               pktcnt_need_update = true;
+                               skb_queue_splice_tail_init(&frags, &tnlq);
+                               continue;
+                       }
+                       /* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED
+                        * => Just warn it and return!
+                        */
+                       pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n",
+                                           link_co_err, msg_user(hdr),
+                                           msg_type(hdr), msg_size(hdr));
+                       return;
+               }
+
                msg_set_size(&tnlhdr, pktlen + INT_H_SIZE);
                tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC);
                if (!tnlskb) {
@@ -1688,6 +1826,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                goto tnl;
        }
 
+       if (pktcnt_need_update)
+               skb_queue_walk(&tnlq, skb) {
+                       hdr = buf_msg(skb);
+                       msg_set_msgcnt(hdr, pktcnt);
+               }
+
        tipc_link_xmit(tnl, &tnlq, xmitq);
 
        if (mtyp == FAILOVER_MSG) {
@@ -1918,7 +2062,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                        tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
                                                  rcvgap, 0, 0, xmitq);
 
-               tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
+               rc |= tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
 
                /* If NACK, retransmit will now start at right position */
                if (gap)
@@ -2035,7 +2179,7 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
        if (more(peers_snd_nxt, l->rcv_nxt + l->window))
                return rc;
 
-       rc = tipc_link_retrans(snd_l, l, from, to, xmitq);
+       rc = tipc_link_bc_retrans(snd_l, l, from, to, xmitq);
 
        l->snd_nxt = peers_snd_nxt;
        if (link_bc_rcv_gap(l))
@@ -2131,7 +2275,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
 
        if (dnode == tipc_own_addr(l->net)) {
                tipc_link_bc_ack_rcv(l, acked, xmitq);
-               rc = tipc_link_retrans(l->bc_sndlink, l, from, to, xmitq);
+               rc = tipc_link_bc_retrans(l->bc_sndlink, l, from, to, xmitq);
                l->stats.recv_nacks++;
                return rc;
        }
@@ -2549,8 +2693,8 @@ int tipc_link_dump(struct tipc_link *l, u16 dqueues, char *buf)
        i += scnprintf(buf + i, sz - i, " %x", l->peer_caps);
        i += scnprintf(buf + i, sz - i, " %u", l->silent_intv_cnt);
        i += scnprintf(buf + i, sz - i, " %u", l->rst_cnt);
-       i += scnprintf(buf + i, sz - i, " %u", l->prev_from);
-       i += scnprintf(buf + i, sz - i, " %u", l->stale_cnt);
+       i += scnprintf(buf + i, sz - i, " %u", 0);
+       i += scnprintf(buf + i, sz - i, " %u", 0);
        i += scnprintf(buf + i, sz - i, " %u", l->acked);
 
        list = &l->transmq;