From ea453b568e55d170cf5f5ce406612fce809edda0 Mon Sep 17 00:00:00 2001
From: Amir Vadai
Date: Thu, 13 Aug 2009 16:00:40 +0300
Subject: [PATCH] sdp: fix cross SrcAvail deadlock

When both peers post a SrcAvail at the same time, each side blocks in
sdp_wait_rdmardcompl() waiting for the remote peer to RDMA-read its
buffer and answer with RdmaRdCompl. Neither peer is servicing the
other's advertisement, so the connection deadlocks.

Detect the cross on the sending side: when a SrcAvail is received while
a TX SrcAvail is pending, wake the sender up and flag it with
TX_SA_CROSS_SEND. The sending SrcAvail will give up and send
SrcAvailCancel. In addition, sdp_sendmsg_zcopy() now refuses to start a
new zcopy send while a received SrcAvail is still being processed.

Signed-off-by: Amir Vadai
---
 drivers/infiniband/ulp/sdp/sdp.h       | 122 ++++++++++++++++++--
 drivers/infiniband/ulp/sdp/sdp_bcopy.c | 113 ++++---------------
 drivers/infiniband/ulp/sdp/sdp_main.c  |  74 ++++++++-----
 drivers/infiniband/ulp/sdp/sdp_rx.c    |  52 +++++++---
 drivers/infiniband/ulp/sdp/sdp_tx.c    |  23 ++--
 drivers/infiniband/ulp/sdp/sdp_zcopy.c | 147 +++++++++++--------------
 6 files changed, 297 insertions(+), 234 deletions(-)

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 94c51304bafb..21577b4e73ec 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -257,6 +257,8 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
 #define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
 #define SDP_MAX_SEND_SGES 32
+
+/* payload len - rest will be rx'ed into frags */
 #define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
 #define SDP_NUM_WC 4
 #define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
@@ -405,6 +407,10 @@ struct bzcopy_state {
 	struct page **pages;
 };
 
+enum rx_sa_flag {
+	RX_SA_ABORTED = 2,
+};
+
 struct rx_srcavail_state {
 	/* Advertised buffer stuff */
 	u32 mseq;
@@ -421,7 +427,15 @@ struct rx_srcavail_state {
 
 	/* Utility */
 	u8 busy;
-	u8 aborted;
+	enum rx_sa_flag flags;
+};
+
+enum tx_sa_flag {
+	TX_SA_SENDSM		= 0x01,
+	TX_SA_CROSS_SEND	= 0x02,
+	TX_SA_INTERRUPTED	= 0x04,
+	TX_SA_TIMEDOUT		= 0x08,
+	TX_SA_ERROR		= 0x10,
 };
 
 struct tx_srcavail_state {
@@ -436,7 +450,9 @@ struct tx_srcavail_state {
 	u32 bytes_sent;
 	u32 bytes_acked;
 
-	u8 abort;
+	enum tx_sa_flag abort_flags;
+	u8 posted;
+
 	u32 mseq;
 };
 
@@ -558,9 +574,9 @@ struct sdp_sock {
 	int qp_active;
 
 	struct tx_srcavail_state *tx_sa;
+	struct rx_srcavail_state *rx_sa;
 	spinlock_t tx_sa_lock;
 	int max_send_sge;
-	int srcavail_cancel;
 	struct delayed_work srcavail_cancel_work;
 	int srcavail_cancel_mseq;
 
@@ -757,7 +773,7 @@ int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_tx_ring_destroy(struct sdp_sock *ssk);
 int sdp_xmit_poll(struct sdp_sock *ssk, int force);
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb);
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
 void sdp_nagle_timeout(unsigned long data);
 void sdp_post_keepalive(struct sdp_sock *ssk);
@@ -787,14 +803,9 @@ int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
 		unsigned long addr);
 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
 		struct rx_srcavail_state *rx_sa);
-int sdp_post_sendsm(struct sdp_sock *ssk);
+int sdp_post_sendsm(struct sock *sk);
 void srcavail_cancel_timeout(struct work_struct *work);
 
-static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk)
-{
-	return SDP_TX_SIZE - tx_ring_posted(ssk);
-}
-
 static inline void sdp_arm_rx_cq(struct sock *sk)
 {
 	sdp_prf(sk, NULL, "Arming RX cq");
@@ -886,4 +897,95 @@ static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size,
 	return NULL;
 }
 
+static inline struct sk_buff *sdp_alloc_skb(struct sock *sk, u8 mid, int size)
+{
+	struct sdp_bsdh *h;
+	struct sk_buff *skb;
+	gfp_t gfp;
+
+	if 
(unlikely(sk->sk_allocation)) + gfp = sk->sk_allocation; + else + gfp = GFP_KERNEL; + + skb = sk_stream_alloc_skb(sk, sizeof(struct sdp_bsdh) + size, gfp); + BUG_ON(!skb); + + skb_header_release(skb); + + h = (struct sdp_bsdh *)skb_push(skb, sizeof *h); + h->mid = mid; + + skb_reset_transport_header(skb); + + return skb; +} +static inline struct sk_buff *sdp_alloc_skb_data(struct sock *sk) +{ + return sdp_alloc_skb(sk, SDP_MID_DATA, 0); +} + +static inline struct sk_buff *sdp_alloc_skb_disconnect(struct sock *sk) +{ + return sdp_alloc_skb(sk, SDP_MID_DISCONN, 0); +} + +static inline struct sk_buff *sdp_alloc_skb_chrcvbuf_ack(struct sock *sk, + int size) +{ + struct sk_buff *skb; + struct sdp_chrecvbuf *resp_size; + + skb = sdp_alloc_skb(sk, SDP_MID_CHRCVBUF_ACK, sizeof(*resp_size)); + + resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size); + resp_size->size = htonl(size); + + return skb; +} + +static inline struct sk_buff *sdp_alloc_skb_srcavail(struct sock *sk, + u32 len, u32 rkey, u64 vaddr) +{ + struct sk_buff *skb; + struct sdp_srcah *srcah; + + skb = sdp_alloc_skb(sk, SDP_MID_SRCAVAIL, sizeof(*srcah)); + + srcah = (struct sdp_srcah *)skb_put(skb, sizeof(*srcah)); + srcah->len = htonl(len); + srcah->rkey = htonl(rkey); + srcah->vaddr = cpu_to_be64(vaddr); + + return skb; +} + +static inline struct sk_buff *sdp_alloc_skb_srcavail_cancel(struct sock *sk) +{ + return sdp_alloc_skb(sk, SDP_MID_SRCAVAIL_CANCEL, 0); +} + +static inline struct sk_buff *sdp_alloc_skb_rdmardcompl(struct sock *sk, + u32 len) +{ + struct sk_buff *skb; + struct sdp_rrch *rrch; + + skb = sdp_alloc_skb(sk, SDP_MID_RDMARDCOMPL, sizeof(*rrch)); + + rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch)); + rrch->len = htonl(len); + + return skb; +} + +static inline struct sk_buff *sdp_alloc_skb_sendsm(struct sock *sk) +{ + return sdp_alloc_skb(sk, SDP_MID_SENDSM, 0); +} +static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk) +{ + return SDP_TX_SIZE - tx_ring_posted(ssk); +} + #endif diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c index 477b8a2eac38..db8d6693d1cd 100644 --- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c @@ -51,7 +51,7 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, int len = 0; char buf[256]; len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x " - "bufs: %d len: %d mseq: %d mseq_ack: %d | ", + "bufs: 0x%x len: 0x%x mseq: 0x%x mseq_ack: 0x%x | ", str, skb, h->mid, mid2str(h->mid), h->flags, ntohs(h->bufs), ntohl(h->len), ntohl(h->mseq), ntohl(h->mseq_ack)); @@ -61,37 +61,37 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, hh = (struct sdp_hh *)h; len += snprintf(buf + len, 255-len, "max_adverts: %d majv_minv: 0x%x " - "localrcvsz: %d desremrcvsz: %d |", + "localrcvsz: 0x%x desremrcvsz: 0x%x |", hh->max_adverts, hh->majv_minv, ntohl(hh->localrcvsz), ntohl(hh->desremrcvsz)); break; case SDP_MID_HELLO_ACK: hah = (struct sdp_hah *)h; - len += snprintf(buf + len, 255-len, "actrcvz: %d |", + len += snprintf(buf + len, 255-len, "actrcvz: 0x%x |", ntohl(hah->actrcvsz)); break; case SDP_MID_CHRCVBUF: case SDP_MID_CHRCVBUF_ACK: req_size = (struct sdp_chrecvbuf *)(h+1); - len += snprintf(buf + len, 255-len, "req_size: %d |", + len += snprintf(buf + len, 255-len, "req_size: 0x%x |", ntohl(req_size->size)); break; case SDP_MID_DATA: - len += snprintf(buf + len, 255-len, "data_len: %ld |", + len += snprintf(buf + len, 255-len, "data_len: 
0x%lx |", ntohl(h->len) - sizeof(struct sdp_bsdh)); break; case SDP_MID_RDMARDCOMPL: rrch = (struct sdp_rrch *)(h+1); - len += snprintf(buf + len, 255-len, " | len: %d |", + len += snprintf(buf + len, 255-len, " | len: 0x%x |", ntohl(rrch->len)); break; case SDP_MID_SRCAVAIL: srcah = (struct sdp_srcah *)(h+1); - len += snprintf(buf + len, 255-len, " | payload: %ld, " - "len: %d, rkey: 0x%x, vaddr: 0x%llx |", + len += snprintf(buf + len, 255-len, " | payload: 0x%lx, " + "len: 0x%x, rkey: 0x%x, vaddr: 0x%llx |", ntohl(h->len) - sizeof(struct sdp_bsdh) - sizeof(struct sdp_srcah), ntohl(srcah->len), ntohl(srcah->rkey), @@ -180,33 +180,6 @@ out2: mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT); } -int sdp_post_credits(struct sdp_sock *ssk) -{ - int post_count = 0; - - sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d " - "tx ring slots left: %d send_head: %p\n", - tx_credits(ssk), remote_credits(ssk), - sdp_tx_ring_slots_left(ssk), - ssk->isk.sk.sk_send_head); - - if (likely(tx_credits(ssk) > 1) && - likely(sdp_tx_ring_slots_left(ssk))) { - struct sk_buff *skb; - skb = sdp_stream_alloc_skb(&ssk->isk.sk, - sizeof(struct sdp_bsdh), - GFP_KERNEL); - if (!skb) - return -ENOMEM; - sdp_post_send(ssk, skb, SDP_MID_DATA); - post_count++; - } - - if (post_count) - sdp_xmit_poll(ssk, 0); - return post_count; -} - void sdp_post_sends(struct sdp_sock *ssk, int nonagle) { /* TODO: nonagle? */ @@ -214,6 +187,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) int c; gfp_t gfp_page; int post_count = 0; + struct sock *sk = &ssk->isk.sk; if (unlikely(!ssk->id)) { if (ssk->isk.sk.sk_send_head) { @@ -235,37 +209,16 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed); } - if (ssk->tx_sa && ssk->srcavail_cancel && - tx_credits(ssk) >= SDP_MIN_TX_CREDITS && - sdp_tx_ring_slots_left(ssk)) { - sdp_prf1(&ssk->isk.sk, NULL, "Going to send srcavail cancel"); - skb = sdp_stream_alloc_skb(&ssk->isk.sk, - sizeof(struct sdp_bsdh), - gfp_page); - /* FIXME */ - BUG_ON(!skb); - TX_SRCAVAIL_STATE(skb) = ssk->tx_sa; - sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL_CANCEL); - post_count++; - ssk->srcavail_cancel = 0; - } - if (ssk->recv_request && ring_tail(ssk->rx_ring) >= ssk->recv_request_head && tx_credits(ssk) >= SDP_MIN_TX_CREDITS && sdp_tx_ring_slots_left(ssk)) { - struct sdp_chrecvbuf *resp_size; ssk->recv_request = 0; - skb = sdp_stream_alloc_skb(&ssk->isk.sk, - sizeof(struct sdp_bsdh) + - sizeof(*resp_size), - gfp_page); - /* FIXME */ - BUG_ON(!skb); - resp_size = (struct sdp_chrecvbuf *)skb_put(skb, - sizeof *resp_size); - resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE); - sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK); + + skb = sdp_alloc_skb_chrcvbuf_ack(sk, + ssk->recv_frags * PAGE_SIZE); + + sdp_post_send(ssk, skb); post_count++; } @@ -280,25 +233,11 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) sdp_tx_ring_slots_left(ssk) && (skb = ssk->isk.sk.sk_send_head) && sdp_nagle_off(ssk, skb)) { - struct tx_srcavail_state *tx_sa; update_send_head(&ssk->isk.sk, skb); __skb_dequeue(&ssk->isk.sk.sk_write_queue); - tx_sa = TX_SRCAVAIL_STATE(skb); - if (unlikely(tx_sa)) { - if (ssk->tx_sa != tx_sa) { - sdp_warn(&ssk->isk.sk, "SrcAvail cancelled " - "before being sent!\n"); - __kfree_skb(skb); - } else { - if (likely(!tx_sa->abort)) - sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL); - else - sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n"); - } - } else { - sdp_post_send(ssk, skb, SDP_MID_DATA); - } + sdp_post_send(ssk, skb); + 
post_count++; } @@ -311,13 +250,11 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) likely(sdp_tx_ring_slots_left(ssk)) && likely((1 << ssk->isk.sk.sk_state) & (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) { - skb = sdp_stream_alloc_skb(&ssk->isk.sk, - sizeof(struct sdp_bsdh), - GFP_KERNEL); - /* FIXME */ - BUG_ON(!skb); + + skb = sdp_alloc_skb_data(&ssk->isk.sk); + sdp_post_send(ssk, skb); + SDPSTATS_COUNTER_INC(post_send_credits); - sdp_post_send(ssk, skb, SDP_MID_DATA); post_count++; } @@ -330,12 +267,10 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) !ssk->isk.sk.sk_send_head && tx_credits(ssk) > 1) { ssk->sdp_disconnect = 0; - skb = sdp_stream_alloc_skb(&ssk->isk.sk, - sizeof(struct sdp_bsdh), - gfp_page); - /* FIXME */ - BUG_ON(!skb); - sdp_post_send(ssk, skb, SDP_MID_DISCONN); + + skb = sdp_alloc_skb_disconnect(sk); + sdp_post_send(ssk, skb); + post_count++; } diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c index 61bdda7a0274..84b0830981a4 100644 --- a/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/drivers/infiniband/ulp/sdp/sdp_main.c @@ -607,7 +607,13 @@ static void sdp_close(struct sock *sk, long timeout) * reader process may not have drained the data yet! */ while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) { - data_was_unread = 1; + struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb); + if (h->mid == SDP_MID_DISCONN) { + sdp_handle_disconn(sk); + } else { + sdp_warn(sk, "Data was unread. skb: %p\n", skb); + data_was_unread = 1; + } __kfree_skb(skb); } @@ -1056,7 +1062,6 @@ int sdp_init_sock(struct sock *sk) ssk->sdp_disconnect = 0; ssk->destructed_already = 0; ssk->destruct_in_process = 0; - ssk->srcavail_cancel = 0; spin_lock_init(&ssk->lock); spin_lock_init(&ssk->tx_sa_lock); @@ -1349,7 +1354,6 @@ static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags) void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb) { - skb_header_release(skb); __skb_queue_tail(&sk->sk_write_queue, skb); sk->sk_wmem_queued += skb->truesize; sk_mem_charge(sk, skb->truesize); @@ -1598,7 +1602,7 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb, left -= this_page; skb->len += this_page; - skb->data_len = skb->len; + skb->data_len += this_page; skb->truesize += this_page; sk->sk_wmem_queued += this_page; sk->sk_forward_alloc -= this_page; @@ -1775,8 +1779,7 @@ new_segment: goto wait_for_sndbuf; } - skb = sdp_stream_alloc_skb(sk, 0, - sk->sk_allocation); + skb = sdp_alloc_skb_data(sk); if (!skb) goto wait_for_memory; @@ -1990,12 +1993,6 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, case SDP_MID_SRCAVAIL: rx_sa = RX_SRCAVAIL_STATE(skb); - if (rx_sa->mseq < ssk->srcavail_cancel_mseq) { - rx_sa->aborted = 1; - sdp_dbg_data(sk, "Ignoring src avail " - "due to SrcAvailCancel\n"); - goto skb_cleanup; - } /* if has payload - handle as if MID_DATA */ if (rx_sa->used < skb->len) { @@ -2008,6 +2005,18 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, sdp_dbg_data(sk, "Finished payload. " "RDMAing: %d/%d\n", rx_sa->used, rx_sa->len); + + if (rx_sa->mseq < ssk->srcavail_cancel_mseq) { + rx_sa->flags |= RX_SA_ABORTED; + sdp_dbg_data(sk, "Ignoring src avail " + "due to SrcAvailCancel\n"); + } + + if (rx_sa->flags & RX_SA_ABORTED) { + sdp_warn(sk, "rx_sa aborted. 
not rdmaing\n");
+				goto skb_cleanup;
+			}
+
 			avail_bytes_count = rx_sa->len;
 		}
 
@@ -2022,8 +2031,8 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		}
 
 		if (before(*seq, SDP_SKB_CB(skb)->seq)) {
-			sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
-				*seq, SDP_SKB_CB(skb)->seq);
+			sdp_warn(sk, "skb: %p recvmsg bug: copied %X seq %X\n",
+				skb, *seq, SDP_SKB_CB(skb)->seq);
 			sdp_reset(sk);
 			break;
 		}
@@ -2173,33 +2182,40 @@ skip_copy:
 
 		if (rx_sa) {
-			if (ssk->srcavail_cancel_mseq < rx_sa->mseq) {
-				rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
-				BUG_ON(rc);
-			}
-			if (rx_sa->aborted) {
-				sdp_warn(sk, "RDMA aborted. Sending SendSM\n");
-				rc = sdp_post_sendsm(ssk);
-				BUG_ON(rc);
-			}
+			rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
+			BUG_ON(rc);
+		}
 
-		if ((!rx_sa && used + offset < skb->len) ||
-		    (rx_sa && !rx_sa->aborted && rx_sa->used < rx_sa->len))
+		if (!rx_sa && used + offset < skb->len)
+			continue;
+
+		if (rx_sa && !(rx_sa->flags & RX_SA_ABORTED) &&
+				rx_sa->used < rx_sa->len)
 			continue;
+
 		offset = 0;
 
 skb_cleanup:
-		if (!(flags & MSG_PEEK) || (rx_sa && rx_sa->aborted)) {
+		if (!(flags & MSG_PEEK) ||
+				(rx_sa && (rx_sa->flags & RX_SA_ABORTED))) {
 			struct sdp_bsdh *h;
 			h = (struct sdp_bsdh *)skb_transport_header(skb);
 			sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
 				ntohl(h->mseq), ntohl(h->mseq_ack));
+
+			if (rx_sa) {
+				if (!rx_sa->flags) /* else ssk->rx_sa might
+						      point to another rx_sa */
+					ssk->rx_sa = NULL;
+
+				kfree(rx_sa);
+				rx_sa = NULL;
+
+			}
+
 			skb_unlink(skb, &sk->sk_receive_queue);
 			__kfree_skb(skb);
-
-			kfree(rx_sa);
-			rx_sa = NULL;
 		}
 		continue;
 found_fin_ok:
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index 2453715f3969..08e8e3d9f804 100644
--- a/drivers/infiniband/ulp/sdp/sdp_rx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -305,23 +305,30 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
 	SDP_SKB_CB(skb)->seq = rcv_nxt(ssk);
 	if (h->mid == SDP_MID_SRCAVAIL) {
 		struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
-		struct rx_srcavail_state *sa;
 
+		struct rx_srcavail_state *rx_sa;
 		ssk->srcavail_cancel_mseq = 0;
 
-		sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
+		ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
 				sizeof(struct rx_srcavail_state), GFP_ATOMIC);
 
-		sa->mseq = ntohl(h->mseq);
-		sa->aborted = 0;
-		sa->used = 0;
-		sa->len = skb_len = ntohl(srcah->len);
-		sa->rkey = ntohl(srcah->rkey);
-		sa->vaddr = be64_to_cpu(srcah->vaddr);
+		rx_sa->mseq = ntohl(h->mseq);
+		rx_sa->used = 0;
+		rx_sa->len = skb_len = ntohl(srcah->len);
+		rx_sa->rkey = ntohl(srcah->rkey);
+		rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
+		rx_sa->flags = 0;
+
+		if (ssk->tx_sa) {
+			sdp_warn(&ssk->isk.sk, "got RX SrcAvail while waiting "
+					"for TX SrcAvail. waking up TX SrcAvail "
+					"to be aborted\n");
+			wake_up(sk->sk_sleep);
+		}
 
 		atomic_add(skb->len, &ssk->rcv_nxt);
 
 		sdp_dbg_data(sk, "queueing SrcAvail. 
skb_len = %d vaddr = %lld\n",
-			skb_len, sa->vaddr);
+			skb_len, rx_sa->vaddr);
 	} else {
 		skb_len = skb->len;
 
@@ -477,11 +484,24 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 		__kfree_skb(skb);
 		break;
 	case SDP_MID_SRCAVAIL_CANCEL:
-		sdp_dbg_data(sk, "Handling SrcAvailCancel - sending SendSM\n");
+		sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
 		sdp_prf(sk, NULL, "Handling SrcAvailCancel");
-		ssk->srcavail_cancel_mseq = ntohl(h->mseq);
-		sdp_post_sendsm(ssk);
+		if (ssk->rx_sa) {
+			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+			ssk->rx_sa->flags |= RX_SA_ABORTED;
+			ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
+					      the dirty logic from recvmsg */
+			sdp_post_sendsm(sk);
+		} else {
+			sdp_warn(sk, "Got SrcAvailCancel - "
+					"but no SrcAvail in process\n");
+		}
 		break;
+	case SDP_MID_SINKAVAIL:
+		sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
+		sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
+		__kfree_skb(skb);
+		break;
 	case SDP_MID_ABORT:
 		sdp_dbg_data(sk, "Handling ABORT\n");
 		sdp_prf(sk, NULL, "Handling ABORT");
@@ -549,9 +569,9 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 			sk_send_sigurg(sk);*/
 
 	skb_pull(skb, sizeof(struct sdp_bsdh));
-	if (h->mid == SDP_MID_SRCAVAIL) {
+
+	if (h->mid == SDP_MID_SRCAVAIL)
 		skb_pull(skb, sizeof(struct sdp_srcah));
-	}
 
 	if (unlikely(h->mid == SDP_MID_DATA && skb->len == 0)) {
 		/* Credit update is valid even after RCV_SHUTDOWN */
@@ -628,10 +648,6 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk,
 	else
 		skb->data_len = 0;
 
-
-	if (h->mid == SDP_MID_SRCAVAIL)
-		skb->data_len -= sizeof(struct sdp_srcah);
-
 #ifdef NET_SKBUFF_DATA_USES_OFFSET
 	skb->tail = skb_headlen(skb);
 #else
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index d4569aa1f938..26c5e56ab1c6 100644
--- a/drivers/infiniband/ulp/sdp/sdp_tx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -62,10 +62,10 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
 
 static unsigned long last_send;
 
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb)
 {
 	struct sdp_buf *tx_req;
-	struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+	struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
 	unsigned long mseq = ring_head(ssk->tx_ring);
 	int i, rc, frags;
 	u64 addr;
@@ -77,13 +77,24 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 	struct ib_sge *sge = ibsge;
 	struct ib_send_wr tx_wr = { 0 };
 
-	SDPSTATS_COUNTER_MID_INC(post_send, mid);
+	SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
 	SDPSTATS_HIST(send_size, skb->len);
 
 	ssk->tx_packets++;
 	ssk->tx_bytes += skb->len;
 
-	h->mid = mid;
+	if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
+		struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(skb);
+		if (ssk->tx_sa != tx_sa) {
+			sdp_warn(&ssk->isk.sk, "SrcAvail cancelled "
+					"before being sent!\n");
+			WARN_ON(1);
+			__kfree_skb(skb);
+			return;
+		}
+		TX_SRCAVAIL_STATE(skb)->mseq = mseq;
+	}
+
 	if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
 		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
 	else
@@ -92,12 +103,10 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 	h->bufs = htons(rx_ring_posted(ssk));
 	h->len = htonl(skb->len);
 	h->mseq = htonl(mseq);
-	if (TX_SRCAVAIL_STATE(skb))
-		TX_SRCAVAIL_STATE(skb)->mseq = mseq;
 	h->mseq_ack = htonl(mseq_ack(ssk));
 
 	sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
-		mid2str(mid), rx_ring_posted(ssk), mseq,
+		
mid2str(h->mid), rx_ring_posted(ssk), mseq,
 		ntohl(h->mseq_ack));
 
 	SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
index 80c8c8e8c7c7..f2dd17c43c78 100644
--- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
@@ -56,7 +56,6 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
 		int page_idx, struct iovec *iov, int off, size_t len)
 {
 	struct sdp_sock *ssk = sdp_sk(sk);
-	struct sdp_srcah *srcah;
 	struct sk_buff *skb;
 	int payload_len;
 
@@ -65,11 +64,9 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
 	BUG_ON(!tx_sa);
 	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
 
-	skb = sk_stream_alloc_skb(&ssk->isk.sk,
-			sizeof(struct sdp_bsdh) +
-			sizeof(struct sdp_srcah) +
-			SDP_SRCAVAIL_PAYLOAD_LEN,
-			GFP_KERNEL);
+	tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
+
+	skb = sdp_alloc_skb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off);
 	if (!skb) {
 		return -ENOMEM;
 	}
@@ -79,11 +76,6 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
 	 * but continue to live after skb is freed */
 	ssk->tx_sa = tx_sa;
 
-	srcah = (struct sdp_srcah *)skb_push(skb, sizeof(*srcah));
-	srcah->len = htonl(len);
-	srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
-	srcah->vaddr = cpu_to_be64(off);
-
 	if (0) {
 		void *payload;
 		payload = skb_put(skb, SDP_SRCAVAIL_PAYLOAD_LEN);
@@ -106,6 +98,9 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
 
 	skb_entail(sk, ssk, skb);
 
+	ssk->write_seq += payload_len;
+	SDP_SKB_CB(skb)->end_seq += payload_len;
+
 	tx_sa->bytes_sent = len;
 	tx_sa->bytes_acked = payload_len;
 
@@ -117,11 +112,12 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
 static int sdp_post_srcavail_cancel(struct sock *sk)
 {
 	struct sdp_sock *ssk = sdp_sk(sk);
+	struct sk_buff *skb;
 
-	if (!ssk->tx_sa && !ssk->srcavail_cancel)
-		return 0; /* srcavail already serviced */
+	sdp_warn(&ssk->isk.sk, "Posting srcavail cancel\n");
 
-	ssk->srcavail_cancel = 1;
+	skb = sdp_alloc_skb_srcavail_cancel(sk);
+	skb_entail(sk, ssk, skb);
 
 	sdp_post_sends(ssk, 1);
 
@@ -164,34 +160,48 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 
 		if (unlikely(!*timeo_p)) {
 			err = -ETIME;
+			tx_sa->abort_flags |= TX_SA_TIMEDOUT;
 			sdp_prf1(sk, NULL, "timeout");
 			break;
 		}
 
-		if (unlikely(!ignore_signals && signal_pending(current))) {
-			err = -EINTR;
-			sdp_prf1(sk, NULL, "signalled");
-			break;
-		}
-
 		if (tx_sa->bytes_acked == tx_sa->bytes_sent)
 			break;
+
 		else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
 			err = -EINVAL;
 			sdp_warn(sk, "acked bytes > sent bytes\n");
+			tx_sa->abort_flags |= TX_SA_ERROR;
 			break;
 		}
 
-		if (tx_sa->abort) {
+		if (tx_sa->abort_flags & TX_SA_SENDSM) {
 			sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
 			err = -EAGAIN;
 			break ;
 		}
 
+		if (!ignore_signals) {
+			if (signal_pending(current)) {
+				err = -EINTR;
+				sdp_prf1(sk, NULL, "signalled");
+				tx_sa->abort_flags |= TX_SA_INTERRUPTED;
+				break;
+			}
+
+			if (ssk->rx_sa) {
+				sdp_warn(sk, "Crossing SrcAvail - aborting this\n");
+				tx_sa->abort_flags |= TX_SA_CROSS_SEND;
+				err = -ETIME;
+				break;
+			}
+		}
+
 		posts_handler_put(ssk);
 
 		sk_wait_event(sk, &current_timeo,
-				tx_sa->abort &&
+				tx_sa->abort_flags &&
+				ssk->rx_sa &&
 				(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
 				vm_wait);
 		sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
@@ -215,8 +225,8 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 
 	finish_wait(sk->sk_sleep, &wait);
 
-	
sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, abort: %d\n", - tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort); + sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n", + tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags); if (!ssk->qp_active) { sdp_warn(sk, "QP destroyed while waiting\n"); @@ -277,52 +287,27 @@ static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p) int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, struct rx_srcavail_state *rx_sa) { - struct sdp_rrch *rrch; struct sk_buff *skb; - gfp_t gfp_page; int copied = rx_sa->used - rx_sa->reported; if (rx_sa->used <= rx_sa->reported) return 0; - if (unlikely(ssk->isk.sk.sk_allocation)) - gfp_page = ssk->isk.sk.sk_allocation; - else - gfp_page = GFP_KERNEL; + skb = sdp_alloc_skb_rdmardcompl(&ssk->isk.sk, copied); - skb = sdp_stream_alloc_skb(&ssk->isk.sk, - sizeof(struct sdp_bsdh) + - sizeof(struct sdp_rrch), - gfp_page); - /* FIXME */ - BUG_ON(!skb); - - rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch)); - rrch->len = htonl(copied); rx_sa->reported += copied; + /* TODO: What if no tx_credits available? */ - sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL); + sdp_post_send(ssk, skb); return 0; } -int sdp_post_sendsm(struct sdp_sock *ssk) +int sdp_post_sendsm(struct sock *sk) { - struct sk_buff *skb; - gfp_t gfp_page; + struct sk_buff *skb = sdp_alloc_skb_sendsm(sk); - if (unlikely(ssk->isk.sk.sk_allocation)) - gfp_page = ssk->isk.sk.sk_allocation; - else - gfp_page = GFP_KERNEL; - - skb = sdp_stream_alloc_skb(&ssk->isk.sk, - sizeof(struct sdp_bsdh), - gfp_page); - /* FIXME */ - BUG_ON(!skb); - - sdp_post_send(ssk, skb, SDP_MID_SENDSM); + sdp_post_send(sdp_sk(sk), skb); return 0; } @@ -367,20 +352,20 @@ void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack) goto out; } - if (ssk->tx_sa->mseq < mseq_ack) { - sdp_prf1(sk, NULL, "SendSM arrived for old SrcAvail. " - "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x", + if (mseq_ack < ssk->tx_sa->mseq) { + sdp_warn(sk, "SendSM arrived for old SrcAvail. " + "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n", mseq_ack, ssk->tx_sa->mseq); goto out; } - sdp_prf1(sk, NULL, "Got SendSM - aborting SrcAvail"); + sdp_warn(sk, "Got SendSM - aborting SrcAvail\n"); - ssk->tx_sa->abort = 1; + ssk->tx_sa->abort_flags |= TX_SA_SENDSM; cancel_delayed_work(&ssk->srcavail_cancel_work); wake_up(sk->sk_sleep); - sdp_dbg_data(sk, "woke up sleepers\n"); + sdp_warn(sk, "woke up sleepers\n"); out: spin_unlock_irqrestore(&ssk->tx_sa_lock, flags); @@ -743,7 +728,9 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb, * post sendsm */ sdp_warn(sk, "post rdma, wait_for_compl " "or post rdma_rd_comp failed - post sendsm\n"); - rx_sa->aborted = 1; + rx_sa->flags |= RX_SA_ABORTED; + ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get + the dirty logic from recvmsg */ } ssk->tx_ring.rdma_inflight = NULL; @@ -806,18 +793,18 @@ static int sdp_rdma_adv_single(struct sock *sk, if (rc) goto err_abort_send; - rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0); if (unlikely(rc)) { - switch (rc) { - case -EAGAIN: /* Got SendSM */ + enum tx_sa_flag f = tx_sa->abort_flags; + + if (f & TX_SA_SENDSM) { sdp_warn(sk, "got SendSM. use SEND verb.\n"); - break; + } else if (f & TX_SA_ERROR) { + sdp_warn(sk, "SrcAvail error completion\n"); + sdp_reset(sk); + } else if (ssk->qp_active) { + sdp_warn(sk, "Aborting send. 
abort_flag = 0x%x.\n", f);
 
-		case -ETIME: /* Timedout */
-			sdp_warn(sk, "TX srcavail timedout.\n");
-		case -EINTR: /* interrupted */
-			sdp_prf1(sk, NULL, "Aborting send.");
 			sdp_post_srcavail_cancel(sk);
 
 			/* Wait for RdmaRdCompl/SendSM to
@@ -826,22 +813,17 @@ static int sdp_rdma_adv_single(struct sock *sk,
 			sdp_warn(sk, "Waiting for SendSM\n");
 			sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
 			sdp_warn(sk, "finished waiting\n");
-
-			break;
-
-		default:
-			sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
-			/* Socked destroyed while waited */
-			break;
+		} else {
+			sdp_warn(sk, "QP was destroyed while waiting\n");
 		}
 
 		goto err_abort_send;
 	}
 
 	sdp_prf1(sk, NULL, "got RdmaRdCompl");
-	sdp_update_iov_used(sk, iov, tx_sa->bytes_sent);
-
 err_abort_send:
+	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
+
 	ib_fmr_pool_unmap(tx_sa->fmr);
 
 	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
@@ -869,6 +851,10 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	int p_idx;
 
 	sdp_dbg_data(sk, "%s\n", __func__);
+	if (ssk->rx_sa) {
+		sdp_warn(sk, "Deadlock prevention: crossing SrcAvail\n");
+		return -EAGAIN;
+	}
 
 	lock_sock(sk);
 
@@ -939,14 +925,13 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
 		rc = sdp_rdma_adv_single(sk, tx_sa, iov,
 				p_idx, p_cnt, offset, len);
-//				offset, p_cnt, len, addrs);
 
 		copied += len;
 		bytes_left -= len;
 		pages_left -= p_cnt;
 		p_idx += p_cnt;
 		offset = 0;
-	} while (!rc && !tx_sa->abort && pages_left > 0);
+	} while (!rc && !tx_sa->abort_flags && pages_left > 0);
 
 	sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
 err_map_dma:
-- 
2.50.1
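
For readers new to SDP zero-copy, the crossing-SrcAvail rule is easier to
see in isolation. Below is a minimal userspace C sketch of just that rule.
It is not driver code: only the names tx_sa/rx_sa, the TX_SA_CROSS_SEND
flag and the SrcAvailCancel reaction mirror the patch; struct peer,
rx_srcavail() and main() are invented for illustration.

#include <stdio.h>
#include <stdbool.h>

enum tx_sa_flag {
	TX_SA_SENDSM     = 0x01,
	TX_SA_CROSS_SEND = 0x02,
};

struct peer {
	const char *name;
	bool tx_sa;	/* posted a SrcAvail, waiting for RdmaRdCompl */
	bool rx_sa;	/* the remote's SrcAvail is queued locally */
	int abort_flags;
};

/* Models the RX path: a SrcAvail just arrived from the remote peer. */
static void rx_srcavail(struct peer *p)
{
	p->rx_sa = true;
	if (p->tx_sa) {
		/*
		 * Crossing SrcAvails: both sides would block waiting for
		 * the other's RDMA read. Abort our own advertisement and
		 * fall back to a plain send, as the patch does via
		 * TX_SA_CROSS_SEND plus SrcAvailCancel.
		 */
		p->abort_flags |= TX_SA_CROSS_SEND;
		p->tx_sa = false;
		printf("%s: RX SrcAvail crossed TX SrcAvail -> "
		       "send SrcAvailCancel, use plain send\n", p->name);
	}
}

int main(void)
{
	struct peer a = { .name = "A", .tx_sa = true };
	struct peer b = { .name = "B", .tx_sa = true };

	/* Both peers advertised at once; each now sees the other's. */
	rx_srcavail(&a);
	rx_srcavail(&b);

	/* Neither peer is left blocked in sdp_wait_rdmardcompl(). */
	printf("deadlock avoided: A.tx_sa=%d B.tx_sa=%d\n",
	       a.tx_sa, b.tx_sa);
	return 0;
}

In the worst case both sides cancel, and each transfer then completes over
ordinary bcopy sends, so only the zero-copy optimization is lost for that
exchange; correctness is preserved.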