#include <linux/ceph/pagelist.h>
 #include <linux/export.h>
 
+/*
+ * Evaluate to the entry that follows pos on the list it is linked into
+ * through its "member" list_head field.  No end-of-list check is made
+ * here; the caller has to ensure pos is not the last entry.  The pos
+ * argument is parenthesized so any pointer expression can be passed.
+ */
+#define list_entry_next(pos, member)                                   \
+       list_entry((pos)->member.next, typeof(*(pos)), member)
+
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
  * hosts in the system.  The messenger provides ordered and reliable
 }
 #endif
 
+/*
+ * A message data item is sent or received one piece at a time, and
+ * every piece lives within a single page.  Because the network layer
+ * may consume only part of a piece, each data item carries a cursor
+ * recording the page holding the next piece, how far into the data
+ * processing has progressed, and whether that piece is the final one.
+ *
+ * Set up the cursor for a pagelist data item so it refers to the
+ * first page.  Data items of any other type, and pagelists with a
+ * zero length, are left untouched.
+ */
+static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
+{
+       struct ceph_pagelist *pl;
+
+       /* Only pagelist data items are handled here */
+       if (data->type != CEPH_MSG_DATA_PAGELIST)
+               return;
+
+       pl = data->pagelist;
+       BUG_ON(!pl);
+
+       /* A pagelist may be assigned yet hold no data at all */
+       if (pl->length == 0)
+               return;
+
+       /* Non-zero length implies there is at least one page */
+       BUG_ON(list_empty(&pl->head));
+
+       data->cursor.page = list_first_entry(&pl->head, struct page, lru);
+       data->cursor.offset = 0;
+       /* Everything fits in the first page, so it is also the last piece */
+       data->cursor.last_piece = (pl->length <= PAGE_SIZE);
+}
+
+/*
+ * Look up the next piece to process for a pagelist data item.
+ *
+ * Returns the page the cursor currently refers to.  On return,
+ * *page_offset holds the offset within that page at which the piece
+ * starts, *length holds the number of bytes in the piece, and
+ * *last_piece tells whether this is the final piece of the data item.
+ * The cursor itself is not modified.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
+                                               size_t *page_offset,
+                                               size_t *length,
+                                               bool *last_piece)
+{
+       struct ceph_msg_data_cursor *cur = &data->cursor;
+       struct ceph_pagelist *pl;
+       size_t end = PAGE_SIZE;         /* a non-final piece fills its page */
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+       pl = data->pagelist;
+       BUG_ON(!pl);
+
+       BUG_ON(!cur->page);
+       BUG_ON(cur->offset >= pl->length);
+
+       if (cur->last_piece) {
+               /*
+                * The pagelist data starts at offset 0, so the final
+                * piece ends wherever the total length falls within a
+                * page; a remainder of 0 means the last page is full.
+                */
+               size_t tail = pl->length & ~PAGE_MASK;
+
+               if (tail)
+                       end = tail;
+       }
+
+       *last_piece = cur->last_piece;
+       *page_offset = cur->offset & ~PAGE_MASK;
+       *length = end - *page_offset;
+
+       return cur->page;
+}
+
+/*
+ * Advance the cursor of a pagelist data item by "bytes" bytes, which
+ * must not run past the end of the current page or of the pagelist.
+ *
+ * Returns true if the result moves the cursor on to the next piece
+ * (the next page) of the pagelist.  Returns false if the cursor stays
+ * on the current page, either because more of that page remains to be
+ * processed or because all of the data has now been consumed.
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct ceph_pagelist *pagelist;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+       pagelist = data->pagelist;
+       BUG_ON(!pagelist);
+       BUG_ON(!cursor->page);
+       BUG_ON(cursor->offset + bytes > pagelist->length);
+       BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+       /* Advance the cursor offset */
+
+       cursor->offset += bytes;
+       /* pagelist offset is always 0 */
+       if (!bytes || cursor->offset & ~PAGE_MASK)
+               return false;   /* more bytes to process in the current page */
+
+       /*
+        * The data may end exactly on a page boundary.  In that case
+        * nothing remains, and stepping to a "next" page would trip the
+        * end-of-list check below when the current page is the last
+        * list entry, so leave the cursor where it is.
+        */
+       if (cursor->offset == pagelist->length)
+               return false;   /* no more data */
+
+       /* Move on to the next page */
+
+       BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+       cursor->page = list_entry_next(cursor->page, lru);
+
+       /* cursor offset is at page boundary; pagelist offset is always 0 */
+       if (pagelist->length - cursor->offset <= PAGE_SIZE)
+               cursor->last_piece = true;
+
+       return true;
+}
+
 static void prepare_message_data(struct ceph_msg *msg,
                                struct ceph_msg_pos *msg_pos)
 {
                init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
 #endif
        msg_pos->data_pos = 0;
+
+       /* If there's a trail, initialize its cursor */
+
+       if (ceph_msg_has_trail(msg))
+               ceph_msg_data_cursor_init(&msg->t);
+
        msg_pos->did_page_crc = false;
 }
 
 
        msg_pos->data_pos += sent;
        msg_pos->page_pos += sent;
+       if (in_trail) {
+               bool need_crc;
+
+               need_crc = ceph_msg_data_advance(&msg->t, sent);
+               BUG_ON(need_crc && sent != len);
+       }
        if (sent < len)
                return;
 
        msg_pos->page_pos = 0;
        msg_pos->page++;
        msg_pos->did_page_crc = false;
-       if (in_trail) {
-               BUG_ON(!ceph_msg_has_trail(msg));
-               list_rotate_left(&msg->t.pagelist->head);
-       } else if (ceph_msg_has_pagelist(msg)) {
+       if (ceph_msg_has_pagelist(msg)) {
                list_rotate_left(&msg->l.pagelist->head);
 #ifdef CONFIG_BLOCK
        } else if (ceph_msg_has_bio(msg)) {
                size_t length;
                int max_write = PAGE_SIZE;
                int bio_offset = 0;
+               bool use_cursor = false;
+               bool last_piece = true; /* preserve existing behavior */
 
                in_trail = in_trail || msg_pos->data_pos >= trail_off;
                if (!in_trail)
 
                if (in_trail) {
                        BUG_ON(!ceph_msg_has_trail(msg));
-                       total_max_write = data_len - msg_pos->data_pos;
-                       page = list_first_entry(&msg->t.pagelist->head,
-                                               struct page, lru);
+                       use_cursor = true;
+                       page = ceph_msg_data_next(&msg->t, &page_offset,
+                                                       &length, &last_piece);
                } else if (ceph_msg_has_pages(msg)) {
                        page = msg->p.pages[msg_pos->page];
                } else if (ceph_msg_has_pagelist(msg)) {
                } else {
                        page = zero_page;
                }
-               length = min_t(int, max_write - msg_pos->page_pos,
-                           total_max_write);
+               if (!use_cursor)
+                       length = min_t(int, max_write - msg_pos->page_pos,
+                                           total_max_write);
 
                page_offset = msg_pos->page_pos + bio_offset;
                if (do_datacrc && !msg_pos->did_page_crc) {
                        msg_pos->did_page_crc = true;
                }
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
-                                     length, true);
+                                     length, last_piece);
                if (ret <= 0)
                        goto out;