static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
 static void __enqueue_request(struct ceph_osd_request *req);
-static void __send_request(struct ceph_osd_client *osdc,
-                          struct ceph_osd_request *req);
 
 /*
  * Implement client access to distributed object storage cluster.
 
        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
        ceph_osd_data_pagelist_init(osd_data, pagelist);
+       osd_req->r_ops[which].cls.indata_len += pagelist->length;
+       osd_req->r_ops[which].indata_len += pagelist->length;
 }
 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
 
        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
+       osd_req->r_ops[which].cls.indata_len += length;
+       osd_req->r_ops[which].indata_len += length;
 }
 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
 
 
        osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
 
-       op->cls.argc = 0;       /* currently unused */
-
        op->indata_len = payload_len;
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
        }
 }
 
-static u64 osd_req_encode_op(struct ceph_osd_request *req,
-                             struct ceph_osd_op *dst, unsigned int which)
+static u32 osd_req_encode_op(struct ceph_osd_op *dst,
+                            const struct ceph_osd_req_op *src)
 {
-       struct ceph_osd_req_op *src;
-       struct ceph_osd_data *osd_data;
-       u64 request_data_len = 0;
-       u64 data_length;
-
-       BUG_ON(which >= req->r_num_ops);
-       src = &req->r_ops[which];
        if (WARN_ON(!osd_req_opcode_valid(src->op))) {
                pr_err("unrecognized osd opcode %d\n", src->op);
 
 
        switch (src->op) {
        case CEPH_OSD_OP_STAT:
-               osd_data = &src->raw_data_in;
-               ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
        case CEPH_OSD_OP_WRITEFULL:
        case CEPH_OSD_OP_ZERO:
        case CEPH_OSD_OP_TRUNCATE:
-               if (src->op == CEPH_OSD_OP_WRITE ||
-                   src->op == CEPH_OSD_OP_WRITEFULL)
-                       request_data_len = src->extent.length;
                dst->extent.offset = cpu_to_le64(src->extent.offset);
                dst->extent.length = cpu_to_le64(src->extent.length);
                dst->extent.truncate_size =
                        cpu_to_le64(src->extent.truncate_size);
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
-               osd_data = &src->extent.osd_data;
-               if (src->op == CEPH_OSD_OP_WRITE ||
-                   src->op == CEPH_OSD_OP_WRITEFULL)
-                       ceph_osdc_msg_data_add(req->r_request, osd_data);
-               else
-                       ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_CALL:
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
-               osd_data = &src->cls.request_info;
-               ceph_osdc_msg_data_add(req->r_request, osd_data);
-               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
-               request_data_len = osd_data->pagelist->length;
-
-               osd_data = &src->cls.request_data;
-               data_length = ceph_osd_data_length(osd_data);
-               if (data_length) {
-                       BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
-                       dst->cls.indata_len = cpu_to_le32(data_length);
-                       ceph_osdc_msg_data_add(req->r_request, osd_data);
-                       src->indata_len += data_length;
-                       request_data_len += data_length;
-               }
-               osd_data = &src->cls.response_data;
-               ceph_osdc_msg_data_add(req->r_reply, osd_data);
+               dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
-               osd_data = &src->xattr.osd_data;
-               ceph_osdc_msg_data_add(req->r_request, osd_data);
-               request_data_len = osd_data->pagelist->length;
                break;
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_DELETE:
        dst->flags = cpu_to_le32(src->flags);
        dst->payload_len = cpu_to_le32(src->indata_len);
 
-       return request_data_len;
+       return src->indata_len;
 }
 
 /*
                goto fail;
        }
 
-       req->r_flags = flags;
-
        /* calculate max write size */
        r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
        if (r)
                                       truncate_size, truncate_seq);
        }
 
+       req->r_flags = flags;
        req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
        ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 
+       req->r_snapid = vino.snap;
+       if (flags & CEPH_OSD_FLAG_WRITE)
+               req->r_data_offset = off;
+
        r = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (r)
                goto fail;
        return err;
 }
 
