return mpext && mpext->data_seq + mpext->data_len == msk->write_seq;
 }
 
+static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk,
+                                      const struct page_frag *pfrag,
+                                      const struct mptcp_data_frag *df)
+{
+       return df && pfrag->page == df->page &&
+               df->data_seq + df->data_len == msk->write_seq;
+}
+
+static void dfrag_clear(struct mptcp_data_frag *dfrag)
+{
+       list_del(&dfrag->list);
+       put_page(dfrag->page);
+}
+
+static void mptcp_clean_una(struct sock *sk)
+{
+       struct mptcp_sock *msk = mptcp_sk(sk);
+       struct mptcp_data_frag *dtmp, *dfrag;
+       u64 snd_una = atomic64_read(&msk->snd_una);
+
+       list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
+               if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
+                       break;
+
+               dfrag_clear(dfrag);
+       }
+}
+
+/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
+ * data
+ */
+static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
+{
+       if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
+                                       pfrag, sk->sk_allocation)))
+               return true;
+
+       sk->sk_prot->enter_memory_pressure(sk);
+       sk_stream_moderate_sndbuf(sk);
+       return false;
+}
+
+static struct mptcp_data_frag *
+mptcp_carve_data_frag(const struct mptcp_sock *msk, struct page_frag *pfrag,
+                     int orig_offset)
+{
+       int offset = ALIGN(orig_offset, sizeof(long));
+       struct mptcp_data_frag *dfrag;
+
+       dfrag = (struct mptcp_data_frag *)(page_to_virt(pfrag->page) + offset);
+       dfrag->data_len = 0;
+       dfrag->data_seq = msk->write_seq;
+       dfrag->overhead = offset - orig_offset + sizeof(struct mptcp_data_frag);
+       dfrag->offset = offset + sizeof(struct mptcp_data_frag);
+       dfrag->page = pfrag->page;
+
+       return dfrag;
+}
+
 static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
                              struct msghdr *msg, long *timeo, int *pmss_now,
                              int *ps_goal)
 {
-       int mss_now, avail_size, size_goal, ret;
+       int mss_now, avail_size, size_goal, offset, ret, frag_truesize = 0;
+       bool dfrag_collapsed, can_collapse = false;
        struct mptcp_sock *msk = mptcp_sk(sk);
        struct mptcp_ext *mpext = NULL;
+       struct mptcp_data_frag *dfrag;
        struct sk_buff *skb, *tail;
-       bool can_collapse = false;
        struct page_frag *pfrag;
        size_t psize;
 
         * from one substream to another, but do per subflow memory accounting
         */
        pfrag = sk_page_frag(sk);
-       while (!sk_page_frag_refill(ssk, pfrag) ||
+       while (!mptcp_page_frag_refill(ssk, pfrag) ||
               !mptcp_ext_cache_refill(msk)) {
                ret = sk_stream_wait_memory(ssk, timeo);
                if (ret)
                        return ret;
+
+               /* if sk_stream_wait_memory() sleeps snd_una can change
+                * significantly, refresh the rtx queue
+                */
+               mptcp_clean_una(sk);
+
                if (unlikely(__mptcp_needs_tcp_fallback(msk)))
                        return 0;
        }
                else
                        avail_size = size_goal - skb->len;
        }
-       psize = min_t(size_t, pfrag->size - pfrag->offset, avail_size);
+
+       /* reuse tail pfrag, if possible, or carve a new one from the page
+        * allocator
+        */
+       dfrag = mptcp_rtx_tail(sk);
+       offset = pfrag->offset;
+       dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag);
+       if (!dfrag_collapsed) {
+               dfrag = mptcp_carve_data_frag(msk, pfrag, offset);
+               offset = dfrag->offset;
+               frag_truesize = dfrag->overhead;
+       }
+       psize = min_t(size_t, pfrag->size - offset, avail_size);
 
        /* Copy to page */
        pr_debug("left=%zu", msg_data_left(msg));
