mutex_unlock(&con->mutex);
 }
 
+/*
+ * Revoke a page vector that we may be reading data into
+ */
+void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages)
+{
+       mutex_lock(&con->mutex);
+       if (con->in_msg && con->in_msg->pages == pages) {
+               unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+
+               /* skip rest of message */
+               dout("con_revoke_pages %p msg %p pages %p revoked\n", con,
+                    con->in_msg, pages);
+               if (con->in_msg_pos.data_pos < data_len)
+                       con->in_base_pos = con->in_msg_pos.data_pos - data_len;
+               else
+                       con->in_base_pos = con->in_base_pos -
+                               sizeof(struct ceph_msg_header) -
+                               sizeof(struct ceph_msg_footer);
+               con->in_msg->pages = NULL;
+               ceph_msg_put(con->in_msg);
+               con->in_msg = NULL;
+               con->in_tag = CEPH_MSGR_TAG_READY;
+       } else {
+               dout("con_revoke_pages %p msg %p pages %p no-op\n",
+                    con, con->in_msg, pages);
+       }
+       mutex_unlock(&con->mutex);
+}
+
 /*
  * Queue a keepalive byte to ensure the tcp connection is alive.
  */
 
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
+       if (req->r_con_filling_pages) {
+               dout("release_request revoking pages %p from con %p\n",
+                    req->r_pages, req->r_con_filling_pages);
+               ceph_con_revoke_pages(req->r_con_filling_pages,
+                                     req->r_pages);
+               ceph_con_put(req->r_con_filling_pages);
+       }
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
  */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
+                        struct ceph_connection *con)
 {
        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
        struct ceph_osd_request *req;
        ceph_osdc_get_request(req);
        flags = le32_to_cpu(rhead->flags);
 
+       /*
+        * if this connection filled our pages, drop our reference now, to
+        * avoid a (safe but slower) revoke later.
+        */
+       if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
+               dout(" got pages, dropping con_filling_pages ref %p\n", con);
+               req->r_con_filling_pages = NULL;
+               ceph_con_put(con);
+       }
+
        if (req->r_reply) {
                /*
                 * once we see the message has been received, we don't
        }
        dout("prepare_pages tid %llu has %d pages, want %d\n",
             tid, req->r_num_pages, want);
-       if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
-               m->pages = req->r_pages;
-               m->nr_pages = req->r_num_pages;
-               req->r_reply = m;  /* only for duration of read over socket */
-               ceph_msg_get(m);
-               req->r_prepared_pages = 1;
-               ret = 0; /* success */
+       if (unlikely(req->r_num_pages < want))
+               goto out;
+
+       if (req->r_con_filling_pages) {
+               dout("revoking pages %p from old con %p\n", req->r_pages,
+                    req->r_con_filling_pages);
+               ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
+               ceph_con_put(req->r_con_filling_pages);
        }
+       req->r_con_filling_pages = ceph_con_get(con);
+       req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
+       m->pages = req->r_pages;
+       m->nr_pages = req->r_num_pages;
+       ret = 0; /* success */
 out:
        mutex_unlock(&osdc->request_mutex);
        return ret;
                ceph_osdc_handle_map(osdc, msg);
                break;
        case CEPH_MSG_OSD_OPREPLY:
-               handle_reply(osdc, msg);
+               handle_reply(osdc, msg, con);
                break;
 
        default: