]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - net/tipc/link.c
Merge git://git.kernel.org/pub/scm/linux/kernel/git/netdev/net
[linux.git] / net / tipc / link.c
index 66d3a07bc5711ff404332e2224d139ad71daaaee..6cc75ffd9e2c2741c7734c173609bc0f30ce679e 100644 (file)
@@ -106,8 +106,6 @@ struct tipc_stats {
  * @transmitq: queue for sent, non-acked messages
  * @backlogq: queue for messages waiting to be sent
  * @snt_nxt: next sequence number to use for outbound messages
- * @prev_from: sequence number of most previous retransmission request
- * @stale_limit: time when repeated identical retransmits must force link reset
  * @ackers: # of peers that needs to ack each packet before it can be released
  * @acked: # last packet acked by a certain peer. Used for broadcast.
  * @rcv_nxt: next sequence number to expect for inbound messages
@@ -164,9 +162,7 @@ struct tipc_link {
                u16 limit;
        } backlog[5];
        u16 snd_nxt;
-       u16 prev_from;
        u16 window;
-       unsigned long stale_limit;
 
        /* Reception */
        u16 rcv_nxt;
@@ -180,6 +176,7 @@ struct tipc_link {
 
        /* Fragmentation/reassembly */
        struct sk_buff *reasm_buf;
+       struct sk_buff *reasm_tnlmsg;
 
        /* Broadcast */
        u16 ackers;
@@ -853,18 +850,31 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
  */
 static void link_prepare_wakeup(struct tipc_link *l)
 {
+       struct sk_buff_head *wakeupq = &l->wakeupq;
+       struct sk_buff_head *inputq = l->inputq;
        struct sk_buff *skb, *tmp;
-       int imp, i = 0;
+       struct sk_buff_head tmpq;
+       int avail[5] = {0,};
+       int imp = 0;
+
+       __skb_queue_head_init(&tmpq);
+
+       for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
+               avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
 
-       skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
+       skb_queue_walk_safe(wakeupq, skb, tmp) {
                imp = TIPC_SKB_CB(skb)->chain_imp;
-               if (l->backlog[imp].len < l->backlog[imp].limit) {
-                       skb_unlink(skb, &l->wakeupq);
-                       skb_queue_tail(l->inputq, skb);
-               } else if (i++ > 10) {
-                       break;
-               }
+               if (avail[imp] <= 0)
+                       continue;
+               avail[imp]--;
+               __skb_unlink(skb, wakeupq);
+               __skb_queue_tail(&tmpq, skb);
        }
+
+       spin_lock_bh(&inputq->lock);
+       skb_queue_splice_tail(&tmpq, inputq);
+       spin_unlock_bh(&inputq->lock);
+
 }
 
 void tipc_link_reset(struct tipc_link *l)
@@ -897,8 +907,10 @@ void tipc_link_reset(struct tipc_link *l)
        l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
        l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
        kfree_skb(l->reasm_buf);
+       kfree_skb(l->reasm_tnlmsg);
        kfree_skb(l->failover_reasm_skb);
        l->reasm_buf = NULL;
+       l->reasm_tnlmsg = NULL;
        l->failover_reasm_skb = NULL;
        l->rcv_unacked = 0;
        l->snd_nxt = 1;
@@ -940,7 +952,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
        int rc = 0;
 
        if (unlikely(msg_size(hdr) > mtu)) {
-               skb_queue_purge(list);
+               pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
+                       skb_queue_len(list), msg_user(hdr),
+                       msg_type(hdr), msg_size(hdr), mtu);
+               __skb_queue_purge(list);
                return -EMSGSIZE;
        }
 
@@ -969,7 +984,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                if (likely(skb_queue_len(transmq) < maxwin)) {
                        _skb = skb_clone(skb, GFP_ATOMIC);
                        if (!_skb) {
-                               skb_queue_purge(list);
+                               __skb_queue_purge(list);
                                return -ENOBUFS;
                        }
                        __skb_dequeue(list);
@@ -1044,47 +1059,53 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
  * link_retransmit_failure() - Detect repeated retransmit failures
  * @l: tipc link sender
  * @r: tipc link receiver (= l in case of unicast)
- * @from: seqno of the 1st packet in retransmit request
  * @rc: returned code
  *
  * Return: true if the repeated retransmit failures happens, otherwise
  * false
  */
 static bool link_retransmit_failure(struct tipc_link *l, struct tipc_link *r,
-                                   u16 from, int *rc)
+                                   int *rc)
 {
        struct sk_buff *skb = skb_peek(&l->transmq);
        struct tipc_msg *hdr;
 
        if (!skb)
                return false;
-       hdr = buf_msg(skb);
 
-       /* Detect repeated retransmit failures on same packet */
-       if (r->prev_from != from) {
-               r->prev_from = from;
-               r->stale_limit = jiffies + msecs_to_jiffies(r->tolerance);
-       } else if (time_after(jiffies, r->stale_limit)) {
-               pr_warn("Retransmission failure on link <%s>\n", l->name);
-               link_print(l, "State of link ");
-               pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
-                       msg_user(hdr), msg_type(hdr), msg_size(hdr),
-                       msg_errcode(hdr));
-               pr_info("sqno %u, prev: %x, src: %x\n",
-                       msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
-
-               trace_tipc_list_dump(&l->transmq, true, "retrans failure!");
-               trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!");
-               trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!");
+       if (!TIPC_SKB_CB(skb)->retr_cnt)
+               return false;
 
-               if (link_is_bc_sndlink(l))
-                       *rc = TIPC_LINK_DOWN_EVT;
+       if (!time_after(jiffies, TIPC_SKB_CB(skb)->retr_stamp +
+                       msecs_to_jiffies(r->tolerance)))
+               return false;
+
+       hdr = buf_msg(skb);
+       if (link_is_bc_sndlink(l) && !less(r->acked, msg_seqno(hdr)))
+               return false;
 
+       pr_warn("Retransmission failure on link <%s>\n", l->name);
+       link_print(l, "State of link ");
+       pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
+               msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
+       pr_info("sqno %u, prev: %x, dest: %x\n",
+               msg_seqno(hdr), msg_prevnode(hdr), msg_destnode(hdr));
+       pr_info("retr_stamp %d, retr_cnt %d\n",
+               jiffies_to_msecs(TIPC_SKB_CB(skb)->retr_stamp),
+               TIPC_SKB_CB(skb)->retr_cnt);
+
+       trace_tipc_list_dump(&l->transmq, true, "retrans failure!");
+       trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!");
+       trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!");
+
+       if (link_is_bc_sndlink(l)) {
+               r->state = LINK_RESET;
+               *rc = TIPC_LINK_DOWN_EVT;
+       } else {
                *rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
-               return true;
        }
 
-       return false;
+       return true;
 }
 
 /* tipc_link_bc_retrans() - retransmit zero or more packets
@@ -1110,7 +1131,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
 
        trace_tipc_link_retrans(r, from, to, &l->transmq);
 
-       if (link_retransmit_failure(l, r, from, &rc))
+       if (link_retransmit_failure(l, r, &rc))
                return rc;
 
        skb_queue_walk(&l->transmq, skb) {
@@ -1119,11 +1140,10 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
                        continue;
                if (more(msg_seqno(hdr), to))
                        break;
-               if (link_is_bc_sndlink(l)) {
-                       if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
-                               continue;
-                       TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
-               }
+
+               if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
+                       continue;
+               TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
                _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC);
                if (!_skb)
                        return 0;
@@ -1133,6 +1153,10 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
                _skb->priority = TC_PRIO_CONTROL;
                __skb_queue_tail(xmitq, _skb);
                l->stats.retransmitted++;
+
+               /* Increase actual retrans counter & mark first time */
+               if (!TIPC_SKB_CB(skb)->retr_cnt++)
+                       TIPC_SKB_CB(skb)->retr_stamp = jiffies;
        }
        return 0;
 }