-       psize = copy_page_from_iter(pfrag->page, pfrag->offset,
+       psize = copy_page_from_iter(pfrag->page, offset,
                                    min_t(size_t, msg_data_left(msg), psize),
                                    &msg->msg_iter);
        pr_debug("left=%zu", msg_data_left(msg));
        /* tell the TCP stack to delay the push so that we can safely
         * access the skb after the sendpages call
         */
-       ret = do_tcp_sendpages(ssk, pfrag->page, pfrag->offset, psize,
+       ret = do_tcp_sendpages(ssk, pfrag->page, offset, psize,
                               msg->msg_flags | MSG_SENDPAGE_NOTLAST);
        if (ret <= 0)
                return ret;
+
+       frag_truesize += ret;
        if (unlikely(ret < psize))
                iov_iter_revert(&msg->msg_iter, psize - ret);
 
+       /* send successful, keep track of sent data for mptcp-level
+        * retransmission
+        */
+       dfrag->data_len += ret;
+       if (!dfrag_collapsed) {
+               get_page(dfrag->page);
+               list_add_tail(&dfrag->list, &msk->rtx_queue);
+       }
+
        /* if the tail skb extension is still the cached one, collapsing
         * really happened. Note: we can't check for 'same skb' as the sk_buff
         * hdr on tail can be transmitted, freed and re-allocated by the
                 mpext->dsn64);
 
 out:
-       pfrag->offset += ret;
+       pfrag->offset += frag_truesize;
        msk->write_seq += ret;
        mptcp_subflow_ctx(ssk)->rel_write_seq += ret;
 
                return ret >= 0 ? ret + copied : (copied ? copied : ret);
        }
 
+       mptcp_clean_una(sk);
+
        __mptcp_flush_join_list(msk);
        ssk = mptcp_subflow_get_send(msk);
        while (!sk_stream_memory_free(sk) || !ssk) {
                if (ret)
                        goto out;
 
+               mptcp_clean_una(sk);
+
                ssk = mptcp_subflow_get_send(msk);
                if (list_empty(&msk->conn_list)) {
                        ret = -ENOTCONN;
 
        INIT_LIST_HEAD(&msk->conn_list);
        INIT_LIST_HEAD(&msk->join_list);
+       INIT_LIST_HEAD(&msk->rtx_queue);
        __set_bit(MPTCP_SEND_SPACE, &msk->flags);
        INIT_WORK(&msk->work, mptcp_worker);
 
 
 static int mptcp_init_sock(struct sock *sk)
 {
+       int ret = __mptcp_init_sock(sk);
+
+       if (ret)
+               return ret;
+
        if (!mptcp_is_enabled(sock_net(sk)))
                return -ENOPROTOOPT;
 
-       return __mptcp_init_sock(sk);
+       return 0;
+}
+
+static void __mptcp_clear_xmit(struct sock *sk)
+{
+       struct mptcp_sock *msk = mptcp_sk(sk);
+       struct mptcp_data_frag *dtmp, *dfrag;
+
+       list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
+               dfrag_clear(dfrag);
 }
 
 static void mptcp_cancel_work(struct sock *sk)
 
        data_fin_tx_seq = msk->write_seq;
 
+       __mptcp_clear_xmit(sk);
+
        release_sock(sk);
 
        list_for_each_entry_safe(subflow, tmp, &conn_list, node) {
        inet_sk(msk)->inet_rcv_saddr = inet_sk(ssk)->inet_rcv_saddr;
 }
 
+static int mptcp_disconnect(struct sock *sk, int flags)
+{
+       lock_sock(sk);
+       __mptcp_clear_xmit(sk);
+       release_sock(sk);
+       return tcp_disconnect(sk, flags);
+}
+
 #if IS_ENABLED(CONFIG_MPTCP_IPV6)
 static struct ipv6_pinfo *mptcp_inet6_sk(const struct sock *sk)
 {
        .name           = "MPTCP",
        .owner          = THIS_MODULE,
        .init           = mptcp_init_sock,
+       .disconnect     = mptcp_disconnect,
        .close          = mptcp_close,
        .accept         = mptcp_accept,
        .setsockopt     = mptcp_setsockopt,