-/*
- * caller should hold map_sem (for read) and request_mutex
- */
-static void __send_request(struct ceph_osd_client *osdc,
-                          struct ceph_osd_request *req)
+/*
+ * Attach each op's data items to the outgoing message and/or to the
+ * reply message (req->r_reply), then cross-check the total length.
+ * Idempotent: returns early if @msg already has data items attached,
+ * i.e. when the same request is encoded again for a re-send.
+ */
+static void setup_request_data(struct ceph_osd_request *req,
+                              struct ceph_msg *msg)
 {
-       void *p;
+       u32 data_len = 0;
+       int i;
+
+       if (!list_empty(&msg->data))
+               return;
 
-       dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
-            req, req->r_tid, req->r_osd->o_osd, req->r_flags,
-            req->r_t.pgid.pool, req->r_t.pgid.seed);
+       WARN_ON(msg->data_length);
+       for (i = 0; i < req->r_num_ops; i++) {
+               struct ceph_osd_req_op *op = &req->r_ops[i];
+
+               switch (op->op) {
+               /* request */
+               case CEPH_OSD_OP_WRITE:
+               case CEPH_OSD_OP_WRITEFULL:
+                       WARN_ON(op->indata_len != op->extent.length);
+                       ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+                       break;
+               case CEPH_OSD_OP_SETXATTR:
+               case CEPH_OSD_OP_CMPXATTR:
+                       WARN_ON(op->indata_len != op->xattr.name_len +
+                                                 op->xattr.value_len);
+                       ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+                       break;
+
+               /* reply */
+               case CEPH_OSD_OP_STAT:
+                       ceph_osdc_msg_data_add(req->r_reply,
+                                              &op->raw_data_in);
+                       break;
+               case CEPH_OSD_OP_READ:
+                       ceph_osdc_msg_data_add(req->r_reply,
+                                              &op->extent.osd_data);
+                       break;
+
+               /* both */
+               case CEPH_OSD_OP_CALL:
+                       WARN_ON(op->indata_len != op->cls.class_len +
+                                                 op->cls.method_len +
+                                                 op->cls.indata_len);
+                       ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+                       /* optional, can be NONE */
+                       ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+                       /* optional, can be NONE */
+                       ceph_osdc_msg_data_add(req->r_reply,
+                                              &op->cls.response_data);
+                       break;
+               }
+
+               /*
+                * NOTE(review): assumes reply-only ops carry indata_len == 0;
+                * the WARN_ON below validates the sum against msg->data_length.
+                */
+               data_len += op->indata_len;
+       }
 
-       /* fill in message content that changes each time we send it */
-       put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
-       put_unaligned_le32(req->r_flags, req->r_request_flags);
-       put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool);
-       p = req->r_request_pgid;
+       WARN_ON(data_len != msg->data_length);
+}
+
+/*
+ * Encode @req into the front of @msg as an MOSDOp v4 message and fill
+ * in the header lengths; data items are attached via setup_request_data().
+ */
+static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+       void *p = msg->front.iov_base;
+       void *const end = p + msg->front_alloc_len;
+       u32 data_len = 0;
+       int i;
+
+       if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
+               /* snapshots aren't writeable */
+               WARN_ON(req->r_snapid != CEPH_NOSNAP);
+       } else {
+               /* reads must not carry mtime, data offset or snap context */
+               WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
+                       req->r_data_offset || req->r_snapc);
+       }
+
+       setup_request_data(req, msg);
+
+       ceph_encode_32(&p, 1); /* client_inc, always 1 */
+       ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
+       ceph_encode_32(&p, req->r_flags);
+       ceph_encode_timespec(p, &req->r_mtime);
+       p += sizeof(struct ceph_timespec);
+       /* aka reassert_version */
+       memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
+       p += sizeof(req->r_replay_version);
+
+       /* oloc */
+       ceph_encode_8(&p, 4);
+       ceph_encode_8(&p, 4);
+       ceph_encode_32(&p, 8 + 4 + 4);
+       ceph_encode_64(&p, req->r_t.target_oloc.pool);
+       ceph_encode_32(&p, -1); /* preferred */
+       ceph_encode_32(&p, 0); /* key len */
+
+       /* pgid */
+       ceph_encode_8(&p, 1);
        ceph_encode_64(&p, req->r_t.pgid.pool);
        ceph_encode_32(&p, req->r_t.pgid.seed);
