}
 
 /* fully reclaim rmem/fwd memory allocated for skb */
-static void udp_rmem_release(struct sock *sk, int size, int partial)
+static void udp_rmem_release(struct sock *sk, int size, int partial,
+                            bool rx_queue_lock_held)
 {
        struct udp_sock *up = udp_sk(sk);
        struct sk_buff_head *sk_queue;
        }
        up->forward_deficit = 0;
 
-       /* acquire the sk_receive_queue for fwd allocated memory scheduling */
+       /* acquire the sk_receive_queue for fwd allocated memory scheduling,
+        * if the called don't held it already
+        */
        sk_queue = &sk->sk_receive_queue;
-       spin_lock(&sk_queue->lock);
+       if (!rx_queue_lock_held)
+               spin_lock(&sk_queue->lock);
+
 
        sk->sk_forward_alloc += size;
        amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
        /* this can save us from acquiring the rx queue lock on next receive */
        skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
 
-       spin_unlock(&sk_queue->lock);
+       if (!rx_queue_lock_held)
+               spin_unlock(&sk_queue->lock);
 }
 
 /* Note: called with reader_queue.lock held.
  */
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
 {
-       udp_rmem_release(sk, skb->dev_scratch, 1);
+       udp_rmem_release(sk, skb->dev_scratch, 1, false);
 }
 EXPORT_SYMBOL(udp_skb_destructor);
 
+/* as above, but the caller held the rx queue lock, too */
+void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
+{
+       udp_rmem_release(sk, skb->dev_scratch, 1, true);
+}
+
 /* Idea of busylocks is to let producers grab an extra spinlock
  * to relieve pressure on the receive_queue spinlock shared by consumer.
  * Under flood, this means that only one producer can be in line
                total += skb->truesize;
                kfree_skb(skb);
        }
-       udp_rmem_release(sk, total, 0);
+       udp_rmem_release(sk, total, 0, true);
 
        inet_sock_destruct(sk);
 }
        }
        res = skb ? skb->len : -1;
        if (total)
-               udp_rmem_release(sk, total, 1);
+               udp_rmem_release(sk, total, 1, false);
        spin_unlock_bh(&rcvq->lock);
        return res;
 }
                                goto busy_check;
                        }
 
-                       /* refill the reader queue and walk it again */
+                       /* refill the reader queue and walk it again
+                        * keep both queues locked to avoid re-acquiring
+                        * the sk_receive_queue lock if fwd memory scheduling
+                        * is needed.
+                        */
                        _off = *off;
                        spin_lock(&sk_queue->lock);
                        skb_queue_splice_tail_init(sk_queue, queue);
-                       spin_unlock(&sk_queue->lock);
 
                        skb = __skb_try_recv_from_queue(sk, queue, flags,
-                                                       udp_skb_destructor,
+                                                       udp_skb_dtor_locked,
                                                        peeked, &_off, err,
                                                        &last);
+                       spin_unlock(&sk_queue->lock);
                        spin_unlock_bh(&queue->lock);
                        if (skb) {
                                *off = _off;