}
 EXPORT_SYMBOL(udp_skb_destructor);
 
+/* The idea of busylocks is to let producers grab an extra spinlock
+ * to relieve pressure on the receive_queue spinlock shared by the consumer.
+ * Under flood, this means that only one producer can be in line
+ * trying to acquire the receive_queue spinlock.
+ * These busylocks can be allocated per cpu instead of per socket
+ * (a per-socket lock would consume a cache line per socket).
+ */
+static int udp_busylocks_log __read_mostly;
+static spinlock_t *udp_busylocks __read_mostly;
+
+static spinlock_t *busylock_acquire(void *ptr)
+{
+       spinlock_t *busy;
+
+       busy = udp_busylocks + hash_ptr(ptr, udp_busylocks_log);
+       spin_lock(busy);
+       return busy;
+}
+
+static void busylock_release(spinlock_t *busy)
+{
+       if (busy)
+               spin_unlock(busy);
+}
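For illustration, not part of the patch: a minimal userspace sketch of the busylock lookup, assuming a power-of-two table of 1 << TABLE_LOG locks. TABLE_LOG, lock_index() and the pthread spinlocks are stand-ins; the kernel's hash_ptr() differs in detail, but on 64-bit it reduces to the same multiply-by-golden-ratio-and-shift.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define TABLE_LOG 7	/* e.g. 8 possible cpus * 16 locks = 128 entries */

static pthread_spinlock_t locks[1 << TABLE_LOG];

/* Keep the top TABLE_LOG bits of ptr * GOLDEN_RATIO_64; the same
 * shape as the kernel's hash_ptr()/hash_64() on 64-bit.
 */
static unsigned int lock_index(const void *ptr)
{
	uint64_t v = (uint64_t)(uintptr_t)ptr;

	return (unsigned int)((v * 0x61C8864680B583EBull) >> (64 - TABLE_LOG));
}

static pthread_spinlock_t *busylock_acquire(void *ptr)
{
	pthread_spinlock_t *busy = &locks[lock_index(ptr)];

	pthread_spin_lock(busy);
	return busy;
}

static void busylock_release(pthread_spinlock_t *busy)
{
	if (busy)
		pthread_spin_unlock(busy);
}

int main(void)
{
	int fake_sock;	/* any pointer works as a hash key */
	pthread_spinlock_t *busy;
	unsigned int i;

	for (i = 0; i < (1U << TABLE_LOG); i++)
		pthread_spin_init(&locks[i], PTHREAD_PROCESS_PRIVATE);

	busy = busylock_acquire(&fake_sock);
	printf("%p -> lock %u\n", (void *)&fake_sock, lock_index(&fake_sock));
	busylock_release(busy);
	return 0;
}

Two sockets can hash to the same busylock, but that only adds throttling; correctness is unaffected since the receive_queue lock still serializes the queue itself.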
+
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
 {
        struct sk_buff_head *list = &sk->sk_receive_queue;
        int rmem, delta, amt, err = -ENOMEM;
+       spinlock_t *busy = NULL;
        int size;
 
        /* Under mem pressure, it might be helpful to help udp_recvmsg()
         * having linear skbs :
         * - Reduce memory overhead and thus increase receive queue capacity
         * - Less cache line misses at copyout() time
         * - Less work at consume_skb() (less alien page frag freeing)
         */
-       if (rmem > (sk->sk_rcvbuf >> 1))
+       if (rmem > (sk->sk_rcvbuf >> 1)) {
                skb_condense(skb);
+
+               busy = busylock_acquire(sk);
+       }
        size = skb->truesize;
 
        /* we drop only if the receive buf is full and the receive
         * queue contains some other skb
         */
        if (!sock_flag(sk, SOCK_DEAD))
                sk->sk_data_ready(sk);
 
+       busylock_release(busy);
        return 0;
 
 uncharge_drop:
        atomic_sub(skb->truesize, &sk->sk_rmem_alloc);

 drop:
        atomic_inc(&sk->sk_drops);
+       busylock_release(busy);
        return err;
 }
 EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
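A hedged userspace model of the control flow above, with an integer counter standing in for sk_rmem_alloc: the extra lock is only taken once the queue is more than half full, and every exit path, success or drop, funnels through the release, which tolerates not having taken the lock. QUEUE_CAP, produce_one() and the globals are illustrative.

#include <pthread.h>
#include <stdio.h>

#define QUEUE_CAP 1024

static pthread_spinlock_t queue_lock;	/* plays the receive_queue lock */
static pthread_spinlock_t busy_lock;	/* plays the hashed busylock */
static int queue_len;			/* guarded by queue_lock */

static int produce_one(void)
{
	pthread_spinlock_t *busy = NULL;
	int dropped = 0;

	/* Approximate read outside the lock: it only decides whether to
	 * take the extra lock, mirroring the rmem > sk_rcvbuf/2 test.
	 */
	if (queue_len > QUEUE_CAP / 2) {
		busy = &busy_lock;
		pthread_spin_lock(busy);
	}

	pthread_spin_lock(&queue_lock);
	if (queue_len >= QUEUE_CAP)
		dropped = 1;	/* receive buffer full: drop */
	else
		queue_len++;	/* enqueue */
	pthread_spin_unlock(&queue_lock);

	if (busy)
		pthread_spin_unlock(busy);
	return dropped;
}

int main(void)
{
	int i, drops = 0;

	pthread_spin_init(&queue_lock, PTHREAD_PROCESS_PRIVATE);
	pthread_spin_init(&busy_lock, PTHREAD_PROCESS_PRIVATE);

	for (i = 0; i < 2048; i++)
		drops += produce_one();
	printf("enqueued %d, dropped %d\n", queue_len, drops);
	return 0;
}

Under a real multi-producer flood, at most one producer at a time sits on busy_lock waiting for queue_lock, so the consumer contends with one producer instead of all of them.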
 void __init udp_init(void)
 {
        unsigned long limit;
+       unsigned int i;
 
        udp_table_init(&udp_table, "UDP");
        limit = nr_free_buffer_pages() / 8;
 
        sysctl_udp_rmem_min = SK_MEM_QUANTUM;
        sysctl_udp_wmem_min = SK_MEM_QUANTUM;
+
+       /* 16 spinlocks per cpu */
+       udp_busylocks_log = ilog2(nr_cpu_ids) + 4;
+       udp_busylocks = kmalloc(sizeof(spinlock_t) << udp_busylocks_log,
+                               GFP_KERNEL);
+       if (!udp_busylocks)
+               panic("UDP: failed to alloc udp_busylocks\n");
+       for (i = 0; i < (1U << udp_busylocks_log); i++)
+               spin_lock_init(udp_busylocks + i);
 }
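To sanity-check the sizing: ilog2() rounds nr_cpu_ids down to a power-of-two exponent, so with, say, 8 possible cpus the log is 3 + 4 = 7 and the table holds 1 << 7 = 128 spinlocks, about 512 bytes assuming a 4-byte spinlock_t (true without lock debugging). A throwaway check, with ilog2_u() as a stand-in for the kernel's ilog2() and an assumed cpu count:

#include <stdio.h>

/* Stand-in for the kernel's ilog2(): floor(log2(v)). */
static unsigned int ilog2_u(unsigned int v)
{
	unsigned int log = 0;

	while (v >>= 1)
		log++;
	return log;
}

int main(void)
{
	unsigned int nr_cpu_ids = 8;	/* assumed possible-cpu count */
	unsigned int log = ilog2_u(nr_cpu_ids) + 4;	/* 16 locks per cpu */

	printf("udp_busylocks_log = %u -> %u locks, ~%u bytes\n",
	       log, 1U << log, (1U << log) * 4 /* 4B spinlock_t */);
	return 0;
}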