www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: fix cross SrcAvail deadlock
authorAmir Vadai <amirv@mellanox.co.il>
Thu, 13 Aug 2009 13:00:40 +0000 (16:00 +0300)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:37 +0000 (05:04 -0700)
When both peers send a SrcAvail at the same time (cross SrcAvail), each side
waits for the other's RdmaRdCompl and the connection deadlocks. Detect the
crossing on the sending side: give up on the outstanding SrcAvail and send
SrcAvailCancel so the receive path can make progress.

Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_tx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

index 94c51304bafb48af110914ffa1cf48333dfc28d5..21577b4e73ece09dc0bf4df4e3a23a2ad9173708 100644 (file)
@@ -257,6 +257,8 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
 #define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
 #define SDP_MAX_SEND_SGES 32
+
+/* payload len - rest will be rx'ed into frags */
 #define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
 #define SDP_NUM_WC 4
 #define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
@@ -405,6 +407,10 @@ struct bzcopy_state {
        struct page         **pages;
 };
 
+enum rx_sa_flag {
+       RX_SA_ABORTED    = 2,
+};
+
 struct rx_srcavail_state {
        /* Advertised buffer stuff */
        u32 mseq;
@@ -421,7 +427,15 @@ struct rx_srcavail_state {
 
        /* Utility */
        u8  busy;
-       u8  aborted;
+       enum rx_sa_flag  flags;
+};
+
+enum tx_sa_flag {
+       TX_SA_SENDSM     = 0x01,
+       TX_SA_CROSS_SEND = 0x02,
+       TX_SA_INTRRUPTED = 0x04,
+       TX_SA_TIMEDOUT   = 0x08,
+       TX_SA_ERROR      = 0x10,
 };
 
 struct tx_srcavail_state {
@@ -436,7 +450,9 @@ struct tx_srcavail_state {
        u32             bytes_sent;
        u32             bytes_acked;
 
-       u8              abort;
+       enum tx_sa_flag abort_flags;
+       u8              posted;
+
        u32             mseq;
 };
 
@@ -558,9 +574,9 @@ struct sdp_sock {
 
        int qp_active;
        struct tx_srcavail_state *tx_sa;
+       struct rx_srcavail_state *rx_sa;
        spinlock_t tx_sa_lock;
        int max_send_sge;
-       int srcavail_cancel;
        struct delayed_work srcavail_cancel_work;
        int srcavail_cancel_mseq;
 
@@ -757,7 +773,7 @@ int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_tx_ring_destroy(struct sdp_sock *ssk);
 int sdp_xmit_poll(struct sdp_sock *ssk, int force);
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb);
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
 void sdp_nagle_timeout(unsigned long data);
 void sdp_post_keepalive(struct sdp_sock *ssk);
@@ -787,14 +803,9 @@ int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
                unsigned long addr);
 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
                struct rx_srcavail_state *rx_sa);
-int sdp_post_sendsm(struct sdp_sock *ssk);
+int sdp_post_sendsm(struct sock *sk);
 void srcavail_cancel_timeout(struct work_struct *work);
 
-static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk)
-{
-       return SDP_TX_SIZE - tx_ring_posted(ssk);
-}
-
 static inline void sdp_arm_rx_cq(struct sock *sk)
 {
        sdp_prf(sk, NULL, "Arming RX cq");
@@ -886,4 +897,95 @@ static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size,
        return NULL;
 }
 
