#include <linux/kernel.h>
 #include <linux/slab.h>
 #include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
 
 #include "rds.h"
 
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
+/* Try to piggyback @cookie onto an existing zerocopy-notification skb
+ * already sitting at the tail of the socket error queue.
+ *
+ * Returns true if the cookie was appended (and ee_data bumped), false if
+ * the skb is not a zerocopy notification, is already carrying the maximum
+ * number of cookies, or has no tailroom left — in which case the caller
+ * must queue the cookie on a fresh skb instead.
+ *
+ * Caller must hold the error-queue lock so the tail skb cannot be
+ * dequeued or modified concurrently.
+ */
+static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
+{
+       struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
+       int ncookies;
+       u32 *ptr;
+
+       if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
+               return false;
+       ncookies = serr->ee.ee_data;
+       if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
+               return false;
+       /* skb_put() triggers skb_over_panic() if the skb lacks tailroom,
+        * so verify there is room for one more cookie before appending;
+        * bailing out here just makes the caller start a new skb.
+        */
+       if (skb_tailroom(skb) < sizeof(u32))
+               return false;
+       ptr = skb_put(skb, sizeof(u32));
+       *ptr = cookie;
+       serr->ee.ee_data = ++ncookies;
+       return true;
+}
+
+/* Notify the socket that a zerocopy send described by @znotif has
+ * completed, by delivering its cookie on the socket error queue.
+ *
+ * Fast path: if the skb at the tail of the error queue is already a
+ * zerocopy notification with room to spare, coalesce the cookie into it
+ * and free the notifier's own skb.  Otherwise the notifier skb itself is
+ * initialized as an SO_EE_ORIGIN_ZCOOKIE notification and queued.
+ *
+ * NOTE(review): called from rds_message_purge() with rm->m_rs_lock held
+ * and IRQs disabled; q->lock nests inside it here — keep that order.
+ */
+static void rds_rm_zerocopy_callback(struct rds_sock *rs,
+                                    struct rds_znotifier *znotif)
+{
+       struct sock *sk = rds_rs_to_sk(rs);
+       struct sk_buff *skb, *tail;
+       struct sock_exterr_skb *serr;
+       unsigned long flags;
+       struct sk_buff_head *q;
+       u32 cookie = znotif->z_cookie;
+
+       q = &sk->sk_error_queue;
+       spin_lock_irqsave(&q->lock, flags);
+       tail = skb_peek_tail(q);
+
+       if (tail && skb_zcookie_add(tail, cookie)) {
+               /* Cookie coalesced into the existing tail skb; the
+                * notifier skb is no longer needed.  Drop the lock before
+                * freeing/reporting — the queue is already consistent.
+                */
+               spin_unlock_irqrestore(&q->lock, flags);
+               mm_unaccount_pinned_pages(&znotif->z_mmp);
+               consume_skb(rds_skb_from_znotifier(znotif));
+               sk->sk_error_report(sk);
+               return;
+       }
+
+       /* Slow path: turn the notifier skb itself into a fresh
+        * zerocopy-cookie notification carrying this one cookie.
+        */
+       skb = rds_skb_from_znotifier(znotif);
+       serr = SKB_EXT_ERR(skb);
+       memset(&serr->ee, 0, sizeof(serr->ee));
+       serr->ee.ee_errno = 0;
+       serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
+       serr->ee.ee_info = 0;
+       /* Adding to a just-zeroed ZCOOKIE skb cannot legitimately fail. */
+       WARN_ON(!skb_zcookie_add(skb, cookie));
+
+       /* q->lock is held, so the lockless __skb_queue_tail is safe. */
+       __skb_queue_tail(q, skb);
+
+       spin_unlock_irqrestore(&q->lock, flags);
+       sk->sk_error_report(sk);
+
+       mm_unaccount_pinned_pages(&znotif->z_mmp);
+}
+
 /*
  * This relies on dma_map_sg() not touching sg[].page during merging.
  */
 static void rds_message_purge(struct rds_message *rm)
 {
        unsigned long i, flags;
+       bool zcopy = false;
 
        if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
                return;
 
-       for (i = 0; i < rm->data.op_nents; i++) {
-               rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
-               /* XXX will have to put_page for page refs */
-               __free_page(sg_page(&rm->data.op_sg[i]));
-       }
-       rm->data.op_nents = 0;
        spin_lock_irqsave(&rm->m_rs_lock, flags);
        if (rm->m_rs) {
-               sock_put(rds_rs_to_sk(rm->m_rs));
+               struct rds_sock *rs = rm->m_rs;
+
+               if (rm->data.op_mmp_znotifier) {
+                       zcopy = true;
+                       rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
+                       rm->data.op_mmp_znotifier = NULL;
+               }
+               sock_put(rds_rs_to_sk(rs));
                rm->m_rs = NULL;
        }
        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 
+       for (i = 0; i < rm->data.op_nents; i++) {
+               /* XXX will have to put_page for page refs */
+               if (!zcopy)
+                       __free_page(sg_page(&rm->data.op_sg[i]));
+               else
+                       put_page(sg_page(&rm->data.op_sg[i]));
+       }
+       rm->data.op_nents = 0;
+
        if (rm->rdma.op_active)
                rds_rdma_free_op(&rm->rdma);
        if (rm->rdma.op_rdma_mr)