@@ -1233,6 +1257,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
                             struct sk_buff_head *inputq)
 {
        struct sk_buff **reasm_skb = &l->failover_reasm_skb;
+       struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg;
        struct sk_buff_head *fdefq = &l->failover_deferdq;
        struct tipc_msg *hdr = buf_msg(skb);
        struct sk_buff *iskb;
@@ -1240,40 +1265,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
        int rc = 0;
        u16 seqno;
 
-       /* SYNCH_MSG */
-       if (msg_type(hdr) == SYNCH_MSG)
-               goto drop;
+       if (msg_type(hdr) == SYNCH_MSG) {
+               kfree_skb(skb);
+               return 0;
+       }
 
-       /* FAILOVER_MSG */
-       if (!tipc_msg_extract(skb, &iskb, &ipos)) {
-               pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
-                                   skb_queue_len(fdefq));
-               return rc;
+       /* Not a fragment? */
+       if (likely(!msg_nof_fragms(hdr))) {
+               if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) {
+                       pr_warn_ratelimited("Unable to extract msg, defq: %d\n",
+                                           skb_queue_len(fdefq));
+                       return 0;
+               }
+               kfree_skb(skb);
+       } else {
+               /* Set fragment type for buf_append */
+               if (msg_fragm_no(hdr) == 1)
+                       msg_set_type(hdr, FIRST_FRAGMENT);
+               else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr))
+                       msg_set_type(hdr, FRAGMENT);
+               else
+                       msg_set_type(hdr, LAST_FRAGMENT);
+
+               if (!tipc_buf_append(reasm_tnlmsg, &skb)) {
+                       /* Successful but non-complete reassembly? */
+                       if (*reasm_tnlmsg || link_is_bc_rcvlink(l))
+                               return 0;
+                       pr_warn_ratelimited("Unable to reassemble tunnel msg\n");
+                       return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+               }
+               iskb = skb;
        }
 
        do {
                seqno = buf_seqno(iskb);
-
                if (unlikely(less(seqno, l->drop_point))) {
                        kfree_skb(iskb);
                        continue;
                }
-
                if (unlikely(seqno != l->drop_point)) {
                        __tipc_skb_queue_sorted(fdefq, seqno, iskb);
                        continue;
                }
 
                l->drop_point++;
-
                if (!tipc_data_input(l, iskb, inputq))
                        rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
                if (unlikely(rc))
                        break;
        } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
 
