From: Amir Vadai
Date: Sun, 12 Jul 2009 12:32:36 +0000 (+0300)
Subject: sdp: Add support for ZCopy combined mode - RDMA Read
X-Git-Tag: v4.1.12-92~264^2~5^2~260
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=4c9bea1d076145d8456cfc4327d2ed4c35e3696c;p=users%2Fjedix%2Flinux-maple.git

sdp: Add support for ZCopy combined mode - RDMA Read

Signed-off-by: Amir Vadai
---

diff --git a/drivers/infiniband/ulp/sdp/Makefile b/drivers/infiniband/ulp/sdp/Makefile
index b14a16a4407a7..5c250e9d9dac4 100644
--- a/drivers/infiniband/ulp/sdp/Makefile
+++ b/drivers/infiniband/ulp/sdp/Makefile
@@ -3,4 +3,4 @@ EXTRA_CFLAGS += -ggdb
 obj-$(CONFIG_INFINIBAND_SDP) += ib_sdp.o
-ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o
+ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o sdp_zcopy.o
diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index ed05c0e571ee5..0bb249bffc79f 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -9,7 +9,7 @@
 #include
 #define SDPSTATS_ON
-/* #define SDP_PROFILING */
+#define SDP_PROFILING
 #define _sdp_printk(func, line, level, sk, format, arg...) do { \
	preempt_disable(); \
@@ -25,14 +25,6 @@
 #define sdp_warn(sk, format, arg...) \
	sdp_printk(KERN_WARNING, sk, format , ## arg)
-#define rx_ring_lock(ssk, f) do { \
-	spin_lock_irqsave(&ssk->rx_ring.lock, f); \
-} while (0)
-
-#define rx_ring_unlock(ssk, f) do { \
-	spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
-} while (0)
-
 #define SDP_MODPARAM_SINT(var, def_val, msg) \
	static int var = def_val; \
	module_param_named(var, var, int, 0644); \
@@ -88,8 +80,8 @@ static inline unsigned long long current_nsec(void)
	preempt_enable(); \
	1; \
 })
-#define sdp_prf(sk, s, format, arg...)
-/* #define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg) */
+//#define sdp_prf(sk, s, format, arg...)
+#define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg)
 #else
 #define sdp_prf1(sk, s, format, arg...)