+static inline struct sk_buff *sdp_alloc_skb(struct sock *sk, u8 mid, int size)
+{
+       struct sdp_bsdh *h;
+       struct sk_buff *skb;
+       gfp_t gfp;
+
+       if (unlikely(sk->sk_allocation))
+               gfp = sk->sk_allocation;
+       else
+               gfp = GFP_KERNEL;
+
+       skb = sk_stream_alloc_skb(sk, sizeof(struct sdp_bsdh) + size, gfp);
+       BUG_ON(!skb);
+
+        skb_header_release(skb);
+
+       h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+       h->mid = mid;
+
+       skb_reset_transport_header(skb);
+
+       return skb;
+}
+static inline struct sk_buff *sdp_alloc_skb_data(struct sock *sk)
+{
+       return sdp_alloc_skb(sk, SDP_MID_DATA, 0);
+}
+
+static inline struct sk_buff *sdp_alloc_skb_disconnect(struct sock *sk)
+{
+       return sdp_alloc_skb(sk, SDP_MID_DISCONN, 0);
+}
+
+static inline struct sk_buff *sdp_alloc_skb_chrcvbuf_ack(struct sock *sk,
+               int size)
+{
+       struct sk_buff *skb;
+       struct sdp_chrecvbuf *resp_size;
+
+       skb = sdp_alloc_skb(sk, SDP_MID_CHRCVBUF_ACK, sizeof(*resp_size));
+
+       resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size);
+       resp_size->size = htonl(size);
+
+       return skb;
+}
+
+static inline struct sk_buff *sdp_alloc_skb_srcavail(struct sock *sk,
+       u32 len, u32 rkey, u64 vaddr)
+{
+       struct sk_buff *skb;
+       struct sdp_srcah *srcah;
+
+       skb = sdp_alloc_skb(sk, SDP_MID_SRCAVAIL, sizeof(*srcah));
+
+       srcah = (struct sdp_srcah *)skb_put(skb, sizeof(*srcah));
+       srcah->len = htonl(len);
+       srcah->rkey = htonl(rkey);
+       srcah->vaddr = cpu_to_be64(vaddr);
+
+       return skb;
+}
+
+static inline struct sk_buff *sdp_alloc_skb_srcavail_cancel(struct sock *sk)
+{
+       return sdp_alloc_skb(sk, SDP_MID_SRCAVAIL_CANCEL, 0);
+}
+
+static inline struct sk_buff *sdp_alloc_skb_rdmardcompl(struct sock *sk,
+       u32 len)
+{
+       struct sk_buff *skb;
+       struct sdp_rrch *rrch;
+
+       skb = sdp_alloc_skb(sk, SDP_MID_RDMARDCOMPL, sizeof(*rrch));
+
+       rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
+       rrch->len = htonl(len);
+
+       return skb;
+}
+
+static inline struct sk_buff *sdp_alloc_skb_sendsm(struct sock *sk)
+{
+       return sdp_alloc_skb(sk, SDP_MID_SENDSM, 0);
+}
+static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk)
+{
+       return SDP_TX_SIZE - tx_ring_posted(ssk);
+}
+
 #endif
index 477b8a2eac389d27b67e5bc98b6ab4f01045a1d4..db8d6693d1cde4ee25452d94de044137e8705acd 100644 (file)
@@ -51,7 +51,7 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
        int len = 0;
        char buf[256];
        len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x "
-                       "bufs: %d len: %d mseq: %d mseq_ack: %d | ",
+                       "bufs: 0x%x len: 0x%x mseq: 0x%x mseq_ack: 0x%x | ",
                        str, skb, h->mid, mid2str(h->mid), h->flags,
                        ntohs(h->bufs), ntohl(h->len), ntohl(h->mseq),
                        ntohl(h->mseq_ack));
@@ -61,37 +61,37 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
                hh = (struct sdp_hh *)h;
                len += snprintf(buf + len, 255-len,
                                "max_adverts: %d  majv_minv: 0x%x "
-                               "localrcvsz: %d desremrcvsz: %d |",
+                               "localrcvsz: 0x%x desremrcvsz: 0x%x |",
                                hh->max_adverts, hh->majv_minv,
                                ntohl(hh->localrcvsz),
                                ntohl(hh->desremrcvsz));
                break;
        case SDP_MID_HELLO_ACK:
                hah = (struct sdp_hah *)h;
-               len += snprintf(buf + len, 255-len, "actrcvz: %d |",
+               len += snprintf(buf + len, 255-len, "actrcvz: 0x%x |",
                                ntohl(hah->actrcvsz));
                break;
        case SDP_MID_CHRCVBUF:
        case SDP_MID_CHRCVBUF_ACK:
                req_size = (struct sdp_chrecvbuf *)(h+1);
-               len += snprintf(buf + len, 255-len, "req_size: %d |",
+               len += snprintf(buf + len, 255-len, "req_size: 0x%x |",
                                ntohl(req_size->size));
                break;
        case SDP_MID_DATA:
-               len += snprintf(buf + len, 255-len, "data_len: %ld |",
+               len += snprintf(buf + len, 255-len, "data_len: 0x%lx |",
                        ntohl(h->len) - sizeof(struct sdp_bsdh));
                break;
        case SDP_MID_RDMARDCOMPL:
                rrch = (struct sdp_rrch *)(h+1);
 
-               len += snprintf(buf + len, 255-len, " | len: %d |",
+               len += snprintf(buf + len, 255-len, " | len: 0x%x |",
                                ntohl(rrch->len));
                break;
        case SDP_MID_SRCAVAIL:
                srcah = (struct sdp_srcah *)(h+1);
 
-               len += snprintf(buf + len, 255-len, " | payload: %ld, "
-                               "len: %d, rkey: 0x%x, vaddr: 0x%llx |",
+               len += snprintf(buf + len, 255-len, " | payload: 0x%lx, "
+                               "len: 0x%x, rkey: 0x%x, vaddr: 0x%llx |",
                                ntohl(h->len) - sizeof(struct sdp_bsdh) - 
                                sizeof(struct sdp_srcah),
                                ntohl(srcah->len), ntohl(srcah->rkey),
@@ -180,33 +180,6 @@ out2:
                mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
 }
 
-int sdp_post_credits(struct sdp_sock *ssk)
-{
-       int post_count = 0;
-
-       sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
-                       "tx ring slots left: %d send_head: %p\n",
-               tx_credits(ssk), remote_credits(ssk),
-               sdp_tx_ring_slots_left(ssk),
-               ssk->isk.sk.sk_send_head);
-
-       if (likely(tx_credits(ssk) > 1) &&
-           likely(sdp_tx_ring_slots_left(ssk))) {
-               struct sk_buff *skb;
-               skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-                                         sizeof(struct sdp_bsdh),
-                                         GFP_KERNEL);
-               if (!skb)
-                       return -ENOMEM;
-               sdp_post_send(ssk, skb, SDP_MID_DATA);
-               post_count++;
-       }
-
-       if (post_count)
-               sdp_xmit_poll(ssk, 0);
-       return post_count;
-}
-
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 {
        /* TODO: nonagle? */
@@ -214,6 +187,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
        int c;
        gfp_t gfp_page;
        int post_count = 0;
+       struct sock *sk = &ssk->isk.sk;
 
        if (unlikely(!ssk->id)) {
                if (ssk->isk.sk.sk_send_head) {
@@ -235,37 +209,16 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
                sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed);
        }
 
-       if (ssk->tx_sa && ssk->srcavail_cancel &&
-               tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
-           sdp_tx_ring_slots_left(ssk)) {
-               sdp_prf1(&ssk->isk.sk, NULL, "Going to send srcavail cancel");
-               skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-                                         sizeof(struct sdp_bsdh),
-                                         gfp_page);
-               /* FIXME */
-               BUG_ON(!skb);
-               TX_SRCAVAIL_STATE(skb) = ssk->tx_sa;
-               sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL_CANCEL);
-               post_count++;
-               ssk->srcavail_cancel = 0;
-       }
-
        if (ssk->recv_request &&
            ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
            tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
            sdp_tx_ring_slots_left(ssk)) {
-               struct sdp_chrecvbuf *resp_size;
                ssk->recv_request = 0;
-               skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-                                         sizeof(struct sdp_bsdh) +
-                                         sizeof(*resp_size),
-                                         gfp_page);
-               /* FIXME */
-               BUG_ON(!skb);
-               resp_size = (struct sdp_chrecvbuf *)skb_put(skb,
-                               sizeof *resp_size);
-               resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE);
-               sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+
+               skb = sdp_alloc_skb_chrcvbuf_ack(sk, 
+                               ssk->recv_frags * PAGE_SIZE);
+
+               sdp_post_send(ssk, skb);
                post_count++;
        }
 
@@ -280,25 +233,11 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
               sdp_tx_ring_slots_left(ssk) &&
               (skb = ssk->isk.sk.sk_send_head) &&
                sdp_nagle_off(ssk, skb)) {
-               struct tx_srcavail_state *tx_sa;
                update_send_head(&ssk->isk.sk, skb);
                __skb_dequeue(&ssk->isk.sk.sk_write_queue);
 
-               tx_sa = TX_SRCAVAIL_STATE(skb);
-               if (unlikely(tx_sa)) {
-                       if (ssk->tx_sa != tx_sa) {
-                               sdp_warn(&ssk->isk.sk, "SrcAvail cancelled "
-                                               "before being sent!\n");
-                               __kfree_skb(skb);
-                       } else {
-                               if (likely(!tx_sa->abort))
-                                       sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
-                               else
-                                       sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");       
-                       }
-               } else {
-                       sdp_post_send(ssk, skb, SDP_MID_DATA);
-               }
+               sdp_post_send(ssk, skb);
+
                post_count++;
        }
 
@@ -311,13 +250,11 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
            likely(sdp_tx_ring_slots_left(ssk)) &&
            likely((1 << ssk->isk.sk.sk_state) &
                    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
-               skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-                                         sizeof(struct sdp_bsdh),
-                                         GFP_KERNEL);
-               /* FIXME */
-               BUG_ON(!skb);
+
+               skb = sdp_alloc_skb_data(&ssk->isk.sk);
+               sdp_post_send(ssk, skb);
+
                SDPSTATS_COUNTER_INC(post_send_credits);
-               sdp_post_send(ssk, skb, SDP_MID_DATA);
                post_count++;
        }
 
@@ -330,12 +267,10 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
                        !ssk->isk.sk.sk_send_head &&
                        tx_credits(ssk) > 1) {
                ssk->sdp_disconnect = 0;
-               skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-                                         sizeof(struct sdp_bsdh),
-                                         gfp_page);
-               /* FIXME */
-               BUG_ON(!skb);
-               sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+
+               skb = sdp_alloc_skb_disconnect(sk);
+               sdp_post_send(ssk, skb);
+
                post_count++;
        }
 
index 61bdda7a0274c91643a1a9ae850974afb6aa2434..84b0830981a451f9be0d2da28918ffc49c9a27b1 100644 (file)
@@ -607,7 +607,13 @@ static void sdp_close(struct sock *sk, long timeout)
         *  reader process may not have drained the data yet!
         */
        while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
-               data_was_unread = 1;
+               struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
+               if (h->mid == SDP_MID_DISCONN) {
+                               sdp_handle_disconn(sk);
+               } else {
+                       sdp_warn(sk, "Data was unread. skb: %p\n", skb);
+                       data_was_unread = 1;
+               }
                __kfree_skb(skb);
        }
 
@@ -1056,7 +1062,6 @@ int sdp_init_sock(struct sock *sk)
        ssk->sdp_disconnect = 0;
        ssk->destructed_already = 0;
        ssk->destruct_in_process = 0;
-       ssk->srcavail_cancel = 0;
        spin_lock_init(&ssk->lock);
        spin_lock_init(&ssk->tx_sa_lock);
 
@@ -1349,7 +1354,6 @@ static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags)
 
 void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb)
 {
-        skb_header_release(skb);
         __skb_queue_tail(&sk->sk_write_queue, skb);
        sk->sk_wmem_queued += skb->truesize;
         sk_mem_charge(sk, skb->truesize);
@@ -1598,7 +1602,7 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
                left -= this_page;
 
                skb->len             += this_page;
-               skb->data_len         = skb->len;
+               skb->data_len        += this_page;
                skb->truesize        += this_page;
                sk->sk_wmem_queued   += this_page;
                sk->sk_forward_alloc -= this_page;
@@ -1775,8 +1779,7 @@ new_segment:
                                                goto wait_for_sndbuf;
                                }
 
-                               skb = sdp_stream_alloc_skb(sk, 0,
-                                               sk->sk_allocation);
+                               skb = sdp_alloc_skb_data(sk);
                                if (!skb)
                                        goto wait_for_memory;
 
@@ -1990,12 +1993,6 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
                        case SDP_MID_SRCAVAIL:
                                rx_sa = RX_SRCAVAIL_STATE(skb);
-                               if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
-                                       rx_sa->aborted = 1;
-                                       sdp_dbg_data(sk, "Ignoring src avail "
-                                               "due to SrcAvailCancel\n");
-                                       goto skb_cleanup;
-                               }
 
                                /* if has payload - handle as if MID_DATA */
                                if (rx_sa->used < skb->len) {
@@ -2008,6 +2005,18 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                        sdp_dbg_data(sk, "Finished payload. "
                                                "RDMAing: %d/%d\n",
                                                rx_sa->used, rx_sa->len);
+
+                                       if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
+                                               rx_sa->flags |= RX_SA_ABORTED;
+                                               sdp_dbg_data(sk, "Ignoring src avail "
+                                                               "due to SrcAvailCancel\n");
+                                       }
+
+                                       if (rx_sa->flags & RX_SA_ABORTED) {
+                                               sdp_warn(sk, "rx_sa aborted. not rdmaing\n");
+                                               goto skb_cleanup;
+                                       }
+
                                        avail_bytes_count = rx_sa->len;
                                }
 
@@ -2022,8 +2031,8 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        }
 
                        if (before(*seq, SDP_SKB_CB(skb)->seq)) {
-                               sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
-                                       *seq, SDP_SKB_CB(skb)->seq);
+                               sdp_warn(sk, "skb: %p recvmsg bug: copied %X seq %X\n",
+                                       skb, *seq, SDP_SKB_CB(skb)->seq);
                                sdp_reset(sk);
                                break;
                        }
