u64                     xferred;        /* bytes transferred */
        u64                     version;
-       s32                     result;
+       int                     result;
        atomic_t                done;
 
        rbd_obj_callback_t      callback;
 
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
-
        dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
                obj_request->result, obj_request->xferred, obj_request->length);
-       if (obj_request->result == (s32) -ENOENT) {
+       /*
+        * ENOENT means a hole in the object.  We zero-fill the
+        * entire length of the request.  A short read also implies
+        * zero-fill to the end of the request.  Either way we
+        * update the xferred count to indicate the whole request
+        * was satisfied.
+        */
+       if (obj_request->result == -ENOENT) {
                zero_bio_chain(obj_request->bio_list, 0);
                obj_request->result = 0;
+               obj_request->xferred = obj_request->length;
        } else if (obj_request->xferred < obj_request->length &&
                        !obj_request->result) {
                zero_bio_chain(obj_request->bio_list, obj_request->xferred);
 
 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
 {
-       dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
-               obj_request->result, obj_request->xferred, obj_request->length);
-
-       /* A short write really shouldn't occur.  Warn if we see one */
-
-       if (obj_request->xferred != obj_request->length) {
-               struct rbd_img_request *img_request = obj_request->img_request;
-               struct rbd_device *rbd_dev;
-
-               rbd_dev = img_request ? img_request->rbd_dev : NULL;
-               rbd_warn(rbd_dev, "wrote %llu want %llu\n",
-                       obj_request->xferred, obj_request->length);
-       }
-
+       dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+               obj_request->result, obj_request->length);
+       /*
+        * There is no such thing as a successful short write.  The
+        * xferred value reported back for a write is not the number
+        * of bytes written, so just set it to the originally-requested
+        * length.
+        */
+       obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
 }
 
                                struct ceph_msg *msg)
 {
        struct rbd_obj_request *obj_request = osd_req->r_priv;
-       struct ceph_osd_reply_head *reply_head;
-       struct ceph_osd_op *op;
-       u32 num_ops;
        u16 opcode;
 
        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(!!obj_request->img_request ^
                                (obj_request->which == BAD_WHICH));
 
-       reply_head = msg->front.iov_base;
-       obj_request->result = (s32) le32_to_cpu(reply_head->result);
+       if (osd_req->r_result < 0)
+               obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
 
-       num_ops = le32_to_cpu(reply_head->num_ops);
-       WARN_ON(num_ops != 1);  /* For now */
+       WARN_ON(osd_req->r_num_ops != 1);       /* For now */
 
        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
-       op = &reply_head->ops[0];
-       obj_request->xferred = le64_to_cpu(op->extent.length);
+       obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64) UINT_MAX);
-
-       opcode = le16_to_cpu(op->op);
+       opcode = le16_to_cpu(osd_req->r_request_ops[0].op);
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                more = blk_end_request(img_request->rq, result, xferred);
                which++;
        }
+
        rbd_assert(more ^ (which == img_request->obj_request_count));
        img_request->next_completion = which;
 out:
 
 
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                               struct ceph_snap_context *snapc,
-                                              unsigned int num_op,
+                                              unsigned int num_ops,
                                               bool use_mempool,
                                               gfp_t gfp_flags)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       size_t msg_size = sizeof(struct ceph_osd_request_head);
-
-       msg_size += num_op*sizeof(struct ceph_osd_op);
+       size_t msg_size;
+
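+       /*
+        * Size the request front to match the encoding done in
+        * ceph_osdc_build_request().  The first term covers
+        * client_inc, osdmap_epoch, flags, mtime and reassert_version.
+        */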
+       msg_size = 4 + 4 + 8 + 8 + 4 + 8;
+       msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+       msg_size += 1 + 8 + 4 + 4;     /* pg_t */
+       msg_size += 4 + MAX_OBJ_NAME_SIZE;  /* oid */
+       msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);  /* ops */
+       msg_size += 8;  /* snapid */
+       msg_size += 8;  /* snap_seq */
+       msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
+       msg_size += 4;  /* retry_attempt */
 
        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
        ceph_pagelist_init(&req->r_trail);
 
-       /* create request message; allow space for oid */
+       /* create request message */
-       msg_size += MAX_OBJ_NAME_SIZE;
-       if (snapc)
-               msg_size += sizeof(u64) * snapc->num_snaps;
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, u64 len, unsigned int num_op,
+                            u64 off, u64 len, unsigned int num_ops,
                             struct ceph_osd_req_op *src_ops,
                             struct ceph_snap_context *snapc, u64 snap_id,
                             struct timespec *mtime)
 {
        struct ceph_msg *msg = req->r_request;
-       struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
-       struct ceph_osd_op *op;
        void *p;
-       size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+       size_t msg_size;
        int flags = req->r_flags;
        u64 data_len;
        int i;
 
-       WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
-       head = msg->front.iov_base;
-       head->snapid = cpu_to_le64(snap_id);
-       op = (void *)(head + 1);
-       p = (void *)(op + num_op);
-
+       req->r_num_ops = num_ops;
+       req->r_snapid = snap_id;
        req->r_snapc = ceph_get_snap_context(snapc);
 
-       head->client_inc = cpu_to_le32(1); /* always, for now. */
-       head->flags = cpu_to_le32(flags);
-       if (flags & CEPH_OSD_FLAG_WRITE)
-               ceph_encode_timespec(&head->mtime, mtime);
-       BUG_ON(num_op > (unsigned int) ((u16) -1));
-       head->num_ops = cpu_to_le16(num_op);
+       /* encode request */
+       msg->hdr.version = cpu_to_le16(4);
 
-       /* fill in oid */
-       head->object_len = cpu_to_le32(req->r_oid_len);
+       p = msg->front.iov_base;
+       ceph_encode_32(&p, 1);   /* client_inc is always 1 */
+       req->r_request_osdmap_epoch = p;
+       p += 4;
+       req->r_request_flags = p;
+       p += 4;
+       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+               ceph_encode_timespec(p, mtime);
+       p += sizeof(struct ceph_timespec);
+       req->r_request_reassert_version = p;
+       p += sizeof(struct ceph_eversion); /* will get filled in */
+
+       /* oloc */
+       ceph_encode_8(&p, 4);  /* oloc struct_v */
+       ceph_encode_8(&p, 4);  /* oloc struct_compat */
+       ceph_encode_32(&p, 8 + 4 + 4);  /* oloc struct length */
+       req->r_request_pool = p;
+       p += 8;
+       ceph_encode_32(&p, -1);  /* preferred */
+       ceph_encode_32(&p, 0);   /* key len */
+
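+
+       /* pgid: raw pg_t v1; pool and seed are filled in at send time */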
+       ceph_encode_8(&p, 1);
+       req->r_request_pgid = p;
+       p += 8 + 4;
+       ceph_encode_32(&p, -1);  /* preferred */
+
+       /* oid */
+       ceph_encode_32(&p, req->r_oid_len);
        memcpy(p, req->r_oid, req->r_oid_len);
+       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
        p += req->r_oid_len;
 
+       /* ops */
+       ceph_encode_16(&p, num_ops);
        src_op = src_ops;
-       while (num_op--)
-               osd_req_encode_op(req, op++, src_op++);
+       req->r_request_ops = p;
+       for (i = 0; i < num_ops; i++, src_op++) {
+               osd_req_encode_op(req, p, src_op);
+               p += sizeof(struct ceph_osd_op);
+       }
 
-       if (snapc) {
-               head->snap_seq = cpu_to_le64(snapc->seq);
-               head->num_snaps = cpu_to_le32(snapc->num_snaps);
+       /* snaps */
+       ceph_encode_64(&p, req->r_snapid);
+       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+       if (req->r_snapc) {
                for (i = 0; i < snapc->num_snaps; i++) {
-                       put_unaligned_le64(snapc->snaps[i], p);
-                       p += sizeof(u64);
+                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
                }
        }
 
+       req->r_request_attempts = p;    /* will get filled in */
+       p += 4;
+
        data_len = req->r_trail.length;
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
        msg_size = p - msg->front.iov_base;
        msg->front.iov_len = msg_size;
        msg->hdr.front_len = cpu_to_le32(msg_size);
+
+       dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
+            num_ops);
        return;
 }
 EXPORT_SYMBOL(ceph_osdc_build_request);
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
 {
-       struct ceph_osd_request_head *reqhead;
-
-       dout("send_request %p tid %llu to osd%d flags %d\n",
-            req, req->r_tid, req->r_osd->o_osd, req->r_flags);
-
-       reqhead = req->r_request->front.iov_base;
-       reqhead->snapid = cpu_to_le64(req->r_snapid);
-       reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
-       reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
-       reqhead->reassert_version = req->r_reassert_version;
+       void *p;
 
-       reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed);
-       reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool);
-       reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1);
-       reqhead->layout.ol_stripe_unit = 0;
+       dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
+            req, req->r_tid, req->r_osd->o_osd, req->r_flags,
+            (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+
+       /* fill in message content that changes each time we send it */
+       put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
+       put_unaligned_le32(req->r_flags, req->r_request_flags);
+       put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+       p = req->r_request_pgid;
+       ceph_encode_64(&p, req->r_pgid.pool);
+       ceph_encode_32(&p, req->r_pgid.seed);
+       put_unaligned_le32(1, req->r_request_attempts);  /* FIXME: track the real attempt count */
+       memcpy(req->r_request_reassert_version, &req->r_reassert_version,
+              sizeof(req->r_reassert_version));
 
        req->r_stamp = jiffies;
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
 
+static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
+{
+       __u8 v;
+
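+       /*
+        * Raw pg_t: a one-byte encoding version, the 64-bit pool, the
+        * 32-bit seed, and a deprecated 32-bit preferred value.
+        */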
+       ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
+       v = ceph_decode_8(p);
+       if (v > 1) {
+               pr_warning("do not understand pg encoding %d > 1\n", v);
+               return -EINVAL;
+       }
+       pgid->pool = ceph_decode_64(p);
+       pgid->seed = ceph_decode_32(p);
+       *p += 4;        /* skip deprecated preferred value */
+       return 0;
+
+bad:
+       pr_warning("incomplete pg encoding\n");
+       return -EINVAL;
+}
+
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
 {
-       struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+       void *p, *end;
        struct ceph_osd_request *req;
        u64 tid;
-       int numops, object_len, flags;
+       int object_len;
+       int numops, payload_len, flags;
        s32 result;
+       s32 retry_attempt;
+       struct ceph_pg pg;
+       int err;
+       u32 reassert_epoch;
+       u64 reassert_version;
+       u32 osdmap_epoch;
+       int i;
 
        tid = le64_to_cpu(msg->hdr.tid);
-       if (msg->front.iov_len < sizeof(*rhead))
-               goto bad;
-       numops = le32_to_cpu(rhead->num_ops);
-       object_len = le32_to_cpu(rhead->object_len);
-       result = le32_to_cpu(rhead->result);
-       if (msg->front.iov_len != sizeof(*rhead) + object_len +
-           numops * sizeof(struct ceph_osd_op))
+       dout("handle_reply %p tid %llu\n", msg, tid);
+
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
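+       /* skip over the object name; the request is looked up by tid */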
+       ceph_decode_need(&p, end, 4, bad);
+       object_len = ceph_decode_32(&p);
+       ceph_decode_need(&p, end, object_len, bad);
+       p += object_len;
+
+       err = __decode_pgid(&p, end, &pg);
+       if (err)
                goto bad;
-       dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
+
+       ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
+       flags = ceph_decode_64(&p);
+       result = ceph_decode_32(&p);
+       reassert_epoch = ceph_decode_32(&p);
+       reassert_version = ceph_decode_64(&p);
+       osdmap_epoch = ceph_decode_32(&p);
+
        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
                return;
        }
        ceph_osdc_get_request(req);
-       flags = le32_to_cpu(rhead->flags);
+
+       dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
+            req, result);
+
+       ceph_decode_need(&p, end, 4, bad_put);
+       numops = ceph_decode_32(&p);
+       if (numops > CEPH_OSD_MAX_OP)
+               goto bad_put;
+       if (numops != req->r_num_ops)
+               goto bad_put;
+       payload_len = 0;
+       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
+       for (i = 0; i < numops; i++) {
+               struct ceph_osd_op *op = p;
+               int len;
+
+               len = le32_to_cpu(op->payload_len);
+               req->r_reply_op_len[i] = len;
+               dout(" op %d has %d bytes\n", i, len);
+               payload_len += len;
+               p += sizeof(*op);
+       }
+       if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+               pr_warning("sum of op payload lens %d != data_len %d\n",
+                          payload_len, le32_to_cpu(msg->hdr.data_len));
+               goto bad_put;
+       }
+
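+       /* retry attempt and per-op return codes */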
+       ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
+       retry_attempt = ceph_decode_32(&p);
+       for (i = 0; i < numops; i++)
+               req->r_reply_op_result[i] = ceph_decode_32(&p);
 
        /*
         * if this connection filled our message, drop our reference now, to
        if (!req->r_got_reply) {
                unsigned int bytes;
 
-               req->r_result = le32_to_cpu(rhead->result);
+               req->r_result = result;
                bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
                        req->r_result = bytes;
 
                /* in case this is a write and we need to replay, */
-               req->r_reassert_version = rhead->reassert_version;
+               req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
+               req->r_reassert_version.version = cpu_to_le64(reassert_version);
 
                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
        ceph_osdc_put_request(req);
        return;
 
+bad_put:
+       ceph_osdc_put_request(req);
 bad:
-       pr_err("corrupt osd_op_reply got %d %d expected %d\n",
-              (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
-              (int)sizeof(*rhead));
+       pr_err("corrupt osd_op_reply: front iov_len %d, hdr front_len %d\n",
+              (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
        ceph_msg_dump(msg);
 }