-drop:
-       kfree_skb(skb);
        return rc;
 }
 
@@ -1357,12 +1398,10 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
        struct tipc_msg *hdr;
        u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
        u16 ack = l->rcv_nxt - 1;
+       bool passed = false;
        u16 seqno, n = 0;
        int rc = 0;
 
-       if (gap && link_retransmit_failure(l, l, acked + 1, &rc))
-               return rc;
-
        skb_queue_walk_safe(&l->transmq, skb, tmp) {
                seqno = buf_seqno(skb);
 
@@ -1372,12 +1411,17 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
                        __skb_unlink(skb, &l->transmq);
                        kfree_skb(skb);
                } else if (less_eq(seqno, acked + gap)) {
-                       /* retransmit skb */
+                       /* First, check whether repeated retransmit failures occurred */
+                       if (!passed && link_retransmit_failure(l, l, &rc))
+                               return rc;
+                       passed = true;
+
+                       /* retransmit skb if unrestricted */
                        if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
                                continue;
                        TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
-
-                       _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+                       _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE,
+                                          GFP_ATOMIC);
                        if (!_skb)
                                continue;
                        hdr = buf_msg(_skb);
@@ -1386,6 +1430,10 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
                        _skb->priority = TC_PRIO_CONTROL;
                        __skb_queue_tail(xmitq, _skb);
                        l->stats.retransmitted++;
+
+                       /* Increase actual retrans counter & mark first time */
+                       if (!TIPC_SKB_CB(skb)->retr_cnt++)
+                               TIPC_SKB_CB(skb)->retr_stamp = jiffies;
                } else {
                        /* retry with Gap ACK blocks if any */
                        if (!ga || n >= ga->gack_cnt)
@@ -1632,7 +1680,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
        struct sk_buff *skb;
        u32 dnode = l->addr;
 
-       skb_queue_head_init(&tnlq);
+       __skb_queue_head_init(&tnlq);
        skb = tipc_msg_create(TUNNEL_PROTOCOL, FAILOVER_MSG,
                              INT_H_SIZE, BASIC_H_SIZE,
                              dnode, onode, 0, 0, 0);
@@ -1663,14 +1711,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
        struct sk_buff *skb, *tnlskb;
        struct tipc_msg *hdr, tnlhdr;
        struct sk_buff_head *queue = &l->transmq;
-       struct sk_buff_head tmpxq, tnlq;
+       struct sk_buff_head tmpxq, tnlq, frags;
        u16 pktlen, pktcnt, seqno = l->snd_nxt;
+       bool pktcnt_need_update = false;
+       u16 syncpt;
+       int rc;
 
        if (!tnl)
                return;
 
-       skb_queue_head_init(&tnlq);
-       skb_queue_head_init(&tmpxq);
+       __skb_queue_head_init(&tnlq);
+       __skb_queue_head_init(&tmpxq);
+       __skb_queue_head_init(&frags);
 
        /* At least one packet required for safe algorithm => add dummy */
        skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
@@ -1680,10 +1732,35 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                pr_warn("%sunable to create tunnel packet\n", link_co_err);
                return;
        }