@@ -2173,33 +2182,40 @@ skip_copy:
 
 
                if (rx_sa) {
-                       if (ssk->srcavail_cancel_mseq < rx_sa->mseq) {
-                               rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
-                               BUG_ON(rc);
-                       }
-                       if (rx_sa->aborted) {
-                               sdp_warn(sk, "RDMA aborted. Sending SendSM\n");
-                               rc = sdp_post_sendsm(ssk);
-                               BUG_ON(rc);
-                       }
+                       rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
+                       BUG_ON(rc);
+
                }
 
-               if ((!rx_sa && used + offset < skb->len) ||
-                       (rx_sa && !rx_sa->aborted && rx_sa->used < rx_sa->len))
+               if (!rx_sa && used + offset < skb->len)
+                       continue;
+
+               if (rx_sa && !(rx_sa->flags & RX_SA_ABORTED) &&
+                               rx_sa->used < rx_sa->len)
                        continue;
+
                offset = 0;
 
 skb_cleanup:
-               if (!(flags & MSG_PEEK) || (rx_sa && rx_sa->aborted)) {
+               if (!(flags & MSG_PEEK) ||
+                               (rx_sa && (rx_sa->flags & RX_SA_ABORTED))) {
                        struct sdp_bsdh *h;
                        h = (struct sdp_bsdh *)skb_transport_header(skb);
                        sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
                                ntohl(h->mseq), ntohl(h->mseq_ack));
+
+                       if (rx_sa) {
+                               if (!rx_sa->flags) /* else ssk->rx_sa might
+                                                     point to another rx_sa */
+                                       ssk->rx_sa = NULL;
+
+                               kfree(rx_sa);
+                               rx_sa = NULL;
+                               
+                       }
+
                        skb_unlink(skb, &sk->sk_receive_queue);
                        __kfree_skb(skb);
-
-                       kfree(rx_sa);
-                       rx_sa = NULL;
                }
                continue;
 found_fin_ok:
index 2453715f3969b6c19e4c939a2293ae01c927d580..08e8e3d9f80421c5b0ba0cd83913dd88ea728ca1 100644 (file)
@@ -305,23 +305,30 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
        SDP_SKB_CB(skb)->seq = rcv_nxt(ssk);
        if (h->mid == SDP_MID_SRCAVAIL) {
                struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
-               struct rx_srcavail_state *sa;
+               struct rx_srcavail_state *rx_sa;
                
                ssk->srcavail_cancel_mseq = 0;
 
-               sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
+               ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
                                sizeof(struct rx_srcavail_state), GFP_ATOMIC);
 
-               sa->mseq = ntohl(h->mseq);
-               sa->aborted = 0;
-               sa->used = 0;
-               sa->len = skb_len = ntohl(srcah->len);
-               sa->rkey = ntohl(srcah->rkey);
-               sa->vaddr = be64_to_cpu(srcah->vaddr);
+               rx_sa->mseq = ntohl(h->mseq);
+               rx_sa->used = 0;
+               rx_sa->len = skb_len = ntohl(srcah->len);
+               rx_sa->rkey = ntohl(srcah->rkey);
+               rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
+               rx_sa->flags = 0;
+
+               if (ssk->tx_sa) {
+                       sdp_warn(&ssk->isk.sk, "got RX SrcAvail while waiting "
+                                       "for TX SrcAvail. waking up TX SrcAvail"
+                                       "to be aborted\n");
+                       wake_up(sk->sk_sleep);
+               }
 
                atomic_add(skb->len, &ssk->rcv_nxt);
                sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n",
