sdp: stability improvements for ZCopy
author     Eldad Zinger <eldadz@mellanox.co.il>
           Mon, 9 Aug 2010 14:05:44 +0000 (17:05 +0300)
committer  Mukesh Kacker <mukesh.kacker@oracle.com>
           Tue, 6 Oct 2015 12:05:17 +0000 (05:05 -0700)
- better handling of ZCopy errors and SrcAvailCancel requests
- fixed SrcAvailCancel sequence number handling (see the sketch below)
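
In outline (a condensed sketch assembled from the recvmsg() hunks below;
the names all come from this patch, but the exact flow is approximate):
the receive side now latches the cancel's sequence number on the socket
and checks it under rx_ring.lock, instead of relying on the removed
srcavail_cancel_work timeout:

	/* Sketch: how the SrcAvail receive path is expected to consume
	 * the sa_cancel_arrived/sa_cancel_mseq fields added below. */
	if (sdp_chk_sa_cancel(ssk, rx_sa)) {  /* cancel mseq is past this SrcAvail */
		sdp_post_sendsm(sk);          /* tell the peer the RDMA read is abandoned */
		sdp_abort_rx_srcavail(sk);    /* detach rx_sa from the skb and free it */
		rx_sa = NULL;                 /* fall back to BCopy from the skb */
	}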

Signed-off-by: Eldad Zinger <eldadz@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_tx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 080fcb749dee3d92709e6cacc52666c735c22924..4cf0316debdac69b8d6b5380c51468ffa523f9f4 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -19,6 +19,7 @@
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
 #define SDP_RX_POLL_TIMEOUT    (1 + HZ / 1000)
+#define SDP_RDMA_READ_TIMEOUT  (5 * HZ)
 
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
 #define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
@@ -250,10 +251,6 @@ struct bzcopy_state {
        struct page         **pages;
 };
 
-enum rx_sa_flag {
-       RX_SA_ABORTED    = 2,
-};
-
 enum tx_sa_flag {
        TX_SA_SENDSM     = 0x01,
        TX_SA_CROSS_SEND = 0x02,
@@ -276,7 +273,8 @@ struct rx_srcavail_state {
 
        /* Utility */
        u8  busy;
-       enum rx_sa_flag  flags;
+       struct sk_buff *skb; /* SrcAvail skb */
+       int is_treated; /* recvmsg() is RDMA-reading now */
 };
 
 struct tx_srcavail_state {
@@ -363,11 +361,14 @@ struct sdp_sock {
        struct sdp_device *sdp_dev;
 
        int qp_active;
+       spinlock_t tx_sa_lock;
        struct tx_srcavail_state *tx_sa;
+
+       /* set when SrcAvail received, reset when SendSM/RdmaRdCompl sent */
        struct rx_srcavail_state *rx_sa;
-       spinlock_t tx_sa_lock;
-       struct delayed_work srcavail_cancel_work;
-       int srcavail_cancel_mseq;
+
+       u32 sa_cancel_mseq;
+       int sa_cancel_arrived; /* whether 'sa_cancel_mseq' is valid; sticky */
 
        struct ib_ucontext context;
 
@@ -484,6 +485,18 @@ static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring)
        write_unlock_bh(&rx_ring->destroyed_lock);
 }
 
+static inline int sdp_chk_sa_cancel(struct sdp_sock *ssk, struct rx_srcavail_state *rx_sa)
+{
+       int res;
+
+       spin_lock_irq(&ssk->rx_ring.lock);
+       res = ssk->sa_cancel_arrived &&
+               before(rx_sa->mseq, ssk->sa_cancel_mseq);
+       spin_unlock_irq(&ssk->rx_ring.lock);
+
+       return res;
+}
+
 static inline struct sdp_sock *sdp_sk(const struct sock *sk)
 {
                return (struct sdp_sock *)sk;
@@ -887,7 +900,6 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
 int sdp_post_rdma_rd_compl(struct sock *sk,
                struct rx_srcavail_state *rx_sa, u32 offset);
 int sdp_post_sendsm(struct sock *sk);
-void srcavail_cancel_timeout(struct work_struct *work);
 void sdp_abort_srcavail(struct sock *sk);
 void sdp_abort_rdma_read(struct sock *sk);
 
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 7054d10198ea9bdb29f67be5406513ad7628ce45..84c37279cc33590a3cb6a173dd47524192bee50c 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -435,8 +435,6 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
        sdp_abort_srcavail(sk);
 
-       sdp_abort_rdma_read(sk);
-
        if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
                sdp_dbg(sk, "setting state to error\n");
                sdp_set_error(sk, rc);
@@ -533,8 +531,6 @@ static void sdp_destruct(struct sock *sk)
        }
 
        percpu_counter_dec(sk->sk_prot->orphan_count);
-       cancel_delayed_work(&ssk->srcavail_cancel_work);
-
        ssk->destructed_already = 1;
 
        down_read(&device_removal_lock);
@@ -542,7 +538,6 @@ static void sdp_destruct(struct sock *sk)
        sdp_destroy_resources(sk);
        up_read(&device_removal_lock);
 
-       flush_scheduled_work();
        flush_workqueue(rx_comp_wq);
        /* Consider use cancel_work_sync(&ssk->rx_comp_work) */
 
@@ -1029,8 +1024,6 @@ static void sdp_destroy_work(struct work_struct *work)
 
        sdp_cancel_dreq_wait_timeout(ssk);
 
-       cancel_delayed_work(&ssk->srcavail_cancel_work);
-
        if (sk->sk_state == TCP_TIME_WAIT) {
                if (sdp_cancel_cma_timewait_timeout(ssk))
                        sock_put(sk, SOCK_REF_CMA);
@@ -1104,7 +1097,6 @@ int sdp_init_sock(struct sock *sk)
        INIT_LIST_HEAD(&ssk->backlog_queue);
        INIT_DELAYED_WORK(&ssk->dreq_wait_work, sdp_dreq_wait_timeout_work);
        INIT_DELAYED_WORK(&ssk->cma_timewait_work, sdp_cma_timewait_timeout_work);
-       INIT_DELAYED_WORK(&ssk->srcavail_cancel_work, srcavail_cancel_timeout);
        INIT_WORK(&ssk->destroy_work, sdp_destroy_work);
 
        lockdep_set_class(&sk->sk_receive_queue.lock,
@@ -1136,6 +1128,7 @@ int sdp_init_sock(struct sock *sk)
        init_timer(&ssk->tx_ring.timer);
        init_timer(&ssk->nagle_timer);
        init_timer(&sk->sk_timer);
+       ssk->sa_cancel_arrived = 0;
        ssk->zcopy_thresh = -1; /* use global sdp_zcopy_thresh */
        ssk->last_bind_err = 0;
 
@@ -2105,10 +2098,11 @@ fin:
        return err;
 }
 
-static inline int sdp_abort_rx_srcavail(struct sock *sk, struct sk_buff *skb)
+static inline int sdp_abort_rx_srcavail(struct sock *sk)
 {
-       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
        struct sdp_sock *ssk = sdp_sk(sk);
+       struct sdp_bsdh *h =
+               (struct sdp_bsdh *)skb_transport_header(ssk->rx_sa->skb);
 
        sdp_dbg_data(sk, "SrcAvail aborted\n");
 
@@ -2116,11 +2110,9 @@ static inline int sdp_abort_rx_srcavail(struct sock *sk, struct sk_buff *skb)
 
        spin_lock_irq(&ssk->rx_ring.lock);
 
-       if (ssk->rx_sa == RX_SRCAVAIL_STATE(skb))
-               ssk->rx_sa = NULL;
-
-       kfree(RX_SRCAVAIL_STATE(skb));
-       RX_SRCAVAIL_STATE(skb) = NULL;
+       RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
+       kfree(ssk->rx_sa);
+       ssk->rx_sa = NULL;
 
        spin_unlock_irq(&ssk->rx_ring.lock);
 
@@ -2155,8 +2147,6 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
        posts_handler_get(ssk);
 
-       sdp_prf(sk, skb, "Read from user");
-
        err = -ENOTCONN;
        if (sk->sk_state == TCP_LISTEN)
                goto out;
@@ -2217,17 +2207,28 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                goto found_fin_ok;
 
                        case SDP_MID_SRCAVAIL:
+                               spin_lock_irq(&ssk->rx_ring.lock);
                                rx_sa = RX_SRCAVAIL_STATE(skb);
+                               if (unlikely(!rx_sa)) {
+                                       /* SrcAvailCancel arrived and handled */
+                                       h->mid = SDP_MID_DATA;
+                                       spin_unlock_irq(&ssk->rx_ring.lock);
+                                       goto sdp_mid_data;
+                               }
+
+                               rx_sa->is_treated = 1;
+                               spin_unlock_irq(&ssk->rx_ring.lock);
 
-                               if (rx_sa->mseq < ssk->srcavail_cancel_mseq ||
+                               if (sdp_chk_sa_cancel(ssk, rx_sa) ||
                                                !ssk->sdp_dev ||
                                                !ssk->sdp_dev->fmr_pool) {
                                        sdp_dbg_data(sk, "Aborting SA "
                                                        "due to SACancel or "
                                                        "no fmr pool\n");
                                        sdp_post_sendsm(sk);
+                                       sdp_abort_rx_srcavail(sk);
+                                       rx_sa = NULL;
                                        if (offset < skb->len) {
-                                               sdp_abort_rx_srcavail(sk, skb);
                                                sdp_prf(sk, skb, "Converted SA to DATA");
                                                goto sdp_mid_data;
                                        } else {
@@ -2254,7 +2255,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                                        SDP_SKB_CB(skb)->seq;
                                                sdp_dbg_data(sk, "Peek on RDMA data - "
                                                                "fallback to BCopy\n");
-                                               sdp_abort_rx_srcavail(sk, skb);
+                                               sdp_abort_rx_srcavail(sk);
                                                sdp_post_sendsm(sk);
                                                rx_sa = NULL;
                                                if (real_offset >= skb->len)
@@ -2397,10 +2398,15 @@ sdp_mid_data:
                                sdp_dbg_data(sk, "RDMA copy of %lx bytes\n", used);
                                err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb,
                                                &used, offset);
-                               sdp_dbg_data(sk, "used = %lx bytes\n", used);
-                               if (err == -EAGAIN) {
-                                       sdp_dbg_data(sk, "RDMA Read aborted\n");
-                                       goto skb_cleanup;
+                               if (unlikely(err)) {
+                                       sdp_post_sendsm(sk);
+                                       sdp_abort_rx_srcavail(sk);
+                                       rx_sa = NULL;
+                                       if (err == -EAGAIN || err == -ETIME)
+                                               goto skb_cleanup;
+                                       sdp_warn(sk, "err from rdma %d - sendSM\n", err);
+                                       skb_unlink(skb, &sk->sk_receive_queue);
+                                       __kfree_skb(skb);
                                }
                        } else {
                                sdp_dbg_data(sk, "memcpy 0x%lx bytes +0x%x -> %p\n",
@@ -2414,12 +2420,12 @@ sdp_mid_data:
                                }
                        }
                        if (err) {
-                               sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
+                               sdp_dbg(sk, "%s: data copy failed "
                                        "offset %d size %ld status %d\n",
                                        __func__, offset, used, err);
                                /* Exception. Bailout! */
                                if (!copied)
-                                       copied = -EFAULT;
+                                       copied = err;
                                break;
                        }
                }
@@ -2439,6 +2445,8 @@ skip_copy:
                if (rx_sa && !(flags & MSG_PEEK)) {
                        rc = sdp_post_rdma_rd_compl(sk, rx_sa, offset);
                        if (unlikely(rc)) {
+                               sdp_abort_rx_srcavail(sk);
+                               rx_sa = NULL;
                                err = rc;
                                goto out;
                        }
@@ -2460,13 +2468,8 @@ skb_cleanup:
                                ntohl(h->mseq), ntohl(h->mseq_ack));
 
                        if (rx_sa) {
-                               if (!rx_sa->flags) /* else ssk->rx_sa might
-                                                     point to another rx_sa */
-                                       ssk->rx_sa = NULL;
-
-                               kfree(rx_sa);
+                               sdp_abort_rx_srcavail(sk);
                                rx_sa = NULL;
-
                        }
 force_skb_cleanup:
                        sdp_dbg_data(sk, "unlinking skb %p\n", skb);
@@ -2953,8 +2956,6 @@ static void __exit sdp_exit(void)
        destroy_workqueue(rx_comp_wq);
        destroy_workqueue(sdp_wq);
 
-       flush_scheduled_work();
-
        BUG_ON(!list_empty(&sock_list));
 
        if (atomic_read(&sdp_current_mem_usage))
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index 91389b44cd4bccedb0609a6b624c94d0d367636f..3600d9bbf8b8438f36d8e395d3b2a91ff6f76ece 100644
--- a/drivers/infiniband/ulp/sdp/sdp_rx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -322,8 +322,7 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
                struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
                struct rx_srcavail_state *rx_sa;
 
-               ssk->srcavail_cancel_mseq = 0;
-
+               SDP_WARN_ON(ssk->rx_sa);
                ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
                                sizeof(struct rx_srcavail_state), GFP_ATOMIC);
                if (unlikely(!rx_sa)) {
@@ -337,7 +336,8 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
                rx_sa->len = skb_len = ntohl(srcah->len);
                rx_sa->rkey = ntohl(srcah->rkey);
                rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
-               rx_sa->flags = 0;
+               rx_sa->skb = skb;
+               rx_sa->is_treated = 0;
 
                if (ssk->tx_sa) {
                        sdp_dbg_data(&ssk->isk.sk, "got RX SrcAvail while waiting "
@@ -507,17 +507,19 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                __kfree_skb(skb);
                break;
        case SDP_MID_SRCAVAIL_CANCEL:
-               sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
-               sdp_prf(sk, NULL, "Handling SrcAvailCancel");
-               if (ssk->rx_sa) {
-                       ssk->srcavail_cancel_mseq = ntohl(h->mseq);
-                       ssk->rx_sa->flags |= RX_SA_ABORTED;
-                       ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
-                                             the dirty logic from recvmsg */
-               } else {
-                       sdp_dbg(sk, "Got SrcAvailCancel - "
-                                       "but no SrcAvail in process\n");
+               spin_lock_irq(&ssk->rx_ring.lock);
+               if (ssk->rx_sa && !ssk->rx_sa->is_treated &&
+                               after(ntohl(h->mseq), ssk->rx_sa->mseq)) {
+                       sdp_dbg(sk, "Handling SrcAvailCancel - post SendSM\n");
+                       RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
+                       kfree(ssk->rx_sa);
+                       ssk->rx_sa = NULL;
+                       spin_unlock_irq(&ssk->rx_ring.lock);
+                       sdp_post_sendsm(sk);
+                       break;
                }
+
+               spin_unlock_irq(&ssk->rx_ring.lock);
                break;
        case SDP_MID_SINKAVAIL:
                sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
@@ -608,7 +610,10 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                        sdp_dbg_data(sk, "Got SrcAvailCancel. "
                                        "seq: 0x%d seq_ack: 0x%d\n",
                                        ntohl(h->mseq), ntohl(h->mseq_ack));
-                       ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+                       ssk->sa_cancel_mseq = ntohl(h->mseq);
+                       ssk->sa_cancel_arrived = 1;
+                       if (ssk->rx_sa && ssk->rx_sa->is_treated)
+                               wake_up(sk->sk_sleep);
                }
 
 
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index 75137c29ccb09c0d29f99b413093a91f66b3bdbd..5b4568ef207db475ebcc4fe1280a873fa2469339 100644
--- a/drivers/infiniband/ulp/sdp/sdp_tx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -251,7 +251,8 @@ static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
                sdp_prf1(sk, NULL, "TX comp: RDMA read");
 
                if (!ssk->tx_ring.rdma_inflight) {
-                       sdp_warn(sk, "ERROR: unexpected RDMA read\n");
+                       sdp_dbg(sk, "unexpected RDMA read, "
+                                       "probably was canceled\n");
                        return;
                }
 
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
index 89aa5174093619854607f81e426ede720b8681ec..8ec145f2328f83857724fe62403ca396506ea0ab 100644
--- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
@@ -124,27 +124,9 @@ static int sdp_post_srcavail_cancel(struct sock *sk)
 
        sdp_post_sends(ssk, 0);
 
-       schedule_delayed_work(&ssk->srcavail_cancel_work,
-                       SDP_SRCAVAIL_CANCEL_TIMEOUT);
-
        return 0;
 }
 
-void srcavail_cancel_timeout(struct work_struct *work)
-{
-       struct sdp_sock *ssk =
-               container_of(work, struct sdp_sock, srcavail_cancel_work.work);
-       struct sock *sk = &ssk->isk.sk;
-
-       lock_sock(sk);
-
-       sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout."
-                       " closing connection\n");
-       sdp_set_error(sk, -ECONNRESET);
-
-       release_sock(sk);
-}
-
 static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
                int ignore_signals)
 {
@@ -236,10 +218,12 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
        return err;
 }
 
-static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
+static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk,
+               struct rx_srcavail_state *rx_sa)
 {
        struct sock *sk = &ssk->isk.sk;
-       long timeo = HZ * 5; /* Timeout for for RDMA read */
+       long timeo = SDP_RDMA_READ_TIMEOUT;
+       int rc = 0;
        DEFINE_WAIT(wait);
 
        sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
@@ -251,14 +235,21 @@ static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
                        break;
                }
 
+               if (sdp_chk_sa_cancel(ssk, rx_sa)) {
+                       sdp_dbg_data(sk, "got SrcAvailCancel\n");
+                       rc = -EAGAIN; /* fall back to BCopy */
+                       break;
+               }
+
                if (!ssk->qp_active) {
                        sdp_dbg_data(sk, "QP destroyed\n");
+                       rc = -EPIPE;
                        break;
                }
 
                if (!timeo) {
-                       sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
-                       SDP_WARN_ON(1);
+                       sdp_warn(sk, "Timed out waiting for RDMA read\n");
+                       rc = -ETIME;
                        break;
                }
 
@@ -266,7 +257,9 @@ static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
 
                sdp_prf1(sk, NULL, "Going to sleep");
                sk_wait_event(sk, &timeo,
-                       !ssk->tx_ring.rdma_inflight->busy);
+                       !ssk->tx_ring.rdma_inflight->busy ||
+                       sdp_chk_sa_cancel(ssk, rx_sa) ||
+                       !ssk->qp_active);
                sdp_prf1(sk, NULL, "Woke up");
                sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
 
@@ -276,6 +269,7 @@ static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
        finish_wait(sk->sk_sleep, &wait);
 
        sdp_dbg_data(sk, "Finished waiting\n");
+       return rc;
 }
 
 int sdp_post_rdma_rd_compl(struct sock *sk,
@@ -364,8 +358,6 @@ void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
        sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");
 
        ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
-       cancel_delayed_work(&ssk->srcavail_cancel_work);
-
        wake_up(sk->sk_sleep);
        sdp_dbg_data(sk, "woke up sleepers\n");
 
@@ -563,7 +555,6 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
-       int got_srcavail_cancel;
        int rc = 0;
        int len = *used;
        int copied;
@@ -595,20 +586,12 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
                goto err_post_send;
        }
 
-       sdp_prf(sk, skb, "Finished posting(rc=%d), now to wait", rc);
-
-       got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
-
+       sdp_prf(sk, skb, "Finished posting, now to wait");
        sdp_arm_tx_cq(sk);
 
-       sdp_wait_rdma_wr_finished(ssk);
-
-       sdp_prf(sk, skb, "Finished waiting(rc=%d)", rc);
-       if (!ssk->qp_active) {
-               sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
-               rc = -EPIPE;
-               goto err_post_send;
-       }
+       rc = sdp_wait_rdma_wr_finished(ssk, rx_sa);
+       if (unlikely(rc))
+               goto err_wait;
 
        copied = rx_sa->umem->length;
 
@@ -616,17 +599,13 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
        atomic_add(copied, &ssk->rcv_nxt);
        *used = copied;
 
+err_wait:
        ssk->tx_ring.rdma_inflight = NULL;
 
 err_post_send:
        sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
 
 err_alloc_fmr:
-       if (rc && ssk->qp_active) {
-               sdp_dbg_data(sk, "Couldn't do RDMA - post sendsm\n");
-               rx_sa->flags |= RX_SA_ABORTED;
-       }
-
        sock_put(sk, SOCK_REF_RDMA_RD);
 
        return rc;
@@ -698,12 +677,15 @@ static int do_sdp_sendmsg_zcopy(struct sock *sk, struct tx_srcavail_state *tx_sa
 
                        /* Wait for RdmaRdCompl/SendSM to
                         * finish the transaction */
-                       *timeo = 2 * HZ;
-                       sdp_dbg_data(sk, "Waiting for SendSM\n");
-                       sdp_wait_rdmardcompl(ssk, timeo, 1);
-                       sdp_dbg_data(sk, "finished waiting\n");
-
-                       cancel_delayed_work(&ssk->srcavail_cancel_work);
+                       *timeo = SDP_SRCAVAIL_CANCEL_TIMEOUT;
+                       rc = sdp_wait_rdmardcompl(ssk, timeo, 1);
+                       if (unlikely(rc == -ETIME || rc == -EINVAL)) {
+                               /* didn't get RdmaRdCompl/SendSM after sending
+                                * SrcAvailCancel - There is a connection
+                                * problem. */
+                               sdp_reset(sk);
+                               rc = -sk->sk_err;
+                       }
                } else {
                        sdp_dbg_data(sk, "QP was destroyed while waiting\n");
                }
@@ -739,6 +721,11 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct iovec *iov)
                        iov->iov_base, iov->iov_len);
        sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
        if (ssk->rx_sa) {
+               /* Don't want both sides to send SrcAvail because both of them
+                * will wait on sendmsg() until timeout.
+                * Don't need to lock 'rx_ring.lock' because when SrcAvail is
+                * received, sk_sleep'ers are woken up.
+                */
                sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
                return 0;
        }
@@ -794,9 +781,6 @@ void sdp_abort_srcavail(struct sock *sk)
        if (!tx_sa)
                return;
 
-       cancel_delayed_work(&ssk->srcavail_cancel_work);
-       flush_scheduled_work();
-
        spin_lock_irqsave(&ssk->tx_sa_lock, flags);
 
        sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
@@ -809,13 +793,18 @@ void sdp_abort_srcavail(struct sock *sk)
 void sdp_abort_rdma_read(struct sock *sk)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
-       struct rx_srcavail_state *rx_sa = ssk->rx_sa;
+       struct rx_srcavail_state *rx_sa;
 
+       spin_lock_irq(&ssk->rx_ring.lock);
+       rx_sa = ssk->rx_sa;
        if (!rx_sa)
-               return;
+               goto out;
 
-       if (rx_sa->fmr)
-               sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
+       sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
 
-       ssk->rx_sa = NULL;
+       /* kfree(rx_sa) and posting SendSM will be handled in the normal
+        * flows.
+        */
+out:
+       spin_unlock_irq(&ssk->rx_ring.lock);
 }