udp: avoid a cache miss on dequeue
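
The hunks below come from the UDP receive-path rework this commit belongs to: the bottom half still enqueues packets on sk_receive_queue, but readers drain a second, reader-owned queue (udp_sk(sk)->reader_queue) and refill it by splicing the whole backlog across in one operation, so the contended producer lock is taken once per burst instead of once per packet. What follows is a minimal userspace model of that splice pattern, assuming a single consumer and plain singly linked lists rather than the kernel's sk_buff_head API; all tq_* names are illustrative.

/* Simplified model only: names and types are illustrative, not kernel API. */
#include <pthread.h>
#include <stddef.h>

struct node { struct node *next; };

struct two_queue {
        pthread_mutex_t rx_lock;        /* stands in for sk_receive_queue.lock */
        struct node *rx_head, *rx_tail; /* shared queue filled by producers */
        struct node *reader_head;       /* private list, consumer only */
};

/* producer side: one short critical section per packet */
static void tq_enqueue(struct two_queue *q, struct node *n)
{
        n->next = NULL;
        pthread_mutex_lock(&q->rx_lock);
        if (q->rx_tail)
                q->rx_tail->next = n;
        else
                q->rx_head = n;
        q->rx_tail = n;
        pthread_mutex_unlock(&q->rx_lock);
}

/* consumer side: the shared lock is taken only when the private list is empty,
 * and then the whole backlog is spliced over at once
 */
static struct node *tq_dequeue(struct two_queue *q)
{
        struct node *n;

        if (!q->reader_head) {
                pthread_mutex_lock(&q->rx_lock);
                q->reader_head = q->rx_head;
                q->rx_head = q->rx_tail = NULL;
                pthread_mutex_unlock(&q->rx_lock);
        }
        n = q->reader_head;
        if (n)
                q->reader_head = n->next;
        return n;
}

In the kernel the reader queue has a spinlock of its own, since several threads can receive on the same socket concurrently; the gain is that this lock is far less contended than the one the softirq producers hammer.
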
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index ea6e4cff9fafe99af23fd8ea666cd979d5af9104..d8b265f1a33be9aed413dedcd737193457bf71ee 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1164,22 +1164,32 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
 }
 
 /* fully reclaim rmem/fwd memory allocated for skb */
-static void udp_rmem_release(struct sock *sk, int size, int partial)
+static void udp_rmem_release(struct sock *sk, int size, int partial,
+                            bool rx_queue_lock_held)
 {
        struct udp_sock *up = udp_sk(sk);
+       struct sk_buff_head *sk_queue;
        int amt;
 
        if (likely(partial)) {
                up->forward_deficit += size;
                size = up->forward_deficit;
                if (size < (sk->sk_rcvbuf >> 2) &&
-                   !skb_queue_empty(&sk->sk_receive_queue))
+                   !skb_queue_empty(&up->reader_queue))
                        return;
        } else {
                size += up->forward_deficit;
        }
        up->forward_deficit = 0;
 
+       /* acquire the sk_receive_queue lock for fwd allocated memory scheduling,
+        * if the caller doesn't hold it already
+        */
+       sk_queue = &sk->sk_receive_queue;
+       if (!rx_queue_lock_held)
+               spin_lock(&sk_queue->lock);
+
+
        sk->sk_forward_alloc += size;
        amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
        sk->sk_forward_alloc -= amt;
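
udp_rmem_release() above batches partial releases in up->forward_deficit and only runs the heavier forward-alloc/rmem accounting once the deficit reaches a quarter of sk_rcvbuf or, after this change, once the reader queue has drained. A rough sketch of that thresholding, with SIM_QUANTUM standing in for SK_MEM_QUANTUM and the locking and atomics left out:

#include <stdbool.h>

#define SIM_QUANTUM 4096        /* illustrative stand-in for SK_MEM_QUANTUM */

struct sim_sock {
        int rcvbuf;             /* like sk->sk_rcvbuf */
        int forward_alloc;      /* like sk->sk_forward_alloc */
        int forward_deficit;    /* like up->forward_deficit */
        int rmem_alloc;         /* like sk->sk_rmem_alloc */
};

static void sim_rmem_release(struct sim_sock *s, int size, int partial,
                             bool reader_queue_empty)
{
        int amt;

        if (partial) {
                s->forward_deficit += size;
                size = s->forward_deficit;
                /* small release with more packets pending: defer the real work */
                if (size < (s->rcvbuf >> 2) && !reader_queue_empty)
                        return;
        } else {
                size += s->forward_deficit;
        }
        s->forward_deficit = 0;

        /* hand whole quanta back, keep the remainder as forward allocation */
        s->forward_alloc += size;
        amt = (s->forward_alloc - partial) & ~(SIM_QUANTUM - 1);
        s->forward_alloc -= amt;
        /* amt would be returned to the protocol memory accounting here */

        s->rmem_alloc -= size;
}
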
@@ -1188,19 +1198,31 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
                __sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
 
        atomic_sub(size, &sk->sk_rmem_alloc);
+
+       /* this can save us from acquiring the rx queue lock on next receive */
+       skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+       if (!rx_queue_lock_held)
+               spin_unlock(&sk_queue->lock);
 }
 
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
  * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
  * This avoids a cache line miss while receive_queue lock is held.
  * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
  */
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
 {
-       udp_rmem_release(sk, skb->dev_scratch, 1);
+       udp_rmem_release(sk, skb->dev_scratch, 1, false);
 }
 EXPORT_SYMBOL(udp_skb_destructor);
 
+/* as above, but the caller holds the rx queue lock, too */
+static void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
+{
+       udp_rmem_release(sk, skb->dev_scratch, 1, true);
+}
+
 /* Idea of busylocks is to let producers grab an extra spinlock
  * to relieve pressure on the receive_queue spinlock shared by consumer.
  * Under flood, this means that only one producer can be in line
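
The destructor comment above is the core of the "avoid a cache miss on dequeue" idea: __udp_enqueue_schedule_skb() stores skb->truesize into skb->dev_scratch while the skb is still cache-hot, so the destructor, which runs with the queue lock held, charges memory from that scratch word instead of touching the colder line that holds truesize. A minimal sketch of the trick, using a simplified stand-in struct rather than the real sk_buff layout:

struct sim_skb {
        unsigned long dev_scratch;      /* scratch word, like skb->dev_scratch */
        char other_fields[192];         /* padding: truesize may sit on another cache line */
        unsigned int truesize;
};

/* enqueue side: the skb was just built, so truesize is already in cache */
static void sim_enqueue_account(struct sim_skb *skb)
{
        skb->dev_scratch = skb->truesize;
}

/* dequeue side, running under the queue lock: read only the scratch word */
static unsigned int sim_destructor_size(const struct sim_skb *skb)
{
        return (unsigned int)skb->dev_scratch;
}

Both udp_skb_destructor() and udp_skb_dtor_locked() above rely on that copy being in place.
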
@@ -1306,14 +1328,16 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
 void udp_destruct_sock(struct sock *sk)
 {
        /* reclaim completely the forward allocated memory */
+       struct udp_sock *up = udp_sk(sk);
        unsigned int total = 0;
        struct sk_buff *skb;
 
-       while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+       skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+       while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
                total += skb->truesize;
                kfree_skb(skb);
        }
-       udp_rmem_release(sk, total, 0);
+       udp_rmem_release(sk, total, 0, true);
 
        inet_sock_destruct(sk);
 }
@@ -1321,6 +1345,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);
 
 int udp_init_sock(struct sock *sk)
 {
+       skb_queue_head_init(&udp_sk(sk)->reader_queue);
        sk->sk_destruct = udp_destruct_sock;
        return 0;
 }
@@ -1334,10 +1359,31 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
                sk_peek_offset_bwd(sk, len);
                unlock_sock_fast(sk, slow);
        }
-       consume_skb(skb);
+
+       consume_stateless_skb(skb);
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);
 
+static struct sk_buff *__first_packet_length(struct sock *sk,
+                                            struct sk_buff_head *rcvq,
+                                            int *total)
+{
+       struct sk_buff *skb;
+
+       while ((skb = skb_peek(rcvq)) != NULL &&
+              udp_lib_checksum_complete(skb)) {
+               __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+                               IS_UDPLITE(sk));
+               __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+                               IS_UDPLITE(sk));
+               atomic_inc(&sk->sk_drops);
+               __skb_unlink(skb, rcvq);
+               *total += skb->truesize;
+               kfree_skb(skb);
+       }
+       return skb;
+}
+
 /**
  *     first_packet_length     - return length of first packet in receive queue
  *     @sk: socket
@@ -1347,26 +1393,24 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
  */
 static int first_packet_length(struct sock *sk)
 {
-       struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+       struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+       struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
        struct sk_buff *skb;
        int total = 0;
        int res;
 
        spin_lock_bh(&rcvq->lock);
-       while ((skb = skb_peek(rcvq)) != NULL &&
-               udp_lib_checksum_complete(skb)) {
-               __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
-                               IS_UDPLITE(sk));
-               __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
-                               IS_UDPLITE(sk));
-               atomic_inc(&sk->sk_drops);
-               __skb_unlink(skb, rcvq);
-               total += skb->truesize;
-               kfree_skb(skb);
+       skb = __first_packet_length(sk, rcvq, &total);
+       if (!skb && !skb_queue_empty(sk_queue)) {
+               spin_lock(&sk_queue->lock);
+               skb_queue_splice_tail_init(sk_queue, rcvq);
+               spin_unlock(&sk_queue->lock);
+
+               skb = __first_packet_length(sk, rcvq, &total);
        }
        res = skb ? skb->len : -1;
        if (total)
-               udp_rmem_release(sk, total, 1);
+               udp_rmem_release(sk, total, 1, false);
        spin_unlock_bh(&rcvq->lock);
        return res;
 }
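
The rewritten first_packet_length() above scans the reader queue first, dropping packets with bad checksums as it goes; only when that queue is empty while sk_receive_queue is not does it take the second lock, splice the backlog across and scan once more. A compact model of that rescan logic, with illustrative types and the locking and statistics omitted:

#include <stdbool.h>
#include <stddef.h>

struct pkt { struct pkt *next; int len; bool bad_csum; };
struct pkt_list { struct pkt *head, *tail; };

/* drop bad-checksum packets from the front, return the first good one */
static struct pkt *scan_drop_bad(struct pkt_list *l, int *freed)
{
        while (l->head && l->head->bad_csum) {
                struct pkt *bad = l->head;

                l->head = bad->next;
                if (!l->head)
                        l->tail = NULL;
                *freed += bad->len;     /* stands in for the truesize accounting */
                /* the real code also bumps drop/csum counters and frees the skb */
        }
        return l->head;
}

static int sim_first_packet_length(struct pkt_list *reader, struct pkt_list *shared)
{
        int freed = 0;
        struct pkt *p = scan_drop_bad(reader, &freed);

        if (!p && shared->head) {
                /* splice the whole shared backlog behind the reader list */
                if (reader->tail)
                        reader->tail->next = shared->head;
                else
                        reader->head = shared->head;
                reader->tail = shared->tail;
                shared->head = shared->tail = NULL;

                p = scan_drop_bad(reader, &freed);
        }
        /* freed would be handed back via udp_rmem_release() here */
        return p ? p->len : -1;
}
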
@@ -1400,6 +1444,77 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 }
 EXPORT_SYMBOL(udp_ioctl);
 
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+                              int noblock, int *peeked, int *off, int *err)
+{
+       struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+       struct sk_buff_head *queue;
+       struct sk_buff *last;
+       long timeo;
+       int error;
+
+       queue = &udp_sk(sk)->reader_queue;
+       flags |= noblock ? MSG_DONTWAIT : 0;
+       timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+       do {
+               struct sk_buff *skb;
+
+               error = sock_error(sk);
+               if (error)
+                       break;
+
+               error = -EAGAIN;
+               *peeked = 0;
+               do {
+                       spin_lock_bh(&queue->lock);
+                       skb = __skb_try_recv_from_queue(sk, queue, flags,
+                                                       udp_skb_destructor,
+                                                       peeked, off, err,
+                                                       &last);
+                       if (skb) {
+                               spin_unlock_bh(&queue->lock);
+                               return skb;
+                       }
+
+                       if (skb_queue_empty(sk_queue)) {
+                               spin_unlock_bh(&queue->lock);
+                               goto busy_check;
+                       }
+
+                       /* refill the reader queue and walk it again
+                        * keep both queues locked to avoid re-acquiring
+                        * the sk_receive_queue lock if fwd memory scheduling
+                        * is needed.
+                        */
+                       spin_lock(&sk_queue->lock);
+                       skb_queue_splice_tail_init(sk_queue, queue);
+
+                       skb = __skb_try_recv_from_queue(sk, queue, flags,
+                                                       udp_skb_dtor_locked,
+                                                       peeked, off, err,
+                                                       &last);
+                       spin_unlock(&sk_queue->lock);
+                       spin_unlock_bh(&queue->lock);
+                       if (skb)
+                               return skb;
+
+busy_check:
+                       if (!sk_can_busy_loop(sk))
+                               break;
+
+                       sk_busy_loop(sk, flags & MSG_DONTWAIT);
+               } while (!skb_queue_empty(sk_queue));
+
+               /* sk_queue is empty, reader_queue may contain peeked packets */
+       } while (timeo &&
+                !__skb_wait_for_more_packets(sk, &error, &timeo,
+                                             (struct sk_buff *)sk_queue));
+
+       *err = error;
+       return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
 /*
  *     This should be easy, if there is something there we
  *     return it, otherwise we block.
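
The refill branch of __skb_recv_udp() above holds both queue locks while it splices and walks the reader queue again, which is why it passes udp_skb_dtor_locked rather than udp_skb_destructor: the memory-release path must not try to take the sk_receive_queue lock a second time. A small pthread model of that pairing, with simplified bookkeeping in place of the real accounting:

#include <pthread.h>
#include <stdbool.h>

struct sim_queues {
        pthread_mutex_t shared_lock;    /* models sk_receive_queue.lock */
        pthread_mutex_t reader_lock;    /* models reader_queue.lock */
        int shared_backlog;
        int reader_backlog;
};

/* models udp_rmem_release(): takes the shared lock only if the caller doesn't hold it */
static void sim_rmem_release(struct sim_queues *q, bool shared_lock_held)
{
        if (!shared_lock_held)
                pthread_mutex_lock(&q->shared_lock);
        /* forward-alloc bookkeeping and the splice back would happen here */
        if (!shared_lock_held)
                pthread_mutex_unlock(&q->shared_lock);
}

/* models the refill branch: both locks held, so the "locked" variant is used */
static void sim_refill(struct sim_queues *q)
{
        pthread_mutex_lock(&q->reader_lock);
        pthread_mutex_lock(&q->shared_lock);

        q->reader_backlog += q->shared_backlog;         /* splice */
        q->shared_backlog = 0;
        sim_rmem_release(q, true);                      /* no second lock attempt */

        pthread_mutex_unlock(&q->shared_lock);
        pthread_mutex_unlock(&q->reader_lock);
}
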
@@ -1490,7 +1605,8 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock,
        return err;
 
 csum_copy_err:
-       if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+       if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+                                udp_skb_destructor)) {
                UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
                UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
        }
@@ -1612,7 +1728,7 @@ static void udp_v4_rehash(struct sock *sk)
        udp_lib_rehash(sk, new_hash);
 }
 
-int __udp_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
+static int __udp_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
 {
        int rc;
 
@@ -1624,6 +1740,9 @@ int __udp_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
                sk_mark_napi_id_once(sk, skb);
        }
 
+       /* clear all pending head states while they are hot in the cache */
+       skb_release_head_state(skb);
+
        rc = __udp_enqueue_schedule_skb(sk, skb);
        if (rc < 0) {
                int is_udplite = IS_UDPLITE(sk);
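
The skb_release_head_state() call added above pairs with the switch to consume_stateless_skb() earlier in this diff: the producer drops the destructor, dst and similar per-packet state while those fields are hot in its cache, so the reader's later free has nothing left to tear down. A sketch of that split, with invented sim_* names in place of the real skb helpers:

#include <stdlib.h>

struct sim_pkt {
        void (*destructor)(struct sim_pkt *);  /* stands in for skb->destructor */
        void *route;                            /* stands in for the dst reference */
        void *data;
};

/* producer side, right after protocol processing: state is still cache-hot */
static void sim_release_head_state(struct sim_pkt *p)
{
        if (p->destructor)
                p->destructor(p);
        p->destructor = NULL;
        p->route = NULL;        /* the real code drops the dst/conntrack refs */
}

/* consumer side, possibly much later: nothing left but freeing the memory */
static void sim_consume_stateless(struct sim_pkt *p)
{
        free(p->data);
        free(p);
}
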
@@ -1657,7 +1776,7 @@ EXPORT_SYMBOL(udp_encap_enable);
  * Note that in the success and error cases, the skb is assumed to
  * have either been requeued or freed.
  */
-int udp_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
+static int udp_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
 {
        struct udp_sock *up = udp_sk(sk);
        int is_udplite = IS_UDPLITE(sk);
@@ -2325,6 +2444,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
        unsigned int mask = datagram_poll(file, sock, wait);
        struct sock *sk = sock->sk;
 
+       if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+               mask |= POLLIN | POLLRDNORM;
+
        sock_rps_record_flow(sk);
 
        /* Check for false positives due to checksum errors */