int __skb_wait_for_more_packets(struct sock *sk, int *err, long *timeo_p,
                                const struct sk_buff *skb);
 struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned flags,
+                                       void (*destructor)(struct sock *sk,
+                                                          struct sk_buff *skb),
                                        int *peeked, int *off, int *err,
                                        struct sk_buff **last);
 struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags,
+                                   void (*destructor)(struct sock *sk,
+                                                      struct sk_buff *skb),
                                    int *peeked, int *off, int *err);
 struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned flags, int noblock,
                                  int *err);
 
 /* net/ipv4/udp.c */
 void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb);
+void udp_skb_destructor(struct sock *sk, struct sk_buff *skb);
+static inline struct sk_buff *
+__skb_recv_udp(struct sock *sk, unsigned int flags, int noblock, int *peeked,
+              int *off, int *err)
+{
+       return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
+                                  udp_skb_destructor, peeked, off, err);
+}
+static inline struct sk_buff *skb_recv_udp(struct sock *sk, unsigned int flags,
+                                          int noblock, int *err)
+{
+       int peeked, off = 0;
+
+       return __skb_recv_udp(sk, flags, noblock, &peeked, &off, err);
+}
 
 void udp_v4_early_demux(struct sk_buff *skb);
 int udp_get_port(struct sock *sk, unsigned short snum,
 
  *     __skb_try_recv_datagram - Receive a datagram skbuff
  *     @sk: socket
  *     @flags: MSG_ flags
+ *     @destructor: invoked under the receive lock on successful dequeue
  *     @peeked: returns non-zero if this packet has been seen before
  *     @off: an offset in bytes to peek skb from. Returns an offset
  *           within an skb where data actually starts
  *     the standard around please.
  */
 struct sk_buff *__skb_try_recv_datagram(struct sock *sk, unsigned int flags,
+                                       void (*destructor)(struct sock *sk,
+                                                          struct sk_buff *skb),
                                        int *peeked, int *off, int *err,
                                        struct sk_buff **last)
 {
                                }
 
                                atomic_inc(&skb->users);
-                       } else
+                       } else {
                                __skb_unlink(skb, queue);
-
+                               if (destructor)
+                                       destructor(sk, skb);
+                       }
                        spin_unlock_irqrestore(&queue->lock, cpu_flags);
                        *off = _off;
                        return skb;
 EXPORT_SYMBOL(__skb_try_recv_datagram);
 
 struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned int flags,
+                                   void (*destructor)(struct sock *sk,
+                                                      struct sk_buff *skb),
                                    int *peeked, int *off, int *err)
 {
        struct sk_buff *skb, *last;
        timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 
        do {
-               skb = __skb_try_recv_datagram(sk, flags, peeked, off, err,
-                                             &last);
+               skb = __skb_try_recv_datagram(sk, flags, destructor, peeked,
+                                             off, err, &last);
                if (skb)
                        return skb;
 
        int peeked, off = 0;
 
        return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
-                                  &peeked, &off, err);
+                                  NULL, &peeked, &off, err);
 }
 EXPORT_SYMBOL(skb_recv_datagram);
 
 
        return ret;
 }
 
+/* fully reclaim rmem/fwd memory allocated for skb */
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
        int amt;
 
        atomic_sub(size, &sk->sk_rmem_alloc);
-
-       spin_lock_bh(&sk->sk_receive_queue.lock);
        sk->sk_forward_alloc += size;
        amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
        sk->sk_forward_alloc -= amt;
-       spin_unlock_bh(&sk->sk_receive_queue.lock);
 
        if (amt)
                __sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
 }
 
-static void udp_rmem_free(struct sk_buff *skb)
+/* Note: called with sk_receive_queue.lock held */
+void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
 {
-       udp_rmem_release(skb->sk, skb->truesize, 1);
+       udp_rmem_release(sk, skb->truesize, 1);
 }