-                       skb_len, sa->vaddr);
+                       skb_len, rx_sa->vaddr);
        } else {
                skb_len = skb->len;
 
@@ -477,11 +484,24 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                __kfree_skb(skb);
                break;
        case SDP_MID_SRCAVAIL_CANCEL:
-               sdp_dbg_data(sk, "Handling SrcAvailCancel - sending SendSM\n");
+               sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
                sdp_prf(sk, NULL, "Handling SrcAvailCancel");
-               ssk->srcavail_cancel_mseq = ntohl(h->mseq);
-               sdp_post_sendsm(ssk);
+               if (ssk->rx_sa) {
+                       ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+                       ssk->rx_sa->flags |= RX_SA_ABORTED;
+                       ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
+                                             the dirty logic from recvmsg */
+                       sdp_post_sendsm(sk);
+               } else {
+                       sdp_warn(sk, "Got SrcAvailCancel - "
+                                       "but no SrcAvail in process\n");
+               }
                break;
+       case SDP_MID_SINKAVAIL:
+               sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
+               sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
+               __kfree_skb(skb);
+               break;
        case SDP_MID_ABORT:
                sdp_dbg_data(sk, "Handling ABORT\n");
                sdp_prf(sk, NULL, "Handling ABORT");
@@ -549,9 +568,9 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                sk_send_sigurg(sk);*/
 
        skb_pull(skb, sizeof(struct sdp_bsdh));
-       if (h->mid == SDP_MID_SRCAVAIL) {
+
+       if (h->mid == SDP_MID_SRCAVAIL)
                skb_pull(skb, sizeof(struct sdp_srcah));
-       }
 
        if (unlikely(h->mid == SDP_MID_DATA && skb->len == 0)) {
                /* Credit update is valid even after RCV_SHUTDOWN */
@@ -628,10 +647,6 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk,
        else
                skb->data_len = 0;
 
-
-       if (h->mid == SDP_MID_SRCAVAIL)
-               skb->data_len -= sizeof(struct sdp_srcah);
-
 #ifdef NET_SKBUFF_DATA_USES_OFFSET
        skb->tail = skb_headlen(skb);
 #else
index d4569aa1f938ad39240e73232af271621d09c9e0..26c5e56ab1c66123e5364adc0843a1918353ee7d 100644 (file)
@@ -62,10 +62,10 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
 
 static unsigned long last_send;
 
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb)
 {
        struct sdp_buf *tx_req;
-       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
        unsigned long mseq = ring_head(ssk->tx_ring);
        int i, rc, frags;
        u64 addr;
@@ -77,13 +77,24 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
        struct ib_sge *sge = ibsge;
        struct ib_send_wr tx_wr = { 0 };
 
-       SDPSTATS_COUNTER_MID_INC(post_send, mid);
+       SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
        SDPSTATS_HIST(send_size, skb->len);
 
        ssk->tx_packets++;
        ssk->tx_bytes += skb->len;
 
-       h->mid = mid;
+       if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
+               struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(skb);
+               if (ssk->tx_sa != tx_sa) {
+                       sdp_warn(&ssk->isk.sk, "SrcAvail cancelled "
+                                       "before being sent!\n");
+                       WARN_ON(1);
+                       __kfree_skb(skb);
+                       return;
+               }
+               TX_SRCAVAIL_STATE(skb)->mseq = mseq;
+       }
+
        if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
                h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
        else
@@ -92,12 +103,10 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
        h->bufs = htons(rx_ring_posted(ssk));
        h->len = htonl(skb->len);
        h->mseq = htonl(mseq);
-       if (TX_SRCAVAIL_STATE(skb))
-               TX_SRCAVAIL_STATE(skb)->mseq = mseq;
        h->mseq_ack = htonl(mseq_ack(ssk));
 
        sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
-                       mid2str(mid), rx_ring_posted(ssk), mseq,
+                       mid2str(h->mid), rx_ring_posted(ssk), mseq,
                        ntohl(h->mseq_ack));
 
        SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
index 80c8c8e8c7c76b30df00f0f4a1976d5e10291ef6..f2dd17c43c786b594b52c0416fcd0f1a3649f316 100644 (file)
@@ -56,7 +56,6 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
                int page_idx, struct iovec *iov, int off, size_t len)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
-       struct sdp_srcah *srcah;
        struct sk_buff *skb;
        int payload_len;
 
@@ -65,11 +64,9 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
        BUG_ON(!tx_sa);
        BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
 
-       skb = sk_stream_alloc_skb(&ssk->isk.sk,
-                       sizeof(struct sdp_bsdh) +
-                       sizeof(struct sdp_srcah) + 
-                       SDP_SRCAVAIL_PAYLOAD_LEN,
-                       GFP_KERNEL);
+       tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
+
+       skb = sdp_alloc_skb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off);
        if (!skb) {
                return -ENOMEM;
        }
@@ -79,11 +76,6 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
                                         * but continue to live after skb is freed */
        ssk->tx_sa = tx_sa;
 