-       put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
-       memcpy(req->r_request_reassert_version, &req->r_reassert_version,
-              sizeof(req->r_reassert_version));
+       ceph_encode_32(&p, -1); /* preferred */
 
-       req->r_stamp = jiffies;
-       list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
+       /* oid */
+       ceph_encode_32(&p, req->r_t.target_oid.name_len);
+       memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
+       p += req->r_t.target_oid.name_len;
 
-       ceph_msg_get(req->r_request); /* send consumes a ref */
+       /* ops, can imply data */
+       ceph_encode_16(&p, req->r_num_ops);
+       for (i = 0; i < req->r_num_ops; i++) {
+               data_len += osd_req_encode_op(p, &req->r_ops[i]);
+               p += sizeof(struct ceph_osd_op);
+       }
 
-       req->r_sent = req->r_osd->o_incarnation;
+       ceph_encode_64(&p, req->r_snapid); /* snapid */
+       if (req->r_snapc) {
+               ceph_encode_64(&p, req->r_snapc->seq);
+               ceph_encode_32(&p, req->r_snapc->num_snaps);
+               for (i = 0; i < req->r_snapc->num_snaps; i++)
+                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
+       } else {
+               ceph_encode_64(&p, 0); /* snap_seq */
+               ceph_encode_32(&p, 0); /* snaps len */
+       }
+
+       ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
+
+       BUG_ON(p > end);
+       msg->front.iov_len = p - msg->front.iov_base;
+       msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
+       msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+       msg->hdr.data_len = cpu_to_le32(data_len);
+       /*
+        * The header "data_off" is a hint to the receiver allowing it
+        * to align received data into its buffers such that there's no
+        * need to re-copy it before writing it to disk (direct I/O).
+        */
+       msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
 
-       ceph_con_send(&req->r_osd->o_con, req->r_request);
+       dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
+            req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
+            req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
+}
+
+/*
+ * Encode @req into its request message and hand it to the messenger.
+ * @req has to be assigned a tid and registered.
+ */
+static void send_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd *osd = req->r_osd;
+
+       WARN_ON(osd->o_osd != req->r_t.osd);
+
+       req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
+       if (req->r_attempts)
+               /* second and subsequent sends are flagged as retries */
+               req->r_flags |= CEPH_OSD_FLAG_RETRY;
+       else
+               WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
+
+       encode_request(req, req->r_request);
+
+       dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
+            __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
+            req->r_t.osd, req->r_flags, req->r_attempts);
+
+       req->r_t.paused = false;
+       req->r_stamp = jiffies;
+       req->r_attempts++;
+
+       req->r_sent = osd->o_incarnation;
+       req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+       /* ceph_con_send() consumes a ref, hence the ceph_msg_get() */
+       ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
+}
 
 /*
        struct ceph_osd_request *req, *tmp;
 
        dout("__send_queued\n");
-       list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
-               __send_request(osdc, req);
+       list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+               list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
+               send_request(req);
+       }
 }
 
 /*
                        req->r_result = bytes;
 
                /* in case this is a write and we need to replay, */
-               req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
-               req->r_reassert_version.version = cpu_to_le64(reassert_version);
+               req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
+               req->r_replay_version.version = cpu_to_le64(reassert_version);
 
                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
        pr_err("osdc handle_watch_notify corrupt msg\n");
 }
 
