#include "netlink.h"
#include "monitor.h"
#include "trace.h"
+#include "crypto.h"
#include <linux/pkt_sched.h>
struct sk_buff *target_bskb;
} backlog[5];
u16 snd_nxt;
- u16 window;
/* Reception */
u16 rcv_nxt;
/* Congestion handling */
struct sk_buff_head wakeupq;
+ u16 window;
+ u16 min_win;
+ u16 ssthresh;
+ u16 max_win;
+ u16 cong_acks;
+ u16 checkpoint;
/* Fragmentation/reassembly */
struct sk_buff *reasm_buf;
struct sk_buff_head *xmitq);
static void tipc_link_build_bc_init_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
-static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
-static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data);
+static int tipc_link_release_pkts(struct tipc_link *l, u16 to);
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap);
static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
struct tipc_gap_ack_blks *ga,
struct sk_buff_head *xmitq);
-
+static void tipc_link_update_cwin(struct tipc_link *l, int released,
+ bool retransmitted);
/*
* Simple non-static link routines (i.e. referenced outside this file)
*/
return l->peer_bearer_id << 16 | l->bearer_id;
}
-int tipc_link_window(struct tipc_link *l)
+int tipc_link_min_win(struct tipc_link *l)
{
- return l->window;
+ return l->min_win;
+}
+
+int tipc_link_max_win(struct tipc_link *l)
+{
+ return l->max_win;
}
int tipc_link_prio(struct tipc_link *l)
return l->mtu;
}
+int tipc_link_mss(struct tipc_link *l)
+{
+#ifdef CONFIG_TIPC_CRYPTO
+ return l->mtu - INT_H_SIZE - EMSG_OVERHEAD;
+#else
+ return l->mtu - INT_H_SIZE;
+#endif
+}
+
u16 tipc_link_rcv_nxt(struct tipc_link *l)
{
return l->rcv_nxt;
* @net_plane: network plane (A,B,c..) this link belongs to
* @mtu: mtu to be advertised by link
* @priority: priority to be used by link
- * @window: send window to be used by link
+ * @min_win: minimal send window to be used by link
+ * @max_win: maximal send window to be used by link
* @session: session to be used by link
* @ownnode: identity of own node
* @peer: node id of peer node
*/
bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
int tolerance, char net_plane, u32 mtu, int priority,
- int window, u32 session, u32 self,
+ u32 min_win, u32 max_win, u32 session, u32 self,
u32 peer, u8 *peer_id, u16 peer_caps,
struct tipc_link *bc_sndlink,
struct tipc_link *bc_rcvlink,
l->advertised_mtu = mtu;
l->mtu = mtu;
l->priority = priority;
- tipc_link_set_queue_limits(l, window);
+ tipc_link_set_queue_limits(l, min_win, max_win);
l->ackers = 1;
l->bc_sndlink = bc_sndlink;
l->bc_rcvlink = bc_rcvlink;
* Returns true if link was created, otherwise false
*/
bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
- int mtu, int window, u16 peer_caps,
+ int mtu, u32 min_win, u32 max_win, u16 peer_caps,
struct sk_buff_head *inputq,
struct sk_buff_head *namedq,
struct tipc_link *bc_sndlink,
{
struct tipc_link *l;
- if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
- 0, ownnode, peer, NULL, peer_caps, bc_sndlink,
- NULL, inputq, namedq, link))
+ if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, min_win,
+ max_win, 0, ownnode, peer, NULL, peer_caps,
+ bc_sndlink, NULL, inputq, namedq, link))
return false;
l = *link;
/* Disable replicast if even a single peer doesn't support it */
if (link_is_bc_rcvlink(l) && !(peer_caps & TIPC_BCAST_RCAST))
- tipc_bcast_disable_rcast(net);
+ tipc_bcast_toggle_rcast(net, false);
return true;
}
return (l->silent_intv_cnt + 2 > l->abort_limit);
}
+static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
+ u16 from, u16 to, struct sk_buff_head *xmitq);
/* tipc_link_timeout - perform periodic task as instructed from node timeout
*/
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
probe |= l->silent_intv_cnt;
if (probe || mstate->monitoring)
l->silent_intv_cnt++;
+ if (l->snd_nxt == l->checkpoint) {
+ tipc_link_update_cwin(l, 0, 0);
+ probe = true;
+ }
+ l->checkpoint = l->snd_nxt;
break;
case LINK_RESET:
setup = l->rst_cnt++ <= 4;
struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb_peek(list));
- unsigned int maxwin = l->window;
- int imp = msg_importance(hdr);
- unsigned int mtu = l->mtu;
+ struct sk_buff_head *backlogq = &l->backlogq;
+ struct sk_buff_head *transmq = &l->transmq;
+ struct sk_buff *skb, *_skb;
+ u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
- u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
- struct sk_buff_head *transmq = &l->transmq;
- struct sk_buff_head *backlogq = &l->backlogq;
- struct sk_buff *skb, *_skb, **tskb;
int pkt_cnt = skb_queue_len(list);
+ int imp = msg_importance(hdr);
+ unsigned int mss = tipc_link_mss(l);
+ unsigned int cwin = l->window;
+ unsigned int mtu = l->mtu;
+ bool new_bundle;
int rc = 0;
if (unlikely(msg_size(hdr) > mtu)) {
}
/* Prepare each packet for sending, and add to relevant queue: */
- while (skb_queue_len(list)) {
- skb = skb_peek(list);
- hdr = buf_msg(skb);
- msg_set_seqno(hdr, seqno);
- msg_set_ack(hdr, ack);
- msg_set_bcast_ack(hdr, bc_ack);
-
- if (likely(skb_queue_len(transmq) < maxwin)) {
+ while ((skb = __skb_dequeue(list))) {
+ if (likely(skb_queue_len(transmq) < cwin)) {
+ hdr = buf_msg(skb);
+ msg_set_seqno(hdr, seqno);
+ msg_set_ack(hdr, ack);
+ msg_set_bcast_ack(hdr, bc_ack);
_skb = skb_clone(skb, GFP_ATOMIC);
if (!_skb) {
+ kfree_skb(skb);
__skb_queue_purge(list);
return -ENOBUFS;
}
- __skb_dequeue(list);
__skb_queue_tail(transmq, skb);
/* next retransmit attempt */
if (link_is_bc_sndlink(l))
seqno++;
continue;
}
- tskb = &l->backlog[imp].target_bskb;
- if (tipc_msg_bundle(*tskb, hdr, mtu)) {
- kfree_skb(__skb_dequeue(list));
- l->stats.sent_bundled++;
- continue;
- }
- if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
- kfree_skb(__skb_dequeue(list));
- __skb_queue_tail(backlogq, *tskb);
- l->backlog[imp].len++;
- l->stats.sent_bundled++;
- l->stats.sent_bundles++;
+ if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb,
+ mss, l->addr, &new_bundle)) {
+ if (skb) {
+ /* Keep a ref. to the skb for next try */
+ l->backlog[imp].target_bskb = skb;
+ l->backlog[imp].len++;
+ __skb_queue_tail(backlogq, skb);
+ } else {
+ if (new_bundle) {
+ l->stats.sent_bundles++;
+ l->stats.sent_bundled++;
+ }
+ l->stats.sent_bundled++;
+ }
continue;
}
l->backlog[imp].target_bskb = NULL;
- l->backlog[imp].len += skb_queue_len(list);
+ l->backlog[imp].len += (1 + skb_queue_len(list));
+ __skb_queue_tail(backlogq, skb);
skb_queue_splice_tail_init(list, backlogq);
}
l->snd_nxt = seqno;
return rc;
}
+static void tipc_link_update_cwin(struct tipc_link *l, int released,
+ bool retransmitted)
+{
+ int bklog_len = skb_queue_len(&l->backlogq);
+ struct sk_buff_head *txq = &l->transmq;
+ int txq_len = skb_queue_len(txq);
+ u16 cwin = l->window;
+
+ /* Enter fast recovery */
+ if (unlikely(retransmitted)) {
+ l->ssthresh = max_t(u16, l->window / 2, 300);
+ l->window = l->ssthresh;
+ return;
+ }
+ /* Enter slow start */
+ if (unlikely(!released)) {
+ l->ssthresh = max_t(u16, l->window / 2, 300);
+ l->window = l->min_win;
+ return;
+ }
+ /* Don't increase window if no pressure on the transmit queue */
+ if (txq_len + bklog_len < cwin)
+ return;
+
+ /* Don't increase window if there are holes the transmit queue */
+ if (txq_len && l->snd_nxt - buf_seqno(skb_peek(txq)) != txq_len)
+ return;
+
+ l->cong_acks += released;
+
+ /* Slow start */
+ if (cwin <= l->ssthresh) {
+ l->window = min_t(u16, cwin + released, l->max_win);
+ return;
+ }
+ /* Congestion avoidance */
+ if (l->cong_acks < cwin)
+ return;
+ l->window = min_t(u16, ++cwin, l->max_win);
+ l->cong_acks = 0;
+}
+
static void tipc_link_advance_backlog(struct tipc_link *l,
struct sk_buff_head *xmitq)
{
+ u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+ struct sk_buff_head *txq = &l->transmq;
struct sk_buff *skb, *_skb;
- struct tipc_msg *hdr;
- u16 seqno = l->snd_nxt;
u16 ack = l->rcv_nxt - 1;
- u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+ u16 seqno = l->snd_nxt;
+ struct tipc_msg *hdr;
+ u16 cwin = l->window;
u32 imp;
- while (skb_queue_len(&l->transmq) < l->window) {
+ while (skb_queue_len(txq) < cwin) {
skb = skb_peek(&l->backlogq);
if (!skb)
break;
return false;
if (!time_after(jiffies, TIPC_SKB_CB(skb)->retr_stamp +
- msecs_to_jiffies(r->tolerance)))
+ msecs_to_jiffies(r->tolerance * 10)))
return false;
hdr = buf_msg(skb);
struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
u16 ack = l->rcv_nxt - 1;
+ int retransmitted = 0;
struct tipc_msg *hdr;
int rc = 0;
continue;
if (more(msg_seqno(hdr), to))
break;
-
if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
continue;
TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
- _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC);
+ _skb = pskb_copy(skb, GFP_ATOMIC);
if (!_skb)
return 0;
hdr = buf_msg(_skb);
_skb->priority = TC_PRIO_CONTROL;
__skb_queue_tail(xmitq, _skb);
l->stats.retransmitted++;
-
+ retransmitted++;
/* Increase actual retrans counter & mark first time */
if (!TIPC_SKB_CB(skb)->retr_cnt++)
TIPC_SKB_CB(skb)->retr_stamp = jiffies;
}
+ tipc_link_update_cwin(l, 0, retransmitted);
return 0;
}
return rc;
}
-static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
+static int tipc_link_release_pkts(struct tipc_link *l, u16 acked)
{
- bool released = false;
+ int released = 0;
struct sk_buff *skb, *tmp;
skb_queue_walk_safe(&l->transmq, skb, tmp) {
break;
__skb_unlink(skb, &l->transmq);
kfree_skb(skb);
- released = true;
+ released++;
}
return released;
}
*
* returns the actual allocated memory size
*/
-static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data)
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap)
{
struct sk_buff *skb = skb_peek(&l->deferdq);
struct tipc_gap_ack_blks *ga = data;
u16 len, expect, seqno = 0;
u8 n = 0;
- if (!skb)
+ if (!skb || !gap)
goto exit;
expect = buf_seqno(skb);
struct sk_buff *skb, *_skb, *tmp;
struct tipc_msg *hdr;
u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+ bool retransmitted = false;
u16 ack = l->rcv_nxt - 1;
bool passed = false;
+ u16 released = 0;
u16 seqno, n = 0;
int rc = 0;
/* release skb */
__skb_unlink(skb, &l->transmq);
kfree_skb(skb);
+ released++;
} else if (less_eq(seqno, acked + gap)) {
/* First, check if repeated retrans failures occurs? */
if (!passed && link_retransmit_failure(l, l, &rc))
if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
continue;
TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
- _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE,
- GFP_ATOMIC);
+ _skb = pskb_copy(skb, GFP_ATOMIC);
if (!_skb)
continue;
hdr = buf_msg(_skb);
_skb->priority = TC_PRIO_CONTROL;
__skb_queue_tail(xmitq, _skb);
l->stats.retransmitted++;
-
+ retransmitted = true;
/* Increase actual retrans counter & mark first time */
if (!TIPC_SKB_CB(skb)->retr_cnt++)
TIPC_SKB_CB(skb)->retr_stamp = jiffies;
goto next_gap_ack;
}
}
-
+ if (released || retransmitted)
+ tipc_link_update_cwin(l, released, retransmitted);
+ if (released)
+ tipc_link_advance_backlog(l, xmitq);
return 0;
}
l->snd_nxt = l->rcv_nxt;
return TIPC_LINK_SND_STATE;
}
-
/* Unicast ACK */
l->rcv_unacked = 0;
l->stats.sent_acks++;
struct sk_buff_head *xmitq)
{
u32 def_cnt = ++l->stats.deferred_recv;
- u32 defq_len = skb_queue_len(&l->deferdq);
+ struct sk_buff_head *dfq = &l->deferdq;
+ u32 defq_len = skb_queue_len(dfq);
int match1, match2;
if (link_is_bc_rcvlink(l)) {
return 0;
}
- if (defq_len >= 3 && !((defq_len - 3) % 16))
- tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq);
+ if (defq_len >= 3 && !((defq_len - 3) % 16)) {
+ u16 rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
+
+ tipc_link_build_proto_msg(l, STATE_MSG, 0, 0,
+ rcvgap, 0, 0, xmitq);
+ }
return 0;
}
struct sk_buff_head *defq = &l->deferdq;
struct tipc_msg *hdr = buf_msg(skb);
u16 seqno, rcv_nxt, win_lim;
+ int released = 0;
int rc = 0;
/* Verify and update link state */
if (unlikely(!link_is_up(l))) {
if (l->state == LINK_ESTABLISHING)
rc = TIPC_LINK_UP_EVT;
- goto drop;
+ kfree_skb(skb);
+ break;
}
/* Drop if outside receive window */
if (unlikely(less(seqno, rcv_nxt) || more(seqno, win_lim))) {
l->stats.duplicates++;
- goto drop;
- }
-
- /* Forward queues and wake up waiting users */
- if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
- tipc_link_advance_backlog(l, xmitq);
- if (unlikely(!skb_queue_empty(&l->wakeupq)))
- link_prepare_wakeup(l);
+ kfree_skb(skb);
+ break;
}
+ released += tipc_link_release_pkts(l, msg_ack(hdr));
/* Defer delivery if sequence gap */
if (unlikely(seqno != rcv_nxt)) {
break;
} while ((skb = __tipc_skb_dequeue(defq, l->rcv_nxt)));
- return rc;
-drop:
- kfree_skb(skb);
+ /* Forward queues and wake up waiting users */
+ if (released) {
+ tipc_link_update_cwin(l, released, 0);
+ tipc_link_advance_backlog(l, xmitq);
+ if (unlikely(!skb_queue_empty(&l->wakeupq)))
+ link_prepare_wakeup(l);
+ }
return rc;
}
if (!tipc_link_is_up(l) && (mtyp == STATE_MSG))
return;
- if (!skb_queue_empty(dfq))
+ if ((probe || probe_reply) && !skb_queue_empty(dfq))
rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE,
msg_set_probe(hdr, probe);
msg_set_is_keepalive(hdr, probe || probe_reply);
if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
- glen = tipc_build_gap_ack_blks(l, data);
+ glen = tipc_build_gap_ack_blks(l, data, rcvgap);
tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id);
msg_set_size(hdr, INT_H_SIZE + glen + dlen);
skb_trim(skb, INT_H_SIZE + glen + dlen);
return;
__skb_queue_head_init(&tnlq);
- __skb_queue_head_init(&tmpxq);
- __skb_queue_head_init(&frags);
-
- /* At least one packet required for safe algorithm => add dummy */
- skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
- BASIC_H_SIZE, 0, l->addr, tipc_own_addr(l->net),
- 0, 0, TIPC_ERR_NO_PORT);
- if (!skb) {
- pr_warn("%sunable to create tunnel packet\n", link_co_err);
- return;
- }
- __skb_queue_tail(&tnlq, skb);
- tipc_link_xmit(l, &tnlq, &tmpxq);
- __skb_queue_purge(&tmpxq);
-
/* Link Synching:
* From now on, send only one single ("dummy") SYNCH message
* to peer. The SYNCH message does not contain any data, just
return;
}
+ __skb_queue_head_init(&tmpxq);
+ __skb_queue_head_init(&frags);
+ /* At least one packet required for safe algorithm => add dummy */
+ skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
+ BASIC_H_SIZE, 0, l->addr, tipc_own_addr(l->net),
+ 0, 0, TIPC_ERR_NO_PORT);
+ if (!skb) {
+ pr_warn("%sunable to create tunnel packet\n", link_co_err);
+ return;
+ }
+ __skb_queue_tail(&tnlq, skb);
+ tipc_link_xmit(l, &tnlq, &tmpxq);
+ __skb_queue_purge(&tmpxq);
+
/* Initialize reusable tunnel packet header */
tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
mtyp, INT_H_SIZE, l->addr);
tipc_link_create_dummy_tnl_msg(tnl, xmitq);
- /* This failover link enpoint was never established before,
+ /* This failover link endpoint was never established before,
* so it has not received anything from peer.
* Otherwise, it must be a normal failover situation or the
* node has entered SELF_DOWN_PEER_LEAVING and both peer nodes
&l->mon_state, l->bearer_id);
/* Send NACK if peer has sent pkts we haven't received yet */
- if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l))
+ if ((reply || msg_is_keepalive(hdr)) &&
+ more(peers_snd_nxt, rcv_nxt) &&
+ !tipc_link_is_synching(l) &&
+ skb_queue_empty(&l->deferdq))
rcvgap = peers_snd_nxt - l->rcv_nxt;
if (rcvgap || reply)
tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
rcvgap, 0, 0, xmitq);
rc |= tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
-
- /* If NACK, retransmit will now start at right position */
if (gap)
l->stats.recv_nacks++;
-
- tipc_link_advance_backlog(l, xmitq);
if (unlikely(!skb_queue_empty(&l->wakeupq)))
link_prepare_wakeup(l);
}
return 0;
}
-void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
+void tipc_link_set_queue_limits(struct tipc_link *l, u32 min_win, u32 max_win)
{
int max_bulk = TIPC_MAX_PUBL / (l->mtu / ITEM_SIZE);
- l->window = win;
- l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win);
- l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = max_t(u16, 100, win * 2);
- l->backlog[TIPC_HIGH_IMPORTANCE].limit = max_t(u16, 150, win * 3);
- l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = max_t(u16, 200, win * 4);
+ l->min_win = min_win;
+ l->ssthresh = max_win;
+ l->max_win = max_win;
+ l->window = min_win;
+ l->backlog[TIPC_LOW_IMPORTANCE].limit = min_win * 2;
+ l->backlog[TIPC_MEDIUM_IMPORTANCE].limit = min_win * 4;
+ l->backlog[TIPC_HIGH_IMPORTANCE].limit = min_win * 6;
+ l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = min_win * 8;
l->backlog[TIPC_SYSTEM_IMPORTANCE].limit = max_bulk;
}
}
if (props[TIPC_NLA_PROP_WIN]) {
- u32 win;
+ u32 max_win;
- win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
- if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
+ max_win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+ if (max_win < TIPC_DEF_LINK_WIN || max_win > TIPC_MAX_LINK_WIN)
return -EINVAL;
}
prop = nla_nest_start_noflag(msg->skb, TIPC_NLA_LINK_PROP);
if (!prop)
goto attr_msg_full;
- if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
+ if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->max_win))
goto prop_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PROP_BROADCAST, bc_mode))
goto prop_msg_full;