-       srcah = (struct sdp_srcah *)skb_push(skb, sizeof(*srcah));
-       srcah->len = htonl(len);
-       srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
-       srcah->vaddr = cpu_to_be64(off);
-
        if (0) {
                void *payload;
                payload = skb_put(skb, SDP_SRCAVAIL_PAYLOAD_LEN);
@@ -106,6 +98,9 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
 
        skb_entail(sk, ssk, skb);
        
+       ssk->write_seq += payload_len;
+       SDP_SKB_CB(skb)->end_seq += payload_len;
+
        tx_sa->bytes_sent = len;
        tx_sa->bytes_acked = payload_len;
 
@@ -117,11 +112,12 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
 static int sdp_post_srcavail_cancel(struct sock *sk)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
+       struct sk_buff *skb;
 
-       if (!ssk->tx_sa && !ssk->srcavail_cancel)
-               return 0; /* srcavail already serviced */
+       sdp_warn(&ssk->isk.sk, "Posting srcavail cancel\n");
 
-       ssk->srcavail_cancel = 1;
+       skb = sdp_alloc_skb_srcavail_cancel(sk);
+       skb_entail(sk, ssk, skb);
 
        sdp_post_sends(ssk, 1);
 
@@ -164,34 +160,48 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 
                if (unlikely(!*timeo_p)) {
                        err = -ETIME;
+                       tx_sa->abort_flags |= TX_SA_TIMEDOUT;
                        sdp_prf1(sk, NULL, "timeout");
                        break;
                }
 
-               if (unlikely(!ignore_signals && signal_pending(current))) {
-                       err = -EINTR;
-                       sdp_prf1(sk, NULL, "signalled");
-                       break;
-               }
-
                if (tx_sa->bytes_acked == tx_sa->bytes_sent)
                        break;
+
                else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
                        err = -EINVAL;
                        sdp_warn(sk, "acked bytes > sent bytes\n");
+                       tx_sa->abort_flags |= TX_SA_ERROR;
                        break;
                }
 
-               if (tx_sa->abort) {
+               if (tx_sa->abort_flags & TX_SA_SENDSM) {
                        sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
                        err = -EAGAIN;
                        break ;
                }
 
+               if (!ignore_signals) {
+                       if (signal_pending(current)) {
+                               err = -EINTR;
+                               sdp_prf1(sk, NULL, "signalled");
+                               tx_sa->abort_flags |= TX_SA_INTRRUPTED;
+                               break;
+                       }
+
+                       if (ssk->rx_sa) {
+                               sdp_warn(sk, "Crossing SrcAvail - aborting this\n");
+                               tx_sa->abort_flags |= TX_SA_CROSS_SEND;
+                               err = -ETIME;
+                               break ;
+                       }
+               }
+
                posts_handler_put(ssk);
 
                sk_wait_event(sk, &current_timeo,
-                               tx_sa->abort && 
+                               tx_sa->abort_flags &&
+                               ssk->rx_sa &&
                                (tx_sa->bytes_acked < tx_sa->bytes_sent) && 
                                vm_wait);
                sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
@@ -215,8 +225,8 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 
        finish_wait(sk->sk_sleep, &wait);
 
-       sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, abort: %d\n",
-                       tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort);
+       sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
+                       tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);
 
        if (!ssk->qp_active) {
                sdp_warn(sk, "QP destroyed while waiting\n");
@@ -277,52 +287,27 @@ static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
                struct rx_srcavail_state *rx_sa)
 {
-       struct sdp_rrch *rrch;
        struct sk_buff *skb;
-       gfp_t gfp_page;
        int copied = rx_sa->used - rx_sa->reported;
 
        if (rx_sa->used <= rx_sa->reported)
                return 0;
 
-       if (unlikely(ssk->isk.sk.sk_allocation))
-               gfp_page = ssk->isk.sk.sk_allocation;
-       else
-               gfp_page = GFP_KERNEL;
+       skb = sdp_alloc_skb_rdmardcompl(&ssk->isk.sk, copied);
 
-       skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-                       sizeof(struct sdp_bsdh) +
-                       sizeof(struct sdp_rrch),
-                       gfp_page);
-       /* FIXME */
-       BUG_ON(!skb);
-
-       rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
-       rrch->len = htonl(copied);
        rx_sa->reported += copied;
+
        /* TODO: What if no tx_credits available? */
-       sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL);
+       sdp_post_send(ssk, skb);
 
        return 0;
 }
 
-int sdp_post_sendsm(struct sdp_sock *ssk)
+int sdp_post_sendsm(struct sock *sk)
 {
-       struct sk_buff *skb;
-       gfp_t gfp_page;
+       struct sk_buff *skb = sdp_alloc_skb_sendsm(sk);
 
-       if (unlikely(ssk->isk.sk.sk_allocation))
-               gfp_page = ssk->isk.sk.sk_allocation;
-       else
-               gfp_page = GFP_KERNEL;
-
-       skb = sdp_stream_alloc_skb(&ssk->isk.sk,
-                       sizeof(struct sdp_bsdh),
-                       gfp_page);
-       /* FIXME */
-       BUG_ON(!skb);
-
-       sdp_post_send(ssk, skb, SDP_MID_SENDSM);
+       sdp_post_send(sdp_sk(sk), skb);
 
        return 0;
 }
