return done;
 }
 
-void mptcp_data_ready(struct sock *sk)
+/* In most cases we will be able to lock the mptcp socket.  If its already
+ * owned, we need to defer to the work queue to avoid ABBA deadlock.
+ */
+static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
+{
+       struct sock *sk = (struct sock *)msk;
+       unsigned int moved = 0;
+
+       if (READ_ONCE(sk->sk_lock.owned))
+               return false;
+
+       if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
+               return false;
+
+       /* must re-check after taking the lock */
+       if (!READ_ONCE(sk->sk_lock.owned))
+               __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+
+       spin_unlock_bh(&sk->sk_lock.slock);
+
+       return moved > 0;
+}
+
+void mptcp_data_ready(struct sock *sk, struct sock *ssk)
 {
        struct mptcp_sock *msk = mptcp_sk(sk);
 
        set_bit(MPTCP_DATA_READY, &msk->flags);
 
+       if (atomic_read(&sk->sk_rmem_alloc) < READ_ONCE(sk->sk_rcvbuf) &&
+           move_skbs_to_msk(msk, ssk))
+               goto wake;
+
        /* don't schedule if mptcp sk is (still) over limit */
        if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf))
                goto wake;
 
                       struct tcp_options_received *opt_rx);
 
 void mptcp_finish_connect(struct sock *sk);
-void mptcp_data_ready(struct sock *sk);
+void mptcp_data_ready(struct sock *sk, struct sock *ssk);
 
 int mptcp_token_new_request(struct request_sock *req);
 void mptcp_token_destroy_request(u32 token);
 
        }
 
        if (mptcp_subflow_data_available(sk))
-               mptcp_data_ready(parent);
+               mptcp_data_ready(parent, sk);
 }
 
 static void subflow_write_space(struct sock *sk)
         * the data available machinery here.
         */
        if (parent && subflow->mp_capable && mptcp_subflow_data_available(sk))
-               mptcp_data_ready(parent);
+               mptcp_data_ready(parent, sk);
 
        if (parent && !(parent->sk_shutdown & RCV_SHUTDOWN) &&
            !subflow->rx_eof && subflow_is_done(sk)) {