-/*
- * build new request AND message
- *
- */
-void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
-                               struct ceph_snap_context *snapc, u64 snap_id,
-                               struct timespec *mtime)
-{
-       struct ceph_msg *msg = req->r_request;
-       void *p;
-       size_t msg_size;
-       int flags = req->r_flags;
-       u64 data_len;
-       unsigned int i;
-
-       req->r_snapid = snap_id;
-       WARN_ON(snapc != req->r_snapc);
-
-       /* encode request */
-       msg->hdr.version = cpu_to_le16(4);
-
-       p = msg->front.iov_base;
-       ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
-       req->r_request_osdmap_epoch = p;
-       p += 4;
-       req->r_request_flags = p;
-       p += 4;
-       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
-               ceph_encode_timespec(p, mtime);
-       p += sizeof(struct ceph_timespec);
-       req->r_request_reassert_version = p;
-       p += sizeof(struct ceph_eversion); /* will get filled in */
-
-       /* oloc */
-       ceph_encode_8(&p, 4);
-       ceph_encode_8(&p, 4);
-       ceph_encode_32(&p, 8 + 4 + 4);
-       req->r_request_pool = p;
-       p += 8;
-       ceph_encode_32(&p, -1);  /* preferred */
-       ceph_encode_32(&p, 0);   /* key len */
-
-       ceph_encode_8(&p, 1);
-       req->r_request_pgid = p;
-       p += 8 + 4;
-       ceph_encode_32(&p, -1);  /* preferred */
-
-       /* oid */
-       ceph_encode_32(&p, req->r_base_oid.name_len);
-       memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
-       dout("oid %*pE len %d\n", req->r_base_oid.name_len,
-            req->r_base_oid.name, req->r_base_oid.name_len);
-       p += req->r_base_oid.name_len;
-
-       /* ops--can imply data */
-       ceph_encode_16(&p, (u16)req->r_num_ops);
-       data_len = 0;
-       for (i = 0; i < req->r_num_ops; i++) {
-               data_len += osd_req_encode_op(req, p, i);
-               p += sizeof(struct ceph_osd_op);
-       }
-
-       /* snaps */
-       ceph_encode_64(&p, req->r_snapid);
-       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
-       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
-       if (req->r_snapc) {
-               for (i = 0; i < req->r_snapc->num_snaps; i++) {
-                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
-               }
-       }
-
-       req->r_request_attempts = p;
-       p += 4;
-
-       /* data */
-       if (flags & CEPH_OSD_FLAG_WRITE) {
-               u16 data_off;
-
-               /*
-                * The header "data_off" is a hint to the receiver
-                * allowing it to align received data into its
-                * buffers such that there's no need to re-copy
-                * it before writing it to disk (direct I/O).
-                */
-               data_off = (u16) (off & 0xffff);
-               req->r_request->hdr.data_off = cpu_to_le16(data_off);
-       }
-       req->r_request->hdr.data_len = cpu_to_le32(data_len);
-
-       BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
-       msg_size = p - msg->front.iov_base;
-       msg->front.iov_len = msg_size;
-       msg->hdr.front_len = cpu_to_le32(msg_size);
-
-       dout("build_request msg_size was %d\n", (int)msg_size);
-}
-EXPORT_SYMBOL(ceph_osdc_build_request);
-
 /*
  * Register request, send initial attempt.
  */
                return PTR_ERR(req);
 
        /* it may be a short read due to an object boundary */
-
        osd_req_op_extent_osd_data_pages(req, 0,
                                pages, *plen, page_align, false, false);
 
        dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
             off, *plen, *plen, page_align);
 
-       ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
-
        rc = ceph_osdc_start_request(osdc, req, false);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);
        int rc = 0;
        int page_align = off & ~PAGE_MASK;
 
-       BUG_ON(vino.snap != CEPH_NOSNAP);       /* snapshots aren't writeable */
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
                                    CEPH_OSD_OP_WRITE,
                                    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
                                false, false);
        dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
 
-       ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
-
+       req->r_mtime = *mtime;
        rc = ceph_osdc_start_request(osdc, req, true);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);