From: Eldad Zinger
Date: Mon, 9 Aug 2010 14:05:44 +0000 (+0300)
Subject: sdp: stability improvements for ZCopy
X-Git-Tag: v4.1.12-92~264^2~5^2~119
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=9482295e1eda183b8f1b1284245e49205e18cb34;p=users%2Fjedix%2Flinux-maple.git

sdp: stability improvements for ZCopy

- better handling of ZCopy errors and SrcAvailCancel requests
- fixed SrcAvailCancel sequence number handling

Signed-off-by: Eldad Zinger
---
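Note (below the '---' marker, so it is ignored by git-am; illustration
only): the old code compared sequence numbers with a plain '<'
(rx_sa->mseq < ssk->srcavail_cancel_mseq), which gives the wrong answer
once the 32-bit mseq counter wraps. The new sdp_chk_sa_cancel() uses the
wrap-safe before()/after() helpers instead, as defined for TCP in
net/tcp.h. A minimal userspace sketch of the assumed comparison
semantics (reimplemented here for illustration, not the driver's code):

        #include <stdio.h>
        #include <stdint.h>

        /* true iff seq1 precedes seq2 in modulo-2^32 order */
        static int before(uint32_t seq1, uint32_t seq2)
        {
                return (int32_t)(seq1 - seq2) < 0;
        }
        #define after(seq2, seq1) before(seq1, seq2)

        int main(void)
        {
                /* SrcAvail mseq 100 is covered by SrcAvailCancel mseq 101 */
                printf("%d\n", before(100, 101));       /* 1 */
                /* still correct when the counter wraps past zero */
                printf("%d\n", before(0xfffffff0u, 5)); /* 1 */
                /* a plain '<' gets the wrapped case backwards */
                printf("%d\n", 0xfffffff0u < 5);        /* 0 */
                return 0;
        }
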
diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 080fcb749dee..4cf0316debda 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -19,6 +19,7 @@
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
 #define SDP_RX_POLL_TIMEOUT (1 + HZ / 1000)
+#define SDP_RDMA_READ_TIMEOUT (5 * HZ)
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
 #define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
 
@@ -250,10 +251,6 @@ struct bzcopy_state {
         struct page **pages;
 };
 
-enum rx_sa_flag {
-        RX_SA_ABORTED = 2,
-};
-
 enum tx_sa_flag {
         TX_SA_SENDSM     = 0x01,
         TX_SA_CROSS_SEND = 0x02,
@@ -276,7 +273,8 @@ struct rx_srcavail_state {
 
         /* Utility */
         u8 busy;
-        enum rx_sa_flag flags;
+        struct sk_buff *skb; /* SrcAvail skb */
+        int is_treated; /* recvmsg() is RDMA-reading now */
 };
 
 struct tx_srcavail_state {
@@ -363,11 +361,14 @@ struct sdp_sock {
         struct sdp_device *sdp_dev;
 
         int qp_active;
+        spinlock_t tx_sa_lock;
         struct tx_srcavail_state *tx_sa;
+
+        /* set when SrcAvail received, reset when SendSM/RdmaRdCompl sent */
         struct rx_srcavail_state *rx_sa;
-        spinlock_t tx_sa_lock;
-        struct delayed_work srcavail_cancel_work;
-        int srcavail_cancel_mseq;
+
+        u32 sa_cancel_mseq;
+        int sa_cancel_arrived; /* is 'sa_cancel_mseq' relevant or not, sticky */
 
         struct ib_ucontext context;
 
@@ -484,6 +485,18 @@ static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring)
         write_unlock_bh(&rx_ring->destroyed_lock);
 }
 
+static inline int sdp_chk_sa_cancel(struct sdp_sock *ssk, struct rx_srcavail_state *rx_sa)
+{
+        int res;
+
+        spin_lock_irq(&ssk->rx_ring.lock);
+        res = ssk->sa_cancel_arrived &&
+                before(rx_sa->mseq, ssk->sa_cancel_mseq);
+        spin_unlock_irq(&ssk->rx_ring.lock);
+
+        return res;
+}
+
 static inline struct sdp_sock *sdp_sk(const struct sock *sk)
 {
         return (struct sdp_sock *)sk;
@@ -887,7 +900,6 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
 int sdp_post_rdma_rd_compl(struct sock *sk,
                 struct rx_srcavail_state *rx_sa, u32 offset);
 int sdp_post_sendsm(struct sock *sk);
-void srcavail_cancel_timeout(struct work_struct *work);
 void sdp_abort_srcavail(struct sock *sk);
 void sdp_abort_rdma_read(struct sock *sk);
 
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 7054d10198ea..84c37279cc33 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -435,8 +435,6 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
         sdp_abort_srcavail(sk);
 
-        sdp_abort_rdma_read(sk);
-
         if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
                 sdp_dbg(sk, "setting state to error\n");
                 sdp_set_error(sk, rc);
@@ -533,8 +531,6 @@ static void sdp_destruct(struct sock *sk)
         }
         percpu_counter_dec(sk->sk_prot->orphan_count);
 
-        cancel_delayed_work(&ssk->srcavail_cancel_work);
-
         ssk->destructed_already = 1;
 
         down_read(&device_removal_lock);
@@ -542,7 +538,6 @@ static void sdp_destruct(struct sock *sk)
         sdp_destroy_resources(sk);
         up_read(&device_removal_lock);
 
-        flush_scheduled_work();
         flush_workqueue(rx_comp_wq);
         /* Consider use cancel_work_sync(&ssk->rx_comp_work) */
 
@@ -1029,8 +1024,6 @@ static void sdp_destroy_work(struct work_struct *work)
 
         sdp_cancel_dreq_wait_timeout(ssk);
 
-        cancel_delayed_work(&ssk->srcavail_cancel_work);
-
         if (sk->sk_state == TCP_TIME_WAIT) {
                 if (sdp_cancel_cma_timewait_timeout(ssk))
                         sock_put(sk, SOCK_REF_CMA);
@@ -1104,7 +1097,6 @@ int sdp_init_sock(struct sock *sk)
         INIT_LIST_HEAD(&ssk->backlog_queue);
         INIT_DELAYED_WORK(&ssk->dreq_wait_work, sdp_dreq_wait_timeout_work);
         INIT_DELAYED_WORK(&ssk->cma_timewait_work, sdp_cma_timewait_timeout_work);
-        INIT_DELAYED_WORK(&ssk->srcavail_cancel_work, srcavail_cancel_timeout);
         INIT_WORK(&ssk->destroy_work, sdp_destroy_work);
 
         lockdep_set_class(&sk->sk_receive_queue.lock,
@@ -1136,6 +1128,7 @@ int sdp_init_sock(struct sock *sk)
         init_timer(&ssk->tx_ring.timer);
         init_timer(&ssk->nagle_timer);
         init_timer(&sk->sk_timer);
+        ssk->sa_cancel_arrived = 0;
         ssk->zcopy_thresh = -1; /* use global sdp_zcopy_thresh */
         ssk->last_bind_err = 0;
 
@@ -2105,10 +2098,11 @@ fin:
         return err;
 }
 
-static inline int sdp_abort_rx_srcavail(struct sock *sk, struct sk_buff *skb)
+static inline int sdp_abort_rx_srcavail(struct sock *sk)
 {
-        struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
         struct sdp_sock *ssk = sdp_sk(sk);
+        struct sdp_bsdh *h =
+                (struct sdp_bsdh *)skb_transport_header(ssk->rx_sa->skb);
 
         sdp_dbg_data(sk, "SrcAvail aborted\n");
 
@@ -2116,11 +2110,9 @@ static inline int sdp_abort_rx_srcavail(struct sock *sk, struct sk_buff *skb)
 
         spin_lock_irq(&ssk->rx_ring.lock);
 
-        if (ssk->rx_sa == RX_SRCAVAIL_STATE(skb))
-                ssk->rx_sa = NULL;
-
-        kfree(RX_SRCAVAIL_STATE(skb));
-        RX_SRCAVAIL_STATE(skb) = NULL;
+        RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
+        kfree(ssk->rx_sa);
+        ssk->rx_sa = NULL;
 
         spin_unlock_irq(&ssk->rx_ring.lock);
 
@@ -2155,8 +2147,6 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
         posts_handler_get(ssk);
 
-        sdp_prf(sk, skb, "Read from user");
-
         err = -ENOTCONN;
         if (sk->sk_state == TCP_LISTEN)
                 goto out;
@@ -2217,17 +2207,28 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                         goto found_fin_ok;
 
                 case SDP_MID_SRCAVAIL:
+                        spin_lock_irq(&ssk->rx_ring.lock);
                         rx_sa = RX_SRCAVAIL_STATE(skb);
+                        if (unlikely(!rx_sa)) {
+                                /* SrcAvailCancel arrived and handled */
+                                h->mid = SDP_MID_DATA;
+                                spin_unlock_irq(&ssk->rx_ring.lock);
+                                goto sdp_mid_data;
+                        }
+
+                        rx_sa->is_treated = 1;
+                        spin_unlock_irq(&ssk->rx_ring.lock);
 
-                        if (rx_sa->mseq < ssk->srcavail_cancel_mseq ||
+                        if (sdp_chk_sa_cancel(ssk, rx_sa) ||
                                         !ssk->sdp_dev ||
                                         !ssk->sdp_dev->fmr_pool) {
                                 sdp_dbg_data(sk, "Aborting SA "
                                                 "due to SACancel or "
                                                 "no fmr pool\n");
                                 sdp_post_sendsm(sk);
+                                sdp_abort_rx_srcavail(sk);
+                                rx_sa = NULL;
                                 if (offset < skb->len) {
-                                        sdp_abort_rx_srcavail(sk, skb);
                                         sdp_prf(sk, skb, "Converted SA to DATA");
                                         goto sdp_mid_data;
                                 } else {
@@ -2254,7 +2255,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                                 SDP_SKB_CB(skb)->seq;
                                 sdp_dbg_data(sk, "Peek on RDMA data - "
                                                 "fallback to BCopy\n");
-                                sdp_abort_rx_srcavail(sk, skb);
+                                sdp_abort_rx_srcavail(sk);
                                 sdp_post_sendsm(sk);
                                 rx_sa = NULL;
                                 if (real_offset >= skb->len)
@@ -2397,10 +2398,15 @@ sdp_mid_data:
                                 sdp_dbg_data(sk, "RDMA copy of %lx bytes\n", used);
                                 err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb,
                                                 &used, offset);
-                                sdp_dbg_data(sk, "used = %lx bytes\n", used);
-                                if (err == -EAGAIN) {
-                                        sdp_dbg_data(sk, "RDMA Read aborted\n");
-                                        goto skb_cleanup;
+                                if (unlikely(err)) {
+                                        sdp_post_sendsm(sk);
+                                        sdp_abort_rx_srcavail(sk);
+                                        rx_sa = NULL;
+                                        if (err == -EAGAIN || err == -ETIME)
+                                                goto skb_cleanup;
+                                        sdp_warn(sk, "err from rdma %d - sendSM\n", err);
+                                        skb_unlink(skb, &sk->sk_receive_queue);
+                                        __kfree_skb(skb);
                                 }
                         } else {
                                 sdp_dbg_data(sk, "memcpy 0x%lx bytes +0x%x -> %p\n",
@@ -2414,12 +2420,12 @@ sdp_mid_data:
                                 }
                         }
                         if (err) {
-                                sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
+                                sdp_dbg(sk, "%s: data copy failed "
                                         "offset %d size %ld status %d\n",
                                         __func__, offset, used, err);
                                 /* Exception. Bailout! */
                                 if (!copied)
-                                        copied = -EFAULT;
+                                        copied = err;
                                 break;
                         }
                 }
@@ -2439,6 +2445,8 @@ skip_copy:
                 if (rx_sa && !(flags & MSG_PEEK)) {
                         rc = sdp_post_rdma_rd_compl(sk, rx_sa, offset);
                         if (unlikely(rc)) {
+                                sdp_abort_rx_srcavail(sk);
+                                rx_sa = NULL;
                                 err = rc;
                                 goto out;
                         }
@@ -2460,13 +2468,8 @@ skb_cleanup:
                                         ntohl(h->mseq), ntohl(h->mseq_ack));
 
                         if (rx_sa) {
-                                if (!rx_sa->flags) /* else ssk->rx_sa might
-                                                      point to another rx_sa */
-                                        ssk->rx_sa = NULL;
-
-                                kfree(rx_sa);
+                                sdp_abort_rx_srcavail(sk);
                                 rx_sa = NULL;
-
                         }
 force_skb_cleanup:
                         sdp_dbg_data(sk, "unlinking skb %p\n", skb);
@@ -2953,8 +2956,6 @@ static void __exit sdp_exit(void)
         destroy_workqueue(rx_comp_wq);
         destroy_workqueue(sdp_wq);
 
-        flush_scheduled_work();
-
         BUG_ON(!list_empty(&sock_list));
 
         if (atomic_read(&sdp_current_mem_usage))
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index 91389b44cd4b..3600d9bbf8b8 100644
--- a/drivers/infiniband/ulp/sdp/sdp_rx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -322,8 +322,7 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
                 struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
                 struct rx_srcavail_state *rx_sa;
 
-                ssk->srcavail_cancel_mseq = 0;
-
+                SDP_WARN_ON(ssk->rx_sa);
                 ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
                                 sizeof(struct rx_srcavail_state), GFP_ATOMIC);
                 if (unlikely(!rx_sa)) {
@@ -337,7 +336,8 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
                 rx_sa->len = skb_len = ntohl(srcah->len);
                 rx_sa->rkey = ntohl(srcah->rkey);
                 rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
-                rx_sa->flags = 0;
+                rx_sa->skb = skb;
+                rx_sa->is_treated = 0;
 
                 if (ssk->tx_sa) {
                         sdp_dbg_data(&ssk->isk.sk, "got RX SrcAvail while waiting "
@@ -507,17 +507,19 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                 __kfree_skb(skb);
                 break;
         case SDP_MID_SRCAVAIL_CANCEL:
-                sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
-                sdp_prf(sk, NULL, "Handling SrcAvailCancel");
-                if (ssk->rx_sa) {
-                        ssk->srcavail_cancel_mseq = ntohl(h->mseq);
-                        ssk->rx_sa->flags |= RX_SA_ABORTED;
-                        ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
-                                              the dirty logic from recvmsg */
-                } else {
-                        sdp_dbg(sk, "Got SrcAvailCancel - "
-                                        "but no SrcAvail in process\n");
+                spin_lock_irq(&ssk->rx_ring.lock);
+                if (ssk->rx_sa && !ssk->rx_sa->is_treated &&
+                                after(ntohl(h->mseq), ssk->rx_sa->mseq)) {
+                        sdp_dbg(sk, "Handling SrcAvailCancel - post SendSM\n");
+                        RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
+                        kfree(ssk->rx_sa);
+                        ssk->rx_sa = NULL;
+                        spin_unlock_irq(&ssk->rx_ring.lock);
+                        sdp_post_sendsm(sk);
+                        break;
                 }
+
+                spin_unlock_irq(&ssk->rx_ring.lock);
                 break;
         case SDP_MID_SINKAVAIL:
                 sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
@@ -608,7 +610,10 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                 sdp_dbg_data(sk, "Got SrcAvailCancel. "
" "seq: 0x%d seq_ack: 0x%d\n", ntohl(h->mseq), ntohl(h->mseq_ack)); - ssk->srcavail_cancel_mseq = ntohl(h->mseq); + ssk->sa_cancel_mseq = ntohl(h->mseq); + ssk->sa_cancel_arrived = 1; + if (ssk->rx_sa && ssk->rx_sa->is_treated) + wake_up(sk->sk_sleep); } diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c index 75137c29ccb0..5b4568ef207d 100644 --- a/drivers/infiniband/ulp/sdp/sdp_tx.c +++ b/drivers/infiniband/ulp/sdp/sdp_tx.c @@ -251,7 +251,8 @@ static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc) sdp_prf1(sk, NULL, "TX comp: RDMA read"); if (!ssk->tx_ring.rdma_inflight) { - sdp_warn(sk, "ERROR: unexpected RDMA read\n"); + sdp_dbg(sk, "unexpected RDMA read, " + "probably was canceled\n"); return; } diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c index 89aa51740936..8ec145f2328f 100644 --- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c @@ -124,27 +124,9 @@ static int sdp_post_srcavail_cancel(struct sock *sk) sdp_post_sends(ssk, 0); - schedule_delayed_work(&ssk->srcavail_cancel_work, - SDP_SRCAVAIL_CANCEL_TIMEOUT); - return 0; } -void srcavail_cancel_timeout(struct work_struct *work) -{ - struct sdp_sock *ssk = - container_of(work, struct sdp_sock, srcavail_cancel_work.work); - struct sock *sk = &ssk->isk.sk; - - lock_sock(sk); - - sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout." - " closing connection\n"); - sdp_set_error(sk, -ECONNRESET); - - release_sock(sk); -} - static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int ignore_signals) { @@ -236,10 +218,12 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, return err; } -static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk) +static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, + struct rx_srcavail_state *rx_sa) { struct sock *sk = &ssk->isk.sk; - long timeo = HZ * 5; /* Timeout for for RDMA read */ + long timeo = SDP_RDMA_READ_TIMEOUT; + int rc = 0; DEFINE_WAIT(wait); sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n"); @@ -251,14 +235,21 @@ static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk) break; } + if (sdp_chk_sa_cancel(ssk, rx_sa)) { + sdp_dbg_data(sk, "got SrcAvailCancel\n"); + rc = -EAGAIN; /* fall to BCopy */ + break; + } + if (!ssk->qp_active) { sdp_dbg_data(sk, "QP destroyed\n"); + rc = -EPIPE; break; } if (!timeo) { - sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n"); - SDP_WARN_ON(1); + sdp_warn(sk, "Timed out waiting for RDMA read\n"); + rc = -ETIME; break; } @@ -266,7 +257,9 @@ static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk) sdp_prf1(sk, NULL, "Going to sleep"); sk_wait_event(sk, &timeo, - !ssk->tx_ring.rdma_inflight->busy); + !ssk->tx_ring.rdma_inflight->busy || + sdp_chk_sa_cancel(ssk, rx_sa) || + !ssk->qp_active); sdp_prf1(sk, NULL, "Woke up"); sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n"); @@ -276,6 +269,7 @@ static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk) finish_wait(sk->sk_sleep, &wait); sdp_dbg_data(sk, "Finished waiting\n"); + return rc; } int sdp_post_rdma_rd_compl(struct sock *sk, @@ -364,8 +358,6 @@ void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack) sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n"); ssk->tx_sa->abort_flags |= TX_SA_SENDSM; - cancel_delayed_work(&ssk->srcavail_cancel_work); - wake_up(sk->sk_sleep); sdp_dbg_data(sk, "woke up sleepers\n"); @@ -563,7 +555,6 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb, { struct 
         struct sdp_sock *ssk = sdp_sk(sk);
         struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
-        int got_srcavail_cancel;
         int rc = 0;
         int len = *used;
         int copied;
@@ -595,20 +586,12 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
                 goto err_post_send;
         }
 
-        sdp_prf(sk, skb, "Finished posting(rc=%d), now to wait", rc);
-
-        got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
-
+        sdp_prf(sk, skb, "Finished posting, now to wait");
         sdp_arm_tx_cq(sk);
 
-        sdp_wait_rdma_wr_finished(ssk);
-
-        sdp_prf(sk, skb, "Finished waiting(rc=%d)", rc);
-        if (!ssk->qp_active) {
-                sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
-                rc = -EPIPE;
-                goto err_post_send;
-        }
+        rc = sdp_wait_rdma_wr_finished(ssk, rx_sa);
+        if (unlikely(rc))
+                goto err_wait;
 
         copied = rx_sa->umem->length;
@@ -616,17 +599,13 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
         atomic_add(copied, &ssk->rcv_nxt);
         *used = copied;
 
+err_wait:
         ssk->tx_ring.rdma_inflight = NULL;
 
err_post_send:
         sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
 
err_alloc_fmr:
-        if (rc && ssk->qp_active) {
-                sdp_dbg_data(sk, "Couldn't do RDMA - post sendsm\n");
-                rx_sa->flags |= RX_SA_ABORTED;
-        }
-
         sock_put(sk, SOCK_REF_RDMA_RD);
 
         return rc;
@@ -698,12 +677,15 @@ static int do_sdp_sendmsg_zcopy(struct sock *sk, struct tx_srcavail_state *tx_sa
 
                 /* Wait for RdmaRdCompl/SendSM to
                  * finish the transaction */
-                *timeo = 2 * HZ;
-                sdp_dbg_data(sk, "Waiting for SendSM\n");
-                sdp_wait_rdmardcompl(ssk, timeo, 1);
-                sdp_dbg_data(sk, "finished waiting\n");
-
-                cancel_delayed_work(&ssk->srcavail_cancel_work);
+                *timeo = SDP_SRCAVAIL_CANCEL_TIMEOUT;
+                rc = sdp_wait_rdmardcompl(ssk, timeo, 1);
+                if (unlikely(rc == -ETIME || rc == -EINVAL)) {
+                        /* didn't get RdmaRdCompl/SendSM after sending
+                         * SrcAvailCancel - There is a connection
+                         * problem. */
+                        sdp_reset(sk);
+                        rc = -sk->sk_err;
+                }
         } else {
                 sdp_dbg_data(sk, "QP was destroyed while waiting\n");
         }
@@ -739,6 +721,11 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct iovec *iov)
                         iov->iov_base, iov->iov_len);
         sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
         if (ssk->rx_sa) {
+                /* Don't want both sides to send SrcAvail because both of them
+                 * will wait on sendmsg() until timeout.
+                 * Don't need to lock 'rx_ring.lock' because when SrcAvail is
+                 * received, sk_sleep'ers are woken up.
+                 */
                 sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
                 return 0;
         }
@@ -794,9 +781,6 @@ void sdp_abort_srcavail(struct sock *sk)
         if (!tx_sa)
                 return;
 
-        cancel_delayed_work(&ssk->srcavail_cancel_work);
-        flush_scheduled_work();
-
         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
 
         sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
@@ -809,13 +793,18 @@ void sdp_abort_srcavail(struct sock *sk)
 void sdp_abort_rdma_read(struct sock *sk)
 {
         struct sdp_sock *ssk = sdp_sk(sk);
-        struct rx_srcavail_state *rx_sa = ssk->rx_sa;
+        struct rx_srcavail_state *rx_sa;
 
+        spin_lock_irq(&ssk->rx_ring.lock);
+        rx_sa = ssk->rx_sa;
         if (!rx_sa)
-                return;
+                goto out;
 
-        if (rx_sa->fmr)
-                sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
+        sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
 
-        ssk->rx_sa = NULL;
+        /* kfree(rx_sa) and posting SendSM will be handled in the normal
+         * flows.
+         */
+out:
+        spin_unlock_irq(&ssk->rx_ring.lock);
 }
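
Note (after the last hunk, ignored by patch tools; illustration only):
the new 'is_treated' flag makes ownership of ssk->rx_sa explicit.
sdp_process_rx_skb() only records the sticky (sa_cancel_arrived,
sa_cancel_mseq) pair and wakes any sleeper; sdp_process_rx_ctl_skb()
frees an untreated SrcAvail and posts SendSM on the spot, while a
treated one is left for recvmsg(), which notices the cancel through
sdp_chk_sa_cancel() and aborts its RDMA read. A compact userspace model
of that rule (all names illustrative, not the driver's API; build with
'cc -pthread'):

        #include <pthread.h>
        #include <stdint.h>
        #include <stdio.h>
        #include <stdlib.h>

        struct rx_sa {
                uint32_t mseq;
                int is_treated;          /* recvmsg() is RDMA-reading now */
        };

        static pthread_mutex_t rx_lock = PTHREAD_MUTEX_INITIALIZER;
        static struct rx_sa *rx_sa;
        static uint32_t sa_cancel_mseq;
        static int sa_cancel_arrived;    /* sticky, as in struct sdp_sock */

        static int before(uint32_t a, uint32_t b) { return (int32_t)(a - b) < 0; }
        #define after(b, a) before(a, b)

        /* recvmsg()/RDMA-wait side: poll the cancel state under the lock,
         * as sdp_chk_sa_cancel() does. */
        static int chk_sa_cancel(const struct rx_sa *sa)
        {
                int res;

                pthread_mutex_lock(&rx_lock);
                res = sa_cancel_arrived && before(sa->mseq, sa_cancel_mseq);
                pthread_mutex_unlock(&rx_lock);
                return res;
        }

        /* ctl-path side: free rx_sa only when recvmsg() does not own it. */
        static void handle_cancel(uint32_t mseq)
        {
                pthread_mutex_lock(&rx_lock);
                sa_cancel_mseq = mseq;
                sa_cancel_arrived = 1;
                if (rx_sa && !rx_sa->is_treated && after(mseq, rx_sa->mseq)) {
                        free(rx_sa);     /* untreated: drop it, post SendSM */
                        rx_sa = NULL;
                        printf("cancel %u: rx_sa freed, SendSM posted\n", mseq);
                } else if (rx_sa) {
                        printf("cancel %u: busy, recvmsg() aborts itself\n", mseq);
                }
                pthread_mutex_unlock(&rx_lock);
        }

        int main(void)
        {
                rx_sa = calloc(1, sizeof(*rx_sa));
                rx_sa->mseq = 100;

                rx_sa->is_treated = 1;   /* recvmsg() is mid-RDMA */
                handle_cancel(101);      /* deferred to recvmsg() */
                printf("recvmsg sees cancel: %d\n", chk_sa_cancel(rx_sa));

                rx_sa->is_treated = 0;   /* nobody is RDMA-reading */
                handle_cancel(102);      /* freed here, SendSM posted */
                return 0;
        }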