@@ -102,7 +94,7 @@ extern int sdp_debug_level;
 #define sdp_dbg(sk, format, arg...) \
	do { \
		if (sdp_debug_level > 0) \
-			sdp_printk(KERN_DEBUG, sk, format , ## arg); \
+			sdp_printk(KERN_WARNING, sk, format , ## arg); \
	} while (0)
 #define sock_ref(sk, msg, sock_op) ({ \
@@ -136,7 +128,7 @@ extern int sdp_data_debug_level;
 #define sdp_dbg_data(sk, format, arg...)
\ do { \ if (sdp_data_debug_level & 0x2) \ - sdp_printk(KERN_DEBUG, sk, format , ## arg); \ + sdp_printk(KERN_WARNING, sk, format , ## arg); \ } while (0) #define SDP_DUMP_PACKET(sk, str, skb, h) \ do { \ @@ -148,12 +140,42 @@ extern int sdp_data_debug_level; #define SDP_DUMP_PACKET(sk, str, skb, h) #endif +#if 0 +#define lock_sock(sk) do { \ + sdp_dbg_data(sk, "lock_sock: before lock\n"); \ + lock_sock(sk); \ + sdp_dbg_data(sk, "lock_sock: locked\n"); \ +} while (0) + +#define release_sock(sk) do { \ + sdp_dbg_data(sk, "release_sock\n"); \ + release_sock(sk); \ +} while (0) + + +#undef sk_wait_event + +#define sk_wait_event(__sk, __timeo, __condition) \ +({ int rc; \ + release_sock(__sk); \ + rc = __condition; \ + if (!rc) { \ + *(__timeo) = schedule_timeout(*(__timeo)); \ + } \ + lock_sock(__sk); \ + rc = __condition; \ + rc; \ +}) + +#endif + #ifdef SDPSTATS_ON struct sdpstats { u32 post_send[256]; u32 sendmsg_bcopy_segment; u32 sendmsg_bzcopy_segment; + u32 sendmsg_zcopy_segment; u32 sendmsg; u32 post_send_credits; u32 sendmsg_nagle_skip; @@ -219,6 +241,8 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) #define SDP_TX_POLL_TIMEOUT (HZ / 4) #define SDP_NAGLE_TIMEOUT (HZ / 10) +#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5) + #define SDP_RESOLVE_TIMEOUT 1000 #define SDP_ROUTE_TIMEOUT 1000 #define SDP_RETRY_COUNT 5 @@ -228,8 +252,13 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) #define SDP_TX_SIZE 0x40 #define SDP_RX_SIZE 0x40 +#define SDP_FMR_SIZE 256 +#define SDP_FMR_POOL_SIZE 1024 +#define SDP_FMR_DIRTY_SIZE ( SDP_FMR_POOL_SIZE / 4 ) + #define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE) #define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1) +#define SDP_MAX_SEND_SGES 32 #define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh)) #define SDP_NUM_WC 4 #define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE) @@ -239,6 +268,8 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) #define SDP_OP_RECV 0x800000000LL #define SDP_OP_SEND 0x400000000LL +#define SDP_OP_RDMA 0x200000000LL +#define SDP_OP_NOP 0x100000000LL /* how long (in jiffies) to block sender till tx completion*/ #define SDP_BZCOPY_POLL_TIMEOUT (HZ / 10) @@ -246,7 +277,25 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) #define SDP_AUTO_CONF 0xffff #define AUTO_MOD_DELAY (HZ / 4) -#define BZCOPY_STATE(skb) (*(struct bzcopy_state **)(skb->cb)) +struct bzcopy_state; +struct rx_srcavail_state; + +struct sdp_skb_cb { + __u32 seq; /* Starting sequence number */ + __u32 end_seq; /* SEQ + FIN + SYN + datalen */ + __u8 flags; /* TCP header flags. */ + struct bzcopy_state *bz; + struct rx_srcavail_state *rx_sa; + struct tx_srcavail_state *tx_sa; +}; + +#define BZCOPY_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->bz) +#define RX_SRCAVAIL_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->rx_sa) +#define TX_SRCAVAIL_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->tx_sa) + +#define SDP_SKB_CB(__skb) ((struct sdp_skb_cb *)&((__skb)->cb[0])) +#undef TCP_SKB_CB + #ifndef MIN #define MIN(a, b) (a < b ? 
a : b) #endif @@ -259,8 +308,13 @@ enum sdp_mid { SDP_MID_HELLO = 0x0, SDP_MID_HELLO_ACK = 0x1, SDP_MID_DISCONN = 0x2, + SDP_MID_SENDSM = 0x4, + SDP_MID_RDMARDCOMPL = 0x6, + SDP_MID_SRCAVAIL_CANCEL = 0x8, SDP_MID_CHRCVBUF = 0xB, SDP_MID_CHRCVBUF_ACK = 0xC, + SDP_MID_SRCAVAIL = 0xFD, + SDP_MID_SINKAVAIL = 0xFE, SDP_MID_DATA = 0xFF, }; @@ -324,27 +378,88 @@ struct sdp_hah { __u32 actrcvsz; }; +struct sdp_rrch { + __u32 len; +}; + +struct sdp_srcah { + __u32 len; + __u32 rkey; + __u64 vaddr; +}; + struct sdp_buf { struct sk_buff *skb; u64 mapping[SDP_MAX_SEND_SKB_FRAGS + 1]; }; +/* Context used for synchronous zero copy bcopy (BZCOPY) */ +struct bzcopy_state { + unsigned char __user *u_base; + int u_len; + int left; + int page_cnt; + int cur_page; + int cur_offset; + int busy; + struct sdp_sock *ssk; + struct page **pages; +}; + +struct rx_srcavail_state { + /* Advertised buffer stuff */ + u32 mseq; + u32 used; + u32 len; + u32 rkey; + u64 vaddr; + + /* Dest buff info */ + u32 page_cnt; + struct page **pages; + struct ib_sge *sge; + + /* Utility */ + u8 busy; + u8 aborted; +}; + +struct tx_srcavail_state { + u8 busy; + + u32 page_cnt; + struct page **pages; + u64 *addrs; + struct ib_pool_fmr *fmr; + u32 bytes_completed; + u32 bytes_total; + + u8 abort; + u32 mseq; +}; + #define ring_head(ring) (atomic_read(&(ring).head)) #define ring_tail(ring) (atomic_read(&(ring).tail)) #define ring_posted(ring) (ring_head(ring) - ring_tail(ring)) -struct sdp_tx_ring { - struct sdp_buf *buffer; - atomic_t head; - atomic_t tail; - struct ib_cq *cq; +#define rx_ring_posted(ssk) ring_posted(ssk->rx_ring) +#define tx_ring_posted(ssk) (ring_posted(ssk->tx_ring) + \ + (ssk->tx_ring.rdma_inflight ? ssk->tx_ring.rdma_inflight->busy : 0)) - int una_seq; - atomic_t credits; +struct sdp_tx_ring { + struct rx_srcavail_state *rdma_inflight; + struct sdp_buf *buffer; + atomic_t head; + atomic_t tail; + struct ib_cq *cq; + + int una_seq; + atomic_t credits; #define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits)) - struct timer_list timer; - u16 poll_cnt; + struct timer_list timer; + struct tasklet_struct tasklet; + u16 poll_cnt; }; struct sdp_rx_ring { @@ -353,12 +468,34 @@ struct sdp_rx_ring { atomic_t tail; struct ib_cq *cq; - spinlock_t lock; + int destroyed; + rwlock_t destroyed_lock; }; -static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring) +static inline void rx_ring_unlock(struct sdp_rx_ring *rx_ring, + unsigned long *flags) +{ + read_unlock_irqrestore(&rx_ring->destroyed_lock, *flags); +} + +static inline int rx_ring_trylock(struct sdp_rx_ring *rx_ring, + unsigned long *flags) { - return SDP_TX_SIZE - ring_posted(*tx_ring); + read_lock_irqsave(&rx_ring->destroyed_lock, *flags); + if (rx_ring->destroyed) { + rx_ring_unlock(rx_ring, flags); + return 0; + } + return 1; +} + +static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring) +{ + unsigned long flags; + + write_lock_irqsave(&rx_ring->destroyed_lock, flags); + rx_ring->destroyed = 1; + write_unlock_irqrestore(&rx_ring->destroyed_lock, flags); } struct sdp_chrecvbuf { @@ -393,6 +530,14 @@ struct sdp_moderation { int moder_time; }; +struct sdp_device { + struct ib_pd *pd; + struct ib_mr *mr; + struct ib_fmr_pool *fmr_pool; +}; + +extern struct ib_client sdp_client; + struct sdp_sock { /* sk has to be the first member of inet_sock */ struct inet_sock isk; @@ -401,6 +546,15 @@ struct sdp_sock { struct list_head backlog_queue; struct sk_buff_head rx_ctl_q; struct sock *parent; + struct sdp_device *sdp_dev; + + int qp_active; + struct 
tx_srcavail_state *tx_sa; + spinlock_t tx_sa_lock; + int max_send_sge; + int srcavail_cancel; + struct delayed_work srcavail_cancel_work; + int srcavail_cancel_mseq; struct work_struct rx_comp_work; wait_queue_head_t wq; @@ -458,7 +612,6 @@ struct sdp_sock { /* rdma specific */ struct ib_qp *qp; - struct ib_mr *mr; /* SDP slow start */ int rcvbuf_scale; /* local recv buf scale for each socket */ @@ -479,20 +632,7 @@ struct sdp_sock { struct sdp_moderation auto_mod; /* BZCOPY data */ - int zcopy_thresh; -}; - -/* Context used for synchronous zero copy bcopy (BZCOY) */ -struct bzcopy_state { - unsigned char __user *u_base; - int u_len; - int left; - int page_cnt; - int cur_page; - int cur_offset; - int busy; - struct sdp_sock *ssk; - struct page **pages; + int bzcopy_thresh; }; extern int rcvbuf_initial_size; @@ -594,6 +734,9 @@ void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk); void sdp_destroy_work(struct work_struct *work); void sdp_reset_sk(struct sock *sk, int rc); void sdp_reset(struct sock *sk); +int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p, + struct bzcopy_state *bz); +void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb); /* sdp_proc.c */ int __init sdp_proc_init(void); @@ -602,6 +745,9 @@ void sdp_proc_unregister(void); /* sdp_cma.c */ int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *); +/* sdp_bcopy.c */ +int sdp_post_credits(struct sdp_sock *ssk); + /* sdp_tx.c */ int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device); void sdp_tx_ring_destroy(struct sdp_sock *ssk); @@ -620,11 +766,33 @@ int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size); void sdp_do_posts(struct sdp_sock *ssk); void sdp_rx_comp_full(struct sdp_sock *ssk); void sdp_remove_large_sock(struct sdp_sock *ssk); +void sdp_handle_disconn(struct sock *sk); + +/* sdp_zcopy.c */ +int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, + size_t size); +int sdp_handle_srcavail(struct sdp_sock *ssk, struct sdp_srcah *srcah); +void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack); +void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack, + u32 bytes_completed); +int sdp_handle_rdma_read_cqe(struct sdp_sock *ssk); +int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb, + int len); +int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt, + unsigned long addr); +int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied); +int sdp_post_sendsm(struct sdp_sock *ssk); +void srcavail_cancel_timeout(struct work_struct *work); + +static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk) +{ + return SDP_TX_SIZE - tx_ring_posted(ssk); +} static inline void sdp_arm_rx_cq(struct sock *sk) { sdp_prf(sk, NULL, "Arming RX cq"); - sdp_dbg_data(sk, "Arming RX cq\n"); +// sdp_dbg_data(sk, "Arming RX cq\n"); ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP); } @@ -633,11 +801,34 @@ static inline void sdp_arm_tx_cq(struct sock *sk) { sdp_prf(sk, NULL, "Arming TX cq"); sdp_dbg_data(sk, "Arming TX cq. 
credits: %d, posted: %d\n", - tx_credits(sdp_sk(sk)), ring_posted(sdp_sk(sk)->tx_ring)); + tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk))); ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP); } +/* return the min of: + * - tx credits + * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS + */ +static inline int tx_slots_free(struct sdp_sock *ssk) +{ + int min_free; + + min_free = MIN(tx_credits(ssk), + SDP_TX_SIZE - tx_ring_posted(ssk)); + if (min_free < SDP_MIN_TX_CREDITS) + return 0; + + return min_free - SDP_MIN_TX_CREDITS; +}; + +/* like sk_stream_memory_free - except measures remote credits */ +static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk, + struct bzcopy_state *bz) +{ + return tx_slots_free(ssk) > bz->busy; +} + /* utilities */ static inline char *mid2str(int mid) { @@ -646,9 +837,14 @@ static inline char *mid2str(int mid) ENUM2STR(SDP_MID_HELLO), ENUM2STR(SDP_MID_HELLO_ACK), ENUM2STR(SDP_MID_DISCONN), + ENUM2STR(SDP_MID_SENDSM), + ENUM2STR(SDP_MID_RDMARDCOMPL), + ENUM2STR(SDP_MID_SRCAVAIL_CANCEL), ENUM2STR(SDP_MID_CHRCVBUF), ENUM2STR(SDP_MID_CHRCVBUF_ACK), ENUM2STR(SDP_MID_DATA), + ENUM2STR(SDP_MID_SRCAVAIL), + ENUM2STR(SDP_MID_SINKAVAIL), }; if (mid >= ARRAY_SIZE(mid2str)) diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c index 8ccd583bfd42b..b743c4cf26c7c 100644 --- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c @@ -46,6 +46,8 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, struct sdp_hh *hh; struct sdp_hah *hah; struct sdp_chrecvbuf *req_size; + struct sdp_rrch *rrch; + struct sdp_srcah *srcah; int len = 0; char buf[256]; len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x " @@ -78,6 +80,25 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, case SDP_MID_DATA: len += snprintf(buf + len, 255-len, "data_len: %ld |", ntohl(h->len) - sizeof(struct sdp_bsdh)); + break; + case SDP_MID_RDMARDCOMPL: + rrch = (struct sdp_rrch *)(h+1); + + len += snprintf(buf + len, 255-len, " | len: %d |", + ntohl(rrch->len)); + break; + case SDP_MID_SRCAVAIL: + srcah = (struct sdp_srcah *)(h+1); + + len += snprintf(buf + len, 255-len, " | data_len: %ld |", + ntohl(h->len) - sizeof(struct sdp_bsdh) - + sizeof(struct sdp_srcah)); + + len += snprintf(buf + len, 255-len, + " | len: %d, rkey: 0x%x, vaddr: 0x%llx |", + ntohl(srcah->len), ntohl(srcah->rkey), + be64_to_cpu(srcah->vaddr)); + break; default: break; } @@ -104,11 +125,12 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb) { int send_now = BZCOPY_STATE(skb) || - (ssk->nonagle & TCP_NAGLE_OFF) || + TX_SRCAVAIL_STATE(skb) || + (ssk->nonagle & TCP_NAGLE_OFF) || !ssk->nagle_last_unacked || skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue || skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal || - (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH); + (SDP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH); if (send_now) { unsigned long mseq = ring_head(ssk->tx_ring); @@ -160,6 +182,33 @@ out2: mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT); } +int sdp_post_credits(struct sdp_sock *ssk) +{ + int post_count = 0; + + sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d " + "tx ring slots left: %d send_head: %p\n", + tx_credits(ssk), remote_credits(ssk), + sdp_tx_ring_slots_left(ssk), + ssk->isk.sk.sk_send_head); + + if (likely(tx_credits(ssk) > 1) && + likely(sdp_tx_ring_slots_left(ssk))) { + struct sk_buff *skb; + skb = 
sdp_stream_alloc_skb(&ssk->isk.sk, + sizeof(struct sdp_bsdh), + GFP_KERNEL); + if (!skb) + return -ENOMEM; + sdp_post_send(ssk, skb, SDP_MID_DATA); + post_count++; + } + + if (post_count) + sdp_xmit_poll(ssk, 0); + return post_count; +} + void sdp_post_sends(struct sdp_sock *ssk, int nonagle) { /* TODO: nonagle? */ @@ -183,15 +232,30 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) else gfp_page = GFP_KERNEL; - if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) { + if (sdp_tx_ring_slots_left(ssk) < SDP_TX_SIZE / 2) { int wc_processed = sdp_xmit_poll(ssk, 1); sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed); } + if (ssk->tx_sa && ssk->srcavail_cancel && + tx_credits(ssk) >= SDP_MIN_TX_CREDITS && + sdp_tx_ring_slots_left(ssk)) { + sdp_prf1(&ssk->isk.sk, NULL, "Going to send srcavail cancel"); + skb = sdp_stream_alloc_skb(&ssk->isk.sk, + sizeof(struct sdp_bsdh), + gfp_page); + /* FIXME */ + BUG_ON(!skb); + TX_SRCAVAIL_STATE(skb) = ssk->tx_sa; + sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL_CANCEL); + post_count++; + ssk->srcavail_cancel = 0; + } + if (ssk->recv_request && ring_tail(ssk->rx_ring) >= ssk->recv_request_head && tx_credits(ssk) >= SDP_MIN_TX_CREDITS && - sdp_tx_ring_slots_left(&ssk->tx_ring)) { + sdp_tx_ring_slots_left(ssk)) { struct sdp_chrecvbuf *resp_size; ssk->recv_request = 0; skb = sdp_stream_alloc_skb(&ssk->isk.sk, @@ -208,19 +272,29 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) } if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS && - sdp_tx_ring_slots_left(&ssk->tx_ring) && + sdp_tx_ring_slots_left(ssk) && ssk->isk.sk.sk_send_head && sdp_nagle_off(ssk, ssk->isk.sk.sk_send_head)) { SDPSTATS_COUNTER_INC(send_miss_no_credits); } while (tx_credits(ssk) > SDP_MIN_TX_CREDITS && - sdp_tx_ring_slots_left(&ssk->tx_ring) && + sdp_tx_ring_slots_left(ssk) && (skb = ssk->isk.sk.sk_send_head) && sdp_nagle_off(ssk, skb)) { + struct tx_srcavail_state *tx_sa; update_send_head(&ssk->isk.sk, skb); __skb_dequeue(&ssk->isk.sk.sk_write_queue); - sdp_post_send(ssk, skb, SDP_MID_DATA); + + tx_sa = TX_SRCAVAIL_STATE(skb); + if (unlikely(tx_sa)) { + if (likely(!tx_sa->abort)) + sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL); + else + sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n"); + } else { + sdp_post_send(ssk, skb, SDP_MID_DATA); + } post_count++; } @@ -228,9 +302,9 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) if (likely(c > SDP_MIN_TX_CREDITS)) c *= 2; - if (unlikely(c < ring_posted(ssk->rx_ring)) && + if (unlikely(c < rx_ring_posted(ssk)) && likely(tx_credits(ssk) > 1) && - likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) && + likely(sdp_tx_ring_slots_left(ssk)) && likely((1 << ssk->isk.sk.sk_state) & (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) { skb = sdp_stream_alloc_skb(&ssk->isk.sk, diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c index f8a730390e558..ab5ec58b4332d 100644 --- a/drivers/infiniband/ulp/sdp/sdp_cma.c +++ b/drivers/infiniband/ulp/sdp/sdp_cma.c @@ -62,34 +62,18 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) struct ib_qp_init_attr qp_init_attr = { .event_handler = sdp_qp_event_handler, .cap.max_send_wr = SDP_TX_SIZE, - .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS, + .cap.max_send_sge = SDP_MAX_SEND_SGES /*SDP_MAX_SEND_SKB_FRAGS*/, .cap.max_recv_wr = SDP_RX_SIZE, .cap.max_recv_sge = SDP_MAX_RECV_SKB_FRAGS + 1, .sq_sig_type = IB_SIGNAL_REQ_WR, .qp_type = IB_QPT_RC, }; struct ib_device *device = id->device; - struct ib_mr *mr; - struct ib_pd *pd; int rc; sdp_dbg(sk, "%s\n", __func__); - pd = 
ib_alloc_pd(device); - if (IS_ERR(pd)) { - rc = PTR_ERR(pd); - sdp_warn(sk, "Unable to allocate PD: %d.\n", rc); - goto err_pd; - } - - mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE); - if (IS_ERR(mr)) { - rc = PTR_ERR(mr); - sdp_warn(sk, "Unable to get dma MR: %d.\n", rc); - goto err_mr; - } - - sdp_sk(sk)->mr = mr; + sdp_sk(sk)->sdp_dev = ib_get_client_data(device, &sdp_client); rc = sdp_rx_ring_create(sdp_sk(sk), device); if (rc) @@ -102,13 +86,27 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq; qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq; - rc = rdma_create_qp(id, pd, &qp_init_attr); + rc = rdma_create_qp(id, sdp_sk(sk)->sdp_dev->pd, &qp_init_attr); if (rc) { sdp_warn(sk, "Unable to create QP: %d.\n", rc); goto err_qp; } sdp_sk(sk)->qp = id->qp; sdp_sk(sk)->ib_device = device; + sdp_sk(sk)->qp_active = 1; + +{ + struct ib_qp_attr qp_attr; + struct ib_qp_init_attr qp_init_attr; + + rc = ib_query_qp(sdp_sk(sk)->qp, + &qp_attr, + 0, + &qp_init_attr); + + sdp_sk(sk)->max_send_sge = qp_attr.cap.max_send_sge; + sdp_dbg(sk, "max_send_sge = %d\n", sdp_sk(sk)->max_send_sge); +} init_waitqueue_head(&sdp_sk(sk)->wq); @@ -120,10 +118,6 @@ err_qp: err_tx: sdp_rx_ring_destroy(sdp_sk(sk)); err_rx: - ib_dereg_mr(sdp_sk(sk)->mr); -err_mr: - ib_dealloc_pd(pd); -err_pd: return rc; } @@ -333,14 +327,14 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) if (rc) break; atomic_set(&sdp_sk(sk)->remote_credits, - ring_posted(sdp_sk(sk)->rx_ring)); + rx_ring_posted(sdp_sk(sk))); memset(&hh, 0, sizeof hh); hh.bsdh.mid = SDP_MID_HELLO; + hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk))); hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE); hh.max_adverts = 1; hh.majv_minv = SDP_MAJV_MINV; sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size); - hh.bsdh.bufs = htons(ring_posted(sdp_sk(sk)->rx_ring)); hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags * PAGE_SIZE + sizeof(struct sdp_bsdh)); hh.max_adverts = 0x1; @@ -354,6 +348,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) conn_param.retry_count = SDP_RETRY_COUNT; SDP_DUMP_PACKET(NULL, "TX", NULL, &hh.bsdh); rc = rdma_connect(id, &conn_param); +// sdp_sk(sk)->qp_active = 1; break; case RDMA_CM_EVENT_ROUTE_ERROR: sdp_dbg(sk, "RDMA_CM_EVENT_ROUTE_ERROR : %p\n", id); @@ -363,15 +358,16 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_REQUEST\n"); rc = sdp_connect_handler(sk, id, event); if (rc) { + sdp_warn(sk, "Destroy qp !!!!\n"); rdma_reject(id, NULL, 0); break; } child = id->context; atomic_set(&sdp_sk(child)->remote_credits, - ring_posted(sdp_sk(child)->rx_ring)); + rx_ring_posted(sdp_sk(child))); memset(&hah, 0, sizeof hah); hah.bsdh.mid = SDP_MID_HELLO_ACK; - hah.bsdh.bufs = htons(ring_posted(sdp_sk(child)->rx_ring)); + hah.bsdh.bufs = htons(remote_credits(sdp_sk(child))); hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE); hah.majv_minv = SDP_MAJV_MINV; hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec, @@ -391,15 +387,24 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) id->qp = NULL; id->context = NULL; parent = sdp_sk(child)->parent; /* TODO: hold ? 
*/ + } else { +// sdp_sk(child)->qp_active = 1; } break; case RDMA_CM_EVENT_CONNECT_RESPONSE: sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_RESPONSE\n"); rc = sdp_response_handler(sk, id, event); - if (rc) + if (rc) { + sdp_warn(sk, "Destroy qp !!!!\n"); rdma_reject(id, NULL, 0); + } else rc = rdma_accept(id, NULL); + + if (!rc) { +// sdp_sk(sk)->qp_active = 1; + rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0; + } break; case RDMA_CM_EVENT_CONNECT_ERROR: sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n"); @@ -431,6 +436,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) __func__); } + sdp_sk(sk)->qp_active = 0; rdma_disconnect(id); if (sk->sk_state != TCP_TIME_WAIT) { diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c index 9c5a8ed9b2e4e..c547ff18bfd7f 100644 --- a/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/drivers/infiniband/ulp/sdp/sdp_main.c @@ -68,6 +68,7 @@ unsigned int csum_partial_copy_from_user_new (const char *src, char *dst, #include #include #include +#include #include /* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */ #include "sdp_socket.h" @@ -81,7 +82,7 @@ MODULE_LICENSE("Dual BSD/GPL"); #ifdef CONFIG_INFINIBAND_SDP_DEBUG SDP_MODPARAM_INT(sdp_debug_level, 0, "Enable debug tracing if > 0."); #endif -#ifdef CONFIG_INFINIBAND_SDP_DEBUG +#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA SDP_MODPARAM_INT(sdp_data_debug_level, 0, "Enable data path debug tracing if > 0."); #endif @@ -91,8 +92,10 @@ SDP_MODPARAM_SINT(recv_poll_miss, -1, "How many times recv poll missed."); SDP_MODPARAM_SINT(recv_poll, 1000, "How many times to poll recv."); SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME, "Default idle time in seconds before keepalive probe sent."); -SDP_MODPARAM_SINT(sdp_zcopy_thresh, 65536, "Zero copy send threshold; 0=0ff."); - +SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 65536, + "Zero copy send using SEND threshold; 0=0ff."); +SDP_MODPARAM_SINT(sdp_zcopy_thresh, 128*1024, + "Zero copy using RDMA threshold; 0=0ff."); #define SDP_RX_COAL_TIME_HIGH 128 SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000, "Target number of bytes to coalesce with interrupt moderation."); @@ -168,6 +171,7 @@ static int sdp_get_port(struct sock *sk, unsigned short snum) rc = rdma_bind_addr(ssk->id, (struct sockaddr *)&addr); if (rc) { + sdp_warn(sk, "Destroy qp !!!!\n"); rdma_destroy_id(ssk->id); ssk->id = NULL; return rc; @@ -180,32 +184,21 @@ static int sdp_get_port(struct sock *sk, unsigned short snum) static void sdp_destroy_qp(struct sdp_sock *ssk) { - struct ib_pd *pd = NULL; - - sdp_dbg(&ssk->isk.sk, "destroying qp\n"); sdp_prf(&ssk->isk.sk, NULL, "destroying qp"); - del_timer(&ssk->tx_ring.timer); + ssk->qp_active = 0; + del_timer(&ssk->tx_ring.timer); sdp_rx_ring_destroy(ssk); sdp_tx_ring_destroy(ssk); if (ssk->qp) { - pd = ssk->qp->pd; ib_destroy_qp(ssk->qp); ssk->qp = NULL; } - if (ssk->mr) { - ib_dereg_mr(ssk->mr); - ssk->mr = NULL; - } - - if (pd) - ib_dealloc_pd(pd); - sdp_remove_large_sock(ssk); } @@ -523,10 +516,12 @@ static void sdp_destruct(struct sock *sk) sdp_dbg(sk, "%s\n", __func__); if (ssk->destructed_already) { - sdp_warn(sk, "redestructing sk!"); + sdp_warn(sk, "redestructing sk!\n"); return; } + cancel_delayed_work(&ssk->srcavail_cancel_work); + ssk->destructed_already = 1; sdp_remove_sock(ssk); @@ -749,8 +744,10 @@ static int sdp_disconnect(struct sock *sk, int flags) sdp_dbg(sk, "%s\n", __func__); if (old_state != TCP_LISTEN) { - if (ssk->id) + if (ssk->id) { + sdp_sk(sk)->qp_active = 0; rc = rdma_disconnect(ssk->id); 
+ } return rc; } @@ -973,6 +970,8 @@ void sdp_destroy_work(struct work_struct *work) sdp_cancel_dreq_wait_timeout(ssk); + cancel_delayed_work(&ssk->srcavail_cancel_work); + if (sk->sk_state == TCP_TIME_WAIT) sock_put(sk, SOCK_REF_CM_TW); @@ -1009,15 +1008,25 @@ void sdp_dreq_wait_timeout_work(struct work_struct *work) release_sock(sk); - if (sdp_sk(sk)->id) + if (sdp_sk(sk)->id) { + sdp_warn(sk, "Destroyed QP!!!!\n"); + sdp_sk(sk)->qp_active = 0; rdma_disconnect(sdp_sk(sk)->id); - else + } else sock_put(sk, SOCK_REF_CM_TW); out: sock_put(sk, SOCK_REF_DREQ_TO); } +/* + * Only SDP interact with this receive queue. Don't want + * lockdep warnings that using spinlock irqsave + */ +static struct lock_class_key ib_sdp_sk_receive_queue_lock_key; + +static struct lock_class_key ib_sdp_sk_callback_lock_key; + int sdp_init_sock(struct sock *sk) { struct sdp_sock *ssk = sdp_sk(sk); @@ -1027,8 +1036,15 @@ int sdp_init_sock(struct sock *sk) INIT_LIST_HEAD(&ssk->accept_queue); INIT_LIST_HEAD(&ssk->backlog_queue); INIT_DELAYED_WORK(&ssk->dreq_wait_work, sdp_dreq_wait_timeout_work); + INIT_DELAYED_WORK(&ssk->srcavail_cancel_work, srcavail_cancel_timeout); INIT_WORK(&ssk->destroy_work, sdp_destroy_work); + lockdep_set_class(&sk->sk_receive_queue.lock, + &ib_sdp_sk_receive_queue_lock_key); + + lockdep_set_class(&sk->sk_callback_lock, + &ib_sdp_sk_callback_lock_key); + sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM; skb_queue_head_init(&ssk->rx_ctl_q); @@ -1040,10 +1056,14 @@ int sdp_init_sock(struct sock *sk) ssk->sdp_disconnect = 0; ssk->destructed_already = 0; ssk->destruct_in_process = 0; + ssk->srcavail_cancel = 0; spin_lock_init(&ssk->lock); + spin_lock_init(&ssk->tx_sa_lock); atomic_set(&ssk->somebody_is_doing_posts, 0); + ssk->tx_ring.rdma_inflight = NULL; + return 0; } @@ -1079,7 +1099,7 @@ static void sdp_shutdown(struct sock *sk, int how) static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb) { - TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH; + SDP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH; ssk->pushed_seq = ssk->write_seq; sdp_do_posts(ssk); } @@ -1187,7 +1207,7 @@ static int sdp_setsockopt(struct sock *sk, int level, int optname, if (val < SDP_MIN_ZCOPY_THRESH || val > SDP_MAX_ZCOPY_THRESH) err = -EINVAL; else - ssk->zcopy_thresh = val; + ssk->bzcopy_thresh = val; break; default: err = -ENOPROTOOPT; @@ -1231,7 +1251,7 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname, val = (ssk->keepalive_time ? : sdp_keepalive_time) / HZ; break; case SDP_ZCOPY_THRESH: - val = ssk->zcopy_thresh ? ssk->zcopy_thresh : sdp_zcopy_thresh; + val = ssk->bzcopy_thresh ? 
ssk->bzcopy_thresh : sdp_bzcopy_thresh; break; default: return -ENOPROTOOPT; @@ -1316,7 +1336,7 @@ static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, { if (unlikely(flags & MSG_OOB)) { struct sk_buff *skb = sk->sk_write_queue.prev; - TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_URG; + SDP_SKB_CB(skb)->flags |= TCPCB_FLAG_URG; } } @@ -1327,8 +1347,7 @@ static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags) sdp_do_posts(sdp_sk(sk)); } -static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk, - struct sk_buff *skb) +void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb) { skb_header_release(skb); __skb_queue_tail(&sk->sk_write_queue, skb); @@ -1363,8 +1382,9 @@ static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz) } if (bz->pages) { - for (i = 0; i < bz->cur_page; i++) + for (i = 0; i < bz->cur_page; i++) { put_page(bz->pages[i]); + } kfree(bz->pages); } @@ -1382,11 +1402,11 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk, { struct bzcopy_state *bz; unsigned long addr; - int done_pages = 0; int thresh; mm_segment_t cur_fs; + int rc = 0; - thresh = ssk->zcopy_thresh ? : sdp_zcopy_thresh; + thresh = ssk->bzcopy_thresh ? : sdp_bzcopy_thresh; if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK)) { SDPSTATS_COUNTER_INC(sendmsg_bcopy_segment); return NULL; @@ -1425,36 +1445,18 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk, return ERR_PTR(-ENOMEM); } + rc = sdp_get_pages(&ssk->isk.sk, bz->pages, bz->page_cnt, + (unsigned long)base); - addr &= PAGE_MASK; - if (segment_eq(cur_fs, KERNEL_DS)) { - for (done_pages = 0; done_pages < bz->page_cnt; done_pages++) { - bz->pages[done_pages] = virt_to_page(addr); - if (!bz->pages[done_pages]) - break; - get_page(bz->pages[done_pages]); - addr += PAGE_SIZE; - } - } else { - if (current->mm) { - down_write(¤t->mm->mmap_sem); - done_pages = get_user_pages(current, current->mm, addr, - bz->page_cnt, 0, 0, bz->pages, NULL); - up_write(¤t->mm->mmap_sem); - } - } - if (unlikely(done_pages != bz->page_cnt)){ - int i; - if (done_pages > 0) { - for (i = 0; i < done_pages; i++) - put_page(bz->pages[i]); - } - kfree(bz->pages); - kfree(bz); - bz = ERR_PTR(-EFAULT); - } + if (unlikely(rc)) + goto err; return bz; + +err: + kfree(bz->pages); + kfree(bz); + return ERR_PTR(-EFAULT); } #define TCP_PAGE(sk) (sk->sk_sndmsg_page) @@ -1606,31 +1608,8 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb, return copy; } -/* return the min of: - * - tx credits - * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS - */ -static inline int tx_slots_free(struct sdp_sock *ssk) -{ - int min_free; - - min_free = MIN(tx_credits(ssk), - SDP_TX_SIZE - ring_posted(ssk->tx_ring)); - if (min_free < SDP_MIN_TX_CREDITS) - return 0; - - return min_free - SDP_MIN_TX_CREDITS; -}; - -/* like sk_stream_memory_free - except measures remote credits */ -static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk, - struct bzcopy_state *bz) -{ - return tx_slots_free(ssk) > bz->busy; -} - /* like sk_stream_wait_memory - except waits on remote credits */ -static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p, +int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p, struct bzcopy_state *bz) { struct sock *sk = &ssk->isk.sk; @@ -1714,6 +1693,15 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, long timeo; struct bzcopy_state *bz = NULL; SDPSTATS_COUNTER_INC(sendmsg); + + if (sdp_zcopy_thresh && size 
> sdp_zcopy_thresh) { + err = sdp_sendmsg_zcopy(iocb, sk, msg, size); + if (err != -EAGAIN) + return err; + + /* Got SendSM/Timedout - fallback to regular send */ + } + lock_sock(sk); sdp_dbg_data(sk, "%s\n", __func__); @@ -1816,7 +1804,7 @@ new_segment: /* OOB data byte should be the last byte of the data payload */ - if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG) && + if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG) && !(flags & MSG_OOB)) { sdp_mark_push(ssk, skb); goto new_segment; @@ -1835,10 +1823,10 @@ new_segment: } if (!copied) - TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH; + SDP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH; ssk->write_seq += copy; - TCP_SKB_CB(skb)->end_seq += copy; + SDP_SKB_CB(skb)->end_seq += copy; /*unused: skb_shinfo(skb)->gso_segs = 0;*/ from += copy; @@ -1888,10 +1876,10 @@ out: bz = sdp_bz_cleanup(bz); } - posts_handler_put(ssk); - sdp_auto_moderation(ssk); + posts_handler_put(ssk); + release_sock(sk); return copied; @@ -1916,6 +1904,7 @@ out_err: posts_handler_put(ssk); release_sock(sk); + return err; } @@ -1936,9 +1925,13 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, u32 *seq; int copied = 0; int rc; + int avail_bytes_count = 0; /* Could be inlined in skb */ + /* or advertised for RDMA */ lock_sock(sk); - sdp_dbg_data(sk, "%s\n", __func__); + sdp_dbg_data(sk, "iovlen: %ld iov_len: %ld flags: 0x%x peek: 0x%x\n", + msg->msg_iovlen, msg->msg_iov[0].iov_len, flags, + MSG_PEEK); posts_handler_get(ssk); @@ -1962,6 +1955,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); do { + struct rx_srcavail_state *rx_sa = NULL; u32 offset; /* Are we at urgent data? Stop if we have read anything or have @@ -1978,21 +1972,51 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, skb = skb_peek(&sk->sk_receive_queue); do { + struct sdp_bsdh *h; if (!skb) break; - if ((skb_transport_header(skb))[0] == SDP_MID_DISCONN) + avail_bytes_count = 0; + + h = (struct sdp_bsdh *)skb_transport_header(skb); + + switch (h->mid) { + case SDP_MID_DISCONN: + sdp_dbg(sk, "Handle RX SDP_MID_DISCONN\n"); + sdp_prf(sk, NULL, "Handle RX SDP_MID_DISCONN"); + sdp_handle_disconn(sk); goto found_fin_ok; - if (before(*seq, TCP_SKB_CB(skb)->seq)) { + case SDP_MID_SRCAVAIL: + rx_sa = RX_SRCAVAIL_STATE(skb); + if (rx_sa->mseq < ssk->srcavail_cancel_mseq) { + rx_sa->aborted = 1; + sdp_warn(sk, "Ignoring src avail - " + "due to SrcAvailCancel\n"); + goto skb_cleanup; + } + avail_bytes_count = rx_sa->len; + break; + + case SDP_MID_DATA: + rx_sa = NULL; + avail_bytes_count = skb->len; + break; + default: + break; + } + + if (before(*seq, SDP_SKB_CB(skb)->seq)) { sdp_warn(sk, "recvmsg bug: copied %X seq %X\n", - *seq, TCP_SKB_CB(skb)->seq); + *seq, SDP_SKB_CB(skb)->seq); sdp_reset(sk); break; } - offset = *seq - TCP_SKB_CB(skb)->seq; - if (offset < skb->len) + offset = *seq - SDP_SKB_CB(skb)->seq; +// sdp_warn(sk, "offset = 0x%x, avail_byte_count = 0x%x\n", +// offset, avail_bytes_count); + if (offset < avail_bytes_count) goto found_ok_skb; WARN_ON(!(flags & MSG_PEEK)); @@ -2066,10 +2090,10 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, continue; found_ok_skb: - sdp_dbg_data(sk, "found_ok_skb len %d\n", skb->len); - sdp_dbg_data(sk, "len %Zd offset %d\n", len, offset); + sdp_dbg_data(sk, "bytes avail: %d\n", avail_bytes_count); + sdp_dbg_data(sk, "buf len %Zd offset %d\n", len, offset); sdp_dbg_data(sk, "copied %d target 
%d\n", copied, target); - used = skb->len - offset; + used = avail_bytes_count - offset; if (len < used) used = len; @@ -2091,9 +2115,20 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, } } if (!(flags & MSG_TRUNC)) { - err = skb_copy_datagram_iovec(skb, offset, + if (rx_sa) { + sdp_dbg_data(sk, "Got srcavail - using RDMA\n"); + err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb, + used); + if (err == -EAGAIN) { + sdp_warn(sk, "RDMA Read aborted\n"); + used = 0; + goto skb_cleanup; + } + } else { + err = skb_copy_datagram_iovec(skb, offset, /* TODO: skip header? */ msg->msg_iov, used); + } if (err) { sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed" "offset %d size %ld status %d\n", @@ -2115,17 +2150,36 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, skip_copy: if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq)) ssk->urg_data = 0; - if (used + offset < skb->len) + + + if (rx_sa) { + if (ssk->srcavail_cancel_mseq < rx_sa->mseq) { + rc = sdp_post_rdma_rd_compl(ssk, rx_sa->used); + BUG_ON(rc); + } + if (rx_sa->aborted) { + sdp_warn(sk, "RDMA aborted. Sending SendSM\n"); + rc = sdp_post_sendsm(ssk); + BUG_ON(rc); + } + } + + if ((!rx_sa && used + offset < skb->len) || + (rx_sa && !rx_sa->aborted && rx_sa->used < rx_sa->len)) continue; offset = 0; - if (!(flags & MSG_PEEK)) { +skb_cleanup: + if (!(flags & MSG_PEEK) || (rx_sa && rx_sa->aborted)) { struct sdp_bsdh *h; h = (struct sdp_bsdh *)skb_transport_header(skb); sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d", ntohl(h->mseq), ntohl(h->mseq_ack)); skb_unlink(skb, &sk->sk_receive_queue); __kfree_skb(skb); + + kfree(rx_sa); + rx_sa = NULL; } continue; found_fin_ok: @@ -2261,7 +2315,7 @@ void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb) u8 tmp; u32 ptr = skb->len - 1; - ssk->urg_seq = TCP_SKB_CB(skb)->seq + ptr; + ssk->urg_seq = SDP_SKB_CB(skb)->seq + ptr; if (skb_copy_bits(skb, ptr, &tmp, 1)) BUG(); @@ -2376,6 +2430,53 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol) static void sdp_add_device(struct ib_device *device) { + struct sdp_device *sdp_dev; + struct ib_fmr_pool_param fmr_param; + + sdp_dev = kmalloc(sizeof *sdp_dev, GFP_KERNEL); + if (!sdp_dev) + return; + + sdp_dev->pd = ib_alloc_pd(device); + if (IS_ERR(sdp_dev->pd)) { + printk(KERN_WARNING "Unable to allocate PD: %ld.\n", + PTR_ERR(sdp_dev->pd)); + goto err_pd_alloc; + } + + sdp_dev->mr = ib_get_dma_mr(sdp_dev->pd, IB_ACCESS_LOCAL_WRITE); + if (IS_ERR(sdp_dev->mr)) { + printk(KERN_WARNING "Unable to get dma MR: %ld.\n", + PTR_ERR(sdp_dev->mr)); + goto err_mr; + } + + memset(&fmr_param, 0, sizeof fmr_param); + fmr_param.pool_size = SDP_FMR_POOL_SIZE; + fmr_param.dirty_watermark = SDP_FMR_DIRTY_SIZE; + fmr_param.cache = 0; + fmr_param.max_pages_per_fmr = SDP_FMR_SIZE; + fmr_param.page_shift = PAGE_SHIFT; + fmr_param.access = (IB_ACCESS_LOCAL_WRITE | + IB_ACCESS_REMOTE_READ); + + sdp_dev->fmr_pool = ib_create_fmr_pool(sdp_dev->pd, &fmr_param); + if (IS_ERR(sdp_dev->fmr_pool)) { + printk(KERN_WARNING "Error creating fmr pool\n"); + sdp_dev->fmr_pool = NULL; + goto err_fmr_create; + } + + ib_set_client_data(device, &sdp_client, sdp_dev); + + return; + +err_fmr_create: + ib_dereg_mr(sdp_dev->mr); +err_mr: + ib_dealloc_pd(sdp_dev->pd); +err_pd_alloc: + kfree(sdp_dev); } static void sdp_remove_device(struct ib_device *device) @@ -2384,6 +2485,7 @@ static void sdp_remove_device(struct ib_device *device) struct sdp_sock *ssk; struct sock *sk; struct rdma_cm_id *id; + struct 
sdp_device *sdp_dev; do_next: write_lock(&device_removal_lock); @@ -2416,6 +2518,7 @@ kill_socks: sk = &ssk->isk.sk; sdp_cancel_dreq_wait_timeout(ssk); + cancel_delayed_work(&ssk->srcavail_cancel_work); spin_unlock_irq(&sock_list_lock); @@ -2438,6 +2541,17 @@ kill_socks: spin_unlock_irq(&sock_list_lock); write_unlock(&device_removal_lock); + + sdp_dev = ib_get_client_data(device, &sdp_client); + + ib_flush_fmr_pool(sdp_dev->fmr_pool); + ib_destroy_fmr_pool(sdp_dev->fmr_pool); + + ib_dereg_mr(sdp_dev->mr); + + ib_dealloc_pd(sdp_dev->pd); + + kfree(sdp_dev); } static struct net_proto_family sdp_net_proto = { @@ -2446,7 +2560,7 @@ static struct net_proto_family sdp_net_proto = { .owner = THIS_MODULE, }; -static struct ib_client sdp_client = { +struct ib_client sdp_client = { .name = "sdp", .add = sdp_add_device, .remove = sdp_remove_device @@ -2468,6 +2582,9 @@ static int __init sdp_init(void) if (!orphan_count) goto no_mem_orphan_count; + INIT_LIST_HEAD(&sockets_allocated->list); + INIT_LIST_HEAD(&orphan_count->list); + percpu_counter_init(sockets_allocated, 0); percpu_counter_init(orphan_count, 0); diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c index 939a143ceede5..791030ffc21e5 100644 --- a/drivers/infiniband/ulp/sdp/sdp_proc.c +++ b/drivers/infiniband/ulp/sdp/sdp_proc.c @@ -250,6 +250,8 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v) sdpstats.sendmsg_bcopy_segment); seq_printf(seq, "bzcopy segments \t\t: %d\n", sdpstats.sendmsg_bzcopy_segment); + seq_printf(seq, "zcopy segments \t\t: %d\n", + sdpstats.sendmsg_zcopy_segment); seq_printf(seq, "post_send_credits \t\t: %d\n", sdpstats.post_send_credits); seq_printf(seq, "memcpy_count \t\t: %u\n", diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c index 38417c4947222..80c62e753a951 100644 --- a/drivers/infiniband/ulp/sdp/sdp_rx.c +++ b/drivers/infiniband/ulp/sdp/sdp_rx.c @@ -81,7 +81,7 @@ void sdp_remove_large_sock(struct sdp_sock *ssk) } /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */ -static void sdp_fin(struct sock *sk) +void sdp_handle_disconn(struct sock *sk) { sdp_dbg(sk, "%s\n", __func__); @@ -105,6 +105,7 @@ static void sdp_fin(struct sock *sk) sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT); if (sdp_sk(sk)->id) { + sdp_sk(sk)->qp_active = 0; rdma_disconnect(sdp_sk(sk)->id); } else { sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__); @@ -200,7 +201,7 @@ static int sdp_post_recv(struct sdp_sock *ssk) /* TODO: proper error handling */ sge->addr = (u64)addr; sge->length = SDP_HEAD_SIZE; - sge->lkey = ssk->mr->lkey; + sge->lkey = ssk->sdp_dev->mr->lkey; frags = skb_shinfo(skb)->nr_frags; for (i = 0; i < frags; ++i) { ++sge; @@ -212,7 +213,7 @@ static int sdp_post_recv(struct sdp_sock *ssk) rx_req->mapping[i + 1] = addr; sge->addr = addr; sge->length = skb_shinfo(skb)->frags[i].size; - sge->lkey = ssk->mr->lkey; + sge->lkey = ssk->sdp_dev->mr->lkey; } rx_wr.next = NULL; @@ -244,6 +245,11 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk) int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE; unsigned long max_bytes; + if (!ssk->qp_active) { +// sdp_warn(sk, "post rx while qp is not active\n"); + return 0; + } + if (top_mem_usage && (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) { scale = 1; @@ -251,12 +257,12 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk) max_bytes = sk->sk_rcvbuf * scale; - if (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE)) + if 
(unlikely(rx_ring_posted(ssk) >= SDP_RX_SIZE)) return 0; - if (likely(ring_posted(ssk->rx_ring) > SDP_MIN_TX_CREDITS)) { + if (likely(rx_ring_posted(ssk) > SDP_MIN_TX_CREDITS)) { unsigned long bytes_in_process = - (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) * + (rx_ring_posted(ssk) - SDP_MIN_TX_CREDITS) * buffer_size; bytes_in_process += rcv_nxt(ssk) - ssk->copied_seq; @@ -292,15 +298,36 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk, { int skb_len; struct sdp_sock *ssk = sdp_sk(sk); + struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb); /* not needed since sk_rmem_alloc is not currently used * TODO - remove this? skb_set_owner_r(skb, sk); */ - skb_len = skb->len; + SDP_SKB_CB(skb)->seq = rcv_nxt(ssk); + if (h->mid == SDP_MID_SRCAVAIL) { + struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1); + struct rx_srcavail_state *sa; + + ssk->srcavail_cancel_mseq = 0; + + sa = RX_SRCAVAIL_STATE(skb) = kzalloc( + sizeof(struct rx_srcavail_state), GFP_ATOMIC); + + sa->mseq = ntohl(h->mseq); + sa->aborted = 0; + sa->used = 0; + sa->len = skb_len = ntohl(srcah->len); + sa->rkey = ntohl(srcah->rkey); + sa->vaddr = be64_to_cpu(srcah->vaddr); + + sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n", + skb_len, sa->vaddr); + } else { + skb_len = skb->len; - TCP_SKB_CB(skb)->seq = rcv_nxt(ssk); - atomic_add(skb_len, &ssk->rcv_nxt); + atomic_add(skb_len, &ssk->rcv_nxt); + } skb_queue_tail(&sk->sk_receive_queue, skb); @@ -379,9 +406,9 @@ static inline int credit_update_needed(struct sdp_sock *ssk) if (likely(c > SDP_MIN_TX_CREDITS)) c += c/2; - return unlikely(c < ring_posted(ssk->rx_ring)) && + return unlikely(c < rx_ring_posted(ssk)) && likely(tx_credits(ssk) > 1) && - likely(sdp_tx_ring_slots_left(&ssk->tx_ring)); + likely(sdp_tx_ring_slots_left(ssk)); } @@ -421,6 +448,7 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb) switch (h->mid) { case SDP_MID_DATA: + case SDP_MID_SRCAVAIL: WARN_ON(!(sk->sk_shutdown & RCV_SHUTDOWN)); sdp_warn(sk, "DATA after socket rcv was shutdown\n"); @@ -436,14 +464,26 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb) } __kfree_skb(skb); + break; + case SDP_MID_RDMARDCOMPL: + { + struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1); + sdp_dbg_data(sk, "RdmaRdCompl message arrived\n"); + sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack), + ntohl(rrch->len)); + __kfree_skb(skb); + } break; + case SDP_MID_SENDSM: + sdp_handle_sendsm(ssk, ntohl(h->mseq_ack)); + __kfree_skb(skb); + break; + case SDP_MID_SRCAVAIL_CANCEL: + sdp_dbg_data(sk, "Handling SrcAvailCancel - sending SendSM\n"); + sdp_prf(sk, NULL, "Handling SrcAvailCancel"); + ssk->srcavail_cancel_mseq = ntohl(h->mseq); + sdp_post_sendsm(ssk); break; case SDP_MID_DISCONN: - sdp_dbg_data(sk, "Handling RX disconnect\n"); - sdp_prf(sk, NULL, "Handling RX disconnect"); - sdp_fin(sk); - sdp_prf(sk, NULL, "Queueing fin skb - release recvmsg"); - /* Enqueue fin skb to release sleeping recvmsg */ - sdp_sock_queue_rcv_skb(sk, skb); break; case SDP_MID_CHRCVBUF: sdp_dbg_data(sk, "Handling RX CHRCVBUF\n"); @@ -502,20 +542,27 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb) skb_pull(skb, sizeof(struct sdp_bsdh)); - if (h->mid != SDP_MID_DATA || + if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL && + h->mid != SDP_MID_DISCONN) || unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) { sdp_prf(sk, NULL, "Control skb - queing to control queue"); + if (h->mid == SDP_MID_SRCAVAIL_CANCEL) { + sdp_warn(sk, "Got 
SrcAvailCancel. " + "seq: 0x%d seq_ack: 0x%d\n", + ntohl(h->mseq), ntohl(h->mseq_ack)); + ssk->srcavail_cancel_mseq = ntohl(h->mseq); + } skb_queue_tail(&ssk->rx_ctl_q, skb); + return 0; } - if (unlikely(skb->len <= 0)) { + if (unlikely(h->mid == SDP_MID_DATA && skb->len <= 0)) { __kfree_skb(skb); return 0; } - sdp_prf(sk, NULL, "queueing a %s skb", - (h->mid == SDP_MID_DATA ? "data" : "disconnect")); + sdp_prf(sk, NULL, "queueing %s skb", mid2str(h->mid)); skb = sdp_sock_queue_rcv_skb(sk, skb); /* if (unlikely(h->flags & SDP_OOB_PRES)) @@ -540,10 +587,12 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage); if (unlikely(wc->status)) { - if (wc->status != IB_WC_WR_FLUSH_ERR) { - sdp_warn(sk, "Recv completion with error. Status %d\n", - wc->status); + if (ssk->qp_active) { + sdp_warn(sk, "Recv completion with error. " + "Status %d, vendor: %d\n", + wc->status, wc->vendor_err); sdp_reset(sk); + ssk->qp_active = 0; } __kfree_skb(skb); return NULL; @@ -619,8 +668,12 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk) do { n = ib_poll_cq(cq, SDP_NUM_WC, ibwc); +// sdp_warn(&ssk->isk.sk, "polling: %d\n", n); for (i = 0; i < n; ++i) { struct ib_wc *wc = &ibwc[i]; +/* sdp_warn(&ssk->isk.sk, "wr_id=0x%lx len %d opcode: 0x%x status: 0x%x\n", + wc->wr_id, wc->byte_len, + wc->opcode, wc->status);*/ BUG_ON(!(wc->wr_id & SDP_OP_RECV)); skb = sdp_process_rx_wc(ssk, wc); @@ -686,7 +739,7 @@ void sdp_do_posts(struct sdp_sock *ssk) sdp_post_recvs(ssk); - if (ring_posted(ssk->tx_ring)) + if (tx_ring_posted(ssk)) sdp_xmit_poll(ssk, 1); sdp_post_sends(ssk, 0); @@ -714,7 +767,8 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context) int wc_processed = 0; int credits_before; - sdp_dbg_data(&ssk->isk.sk, "rx irq called\n"); +// sdp_dbg_data(&ssk->isk.sk, "rx irq called\n"); +// sdp_warn(&ssk->isk.sk, "rx irq called\n"); if (cq != ssk->rx_ring.cq) { sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq); @@ -725,16 +779,13 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context) sdp_prf(sk, NULL, "rx irq"); - rx_ring_lock(ssk, flags); + if (!rx_ring_trylock(&ssk->rx_ring, &flags)) { + sdp_warn(&ssk->isk.sk, "ring destroyed. 
not polling it\n"); + return; + } credits_before = tx_credits(ssk); - if (!ssk->rx_ring.cq) { - sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n"); - - goto out; - } - wc_processed = sdp_poll_rx_cq(ssk); sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed); @@ -757,13 +808,12 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context) } sdp_arm_rx_cq(sk); -out: - rx_ring_unlock(ssk, flags); + rx_ring_unlock(&ssk->rx_ring, &flags); } static void sdp_rx_ring_purge(struct sdp_sock *ssk) { - while (ring_posted(ssk->rx_ring) > 0) { + while (rx_ring_posted(ssk) > 0) { struct sk_buff *skb; skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring)); if (!skb) @@ -776,7 +826,8 @@ static void sdp_rx_ring_purge(struct sdp_sock *ssk) void sdp_rx_ring_init(struct sdp_sock *ssk) { ssk->rx_ring.buffer = NULL; - spin_lock_init(&ssk->rx_ring.lock); + ssk->rx_ring.destroyed = 0; + rwlock_init(&ssk->rx_ring.destroyed_lock); } static void sdp_rx_cq_event_handler(struct ib_event *event, void *data) @@ -828,6 +879,8 @@ err_cq: void sdp_rx_ring_destroy(struct sdp_sock *ssk) { + rx_ring_destroy_lock(&ssk->rx_ring); + if (ssk->rx_ring.buffer) { sdp_rx_ring_purge(ssk); diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c index d53e838fd1704..7eb7d31fe3887 100644 --- a/drivers/infiniband/ulp/sdp/sdp_tx.c +++ b/drivers/infiniband/ulp/sdp/sdp_tx.c @@ -46,7 +46,7 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force) { int wc_processed = 0; - sdp_prf(&ssk->isk.sk, NULL, "called from %s:%d", func, line); + sdp_prf(&ssk->isk.sk, NULL, "%s", __func__); /* If we don't have a pending timer, set one up to catch our recent post in case the interface becomes idle */ @@ -84,18 +84,20 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) ssk->tx_bytes += skb->len; h->mid = mid; - if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG)) + if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG)) h->flags = SDP_OOB_PRES | SDP_OOB_PEND; else h->flags = 0; - h->bufs = htons(ring_posted(ssk->rx_ring)); + h->bufs = htons(rx_ring_posted(ssk)); h->len = htonl(skb->len); h->mseq = htonl(mseq); + if (TX_SRCAVAIL_STATE(skb)) + TX_SRCAVAIL_STATE(skb)->mseq = mseq; h->mseq_ack = htonl(mseq_ack(ssk)); sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d", - mid2str(mid), ring_posted(ssk->rx_ring), mseq, + mid2str(mid), rx_ring_posted(ssk), mseq, ntohl(h->mseq_ack)); SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h); @@ -112,7 +114,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) sge->addr = addr; sge->length = skb->len - skb->data_len; - sge->lkey = ssk->mr->lkey; + sge->lkey = ssk->sdp_dev->mr->lkey; frags = skb_shinfo(skb)->nr_frags; for (i = 0; i < frags; ++i) { ++sge; @@ -124,7 +126,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) tx_req->mapping[i + 1] = addr; sge->addr = addr; sge->length = skb_shinfo(skb)->frags[i].size; - sge->lkey = ssk->mr->lkey; + sge->lkey = ssk->sdp_dev->mr->lkey; } tx_wr.next = NULL; @@ -133,7 +135,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) tx_wr.num_sge = frags + 1; tx_wr.opcode = IB_WR_SEND; tx_wr.send_flags = IB_SEND_SIGNALED; - if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG)) + if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG)) tx_wr.send_flags |= IB_SEND_SOLICITED; delta = jiffies - last_send; @@ -144,7 +146,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr); 
atomic_inc(&ssk->tx_ring.head); atomic_dec(&ssk->tx_ring.credits); - atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring)); + atomic_set(&ssk->remote_credits, rx_ring_posted(ssk)); if (unlikely(rc)) { sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc); @@ -158,7 +160,6 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq) struct ib_device *dev; struct sdp_buf *tx_req; struct sk_buff *skb = NULL; - struct bzcopy_state *bz; int i, frags; struct sdp_tx_ring *tx_ring = &ssk->tx_ring; if (unlikely(mseq != ring_tail(*tx_ring))) { @@ -179,12 +180,11 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq) DMA_TO_DEVICE); } - tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq; + tx_ring->una_seq += SDP_SKB_CB(skb)->end_seq; /* TODO: AIO and real zcopy code; add their context support here */ - bz = BZCOPY_STATE(skb); - if (bz) - bz->busy--; + if (BZCOPY_STATE(skb)) + BZCOPY_STATE(skb)->busy--; atomic_inc(&tx_ring->tail); @@ -195,6 +195,7 @@ out: static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) { struct sk_buff *skb = NULL; + struct sdp_bsdh *h; skb = sdp_send_completion(ssk, wc->wr_id); if (unlikely(!skb)) @@ -214,12 +215,10 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) } } -#ifdef SDP_PROFILING -{ - struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data; + h = (struct sdp_bsdh *)skb->data; + sdp_prf1(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq)); -} -#endif + sk_wmem_free_skb(&ssk->isk.sk, skb); return 0; @@ -227,11 +226,37 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc) { + struct sock *sk = &ssk->isk.sk; + if (likely(wc->wr_id & SDP_OP_SEND)) { sdp_handle_send_comp(ssk, wc); return; } + if (wc->wr_id & SDP_OP_RDMA) { + sdp_dbg_data(sk, "TX comp: RDMA read. status: %d\n", wc->status); + sdp_prf1(sk, NULL, "TX comp: RDMA read"); + + if (!ssk->tx_ring.rdma_inflight) { + sdp_warn(sk, "ERROR: unexpected RDMA read\n"); + return; + } + + if (!ssk->tx_ring.rdma_inflight->busy) { + sdp_warn(sk, "ERROR: too many RDMA read completions\n"); + return; + } + + /* Only last RDMA read WR is signalled. Order is guaranteed - therefore + * if Last RDMA read WR is completed - all other have, too */ + ssk->tx_ring.rdma_inflight->busy = 0; + if (!ssk->tx_ring.rdma_inflight->busy) { + wake_up(ssk->isk.sk.sk_sleep); + sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n"); + } + return; + } + /* Keepalive probe sent cleanup */ sdp_cnt(sdp_keepalive_probes_sent); @@ -267,7 +292,7 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk) } } while (n == SDP_NUM_WC); - sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed); +// sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed); if (wc_processed) { struct sock *sk = &ssk->isk.sk; @@ -286,17 +311,18 @@ static void sdp_poll_tx_timeout(unsigned long data) struct sock *sk = &ssk->isk.sk; u32 inflight, wc_processed; - sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n", - (u32) ring_posted(ssk->tx_ring)); +// sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n", +// (u32) tx_ring_posted(ssk)); - sdp_prf(&ssk->isk.sk, NULL, "%s. 
inflight=%d", __func__, - (u32) ring_posted(ssk->tx_ring)); + sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d", + (u32) tx_ring_posted(ssk)); /* Only process if the socket is not in use */ bh_lock_sock(sk); if (sock_owned_by_user(sk)) { mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); - sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n"); +// sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n"); + sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy\n"); SDPSTATS_COUNTER_INC(tx_poll_busy); goto out; } @@ -310,17 +336,22 @@ static void sdp_poll_tx_timeout(unsigned long data) else SDPSTATS_COUNTER_INC(tx_poll_hit); - inflight = (u32) ring_posted(ssk->tx_ring); + inflight = (u32) rx_ring_posted(ssk); /* If there are still packets in flight and the timer has not already * been scheduled by the Tx routine then schedule it here to guarantee * completion processing of these packets */ if (inflight) { /* TODO: make sure socket is not closed */ - sdp_dbg_data(sk, "arming timer for more polling\n"); +// sdp_dbg_data(sk, "arming timer for more polling\n"); mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); } out: + if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) { + sdp_prf1(sk, NULL, "RDMA is inflight - arming irq"); + sdp_arm_tx_cq(sk); + } + bh_unlock_sock(sk); } @@ -329,14 +360,18 @@ static void sdp_tx_irq(struct ib_cq *cq, void *cq_context) struct sock *sk = cq_context; struct sdp_sock *ssk = sdp_sk(sk); - sdp_prf1(sk, NULL, "Got tx comp interrupt"); + sdp_prf1(sk, NULL, "tx irq"); + sdp_dbg_data(sk, "Got tx comp interrupt\n"); - mod_timer(&ssk->tx_ring.timer, jiffies); + SDPSTATS_COUNTER_INC(tx_int_count); + + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + tasklet_schedule(&ssk->tx_ring.tasklet); } void sdp_tx_ring_purge(struct sdp_sock *ssk) { - while (ring_posted(ssk->tx_ring)) { + while (tx_ring_posted(ssk)) { struct sk_buff *skb; skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring)); if (!skb) @@ -373,6 +408,10 @@ void sdp_post_keepalive(struct sdp_sock *ssk) static void sdp_tx_cq_event_handler(struct ib_event *event, void *data) { + printk("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"); + printk("xx event called !!!!!!!!!! xxxxxx\n"); + printk("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"); + printk("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"); } int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device) @@ -409,6 +448,9 @@ int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device) ssk->tx_ring.timer.data = (unsigned long) ssk; ssk->tx_ring.poll_cnt = 0; + tasklet_init(&ssk->tx_ring.tasklet, sdp_poll_tx_timeout, + (unsigned long) ssk); + init_timer(&ssk->nagle_timer); ssk->nagle_timer.function = sdp_nagle_timeout; ssk->nagle_timer.data = (unsigned long) ssk; diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c new file mode 100644 index 0000000000000..bac3f71d6a826 --- /dev/null +++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c @@ -0,0 +1,913 @@ +/* + * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. 
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
new file mode 100644
index 0000000000000..bac3f71d6a826
--- /dev/null
+++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
@@ -0,0 +1,913 @@
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include /* for memcpy_toiovec */
+#include
+#include
+#include
+#include "sdp.h"
+
+static struct bzcopy_state dummy_bz = {
+	.busy = 1,
+};
+
+static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
+		int off, size_t len)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+	struct sdp_srcah *srcah;
+	struct sk_buff *skb;
+
+	WARN_ON(ssk->tx_sa);
+
+	BUG_ON(!tx_sa);
+	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
+
+	skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+			sizeof(struct sdp_bsdh) +
+			sizeof(struct sdp_srcah),
+			GFP_KERNEL);
+	if (!skb) {
+		return -ENOMEM;
+	}
+	sdp_prf1(sk, skb, "sending SrcAvail");
+
+	TX_SRCAVAIL_STATE(skb) = tx_sa; /* tx_sa is attached to the skb but
+					 * must stay alive after the skb is freed */
+	ssk->tx_sa = tx_sa;
+
+	srcah = (struct sdp_srcah *)skb_push(skb, sizeof(*srcah));
+	srcah->len = htonl(len);
+	srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
+	srcah->vaddr = cpu_to_be64(off);
+
+	skb_entail(sk, ssk, skb);
+
+	/* TODO: pushing the skb into the tx_queue should be enough */
+
+	return 0;
+}
+
+static int sdp_post_srcavail_cancel(struct sock *sk)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+
+	if (!ssk->tx_sa && !ssk->srcavail_cancel)
+		return 0; /* srcavail already serviced */
+
+	ssk->srcavail_cancel = 1;
+
+	sdp_post_sends(ssk, 1);
+
+	schedule_delayed_work(&ssk->srcavail_cancel_work,
+			SDP_SRCAVAIL_CANCEL_TIMEOUT);
+
+	return 0;
+}
+
+void srcavail_cancel_timeout(struct work_struct *work)
+{
+	struct sdp_sock *ssk =
+		container_of(work, struct sdp_sock, srcavail_cancel_work.work);
+	struct sock *sk = &ssk->isk.sk;
+
+	lock_sock(sk);
+
+	sdp_warn(sk, "both SrcAvail and SrcAvailCancel timed out."
+			" closing connection\n");
+	sdp_set_error(sk, -ECONNRESET);
+	wake_up(&ssk->wq);
+
+	release_sock(sk);
+}
+
+static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
+		int ignore_signals)
+{
+	struct sock *sk = &ssk->isk.sk;
+	int err = 0;
+	long vm_wait = 0;
+	long current_timeo = *timeo_p;
+	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
+	DEFINE_WAIT(wait);
+
+	sdp_dbg_data(sk, "Going to sleep till get RdmaRdCompl.\n");
+	sdp_prf1(sk, NULL, "Going to sleep");
+	while (ssk->qp_active) {
+		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+		if (unlikely(!*timeo_p)) {
+			err = -ETIME;
+			sdp_prf1(sk, NULL, "timeout");
+			break;
+		}
+
+		if (unlikely(!ignore_signals && signal_pending(current))) {
+			err = -EINTR;
+			sdp_prf1(sk, NULL, "signalled");
+			break;
+		}
+
+		if (tx_sa->bytes_completed >= len)
+			break;
+
+		if (tx_sa->abort) {
+			sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
+			err = -EAGAIN;
+			break;
+		}
+
+		posts_handler_put(ssk);
+
+		sk_wait_event(sk, &current_timeo, tx_sa->abort &&
+			(tx_sa->bytes_completed >= len) && vm_wait);
+		sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+		posts_handler_get(ssk);
+
+		if (vm_wait) {
+			vm_wait -= current_timeo;
+			current_timeo = *timeo_p;
+			if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
+			    (current_timeo -= vm_wait) < 0)
+				current_timeo = 0;
+			vm_wait = 0;
+		}
+		*timeo_p = current_timeo;
+	}
+
+	if (!ssk->qp_active) {
+		sdp_warn(sk, "qp is not active\n");
+	}
+
+	finish_wait(sk->sk_sleep, &wait);
+
+	sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d bytes, abort: %d\n",
+			tx_sa->bytes_completed, tx_sa->abort);
+
+	if (!ssk->qp_active) {
+		sdp_warn(sk, "QP destroyed while waiting\n");
+		return -EINVAL;
+	}
+	return err;
+}
+
+static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
+{
+	struct sock *sk = &ssk->isk.sk;
+	int err = 0;
+	long current_timeo = *timeo_p;
+	DEFINE_WAIT(wait);
+
+	sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
+	while (1) {
+		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+		if (unlikely(!*timeo_p)) {
+			err = -EAGAIN;
+			sdp_warn(sk, "timed out\n");
+			break;
+		}
+
+		if (unlikely(signal_pending(current))) {
+			err = sock_intr_errno(*timeo_p);
+			sdp_warn(sk, "signalled\n");
+			break;
+		}
+
+		if (!ssk->tx_ring.rdma_inflight->busy) {
+			sdp_dbg_data(sk, "got rdma cqe\n");
+			break;
+		}
+
+		posts_handler_put(ssk);
+
+		sdp_prf1(sk, NULL, "Going to sleep");
+		sk_wait_event(sk, &current_timeo,
+			!ssk->tx_ring.rdma_inflight->busy);
+		sdp_prf1(sk, NULL, "Woke up");
+		sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+		posts_handler_get(ssk);
+
+		*timeo_p = current_timeo;
+	}
+
+	finish_wait(sk->sk_sleep, &wait);
+
+	sdp_dbg_data(sk, "Finished waiting - rdma's inflight=%d\n",
+			ssk->tx_ring.rdma_inflight->busy);
+
+	return err;
+}
+
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied)
+{
+	struct sdp_rrch *rrch;
+	struct sk_buff *skb;
+	gfp_t gfp_page;
+
+	if (unlikely(ssk->isk.sk.sk_allocation))
+		gfp_page = ssk->isk.sk.sk_allocation;
+	else
+		gfp_page = GFP_KERNEL;
+
+	skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+			sizeof(struct sdp_bsdh) +
+			sizeof(struct sdp_rrch),
+			gfp_page);
+	/* FIXME */
+	BUG_ON(!skb);
+
+	rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
+	rrch->len = htonl(copied);
+	/* TODO: What if no tx_credits available? */
+	sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL);
+
+	return 0;
+}
+
+int sdp_post_sendsm(struct sdp_sock *ssk)
+{
+	struct sk_buff *skb;
+	gfp_t gfp_page;
+
+	if (unlikely(ssk->isk.sk.sk_allocation))
+		gfp_page = ssk->isk.sk.sk_allocation;
+	else
+		gfp_page = GFP_KERNEL;
+
+	skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+			sizeof(struct sdp_bsdh),
+			gfp_page);
+	/* FIXME */
+	BUG_ON(!skb);
+
+	sdp_post_send(ssk, skb, SDP_MID_SENDSM);
+
+	return 0;
+}
+
+static int sdp_update_iov_used(struct sock *sk, struct iovec *iov, int len)
+{
+	sdp_dbg_data(sk, "updating consumed %d bytes from iov\n", len);
+	while (len > 0) {
+		if (iov->iov_len) {
+			int copy = min_t(unsigned int, iov->iov_len, len);
+			len -= copy;
+			iov->iov_len -= copy;
+			iov->iov_base += copy;
+		}
+		iov++;
+	}
+
+	return 0;
+}
+
+static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
+{
+	int bytes = 0;
+
+	while (sge_cnt > 0) {
+		bytes += sge->length;
+		sge++;
+		sge_cnt--;
+	}
+
+	return bytes;
+}
+
+void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
+{
+	struct sock *sk = &ssk->isk.sk;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+	if (!ssk->tx_sa) {
+		sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
+		goto out;
+	}
+
+	if (ssk->tx_sa->mseq < mseq_ack) {
+		sdp_prf1(sk, NULL, "SendSM arrived for old SrcAvail. "
+			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x",
+			mseq_ack, ssk->tx_sa->mseq);
+		goto out;
+	}
+
+	sdp_prf1(sk, NULL, "Got SendSM - aborting SrcAvail");
+
+	ssk->tx_sa->abort = 1;
+	cancel_delayed_work(&ssk->srcavail_cancel_work);
+
+	wake_up(ssk->isk.sk.sk_sleep);
+	sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+out:
+	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+}
+
+void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
+		u32 bytes_completed)
+{
+	struct sock *sk = &ssk->isk.sk;
+	unsigned long flags;
+
+	sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
+	sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
+
+	spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+	BUG_ON(!ssk);
+
+	if (!ssk->tx_sa) {
+		sdp_warn(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
+		goto out;
+	}
+
+	if (ssk->tx_sa->mseq < mseq_ack) {
+		sdp_warn(sk, "RdmaRdCompl arrived for old SrcAvail. "
+			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
+			mseq_ack, ssk->tx_sa->mseq);
+		goto out;
+	}
+
+	if (ssk->tx_sa->bytes_completed > bytes_completed) {
+		sdp_warn(sk, "tx_sa->bytes_total(%d), "
+			"tx_sa->bytes_completed(%d) > bytes_completed(%d)\n",
+			ssk->tx_sa->bytes_total,
+			ssk->tx_sa->bytes_completed, bytes_completed);
+	}
+	if (ssk->tx_sa->bytes_total < bytes_completed) {
+		sdp_warn(sk, "ssk->tx_sa->bytes_total(%d) < bytes_completed(%d)\n",
+			ssk->tx_sa->bytes_total, bytes_completed);
+	}
+
+	ssk->tx_sa->bytes_completed = bytes_completed;
+
+	wake_up(sk->sk_sleep);
+	sdp_dbg_data(sk, "woke up sleepers\n");
+
+out:
+	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+	return;
+}
+
+int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
+		unsigned long addr)
+{
+	int done_pages = 0;
+
+	sdp_dbg_data(sk, "count: %d addr: 0x%lx\n", page_cnt, addr);
+
+	addr &= PAGE_MASK;
+	if (segment_eq(get_fs(), KERNEL_DS)) {
+		for (done_pages = 0; done_pages < page_cnt; done_pages++) {
+			pages[done_pages] = virt_to_page(addr);
+			if (!pages[done_pages])
+				break;
+			get_page(pages[done_pages]);
+			addr += PAGE_SIZE;
+		}
+	} else {
+		if (current->mm) {
+			down_write(&current->mm->mmap_sem);
+			done_pages = get_user_pages(current, current->mm, addr,
+					page_cnt, 0, 0, pages, NULL);
+			up_write(&current->mm->mmap_sem);
+		}
+	}
+
+	if (unlikely(done_pages != page_cnt))
+		goto err;
+
+	return 0;
+
+err:
+	sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
+			done_pages, page_cnt);
+	for (; done_pages > 0; done_pages--)
+		put_page(pages[done_pages - 1]);
+
+	return -1;
+}
+
+void sdp_put_pages(struct sock *sk, struct page **pages, int page_cnt)
+{
+	int i;
+	sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+	for (i = 0; i < page_cnt; i++) {
+		set_page_dirty_lock(pages[i]);
+		page_cache_release(pages[i]);
+	}
+}
+
+static int sdp_map_dma(struct sock *sk, u64 *addrs, int page_cnt,
+		struct page **pages, int offset, int len)
+{
+	struct ib_device *dev = sdp_sk(sk)->ib_device;
+	int i;
+	sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
+
+	for (i = 0; i < page_cnt; i++) {
+		int o = 0;
+		int l = PAGE_SIZE;
+
+		if (i == page_cnt - 1) {
+			/* Last page */
+			l = (len + offset) & (PAGE_SIZE - 1);
+			if (l == 0)
+				l = PAGE_SIZE;
+		}
+
+//		sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d\n",
+//				i, pages[i], o, l);
+
+		addrs[i] = ib_dma_map_page(dev,
+				pages[i],
+				o,
+				l,
+				DMA_TO_DEVICE);
+		if (ib_dma_mapping_error(dev, addrs[i])) {
+			sdp_warn(sk, "Error mapping page %p off: %d len: %d\n",
+				pages[i], o, l);
+			goto err;
+		}
+	}
+
+	return 0;
+err:
+	for (; i > 0; i--) {
+		ib_dma_unmap_page(dev, addrs[i - 1], PAGE_SIZE, DMA_TO_DEVICE);
+	}
+	return -1;
+}
+
+static void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt)
+{
+	int i;
+	struct ib_device *dev = sdp_sk(sk)->ib_device;
+
+	sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+	for (i = 0; i < page_cnt; i++) {
+		ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
+	}
+}
+
+static int sdp_map_dma_sge(struct sock *sk, struct ib_sge *sge, int page_cnt,
+		struct page **pages, int offset, int len)
+{
+	struct ib_device *dev = sdp_sk(sk)->ib_device;
+	int i;
+	int left = len;
+	sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
+
+	for (i = 0; i < page_cnt; i++) {
+		int o = i == 0 ? offset : 0;
+		int l = MIN(left, PAGE_SIZE - o);
+
+		sge[i].lkey = sdp_sk(sk)->sdp_dev->mr->lkey;
+		sge[i].length = l;
+
+		sge[i].addr = ib_dma_map_page(dev,
+				pages[i],
+				0, /* mapping the page with a first-byte offset (fbo) is not working */
+				l + o, /* compensate for the zero offset */
+				DMA_FROM_DEVICE);
+		if (ib_dma_mapping_error(dev, sge[i].addr)) {
+			sdp_warn(sk, "Error map page 0x%llx off: %d len: %d\n",
+				sge[i].addr, o, l);
+			goto err;
+		}
+		sge[i].addr += o;
+
+		sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d | addr: 0x%llx length: %d\n",
+			i, pages[i], o, l, sge[i].addr, sge[i].length);
+		left -= l;
+	}
+
+	WARN_ON(left != 0);
+
+	return 0;
+err:
+	for (; i > 0; i--) {
+		ib_dma_unmap_page(dev, sge[i - 1].addr, PAGE_SIZE, DMA_FROM_DEVICE);
+	}
+	return -1;
+}
+
+static void sdp_unmap_dma_sge(struct sock *sk, struct ib_sge *sge,
+		int page_cnt, int offset, int len)
+{
+	int i;
+	struct ib_device *dev = sdp_sk(sk)->ib_device;
+
+	sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+	for (i = 0; i < page_cnt; i++) {
+		int l = PAGE_SIZE;
+
+		if (i == page_cnt - 1) {
+			/* Last page */
+			l = (len + offset) & (PAGE_SIZE - 1);
+			if (l == 0)
+				l = PAGE_SIZE;
+		}
+		ib_dma_unmap_page(dev, sge[i].addr, l, DMA_FROM_DEVICE);
+	}
+}
+
+static struct ib_pool_fmr *sdp_map_fmr(struct sock *sk, int page_cnt,
+		u64 *addrs)
+{
+	struct ib_pool_fmr *fmr;
+	int ret = 0;
+
+	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, addrs,
+			page_cnt, 0);
+	if (IS_ERR(fmr)) {
+		ret = PTR_ERR(fmr);
+		fmr = NULL;
+		sdp_warn(sk, "Error allocating fmr: %d\n", ret);
+		goto err;
+	}
+
+	return fmr;
+err:
+	return NULL;
+}
+
+enum zcopy_type {
+	SDP_ZCOPY_TYPE_RX,
+	SDP_ZCOPY_TYPE_TX,
+};
+
+static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int offset,
+		int len)
+{
+	struct tx_srcavail_state *tx_sa;
+	int page_cnt;
+
+	page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
+
+	tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
+			sizeof(struct page *) * page_cnt +
+			sizeof(u64) * page_cnt, GFP_KERNEL);
+	if (!tx_sa)
+		return ERR_PTR(-ENOMEM);
+
+	tx_sa->pages = (struct page **)(tx_sa + 1);
+	tx_sa->page_cnt = page_cnt;
+	tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
+
+	return tx_sa;
+}
+
+int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
+		int len)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+	struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
+	int rc = 0;
+	struct ib_send_wr *bad_wr;
+	struct ib_send_wr wr = { 0 };
+	long timeo;
+	struct ib_sge *sge;
+	int sge_left;
+	int copied;
+	int offset;
+
+	sdp_dbg_data(&ssk->isk.sk, "preparing RDMA read."
+		" len: %d. buffer len: %ld\n", len, iov->iov_len);
+	if (len > rx_sa->len)
+		len = rx_sa->len;
+
+	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
+
+	rx_sa->page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
+	sdp_dbg_data(sk, "page_cnt = %d len:%d offset: %d\n", rx_sa->page_cnt, len, offset);
+
+	rx_sa->pages =
+		(struct page **) kzalloc(sizeof(struct page *) * rx_sa->page_cnt +
+			sizeof(struct ib_sge) * rx_sa->page_cnt, GFP_KERNEL);
+	if (!rx_sa->pages) {
+		sdp_warn(sk, "Error allocating zcopy context\n");
+		goto err_alloc_zcopy;
+	}
+
+	rx_sa->sge = (struct ib_sge *)(&rx_sa->pages[rx_sa->page_cnt]);
+
+	rc = sdp_get_pages(sk, rx_sa->pages, rx_sa->page_cnt,
+			(unsigned long)iov->iov_base);
+	if (rc)
+		goto err_get_pages;
+
+	rc = sdp_map_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, rx_sa->pages,
+			offset, len);
+	if (rc)
+		goto err_map_dma;
+
+	wr.opcode = IB_WR_RDMA_READ;
+	wr.next = NULL;
+	wr.wr_id = SDP_OP_RDMA;
+	wr.wr.rdma.rkey = rx_sa->rkey;
+	wr.send_flags = 0;
+
+	timeo = sock_sndtimeo(sk, 0);
+
+	ssk->tx_ring.rdma_inflight = rx_sa;
+	copied = 0;
+	sge = rx_sa->sge;
+	sge_left = rx_sa->page_cnt;
+	do {
+		/* Len error when using sge_cnt > 30 ?? */
+		int sge_cnt = min(sge_left, SDP_MAX_SEND_SGES - 2);
+
+		wr.wr.rdma.remote_addr = rx_sa->vaddr + copied + rx_sa->used;
+		wr.num_sge = sge_cnt;
+		wr.sg_list = sge;
+		rx_sa->busy++;
+
+		sdp_dbg_data(sk, "rdma read: sge_cnt: %d vaddr: 0x%llx "
+			"copied: 0x%x rkey: 0x%x in_bytes: %d\n",
+			sge_cnt, wr.wr.rdma.remote_addr, copied, rx_sa->rkey,
+			sge_bytes(sge, sge_cnt));
+		sdp_prf1(sk, NULL, "TX: RDMA read");
+
+		if (sge_left == sge_cnt) {
+			wr.send_flags = IB_SEND_SIGNALED;
+			sdp_dbg_data(sk, "last wr is signaled\n");
+		}
+		rc = ib_post_send(ssk->qp, &wr, &bad_wr);
+		if (unlikely(rc)) {
+			sdp_warn(sk, "ib_post_send failed with status %d.\n",
+					rc);
+			sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+			wake_up(&ssk->wq);
+			break;
+		}
+
+		copied += sge_bytes(sge, sge_cnt);
+		sge_left -= sge_cnt;
+		sge += sge_cnt;
+
+		if (unlikely(ssk->srcavail_cancel_mseq > rx_sa->mseq)) {
+			sdp_warn(sk, "got SrcAvailCancel - Aborting RDMA\n");
+			rc = -EAGAIN;
+		}
+	} while (!rc && sge_left > 0);
+
+	if (!rc || rc == -EAGAIN) {
+		int got_srcavail_cancel = (rc == -EAGAIN);
+
+		sdp_arm_tx_cq(sk);
+
+		rc = sdp_wait_rdma_wr_finished(ssk, &timeo);
+
+		/* Ignore any data copied after getting SrcAvailCancel */
+		if (!got_srcavail_cancel && !rc) {
+			sdp_update_iov_used(sk, iov, copied);
+			rx_sa->used += copied;
+			atomic_add(copied, &ssk->rcv_nxt);
+		}
+	}
+
+	if (rc) {
+		/* post rdma, wait_for_compl or post rdma_rd_comp failed -
+		 * post sendsm */
+		sdp_warn(sk, "post rdma, wait_for_compl "
+			"or post rdma_rd_comp failed - post sendsm\n");
+		rx_sa->aborted = 1;
+	}
+
+	ssk->tx_ring.rdma_inflight = NULL;
+
+	sdp_unmap_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, offset, len);
+err_map_dma:
+	sdp_put_pages(sk, rx_sa->pages, rx_sa->page_cnt);
+err_get_pages:
+	kfree(rx_sa->pages);
+	rx_sa->pages = NULL;
+	rx_sa->sge = NULL;
+err_alloc_zcopy:
+
+	return rc;
+}
+
+static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+	int ret = 0;
+	sdp_dbg_data(sk, "Wait for mem\n");
+
+	set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+
+	SDPSTATS_COUNTER_INC(send_wait_for_mem);
+
+	sdp_do_posts(ssk);
+
+	sdp_xmit_poll(ssk, 1);
+
+	ret = sdp_bzcopy_wait_memory(ssk, timeo_p, &dummy_bz);
+
+	return ret;
+}
+
+int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+		size_t size)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+	int iovlen, flags;
+	struct iovec *iov = NULL;
+	int rc = 0;
+	long timeo;
+	struct tx_srcavail_state *tx_sa;
+	int offset;
+	int copied = 0;
+
+	int bytes_left;
+	int pages_left;
+	u64 *addrs;
+	unsigned long lock_flags;
+
+	sdp_dbg_data(sk, "%s\n", __func__);
+
+	lock_sock(sk);
+
+	SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
+
+	posts_handler_get(ssk);
+
+	flags = msg->msg_flags;
+	timeo = sock_sndtimeo(sk, 0);
+
+	/* Wait for a connection to finish. */
+	if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+		if ((rc = sk_stream_wait_connect(sk, &timeo)) != 0)
+			goto err;
+
+	/* This should be in poll */
+	clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+	/* Ok commence sending. */
+	iovlen = msg->msg_iovlen;
+	iov = msg->msg_iov;
+	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
+	sdp_dbg_data(sk, "Sending iov: %p, iovlen: %ld, size: %ld\n",
+		iov->iov_base, iov->iov_len, size);
+
+	SDPSTATS_HIST(sendmsg_seglen, iov->iov_len);
+
+	if (iovlen > 1) {
+		sdp_warn(sk, "iovlen > 1 not supported\n");
+		rc = -ENOTSUPP;
+		goto err;
+	}
+
+	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
+		rc = -EPIPE;
+		goto err;
+	}
+
+	tx_sa = sdp_alloc_tx_sa(sk, offset, iov->iov_len);
+	if (IS_ERR(tx_sa)) {
+		sdp_warn(sk, "Error allocating zcopy context\n");
+		goto err_alloc_tx_sa;
+	}
+
+	rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
+			(unsigned long)iov->iov_base);
+	if (rc)
+		goto err_get_pages;
+
+	rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
+			offset, iov->iov_len);
+	if (rc)
+		goto err_map_dma;
+
+	bytes_left = iov->iov_len;
+	pages_left = tx_sa->page_cnt;
+	addrs = tx_sa->addrs;
+	do {
+		int page_cnt = min(256, pages_left);
+		int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
+		sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
+
+		if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
+			rc = wait_for_sndbuf(sk, &timeo);
+			if (rc) {
+				sdp_warn(sk, "Couldn't get send buffer\n");
+				break;
+			}
+		}
+
+		tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
+		if (!tx_sa->fmr) {
+			rc = -ENOMEM;
+			break;
+		}
+
+		tx_sa->bytes_completed = 0;
+		tx_sa->bytes_total = len;
+		rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+		if (rc)
+			goto err_abort_send;
+
+		rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+		if (unlikely(rc)) {
+			switch (rc) {
+			case -EAGAIN: /* Got SendSM */
+				sdp_warn(sk, "got SendSM. use SEND verb.\n");
+				goto err_abort_send;
+
+			case -ETIME: /* Timed out */
+				sdp_warn(sk, "TX srcavail timed out.\n");
+			case -EINTR: /* interrupted */
+				sdp_prf1(sk, NULL, "Aborting send.");
+				sdp_post_srcavail_cancel(sk);
+
+				/* Wait for RdmaRdCompl/SendSM to
+				 * finish the transaction */
+				sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+
+				goto err_abort_send;
+
+			default:
+				sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
+				/* Socket destroyed while we waited */
+				goto err_abort_send;
+			}
+		}
+		sdp_prf1(sk, NULL, "got RdmaRdCompl");
+		copied += len;
+		bytes_left -= len;
+		pages_left -= page_cnt;
+		addrs += page_cnt;
+		offset = 0;
+
+		sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
+
+err_abort_send:
+		ib_fmr_pool_unmap(tx_sa->fmr);
+		spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
+		ssk->tx_sa = NULL;
+		spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
+	} while (!rc && !tx_sa->abort && pages_left > 0);
+
+	sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
+err_map_dma:
+	sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
+err_get_pages:
+	kfree(tx_sa);
+err_alloc_tx_sa:
+err:
+
+	sdp_prf1(sk, NULL, "Finished RDMA rc: %d copied: %d", rc, copied);
+	posts_handler_put(ssk);
+	release_sock(sk);
+
+	return rc ?: copied;
+}
+