From: Amir Vadai Date: Sun, 26 Apr 2009 12:31:27 +0000 (+0300) Subject: sdp: created sdp_rx and sdp_tx X-Git-Tag: v4.1.12-92~264^2~5^2~283 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=9b24af858737f12f8f3711d678ba147232581ebd;p=users%2Fjedix%2Flinux-maple.git sdp: created sdp_rx and sdp_tx Signed-off-by: Amir Vadai --- diff --git a/drivers/infiniband/ulp/sdp/Makefile b/drivers/infiniband/ulp/sdp/Makefile index 5da4b7bbcd4b0..b14a16a4407a7 100644 --- a/drivers/infiniband/ulp/sdp/Makefile +++ b/drivers/infiniband/ulp/sdp/Makefile @@ -3,4 +3,4 @@ EXTRA_CFLAGS += -ggdb obj-$(CONFIG_INFINIBAND_SDP) += ib_sdp.o -ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o +ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h index 8da329fc96046..5e782820099c7 100644 --- a/drivers/infiniband/ulp/sdp/sdp.h +++ b/drivers/infiniband/ulp/sdp/sdp.h @@ -222,6 +222,7 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) #define MIN(a, b) (a < b ? a : b) #endif +extern struct workqueue_struct *sdp_wq; extern struct list_head sock_list; extern spinlock_t sock_list_lock; @@ -318,6 +319,10 @@ static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring) return SDP_TX_SIZE - (tx_ring->head - tx_ring->tail); } +struct sdp_chrecvbuf { + u32 size; +}; + struct sdp_sock { /* sk has to be the first member of inet_sock */ struct inet_sock isk; @@ -509,36 +514,38 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *); void sdp_reset(struct sock *sk); void sdp_reset_sk(struct sock *sk, int rc); -void sdp_rx_irq(struct ib_cq *cq, void *cq_context); -void sdp_tx_irq(struct ib_cq *cq, void *cq_context); -void sdp_poll_tx_cq(unsigned long data); -void _sdp_poll_tx_cq(unsigned long data); -void sdp_rx_comp_work(struct work_struct *work); -void sdp_process_tx_wc_work(struct work_struct *work); int sdp_post_credits(struct sdp_sock *ssk); -void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid); -void sdp_post_recvs(struct sdp_sock *ssk); -int sdp_poll_rx_cq(struct sdp_sock *ssk); -void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle); -#define sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle) void sdp_destroy_work(struct work_struct *work); void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk); void sdp_dreq_wait_timeout_work(struct work_struct *work); -struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id); -struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq); void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb); void sdp_add_sock(struct sdp_sock *ssk); void sdp_remove_sock(struct sdp_sock *ssk); void sdp_remove_large_sock(struct sdp_sock *ssk); -int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size); -int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size); void sdp_post_keepalive(struct sdp_sock *ssk); void sdp_start_keepalive_timer(struct sock *sk); void sdp_bzcopy_write_space(struct sdp_sock *ssk); int sdp_init_sock(struct sock *sk); int __init sdp_proc_init(void); void sdp_proc_unregister(void); + int sdp_xmit_poll(struct sdp_sock *ssk, int force); +void sdp_tx_ring_purge(struct sdp_sock *ssk); +void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid); +void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle); +#define 
sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle) +void sdp_process_tx_wc_work(struct work_struct *work); +void sdp_poll_tx_cq(unsigned long data); +void _sdp_poll_tx_cq(unsigned long data); +void sdp_tx_irq(struct ib_cq *cq, void *cq_context); + +int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size); +int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size); +void sdp_rx_ring_purge(struct sdp_sock *ssk); +void sdp_post_recvs(struct sdp_sock *ssk); +void sdp_rx_comp_work(struct work_struct *work); +int sdp_poll_rx_cq(struct sdp_sock *ssk); +void sdp_rx_irq(struct ib_cq *cq, void *cq_context); static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp) { diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c index 6d432378aeb4d..5d4441ae8ecc7 100644 --- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c @@ -39,153 +39,6 @@ #define SDP_RESIZE_WAIT 16 -struct sdp_chrecvbuf { - u32 size; -}; - -static int rcvbuf_scale = 0x10; - -int rcvbuf_initial_size = SDP_HEAD_SIZE; -module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644); -MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes."); - -module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644); -MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor."); - -static int top_mem_usage = 0; -module_param_named(top_mem_usage, top_mem_usage, int, 0644); -MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB)."); - -#ifdef CONFIG_PPC -static int max_large_sockets = 100; -#else -static int max_large_sockets = 1000; -#endif -module_param_named(max_large_sockets, max_large_sockets, int, 0644); -MODULE_PARM_DESC(max_large_sockets, "Max number of large sockets (32k buffers)."); - -#define sdp_cnt(var) do { (var)++; } while (0) -static unsigned sdp_keepalive_probes_sent = 0; - -module_param_named(sdp_keepalive_probes_sent, sdp_keepalive_probes_sent, uint, 0644); -MODULE_PARM_DESC(sdp_keepalive_probes_sent, "Total number of keepalive probes sent."); - -static int curr_large_sockets = 0; -atomic_t sdp_current_mem_usage; -spinlock_t sdp_large_sockets_lock; - -static int sdp_get_large_socket(struct sdp_sock *ssk) -{ - int count, ret; - - if (ssk->recv_request) - return 1; - - spin_lock_irq(&sdp_large_sockets_lock); - count = curr_large_sockets; - ret = curr_large_sockets < max_large_sockets; - if (ret) - curr_large_sockets++; - spin_unlock_irq(&sdp_large_sockets_lock); - - return ret; -} - -void sdp_remove_large_sock(struct sdp_sock *ssk) -{ - if (ssk->recv_frags) { - spin_lock_irq(&sdp_large_sockets_lock); - curr_large_sockets--; - spin_unlock_irq(&sdp_large_sockets_lock); - } -} - -/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */ -static void sdp_fin(struct sock *sk) -{ - sdp_dbg(sk, "%s\n", __func__); - - sk->sk_shutdown |= RCV_SHUTDOWN; - sock_set_flag(sk, SOCK_DONE); - - switch (sk->sk_state) { - case TCP_SYN_RECV: - case TCP_ESTABLISHED: - sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED, - TCP_CLOSE_WAIT); - break; - - case TCP_FIN_WAIT1: - /* Received a reply FIN - start Infiniband tear down */ - sdp_dbg(sk, "%s: Starting Infiniband tear down sending DREQ\n", - __func__); - - sdp_cancel_dreq_wait_timeout(sdp_sk(sk)); - - sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT); - - if (sdp_sk(sk)->id) { - rdma_disconnect(sdp_sk(sk)->id); - } else { - sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__); - return; - } - break; - case 
TCP_TIME_WAIT: - /* This is a mutual close situation and we've got the DREQ from - the peer before the SDP_MID_DISCONNECT */ - break; - case TCP_CLOSE: - /* FIN arrived after IB teardown started - do nothing */ - sdp_dbg(sk, "%s: fin in state %s\n", - __func__, sdp_state_str(sk->sk_state)); - return; - default: - sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n", - __func__, sk->sk_state); - break; - } - - - sk_mem_reclaim(sk); - - if (!sock_flag(sk, SOCK_DEAD)) { - sk->sk_state_change(sk); - - /* Do not send POLL_HUP for half duplex close. */ - if (sk->sk_shutdown == SHUTDOWN_MASK || - sk->sk_state == TCP_CLOSE) - sk_wake_async(sk, 1, POLL_HUP); - else - sk_wake_async(sk, 1, POLL_IN); - } -} - -void sdp_post_keepalive(struct sdp_sock *ssk) -{ - int rc; - struct ib_send_wr wr, *bad_wr; - - sdp_dbg(&ssk->isk.sk, "%s\n", __func__); - - memset(&wr, 0, sizeof(wr)); - - wr.next = NULL; - wr.wr_id = 0; - wr.sg_list = NULL; - wr.num_sge = 0; - wr.opcode = IB_WR_RDMA_WRITE; - - rc = ib_post_send(ssk->qp, &wr, &bad_wr); - if (rc) { - sdp_dbg(&ssk->isk.sk, "ib_post_keepalive failed with status %d.\n", rc); - sdp_set_error(&ssk->isk.sk, -ECONNRESET); - wake_up(&ssk->wq); - } - - sdp_cnt(sdp_keepalive_probes_sent); -} - #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA void _dump_packet(const char *func, int line, struct sock *sk, char *str, struct sk_buff *skb, const struct sdp_bsdh *h) @@ -245,336 +98,6 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, } #endif -static int sdp_process_tx_cq(struct sdp_sock *ssk); - -int sdp_xmit_poll(struct sdp_sock *ssk, int force) -{ - int wc_processed = 0; -// sdp_prf(&ssk->isk.sk, NULL, "xmit_poll force = %d", force); - - /* If we don't have a pending timer, set one up to catch our recent - post in case the interface becomes idle */ - if (!timer_pending(&ssk->tx_ring.timer)) - mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); - - /* Poll the CQ every SDP_TX_POLL_MODER packets */ - if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) - wc_processed = sdp_process_tx_cq(ssk); - - return wc_processed; -} - -void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) -{ - struct sdp_buf *tx_req; - struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h); - unsigned mseq = ssk->tx_ring.head; - int i, rc, frags; - u64 addr; - struct ib_device *dev; - struct ib_sge *sge; - struct ib_send_wr *bad_wr; - -#define ENUM2STR(e) [e] = #e - static char *mid2str[] = { - ENUM2STR(SDP_MID_HELLO), - ENUM2STR(SDP_MID_HELLO_ACK), - ENUM2STR(SDP_MID_DISCONN), - ENUM2STR(SDP_MID_CHRCVBUF), - ENUM2STR(SDP_MID_CHRCVBUF_ACK), - ENUM2STR(SDP_MID_DATA), - }; - sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d", - mid2str[mid], ssk->rx_head - ssk->rx_tail); - - SDPSTATS_COUNTER_MID_INC(post_send, mid); - SDPSTATS_HIST(send_size, skb->len); - - h->mid = mid; - if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG)) - h->flags = SDP_OOB_PRES | SDP_OOB_PEND; - else - h->flags = 0; - - h->bufs = htons(ssk->rx_head - ssk->rx_tail); - h->len = htonl(skb->len); - h->mseq = htonl(mseq); - h->mseq_ack = htonl(ssk->mseq_ack); - - SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h); - tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)]; - tx_req->skb = skb; - dev = ssk->ib_device; - sge = ssk->ibsge; - addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len, - DMA_TO_DEVICE); - tx_req->mapping[0] = addr; - - /* TODO: proper error handling */ - BUG_ON(ib_dma_mapping_error(dev, addr)); - - sge->addr = addr; - 
sge->length = skb->len - skb->data_len; - sge->lkey = ssk->mr->lkey; - frags = skb_shinfo(skb)->nr_frags; - for (i = 0; i < frags; ++i) { - ++sge; - addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page, - skb_shinfo(skb)->frags[i].page_offset, - skb_shinfo(skb)->frags[i].size, - DMA_TO_DEVICE); - BUG_ON(ib_dma_mapping_error(dev, addr)); - tx_req->mapping[i + 1] = addr; - sge->addr = addr; - sge->length = skb_shinfo(skb)->frags[i].size; - sge->lkey = ssk->mr->lkey; - } - - ssk->tx_wr.next = NULL; - ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND; - ssk->tx_wr.sg_list = ssk->ibsge; - ssk->tx_wr.num_sge = frags + 1; - ssk->tx_wr.opcode = IB_WR_SEND; - ssk->tx_wr.send_flags = IB_SEND_SIGNALED; - if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG)) - ssk->tx_wr.send_flags |= IB_SEND_SOLICITED; - - { - static unsigned long last_send = 0; - int delta = jiffies - last_send; - - if (likely(last_send)) - SDPSTATS_HIST(send_interval, delta); - - last_send = jiffies; - } - rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr); - ++ssk->tx_ring.head; - --ssk->tx_ring.credits; - ssk->remote_credits = ssk->rx_head - ssk->rx_tail; - if (unlikely(rc)) { - sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc); - sdp_set_error(&ssk->isk.sk, -ECONNRESET); - wake_up(&ssk->wq); - } - - if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) { - sdp_poll_rx_cq(ssk); - } -} - -struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq) -{ - struct ib_device *dev; - struct sdp_buf *tx_req; - struct sk_buff *skb = NULL; - struct bzcopy_state *bz; - int i, frags; - struct sdp_tx_ring *tx_ring = &ssk->tx_ring; - if (unlikely(mseq != tx_ring->tail)) { - printk(KERN_WARNING "Bogus send completion id %d tail %d\n", - mseq, tx_ring->tail); - goto out; - } - - dev = ssk->ib_device; - tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)]; - skb = tx_req->skb; - ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len, - DMA_TO_DEVICE); - frags = skb_shinfo(skb)->nr_frags; - for (i = 0; i < frags; ++i) { - ib_dma_unmap_page(dev, tx_req->mapping[i + 1], - skb_shinfo(skb)->frags[i].size, - DMA_TO_DEVICE); - } - - tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq; - - /* TODO: AIO and real zcopy cdoe; add their context support here */ - bz = BZCOPY_STATE(skb); - if (bz) - bz->busy--; - - ++tx_ring->tail; - -out: - return skb; -} - -static void sdp_post_recv(struct sdp_sock *ssk) -{ - struct sdp_buf *rx_req; - int i, rc, frags; - u64 addr; - struct ib_device *dev; - struct ib_sge *sge; - struct ib_recv_wr *bad_wr; - struct sk_buff *skb; - struct page *page; - skb_frag_t *frag; - struct sdp_bsdh *h; - int id = ssk->rx_head; - gfp_t gfp_page; - - /* Now, allocate and repost recv */ - /* TODO: allocate from cache */ - - if (unlikely(ssk->isk.sk.sk_allocation)) { - skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE, - ssk->isk.sk.sk_allocation); - gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM; - } else { - skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE, - GFP_KERNEL); - gfp_page = GFP_HIGHUSER; - } - - /* FIXME */ - BUG_ON(!skb); - h = (struct sdp_bsdh *)skb->head; - for (i = 0; i < ssk->recv_frags; ++i) { - page = alloc_pages(gfp_page, 0); - BUG_ON(!page); - frag = &skb_shinfo(skb)->frags[i]; - frag->page = page; - frag->page_offset = 0; - - /* Bugzilla 1311 */ - if ( sizeof(frag->size) < 4 ) - frag->size = min(PAGE_SIZE, SDP_MAX_PAYLOAD); - else - frag->size = PAGE_SIZE; - - ++skb_shinfo(skb)->nr_frags; - skb->len += frag->size; - skb->data_len += frag->size; - skb->truesize += 
frag->size; - } - - rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1)); - rx_req->skb = skb; - dev = ssk->ib_device; - sge = ssk->ibsge; - addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE); - BUG_ON(ib_dma_mapping_error(dev, addr)); - - rx_req->mapping[0] = addr; - - /* TODO: proper error handling */ - sge->addr = (u64)addr; - sge->length = SDP_HEAD_SIZE; - sge->lkey = ssk->mr->lkey; - frags = skb_shinfo(skb)->nr_frags; - for (i = 0; i < frags; ++i) { - ++sge; - addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page, - skb_shinfo(skb)->frags[i].page_offset, - skb_shinfo(skb)->frags[i].size, - DMA_FROM_DEVICE); - BUG_ON(ib_dma_mapping_error(dev, addr)); - rx_req->mapping[i + 1] = addr; - sge->addr = addr; - sge->length = skb_shinfo(skb)->frags[i].size; - sge->lkey = ssk->mr->lkey; - } - - ssk->rx_wr.next = NULL; - ssk->rx_wr.wr_id = id | SDP_OP_RECV; - ssk->rx_wr.sg_list = ssk->ibsge; - ssk->rx_wr.num_sge = frags + 1; - rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr); - sdp_prf(&ssk->isk.sk, skb, "rx skb was posted"); - SDPSTATS_COUNTER_INC(post_recv); - ++ssk->rx_head; - if (unlikely(rc)) { - sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc); - sdp_reset(&ssk->isk.sk); - } - - atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); -} - -void sdp_post_recvs(struct sdp_sock *ssk) -{ - struct sock *sk = &ssk->isk.sk; - int scale = ssk->rcvbuf_scale; - - if (unlikely(!ssk->id || ((1 << sk->sk_state) & - (TCPF_CLOSE | TCPF_TIME_WAIT)))) { - return; - } - - if (top_mem_usage && - (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) - scale = 1; - - while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) && - (ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) * - (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) + - ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) || - unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS)) - sdp_post_recv(ssk); -} - -struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id) -{ - struct sdp_buf *rx_req; - struct ib_device *dev; - struct sk_buff *skb; - int i, frags; - - if (unlikely(id != ssk->rx_tail)) { - printk(KERN_WARNING "Bogus recv completion id %d tail %d\n", - id, ssk->rx_tail); - return NULL; - } - - dev = ssk->ib_device; - rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)]; - skb = rx_req->skb; - ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE, - DMA_FROM_DEVICE); - frags = skb_shinfo(skb)->nr_frags; - for (i = 0; i < frags; ++i) - ib_dma_unmap_page(dev, rx_req->mapping[i + 1], - skb_shinfo(skb)->frags[i].size, - DMA_FROM_DEVICE); - ++ssk->rx_tail; - --ssk->remote_credits; - return skb; -} - -/* Here because I do not want queue to fail. */ -static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk, - struct sk_buff *skb) -{ - int skb_len; - struct sdp_sock *ssk = sdp_sk(sk); - struct sk_buff *tail = NULL; - - /* not needed since sk_rmem_alloc is not currently used - * TODO - remove this? 
- skb_set_owner_r(skb, sk); */ - - skb_len = skb->len; - - TCP_SKB_CB(skb)->seq = ssk->rcv_nxt; - ssk->rcv_nxt += skb_len; - - if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) && - unlikely(skb_tailroom(tail) >= skb_len)) { - skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len); - __kfree_skb(skb); - skb = tail; - } else - skb_queue_tail(&sk->sk_receive_queue, skb); - - if (!sock_flag(sk, SOCK_DEAD)) - sk->sk_data_ready(sk, skb_len); - return skb; -} - static inline void update_send_head(struct sock *sk, struct sk_buff *skb) { struct page *page; @@ -771,456 +294,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag } } -int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size) -{ - ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE; - if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS) - ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS; - ssk->rcvbuf_scale = rcvbuf_scale; - - sdp_post_recvs(ssk); - - return 0; -} - -int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size) -{ - skb_frag_t skb_frag; - u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE; - u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE; - - /* Bugzilla 1311, Kernels using smaller fragments must reject - * re-size requests larger than 32k to prevent being sent - * fragment larger than the receive buffer fragment. - */ - if ( (sizeof(skb_frag.size) < 4) && (max_size > 0x8000)) - max_size = 0x8000; - - if (new_size > curr_size && new_size <= max_size && - sdp_get_large_socket(ssk)) { - ssk->rcvbuf_scale = rcvbuf_scale; - ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE; - if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS) - ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS; - return 0; - } else - return -1; -} - -static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf) -{ - if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0) - ssk->recv_request_head = ssk->rx_head + 1; - else - ssk->recv_request_head = ssk->rx_tail; - ssk->recv_request = 1; -} - -static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf) -{ - u32 new_size = ntohl(buf->size); - - if (new_size > ssk->xmit_size_goal) { - ssk->sent_request = -1; - ssk->xmit_size_goal = new_size; - ssk->send_frags = - PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE; - } else - ssk->sent_request = 0; -} - -static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc) -{ - struct sock *sk = &ssk->isk.sk; - int frags; - struct sk_buff *skb; - struct sdp_bsdh *h; - int pagesz, i; - - skb = sdp_recv_completion(ssk, wc->wr_id); - if (unlikely(!skb)) - return -1; - - sdp_prf(sk, skb, "recv completion"); - - atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); - - if (unlikely(wc->status)) { - if (wc->status != IB_WC_WR_FLUSH_ERR) { - sdp_dbg(sk, "Recv completion with error. Status %d\n", - wc->status); - sdp_reset(sk); - } - __kfree_skb(skb); - return 0; - } - - sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n", - (int)wc->wr_id, wc->byte_len); - if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) { - printk(KERN_WARNING "SDP BUG! 
byte_len %d < %zd\n", - wc->byte_len, sizeof(struct sdp_bsdh)); - __kfree_skb(skb); - return -1; - } - skb->len = wc->byte_len; - if (likely(wc->byte_len > SDP_HEAD_SIZE)) - skb->data_len = wc->byte_len - SDP_HEAD_SIZE; - else - skb->data_len = 0; - skb->data = skb->head; -#ifdef NET_SKBUFF_DATA_USES_OFFSET - skb->tail = skb_headlen(skb); -#else - skb->tail = skb->head + skb_headlen(skb); -#endif - h = (struct sdp_bsdh *)skb->data; - SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h); - skb_reset_transport_header(skb); - ssk->mseq_ack = ntohl(h->mseq); - if (ssk->mseq_ack != (int)wc->wr_id) - printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n", - ssk->mseq_ack, (int)wc->wr_id); - - SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits); - ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 + - ntohs(h->bufs); - - frags = skb_shinfo(skb)->nr_frags; - pagesz = PAGE_ALIGN(skb->data_len); - skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE; - - for (i = skb_shinfo(skb)->nr_frags; - i < frags; ++i) { - put_page(skb_shinfo(skb)->frags[i].page); - skb->truesize -= PAGE_SIZE; - } - - if (unlikely(h->flags & SDP_OOB_PEND)) - sk_send_sigurg(sk); - - skb_pull(skb, sizeof(struct sdp_bsdh)); - - switch (h->mid) { - case SDP_MID_DATA: - if (unlikely(skb->len <= 0)) { - __kfree_skb(skb); - break; - } - - if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) { - /* got data in RCV_SHUTDOWN */ - if (sk->sk_state == TCP_FIN_WAIT1) { - /* go into abortive close */ - sdp_exch_state(sk, TCPF_FIN_WAIT1, - TCP_TIME_WAIT); - - sk->sk_prot->disconnect(sk, 0); - } - - __kfree_skb(skb); - break; - } - skb = sdp_sock_queue_rcv_skb(sk, skb); - if (unlikely(h->flags & SDP_OOB_PRES)) - sdp_urg(ssk, skb); - break; - case SDP_MID_DISCONN: - __kfree_skb(skb); - sdp_fin(sk); - break; - case SDP_MID_CHRCVBUF: - sdp_handle_resize_request(ssk, - (struct sdp_chrecvbuf *)skb->data); - __kfree_skb(skb); - break; - case SDP_MID_CHRCVBUF_ACK: - sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data); - __kfree_skb(skb); - break; - default: - /* TODO: Handle other messages */ - printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid); - __kfree_skb(skb); - } - - return 0; -} - -static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) -{ - struct sk_buff *skb = NULL; - - skb = sdp_send_completion(ssk, wc->wr_id); - if (unlikely(!skb)) - return -1; - - if (unlikely(wc->status)) { - if (wc->status != IB_WC_WR_FLUSH_ERR) { - struct sock *sk = &ssk->isk.sk; - sdp_warn(sk, "Send completion with error. 
" - "Status %d\n", wc->status); - sdp_set_error(sk, -ECONNRESET); - wake_up(&ssk->wq); - - queue_work(rx_comp_wq, &ssk->destroy_work); - } - } - - sdp_prf(&ssk->isk.sk, skb, "tx completion"); - sk_stream_free_skb(&ssk->isk.sk, skb); - - return 0; -} - -void sdp_rx_irq(struct ib_cq *cq, void *cq_context) -{ - struct sock *sk = cq_context; - struct sdp_sock *ssk = sdp_sk(sk); - - WARN_ON(ssk->rx_cq && cq != ssk->rx_cq); - - if (!ssk->rx_cq) - sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n"); - - SDPSTATS_COUNTER_INC(rx_int_count); - - sdp_prf(sk, NULL, "rx completion"); - - /* issue sdp_rx_comp_work() */ - queue_work(rx_comp_wq, &ssk->rx_comp_work); -} - -static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc) -{ - if (likely(wc->wr_id & SDP_OP_SEND)) { - sdp_handle_send_comp(ssk, wc); - return; - } - - sk_wmem_free_skb(&ssk->isk.sk, skb); - - /* Keepalive probe sent cleanup */ - sdp_cnt(sdp_keepalive_probes_sent); - - if (likely(!wc->status)) - return; - - sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n", - __func__, wc->status); - - if (wc->status == IB_WC_WR_FLUSH_ERR) - return; - - sdp_set_error(&ssk->isk.sk, -ECONNRESET); - wake_up(&ssk->wq); -} - -static int sdp_process_tx_cq(struct sdp_sock *ssk) -{ - struct ib_wc ibwc[SDP_NUM_WC]; - int n, i; - int wc_processed = 0; - - if (!ssk->tx_ring.cq) { - sdp_warn(&ssk->isk.sk, "WARNING: tx irq when tx_cq is destroyed\n"); - return 0; - } - - do { - n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc); - for (i = 0; i < n; ++i) { - sdp_process_tx_wc(ssk, ibwc + i); - wc_processed++; - } - } while (n == SDP_NUM_WC); - - sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed); - - if (wc_processed) { - struct sock *sk = &ssk->isk.sk; - sdp_post_sends(ssk, 0); - - if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) - sk_stream_write_space(&ssk->isk.sk); - - } - - return wc_processed; -} - -void sdp_poll_tx_cq(unsigned long data) -{ - struct sdp_sock *ssk = (struct sdp_sock *)data; - struct sock *sk = &ssk->isk.sk; - u32 inflight, wc_processed; - - - sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. 
inflight=%d\n", - (u32) ssk->tx_ring.head - ssk->tx_ring.tail); - - /* Only process if the socket is not in use */ - bh_lock_sock(sk); - if (sock_owned_by_user(sk)) { - mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); - sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n"); - goto out; - } - - if (sk->sk_state == TCP_CLOSE) - goto out; - - wc_processed = sdp_process_tx_cq(ssk); - if (!wc_processed) - SDPSTATS_COUNTER_INC(tx_poll_miss); - else - SDPSTATS_COUNTER_INC(tx_poll_hit); - - inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail; - - /* If there are still packets in flight and the timer has not already - * been scheduled by the Tx routine then schedule it here to guarantee - * completion processing of these packets */ - if (inflight) { /* TODO: make sure socket is not closed */ - sdp_dbg_data(sk, "arming timer for more polling\n"); - mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); - } - -out: - bh_unlock_sock(sk); -} - -void _sdp_poll_tx_cq(unsigned long data) -{ - struct sdp_sock *ssk = (struct sdp_sock *)data; - struct sock *sk = &ssk->isk.sk; - - sdp_prf(sk, NULL, "sdp poll tx timeout expired"); - - sdp_poll_tx_cq(data); -} - - -void sdp_tx_irq(struct ib_cq *cq, void *cq_context) -{ - struct sock *sk = cq_context; - struct sdp_sock *ssk = sdp_sk(sk); - - sdp_warn(sk, "Got tx comp interrupt\n"); - - mod_timer(&ssk->tx_ring.timer, jiffies + 1); -} - -static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed) -{ - int c; - - c = ssk->remote_credits; - if (likely(c > SDP_MIN_TX_CREDITS)) - c += c/2; - -/* sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d " - "tx ring slots left: %d send_head: %p\n", - ssk->tx_ring.credits, ssk->remote_credits, - sdp_tx_ring_slots_left(&ssk->tx_ring), - ssk->isk.sk.sk_send_head); -*/ - return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) && - likely(ssk->tx_ring.credits > 1) && - likely(sdp_tx_ring_slots_left(&ssk->tx_ring))); -} - - -int sdp_poll_rx_cq(struct sdp_sock *ssk) -{ - struct ib_cq *cq = ssk->rx_cq; - struct ib_wc ibwc[SDP_NUM_WC]; - int n, i; - int ret = -EAGAIN; - int updated_credits = 0; - int wc_processed = 0; - - do { - n = ib_poll_cq(cq, SDP_NUM_WC, ibwc); - for (i = 0; i < n; ++i) { - struct ib_wc *wc = &ibwc[i]; - - BUG_ON(!(wc->wr_id & SDP_OP_RECV)); - sdp_handle_recv_comp(ssk, wc); - wc_processed++; - -/* if (!updated_credits) { - sdp_post_recvs(ssk); - sdp_post_sends(ssk, 0); - updated_credits = 1; - }*/ -//sdp_warn(&ssk->isk.sk, "i = %d, wc_processed = %d wr_id = 0x%llx\n", i, wc_processed, wc->wr_id); - if (credit_update_needed(ssk, wc_processed)) { - sdp_prf(&ssk->isk.sk, NULL, "credit update. 
remote_credits: %d, avail now: %d processed: %d", - ssk->remote_credits, - ssk->rx_head - ssk->rx_tail, - wc_processed); - sdp_post_recvs(ssk); - if (sdp_post_credits(ssk)) - wc_processed = 0; - } - - ret = 0; - } - } while (n == SDP_NUM_WC); - - if (!ret) { - struct sock *sk = &ssk->isk.sk; - - sdp_post_recvs(ssk); - - /* update credits */ - sdp_post_sends(ssk, 0); - - if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) - sk_stream_write_space(&ssk->isk.sk); - } else { - SDPSTATS_COUNTER_INC(rx_poll_miss); - } - - return ret; -} - static inline int sdp_tx_qp_empty(struct sdp_sock *ssk) { return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0; } - -void sdp_rx_comp_work(struct work_struct *work) -{ - struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work); - struct sock *sk = &ssk->isk.sk; - struct ib_cq *rx_cq; - - lock_sock(sk); - rx_cq = ssk->rx_cq; - if (unlikely(!rx_cq)) - goto out; - - if (unlikely(!ssk->poll_cq)) { - struct rdma_cm_id *id = ssk->id; - if (id && id->qp) - rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED); - goto out; - } - - sdp_poll_rx_cq(ssk); - sdp_xmit_poll(ssk, 1); /* if has pending tx because run out of tx_credits - xmit it */ - release_sock(sk); - sk_mem_reclaim(sk); - lock_sock(sk); - rx_cq = ssk->rx_cq; - if (unlikely(!rx_cq)) - goto out; - - sdp_arm_rx_cq(sk); - sdp_poll_rx_cq(ssk); - sdp_xmit_poll(ssk, 1); -out: - release_sock(sk); -} diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c index 8cd739a05f37c..b5322da653220 100644 --- a/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/drivers/infiniband/ulp/sdp/sdp_main.c @@ -220,21 +220,8 @@ static void sdp_destroy_qp(struct sdp_sock *ssk) ib_destroy_qp(ssk->qp); ssk->qp = NULL; - while (ssk->rx_head != ssk->rx_tail) { - struct sk_buff *skb; - skb = sdp_recv_completion(ssk, ssk->rx_tail); - if (!skb) - break; - atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); - __kfree_skb(skb); - } - while (ssk->tx_ring.head != ssk->tx_ring.tail) { - struct sk_buff *skb; - skb = sdp_send_completion(ssk, ssk->tx_ring.tail); - if (!skb) - break; - __kfree_skb(skb); - } + sdp_rx_ring_purge(ssk); + sdp_tx_ring_purge(ssk); } if (tx_cq) { diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c new file mode 100644 index 0000000000000..ba8e0fefb2fce --- /dev/null +++ b/drivers/infiniband/ulp/sdp/sdp_rx.c @@ -0,0 +1,628 @@ +/* + * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * OpenIB.org BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * $Id$ + */ +#include +#include +#include +#include +#include "sdp.h" + +static int rcvbuf_scale = 0x10; + +int rcvbuf_initial_size = SDP_HEAD_SIZE; +module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644); +MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes."); + +module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644); +MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor."); + +static int top_mem_usage = 0; +module_param_named(top_mem_usage, top_mem_usage, int, 0644); +MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB)."); + +#ifdef CONFIG_PPC +static int max_large_sockets = 100; +#else +static int max_large_sockets = 1000; +#endif +module_param_named(max_large_sockets, max_large_sockets, int, 0644); +MODULE_PARM_DESC(max_large_sockets, "Max number of large sockets (32k buffers)."); + +static int curr_large_sockets = 0; +atomic_t sdp_current_mem_usage; +spinlock_t sdp_large_sockets_lock; + +static int sdp_get_large_socket(struct sdp_sock *ssk) +{ + int count, ret; + + if (ssk->recv_request) + return 1; + + spin_lock_irq(&sdp_large_sockets_lock); + count = curr_large_sockets; + ret = curr_large_sockets < max_large_sockets; + if (ret) + curr_large_sockets++; + spin_unlock_irq(&sdp_large_sockets_lock); + + return ret; +} + +void sdp_remove_large_sock(struct sdp_sock *ssk) +{ + if (ssk->recv_frags) { + spin_lock_irq(&sdp_large_sockets_lock); + curr_large_sockets--; + spin_unlock_irq(&sdp_large_sockets_lock); + } +} + +/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */ +static void sdp_fin(struct sock *sk) +{ + sdp_dbg(sk, "%s\n", __func__); + + sk->sk_shutdown |= RCV_SHUTDOWN; + sock_set_flag(sk, SOCK_DONE); + + switch (sk->sk_state) { + case TCP_SYN_RECV: + case TCP_ESTABLISHED: + sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED, + TCP_CLOSE_WAIT); + break; + + case TCP_FIN_WAIT1: + /* Received a reply FIN - start Infiniband tear down */ + sdp_dbg(sk, "%s: Starting Infiniband tear down sending DREQ\n", + __func__); + + sdp_cancel_dreq_wait_timeout(sdp_sk(sk)); + + sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT); + + if (sdp_sk(sk)->id) { + rdma_disconnect(sdp_sk(sk)->id); + } else { + sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__); + return; + } + break; + case TCP_TIME_WAIT: + /* This is a mutual close situation and we've got the DREQ from + the peer before the SDP_MID_DISCONNECT */ + break; + case TCP_CLOSE: + /* FIN arrived after IB teardown started - do nothing */ + sdp_dbg(sk, "%s: fin in state %s\n", + __func__, sdp_state_str(sk->sk_state)); + return; + default: + sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n", + __func__, sk->sk_state); + break; + } + + + sk_stream_mem_reclaim(sk); + + if (!sock_flag(sk, SOCK_DEAD)) { + sk->sk_state_change(sk); + + /* Do not send POLL_HUP for half duplex close. 
*/ + if (sk->sk_shutdown == SHUTDOWN_MASK || + sk->sk_state == TCP_CLOSE) + sk_wake_async(sk, 1, POLL_HUP); + else + sk_wake_async(sk, 1, POLL_IN); + } +} + + +static void sdp_post_recv(struct sdp_sock *ssk) +{ + struct sdp_buf *rx_req; + int i, rc, frags; + u64 addr; + struct ib_device *dev; + struct ib_sge *sge; + struct ib_recv_wr *bad_wr; + struct sk_buff *skb; + struct page *page; + skb_frag_t *frag; + struct sdp_bsdh *h; + int id = ssk->rx_head; + gfp_t gfp_page; + + /* Now, allocate and repost recv */ + /* TODO: allocate from cache */ + + if (unlikely(ssk->isk.sk.sk_allocation)) { + skb = sk_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE, + ssk->isk.sk.sk_allocation); + gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM; + } else { + skb = sk_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE, + GFP_KERNEL); + gfp_page = GFP_HIGHUSER; + } + + /* FIXME */ + BUG_ON(!skb); + h = (struct sdp_bsdh *)skb->head; + for (i = 0; i < ssk->recv_frags; ++i) { + page = alloc_pages(gfp_page, 0); + BUG_ON(!page); + frag = &skb_shinfo(skb)->frags[i]; + frag->page = page; + frag->page_offset = 0; + frag->size = min(PAGE_SIZE, SDP_MAX_PAYLOAD); + ++skb_shinfo(skb)->nr_frags; + skb->len += frag->size; + skb->data_len += frag->size; + skb->truesize += frag->size; + } + + rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1)); + rx_req->skb = skb; + dev = ssk->ib_device; + sge = ssk->ibsge; + addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE); + BUG_ON(ib_dma_mapping_error(dev, addr)); + + rx_req->mapping[0] = addr; + + /* TODO: proper error handling */ + sge->addr = (u64)addr; + sge->length = SDP_HEAD_SIZE; + sge->lkey = ssk->mr->lkey; + frags = skb_shinfo(skb)->nr_frags; + for (i = 0; i < frags; ++i) { + ++sge; + addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page, + skb_shinfo(skb)->frags[i].page_offset, + skb_shinfo(skb)->frags[i].size, + DMA_FROM_DEVICE); + BUG_ON(ib_dma_mapping_error(dev, addr)); + rx_req->mapping[i + 1] = addr; + sge->addr = addr; + sge->length = skb_shinfo(skb)->frags[i].size; + sge->lkey = ssk->mr->lkey; + } + + ssk->rx_wr.next = NULL; + ssk->rx_wr.wr_id = id | SDP_OP_RECV; + ssk->rx_wr.sg_list = ssk->ibsge; + ssk->rx_wr.num_sge = frags + 1; + rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr); + sdp_prf(&ssk->isk.sk, skb, "rx skb was posted"); + SDPSTATS_COUNTER_INC(post_recv); + ++ssk->rx_head; + if (unlikely(rc)) { + sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc); + sdp_reset(&ssk->isk.sk); + } + + atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); +} + +void sdp_post_recvs(struct sdp_sock *ssk) +{ + struct sock *sk = &ssk->isk.sk; + int scale = ssk->rcvbuf_scale; + + if (unlikely(!ssk->id || ((1 << sk->sk_state) & + (TCPF_CLOSE | TCPF_TIME_WAIT)))) { + return; + } + + if (top_mem_usage && + (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) + scale = 1; + + while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) && + (ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) * + (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) + + ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) || + unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS)) + sdp_post_recv(ssk); +} + +static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk, + struct sk_buff *skb) +{ + int skb_len; + struct sdp_sock *ssk = sdp_sk(sk); + struct sk_buff *tail = NULL; + + /* not needed since sk_rmem_alloc is not currently used + * TODO - remove this? 
+ skb_set_owner_r(skb, sk); */ + + skb_len = skb->len; + + TCP_SKB_CB(skb)->seq = ssk->rcv_nxt; + ssk->rcv_nxt += skb_len; + + if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) && + unlikely(skb_tailroom(tail) >= skb_len)) { + skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len); + __kfree_skb(skb); + skb = tail; + } else + skb_queue_tail(&sk->sk_receive_queue, skb); + + if (!sock_flag(sk, SOCK_DEAD)) + sk->sk_data_ready(sk, skb_len); + return skb; +} + +int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size) +{ + ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE; + if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS) + ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS; + ssk->rcvbuf_scale = rcvbuf_scale; + + sdp_post_recvs(ssk); + + return 0; +} + +int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size) +{ + u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE; +#if defined(__ia64__) + /* for huge PAGE_SIZE systems, aka IA64, limit buffers size + [re-]negotiation to a known+working size that will not + trigger a HW error/rc to be interpreted as a IB_WC_LOC_LEN_ERR */ + u32 max_size = (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE) <= + 32784 ? + (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE): 32784; +#else + u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE; +#endif + + if (new_size > curr_size && new_size <= max_size && + sdp_get_large_socket(ssk)) { + ssk->rcvbuf_scale = rcvbuf_scale; + ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE; + if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS) + ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS; + return 0; + } else + return -1; +} + +static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf) +{ + if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0) + ssk->recv_request_head = ssk->rx_head + 1; + else + ssk->recv_request_head = ssk->rx_tail; + ssk->recv_request = 1; +} + +static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf) +{ + u32 new_size = ntohl(buf->size); + + if (new_size > ssk->xmit_size_goal) { + ssk->sent_request = -1; + ssk->xmit_size_goal = new_size; + ssk->send_frags = + PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE; + } else + ssk->sent_request = 0; +} + +static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed) +{ + int c; + + c = ssk->remote_credits; + if (likely(c > SDP_MIN_TX_CREDITS)) + c += c/2; + +/* sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d " + "tx ring slots left: %d send_head: %p\n", + ssk->tx_ring.credits, ssk->remote_credits, + sdp_tx_ring_slots_left(&ssk->tx_ring), + ssk->isk.sk.sk_send_head); +*/ + return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) && + likely(ssk->tx_ring.credits > 1) && + likely(sdp_tx_ring_slots_left(&ssk->tx_ring))); +} + + +static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id) +{ + struct sdp_buf *rx_req; + struct ib_device *dev; + struct sk_buff *skb; + int i, frags; + + if (unlikely(id != ssk->rx_tail)) { + printk(KERN_WARNING "Bogus recv completion id %d tail %d\n", + id, ssk->rx_tail); + return NULL; + } + + dev = ssk->ib_device; + rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)]; + skb = rx_req->skb; + ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE, + DMA_FROM_DEVICE); + frags = skb_shinfo(skb)->nr_frags; + for (i = 0; i < frags; ++i) + ib_dma_unmap_page(dev, rx_req->mapping[i + 1], + skb_shinfo(skb)->frags[i].size, + DMA_FROM_DEVICE); + ++ssk->rx_tail; + --ssk->remote_credits; + return skb; +} + 
+static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc) +{ + struct sock *sk = &ssk->isk.sk; + int frags; + struct sk_buff *skb; + struct sdp_bsdh *h; + int pagesz, i; + + skb = sdp_recv_completion(ssk, wc->wr_id); + if (unlikely(!skb)) + return -1; + + sdp_prf(sk, skb, "recv completion"); + + atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); + + if (unlikely(wc->status)) { + if (wc->status != IB_WC_WR_FLUSH_ERR) { + sdp_dbg(sk, "Recv completion with error. Status %d\n", + wc->status); + sdp_reset(sk); + } + __kfree_skb(skb); + return 0; + } + + sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n", + (int)wc->wr_id, wc->byte_len); + if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) { + printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n", + wc->byte_len, sizeof(struct sdp_bsdh)); + __kfree_skb(skb); + return -1; + } + skb->len = wc->byte_len; + if (likely(wc->byte_len > SDP_HEAD_SIZE)) + skb->data_len = wc->byte_len - SDP_HEAD_SIZE; + else + skb->data_len = 0; + skb->data = skb->head; +#ifdef NET_SKBUFF_DATA_USES_OFFSET + skb->tail = skb_headlen(skb); +#else + skb->tail = skb->head + skb_headlen(skb); +#endif + h = (struct sdp_bsdh *)skb->data; + SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h); + skb_reset_transport_header(skb); + ssk->mseq_ack = ntohl(h->mseq); + if (ssk->mseq_ack != (int)wc->wr_id) + printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n", + ssk->mseq_ack, (int)wc->wr_id); + + SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits); + ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 + + ntohs(h->bufs); + + frags = skb_shinfo(skb)->nr_frags; + pagesz = PAGE_ALIGN(skb->data_len); + skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE; + + for (i = skb_shinfo(skb)->nr_frags; + i < frags; ++i) { + put_page(skb_shinfo(skb)->frags[i].page); + skb->truesize -= PAGE_SIZE; + } + + if (unlikely(h->flags & SDP_OOB_PEND)) + sk_send_sigurg(sk); + + skb_pull(skb, sizeof(struct sdp_bsdh)); + + switch (h->mid) { + case SDP_MID_DATA: + if (unlikely(skb->len <= 0)) { + __kfree_skb(skb); + break; + } + + if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) { + /* got data in RCV_SHUTDOWN */ + if (sk->sk_state == TCP_FIN_WAIT1) { + /* go into abortive close */ + sdp_exch_state(sk, TCPF_FIN_WAIT1, + TCP_TIME_WAIT); + + sk->sk_prot->disconnect(sk, 0); + } + + __kfree_skb(skb); + break; + } + skb = sdp_sock_queue_rcv_skb(sk, skb); + if (unlikely(h->flags & SDP_OOB_PRES)) + sdp_urg(ssk, skb); + break; + case SDP_MID_DISCONN: + __kfree_skb(skb); + sdp_fin(sk); + break; + case SDP_MID_CHRCVBUF: + sdp_handle_resize_request(ssk, + (struct sdp_chrecvbuf *)skb->data); + __kfree_skb(skb); + break; + case SDP_MID_CHRCVBUF_ACK: + sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data); + __kfree_skb(skb); + break; + default: + /* TODO: Handle other messages */ + printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid); + __kfree_skb(skb); + } + + return 0; +} + +int sdp_poll_rx_cq(struct sdp_sock *ssk) +{ + struct ib_cq *cq = ssk->rx_cq; + struct ib_wc ibwc[SDP_NUM_WC]; + int n, i; + int ret = -EAGAIN; + int wc_processed = 0; + + do { + n = ib_poll_cq(cq, SDP_NUM_WC, ibwc); + for (i = 0; i < n; ++i) { + struct ib_wc *wc = &ibwc[i]; + + BUG_ON(!(wc->wr_id & SDP_OP_RECV)); + sdp_process_rx_wc(ssk, wc); + wc_processed++; + + if (credit_update_needed(ssk, wc_processed)) { + sdp_prf(&ssk->isk.sk, NULL, "credit update. 
remote_credits: %d, avail now: %d processed: %d", + ssk->remote_credits, + ssk->rx_head - ssk->rx_tail, + wc_processed); + sdp_post_recvs(ssk); + if (sdp_post_credits(ssk)) + wc_processed = 0; + } + + ret = 0; + } + } while (n == SDP_NUM_WC); + + if (!ret) { + struct sock *sk = &ssk->isk.sk; + + sdp_post_recvs(ssk); + + /* update credits */ + sdp_post_sends(ssk, 0); + + if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) + sk_stream_write_space(&ssk->isk.sk); + } else { + SDPSTATS_COUNTER_INC(rx_poll_miss); + } + + return ret; +} + +void sdp_rx_comp_work(struct work_struct *work) +{ + struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work); + struct sock *sk = &ssk->isk.sk; + struct ib_cq *rx_cq; + + lock_sock(sk); + rx_cq = ssk->rx_cq; + if (unlikely(!rx_cq)) + goto out; + + if (unlikely(!ssk->poll_cq)) { + struct rdma_cm_id *id = ssk->id; + if (id && id->qp) + rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED); + goto out; + } + + sdp_poll_rx_cq(ssk); + sdp_xmit_poll(ssk, 1); /* if has pending tx because run out of tx_credits - xmit it */ + release_sock(sk); + sk_stream_mem_reclaim(sk); + lock_sock(sk); + rx_cq = ssk->rx_cq; + if (unlikely(!rx_cq)) + goto out; + + sdp_arm_rx_cq(sk); + sdp_poll_rx_cq(ssk); + sdp_xmit_poll(ssk, 1); +out: + release_sock(sk); +} + +void sdp_rx_irq(struct ib_cq *cq, void *cq_context) +{ + struct sock *sk = cq_context; + struct sdp_sock *ssk = sdp_sk(sk); + + WARN_ON(ssk->rx_cq && cq != ssk->rx_cq); + + if (!ssk->rx_cq) + sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n"); + + SDPSTATS_COUNTER_INC(rx_int_count); + + sdp_prf(sk, NULL, "rx completion"); + + /* issue sdp_rx_comp_work() */ + queue_work(rx_comp_wq, &ssk->rx_comp_work); +} + +void sdp_rx_ring_purge(struct sdp_sock *ssk) +{ + struct sk_buff *skb; + + while (ssk->rx_head != ssk->rx_tail) { + struct sk_buff *skb; + skb = sdp_recv_completion(ssk, ssk->rx_tail); + if (!skb) + break; + atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); + __kfree_skb(skb); + } +} diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c new file mode 100644 index 0000000000000..5e6a2dc28bf5a --- /dev/null +++ b/drivers/infiniband/ulp/sdp/sdp_tx.c @@ -0,0 +1,382 @@ +/* + * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * OpenIB.org BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * $Id$ + */ +#include +#include +#include +#include +#include "sdp.h" + +#define sdp_cnt(var) do { (var)++; } while (0) +static unsigned sdp_keepalive_probes_sent = 0; + +module_param_named(sdp_keepalive_probes_sent, sdp_keepalive_probes_sent, uint, 0644); +MODULE_PARM_DESC(sdp_keepalive_probes_sent, "Total number of keepalive probes sent."); + +static int sdp_process_tx_cq(struct sdp_sock *ssk); + +int sdp_xmit_poll(struct sdp_sock *ssk, int force) +{ + int wc_processed = 0; + + /* If we don't have a pending timer, set one up to catch our recent + post in case the interface becomes idle */ + if (!timer_pending(&ssk->tx_ring.timer)) + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + + /* Poll the CQ every SDP_TX_POLL_MODER packets */ + if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) + wc_processed = sdp_process_tx_cq(ssk); + + return wc_processed; +} + +void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) +{ + struct sdp_buf *tx_req; + struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h); + unsigned mseq = ssk->tx_ring.head; + int i, rc, frags; + u64 addr; + struct ib_device *dev; + struct ib_sge *sge; + struct ib_send_wr *bad_wr; + +#define ENUM2STR(e) [e] = #e + static char *mid2str[] = { + ENUM2STR(SDP_MID_HELLO), + ENUM2STR(SDP_MID_HELLO_ACK), + ENUM2STR(SDP_MID_DISCONN), + ENUM2STR(SDP_MID_CHRCVBUF), + ENUM2STR(SDP_MID_CHRCVBUF_ACK), + ENUM2STR(SDP_MID_DATA), + }; + sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d", + mid2str[mid], ssk->rx_head - ssk->rx_tail); + + SDPSTATS_COUNTER_MID_INC(post_send, mid); + SDPSTATS_HIST(send_size, skb->len); + + h->mid = mid; + if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG)) + h->flags = SDP_OOB_PRES | SDP_OOB_PEND; + else + h->flags = 0; + + h->bufs = htons(ssk->rx_head - ssk->rx_tail); + h->len = htonl(skb->len); + h->mseq = htonl(mseq); + h->mseq_ack = htonl(ssk->mseq_ack); + + SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h); + tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)]; + tx_req->skb = skb; + dev = ssk->ib_device; + sge = ssk->ibsge; + addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len, + DMA_TO_DEVICE); + tx_req->mapping[0] = addr; + + /* TODO: proper error handling */ + BUG_ON(ib_dma_mapping_error(dev, addr)); + + sge->addr = addr; + sge->length = skb->len - skb->data_len; + sge->lkey = ssk->mr->lkey; + frags = skb_shinfo(skb)->nr_frags; + for (i = 0; i < frags; ++i) { + ++sge; + addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page, + skb_shinfo(skb)->frags[i].page_offset, + skb_shinfo(skb)->frags[i].size, + DMA_TO_DEVICE); + BUG_ON(ib_dma_mapping_error(dev, addr)); + tx_req->mapping[i + 1] = addr; + sge->addr = addr; + sge->length = skb_shinfo(skb)->frags[i].size; + sge->lkey = ssk->mr->lkey; + } + + ssk->tx_wr.next = NULL; + ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND; + ssk->tx_wr.sg_list = ssk->ibsge; + ssk->tx_wr.num_sge = frags + 1; + ssk->tx_wr.opcode = IB_WR_SEND; + ssk->tx_wr.send_flags = IB_SEND_SIGNALED; + if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG)) + ssk->tx_wr.send_flags |= IB_SEND_SOLICITED; + + { + static unsigned long last_send = 0; + int delta = jiffies - last_send; + + if (likely(last_send)) + SDPSTATS_HIST(send_interval, delta); 
+ + last_send = jiffies; + } + rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr); + ++ssk->tx_ring.head; + --ssk->tx_ring.credits; + ssk->remote_credits = ssk->rx_head - ssk->rx_tail; + if (unlikely(rc)) { + sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc); + sdp_set_error(&ssk->isk.sk, -ECONNRESET); + wake_up(&ssk->wq); + } + + if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) { + sdp_poll_rx_cq(ssk); + } +} + +static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq) +{ + struct ib_device *dev; + struct sdp_buf *tx_req; + struct sk_buff *skb = NULL; + struct bzcopy_state *bz; + int i, frags; + struct sdp_tx_ring *tx_ring = &ssk->tx_ring; + if (unlikely(mseq != tx_ring->tail)) { + printk(KERN_WARNING "Bogus send completion id %d tail %d\n", + mseq, tx_ring->tail); + goto out; + } + + dev = ssk->ib_device; + tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)]; + skb = tx_req->skb; + ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len, + DMA_TO_DEVICE); + frags = skb_shinfo(skb)->nr_frags; + for (i = 0; i < frags; ++i) { + ib_dma_unmap_page(dev, tx_req->mapping[i + 1], + skb_shinfo(skb)->frags[i].size, + DMA_TO_DEVICE); + } + + tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq; + + /* TODO: AIO and real zcopy code; add their context support here */ + bz = BZCOPY_STATE(skb); + if (bz) + bz->busy--; + + ++tx_ring->tail; + +out: + return skb; +} + +static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) +{ + struct sk_buff *skb = NULL; + + skb = sdp_send_completion(ssk, wc->wr_id); + if (unlikely(!skb)) + return -1; + + if (unlikely(wc->status)) { + if (wc->status != IB_WC_WR_FLUSH_ERR) { + struct sock *sk = &ssk->isk.sk; + sdp_warn(sk, "Send completion with error. " + "Status %d\n", wc->status); + sdp_set_error(sk, -ECONNRESET); + wake_up(&ssk->wq); + + queue_work(sdp_wq, &ssk->destroy_work); + } + } + + sdp_prf(&ssk->isk.sk, skb, "tx completion"); + sk_stream_free_skb(&ssk->isk.sk, skb); + + return 0; +} + +static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc) +{ + if (likely(wc->wr_id & SDP_OP_SEND)) { + sdp_handle_send_comp(ssk, wc); + return; + } + + /* Keepalive probe sent cleanup */ + sdp_cnt(sdp_keepalive_probes_sent); + + if (likely(!wc->status)) + return; + + sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n", + __func__, wc->status); + + if (wc->status == IB_WC_WR_FLUSH_ERR) + return; + + sdp_set_error(&ssk->isk.sk, -ECONNRESET); + wake_up(&ssk->wq); +} + +static int sdp_process_tx_cq(struct sdp_sock *ssk) +{ + struct ib_wc ibwc[SDP_NUM_WC]; + int n, i; + int wc_processed = 0; + + if (!ssk->tx_ring.cq) { + sdp_warn(&ssk->isk.sk, "WARNING: tx irq when tx_cq is destroyed\n"); + return 0; + } + + do { + n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc); + for (i = 0; i < n; ++i) { + sdp_process_tx_wc(ssk, ibwc + i); + wc_processed++; + } + } while (n == SDP_NUM_WC); + + sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed); + + if (wc_processed) { + struct sock *sk = &ssk->isk.sk; + sdp_post_sends(ssk, 0); + + if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) + sk_stream_write_space(&ssk->isk.sk); + + } + + return wc_processed; +} + +void sdp_poll_tx_cq(unsigned long data) +{ + struct sdp_sock *ssk = (struct sdp_sock *)data; + struct sock *sk = &ssk->isk.sk; + u32 inflight, wc_processed; + + + sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. 
inflight=%d\n", + (u32) ssk->tx_ring.head - ssk->tx_ring.tail); + + /* Only process if the socket is not in use */ + bh_lock_sock(sk); + if (sock_owned_by_user(sk)) { + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n"); + goto out; + } + + if (sk->sk_state == TCP_CLOSE) + goto out; + + wc_processed = sdp_process_tx_cq(ssk); + if (!wc_processed) + SDPSTATS_COUNTER_INC(tx_poll_miss); + else + SDPSTATS_COUNTER_INC(tx_poll_hit); + + inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail; + + /* If there are still packets in flight and the timer has not already + * been scheduled by the Tx routine then schedule it here to guarantee + * completion processing of these packets */ + if (inflight) { /* TODO: make sure socket is not closed */ + sdp_dbg_data(sk, "arming timer for more polling\n"); + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + } + +out: + bh_unlock_sock(sk); +} + +void _sdp_poll_tx_cq(unsigned long data) +{ + struct sdp_sock *ssk = (struct sdp_sock *)data; + struct sock *sk = &ssk->isk.sk; + + sdp_prf(sk, NULL, "sdp poll tx timeout expired"); + + sdp_poll_tx_cq(data); +} + +void sdp_tx_irq(struct ib_cq *cq, void *cq_context) +{ + struct sock *sk = cq_context; + struct sdp_sock *ssk = sdp_sk(sk); + + sdp_warn(sk, "Got tx comp interrupt\n"); + + mod_timer(&ssk->tx_ring.timer, jiffies + 1); +} + +void sdp_tx_ring_purge(struct sdp_sock *ssk) +{ + struct sk_buff *skb; + + while (ssk->tx_ring.head != ssk->tx_ring.tail) { + struct sk_buff *skb; + skb = sdp_send_completion(ssk, ssk->tx_ring.tail); + if (!skb) + break; + __kfree_skb(skb); + } +} + +void sdp_post_keepalive(struct sdp_sock *ssk) +{ + int rc; + struct ib_send_wr wr, *bad_wr; + + sdp_dbg(&ssk->isk.sk, "%s\n", __func__); + + memset(&wr, 0, sizeof(wr)); + + wr.next = NULL; + wr.wr_id = 0; + wr.sg_list = NULL; + wr.num_sge = 0; + wr.opcode = IB_WR_RDMA_WRITE; + + rc = ib_post_send(ssk->qp, &wr, &bad_wr); + if (rc) { + sdp_dbg(&ssk->isk.sk, "ib_post_keepalive failed with status %d.\n", rc); + sdp_set_error(&ssk->isk.sk, -ECONNRESET); + wake_up(&ssk->wq); + } + + sdp_cnt(sdp_keepalive_probes_sent); +} +