#define SDP_NAGLE_TIMEOUT (HZ / 10)
#define SDP_RX_POLL_TIMEOUT (1 + HZ / 1000)
+#define SDP_RDMA_READ_TIMEOUT (5 * HZ)
#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
#define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
struct page **pages;
};
-enum rx_sa_flag {
- RX_SA_ABORTED = 2,
-};
-
enum tx_sa_flag {
TX_SA_SENDSM = 0x01,
TX_SA_CROSS_SEND = 0x02,
/* Utility */
u8 busy;
- enum rx_sa_flag flags;
+ struct sk_buff *skb; /* SrcAvail skb */
+ int is_treated; /* recvmsg() is RDMA-reading now */
};
struct tx_srcavail_state {
struct sdp_device *sdp_dev;
int qp_active;
+ spinlock_t tx_sa_lock;
struct tx_srcavail_state *tx_sa;
+
+ /* set when SrcAvail received, reset when SendSM/RdmaRdCompl sent */
struct rx_srcavail_state *rx_sa;
- spinlock_t tx_sa_lock;
- struct delayed_work srcavail_cancel_work;
- int srcavail_cancel_mseq;
+
+ u32 sa_cancel_mseq;
+ int sa_cancel_arrived; /* whether 'sa_cancel_mseq' is valid; sticky */
struct ib_ucontext context;
write_unlock_bh(&rx_ring->destroyed_lock);
}
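+/* Has a SrcAvailCancel covering this SrcAvail (i.e. one with a later
+ * mseq) already arrived? Takes rx_ring.lock internally.
+ */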
+static inline int sdp_chk_sa_cancel(struct sdp_sock *ssk,
+ struct rx_srcavail_state *rx_sa)
+{
+ int res;
+
+ spin_lock_irq(&ssk->rx_ring.lock);
+ res = ssk->sa_cancel_arrived &&
+ before(rx_sa->mseq, ssk->sa_cancel_mseq);
+ spin_unlock_irq(&ssk->rx_ring.lock);
+
+ return res;
+}
+
static inline struct sdp_sock *sdp_sk(const struct sock *sk)
{
return (struct sdp_sock *)sk;
int sdp_post_rdma_rd_compl(struct sock *sk,
struct rx_srcavail_state *rx_sa, u32 offset);
int sdp_post_sendsm(struct sock *sk);
-void srcavail_cancel_timeout(struct work_struct *work);
void sdp_abort_srcavail(struct sock *sk);
void sdp_abort_rdma_read(struct sock *sk);
sdp_abort_srcavail(sk);
- sdp_abort_rdma_read(sk);
-
if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
sdp_dbg(sk, "setting state to error\n");
sdp_set_error(sk, rc);
}
percpu_counter_dec(sk->sk_prot->orphan_count);
- cancel_delayed_work(&ssk->srcavail_cancel_work);
-
ssk->destructed_already = 1;
down_read(&device_removal_lock);
sdp_destroy_resources(sk);
up_read(&device_removal_lock);
- flush_scheduled_work();
flush_workqueue(rx_comp_wq);
/* Consider use cancel_work_sync(&ssk->rx_comp_work) */
sdp_cancel_dreq_wait_timeout(ssk);
- cancel_delayed_work(&ssk->srcavail_cancel_work);
-
if (sk->sk_state == TCP_TIME_WAIT) {
if (sdp_cancel_cma_timewait_timeout(ssk))
sock_put(sk, SOCK_REF_CMA);
INIT_LIST_HEAD(&ssk->backlog_queue);
INIT_DELAYED_WORK(&ssk->dreq_wait_work, sdp_dreq_wait_timeout_work);
INIT_DELAYED_WORK(&ssk->cma_timewait_work, sdp_cma_timewait_timeout_work);
- INIT_DELAYED_WORK(&ssk->srcavail_cancel_work, srcavail_cancel_timeout);
INIT_WORK(&ssk->destroy_work, sdp_destroy_work);
lockdep_set_class(&sk->sk_receive_queue.lock,
init_timer(&ssk->tx_ring.timer);
init_timer(&ssk->nagle_timer);
init_timer(&sk->sk_timer);
+ ssk->sa_cancel_arrived = 0;
ssk->zcopy_thresh = -1; /* use global sdp_zcopy_thresh */
ssk->last_bind_err = 0;
return err;
}
-static inline int sdp_abort_rx_srcavail(struct sock *sk, struct sk_buff *skb)
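+/* Detach and free the pending rx SrcAvail state under rx_ring.lock.
+ * The SrcAvail skb itself is not freed here; it is left for the caller
+ * to handle.
+ */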
+static inline int sdp_abort_rx_srcavail(struct sock *sk)
{
- struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
struct sdp_sock *ssk = sdp_sk(sk);
+ struct sdp_bsdh *h =
+ (struct sdp_bsdh *)skb_transport_header(ssk->rx_sa->skb);
sdp_dbg_data(sk, "SrcAvail aborted\n");
spin_lock_irq(&ssk->rx_ring.lock);
- if (ssk->rx_sa == RX_SRCAVAIL_STATE(skb))
- ssk->rx_sa = NULL;
-
- kfree(RX_SRCAVAIL_STATE(skb));
- RX_SRCAVAIL_STATE(skb) = NULL;
+ RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
+ kfree(ssk->rx_sa);
+ ssk->rx_sa = NULL;
spin_unlock_irq(&ssk->rx_ring.lock);
posts_handler_get(ssk);
- sdp_prf(sk, skb, "Read from user");
-
err = -ENOTCONN;
if (sk->sk_state == TCP_LISTEN)
goto out;
goto found_fin_ok;
case SDP_MID_SRCAVAIL:
+ spin_lock_irq(&ssk->rx_ring.lock);
rx_sa = RX_SRCAVAIL_STATE(skb);
+ if (unlikely(!rx_sa)) {
+ /* SrcAvailCancel arrived and handled */
+ h->mid = SDP_MID_DATA;
+ spin_unlock_irq(&ssk->rx_ring.lock);
+ goto sdp_mid_data;
+ }
+
+ rx_sa->is_treated = 1;
+ spin_unlock_irq(&ssk->rx_ring.lock);
- if (rx_sa->mseq < ssk->srcavail_cancel_mseq ||
+ if (sdp_chk_sa_cancel(ssk, rx_sa) ||
!ssk->sdp_dev ||
!ssk->sdp_dev->fmr_pool) {
sdp_dbg_data(sk, "Aborting SA "
"due to SACancel or "
"no fmr pool\n");
sdp_post_sendsm(sk);
+ sdp_abort_rx_srcavail(sk);
+ rx_sa = NULL;
if (offset < skb->len) {
- sdp_abort_rx_srcavail(sk, skb);
sdp_prf(sk, skb, "Converted SA to DATA");
goto sdp_mid_data;
} else {
SDP_SKB_CB(skb)->seq;
sdp_dbg_data(sk, "Peek on RDMA data - "
"fallback to BCopy\n");
- sdp_abort_rx_srcavail(sk, skb);
+ sdp_abort_rx_srcavail(sk);
sdp_post_sendsm(sk);
rx_sa = NULL;
if (real_offset >= skb->len)
sdp_dbg_data(sk, "RDMA copy of %lx bytes\n", used);
err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb,
&used, offset);
- sdp_dbg_data(sk, "used = %lx bytes\n", used);
- if (err == -EAGAIN) {
- sdp_dbg_data(sk, "RDMA Read aborted\n");
- goto skb_cleanup;
+ if (unlikely(err)) {
+ sdp_post_sendsm(sk);
+ sdp_abort_rx_srcavail(sk);
+ rx_sa = NULL;
+ if (err == -EAGAIN || err == -ETIME)
+ goto skb_cleanup;
+ sdp_warn(sk, "err from rdma %d - sendSM\n", err);
+ skb_unlink(skb, &sk->sk_receive_queue);
+ __kfree_skb(skb);
}
} else {
sdp_dbg_data(sk, "memcpy 0x%lx bytes +0x%x -> %p\n",
}
}
if (err) {
- sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
+ sdp_dbg(sk, "%s: data copy failed"
"offset %d size %ld status %d\n",
__func__, offset, used, err);
/* Exception. Bailout! */
if (!copied)
- copied = -EFAULT;
+ copied = err;
break;
}
}
if (rx_sa && !(flags & MSG_PEEK)) {
rc = sdp_post_rdma_rd_compl(sk, rx_sa, offset);
if (unlikely(rc)) {
+ sdp_abort_rx_srcavail(sk);
+ rx_sa = NULL;
err = rc;
goto out;
}
ntohl(h->mseq), ntohl(h->mseq_ack));
if (rx_sa) {
- if (!rx_sa->flags) /* else ssk->rx_sa might
- point to another rx_sa */
- ssk->rx_sa = NULL;
-
- kfree(rx_sa);
+ sdp_abort_rx_srcavail(sk);
rx_sa = NULL;
-
}
force_skb_cleanup:
sdp_dbg_data(sk, "unlinking skb %p\n", skb);
destroy_workqueue(rx_comp_wq);
destroy_workqueue(sdp_wq);
- flush_scheduled_work();
-
BUG_ON(!list_empty(&sock_list));
if (atomic_read(&sdp_current_mem_usage))
struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
struct rx_srcavail_state *rx_sa;
- ssk->srcavail_cancel_mseq = 0;
-
+ SDP_WARN_ON(ssk->rx_sa);
ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
sizeof(struct rx_srcavail_state), GFP_ATOMIC);
if (unlikely(!rx_sa)) {
rx_sa->len = skb_len = ntohl(srcah->len);
rx_sa->rkey = ntohl(srcah->rkey);
rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
- rx_sa->flags = 0;
+ rx_sa->skb = skb;
+ rx_sa->is_treated = 0;
if (ssk->tx_sa) {
sdp_dbg_data(&ssk->isk.sk, "got RX SrcAvail while waiting "
__kfree_skb(skb);
break;
case SDP_MID_SRCAVAIL_CANCEL:
- sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
- sdp_prf(sk, NULL, "Handling SrcAvailCancel");
- if (ssk->rx_sa) {
- ssk->srcavail_cancel_mseq = ntohl(h->mseq);
- ssk->rx_sa->flags |= RX_SA_ABORTED;
- ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
- the dirty logic from recvmsg */
- } else {
- sdp_dbg(sk, "Got SrcAvailCancel - "
- "but no SrcAvail in process\n");
+ spin_lock_irq(&ssk->rx_ring.lock);
+ if (ssk->rx_sa && !ssk->rx_sa->is_treated &&
+ after(ntohl(h->mseq), ssk->rx_sa->mseq)) {
+ sdp_dbg(sk, "Handling SrcAvailCancel - post SendSM\n");
+ RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
+ kfree(ssk->rx_sa);
+ ssk->rx_sa = NULL;
+ spin_unlock_irq(&ssk->rx_ring.lock);
+ sdp_post_sendsm(sk);
+ break;
}
+
+ spin_unlock_irq(&ssk->rx_ring.lock);
break;
case SDP_MID_SINKAVAIL:
sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
sdp_dbg_data(sk, "Got SrcAvailCancel. "
"seq: 0x%d seq_ack: 0x%d\n",
ntohl(h->mseq), ntohl(h->mseq_ack));
- ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+ ssk->sa_cancel_mseq = ntohl(h->mseq);
+ ssk->sa_cancel_arrived = 1;
+ if (ssk->rx_sa && ssk->rx_sa->is_treated)
+ wake_up(sk->sk_sleep);
}
sdp_prf1(sk, NULL, "TX comp: RDMA read");
if (!ssk->tx_ring.rdma_inflight) {
- sdp_warn(sk, "ERROR: unexpected RDMA read\n");
+ sdp_dbg(sk, "unexpected RDMA read, "
+ "probably was canceled\n");
return;
}
sdp_post_sends(ssk, 0);
- schedule_delayed_work(&ssk->srcavail_cancel_work,
- SDP_SRCAVAIL_CANCEL_TIMEOUT);
-
return 0;
}
-void srcavail_cancel_timeout(struct work_struct *work)
-{
- struct sdp_sock *ssk =
- container_of(work, struct sdp_sock, srcavail_cancel_work.work);
- struct sock *sk = &ssk->isk.sk;
-
- lock_sock(sk);
-
- sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout."
- " closing connection\n");
- sdp_set_error(sk, -ECONNRESET);
-
- release_sock(sk);
-}
-
static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
int ignore_signals)
{
return err;
}
-static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
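+/* Sleep until the posted RDMA read completes. Returns 0 on completion,
+ * -EAGAIN if a SrcAvailCancel arrived (fall back to BCopy), -EPIPE if
+ * the QP was destroyed and -ETIME on timeout.
+ */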
+static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk,
+ struct rx_srcavail_state *rx_sa)
{
struct sock *sk = &ssk->isk.sk;
- long timeo = HZ * 5; /* Timeout for for RDMA read */
+ long timeo = SDP_RDMA_READ_TIMEOUT;
+ int rc = 0;
DEFINE_WAIT(wait);
sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
break;
}
+ if (sdp_chk_sa_cancel(ssk, rx_sa)) {
+ sdp_dbg_data(sk, "got SrcAvailCancel\n");
+ rc = -EAGAIN; /* fall back to BCopy */
+ break;
+ }
+
if (!ssk->qp_active) {
sdp_dbg_data(sk, "QP destroyed\n");
+ rc = -EPIPE;
break;
}
if (!timeo) {
- sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
- SDP_WARN_ON(1);
+ sdp_warn(sk, "Timed out waiting for RDMA read\n");
+ rc = -ETIME;
break;
}
sdp_prf1(sk, NULL, "Going to sleep");
sk_wait_event(sk, &timeo,
- !ssk->tx_ring.rdma_inflight->busy);
+ !ssk->tx_ring.rdma_inflight->busy ||
+ sdp_chk_sa_cancel(ssk, rx_sa) ||
+ !ssk->qp_active);
sdp_prf1(sk, NULL, "Woke up");
sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
finish_wait(sk->sk_sleep, &wait);
sdp_dbg_data(sk, "Finished waiting\n");
+ return rc;
}
int sdp_post_rdma_rd_compl(struct sock *sk,
sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");
ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
- cancel_delayed_work(&ssk->srcavail_cancel_work);
-
wake_up(sk->sk_sleep);
sdp_dbg_data(sk, "woke up sleepers\n");
{
struct sdp_sock *ssk = sdp_sk(sk);
struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
- int got_srcavail_cancel;
int rc = 0;
int len = *used;
int copied;
goto err_post_send;
}
- sdp_prf(sk, skb, "Finished posting(rc=%d), now to wait", rc);
-
- got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
-
+ sdp_prf(sk, skb, "Finished posting, now to wait");
sdp_arm_tx_cq(sk);
- sdp_wait_rdma_wr_finished(ssk);
-
- sdp_prf(sk, skb, "Finished waiting(rc=%d)", rc);
- if (!ssk->qp_active) {
- sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
- rc = -EPIPE;
- goto err_post_send;
- }
+ rc = sdp_wait_rdma_wr_finished(ssk, rx_sa);
+ if (unlikely(rc))
+ goto err_wait;
copied = rx_sa->umem->length;
atomic_add(copied, &ssk->rcv_nxt);
*used = copied;
+err_wait:
ssk->tx_ring.rdma_inflight = NULL;
err_post_send:
sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
err_alloc_fmr:
- if (rc && ssk->qp_active) {
- sdp_dbg_data(sk, "Couldn't do RDMA - post sendsm\n");
- rx_sa->flags |= RX_SA_ABORTED;
- }
-
sock_put(sk, SOCK_REF_RDMA_RD);
return rc;
/* Wait for RdmaRdCompl/SendSM to
* finish the transaction */
- *timeo = 2 * HZ;
- sdp_dbg_data(sk, "Waiting for SendSM\n");
- sdp_wait_rdmardcompl(ssk, timeo, 1);
- sdp_dbg_data(sk, "finished waiting\n");
-
- cancel_delayed_work(&ssk->srcavail_cancel_work);
+ *timeo = SDP_SRCAVAIL_CANCEL_TIMEOUT;
+ rc = sdp_wait_rdmardcompl(ssk, timeo, 1);
+ if (unlikely(rc == -ETIME || rc == -EINVAL)) {
+ /* Didn't get RdmaRdCompl/SendSM after sending
+ * SrcAvailCancel - there is a connection
+ * problem, so reset it. */
+ sdp_reset(sk);
+ rc = -sk->sk_err;
+ }
} else {
sdp_dbg_data(sk, "QP was destroyed while waiting\n");
}
iov->iov_base, iov->iov_len);
sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
if (ssk->rx_sa) {
+ /* Don't let both sides send SrcAvail at the same time, since
+ * each would then block in sendmsg() until it times out.
+ * No need to take 'rx_ring.lock' here: whenever a SrcAvail is
+ * received, sleepers on sk_sleep are woken up.
+ */
sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
return 0;
}
if (!tx_sa)
return;
- cancel_delayed_work(&ssk->srcavail_cancel_work);
- flush_scheduled_work();
-
spin_lock_irqsave(&ssk->tx_sa_lock, flags);
sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
void sdp_abort_rdma_read(struct sock *sk)
{
struct sdp_sock *ssk = sdp_sk(sk);
- struct rx_srcavail_state *rx_sa = ssk->rx_sa;
+ struct rx_srcavail_state *rx_sa;
+ spin_lock_irq(&ssk->rx_ring.lock);
+ rx_sa = ssk->rx_sa;
if (!rx_sa)
- return;
+ goto out;
- if (rx_sa->fmr)
- sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
+ sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
- ssk->rx_sa = NULL;
+ /* kfree(rx_sa) and posting SendSM will be handled in the normal
+ * flow.
+ */
+out:
+ spin_unlock_irq(&ssk->rx_ring.lock);
}