-       skb_queue_tail(&tnlq, skb);
+       __skb_queue_tail(&tnlq, skb);
        tipc_link_xmit(l, &tnlq, &tmpxq);
        __skb_queue_purge(&tmpxq);
 
+       /* Link Synching:
+        * From now on, send only one single ("dummy") SYNCH message
+        * to peer. The SYNCH message does not contain any data, just
+        * a header conveying the synch point to the peer.
+        */
+       if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
+               tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG,
+                                        INT_H_SIZE, 0, l->addr,
+                                        tipc_own_addr(l->net),
+                                        0, 0, 0);
+               if (!tnlskb) {
+                       pr_warn("%sunable to create dummy SYNCH_MSG\n",
+                               link_co_err);
+                       return;
+               }
+
+               hdr = buf_msg(tnlskb);
+               syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1;
+               msg_set_syncpt(hdr, syncpt);
+               msg_set_bearer_id(hdr, l->peer_bearer_id);
+               __skb_queue_tail(&tnlq, tnlskb);
+               tipc_link_xmit(tnl, &tnlq, xmitq);
+               return;
+       }
+
        /* Initialize reusable tunnel packet header */
        tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
                      mtyp, INT_H_SIZE, l->addr);
@@ -1701,6 +1778,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                if (queue == &l->backlogq)
                        msg_set_seqno(hdr, seqno++);
                pktlen = msg_size(hdr);
+
+               /* Tunnel link MTU is not large enough? This could be
+                * due to:
+                * 1) Link MTU has just changed or set differently;
+                * 2) Or FAILOVER on the top of a SYNCH message
+                *
+                * The 2nd case should not happen if peer supports
+                * TIPC_TUNNEL_ENHANCED
+                */
+               if (pktlen > tnl->mtu - INT_H_SIZE) {
+                       if (mtyp == FAILOVER_MSG &&
+                           (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
+                               rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu,
+                                                      &frags);
+                               if (rc) {
+                                       pr_warn("%sunable to frag msg: rc %d\n",
+                                               link_co_err, rc);
+                                       return;
+                               }
+                               pktcnt += skb_queue_len(&frags) - 1;
+                               pktcnt_need_update = true;
+                               skb_queue_splice_tail_init(&frags, &tnlq);
+                               continue;
+                       }
+                       /* Peer does not support TIPC_TUNNEL_ENHANCED, so the
+                        * message is not fragmented => just warn and return
+                        */
+                       pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n",
+                                           link_co_err, msg_user(hdr),
+                                           msg_type(hdr), msg_size(hdr));
+                       return;
+               }
+
                msg_set_size(&tnlhdr, pktlen + INT_H_SIZE);
                tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC);
                if (!tnlskb) {
@@ -1716,6 +1826,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
                goto tnl;
        }
 
+       if (pktcnt_need_update)
+               skb_queue_walk(&tnlq, skb) {
+                       hdr = buf_msg(skb);
+                       msg_set_msgcnt(hdr, pktcnt);
+               }
+
        tipc_link_xmit(tnl, &tnlq, xmitq);
 
        if (mtyp == FAILOVER_MSG) {
@@ -2577,7 +2693,7 @@ int tipc_link_dump(struct tipc_link *l, u16 dqueues, char *buf)
        i += scnprintf(buf + i, sz - i, " %x", l->peer_caps);
        i += scnprintf(buf + i, sz - i, " %u", l->silent_intv_cnt);
        i += scnprintf(buf + i, sz - i, " %u", l->rst_cnt);
-       i += scnprintf(buf + i, sz - i, " %u", l->prev_from);
+       i += scnprintf(buf + i, sz - i, " %u", 0);
        i += scnprintf(buf + i, sz - i, " %u", 0);
        i += scnprintf(buf + i, sz - i, " %u", l->acked);