@@ -367,20 +352,20 @@ void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
                goto out;
        }
 
-       if (ssk->tx_sa->mseq < mseq_ack) {
-               sdp_prf1(sk, NULL, "SendSM arrived for old SrcAvail. "
-                       "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x",
+       if (mseq_ack < ssk->tx_sa->mseq) {
+               sdp_warn(sk, "SendSM arrived for old SrcAvail. "
+                       "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
                        mseq_ack, ssk->tx_sa->mseq);
                goto out;
        }
 
-       sdp_prf1(sk, NULL, "Got SendSM - aborting SrcAvail");
+       sdp_warn(sk, "Got SendSM - aborting SrcAvail\n");
 
-       ssk->tx_sa->abort = 1;
+       ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
        cancel_delayed_work(&ssk->srcavail_cancel_work);
 
        wake_up(sk->sk_sleep);
-       sdp_dbg_data(sk, "woke up sleepers\n");
+       sdp_warn(sk, "woke up sleepers\n");
 
 out:
        spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
@@ -743,7 +728,9 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
                 * post sendsm */
                sdp_warn(sk, "post rdma, wait_for_compl "
                        "or post rdma_rd_comp failed - post sendsm\n");
-               rx_sa->aborted = 1;
+               rx_sa->flags |= RX_SA_ABORTED;
+               ssk->rx_sa = NULL; /* TODO: convert this into SDP_MID_DATA and
+                                     reuse the dirty logic from recvmsg */
        }
 
        ssk->tx_ring.rdma_inflight = NULL;
@@ -806,18 +793,18 @@ static int sdp_rdma_adv_single(struct sock *sk,
        if (rc)
                goto err_abort_send;
 
-
        rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
        if (unlikely(rc)) {
-               switch (rc) {
-               case -EAGAIN: /* Got SendSM */
+               enum tx_sa_flag f = tx_sa->abort_flags;
+
+               if (f & TX_SA_SENDSM) {
                        sdp_warn(sk, "got SendSM. use SEND verb.\n");
-                       break;
+               } else if (f & TX_SA_ERROR) {
+                       sdp_warn(sk, "SrcAvail error completion\n");
+                       sdp_reset(sk);
+               } else if (ssk->qp_active) {
+                       sdp_warn(sk, "Aborting send. abort_flag = 0x%x.\n", f);
 
-               case -ETIME: /* Timedout */
-                       sdp_warn(sk, "TX srcavail timedout.\n");
-               case -EINTR: /* interrupted */
-                       sdp_prf1(sk, NULL, "Aborting send.");
                        sdp_post_srcavail_cancel(sk);
 
                        /* Wait for RdmaRdCompl/SendSM to
@@ -826,22 +813,17 @@ static int sdp_rdma_adv_single(struct sock *sk,
                        sdp_warn(sk, "Waiting for SendSM\n");
                        sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
                        sdp_warn(sk, "finished waiting\n");
-
-                       break;
-
-               default:
-                       sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
-                       /* Socked destroyed while waited */
-                       break;
+               } else {
+                       sdp_warn(sk, "QP was destroyed while waiting\n");
                }
 
                goto err_abort_send;
        }
        sdp_prf1(sk, NULL, "got RdmaRdCompl");
 
-       sdp_update_iov_used(sk, iov, tx_sa->bytes_sent);
-
 err_abort_send:
+       sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
+
        ib_fmr_pool_unmap(tx_sa->fmr);
 
        spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
@@ -869,6 +851,10 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        int p_idx;
 
        sdp_dbg_data(sk, "%s\n", __func__);
+       if (ssk->rx_sa) {
+               sdp_warn(sk, "Deadlock prevent: crossing SrcAvail\n");
+               return -EAGAIN;
+       }
 
        lock_sock(sk);
 
@@ -939,14 +925,13 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
                        rc = sdp_rdma_adv_single(sk, tx_sa, iov,
                                        p_idx, p_cnt, offset, len);
-//                                     offset, p_cnt, len, addrs);
 
                        copied += len;
                        bytes_left -= len;
                        pages_left -= p_cnt;
                        p_idx += p_cnt;
                        offset = 0;
-               } while (!rc && !tx_sa->abort && pages_left > 0);
+               } while (!rc && !tx_sa->abort_flags && pages_left > 0);
 
                sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
 err_map_dma: