From 6d9df4fd9d65c5accb76002f76bd97eab7bb8718 Mon Sep 17 00:00:00 2001 From: Amir Vadai Date: Wed, 13 May 2009 14:37:13 +0300 Subject: [PATCH] sdp: process RX CQ from interrupt Signed-off-by: Amir Vadai --- drivers/infiniband/ulp/sdp/sdp.h | 140 +++++--- drivers/infiniband/ulp/sdp/sdp_bcopy.c | 66 ++-- drivers/infiniband/ulp/sdp/sdp_cma.c | 114 ++----- drivers/infiniband/ulp/sdp/sdp_main.c | 124 ++++--- drivers/infiniband/ulp/sdp/sdp_proc.c | 13 +- drivers/infiniband/ulp/sdp/sdp_rx.c | 442 +++++++++++++++++-------- drivers/infiniband/ulp/sdp/sdp_tx.c | 160 ++++++--- 7 files changed, 614 insertions(+), 445 deletions(-) diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h index 5e782820099c..57265aa9ff74 100644 --- a/drivers/infiniband/ulp/sdp/sdp.h +++ b/drivers/infiniband/ulp/sdp/sdp.h @@ -7,11 +7,12 @@ #include /* For urgent data flags */ #include +#undef SDP_LOCKS_CHECK #define SDPSTATS_ON -#undef CONFIG_INFINIBAND_SDP_DEBUG_DATA +#define SDP_PROFILING #define _sdp_printk(func, line, level, sk, format, arg...) \ - printk(level "%s:%d sdp_sock(%d %d:%d): " format, \ + printk(level "%s:%d sdp_sock(%5d %d:%d): " format, \ func, line, \ current->pid, \ (sk) ? inet_sk(sk)->num : -1, \ @@ -22,6 +23,34 @@ sdp_printk(KERN_WARNING, sk, format , ## arg) +#ifdef SDP_LOCKS_CHECK +#define WARN_ON_UNLOCKED(sk, l) do {\ + if (unlikely(!spin_is_locked(l))) { \ + sdp_warn(sk, "lock " #l " should be locked\n"); \ + WARN_ON(1); \ + } \ +} while (0) + +#define WARN_ON_LOCKED(sk, l) do {\ + if (unlikely(spin_is_locked(l))) { \ + sdp_warn(sk, "lock " #l " should be unlocked\n"); \ + WARN_ON(1); \ + } \ +} while (0) +#else +#define WARN_ON_UNLOCKED(sk, l) +#define WARN_ON_LOCKED(sk, l) +#endif + +#define rx_ring_lock(ssk, f) do { \ + spin_lock_irqsave(&ssk->rx_ring.lock, f); \ +} while (0) + +#define rx_ring_unlock(ssk, f) do { \ + spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \ +} while (0) + +#ifdef SDP_PROFILING struct sk_buff; struct sdpprf_log { int idx; @@ -37,7 +66,7 @@ struct sdpprf_log { int line; }; -#define SDPPRF_LOG_SIZE 0x10000 /* must be a power of 2 */ +#define SDPPRF_LOG_SIZE 0x20000 /* must be a power of 2 */ extern struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE]; extern int sdpprf_log_count; @@ -48,7 +77,6 @@ static inline unsigned long long current_nsec(void) getnstimeofday(&tv); return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec; } -#if 1 #define sdp_prf(sk, s, format, arg...) ({ \ struct sdpprf_log *l = &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \ l->idx = sdpprf_log_count - 1; \ @@ -66,16 +94,6 @@ static inline unsigned long long current_nsec(void) #define sdp_prf(sk, s, format, arg...) #endif -#if 0 -#if 1 -#define sdp_prf_rx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg) -#define sdp_prf_tx(sk, s, format, arg...) -#else -#define sdp_prf_rx(sk, s, format, arg...) -#define sdp_prf_tx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg) -#endif -#endif - #ifdef CONFIG_INFINIBAND_SDP_DEBUG extern int sdp_debug_level; @@ -125,7 +143,6 @@ extern int sdp_data_debug_level; } while (0) #else #define sdp_dbg_data(priv, format, arg...) 
-// do { (void) (priv); } while (0) #define SDP_DUMP_PACKET(sk, str, skb, h) #endif @@ -300,23 +317,36 @@ struct sdp_buf { u64 mapping[SDP_MAX_SEND_SKB_FRAGS + 1]; }; +#define ring_head(ring) (atomic_read(&(ring).head)) +#define ring_tail(ring) (atomic_read(&(ring).tail)) +#define ring_posted(ring) (ring_head(ring) - ring_tail(ring)) struct sdp_tx_ring { struct sdp_buf *buffer; - unsigned head; - unsigned tail; + atomic_t head; + atomic_t tail; + struct ib_cq *cq; int una_seq; - unsigned credits; + atomic_t credits; +#define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits)) struct timer_list timer; u16 poll_cnt; +}; + +struct sdp_rx_ring { + struct sdp_buf *buffer; + atomic_t head; + atomic_t tail; struct ib_cq *cq; + + spinlock_t lock; }; static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring) { - return SDP_TX_SIZE - (tx_ring->head - tx_ring->tail); + return SDP_TX_SIZE - ring_posted(*tx_ring); } struct sdp_chrecvbuf { @@ -329,6 +359,7 @@ struct sdp_sock { struct list_head sock_list; struct list_head accept_queue; struct list_head backlog_queue; + struct sk_buff_head rx_backlog; struct sock *parent; struct work_struct rx_comp_work; @@ -362,32 +393,27 @@ struct sdp_sock { int sdp_disconnect; int destruct_in_process; - + struct sdp_rx_ring rx_ring; + struct sdp_tx_ring tx_ring; /* Data below will be reset on error */ struct rdma_cm_id *id; struct ib_device *ib_device; /* SDP specific */ - struct ib_recv_wr rx_wr; - unsigned rx_head; - unsigned rx_tail; - unsigned mseq_ack; + atomic_t mseq_ack; +#define mseq_ack(ssk) (atomic_read(&ssk->mseq_ack)) unsigned max_bufs; /* Initial buffers offered by other side */ unsigned min_bufs; /* Low water mark to wake senders */ - int remote_credits; + atomic_t remote_credits; +#define remote_credits(ssk) (atomic_read(&ssk->remote_credits)) int poll_cq; /* rdma specific */ struct ib_qp *qp; - struct ib_cq *rx_cq; struct ib_mr *mr; - struct sdp_buf *rx_ring; - struct sdp_tx_ring tx_ring; - struct ib_send_wr tx_wr; - /* SDP slow start */ int rcvbuf_scale; /* local recv buf scale for each socket */ int sent_request_head; /* mark the tx_head of the last send resize @@ -402,8 +428,6 @@ struct sdp_sock { /* BZCOPY data */ int zcopy_thresh; - - struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1]; }; /* Context used for synchronous zero copy bcopy (BZCOY) */ @@ -499,13 +523,6 @@ static inline void sdp_set_error(struct sock *sk, int err) sk->sk_error_report(sk); } -static inline void sdp_arm_rx_cq(struct sock *sk) -{ - sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n"); - - ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP); -} - #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA void _dump_packet(const char *func, int line, struct sock *sk, char *str, struct sk_buff *skb, const struct sdp_bsdh *h); @@ -524,28 +541,53 @@ void sdp_remove_sock(struct sdp_sock *ssk); void sdp_remove_large_sock(struct sdp_sock *ssk); void sdp_post_keepalive(struct sdp_sock *ssk); void sdp_start_keepalive_timer(struct sock *sk); -void sdp_bzcopy_write_space(struct sdp_sock *ssk); int sdp_init_sock(struct sock *sk); int __init sdp_proc_init(void); void sdp_proc_unregister(void); +/* sdp_tx.c */ +int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device); +void sdp_tx_ring_destroy(struct sdp_sock *ssk); int sdp_xmit_poll(struct sdp_sock *ssk, int force); -void sdp_tx_ring_purge(struct sdp_sock *ssk); void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid); void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle); #define 
sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle) -void sdp_process_tx_wc_work(struct work_struct *work); -void sdp_poll_tx_cq(unsigned long data); -void _sdp_poll_tx_cq(unsigned long data); -void sdp_tx_irq(struct ib_cq *cq, void *cq_context); +/* sdp_rx.c */ +void sdp_rx_ring_init(struct sdp_sock *ssk); +int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device); +void sdp_rx_ring_destroy(struct sdp_sock *ssk); +int sdp_process_rx_q(struct sdp_sock *ssk); int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size); int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size); -void sdp_rx_ring_purge(struct sdp_sock *ssk); void sdp_post_recvs(struct sdp_sock *ssk); -void sdp_rx_comp_work(struct work_struct *work); -int sdp_poll_rx_cq(struct sdp_sock *ssk); -void sdp_rx_irq(struct ib_cq *cq, void *cq_context); + +static inline void sdp_arm_rx_cq(struct sock *sk) +{ + sdp_prf(sk, NULL, "Arming RX cq"); + sdp_dbg_data(sk, "Arming RX cq\n"); + + ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP); +} + +/* utilities */ +static inline char *mid2str(int mid) +{ +#define ENUM2STR(e) [e] = #e + static char *mid2str[] = { + ENUM2STR(SDP_MID_HELLO), + ENUM2STR(SDP_MID_HELLO_ACK), + ENUM2STR(SDP_MID_DISCONN), + ENUM2STR(SDP_MID_CHRCVBUF), + ENUM2STR(SDP_MID_CHRCVBUF_ACK), + ENUM2STR(SDP_MID_DATA), + }; + + if (mid >= ARRAY_SIZE(mid2str)) + return NULL; + + return mid2str[mid]; +} static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp) { diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c index 5d4441ae8ecc..b98171ebc38c 100644 --- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c @@ -45,18 +45,9 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, { int len = 0; char buf[256]; -#define ENUM2STR(e) [e] = #e - static char *mid2str[] = { - ENUM2STR(SDP_MID_HELLO), - ENUM2STR(SDP_MID_HELLO_ACK), - ENUM2STR(SDP_MID_DISCONN), - ENUM2STR(SDP_MID_CHRCVBUF), - ENUM2STR(SDP_MID_CHRCVBUF_ACK), - ENUM2STR(SDP_MID_DATA), - }; - len += snprintf(buf, 255-len, "skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d " + len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d " "len: %d mseq: %d mseq_ack: %d", - skb, h->mid, mid2str[h->mid], h->flags, + str, skb, h->mid, mid2str(h->mid), h->flags, ntohs(h->bufs), ntohl(h->len),ntohl(h->mseq), ntohl(h->mseq_ack)); @@ -117,7 +108,7 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb) return (ssk->nonagle & TCP_NAGLE_OFF) || skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue || skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal || - (ssk->tx_ring.tail == ssk->tx_ring.head && + (ring_tail(ssk->tx_ring) == ring_head(ssk->tx_ring) && !(ssk->nonagle & TCP_NAGLE_CORK)) || (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH); } @@ -125,26 +116,14 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb) int sdp_post_credits(struct sdp_sock *ssk) { int post_count = 0; - struct sk_buff *skb; sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d " "tx ring slots left: %d send_head: %p\n", - ssk->tx_ring.credits, ssk->remote_credits, + tx_credits(ssk), remote_credits(ssk), sdp_tx_ring_slots_left(&ssk->tx_ring), ssk->isk.sk.sk_send_head); - if (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS && - sdp_tx_ring_slots_left(&ssk->tx_ring) && - (skb = ssk->isk.sk.sk_send_head) && - sdp_nagle_off(ssk, skb)) { - update_send_head(&ssk->isk.sk, skb); - 
__skb_dequeue(&ssk->isk.sk.sk_write_queue); - sdp_post_send(ssk, skb, SDP_MID_DATA); - post_count++; - goto out; - } - - if (likely(ssk->tx_ring.credits > 1) && + if (likely(tx_credits(ssk) > 1) && likely(sdp_tx_ring_slots_left(&ssk->tx_ring))) { struct sk_buff *skb; skb = sdp_stream_alloc_skb(&ssk->isk.sk, @@ -156,7 +135,6 @@ int sdp_post_credits(struct sdp_sock *ssk) post_count++; } -out: if (post_count) sdp_xmit_poll(ssk, 0); return post_count; @@ -188,7 +166,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag gfp_page = GFP_KERNEL; sdp_dbg_data(&ssk->isk.sk, "credits: %d tx ring slots left: %d send_head: %p\n", - ssk->tx_ring.credits, sdp_tx_ring_slots_left(&ssk->tx_ring), + tx_credits(ssk), sdp_tx_ring_slots_left(&ssk->tx_ring), ssk->isk.sk.sk_send_head); if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) { @@ -197,8 +175,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag } if (ssk->recv_request && - ssk->rx_tail >= ssk->recv_request_head && - ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS && + ring_tail(ssk->rx_ring) >= ssk->recv_request_head && + tx_credits(ssk) >= SDP_MIN_TX_CREDITS && sdp_tx_ring_slots_left(&ssk->tx_ring)) { struct sdp_chrecvbuf *resp_size; ssk->recv_request = 0; @@ -214,7 +192,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag post_count++; } - if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS && + if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS && sdp_tx_ring_slots_left(&ssk->tx_ring) && (skb = ssk->isk.sk.sk_send_head) && sdp_nagle_off(ssk, skb)) { @@ -222,7 +200,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag sdp_prf(&ssk->isk.sk, skb, "no credits. called from %s:%d", func, line); } - while (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS && + while (tx_credits(ssk) > SDP_MIN_TX_CREDITS && sdp_tx_ring_slots_left(&ssk->tx_ring) && (skb = ssk->isk.sk.sk_send_head) && sdp_nagle_off(ssk, skb)) { @@ -232,9 +210,9 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag post_count++; } - if (0 && ssk->tx_ring.credits == SDP_MIN_TX_CREDITS && + if (0 && tx_credits(ssk) == SDP_MIN_TX_CREDITS && !ssk->sent_request && - ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT && + ring_head(ssk->tx_ring) > ssk->sent_request_head + SDP_RESIZE_WAIT && sdp_tx_ring_slots_left(&ssk->tx_ring)) { struct sdp_chrecvbuf *req_size; skb = sdp_stream_alloc_skb(&ssk->isk.sk, @@ -244,19 +222,19 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag /* FIXME */ BUG_ON(!skb); ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE; - ssk->sent_request_head = ssk->tx_ring.head; + ssk->sent_request_head = ring_head(ssk->tx_ring); req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size); req_size->size = htonl(ssk->sent_request); sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF); post_count++; } - c = ssk->remote_credits; + c = remote_credits(ssk); if (likely(c > SDP_MIN_TX_CREDITS)) c *= 2; - if (unlikely(c < ssk->rx_head - ssk->rx_tail) && - likely(ssk->tx_ring.credits > 1) && + if (unlikely(c < ring_posted(ssk->rx_ring)) && + likely(tx_credits(ssk) > 1) && likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) && likely((1 << ssk->isk.sk.sk_state) & (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) { @@ -276,8 +254,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag * messages that provide additional credits and also do not contain ULP * payload. 
*/ if (unlikely(ssk->sdp_disconnect) && - !ssk->isk.sk.sk_send_head && - ssk->tx_ring.credits > (ssk->remote_credits >= ssk->rx_head - ssk->rx_tail)) { + !ssk->isk.sk.sk_send_head && + tx_credits(ssk) > 1) { ssk->sdp_disconnect = 0; skb = sdp_stream_alloc_skb(&ssk->isk.sk, sizeof(struct sdp_bsdh), @@ -287,14 +265,10 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag sdp_post_send(ssk, skb, SDP_MID_DISCONN); post_count++; } + if (post_count) { sdp_xmit_poll(ssk, 0); - sdp_prf(&ssk->isk.sk, NULL, "finshed polling from post_sends"); + sdp_prf(&ssk->isk.sk, NULL, "post_sends finished polling [%s:%d].", func, line); } } - -static inline int sdp_tx_qp_empty(struct sdp_sock *ssk) -{ - return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0; -} diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c index e0d0b20e8c31..a9dcf777509c 100644 --- a/drivers/infiniband/ulp/sdp/sdp_cma.c +++ b/drivers/infiniband/ulp/sdp/sdp_cma.c @@ -53,10 +53,6 @@ enum { SDP_HAH_SIZE = 180, }; -static void sdp_cq_event_handler(struct ib_event *event, void *data) -{ -} - static void sdp_qp_event_handler(struct ib_event *event, void *data) { } @@ -73,36 +69,12 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) .qp_type = IB_QPT_RC, }; struct ib_device *device = id->device; - struct ib_cq *rx_cq, *tx_cq; struct ib_mr *mr; struct ib_pd *pd; int rc; sdp_dbg(sk, "%s\n", __func__); - sdp_sk(sk)->tx_ring.head = 1; - sdp_sk(sk)->tx_ring.tail = 1; - sdp_sk(sk)->rx_head = 1; - sdp_sk(sk)->rx_tail = 1; - - sdp_sk(sk)->tx_ring.buffer = kmalloc(sizeof(*sdp_sk(sk)->tx_ring.buffer) * - (SDP_TX_SIZE + 1), GFP_KERNEL); - if (!sdp_sk(sk)->tx_ring.buffer) { - rc = -ENOMEM; - sdp_warn(sk, "Unable to allocate TX Ring size %zd.\n", - sizeof *sdp_sk(sk)->tx_ring.buffer * (SDP_TX_SIZE + 1)); - goto err_tx; - } - - sdp_sk(sk)->rx_ring = kmalloc(sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE, - GFP_KERNEL); - if (!sdp_sk(sk)->rx_ring) { - rc = -ENOMEM; - sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n", - sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE); - goto err_rx; - } - pd = ib_alloc_pd(device); if (IS_ERR(pd)) { rc = PTR_ERR(pd); @@ -118,43 +90,15 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) } sdp_sk(sk)->mr = mr; - INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work); - - rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler, - sk, SDP_RX_SIZE, 0); - - if (IS_ERR(rx_cq)) { - rc = PTR_ERR(rx_cq); - sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc); - goto err_rx_cq; - } - rc = ib_modify_cq(rx_cq, 10, 200); - if (rc) { - sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc); - goto err_tx_cq; - } - sdp_warn(sk, "Initialized CQ moderation\n"); - sdp_sk(sk)->rx_cq = rx_cq; - sdp_arm_rx_cq(sk); - qp_init_attr.recv_cq = rx_cq; - - tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler, - sk, SDP_TX_SIZE, 0); - - if (IS_ERR(tx_cq)) { - rc = PTR_ERR(tx_cq); - sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc); - goto err_tx_cq; - } + if ((rc = sdp_rx_ring_create(sdp_sk(sk), device))) + goto err_rx; - init_timer(&sdp_sk(sk)->tx_ring.timer); - sdp_sk(sk)->tx_ring.timer.function = _sdp_poll_tx_cq; - sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk); - sdp_sk(sk)->tx_ring.poll_cnt = 0; + if ((rc = sdp_tx_ring_create(sdp_sk(sk), device))) + goto err_tx; - sdp_sk(sk)->tx_ring.cq = tx_cq; - qp_init_attr.send_cq = tx_cq; + qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq; + qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq; rc = 
rdma_create_qp(id, pd, &qp_init_attr); if (rc) { @@ -170,20 +114,14 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) return 0; err_qp: - ib_destroy_cq(tx_cq); -err_tx_cq: - ib_destroy_cq(rx_cq); -err_rx_cq: + sdp_tx_ring_destroy(sdp_sk(sk)); +err_tx: + sdp_rx_ring_destroy(sdp_sk(sk)); +err_rx: ib_dereg_mr(sdp_sk(sk)->mr); err_mr: ib_dealloc_pd(pd); err_pd: - kfree(sdp_sk(sk)->rx_ring); - sdp_sk(sk)->rx_ring = NULL; -err_rx: - kfree(sdp_sk(sk)->tx_ring.buffer); - sdp_sk(sk)->tx_ring.buffer = NULL; -err_tx: return rc; } @@ -225,8 +163,10 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id, sdp_add_sock(sdp_sk(child)); - sdp_sk(child)->max_bufs = sdp_sk(child)->tx_ring.credits = ntohs(h->bsdh.bufs); - sdp_sk(child)->min_bufs = sdp_sk(child)->tx_ring.credits / 4; + sdp_sk(child)->max_bufs = ntohs(h->bsdh.bufs); + atomic_set(&sdp_sk(child)->tx_ring.credits, sdp_sk(child)->max_bufs); + + sdp_sk(child)->min_bufs = tx_credits(sdp_sk(child)) / 4; sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) - sizeof(struct sdp_bsdh); sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) / @@ -236,7 +176,7 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id, sdp_dbg(child, "%s recv_frags: %d tx credits %d xmit_size_goal %d send trigger %d\n", __func__, sdp_sk(child)->recv_frags, - sdp_sk(child)->tx_ring.credits, + tx_credits(sdp_sk(child)), sdp_sk(child)->xmit_size_goal, sdp_sk(child)->min_bufs); @@ -272,8 +212,9 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id, h = event->param.conn.private_data; SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh); - sdp_sk(sk)->max_bufs = sdp_sk(sk)->tx_ring.credits = ntohs(h->bsdh.bufs); - sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4; + sdp_sk(sk)->max_bufs = ntohs(h->bsdh.bufs); + atomic_set(&sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->max_bufs); + sdp_sk(sk)->min_bufs = tx_credits(sdp_sk(sk)) / 4; sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) - sizeof(struct sdp_bsdh); sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) / @@ -282,14 +223,12 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id, sdp_sk(sk)->send_frags * PAGE_SIZE); sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n", - sdp_sk(sk)->tx_ring.credits, + tx_credits(sdp_sk(sk)), sdp_sk(sk)->xmit_size_goal, sdp_sk(sk)->send_frags, sdp_sk(sk)->min_bufs); sdp_sk(sk)->poll_cq = 1; - sdp_arm_rx_cq(sk); - sdp_poll_rx_cq(sdp_sk(sk)); sk->sk_state_change(sk); sk_wake_async(sk, 0, POLL_OUT); @@ -349,8 +288,7 @@ static int sdp_disconnected_handler(struct sock *sk) sdp_dbg(sk, "%s\n", __func__); - if (ssk->rx_cq) - sdp_poll_rx_cq(ssk); + sdp_process_rx_q(ssk); if (ssk->tx_ring.cq) sdp_xmit_poll(ssk, 1); @@ -400,7 +338,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) rc = rdma_resolve_route(id, SDP_ROUTE_TIMEOUT); break; case RDMA_CM_EVENT_ADDR_ERROR: - sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_ERROR\n"); + sdp_warn(sk, "RDMA_CM_EVENT_ADDR_ERROR\n"); rc = -ENETUNREACH; break; case RDMA_CM_EVENT_ROUTE_RESOLVED: @@ -408,11 +346,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) rc = sdp_init_qp(sk, id); if (rc) break; - sdp_sk(sk)->remote_credits = sdp_sk(sk)->rx_head - - sdp_sk(sk)->rx_tail; + atomic_set(&sdp_sk(sk)->remote_credits, ring_posted(sdp_sk(sk)->rx_ring)); memset(&hh, 0, sizeof hh); hh.bsdh.mid = SDP_MID_HELLO; - hh.bsdh.bufs = htons(sdp_sk(sk)->remote_credits); + hh.bsdh.bufs = 
htons(remote_credits(sdp_sk(sk))); hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE); hh.max_adverts = 1; hh.majv_minv = SDP_MAJV_MINV; @@ -443,11 +380,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) break; } child = id->context; - sdp_sk(child)->remote_credits = sdp_sk(child)->rx_head - - sdp_sk(child)->rx_tail; + atomic_set(&sdp_sk(child)->remote_credits, ring_posted(sdp_sk(child)->rx_ring)); memset(&hah, 0, sizeof hah); hah.bsdh.mid = SDP_MID_HELLO_ACK; - hah.bsdh.bufs = htons(sdp_sk(child)->remote_credits); + hah.bsdh.bufs = htons(remote_credits(sdp_sk(child))); hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE); hah.majv_minv = SDP_MAJV_MINV; hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec, diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c index b5322da65322..c6b17dbcc6d8 100644 --- a/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/drivers/infiniband/ulp/sdp/sdp_main.c @@ -206,31 +206,24 @@ static int sdp_get_port(struct sock *sk, unsigned short snum) static void sdp_destroy_qp(struct sdp_sock *ssk) { struct ib_pd *pd = NULL; - struct ib_cq *rx_cq = NULL; - struct ib_cq *tx_cq = NULL; + unsigned long flags; + + + sdp_dbg(&ssk->isk.sk, "destroying qp\n"); del_timer(&ssk->tx_ring.timer); + rx_ring_lock(ssk, flags); + + sdp_rx_ring_destroy(ssk); + sdp_tx_ring_destroy(ssk); + if (ssk->qp) { pd = ssk->qp->pd; - rx_cq = ssk->rx_cq; - ssk->rx_cq = NULL; - tx_cq = ssk->tx_ring.cq; - ssk->tx_ring.cq = NULL; ib_destroy_qp(ssk->qp); ssk->qp = NULL; - - sdp_rx_ring_purge(ssk); - sdp_tx_ring_purge(ssk); - } - - if (tx_cq) { - ib_destroy_cq(tx_cq); } - if (rx_cq) - ib_destroy_cq(rx_cq); - if (ssk->mr) { ib_dereg_mr(ssk->mr); ssk->mr = NULL; @@ -241,14 +234,8 @@ static void sdp_destroy_qp(struct sdp_sock *ssk) sdp_remove_large_sock(ssk); - if (ssk->rx_ring) { - kfree(ssk->rx_ring); - ssk->rx_ring = NULL; - } - if (ssk->tx_ring.buffer) { - kfree(ssk->tx_ring.buffer); - ssk->tx_ring.buffer = NULL; - } + rx_ring_unlock(ssk, flags); + } static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len) @@ -257,8 +244,8 @@ static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len) sdp_dbg(sk, "%s\n", __func__); - ssk->keepalive_tx_head = ssk->tx_ring.head; - ssk->keepalive_rx_head = ssk->rx_head; + ssk->keepalive_tx_head = ring_head(ssk->tx_ring); + ssk->keepalive_rx_head = ring_head(ssk->rx_ring); sk_reset_timer(sk, &sk->sk_timer, jiffies + len); } @@ -293,8 +280,8 @@ static void sdp_keepalive_timer(unsigned long data) sk->sk_state == TCP_CLOSE) goto out; - if (ssk->keepalive_tx_head == ssk->tx_ring.head && - ssk->keepalive_rx_head == ssk->rx_head) + if (ssk->keepalive_tx_head == ring_head(ssk->tx_ring) && + ssk->keepalive_rx_head == ring_head(ssk->rx_ring)) sdp_post_keepalive(ssk); sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(ssk)); @@ -338,17 +325,21 @@ void sdp_reset_sk(struct sock *sk, int rc) read_lock(&device_removal_lock); - if (ssk->rx_cq) - sdp_poll_rx_cq(ssk); + sdp_process_rx_q(ssk); if (ssk->tx_ring.cq) sdp_xmit_poll(ssk, 1); - if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) + if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) { + sdp_warn(sk, "setting state to error. 
shutdown: %d, mem_free: %d\n", + !(sk->sk_shutdown & RCV_SHUTDOWN), + !sk_stream_memory_free(sk)); sdp_set_error(sk, rc); + } sdp_destroy_qp(ssk); + sdp_dbg(sk, "memset on sdp_sock\n"); memset((void *)&ssk->id, 0, sizeof(*ssk) - offsetof(typeof(*ssk), id)); sk->sk_state_change(sk); @@ -488,6 +479,7 @@ static void sdp_close(struct sock *sk, long timeout) lock_sock(sk); sdp_dbg(sk, "%s\n", __func__); + sdp_prf(sk, NULL, __func__); sdp_delete_keepalive_timer(sk); @@ -773,10 +765,9 @@ out: release_sock(sk); if (newsk) { lock_sock(newsk); - if (newssk->rx_cq) { + if (newssk->rx_ring.cq) { newssk->poll_cq = 1; sdp_arm_rx_cq(&newssk->isk.sk); - sdp_poll_rx_cq(newssk); } release_sock(newsk); } @@ -934,7 +925,11 @@ int sdp_init_sock(struct sock *sk) sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM; - ssk->rx_ring = NULL; + skb_queue_head_init(&ssk->rx_backlog); + + atomic_set(&ssk->mseq_ack, 0); + + sdp_rx_ring_init(ssk); ssk->tx_ring.buffer = NULL; ssk->sdp_disconnect = 0; ssk->destructed_already = 0; @@ -1142,14 +1137,13 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname, static inline int poll_recv_cq(struct sock *sk) { int i; - if (sdp_sk(sk)->rx_cq) { - for (i = 0; i < recv_poll; ++i) - if (!sdp_poll_rx_cq(sdp_sk(sk))) { - ++recv_poll_hit; - return 0; - } - ++recv_poll_miss; + for (i = 0; i < recv_poll; ++i) { + if (!sdp_process_rx_q(sdp_sk(sk))) { + ++recv_poll_hit; + return 0; + } } + ++recv_poll_miss; return 1; } @@ -1551,8 +1545,8 @@ static inline int slots_free(struct sdp_sock *ssk) { int min_free; - min_free = MIN(ssk->tx_ring.credits, - SDP_TX_SIZE - (ssk->tx_ring.head - ssk->tx_ring.tail)); + min_free = MIN(tx_credits(ssk), + SDP_TX_SIZE - ring_posted(ssk->tx_ring)); if (min_free < SDP_MIN_TX_CREDITS) return 0; @@ -1608,6 +1602,9 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p, set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); sk->sk_write_pending++; + sdp_prf(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d", + tx_credits(ssk), ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring), + bz->busy); sk_wait_event(sk, ¤t_timeo, sdp_bzcopy_slots_avail(ssk, bz) && vm_wait); sk->sk_write_pending--; @@ -1627,24 +1624,6 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p, return err; } -/* like sk_stream_write_space - execpt measures remote credits */ -void sdp_bzcopy_write_space(struct sdp_sock *ssk) -{ - struct sock *sk = &ssk->isk.sk; - struct socket *sock = sk->sk_socket; - - if (ssk->tx_ring.credits >= ssk->min_bufs && - ssk->tx_ring.head == ssk->tx_ring.tail && - sock != NULL) { - clear_bit(SOCK_NOSPACE, &sock->flags); - - if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) - wake_up_interruptible(sk->sk_sleep); - if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN)) - sock_wake_async(sock, 2, POLL_OUT); - } -} - /* Like tcp_sendmsg */ /* TODO: check locking */ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, @@ -1735,7 +1714,7 @@ new_segment: if (!skb) goto wait_for_memory; - sdp_prf(sk, skb, "Created"); +// sdp_prf(sk, skb, "Created"); BZCOPY_STATE(skb) = bz; @@ -1751,7 +1730,7 @@ new_segment: skb_entail(sk, ssk, skb); copy = size_goal; } else { - sdp_prf(sk, skb, "adding %d bytes", copy); +// sdp_prf(sk, skb, "adding %d bytes", copy); sdp_dbg_data(sk, "adding to existing skb: %p" " len = %d, sk_send_head: %p copy: %d\n", skb, skb->len, sk->sk_send_head, copy); @@ -1770,10 +1749,8 @@ new_segment: goto new_segment; } -// sdp_prf(sk, skb, "before memcpy %d bytes", copy); copy = (bz) ? 
sdp_bzcopy_get(sk, skb, from, copy, bz) : sdp_bcopy_get(sk, skb, from, copy); -// sdp_prf(sk, skb, "after memcpy. result: %d", copy); if (unlikely(copy < 0)) { if (!++copy) goto wait_for_memory; @@ -1863,6 +1840,20 @@ out_err: return err; } +int dummy_memcpy_toiovec(struct iovec *iov, int len) +{ + while (len > 0) { + if (iov->iov_len) { + int copy = min_t(unsigned int, iov->iov_len, len); + len -= copy; + iov->iov_len -= copy; + iov->iov_base += copy; + } + iov++; + } + + return 0; +} /* Like tcp_recvmsg */ /* Maybe use skb_recv_datagram here? */ /* Note this does not seem to handle vectored messages. Relevant? */ @@ -1884,7 +1875,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, lock_sock(sk); sdp_dbg_data(sk, "%s\n", __func__); - sdp_prf(sk, skb, "Read from user"); +// sdp_prf(sk, skb, "Read from user"); err = -ENOTCONN; if (sk->sk_state == TCP_LISTEN) @@ -2024,6 +2015,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, err = skb_copy_datagram_iovec(skb, offset, /* TODO: skip header? */ msg->msg_iov, used); +// err = dummy_memcpy_toiovec(msg->msg_iov, used); sdp_prf(sk, skb, "Copied to user %ld bytes. err = %d", used, err); if (err) { sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed" diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c index 3778c9a971af..537b3a626553 100644 --- a/drivers/infiniband/ulp/sdp/sdp_proc.c +++ b/drivers/infiniband/ulp/sdp/sdp_proc.c @@ -236,15 +236,6 @@ static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n, in static int sdpstats_seq_show(struct seq_file *seq, void *v) { -#define ENUM2STR(e) [e] = #e - static char *mid2str[] = { - ENUM2STR(SDP_MID_HELLO), - ENUM2STR(SDP_MID_HELLO_ACK), - ENUM2STR(SDP_MID_DISCONN), - ENUM2STR(SDP_MID_CHRCVBUF), - ENUM2STR(SDP_MID_CHRCVBUF_ACK), - ENUM2STR(SDP_MID_DATA), - }; int i; seq_printf(seq, "SDP statistics:\n"); @@ -268,9 +259,9 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v) seq_printf(seq, "memcpy_count \t\t: %u\n", sdpstats.memcpy_count); for (i = 0; i < ARRAY_SIZE(sdpstats.post_send); i++) { - if (mid2str[i]) { + if (mid2str(i)) { seq_printf(seq, "post_send %-20s\t: %d\n", - mid2str[i], sdpstats.post_send[i]); + mid2str(i), sdpstats.post_send[i]); } } diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c index ba8e0fefb2fc..810dcbbcdead 100644 --- a/drivers/infiniband/ulp/sdp/sdp_rx.c +++ b/drivers/infiniband/ulp/sdp/sdp_rx.c @@ -39,7 +39,7 @@ static int rcvbuf_scale = 0x10; -int rcvbuf_initial_size = SDP_HEAD_SIZE; +int rcvbuf_initial_size = 32 * 1024; module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644); MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes."); @@ -149,22 +149,28 @@ static void sdp_fin(struct sock *sk) } } - -static void sdp_post_recv(struct sdp_sock *ssk) +/* lock_sock must be taken before calling this - since rx_ring.head is not + * protected (although being atomic + */ +static int sdp_post_recv(struct sdp_sock *ssk) { struct sdp_buf *rx_req; int i, rc, frags; u64 addr; struct ib_device *dev; - struct ib_sge *sge; + struct ib_recv_wr rx_wr = { 0 }; + struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1]; + struct ib_sge *sge = ibsge; struct ib_recv_wr *bad_wr; struct sk_buff *skb; struct page *page; skb_frag_t *frag; struct sdp_bsdh *h; - int id = ssk->rx_head; + int id = ring_head(ssk->rx_ring); gfp_t gfp_page; + int ret = 0; + WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock); /* Now, 
allocate and repost recv */ /* TODO: allocate from cache */ @@ -194,10 +200,9 @@ static void sdp_post_recv(struct sdp_sock *ssk) skb->truesize += frag->size; } - rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1)); + rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1)); rx_req->skb = skb; dev = ssk->ib_device; - sge = ssk->ibsge; addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE); BUG_ON(ib_dma_mapping_error(dev, addr)); @@ -221,27 +226,32 @@ static void sdp_post_recv(struct sdp_sock *ssk) sge->lkey = ssk->mr->lkey; } - ssk->rx_wr.next = NULL; - ssk->rx_wr.wr_id = id | SDP_OP_RECV; - ssk->rx_wr.sg_list = ssk->ibsge; - ssk->rx_wr.num_sge = frags + 1; - rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr); - sdp_prf(&ssk->isk.sk, skb, "rx skb was posted"); + rx_wr.next = NULL; + rx_wr.wr_id = id | SDP_OP_RECV; + rx_wr.sg_list = ibsge; + rx_wr.num_sge = frags + 1; + rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr); SDPSTATS_COUNTER_INC(post_recv); - ++ssk->rx_head; + atomic_inc(&ssk->rx_ring.head); if (unlikely(rc)) { - sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc); + sdp_warn(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc); sdp_reset(&ssk->isk.sk); + ret = -1; } atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); + + return ret; } -void sdp_post_recvs(struct sdp_sock *ssk) +/* lock_sock must be taken before calling this */ +static void _sdp_post_recvs(struct sdp_sock *ssk) { struct sock *sk = &ssk->isk.sk; int scale = ssk->rcvbuf_scale; + WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock); + if (unlikely(!ssk->id || ((1 << sk->sk_state) & (TCPF_CLOSE | TCPF_TIME_WAIT)))) { return; @@ -251,12 +261,23 @@ void sdp_post_recvs(struct sdp_sock *ssk) (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) scale = 1; - while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) && - (ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) * + while ((likely(ring_posted(ssk->rx_ring) < SDP_RX_SIZE) && + (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) * (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) + ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) || - unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS)) - sdp_post_recv(ssk); + unlikely(ring_posted(ssk->rx_ring) < SDP_MIN_TX_CREDITS)) { + if (sdp_post_recv(ssk)) + break; + } +} + +void sdp_post_recvs(struct sdp_sock *ssk) +{ + unsigned long flags; + + rx_ring_lock(ssk, flags); + _sdp_post_recvs(ssk); + rx_ring_unlock(ssk, flags); } static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk, @@ -328,9 +349,9 @@ int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size) static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf) { if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0) - ssk->recv_request_head = ssk->rx_head + 1; + ssk->recv_request_head = ring_head(ssk->rx_ring) + 1; else - ssk->recv_request_head = ssk->rx_tail; + ssk->recv_request_head = ring_tail(ssk->rx_ring); ssk->recv_request = 1; } @@ -347,23 +368,17 @@ static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *bu ssk->sent_request = 0; } -static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed) +static inline int credit_update_needed(struct sdp_sock *ssk) { int c; - c = ssk->remote_credits; + c = remote_credits(ssk); if (likely(c > SDP_MIN_TX_CREDITS)) c += c/2; -/* sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d " - "tx ring slots left: %d send_head: %p\n", - ssk->tx_ring.credits, ssk->remote_credits, - 
sdp_tx_ring_slots_left(&ssk->tx_ring), - ssk->isk.sk.sk_send_head); -*/ - return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) && - likely(ssk->tx_ring.credits > 1) && - likely(sdp_tx_ring_slots_left(&ssk->tx_ring))); + return unlikely(c < ring_posted(ssk->rx_ring)) && + likely(tx_credits(ssk) > 1) && + likely(sdp_tx_ring_slots_left(&ssk->tx_ring)); } @@ -374,14 +389,16 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id) struct sk_buff *skb; int i, frags; - if (unlikely(id != ssk->rx_tail)) { + WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock); + + if (unlikely(id != ring_tail(ssk->rx_ring))) { printk(KERN_WARNING "Bogus recv completion id %d tail %d\n", - id, ssk->rx_tail); + id, ring_tail(ssk->rx_ring)); return NULL; } dev = ssk->ib_device; - rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)]; + rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)]; skb = rx_req->skb; ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE, DMA_FROM_DEVICE); @@ -390,67 +407,20 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id) ib_dma_unmap_page(dev, rx_req->mapping[i + 1], skb_shinfo(skb)->frags[i].size, DMA_FROM_DEVICE); - ++ssk->rx_tail; - --ssk->remote_credits; + atomic_inc(&ssk->rx_ring.tail); + atomic_dec(&ssk->remote_credits); return skb; } -static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc) +/* this must be called while sock_lock is taken */ +static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb) { struct sock *sk = &ssk->isk.sk; int frags; - struct sk_buff *skb; struct sdp_bsdh *h; int pagesz, i; - skb = sdp_recv_completion(ssk, wc->wr_id); - if (unlikely(!skb)) - return -1; - - sdp_prf(sk, skb, "recv completion"); - - atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); - - if (unlikely(wc->status)) { - if (wc->status != IB_WC_WR_FLUSH_ERR) { - sdp_dbg(sk, "Recv completion with error. Status %d\n", - wc->status); - sdp_reset(sk); - } - __kfree_skb(skb); - return 0; - } - - sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n", - (int)wc->wr_id, wc->byte_len); - if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) { - printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n", - wc->byte_len, sizeof(struct sdp_bsdh)); - __kfree_skb(skb); - return -1; - } - skb->len = wc->byte_len; - if (likely(wc->byte_len > SDP_HEAD_SIZE)) - skb->data_len = wc->byte_len - SDP_HEAD_SIZE; - else - skb->data_len = 0; - skb->data = skb->head; -#ifdef NET_SKBUFF_DATA_USES_OFFSET - skb->tail = skb_headlen(skb); -#else - skb->tail = skb->head + skb_headlen(skb); -#endif h = (struct sdp_bsdh *)skb->data; - SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h); - skb_reset_transport_header(skb); - ssk->mseq_ack = ntohl(h->mseq); - if (ssk->mseq_ack != (int)wc->wr_id) - printk(KERN_WARNING "SDP BUG! 
mseq %d != wrid %d\n", - ssk->mseq_ack, (int)wc->wr_id); - - SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits); - ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 + - ntohs(h->bufs); frags = skb_shinfo(skb)->nr_frags; pagesz = PAGE_ALIGN(skb->data_len); @@ -513,13 +483,105 @@ static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc) return 0; } -int sdp_poll_rx_cq(struct sdp_sock *ssk) +/* called only from irq */ +static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc) { - struct ib_cq *cq = ssk->rx_cq; + struct sk_buff *skb; + struct sdp_bsdh *h; + struct sock *sk = &ssk->isk.sk; + int credits_before; + + skb = sdp_recv_completion(ssk, wc->wr_id); + if (unlikely(!skb)) + return NULL; + + atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); + + if (unlikely(wc->status)) { + if (wc->status != IB_WC_WR_FLUSH_ERR) { + sdp_warn(sk, "Recv completion with error. Status %d\n", + wc->status); + sdp_reset(sk); + } + __kfree_skb(skb); + return NULL; + } + + sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n", + (int)wc->wr_id, wc->byte_len); + if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) { + printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n", + wc->byte_len, sizeof(struct sdp_bsdh)); + __kfree_skb(skb); + return NULL; + } + skb->len = wc->byte_len; + if (likely(wc->byte_len > SDP_HEAD_SIZE)) + skb->data_len = wc->byte_len - SDP_HEAD_SIZE; + else + skb->data_len = 0; + skb->data = skb->head; +#ifdef NET_SKBUFF_DATA_USES_OFFSET + skb->tail = skb_headlen(skb); +#else + skb->tail = skb->head + skb_headlen(skb); +#endif + h = (struct sdp_bsdh *)skb->data; + SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h); + skb_reset_transport_header(skb); + atomic_set(&ssk->mseq_ack, ntohl(h->mseq)); + if (mseq_ack(ssk) != (int)wc->wr_id) + printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n", + mseq_ack(ssk), (int)wc->wr_id); + + SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk)); + + credits_before = tx_credits(ssk); + atomic_set(&ssk->tx_ring.credits, ntohl(h->mseq_ack) - ring_head(ssk->tx_ring) + 1 + + ntohs(h->bufs)); + + sdp_prf(&ssk->isk.sk, skb, "RX %s bufs=%d c before:%d after:%d " + "mseq:%d, ack:%d", mid2str(h->mid), ntohs(h->bufs), credits_before, + tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack)); + + return skb; +} + +/* like sk_stream_write_space - execpt measures remote credits */ +static void sdp_bzcopy_write_space(struct sdp_sock *ssk) +{ + struct sock *sk = &ssk->isk.sk; + struct socket *sock = sk->sk_socket; + + if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) { + sdp_prf(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d", + tx_credits(ssk), ssk->min_bufs, + ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring)); + } + + if (tx_credits(ssk) >= ssk->min_bufs && + ring_head(ssk->tx_ring) == ring_tail(ssk->tx_ring) && + sock != NULL) { + clear_bit(SOCK_NOSPACE, &sock->flags); + + if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) + wake_up_interruptible(sk->sk_sleep); + if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN)) + sock_wake_async(sock, 2, POLL_OUT); + } +} + +/* only from interrupt. 
+ * drain rx cq into rx_backlog queue */ +static int sdp_poll_rx_cq(struct sdp_sock *ssk) +{ + struct ib_cq *cq = ssk->rx_ring.cq; struct ib_wc ibwc[SDP_NUM_WC]; int n, i; - int ret = -EAGAIN; int wc_processed = 0; + struct sk_buff *skb; + + WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock); do { n = ib_poll_cq(cq, SDP_NUM_WC, ibwc); @@ -527,102 +589,224 @@ int sdp_poll_rx_cq(struct sdp_sock *ssk) struct ib_wc *wc = &ibwc[i]; BUG_ON(!(wc->wr_id & SDP_OP_RECV)); - sdp_process_rx_wc(ssk, wc); + skb = sdp_process_rx_wc(ssk, wc); + if (!skb) + continue; + skb_queue_tail(&ssk->rx_backlog, skb); wc_processed++; - - if (credit_update_needed(ssk, wc_processed)) { - sdp_prf(&ssk->isk.sk, NULL, "credit update. remote_credits: %d, avail now: %d processed: %d", - ssk->remote_credits, - ssk->rx_head - ssk->rx_tail, - wc_processed); - sdp_post_recvs(ssk); - if (sdp_post_credits(ssk)) - wc_processed = 0; - } - - ret = 0; } } while (n == SDP_NUM_WC); - if (!ret) { - struct sock *sk = &ssk->isk.sk; + if (wc_processed) + sdp_bzcopy_write_space(ssk); - sdp_post_recvs(ssk); + return wc_processed; +} - /* update credits */ - sdp_post_sends(ssk, 0); +int sdp_process_rx_q(struct sdp_sock *ssk) +{ + struct sk_buff *skb; + struct sock *sk = &ssk->isk.sk; + unsigned long flags; - if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) - sk_stream_write_space(&ssk->isk.sk); - } else { + if (!ssk->rx_backlog.next || !ssk->rx_backlog.prev) { + sdp_warn(&ssk->isk.sk, "polling a zeroed rx_backlog!!!! %p\n", &ssk->rx_backlog); + return 0; + } + + if (skb_queue_empty(&ssk->rx_backlog)) { SDPSTATS_COUNTER_INC(rx_poll_miss); + return -EAGAIN; } - return ret; + /* update credits */ + sdp_post_sends(ssk, 0); + + spin_lock_irqsave(&ssk->rx_backlog.lock, flags); + while ((skb = __skb_dequeue(&ssk->rx_backlog))) { + sdp_process_rx_skb(ssk, skb); + } + spin_unlock_irqrestore(&ssk->rx_backlog.lock, flags); + + if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) + sk_stream_write_space(&ssk->isk.sk); + + return 0; } -void sdp_rx_comp_work(struct work_struct *work) +static void sdp_rx_comp_work(struct work_struct *work) { struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work); struct sock *sk = &ssk->isk.sk; struct ib_cq *rx_cq; lock_sock(sk); - rx_cq = ssk->rx_cq; + rx_cq = ssk->rx_ring.cq; if (unlikely(!rx_cq)) goto out; if (unlikely(!ssk->poll_cq)) { struct rdma_cm_id *id = ssk->id; + sdp_warn(sk, "poll cq is 0. 
socket was reset or wasn't initialized\n"); if (id && id->qp) rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED); goto out; } - sdp_poll_rx_cq(ssk); + sdp_process_rx_q(ssk); sdp_xmit_poll(ssk, 1); /* if has pending tx because run out of tx_credits - xmit it */ release_sock(sk); sk_stream_mem_reclaim(sk); lock_sock(sk); - rx_cq = ssk->rx_cq; + rx_cq = ssk->rx_ring.cq; if (unlikely(!rx_cq)) goto out; - sdp_arm_rx_cq(sk); - sdp_poll_rx_cq(ssk); + sdp_process_rx_q(ssk); sdp_xmit_poll(ssk, 1); + out: release_sock(sk); } -void sdp_rx_irq(struct ib_cq *cq, void *cq_context) +static void sdp_rx_irq(struct ib_cq *cq, void *cq_context) { struct sock *sk = cq_context; struct sdp_sock *ssk = sdp_sk(sk); + unsigned long flags; + int wc_processed = 0; - WARN_ON(ssk->rx_cq && cq != ssk->rx_cq); + sdp_dbg_data(&ssk->isk.sk, "rx irq called\n"); - if (!ssk->rx_cq) - sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n"); + WARN_ON(cq != ssk->rx_ring.cq); SDPSTATS_COUNTER_INC(rx_int_count); - sdp_prf(sk, NULL, "rx completion"); + sdp_prf(sk, NULL, "rx irq"); + + rx_ring_lock(ssk, flags); + + if (unlikely(!ssk->poll_cq)) + sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n"); + + if (!ssk->rx_ring.cq) { + sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n"); + + goto out; + } + + wc_processed = sdp_poll_rx_cq(ssk); + sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed); + + if (wc_processed) { + _sdp_post_recvs(ssk); + + /* Best was to send credit update from here */ +/* sdp_post_credits(ssk); */ - /* issue sdp_rx_comp_work() */ - queue_work(rx_comp_wq, &ssk->rx_comp_work); + /* issue sdp_rx_comp_work() */ + queue_work(rx_comp_wq, &ssk->rx_comp_work); + } + + sdp_arm_rx_cq(sk); + +out: + rx_ring_unlock(ssk, flags); } -void sdp_rx_ring_purge(struct sdp_sock *ssk) +static void sdp_rx_ring_purge(struct sdp_sock *ssk) { - struct sk_buff *skb; + WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock); - while (ssk->rx_head != ssk->rx_tail) { + while (ring_posted(ssk->rx_ring) > 0) { struct sk_buff *skb; - skb = sdp_recv_completion(ssk, ssk->rx_tail); + skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring)); if (!skb) break; atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage); __kfree_skb(skb); } } + +void sdp_rx_ring_init(struct sdp_sock *ssk) +{ + ssk->rx_ring.buffer = NULL; + spin_lock_init(&ssk->rx_ring.lock); +} + +static void sdp_rx_cq_event_handler(struct ib_event *event, void *data) +{ +} + +int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device) +{ + struct ib_cq *rx_cq; + int rc = 0; + unsigned long flags; + + rx_ring_lock(ssk, flags); + + atomic_set(&ssk->rx_ring.head, 1); + atomic_set(&ssk->rx_ring.tail, 1); + + ssk->rx_ring.buffer = kmalloc(sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, + GFP_KERNEL); + if (!ssk->rx_ring.buffer) { + rc = -ENOMEM; + sdp_warn(&ssk->isk.sk, "Unable to allocate RX Ring size %zd.\n", + sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE); + + goto out; + } + + rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler, + &ssk->isk.sk, SDP_RX_SIZE, 0); + + if (IS_ERR(rx_cq)) { + rc = PTR_ERR(rx_cq); + sdp_warn(&ssk->isk.sk, "Unable to allocate RX CQ: %d.\n", rc); + goto err_cq; + } + + rc = ib_modify_cq(rx_cq, 10, 200); + if (rc) { + sdp_warn(&ssk->isk.sk, "Unable to modify RX CQ: %d.\n", rc); + goto err_mod; + } + sdp_warn(&ssk->isk.sk, "Initialized CQ moderation\n"); + sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq; + + INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work); + + sdp_arm_rx_cq(&ssk->isk.sk); + + goto out; + +err_mod: + 
ib_destroy_cq(rx_cq); +err_cq: + kfree(ssk->rx_ring.buffer); + ssk->rx_ring.buffer = NULL; +out: + rx_ring_unlock(ssk, flags); + return rc; +} + +void sdp_rx_ring_destroy(struct sdp_sock *ssk) +{ + WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock); + + if (ssk->rx_ring.buffer) { + sdp_rx_ring_purge(ssk); + + kfree(ssk->rx_ring.buffer); + ssk->rx_ring.buffer = NULL; + } + + if (ssk->rx_ring.cq) { + ib_destroy_cq(ssk->rx_ring.cq); + ssk->rx_ring.cq = NULL; + } + + WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring)); +} diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c index 5e6a2dc28bf5..de9d7920b839 100644 --- a/drivers/infiniband/ulp/sdp/sdp_tx.c +++ b/drivers/infiniband/ulp/sdp/sdp_tx.c @@ -55,8 +55,11 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force) mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); /* Poll the CQ every SDP_TX_POLL_MODER packets */ - if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) + if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) { wc_processed = sdp_process_tx_cq(ssk); + sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. inflight=%d", wc_processed, + ring_posted(ssk->tx_ring)); + } return wc_processed; } @@ -65,24 +68,16 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) { struct sdp_buf *tx_req; struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h); - unsigned mseq = ssk->tx_ring.head; + unsigned mseq = ring_head(ssk->tx_ring); int i, rc, frags; u64 addr; struct ib_device *dev; - struct ib_sge *sge; struct ib_send_wr *bad_wr; -#define ENUM2STR(e) [e] = #e - static char *mid2str[] = { - ENUM2STR(SDP_MID_HELLO), - ENUM2STR(SDP_MID_HELLO_ACK), - ENUM2STR(SDP_MID_DISCONN), - ENUM2STR(SDP_MID_CHRCVBUF), - ENUM2STR(SDP_MID_CHRCVBUF_ACK), - ENUM2STR(SDP_MID_DATA), - }; - sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d", - mid2str[mid], ssk->rx_head - ssk->rx_tail); + struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1]; + struct ib_sge *sge = ibsge; + struct ib_send_wr tx_wr = { 0 }; + SDPSTATS_COUNTER_MID_INC(post_send, mid); SDPSTATS_HIST(send_size, skb->len); @@ -93,16 +88,19 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) else h->flags = 0; - h->bufs = htons(ssk->rx_head - ssk->rx_tail); + h->bufs = htons(ring_posted(ssk->rx_ring)); h->len = htonl(skb->len); h->mseq = htonl(mseq); - h->mseq_ack = htonl(ssk->mseq_ack); + h->mseq_ack = htonl(mseq_ack(ssk)); + + sdp_prf(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%d ack:%d", + mid2str(mid), ring_posted(ssk->rx_ring), mseq, ntohl(h->mseq_ack)); SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h); + tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)]; tx_req->skb = skb; dev = ssk->ib_device; - sge = ssk->ibsge; addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len, DMA_TO_DEVICE); tx_req->mapping[0] = addr; @@ -127,14 +125,14 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) sge->lkey = ssk->mr->lkey; } - ssk->tx_wr.next = NULL; - ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND; - ssk->tx_wr.sg_list = ssk->ibsge; - ssk->tx_wr.num_sge = frags + 1; - ssk->tx_wr.opcode = IB_WR_SEND; - ssk->tx_wr.send_flags = IB_SEND_SIGNALED; + tx_wr.next = NULL; + tx_wr.wr_id = ring_head(ssk->tx_ring) | SDP_OP_SEND; + tx_wr.sg_list = ibsge; + tx_wr.num_sge = frags + 1; + tx_wr.opcode = IB_WR_SEND; + tx_wr.send_flags = IB_SEND_SIGNALED; if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG)) - ssk->tx_wr.send_flags |= IB_SEND_SOLICITED; + tx_wr.send_flags 
|= IB_SEND_SOLICITED; { static unsigned long last_send = 0; @@ -145,19 +143,15 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) last_send = jiffies; } - rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr); - ++ssk->tx_ring.head; - --ssk->tx_ring.credits; - ssk->remote_credits = ssk->rx_head - ssk->rx_tail; + rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr); + atomic_inc(&ssk->tx_ring.head); + atomic_dec(&ssk->tx_ring.credits); + atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring)); if (unlikely(rc)) { sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc); sdp_set_error(&ssk->isk.sk, -ECONNRESET); wake_up(&ssk->wq); } - - if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) { - sdp_poll_rx_cq(ssk); - } } static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq) @@ -168,9 +162,9 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq) struct bzcopy_state *bz; int i, frags; struct sdp_tx_ring *tx_ring = &ssk->tx_ring; - if (unlikely(mseq != tx_ring->tail)) { + if (unlikely(mseq != ring_tail(*tx_ring))) { printk(KERN_WARNING "Bogus send completion id %d tail %d\n", - mseq, tx_ring->tail); + mseq, ring_tail(*tx_ring)); goto out; } @@ -193,7 +187,7 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq) if (bz) bz->busy--; - ++tx_ring->tail; + atomic_inc(&tx_ring->tail); out: return skb; @@ -219,7 +213,11 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) } } - sdp_prf(&ssk->isk.sk, skb, "tx completion"); + { + struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data; + sdp_prf(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq)); + } + sk_stream_free_skb(&ssk->isk.sk, skb); return 0; @@ -281,15 +279,14 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk) return wc_processed; } -void sdp_poll_tx_cq(unsigned long data) +static void sdp_poll_tx_timeout(unsigned long data) { struct sdp_sock *ssk = (struct sdp_sock *)data; struct sock *sk = &ssk->isk.sk; u32 inflight, wc_processed; - sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n", - (u32) ssk->tx_ring.head - ssk->tx_ring.tail); + (u32) ring_posted(ssk->tx_ring)); /* Only process if the socket is not in use */ bh_lock_sock(sk); @@ -303,12 +300,14 @@ void sdp_poll_tx_cq(unsigned long data) goto out; wc_processed = sdp_process_tx_cq(ssk); + sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. 
inflight=%d", wc_processed, + ring_posted(ssk->tx_ring)); if (!wc_processed) SDPSTATS_COUNTER_INC(tx_poll_miss); else SDPSTATS_COUNTER_INC(tx_poll_hit); - inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail; + inflight = (u32) ring_posted(ssk->tx_ring); /* If there are still packets in flight and the timer has not already * been scheduled by the Tx routine then schedule it here to guarantee @@ -322,17 +321,7 @@ out: bh_unlock_sock(sk); } -void _sdp_poll_tx_cq(unsigned long data) -{ - struct sdp_sock *ssk = (struct sdp_sock *)data; - struct sock *sk = &ssk->isk.sk; - - sdp_prf(sk, NULL, "sdp poll tx timeout expired"); - - sdp_poll_tx_cq(data); -} - -void sdp_tx_irq(struct ib_cq *cq, void *cq_context) +static void sdp_tx_irq(struct ib_cq *cq, void *cq_context) { struct sock *sk = cq_context; struct sdp_sock *ssk = sdp_sk(sk); @@ -344,11 +333,9 @@ void sdp_tx_irq(struct ib_cq *cq, void *cq_context) void sdp_tx_ring_purge(struct sdp_sock *ssk) { - struct sk_buff *skb; - - while (ssk->tx_ring.head != ssk->tx_ring.tail) { + while (ring_posted(ssk->tx_ring)) { struct sk_buff *skb; - skb = sdp_send_completion(ssk, ssk->tx_ring.tail); + skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring)); if (!skb) break; __kfree_skb(skb); @@ -380,3 +367,66 @@ void sdp_post_keepalive(struct sdp_sock *ssk) sdp_cnt(sdp_keepalive_probes_sent); } +static void sdp_tx_cq_event_handler(struct ib_event *event, void *data) +{ +} + +int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device) +{ + struct ib_cq *tx_cq; + int rc = 0; + + atomic_set(&ssk->tx_ring.head, 1); + atomic_set(&ssk->tx_ring.tail, 1); + + ssk->tx_ring.buffer = kmalloc(sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE, + GFP_KERNEL); + if (!ssk->tx_ring.buffer) { + rc = -ENOMEM; + sdp_warn(&ssk->isk.sk, "Unable to allocate TX Ring size %zd.\n", + sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE); + + goto out; + } + + tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler, + &ssk->isk.sk, SDP_TX_SIZE, 0); + + if (IS_ERR(tx_cq)) { + rc = PTR_ERR(tx_cq); + sdp_warn(&ssk->isk.sk, "Unable to allocate TX CQ: %d.\n", rc); + goto err_cq; + } + + sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq; + + init_timer(&ssk->tx_ring.timer); + ssk->tx_ring.timer.function = sdp_poll_tx_timeout; + ssk->tx_ring.timer.data = (unsigned long) ssk; + ssk->tx_ring.poll_cnt = 0; + + return 0; + +err_cq: + kfree(ssk->tx_ring.buffer); + ssk->tx_ring.buffer = NULL; +out: + return rc; +} + +void sdp_tx_ring_destroy(struct sdp_sock *ssk) +{ + if (ssk->tx_ring.buffer) { + sdp_tx_ring_purge(ssk); + + kfree(ssk->tx_ring.buffer); + ssk->tx_ring.buffer = NULL; + } + + if (ssk->tx_ring.cq) { + ib_destroy_cq(ssk->tx_ring.cq); + ssk->tx_ring.cq = NULL; + } + + WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring)); +} -- 2.50.1