+EXPORT_SYMBOL(udp_skb_destructor);
 
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
 {
 
        sk->sk_forward_alloc -= size;
 
-       /* the skb owner in now the udp socket */
-       skb->sk = sk;
-       skb->destructor = udp_rmem_free;
+       /* no need to setup a destructor, we will explicitly release the
+        * forward allocated memory on dequeue
+        */
        skb->dev = NULL;
        sock_skb_set_dropcount(sk, skb);
 
 static void udp_destruct_sock(struct sock *sk)
 {
        /* reclaim completely the forward allocated memory */
-       __skb_queue_purge(&sk->sk_receive_queue);
-       udp_rmem_release(sk, 0, 0);
+       unsigned int total = 0;
+       struct sk_buff *skb;
+
+       while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+               total += skb->truesize;
+               kfree_skb(skb);
+       }
+       udp_rmem_release(sk, total, 0);
+
        inet_sock_destruct(sk);
 }
 
  */
 static int first_packet_length(struct sock *sk)
 {
-       struct sk_buff_head list_kill, *rcvq = &sk->sk_receive_queue;
+       struct sk_buff_head *rcvq = &sk->sk_receive_queue;
        struct sk_buff *skb;
+       int total = 0;
        int res;
 
-       __skb_queue_head_init(&list_kill);
-
        spin_lock_bh(&rcvq->lock);
        while ((skb = skb_peek(rcvq)) != NULL &&
                udp_lib_checksum_complete(skb)) {
                                IS_UDPLITE(sk));
                atomic_inc(&sk->sk_drops);
                __skb_unlink(skb, rcvq);
-               __skb_queue_tail(&list_kill, skb);
+               total += skb->truesize;
+               kfree_skb(skb);
        }
        res = skb ? skb->len : -1;
+       if (total)
+               udp_rmem_release(sk, total, 1);
        spin_unlock_bh(&rcvq->lock);
-
-       __skb_queue_purge(&list_kill);
        return res;
 }
 
 
 try_again:
        peeking = off = sk_peek_offset(sk, flags);
-       skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
-                                 &peeked, &off, &err);
+       skb = __skb_recv_udp(sk, flags, noblock, &peeked, &off, &err);
        if (!skb)
                return err;
 
 
 
 try_again:
        peeking = off = sk_peek_offset(sk, flags);
-       skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
-                                 &peeked, &off, &err);
+       skb = __skb_recv_udp(sk, flags, noblock, &peeked, &off, &err);
        if (!skb)
                return err;
 
 
 
        ASSERT(!irqs_disabled());
 
-       skb = skb_recv_datagram(udp_sk, 0, 1, &ret);
+       skb = skb_recv_udp(udp_sk, 0, 1, &ret);
        if (!skb) {
                if (ret == -EAGAIN)
                        return;
 
        __UDP_INC_STATS(&init_net, UDP_MIB_INDATAGRAMS, 0);
 
-       /* The socket buffer we have is owned by UDP, with UDP's data all over
-        * it, but we really want our own data there.
+       /* The UDP protocol already released all skb resources;
+        * we are free to add our own data there.
         */
-       skb_orphan(skb);
        sp = rxrpc_skb(skb);
 
        /* dig out the RxRPC connection details */
 
        err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
                             0, 0, MSG_PEEK | MSG_DONTWAIT);
        if (err >= 0)
-               skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err);
+               skb = skb_recv_udp(svsk->sk_sk, 0, 1, &err);
 
        if (skb == NULL) {
                if (err != -EAGAIN) {
 
        if (sk == NULL)
                goto out;
        for (;;) {
-               skb = skb_recv_datagram(sk, 0, 1, &err);
+               skb = skb_recv_udp(sk, 0, 1, &err);
                if (skb != NULL) {
                        xs_udp_data_read_skb(&transport->xprt, sk, skb);
                        consume_skb(skb);
 
                mutex_lock(&u->iolock);
 
                skip = sk_peek_offset(sk, flags);
-               skb = __skb_try_recv_datagram(sk, flags, &peeked, &skip, &err,
-                                             &last);
+               skb = __skb_try_recv_datagram(sk, flags, NULL, &peeked, &skip,
+                                             &err, &last);
                if (skb)
                        break;