Merge branch 'packet_rollover'
author    David S. Miller <davem@davemloft.net>
          Wed, 13 May 2015 19:43:01 +0000 (15:43 -0400)
committer David S. Miller <davem@davemloft.net>
          Wed, 13 May 2015 19:43:01 +0000 (15:43 -0400)
Willem de Bruijn says:

====================
refine packet socket rollover:

1. mitigate a case of lock contention
2. avoid exporting resource exhaustion to other sockets,
   by migrating only to a victim socket that has ample room
3. avoid reordering of most flows on the socket,
   by migrating first the flow responsible for load imbalance
4. help processes detect load imbalance,
   by exporting rollover counters

Context: rollover implements flow migration in packet socket fanout
groups in case of extreme load imbalance. It is a specific
implementation of migration that minimizes reordering by selecting
the same victim socket when possible (and by selecting subsequent
victims in a round robin fashion, from which its name derives).
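
For illustration, a minimal userspace sketch (not part of this series)
of joining a hash fanout group with the rollover flag set; the group
id, mode, and protocol here are arbitrary choices:

    #include <arpa/inet.h>
    #include <linux/if_ether.h>
    #include <linux/if_packet.h>
    #include <stdint.h>
    #include <sys/socket.h>
    #include <unistd.h>

    /* Open a packet socket and join fanout group `id` in hash mode,
     * with rollover as the overflow fallback. */
    static int join_rollover_group(uint16_t id)
    {
            int fd, arg;

            fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
            if (fd == -1)
                    return -1;

            /* low 16 bits: group id; high 16 bits: mode and flags */
            arg = id | ((PACKET_FANOUT_HASH |
                         PACKET_FANOUT_FLAG_ROLLOVER) << 16);
            if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT,
                           &arg, sizeof(arg)) == -1) {
                    close(fd);
                    return -1;
            }

            return fd;
    }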

Changes:
  v2 -> v3:
    - statistics: replace unsigned long with __aligned_u64
  v1 -> v2:
    - huge flow detection: run lockless
    - huge flow detection: replace stored index with random
    - contention avoidance: test in packet_poll while lock held
    - contention avoidance: clear pressure sooner

          packet_poll and packet_recvmsg would clear the pressure flag
          only when the socket queue was empty, to avoid taking the
          receive queue lock. But:
          * packet_poll already holds this lock, so a lockless variant
            __packet_rcv_has_room is cheap.
          * packet_recvmsg is usually called only for non-ring sockets,
            where the check also runs lockless.

    - preparation: drop "single return" patch

          packet_rcv_has_room is now a locked wrapper around
          __packet_rcv_has_room, achieving the same (a single return
          path).

The benchmark mentioned in the patches is at
https://github.com/wdebruij/kerneltools/blob/master/tests/bench_rollover.c
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
include/uapi/linux/if_packet.h
net/packet/af_packet.c
net/packet/internal.h

index 053bd102fbe00a0affd7227359e25c7246de9d7e..d3d715f8c88f6d57c4318dc5b001e8efad2d074f 100644
@@ -54,6 +54,7 @@ struct sockaddr_ll {
 #define PACKET_FANOUT                  18
 #define PACKET_TX_HAS_OFF              19
 #define PACKET_QDISC_BYPASS            20
+#define PACKET_ROLLOVER_STATS          21
 
 #define PACKET_FANOUT_HASH             0
 #define PACKET_FANOUT_LB               1
@@ -75,6 +76,12 @@ struct tpacket_stats_v3 {
        unsigned int    tp_freeze_q_cnt;
 };
 
+struct tpacket_rollover_stats {
+       __aligned_u64   tp_all;
+       __aligned_u64   tp_huge;
+       __aligned_u64   tp_failed;
+};
+
 union tpacket_stats_u {
        struct tpacket_stats stats1;
        struct tpacket_stats_v3 stats3;
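
Against the uapi above, a hedged userspace sketch of reading the new
counters once this header is available; `fd` is assumed to be a packet
socket whose fanout group was created with PACKET_FANOUT_FLAG_ROLLOVER
(otherwise the getsockopt fails with EINVAL, per the af_packet.c hunk
below):

    #include <linux/if_packet.h>
    #include <stdio.h>
    #include <sys/socket.h>

    static void print_rollover_stats(int fd)
    {
            struct tpacket_rollover_stats rstats;
            socklen_t len = sizeof(rstats);

            if (getsockopt(fd, SOL_PACKET, PACKET_ROLLOVER_STATS,
                           &rstats, &len) == -1) {
                    perror("PACKET_ROLLOVER_STATS");
                    return;
            }

            /* tp_all: successful rollovers; tp_huge: those triggered
             * by a dominant flow while the socket still had low room;
             * tp_failed: no victim socket had room */
            printf("rollover: all=%llu huge=%llu failed=%llu\n",
                   (unsigned long long)rstats.tp_all,
                   (unsigned long long)rstats.tp_huge,
                   (unsigned long long)rstats.tp_failed);
    }
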
index 12c5dde8e344f1cc766b631de825211fb73bb352..31d58565726c4d72d48ce105a774901c767ced5e 100644
@@ -1234,27 +1234,86 @@ static void packet_free_pending(struct packet_sock *po)
        free_percpu(po->tx_ring.pending_refcnt);
 }
 
-static bool packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb)
+#define ROOM_POW_OFF   2
+#define ROOM_NONE      0x0
+#define ROOM_LOW       0x1
+#define ROOM_NORMAL    0x2
+
+static bool __tpacket_has_room(struct packet_sock *po, int pow_off)
+{
+       int idx, len;
+
+       len = po->rx_ring.frame_max + 1;
+       idx = po->rx_ring.head;
+       if (pow_off)
+               idx += len >> pow_off;
+       if (idx >= len)
+               idx -= len;
+       return packet_lookup_frame(po, &po->rx_ring, idx, TP_STATUS_KERNEL);
+}
+
+static bool __tpacket_v3_has_room(struct packet_sock *po, int pow_off)
+{
+       int idx, len;
+
+       len = po->rx_ring.prb_bdqc.knum_blocks;
+       idx = po->rx_ring.prb_bdqc.kactive_blk_num;
+       if (pow_off)
+               idx += len >> pow_off;
+       if (idx >= len)
+               idx -= len;
+       return prb_lookup_block(po, &po->rx_ring, idx, TP_STATUS_KERNEL);
+}
+
+static int __packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb)
 {
        struct sock *sk = &po->sk;
+       int ret = ROOM_NONE;
+
+       if (po->prot_hook.func != tpacket_rcv) {
+               int avail = sk->sk_rcvbuf - atomic_read(&sk->sk_rmem_alloc)
+                                         - (skb ? skb->truesize : 0);
+               if (avail > (sk->sk_rcvbuf >> ROOM_POW_OFF))
+                       return ROOM_NORMAL;
+               else if (avail > 0)
+                       return ROOM_LOW;
+               else
+                       return ROOM_NONE;
+       }
+
+       if (po->tp_version == TPACKET_V3) {
+               if (__tpacket_v3_has_room(po, ROOM_POW_OFF))
+                       ret = ROOM_NORMAL;
+               else if (__tpacket_v3_has_room(po, 0))
+                       ret = ROOM_LOW;
+       } else {
+               if (__tpacket_has_room(po, ROOM_POW_OFF))
+                       ret = ROOM_NORMAL;
+               else if (__tpacket_has_room(po, 0))
+                       ret = ROOM_LOW;
+       }
+
+       return ret;
+}
+
+static int packet_rcv_has_room(struct packet_sock *po, struct sk_buff *skb)
+{
+       int ret;
        bool has_room;
 
-       if (po->prot_hook.func != tpacket_rcv)
-               return (atomic_read(&sk->sk_rmem_alloc) + skb->truesize)
-                       <= sk->sk_rcvbuf;
+       if (po->prot_hook.func == tpacket_rcv) {
+               spin_lock(&po->sk.sk_receive_queue.lock);
+               ret = __packet_rcv_has_room(po, skb);
+               spin_unlock(&po->sk.sk_receive_queue.lock);
+       } else {
+               ret = __packet_rcv_has_room(po, skb);
+       }
 
-       spin_lock(&sk->sk_receive_queue.lock);
-       if (po->tp_version == TPACKET_V3)
-               has_room = prb_lookup_block(po, &po->rx_ring,
-                                           po->rx_ring.prb_bdqc.kactive_blk_num,
-                                           TP_STATUS_KERNEL);
-       else
-               has_room = packet_lookup_frame(po, &po->rx_ring,
-                                              po->rx_ring.head,
-                                              TP_STATUS_KERNEL);
-       spin_unlock(&sk->sk_receive_queue.lock);
+       has_room = ret == ROOM_NORMAL;
+       if (po->pressure == has_room)
+               xchg(&po->pressure, !has_room);
 
-       return has_room;
+       return ret;
 }
 
 static void packet_sock_destruct(struct sock *sk)
@@ -1282,6 +1341,20 @@ static int fanout_rr_next(struct packet_fanout *f, unsigned int num)
        return x;
 }
 
+static bool fanout_flow_is_huge(struct packet_sock *po, struct sk_buff *skb)
+{
+       u32 rxhash;
+       int i, count = 0;
+
+       rxhash = skb_get_hash(skb);
+       for (i = 0; i < ROLLOVER_HLEN; i++)
+               if (po->rollover->history[i] == rxhash)
+                       count++;
+
+       po->rollover->history[prandom_u32() % ROLLOVER_HLEN] = rxhash;
+       return count > (ROLLOVER_HLEN >> 1);
+}
+
 static unsigned int fanout_demux_hash(struct packet_fanout *f,
                                      struct sk_buff *skb,
                                      unsigned int num)
@@ -1318,22 +1391,39 @@ static unsigned int fanout_demux_rnd(struct packet_fanout *f,
 
 static unsigned int fanout_demux_rollover(struct packet_fanout *f,
                                          struct sk_buff *skb,
-                                         unsigned int idx, unsigned int skip,
+                                         unsigned int idx, bool try_self,
                                          unsigned int num)
 {
-       unsigned int i, j;
+       struct packet_sock *po, *po_next;
+       unsigned int i, j, room = ROOM_NONE;
 
-       i = j = min_t(int, f->next[idx], num - 1);
+       po = pkt_sk(f->arr[idx]);
+
+       if (try_self) {
+               room = packet_rcv_has_room(po, skb);
+               if (room == ROOM_NORMAL ||
+                   (room == ROOM_LOW && !fanout_flow_is_huge(po, skb)))
+                       return idx;
+       }
+
+       i = j = min_t(int, po->rollover->sock, num - 1);
        do {
-               if (i != skip && packet_rcv_has_room(pkt_sk(f->arr[i]), skb)) {
+               po_next = pkt_sk(f->arr[i]);
+               if (po_next != po && !po_next->pressure &&
+                   packet_rcv_has_room(po_next, skb) == ROOM_NORMAL) {
                        if (i != j)
-                               f->next[idx] = i;
+                               po->rollover->sock = i;
+                       atomic_long_inc(&po->rollover->num);
+                       if (room == ROOM_LOW)
+                               atomic_long_inc(&po->rollover->num_huge);
                        return i;
                }
+
                if (++i == num)
                        i = 0;
        } while (i != j);
 
+       atomic_long_inc(&po->rollover->num_failed);
        return idx;
 }
 
@@ -1386,17 +1476,14 @@ static int packet_rcv_fanout(struct sk_buff *skb, struct net_device *dev,
                idx = fanout_demux_qm(f, skb, num);
                break;
        case PACKET_FANOUT_ROLLOVER:
-               idx = fanout_demux_rollover(f, skb, 0, (unsigned int) -1, num);
+               idx = fanout_demux_rollover(f, skb, 0, false, num);
                break;
        }
 
-       po = pkt_sk(f->arr[idx]);
-       if (fanout_has_flag(f, PACKET_FANOUT_FLAG_ROLLOVER) &&
-           unlikely(!packet_rcv_has_room(po, skb))) {
-               idx = fanout_demux_rollover(f, skb, idx, idx, num);
-               po = pkt_sk(f->arr[idx]);
-       }
+       if (fanout_has_flag(f, PACKET_FANOUT_FLAG_ROLLOVER))
+               idx = fanout_demux_rollover(f, skb, idx, true, num);
 
+       po = pkt_sk(f->arr[idx]);
        return po->prot_hook.func(skb, dev, &po->prot_hook, orig_dev);
 }
 
@@ -1467,6 +1554,15 @@ static int fanout_add(struct sock *sk, u16 id, u16 type_flags)
        if (po->fanout)
                return -EALREADY;
 
+       if (type_flags & PACKET_FANOUT_FLAG_ROLLOVER) {
+               po->rollover = kzalloc(sizeof(*po->rollover), GFP_KERNEL);
+               if (!po->rollover)
+                       return -ENOMEM;
+               atomic_long_set(&po->rollover->num, 0);
+               atomic_long_set(&po->rollover->num_huge, 0);
+               atomic_long_set(&po->rollover->num_failed, 0);
+       }
+
        mutex_lock(&fanout_mutex);
        match = NULL;
        list_for_each_entry(f, &fanout_list, list) {
@@ -1515,6 +1611,10 @@ static int fanout_add(struct sock *sk, u16 id, u16 type_flags)
        }
 out:
        mutex_unlock(&fanout_mutex);
+       if (err) {
+               kfree(po->rollover);
+               po->rollover = NULL;
+       }
        return err;
 }
 
@@ -1536,6 +1636,8 @@ static void fanout_release(struct sock *sk)
                kfree(f);
        }
        mutex_unlock(&fanout_mutex);
+
+       kfree(po->rollover);
 }
 
 static const struct proto_ops packet_ops;
@@ -2865,6 +2967,7 @@ static int packet_create(struct net *net, struct socket *sock, int protocol,
 
        spin_lock_init(&po->bind_lock);
        mutex_init(&po->pg_vec_lock);
+       po->rollover = NULL;
        po->prot_hook.func = packet_rcv;
 
        if (sock->type == SOCK_PACKET)
@@ -2942,6 +3045,9 @@ static int packet_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
        if (skb == NULL)
                goto out;
 
+       if (pkt_sk(sk)->pressure)
+               packet_rcv_has_room(pkt_sk(sk), NULL);
+
        if (pkt_sk(sk)->has_vnet_hdr) {
                struct virtio_net_hdr vnet_hdr = { 0 };
 
@@ -3485,6 +3591,7 @@ static int packet_getsockopt(struct socket *sock, int level, int optname,
        struct packet_sock *po = pkt_sk(sk);
        void *data = &val;
        union tpacket_stats_u st;
+       struct tpacket_rollover_stats rstats;
 
        if (level != SOL_PACKET)
                return -ENOPROTOOPT;
@@ -3560,6 +3667,15 @@ static int packet_getsockopt(struct socket *sock, int level, int optname,
                        ((u32)po->fanout->flags << 24)) :
                       0);
                break;
+       case PACKET_ROLLOVER_STATS:
+               if (!po->rollover)
+                       return -EINVAL;
+               rstats.tp_all = atomic_long_read(&po->rollover->num);
+               rstats.tp_huge = atomic_long_read(&po->rollover->num_huge);
+               rstats.tp_failed = atomic_long_read(&po->rollover->num_failed);
+               data = &rstats;
+               lv = sizeof(rstats);
+               break;
        case PACKET_TX_HAS_OFF:
                val = po->tp_tx_has_off;
                break;
@@ -3697,6 +3813,8 @@ static unsigned int packet_poll(struct file *file, struct socket *sock,
                        TP_STATUS_KERNEL))
                        mask |= POLLIN | POLLRDNORM;
        }
+       if (po->pressure && __packet_rcv_has_room(po, NULL) == ROOM_NORMAL)
+               xchg(&po->pressure, 0);
        spin_unlock_bh(&sk->sk_receive_queue.lock);
        spin_lock_bh(&sk->sk_write_queue.lock);
        if (po->tx_ring.pg_vec) {
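
The huge flow test above keeps a small, randomly replaced history of
recent flow hashes. A standalone sketch of the same idea, with the
window size and RNG swapped for userspace stand-ins (the kernel sizes
the window as L1_CACHE_BYTES / sizeof(u32)):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdlib.h>

    #define HLEN 16                 /* 64-byte cache line / 4-byte u32 */

    static uint32_t history[HLEN];

    /* A flow is "huge" when its hash fills more than half the window.
     * Random replacement makes the window an approximate, decaying
     * frequency estimate with no locking or per-flow state. */
    static bool flow_is_huge(uint32_t hash)
    {
            int i, count = 0;

            for (i = 0; i < HLEN; i++)
                    if (history[i] == hash)
                            count++;

            history[rand() % HLEN] = hash;
            return count > HLEN / 2;
    }
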
index fe6e20caea1d9bcd3711b3ad29a8de2ae40cd1bc..c035d263c1e8d119267633971920106b3bf627f3 100644
@@ -82,12 +82,20 @@ struct packet_fanout {
        atomic_t                rr_cur;
        struct list_head        list;
        struct sock             *arr[PACKET_FANOUT_MAX];
-       int                     next[PACKET_FANOUT_MAX];
        spinlock_t              lock;
        atomic_t                sk_ref;
        struct packet_type      prot_hook ____cacheline_aligned_in_smp;
 };
 
+struct packet_rollover {
+       int                     sock;
+       atomic_long_t           num;
+       atomic_long_t           num_huge;
+       atomic_long_t           num_failed;
+#define ROLLOVER_HLEN  (L1_CACHE_BYTES / sizeof(u32))
+       u32                     history[ROLLOVER_HLEN] ____cacheline_aligned;
+} ____cacheline_aligned_in_smp;
+
 struct packet_sock {
        /* struct sock has to be the first member of packet_sock */
        struct sock             sk;
@@ -102,8 +110,10 @@ struct packet_sock {
                                auxdata:1,
                                origdev:1,
                                has_vnet_hdr:1;
+       int                     pressure;
        int                     ifindex;        /* bound device         */
        __be16                  num;
+       struct packet_rollover  *rollover;
        struct packet_mclist    *mclist;
        atomic_t                mapped;
        enum tpacket_versions   tp_version;