obj-$(CONFIG_INFINIBAND_SDP) += ib_sdp.o
-ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o
+ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o sdp_zcopy.o
#include <linux/sched.h>
#define SDPSTATS_ON
-/* #define SDP_PROFILING */
+#define SDP_PROFILING
#define _sdp_printk(func, line, level, sk, format, arg...) do { \
preempt_disable(); \
#define sdp_warn(sk, format, arg...) \
sdp_printk(KERN_WARNING, sk, format , ## arg)
-#define rx_ring_lock(ssk, f) do { \
- spin_lock_irqsave(&ssk->rx_ring.lock, f); \
-} while (0)
-
-#define rx_ring_unlock(ssk, f) do { \
- spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
-} while (0)
-
#define SDP_MODPARAM_SINT(var, def_val, msg) \
static int var = def_val; \
module_param_named(var, var, int, 0644); \
preempt_enable(); \
1; \
})
-#define sdp_prf(sk, s, format, arg...)
-/* #define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg) */
+//#define sdp_prf(sk, s, format, arg...)
+#define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg)
#else
#define sdp_prf1(sk, s, format, arg...)
#define sdp_dbg(sk, format, arg...) \
do { \
if (sdp_debug_level > 0) \
- sdp_printk(KERN_DEBUG, sk, format , ## arg); \
+ sdp_printk(KERN_WARNING, sk, format , ## arg); \
} while (0)
#define sock_ref(sk, msg, sock_op) ({ \
#define sdp_dbg_data(sk, format, arg...) \
do { \
if (sdp_data_debug_level & 0x2) \
- sdp_printk(KERN_DEBUG, sk, format , ## arg); \
+ sdp_printk(KERN_WARNING, sk, format , ## arg); \
} while (0)
#define SDP_DUMP_PACKET(sk, str, skb, h) \
do { \
#define SDP_DUMP_PACKET(sk, str, skb, h)
#endif
+#if 0
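+/*
+ * Debug-only wrappers, currently compiled out: when enabled they trace
+ * lock_sock()/release_sock() and redefine sk_wait_event() on top of the
+ * wrapped lock functions.
+ */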
+#define lock_sock(sk) do { \
+ sdp_dbg_data(sk, "lock_sock: before lock\n"); \
+ lock_sock(sk); \
+ sdp_dbg_data(sk, "lock_sock: locked\n"); \
+} while (0)
+
+#define release_sock(sk) do { \
+ sdp_dbg_data(sk, "release_sock\n"); \
+ release_sock(sk); \
+} while (0)
+
+
+#undef sk_wait_event
+
+#define sk_wait_event(__sk, __timeo, __condition) \
+({ int rc; \
+ release_sock(__sk); \
+ rc = __condition; \
+ if (!rc) { \
+ *(__timeo) = schedule_timeout(*(__timeo)); \
+ } \
+ lock_sock(__sk); \
+ rc = __condition; \
+ rc; \
+})
+
+#endif
+
#ifdef SDPSTATS_ON
struct sdpstats {
u32 post_send[256];
u32 sendmsg_bcopy_segment;
u32 sendmsg_bzcopy_segment;
+ u32 sendmsg_zcopy_segment;
u32 sendmsg;
u32 post_send_credits;
u32 sendmsg_nagle_skip;
#define SDP_TX_POLL_TIMEOUT (HZ / 4)
#define SDP_NAGLE_TIMEOUT (HZ / 10)
+#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
+
#define SDP_RESOLVE_TIMEOUT 1000
#define SDP_ROUTE_TIMEOUT 1000
#define SDP_RETRY_COUNT 5
#define SDP_TX_SIZE 0x40
#define SDP_RX_SIZE 0x40
+#define SDP_FMR_SIZE 256
+#define SDP_FMR_POOL_SIZE 1024
+#define SDP_FMR_DIRTY_SIZE (SDP_FMR_POOL_SIZE / 4)
+
#define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
#define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
+#define SDP_MAX_SEND_SGES 32
#define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
#define SDP_NUM_WC 4
#define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
#define SDP_OP_RECV 0x800000000LL
#define SDP_OP_SEND 0x400000000LL
+#define SDP_OP_RDMA 0x200000000LL
+#define SDP_OP_NOP 0x100000000LL
/* how long (in jiffies) to block sender till tx completion*/
#define SDP_BZCOPY_POLL_TIMEOUT (HZ / 10)
#define SDP_AUTO_CONF 0xffff
#define AUTO_MOD_DELAY (HZ / 4)
-#define BZCOPY_STATE(skb) (*(struct bzcopy_state **)(skb->cb))
+struct bzcopy_state;
+struct rx_srcavail_state;
+
+struct sdp_skb_cb {
+ __u32 seq; /* Starting sequence number */
+ __u32 end_seq; /* SEQ + FIN + SYN + datalen */
+ __u8 flags; /* TCP header flags. */
+ struct bzcopy_state *bz;
+ struct rx_srcavail_state *rx_sa;
+ struct tx_srcavail_state *tx_sa;
+};
+
+#define BZCOPY_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->bz)
+#define RX_SRCAVAIL_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->rx_sa)
+#define TX_SRCAVAIL_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->tx_sa)
+
+#define SDP_SKB_CB(__skb) ((struct sdp_skb_cb *)&((__skb)->cb[0]))
+#undef TCP_SKB_CB
+
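+/*
+ * SDP keeps its per-skb state in skb->cb, much as TCP does with
+ * tcp_skb_cb; the bz/rx_sa/tx_sa pointers tie a BZCOPY or SrcAvail
+ * context to the skb that carries it and are reached through the
+ * accessor macros above.
+ */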
#ifndef MIN
#define MIN(a, b) (a < b ? a : b)
#endif
SDP_MID_HELLO = 0x0,
SDP_MID_HELLO_ACK = 0x1,
SDP_MID_DISCONN = 0x2,
+ SDP_MID_SENDSM = 0x4,
+ SDP_MID_RDMARDCOMPL = 0x6,
+ SDP_MID_SRCAVAIL_CANCEL = 0x8,
SDP_MID_CHRCVBUF = 0xB,
SDP_MID_CHRCVBUF_ACK = 0xC,
+ SDP_MID_SRCAVAIL = 0xFD,
+ SDP_MID_SINKAVAIL = 0xFE,
SDP_MID_DATA = 0xFF,
};
__u32 actrcvsz;
};
+struct sdp_rrch {
+ __u32 len;
+};
+
+struct sdp_srcah {
+ __u32 len;
+ __u32 rkey;
+ __u64 vaddr;
+};
+
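+/*
+ * BSDH extension headers used by the RDMA zero copy path (as laid out
+ * in this patch): sdp_rrch carries the byte count reported by an
+ * RdmaRdCompl, and sdp_srcah advertises a remote buffer (length, rkey,
+ * virtual address) in SrcAvail/SinkAvail so the peer can fetch the data
+ * with an RDMA read.
+ */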
struct sdp_buf {
struct sk_buff *skb;
u64 mapping[SDP_MAX_SEND_SKB_FRAGS + 1];
};
+/* Context used for synchronous zero copy bcopy (BZCOPY) */
+struct bzcopy_state {
+ unsigned char __user *u_base;
+ int u_len;
+ int left;
+ int page_cnt;
+ int cur_page;
+ int cur_offset;
+ int busy;
+ struct sdp_sock *ssk;
+ struct page **pages;
+};
+
+struct rx_srcavail_state {
+ /* Advertised buffer stuff */
+ u32 mseq;
+ u32 used;
+ u32 len;
+ u32 rkey;
+ u64 vaddr;
+
+ /* Dest buff info */
+ u32 page_cnt;
+ struct page **pages;
+ struct ib_sge *sge;
+
+ /* Utility */
+ u8 busy;
+ u8 aborted;
+};
+
+struct tx_srcavail_state {
+ u8 busy;
+
+ u32 page_cnt;
+ struct page **pages;
+ u64 *addrs;
+ struct ib_pool_fmr *fmr;
+ u32 bytes_completed;
+ u32 bytes_total;
+
+ u8 abort;
+ u32 mseq;
+};
+
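+/*
+ * Rough lifecycle: an rx_srcavail_state is allocated when a SrcAvail
+ * arrives and hangs on the received skb until recvmsg() has pulled the
+ * advertised buffer with RDMA reads; a tx_srcavail_state is attached to
+ * the outgoing SrcAvail skb (and to ssk->tx_sa) and tracks FMR mapping
+ * and completion progress until the peer answers with RdmaRdCompl or
+ * SendSM.
+ */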
#define ring_head(ring) (atomic_read(&(ring).head))
#define ring_tail(ring) (atomic_read(&(ring).tail))
#define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
-struct sdp_tx_ring {
- struct sdp_buf *buffer;
- atomic_t head;
- atomic_t tail;
- struct ib_cq *cq;
+#define rx_ring_posted(ssk) ring_posted(ssk->rx_ring)
+#define tx_ring_posted(ssk) (ring_posted(ssk->tx_ring) + \
+ (ssk->tx_ring.rdma_inflight ? ssk->tx_ring.rdma_inflight->busy : 0))
- int una_seq;
- atomic_t credits;
+struct sdp_tx_ring {
+ struct rx_srcavail_state *rdma_inflight;
+ struct sdp_buf *buffer;
+ atomic_t head;
+ atomic_t tail;
+ struct ib_cq *cq;
+
+ int una_seq;
+ atomic_t credits;
#define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits))
- struct timer_list timer;
- u16 poll_cnt;
+ struct timer_list timer;
+ struct tasklet_struct tasklet;
+ u16 poll_cnt;
};
struct sdp_rx_ring {
atomic_t tail;
struct ib_cq *cq;
- spinlock_t lock;
+ int destroyed;
+ rwlock_t destroyed_lock;
};
-static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
+static inline void rx_ring_unlock(struct sdp_rx_ring *rx_ring,
+ unsigned long *flags)
+{
+ read_unlock_irqrestore(&rx_ring->destroyed_lock, *flags);
+}
+
+static inline int rx_ring_trylock(struct sdp_rx_ring *rx_ring,
+ unsigned long *flags)
{
- return SDP_TX_SIZE - ring_posted(*tx_ring);
+ read_lock_irqsave(&rx_ring->destroyed_lock, *flags);
+ if (rx_ring->destroyed) {
+ rx_ring_unlock(rx_ring, flags);
+ return 0;
+ }
+ return 1;
+}
+
+static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring)
+{
+ unsigned long flags;
+
+ write_lock_irqsave(&rx_ring->destroyed_lock, flags);
+ rx_ring->destroyed = 1;
+ write_unlock_irqrestore(&rx_ring->destroyed_lock, flags);
}
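+/*
+ * Usage sketch (as the rx interrupt handler does): pollers running from
+ * interrupt context take the read lock and back off once the destroy
+ * path has marked the ring dead:
+ *
+ *	unsigned long flags;
+ *
+ *	if (!rx_ring_trylock(&ssk->rx_ring, &flags))
+ *		return;
+ *	... poll the rx CQ ...
+ *	rx_ring_unlock(&ssk->rx_ring, &flags);
+ */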
struct sdp_chrecvbuf {
int moder_time;
};
+struct sdp_device {
+ struct ib_pd *pd;
+ struct ib_mr *mr;
+ struct ib_fmr_pool *fmr_pool;
+};
+
+extern struct ib_client sdp_client;
+
struct sdp_sock {
/* sk has to be the first member of inet_sock */
struct inet_sock isk;
struct list_head backlog_queue;
struct sk_buff_head rx_ctl_q;
struct sock *parent;
+ struct sdp_device *sdp_dev;
+
+ int qp_active;
+ struct tx_srcavail_state *tx_sa;
+ spinlock_t tx_sa_lock;
+ int max_send_sge;
+ int srcavail_cancel;
+ struct delayed_work srcavail_cancel_work;
+ int srcavail_cancel_mseq;
struct work_struct rx_comp_work;
wait_queue_head_t wq;
/* rdma specific */
struct ib_qp *qp;
- struct ib_mr *mr;
/* SDP slow start */
int rcvbuf_scale; /* local recv buf scale for each socket */
struct sdp_moderation auto_mod;
/* BZCOPY data */
- int zcopy_thresh;
-};
-
-/* Context used for synchronous zero copy bcopy (BZCOY) */
-struct bzcopy_state {
- unsigned char __user *u_base;
- int u_len;
- int left;
- int page_cnt;
- int cur_page;
- int cur_offset;
- int busy;
- struct sdp_sock *ssk;
- struct page **pages;
+ int bzcopy_thresh;
};
extern int rcvbuf_initial_size;
void sdp_destroy_work(struct work_struct *work);
void sdp_reset_sk(struct sock *sk, int rc);
void sdp_reset(struct sock *sk);
+int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
+ struct bzcopy_state *bz);
+void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb);
/* sdp_proc.c */
int __init sdp_proc_init(void);
/* sdp_cma.c */
int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
+/* sdp_bcopy.c */
+int sdp_post_credits(struct sdp_sock *ssk);
+
/* sdp_tx.c */
int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
void sdp_tx_ring_destroy(struct sdp_sock *ssk);
void sdp_do_posts(struct sdp_sock *ssk);
void sdp_rx_comp_full(struct sdp_sock *ssk);
void sdp_remove_large_sock(struct sdp_sock *ssk);
+void sdp_handle_disconn(struct sock *sk);
+
+/* sdp_zcopy.c */
+int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+ size_t size);
+int sdp_handle_srcavail(struct sdp_sock *ssk, struct sdp_srcah *srcah);
+void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack);
+void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
+ u32 bytes_completed);
+int sdp_handle_rdma_read_cqe(struct sdp_sock *ssk);
+int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
+ int len);
+int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
+ unsigned long addr);
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied);
+int sdp_post_sendsm(struct sdp_sock *ssk);
+void srcavail_cancel_timeout(struct work_struct *work);
+
+static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk)
+{
+ return SDP_TX_SIZE - tx_ring_posted(ssk);
+}
static inline void sdp_arm_rx_cq(struct sock *sk)
{
sdp_prf(sk, NULL, "Arming RX cq");
- sdp_dbg_data(sk, "Arming RX cq\n");
+// sdp_dbg_data(sk, "Arming RX cq\n");
ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
}
{
sdp_prf(sk, NULL, "Arming TX cq");
sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
- tx_credits(sdp_sk(sk)), ring_posted(sdp_sk(sk)->tx_ring));
+ tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk)));
ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP);
}
+/* return the min of:
+ * - tx credits
+ * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS)
+ */
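+/*
+ * Worked example: with tx_credits() == 10 and 60 of the SDP_TX_SIZE (64)
+ * tx ring slots posted, min_free is MIN(10, 4) = 4; SDP_MIN_TX_CREDITS
+ * of those are held back, so the caller may post at most
+ * min_free - SDP_MIN_TX_CREDITS packets (or none if min_free is below
+ * the reserve).
+ */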
+static inline int tx_slots_free(struct sdp_sock *ssk)
+{
+ int min_free;
+
+ min_free = MIN(tx_credits(ssk),
+ SDP_TX_SIZE - tx_ring_posted(ssk));
+ if (min_free < SDP_MIN_TX_CREDITS)
+ return 0;
+
+ return min_free - SDP_MIN_TX_CREDITS;
+};
+
+/* like sk_stream_memory_free - except measures remote credits */
+static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
+ struct bzcopy_state *bz)
+{
+ return tx_slots_free(ssk) > bz->busy;
+}
+
/* utilities */
static inline char *mid2str(int mid)
{
ENUM2STR(SDP_MID_HELLO),
ENUM2STR(SDP_MID_HELLO_ACK),
ENUM2STR(SDP_MID_DISCONN),
+ ENUM2STR(SDP_MID_SENDSM),
+ ENUM2STR(SDP_MID_RDMARDCOMPL),
+ ENUM2STR(SDP_MID_SRCAVAIL_CANCEL),
ENUM2STR(SDP_MID_CHRCVBUF),
ENUM2STR(SDP_MID_CHRCVBUF_ACK),
ENUM2STR(SDP_MID_DATA),
+ ENUM2STR(SDP_MID_SRCAVAIL),
+ ENUM2STR(SDP_MID_SINKAVAIL),
};
if (mid >= ARRAY_SIZE(mid2str))
struct sdp_hh *hh;
struct sdp_hah *hah;
struct sdp_chrecvbuf *req_size;
+ struct sdp_rrch *rrch;
+ struct sdp_srcah *srcah;
int len = 0;
char buf[256];
len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x "
case SDP_MID_DATA:
len += snprintf(buf + len, 255-len, "data_len: %ld |",
ntohl(h->len) - sizeof(struct sdp_bsdh));
+ break;
+ case SDP_MID_RDMARDCOMPL:
+ rrch = (struct sdp_rrch *)(h+1);
+
+ len += snprintf(buf + len, 255-len, " | len: %d |",
+ ntohl(rrch->len));
+ break;
+ case SDP_MID_SRCAVAIL:
+ srcah = (struct sdp_srcah *)(h+1);
+
+ len += snprintf(buf + len, 255-len, " | data_len: %ld |",
+ ntohl(h->len) - sizeof(struct sdp_bsdh) -
+ sizeof(struct sdp_srcah));
+
+ len += snprintf(buf + len, 255-len,
+ " | len: %d, rkey: 0x%x, vaddr: 0x%llx |",
+ ntohl(srcah->len), ntohl(srcah->rkey),
+ be64_to_cpu(srcah->vaddr));
+ break;
default:
break;
}
{
int send_now =
BZCOPY_STATE(skb) ||
- (ssk->nonagle & TCP_NAGLE_OFF) ||
+ TX_SRCAVAIL_STATE(skb) ||
+ (ssk->nonagle & TCP_NAGLE_OFF) ||
!ssk->nagle_last_unacked ||
skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
- (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
+ (SDP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
if (send_now) {
unsigned long mseq = ring_head(ssk->tx_ring);
mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
}
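+/*
+ * Post a header-only DATA message so the BSDH can carry a fresh credit
+ * (bufs) advertisement to the peer; called e.g. from the CM path right
+ * after accepting a connection.
+ */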
+int sdp_post_credits(struct sdp_sock *ssk)
+{
+ int post_count = 0;
+
+ sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
+ "tx ring slots left: %d send_head: %p\n",
+ tx_credits(ssk), remote_credits(ssk),
+ sdp_tx_ring_slots_left(ssk),
+ ssk->isk.sk.sk_send_head);
+
+ if (likely(tx_credits(ssk) > 1) &&
+ likely(sdp_tx_ring_slots_left(ssk))) {
+ struct sk_buff *skb;
+ skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh),
+ GFP_KERNEL);
+ if (!skb)
+ return -ENOMEM;
+ sdp_post_send(ssk, skb, SDP_MID_DATA);
+ post_count++;
+ }
+
+ if (post_count)
+ sdp_xmit_poll(ssk, 0);
+ return post_count;
+}
+
void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
{
/* TODO: nonagle? */
else
gfp_page = GFP_KERNEL;
- if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) {
+ if (sdp_tx_ring_slots_left(ssk) < SDP_TX_SIZE / 2) {
int wc_processed = sdp_xmit_poll(ssk, 1);
sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed);
}
+ if (ssk->tx_sa && ssk->srcavail_cancel &&
+ tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
+ sdp_tx_ring_slots_left(ssk)) {
+ sdp_prf1(&ssk->isk.sk, NULL, "Going to send srcavail cancel");
+ skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh),
+ gfp_page);
+ /* FIXME */
+ BUG_ON(!skb);
+ TX_SRCAVAIL_STATE(skb) = ssk->tx_sa;
+ sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL_CANCEL);
+ post_count++;
+ ssk->srcavail_cancel = 0;
+ }
+
if (ssk->recv_request &&
ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
- sdp_tx_ring_slots_left(&ssk->tx_ring)) {
+ sdp_tx_ring_slots_left(ssk)) {
struct sdp_chrecvbuf *resp_size;
ssk->recv_request = 0;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
}
if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS &&
- sdp_tx_ring_slots_left(&ssk->tx_ring) &&
+ sdp_tx_ring_slots_left(ssk) &&
ssk->isk.sk.sk_send_head &&
sdp_nagle_off(ssk, ssk->isk.sk.sk_send_head)) {
SDPSTATS_COUNTER_INC(send_miss_no_credits);
}
while (tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
- sdp_tx_ring_slots_left(&ssk->tx_ring) &&
+ sdp_tx_ring_slots_left(ssk) &&
(skb = ssk->isk.sk.sk_send_head) &&
sdp_nagle_off(ssk, skb)) {
+ struct tx_srcavail_state *tx_sa;
update_send_head(&ssk->isk.sk, skb);
__skb_dequeue(&ssk->isk.sk.sk_write_queue);
- sdp_post_send(ssk, skb, SDP_MID_DATA);
+
+ tx_sa = TX_SRCAVAIL_STATE(skb);
+ if (unlikely(tx_sa)) {
+ if (likely(!tx_sa->abort))
+ sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
+ else
+ sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");
+ } else {
+ sdp_post_send(ssk, skb, SDP_MID_DATA);
+ }
post_count++;
}
if (likely(c > SDP_MIN_TX_CREDITS))
c *= 2;
- if (unlikely(c < ring_posted(ssk->rx_ring)) &&
+ if (unlikely(c < rx_ring_posted(ssk)) &&
likely(tx_credits(ssk) > 1) &&
- likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
+ likely(sdp_tx_ring_slots_left(ssk)) &&
likely((1 << ssk->isk.sk.sk_state) &
(TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
struct ib_qp_init_attr qp_init_attr = {
.event_handler = sdp_qp_event_handler,
.cap.max_send_wr = SDP_TX_SIZE,
- .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS,
+ .cap.max_send_sge = SDP_MAX_SEND_SGES /*SDP_MAX_SEND_SKB_FRAGS*/,
.cap.max_recv_wr = SDP_RX_SIZE,
.cap.max_recv_sge = SDP_MAX_RECV_SKB_FRAGS + 1,
.sq_sig_type = IB_SIGNAL_REQ_WR,
.qp_type = IB_QPT_RC,
};
struct ib_device *device = id->device;
- struct ib_mr *mr;
- struct ib_pd *pd;
int rc;
sdp_dbg(sk, "%s\n", __func__);
- pd = ib_alloc_pd(device);
- if (IS_ERR(pd)) {
- rc = PTR_ERR(pd);
- sdp_warn(sk, "Unable to allocate PD: %d.\n", rc);
- goto err_pd;
- }
-
- mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE);
- if (IS_ERR(mr)) {
- rc = PTR_ERR(mr);
- sdp_warn(sk, "Unable to get dma MR: %d.\n", rc);
- goto err_mr;
- }
-
- sdp_sk(sk)->mr = mr;
+ sdp_sk(sk)->sdp_dev = ib_get_client_data(device, &sdp_client);
rc = sdp_rx_ring_create(sdp_sk(sk), device);
if (rc)
qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq;
qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq;
- rc = rdma_create_qp(id, pd, &qp_init_attr);
+ rc = rdma_create_qp(id, sdp_sk(sk)->sdp_dev->pd, &qp_init_attr);
if (rc) {
sdp_warn(sk, "Unable to create QP: %d.\n", rc);
goto err_qp;
}
sdp_sk(sk)->qp = id->qp;
sdp_sk(sk)->ib_device = device;
+ sdp_sk(sk)->qp_active = 1;
+
+{
+ struct ib_qp_attr qp_attr;
+ struct ib_qp_init_attr qp_init_attr;
+
+ rc = ib_query_qp(sdp_sk(sk)->qp,
+ &qp_attr,
+ 0,
+ &qp_init_attr);
+
+ sdp_sk(sk)->max_send_sge = qp_attr.cap.max_send_sge;
+ sdp_dbg(sk, "max_send_sge = %d\n", sdp_sk(sk)->max_send_sge);
+}
init_waitqueue_head(&sdp_sk(sk)->wq);
err_tx:
sdp_rx_ring_destroy(sdp_sk(sk));
err_rx:
- ib_dereg_mr(sdp_sk(sk)->mr);
-err_mr:
- ib_dealloc_pd(pd);
-err_pd:
return rc;
}
if (rc)
break;
atomic_set(&sdp_sk(sk)->remote_credits,
- ring_posted(sdp_sk(sk)->rx_ring));
+ rx_ring_posted(sdp_sk(sk)));
memset(&hh, 0, sizeof hh);
hh.bsdh.mid = SDP_MID_HELLO;
+ hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
hh.max_adverts = 1;
hh.majv_minv = SDP_MAJV_MINV;
sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size);
- hh.bsdh.bufs = htons(ring_posted(sdp_sk(sk)->rx_ring));
hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
PAGE_SIZE + sizeof(struct sdp_bsdh));
hh.max_adverts = 0x1;
conn_param.retry_count = SDP_RETRY_COUNT;
SDP_DUMP_PACKET(NULL, "TX", NULL, &hh.bsdh);
rc = rdma_connect(id, &conn_param);
+// sdp_sk(sk)->qp_active = 1;
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
sdp_dbg(sk, "RDMA_CM_EVENT_ROUTE_ERROR : %p\n", id);
sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_REQUEST\n");
rc = sdp_connect_handler(sk, id, event);
if (rc) {
+ sdp_warn(sk, "Destroy qp !!!!\n");
rdma_reject(id, NULL, 0);
break;
}
child = id->context;
atomic_set(&sdp_sk(child)->remote_credits,
- ring_posted(sdp_sk(child)->rx_ring));
+ rx_ring_posted(sdp_sk(child)));
memset(&hah, 0, sizeof hah);
hah.bsdh.mid = SDP_MID_HELLO_ACK;
- hah.bsdh.bufs = htons(ring_posted(sdp_sk(child)->rx_ring));
+ hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
hah.majv_minv = SDP_MAJV_MINV;
hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
id->qp = NULL;
id->context = NULL;
parent = sdp_sk(child)->parent; /* TODO: hold ? */
+ } else {
+// sdp_sk(child)->qp_active = 1;
}
break;
case RDMA_CM_EVENT_CONNECT_RESPONSE:
sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_RESPONSE\n");
rc = sdp_response_handler(sk, id, event);
- if (rc)
+ if (rc) {
+ sdp_warn(sk, "Destroy qp !!!!\n");
rdma_reject(id, NULL, 0);
+ }
else
rc = rdma_accept(id, NULL);
+
+ if (!rc) {
+// sdp_sk(sk)->qp_active = 1;
+ rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0;
+ }
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n");
__func__);
}
+ sdp_sk(sk)->qp_active = 0;
rdma_disconnect(id);
if (sk->sk_state != TCP_TIME_WAIT) {
#include <net/protocol.h>
#include <net/inet_common.h>
#include <rdma/rdma_cm.h>
+#include <rdma/ib_fmr_pool.h>
#include <rdma/ib_verbs.h>
/* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */
#include "sdp_socket.h"
#ifdef CONFIG_INFINIBAND_SDP_DEBUG
SDP_MODPARAM_INT(sdp_debug_level, 0, "Enable debug tracing if > 0.");
#endif
-#ifdef CONFIG_INFINIBAND_SDP_DEBUG
+#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
SDP_MODPARAM_INT(sdp_data_debug_level, 0,
"Enable data path debug tracing if > 0.");
#endif
SDP_MODPARAM_SINT(recv_poll, 1000, "How many times to poll recv.");
SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
"Default idle time in seconds before keepalive probe sent.");
-SDP_MODPARAM_SINT(sdp_zcopy_thresh, 65536, "Zero copy send threshold; 0=0ff.");
-
+SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 65536,
+ "Zero copy send threshold using SEND; 0=off.");
+SDP_MODPARAM_SINT(sdp_zcopy_thresh, 128*1024,
+ "Zero copy send threshold using RDMA; 0=off.");
#define SDP_RX_COAL_TIME_HIGH 128
SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
"Target number of bytes to coalesce with interrupt moderation.");
rc = rdma_bind_addr(ssk->id, (struct sockaddr *)&addr);
if (rc) {
+ sdp_warn(sk, "Destroy qp !!!!\n");
rdma_destroy_id(ssk->id);
ssk->id = NULL;
return rc;
static void sdp_destroy_qp(struct sdp_sock *ssk)
{
- struct ib_pd *pd = NULL;
-
-
sdp_dbg(&ssk->isk.sk, "destroying qp\n");
sdp_prf(&ssk->isk.sk, NULL, "destroying qp");
- del_timer(&ssk->tx_ring.timer);
+ ssk->qp_active = 0;
+ del_timer(&ssk->tx_ring.timer);
sdp_rx_ring_destroy(ssk);
sdp_tx_ring_destroy(ssk);
if (ssk->qp) {
- pd = ssk->qp->pd;
ib_destroy_qp(ssk->qp);
ssk->qp = NULL;
}
- if (ssk->mr) {
- ib_dereg_mr(ssk->mr);
- ssk->mr = NULL;
- }
-
- if (pd)
- ib_dealloc_pd(pd);
-
sdp_remove_large_sock(ssk);
}
sdp_dbg(sk, "%s\n", __func__);
if (ssk->destructed_already) {
- sdp_warn(sk, "redestructing sk!");
+ sdp_warn(sk, "redestructing sk!\n");
return;
}
+ cancel_delayed_work(&ssk->srcavail_cancel_work);
+
ssk->destructed_already = 1;
sdp_remove_sock(ssk);
sdp_dbg(sk, "%s\n", __func__);
if (old_state != TCP_LISTEN) {
- if (ssk->id)
+ if (ssk->id) {
+ sdp_sk(sk)->qp_active = 0;
rc = rdma_disconnect(ssk->id);
+ }
return rc;
}
sdp_cancel_dreq_wait_timeout(ssk);
+ cancel_delayed_work(&ssk->srcavail_cancel_work);
+
if (sk->sk_state == TCP_TIME_WAIT)
sock_put(sk, SOCK_REF_CM_TW);
release_sock(sk);
- if (sdp_sk(sk)->id)
+ if (sdp_sk(sk)->id) {
+ sdp_warn(sk, "Destroyed QP!!!!\n");
+ sdp_sk(sk)->qp_active = 0;
rdma_disconnect(sdp_sk(sk)->id);
- else
+ } else
sock_put(sk, SOCK_REF_CM_TW);
out:
sock_put(sk, SOCK_REF_DREQ_TO);
}
+/*
+ * Only SDP touches this receive queue, so give its lock (and the socket
+ * callback lock) a private lockdep class to avoid false warnings when
+ * they are taken with spin_lock_irqsave().
+ */
+static struct lock_class_key ib_sdp_sk_receive_queue_lock_key;
+
+static struct lock_class_key ib_sdp_sk_callback_lock_key;
+
int sdp_init_sock(struct sock *sk)
{
struct sdp_sock *ssk = sdp_sk(sk);
INIT_LIST_HEAD(&ssk->accept_queue);
INIT_LIST_HEAD(&ssk->backlog_queue);
INIT_DELAYED_WORK(&ssk->dreq_wait_work, sdp_dreq_wait_timeout_work);
+ INIT_DELAYED_WORK(&ssk->srcavail_cancel_work, srcavail_cancel_timeout);
INIT_WORK(&ssk->destroy_work, sdp_destroy_work);
+ lockdep_set_class(&sk->sk_receive_queue.lock,
+ &ib_sdp_sk_receive_queue_lock_key);
+
+ lockdep_set_class(&sk->sk_callback_lock,
+ &ib_sdp_sk_callback_lock_key);
+
sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
skb_queue_head_init(&ssk->rx_ctl_q);
ssk->sdp_disconnect = 0;
ssk->destructed_already = 0;
ssk->destruct_in_process = 0;
+ ssk->srcavail_cancel = 0;
spin_lock_init(&ssk->lock);
+ spin_lock_init(&ssk->tx_sa_lock);
atomic_set(&ssk->somebody_is_doing_posts, 0);
+ ssk->tx_ring.rdma_inflight = NULL;
+
return 0;
}
static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb)
{
- TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
+ SDP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
ssk->pushed_seq = ssk->write_seq;
sdp_do_posts(ssk);
}
if (val < SDP_MIN_ZCOPY_THRESH || val > SDP_MAX_ZCOPY_THRESH)
err = -EINVAL;
else
- ssk->zcopy_thresh = val;
+ ssk->bzcopy_thresh = val;
break;
default:
err = -ENOPROTOOPT;
val = (ssk->keepalive_time ? : sdp_keepalive_time) / HZ;
break;
case SDP_ZCOPY_THRESH:
- val = ssk->zcopy_thresh ? ssk->zcopy_thresh : sdp_zcopy_thresh;
+ val = ssk->bzcopy_thresh ? ssk->bzcopy_thresh : sdp_bzcopy_thresh;
break;
default:
return -ENOPROTOOPT;
{
if (unlikely(flags & MSG_OOB)) {
struct sk_buff *skb = sk->sk_write_queue.prev;
- TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_URG;
+ SDP_SKB_CB(skb)->flags |= TCPCB_FLAG_URG;
}
}
sdp_do_posts(sdp_sk(sk));
}
-static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
- struct sk_buff *skb)
+void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb)
{
skb_header_release(skb);
__skb_queue_tail(&sk->sk_write_queue, skb);
}
if (bz->pages) {
- for (i = 0; i < bz->cur_page; i++)
+ for (i = 0; i < bz->cur_page; i++) {
put_page(bz->pages[i]);
+ }
kfree(bz->pages);
}
{
struct bzcopy_state *bz;
unsigned long addr;
- int done_pages = 0;
int thresh;
mm_segment_t cur_fs;
+ int rc = 0;
- thresh = ssk->zcopy_thresh ? : sdp_zcopy_thresh;
+ thresh = ssk->bzcopy_thresh ? : sdp_bzcopy_thresh;
if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK)) {
SDPSTATS_COUNTER_INC(sendmsg_bcopy_segment);
return NULL;
return ERR_PTR(-ENOMEM);
}
+ rc = sdp_get_pages(&ssk->isk.sk, bz->pages, bz->page_cnt,
+ (unsigned long)base);
- addr &= PAGE_MASK;
- if (segment_eq(cur_fs, KERNEL_DS)) {
- for (done_pages = 0; done_pages < bz->page_cnt; done_pages++) {
- bz->pages[done_pages] = virt_to_page(addr);
- if (!bz->pages[done_pages])
- break;
- get_page(bz->pages[done_pages]);
- addr += PAGE_SIZE;
- }
- } else {
- if (current->mm) {
- down_write(&current->mm->mmap_sem);
- done_pages = get_user_pages(current, current->mm, addr,
- bz->page_cnt, 0, 0, bz->pages, NULL);
- up_write(&current->mm->mmap_sem);
- }
- }
- if (unlikely(done_pages != bz->page_cnt)){
- int i;
- if (done_pages > 0) {
- for (i = 0; i < done_pages; i++)
- put_page(bz->pages[i]);
- }
- kfree(bz->pages);
- kfree(bz);
- bz = ERR_PTR(-EFAULT);
- }
+ if (unlikely(rc))
+ goto err;
return bz;
+
+err:
+ kfree(bz->pages);
+ kfree(bz);
+ return ERR_PTR(-EFAULT);
}
#define TCP_PAGE(sk) (sk->sk_sndmsg_page)
return copy;
}
-/* return the min of:
- * - tx credits
- * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS
- */
-static inline int tx_slots_free(struct sdp_sock *ssk)
-{
- int min_free;
-
- min_free = MIN(tx_credits(ssk),
- SDP_TX_SIZE - ring_posted(ssk->tx_ring));
- if (min_free < SDP_MIN_TX_CREDITS)
- return 0;
-
- return min_free - SDP_MIN_TX_CREDITS;
-};
-
-/* like sk_stream_memory_free - except measures remote credits */
-static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
- struct bzcopy_state *bz)
-{
- return tx_slots_free(ssk) > bz->busy;
-}
-
/* like sk_stream_wait_memory - except waits on remote credits */
-static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
+int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
struct bzcopy_state *bz)
{
struct sock *sk = &ssk->isk.sk;
long timeo;
struct bzcopy_state *bz = NULL;
SDPSTATS_COUNTER_INC(sendmsg);
+
+ if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
+ err = sdp_sendmsg_zcopy(iocb, sk, msg, size);
+ if (err != -EAGAIN)
+ return err;
+
+ /* Got SendSM or timed out - fall back to regular send */
+ }
+
lock_sock(sk);
sdp_dbg_data(sk, "%s\n", __func__);
/* OOB data byte should be the last byte of
the data payload */
- if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG) &&
+ if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG) &&
!(flags & MSG_OOB)) {
sdp_mark_push(ssk, skb);
goto new_segment;
}
if (!copied)
- TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
+ SDP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
ssk->write_seq += copy;
- TCP_SKB_CB(skb)->end_seq += copy;
+ SDP_SKB_CB(skb)->end_seq += copy;
/*unused: skb_shinfo(skb)->gso_segs = 0;*/
from += copy;
bz = sdp_bz_cleanup(bz);
}
- posts_handler_put(ssk);
-
sdp_auto_moderation(ssk);
+ posts_handler_put(ssk);
+
release_sock(sk);
return copied;
posts_handler_put(ssk);
release_sock(sk);
+
return err;
}
u32 *seq;
int copied = 0;
int rc;
+ int avail_bytes_count = 0; /* bytes available, either inlined in the
+ * skb or advertised for RDMA */
lock_sock(sk);
- sdp_dbg_data(sk, "%s\n", __func__);
+ sdp_dbg_data(sk, "iovlen: %ld iov_len: %ld flags: 0x%x peek: 0x%x\n",
+ msg->msg_iovlen, msg->msg_iov[0].iov_len, flags,
+ flags & MSG_PEEK);
posts_handler_get(ssk);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
do {
+ struct rx_srcavail_state *rx_sa = NULL;
u32 offset;
/* Are we at urgent data? Stop if we have read anything or have
skb = skb_peek(&sk->sk_receive_queue);
do {
+ struct sdp_bsdh *h;
if (!skb)
break;
- if ((skb_transport_header(skb))[0] == SDP_MID_DISCONN)
+ avail_bytes_count = 0;
+
+ h = (struct sdp_bsdh *)skb_transport_header(skb);
+
+ switch (h->mid) {
+ case SDP_MID_DISCONN:
+ sdp_dbg(sk, "Handle RX SDP_MID_DISCONN\n");
+ sdp_prf(sk, NULL, "Handle RX SDP_MID_DISCONN");
+ sdp_handle_disconn(sk);
goto found_fin_ok;
- if (before(*seq, TCP_SKB_CB(skb)->seq)) {
+ case SDP_MID_SRCAVAIL:
+ rx_sa = RX_SRCAVAIL_STATE(skb);
+ if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
+ rx_sa->aborted = 1;
+ sdp_warn(sk, "Ignoring src avail - "
+ "due to SrcAvailCancel\n");
+ goto skb_cleanup;
+ }
+ avail_bytes_count = rx_sa->len;
+ break;
+
+ case SDP_MID_DATA:
+ rx_sa = NULL;
+ avail_bytes_count = skb->len;
+ break;
+ default:
+ break;
+ }
+
+ if (before(*seq, SDP_SKB_CB(skb)->seq)) {
sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
- *seq, TCP_SKB_CB(skb)->seq);
+ *seq, SDP_SKB_CB(skb)->seq);
sdp_reset(sk);
break;
}
- offset = *seq - TCP_SKB_CB(skb)->seq;
- if (offset < skb->len)
+ offset = *seq - SDP_SKB_CB(skb)->seq;
+// sdp_warn(sk, "offset = 0x%x, avail_byte_count = 0x%x\n",
+// offset, avail_bytes_count);
+ if (offset < avail_bytes_count)
goto found_ok_skb;
WARN_ON(!(flags & MSG_PEEK));
continue;
found_ok_skb:
- sdp_dbg_data(sk, "found_ok_skb len %d\n", skb->len);
- sdp_dbg_data(sk, "len %Zd offset %d\n", len, offset);
+ sdp_dbg_data(sk, "bytes avail: %d\n", avail_bytes_count);
+ sdp_dbg_data(sk, "buf len %Zd offset %d\n", len, offset);
sdp_dbg_data(sk, "copied %d target %d\n", copied, target);
- used = skb->len - offset;
+ used = avail_bytes_count - offset;
if (len < used)
used = len;
}
}
if (!(flags & MSG_TRUNC)) {
- err = skb_copy_datagram_iovec(skb, offset,
+ if (rx_sa) {
+ sdp_dbg_data(sk, "Got srcavail - using RDMA\n");
+ err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb,
+ used);
+ if (err == -EAGAIN) {
+ sdp_warn(sk, "RDMA Read aborted\n");
+ used = 0;
+ goto skb_cleanup;
+ }
+ } else {
+ err = skb_copy_datagram_iovec(skb, offset,
/* TODO: skip header? */
msg->msg_iov, used);
+ }
if (err) {
sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
"offset %d size %ld status %d\n",
skip_copy:
if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
ssk->urg_data = 0;
- if (used + offset < skb->len)
+
+
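+ /*
+ * For a SrcAvail we either acknowledge the bytes pulled so far with
+ * an RdmaRdCompl, or, if the transfer was aborted, send a SendSM so
+ * the peer falls back to a normal send.
+ */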
+ if (rx_sa) {
+ if (ssk->srcavail_cancel_mseq < rx_sa->mseq) {
+ rc = sdp_post_rdma_rd_compl(ssk, rx_sa->used);
+ BUG_ON(rc);
+ }
+ if (rx_sa->aborted) {
+ sdp_warn(sk, "RDMA aborted. Sending SendSM\n");
+ rc = sdp_post_sendsm(ssk);
+ BUG_ON(rc);
+ }
+ }
+
+ if ((!rx_sa && used + offset < skb->len) ||
+ (rx_sa && !rx_sa->aborted && rx_sa->used < rx_sa->len))
continue;
offset = 0;
- if (!(flags & MSG_PEEK)) {
+skb_cleanup:
+ if (!(flags & MSG_PEEK) || (rx_sa && rx_sa->aborted)) {
struct sdp_bsdh *h;
h = (struct sdp_bsdh *)skb_transport_header(skb);
sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
ntohl(h->mseq), ntohl(h->mseq_ack));
skb_unlink(skb, &sk->sk_receive_queue);
__kfree_skb(skb);
+
+ kfree(rx_sa);
+ rx_sa = NULL;
}
continue;
found_fin_ok:
u8 tmp;
u32 ptr = skb->len - 1;
- ssk->urg_seq = TCP_SKB_CB(skb)->seq + ptr;
+ ssk->urg_seq = SDP_SKB_CB(skb)->seq + ptr;
if (skb_copy_bits(skb, ptr, &tmp, 1))
BUG();
static void sdp_add_device(struct ib_device *device)
{
+ struct sdp_device *sdp_dev;
+ struct ib_fmr_pool_param fmr_param;
+
+ sdp_dev = kmalloc(sizeof *sdp_dev, GFP_KERNEL);
+ if (!sdp_dev)
+ return;
+
+ sdp_dev->pd = ib_alloc_pd(device);
+ if (IS_ERR(sdp_dev->pd)) {
+ printk(KERN_WARNING "Unable to allocate PD: %ld.\n",
+ PTR_ERR(sdp_dev->pd));
+ goto err_pd_alloc;
+ }
+
+ sdp_dev->mr = ib_get_dma_mr(sdp_dev->pd, IB_ACCESS_LOCAL_WRITE);
+ if (IS_ERR(sdp_dev->mr)) {
+ printk(KERN_WARNING "Unable to get dma MR: %ld.\n",
+ PTR_ERR(sdp_dev->mr));
+ goto err_mr;
+ }
+
+ memset(&fmr_param, 0, sizeof fmr_param);
+ fmr_param.pool_size = SDP_FMR_POOL_SIZE;
+ fmr_param.dirty_watermark = SDP_FMR_DIRTY_SIZE;
+ fmr_param.cache = 0;
+ fmr_param.max_pages_per_fmr = SDP_FMR_SIZE;
+ fmr_param.page_shift = PAGE_SHIFT;
+ fmr_param.access = (IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_READ);
+
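+ /*
+ * Sizing note, assuming 4 KB pages: each FMR maps up to SDP_FMR_SIZE
+ * (256) pages, i.e. 1 MB per SrcAvail mapping, out of a pool of
+ * SDP_FMR_POOL_SIZE (1024) FMRs that is flushed once
+ * SDP_FMR_DIRTY_SIZE (256) of them are dirty.
+ */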
+ sdp_dev->fmr_pool = ib_create_fmr_pool(sdp_dev->pd, &fmr_param);
+ if (IS_ERR(sdp_dev->fmr_pool)) {
+ printk(KERN_WARNING "Error creating fmr pool\n");
+ sdp_dev->fmr_pool = NULL;
+ goto err_fmr_create;
+ }
+
+ ib_set_client_data(device, &sdp_client, sdp_dev);
+
+ return;
+
+err_fmr_create:
+ ib_dereg_mr(sdp_dev->mr);
+err_mr:
+ ib_dealloc_pd(sdp_dev->pd);
+err_pd_alloc:
+ kfree(sdp_dev);
}
static void sdp_remove_device(struct ib_device *device)
struct sdp_sock *ssk;
struct sock *sk;
struct rdma_cm_id *id;
+ struct sdp_device *sdp_dev;
do_next:
write_lock(&device_removal_lock);
sk = &ssk->isk.sk;
sdp_cancel_dreq_wait_timeout(ssk);
+ cancel_delayed_work(&ssk->srcavail_cancel_work);
spin_unlock_irq(&sock_list_lock);
spin_unlock_irq(&sock_list_lock);
write_unlock(&device_removal_lock);
+
+ sdp_dev = ib_get_client_data(device, &sdp_client);
+
+ ib_flush_fmr_pool(sdp_dev->fmr_pool);
+ ib_destroy_fmr_pool(sdp_dev->fmr_pool);
+
+ ib_dereg_mr(sdp_dev->mr);
+
+ ib_dealloc_pd(sdp_dev->pd);
+
+ kfree(sdp_dev);
}
static struct net_proto_family sdp_net_proto = {
.owner = THIS_MODULE,
};
-static struct ib_client sdp_client = {
+struct ib_client sdp_client = {
.name = "sdp",
.add = sdp_add_device,
.remove = sdp_remove_device
if (!orphan_count)
goto no_mem_orphan_count;
+ INIT_LIST_HEAD(&sockets_allocated->list);
+ INIT_LIST_HEAD(&orphan_count->list);
+
percpu_counter_init(sockets_allocated, 0);
percpu_counter_init(orphan_count, 0);
sdpstats.sendmsg_bcopy_segment);
seq_printf(seq, "bzcopy segments \t\t: %d\n",
sdpstats.sendmsg_bzcopy_segment);
+ seq_printf(seq, "zcopy segments \t\t: %d\n",
+ sdpstats.sendmsg_zcopy_segment);
seq_printf(seq, "post_send_credits \t\t: %d\n",
sdpstats.post_send_credits);
seq_printf(seq, "memcpy_count \t\t: %u\n",
}
/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
-static void sdp_fin(struct sock *sk)
+void sdp_handle_disconn(struct sock *sk)
{
sdp_dbg(sk, "%s\n", __func__);
sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
if (sdp_sk(sk)->id) {
+ sdp_sk(sk)->qp_active = 0;
rdma_disconnect(sdp_sk(sk)->id);
} else {
sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
/* TODO: proper error handling */
sge->addr = (u64)addr;
sge->length = SDP_HEAD_SIZE;
- sge->lkey = ssk->mr->lkey;
+ sge->lkey = ssk->sdp_dev->mr->lkey;
frags = skb_shinfo(skb)->nr_frags;
for (i = 0; i < frags; ++i) {
++sge;
rx_req->mapping[i + 1] = addr;
sge->addr = addr;
sge->length = skb_shinfo(skb)->frags[i].size;
- sge->lkey = ssk->mr->lkey;
+ sge->lkey = ssk->sdp_dev->mr->lkey;
}
rx_wr.next = NULL;
int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
unsigned long max_bytes;
+ if (!ssk->qp_active) {
+// sdp_warn(sk, "post rx while qp is not active\n");
+ return 0;
+ }
+
if (top_mem_usage && (top_mem_usage * 0x100000) <
atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) {
scale = 1;
max_bytes = sk->sk_rcvbuf * scale;
- if (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE))
+ if (unlikely(rx_ring_posted(ssk) >= SDP_RX_SIZE))
return 0;
- if (likely(ring_posted(ssk->rx_ring) > SDP_MIN_TX_CREDITS)) {
+ if (likely(rx_ring_posted(ssk) > SDP_MIN_TX_CREDITS)) {
unsigned long bytes_in_process =
- (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
+ (rx_ring_posted(ssk) - SDP_MIN_TX_CREDITS) *
buffer_size;
bytes_in_process += rcv_nxt(ssk) - ssk->copied_seq;
{
int skb_len;
struct sdp_sock *ssk = sdp_sk(sk);
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
/* not needed since sk_rmem_alloc is not currently used
* TODO - remove this?
skb_set_owner_r(skb, sk); */
- skb_len = skb->len;
+ SDP_SKB_CB(skb)->seq = rcv_nxt(ssk);
+ if (h->mid == SDP_MID_SRCAVAIL) {
+ struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
+ struct rx_srcavail_state *sa;
+
+ ssk->srcavail_cancel_mseq = 0;
+
+ sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
+ sizeof(struct rx_srcavail_state), GFP_ATOMIC);
+
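+ /* FIXME: kzalloc() may fail under GFP_ATOMIC; sa is used below
+ * without a NULL check. */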
+ sa->mseq = ntohl(h->mseq);
+ sa->aborted = 0;
+ sa->used = 0;
+ sa->len = skb_len = ntohl(srcah->len);
+ sa->rkey = ntohl(srcah->rkey);
+ sa->vaddr = be64_to_cpu(srcah->vaddr);
+
+ sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n",
+ skb_len, sa->vaddr);
+ } else {
+ skb_len = skb->len;
- TCP_SKB_CB(skb)->seq = rcv_nxt(ssk);
- atomic_add(skb_len, &ssk->rcv_nxt);
+ atomic_add(skb_len, &ssk->rcv_nxt);
+ }
skb_queue_tail(&sk->sk_receive_queue, skb);
if (likely(c > SDP_MIN_TX_CREDITS))
c += c/2;
- return unlikely(c < ring_posted(ssk->rx_ring)) &&
+ return unlikely(c < rx_ring_posted(ssk)) &&
likely(tx_credits(ssk) > 1) &&
- likely(sdp_tx_ring_slots_left(&ssk->tx_ring));
+ likely(sdp_tx_ring_slots_left(ssk));
}
switch (h->mid) {
case SDP_MID_DATA:
+ case SDP_MID_SRCAVAIL:
WARN_ON(!(sk->sk_shutdown & RCV_SHUTDOWN));
sdp_warn(sk, "DATA after socket rcv was shutdown\n");
}
__kfree_skb(skb);
+ break;
+ case SDP_MID_RDMARDCOMPL:
+ {
+ struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
+ sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
+ sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
+ ntohl(rrch->len));
+ __kfree_skb(skb);
+ } break;
+ case SDP_MID_SENDSM:
+ sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
+ __kfree_skb(skb);
+ break;
+ case SDP_MID_SRCAVAIL_CANCEL:
+ sdp_dbg_data(sk, "Handling SrcAvailCancel - sending SendSM\n");
+ sdp_prf(sk, NULL, "Handling SrcAvailCancel");
+ ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+ sdp_post_sendsm(ssk);
break;
case SDP_MID_DISCONN:
- sdp_dbg_data(sk, "Handling RX disconnect\n");
- sdp_prf(sk, NULL, "Handling RX disconnect");
- sdp_fin(sk);
- sdp_prf(sk, NULL, "Queueing fin skb - release recvmsg");
- /* Enqueue fin skb to release sleeping recvmsg */
- sdp_sock_queue_rcv_skb(sk, skb);
break;
case SDP_MID_CHRCVBUF:
sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
skb_pull(skb, sizeof(struct sdp_bsdh));
- if (h->mid != SDP_MID_DATA ||
+ if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL &&
+ h->mid != SDP_MID_DISCONN) ||
unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
sdp_prf(sk, NULL, "Control skb - queing to control queue");
+ if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
+ sdp_warn(sk, "Got SrcAvailCancel. "
+ "seq: 0x%d seq_ack: 0x%d\n",
+ ntohl(h->mseq), ntohl(h->mseq_ack));
+ ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+ }
skb_queue_tail(&ssk->rx_ctl_q, skb);
+
return 0;
}
- if (unlikely(skb->len <= 0)) {
+ if (unlikely(h->mid == SDP_MID_DATA && skb->len <= 0)) {
__kfree_skb(skb);
return 0;
}
- sdp_prf(sk, NULL, "queueing a %s skb",
- (h->mid == SDP_MID_DATA ? "data" : "disconnect"));
+ sdp_prf(sk, NULL, "queueing %s skb", mid2str(h->mid));
skb = sdp_sock_queue_rcv_skb(sk, skb);
/* if (unlikely(h->flags & SDP_OOB_PRES))
atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
if (unlikely(wc->status)) {
- if (wc->status != IB_WC_WR_FLUSH_ERR) {
- sdp_warn(sk, "Recv completion with error. Status %d\n",
- wc->status);
+ if (ssk->qp_active) {
+ sdp_warn(sk, "Recv completion with error. "
+ "Status %d, vendor: %d\n",
+ wc->status, wc->vendor_err);
sdp_reset(sk);
+ ssk->qp_active = 0;
}
__kfree_skb(skb);
return NULL;
do {
n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
+// sdp_warn(&ssk->isk.sk, "polling: %d\n", n);
for (i = 0; i < n; ++i) {
struct ib_wc *wc = &ibwc[i];
+/* sdp_warn(&ssk->isk.sk, "wr_id=0x%lx len %d opcode: 0x%x status: 0x%x\n",
+ wc->wr_id, wc->byte_len,
+ wc->opcode, wc->status);*/
BUG_ON(!(wc->wr_id & SDP_OP_RECV));
skb = sdp_process_rx_wc(ssk, wc);
sdp_post_recvs(ssk);
- if (ring_posted(ssk->tx_ring))
+ if (tx_ring_posted(ssk))
sdp_xmit_poll(ssk, 1);
sdp_post_sends(ssk, 0);
int wc_processed = 0;
int credits_before;
- sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
+// sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
+// sdp_warn(&ssk->isk.sk, "rx irq called\n");
if (cq != ssk->rx_ring.cq) {
sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
sdp_prf(sk, NULL, "rx irq");
- rx_ring_lock(ssk, flags);
+ if (!rx_ring_trylock(&ssk->rx_ring, &flags)) {
+ sdp_warn(&ssk->isk.sk, "ring destroyed. not polling it\n");
+ return;
+ }
credits_before = tx_credits(ssk);
- if (!ssk->rx_ring.cq) {
- sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
-
- goto out;
- }
-
wc_processed = sdp_poll_rx_cq(ssk);
sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
}
sdp_arm_rx_cq(sk);
-out:
- rx_ring_unlock(ssk, flags);
+ rx_ring_unlock(&ssk->rx_ring, &flags);
}
static void sdp_rx_ring_purge(struct sdp_sock *ssk)
{
- while (ring_posted(ssk->rx_ring) > 0) {
+ while (rx_ring_posted(ssk) > 0) {
struct sk_buff *skb;
skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
if (!skb)
void sdp_rx_ring_init(struct sdp_sock *ssk)
{
ssk->rx_ring.buffer = NULL;
- spin_lock_init(&ssk->rx_ring.lock);
+ ssk->rx_ring.destroyed = 0;
+ rwlock_init(&ssk->rx_ring.destroyed_lock);
}
static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
void sdp_rx_ring_destroy(struct sdp_sock *ssk)
{
+ rx_ring_destroy_lock(&ssk->rx_ring);
+
if (ssk->rx_ring.buffer) {
sdp_rx_ring_purge(ssk);
{
int wc_processed = 0;
- sdp_prf(&ssk->isk.sk, NULL, "called from %s:%d", func, line);
+ sdp_prf(&ssk->isk.sk, NULL, "%s", __func__);
/* If we don't have a pending timer, set one up to catch our recent
post in case the interface becomes idle */
ssk->tx_bytes += skb->len;
h->mid = mid;
- if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+ if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
else
h->flags = 0;
- h->bufs = htons(ring_posted(ssk->rx_ring));
+ h->bufs = htons(rx_ring_posted(ssk));
h->len = htonl(skb->len);
h->mseq = htonl(mseq);
+ if (TX_SRCAVAIL_STATE(skb))
+ TX_SRCAVAIL_STATE(skb)->mseq = mseq;
h->mseq_ack = htonl(mseq_ack(ssk));
sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
- mid2str(mid), ring_posted(ssk->rx_ring), mseq,
+ mid2str(mid), rx_ring_posted(ssk), mseq,
ntohl(h->mseq_ack));
SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
sge->addr = addr;
sge->length = skb->len - skb->data_len;
- sge->lkey = ssk->mr->lkey;
+ sge->lkey = ssk->sdp_dev->mr->lkey;
frags = skb_shinfo(skb)->nr_frags;
for (i = 0; i < frags; ++i) {
++sge;
tx_req->mapping[i + 1] = addr;
sge->addr = addr;
sge->length = skb_shinfo(skb)->frags[i].size;
- sge->lkey = ssk->mr->lkey;
+ sge->lkey = ssk->sdp_dev->mr->lkey;
}
tx_wr.next = NULL;
tx_wr.num_sge = frags + 1;
tx_wr.opcode = IB_WR_SEND;
tx_wr.send_flags = IB_SEND_SIGNALED;
- if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+ if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
tx_wr.send_flags |= IB_SEND_SOLICITED;
delta = jiffies - last_send;
rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
atomic_inc(&ssk->tx_ring.head);
atomic_dec(&ssk->tx_ring.credits);
- atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring));
+ atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));
if (unlikely(rc)) {
sdp_dbg(&ssk->isk.sk,
"ib_post_send failed with status %d.\n", rc);
struct ib_device *dev;
struct sdp_buf *tx_req;
struct sk_buff *skb = NULL;
- struct bzcopy_state *bz;
int i, frags;
struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
if (unlikely(mseq != ring_tail(*tx_ring))) {
DMA_TO_DEVICE);
}
- tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq;
+ tx_ring->una_seq += SDP_SKB_CB(skb)->end_seq;
/* TODO: AIO and real zcopy code; add their context support here */
- bz = BZCOPY_STATE(skb);
- if (bz)
- bz->busy--;
+ if (BZCOPY_STATE(skb))
+ BZCOPY_STATE(skb)->busy--;
atomic_inc(&tx_ring->tail);
static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
{
struct sk_buff *skb = NULL;
+ struct sdp_bsdh *h;
skb = sdp_send_completion(ssk, wc->wr_id);
if (unlikely(!skb))
}
}
-#ifdef SDP_PROFILING
-{
- struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+ h = (struct sdp_bsdh *)skb->data;
+
sdp_prf1(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
-}
-#endif
+
sk_wmem_free_skb(&ssk->isk.sk, skb);
return 0;
static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
+ struct sock *sk = &ssk->isk.sk;
+
if (likely(wc->wr_id & SDP_OP_SEND)) {
sdp_handle_send_comp(ssk, wc);
return;
}
+ if (wc->wr_id & SDP_OP_RDMA) {
+ sdp_dbg_data(sk, "TX comp: RDMA read. status: %d\n", wc->status);
+ sdp_prf1(sk, NULL, "TX comp: RDMA read");
+
+ if (!ssk->tx_ring.rdma_inflight) {
+ sdp_warn(sk, "ERROR: unexpected RDMA read\n");
+ return;
+ }
+
+ if (!ssk->tx_ring.rdma_inflight->busy) {
+ sdp_warn(sk, "ERROR: too many RDMA read completions\n");
+ return;
+ }
+
+ /* Only the last RDMA read WR is signalled. Ordering is guaranteed,
+ * so once the last RDMA read WR completes, all earlier ones have
+ * completed too. */
+ ssk->tx_ring.rdma_inflight->busy = 0;
+ if (!ssk->tx_ring.rdma_inflight->busy) {
+ wake_up(ssk->isk.sk.sk_sleep);
+ sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+ }
+ return;
+ }
+
/* Keepalive probe sent cleanup */
sdp_cnt(sdp_keepalive_probes_sent);
}
} while (n == SDP_NUM_WC);
- sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
+// sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
if (wc_processed) {
struct sock *sk = &ssk->isk.sk;
struct sock *sk = &ssk->isk.sk;
u32 inflight, wc_processed;
- sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
- (u32) ring_posted(ssk->tx_ring));
+// sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+// (u32) tx_ring_posted(ssk));
- sdp_prf(&ssk->isk.sk, NULL, "%s. inflight=%d", __func__,
- (u32) ring_posted(ssk->tx_ring));
+ sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d",
+ (u32) tx_ring_posted(ssk));
/* Only process if the socket is not in use */
bh_lock_sock(sk);
if (sock_owned_by_user(sk)) {
mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
- sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+// sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+ sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy\n");
SDPSTATS_COUNTER_INC(tx_poll_busy);
goto out;
}
else
SDPSTATS_COUNTER_INC(tx_poll_hit);
- inflight = (u32) ring_posted(ssk->tx_ring);
+ inflight = (u32) tx_ring_posted(ssk);
/* If there are still packets in flight and the timer has not already
* been scheduled by the Tx routine then schedule it here to guarantee
* completion processing of these packets */
if (inflight) { /* TODO: make sure socket is not closed */
- sdp_dbg_data(sk, "arming timer for more polling\n");
+// sdp_dbg_data(sk, "arming timer for more polling\n");
mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
}
out:
+ if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
+ sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
+ sdp_arm_tx_cq(sk);
+ }
+
bh_unlock_sock(sk);
}
struct sock *sk = cq_context;
struct sdp_sock *ssk = sdp_sk(sk);
- sdp_prf1(sk, NULL, "Got tx comp interrupt");
+ sdp_prf1(sk, NULL, "tx irq");
+ sdp_dbg_data(sk, "Got tx comp interrupt\n");
- mod_timer(&ssk->tx_ring.timer, jiffies);
+ SDPSTATS_COUNTER_INC(tx_int_count);
+
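+ /*
+ * The tasklet does the actual TX CQ polling in softirq context; the
+ * timer is only re-armed here as a delayed backup poll.
+ */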
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+ tasklet_schedule(&ssk->tx_ring.tasklet);
}
void sdp_tx_ring_purge(struct sdp_sock *ssk)
{
- while (ring_posted(ssk->tx_ring)) {
+ while (tx_ring_posted(ssk)) {
struct sk_buff *skb;
skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
if (!skb)
static void sdp_tx_cq_event_handler(struct ib_event *event, void *data)
{
+ printk("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n");
+ printk("xx event called !!!!!!!!!! xxxxxx\n");
+ printk("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n");
+ printk("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n");
}
int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
ssk->tx_ring.timer.data = (unsigned long) ssk;
ssk->tx_ring.poll_cnt = 0;
+ tasklet_init(&ssk->tx_ring.tasklet, sdp_poll_tx_timeout,
+ (unsigned long) ssk);
+
init_timer(&ssk->nagle_timer);
ssk->nagle_timer.function = sdp_nagle_timeout;
ssk->nagle_timer.data = (unsigned long) ssk;
--- /dev/null
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#include <linux/tcp.h>
+#include <asm/ioctls.h>
+#include <linux/workqueue.h>
+#include <linux/net.h>
+#include <linux/socket.h>
+#include <net/protocol.h>
+#include <net/inet_common.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/ib_fmr_pool.h>
+#include <linux/dmaengine.h>
+#include <linux/pagemap.h>
+#include <net/tcp.h> /* for memcpy_toiovec */
+#include <asm/io.h>
+#include <asm/uaccess.h>
+#include <linux/delay.h>
+#include "sdp.h"
+
+static struct bzcopy_state dummy_bz = {
+ .busy = 1,
+};
+
+static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
+ int off, size_t len)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct sdp_srcah *srcah;
+ struct sk_buff *skb;
+
+ WARN_ON(ssk->tx_sa);
+
+ BUG_ON(!tx_sa);
+ BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
+
+ skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh) +
+ sizeof(struct sdp_srcah),
+ GFP_KERNEL);
+ if (!skb) {
+ return -ENOMEM;
+ }
+ sdp_prf1(sk, skb, "sending SrcAvail");
+
+ TX_SRCAVAIL_STATE(skb) = tx_sa; /* tx_sa is attached to the skb but
+ * must stay alive after the skb is freed */
+ ssk->tx_sa = tx_sa;
+
+ srcah = (struct sdp_srcah *)skb_push(skb, sizeof(*srcah));
+ srcah->len = htonl(len);
+ srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
+ srcah->vaddr = cpu_to_be64(off);
+
+ skb_entail(sk, ssk, skb);
+
+ /* TODO: pushing the skb into the tx_queue should be enough */
+
+ return 0;
+}
+
+static int sdp_post_srcavail_cancel(struct sock *sk)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+
+ if (!ssk->tx_sa && !ssk->srcavail_cancel)
+ return 0; /* srcavail already serviced */
+
+ ssk->srcavail_cancel = 1;
+
+ sdp_post_sends(ssk, 1);
+
+ schedule_delayed_work(&ssk->srcavail_cancel_work,
+ SDP_SRCAVAIL_CANCEL_TIMEOUT);
+
+ return 0;
+}
+
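+/*
+ * SrcAvail cancel flow: sdp_post_srcavail_cancel() marks the socket,
+ * lets sdp_post_sends() emit the SrcAvailCancel message and arms the
+ * delayed work below; if the peer answers with neither RdmaRdCompl nor
+ * SendSM within SDP_SRCAVAIL_CANCEL_TIMEOUT, the handler gives up and
+ * resets the connection.
+ */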
+void srcavail_cancel_timeout(struct work_struct *work)
+{
+ struct sdp_sock *ssk =
+ container_of(work, struct sdp_sock, srcavail_cancel_work.work);
+ struct sock *sk = &ssk->isk.sk;
+
+ lock_sock(sk);
+
+ sdp_warn(sk, "both SrcAvail and SrcAvailCancel timedout."
+ " closing connection\n");
+ sdp_set_error(sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+
+ release_sock(sk);
+}
+
+static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
+ int ignore_signals)
+{
+ struct sock *sk = &ssk->isk.sk;
+ int err = 0;
+ long vm_wait = 0;
+ long current_timeo = *timeo_p;
+ struct tx_srcavail_state *tx_sa = ssk->tx_sa;
+ DEFINE_WAIT(wait);
+
+ sdp_dbg_data(sk, "Going to sleep till get RdmaRdCompl.\n");
+ sdp_prf1(sk, NULL, "Going to sleep");
+ while (ssk->qp_active) {
+ prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+ if (unlikely(!*timeo_p)) {
+ err = -ETIME;
+ sdp_prf1(sk, NULL, "timeout");
+ break;
+ }
+
+ if (unlikely(!ignore_signals && signal_pending(current))) {
+ err = -EINTR;
+ sdp_prf1(sk, NULL, "signalled");
+ break;
+ }
+
+ if (tx_sa->bytes_completed >= len)
+ break;
+
+ if (tx_sa->abort) {
+ sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
+ err = -EAGAIN;
+ break ;
+ }
+
+ posts_handler_put(ssk);
+
+ sk_wait_event(sk, &current_timeo, tx_sa->abort &&
+ (tx_sa->bytes_completed >= len) && vm_wait);
+ sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+ posts_handler_get(ssk);
+
+ if (vm_wait) {
+ vm_wait -= current_timeo;
+ current_timeo = *timeo_p;
+ if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
+ (current_timeo -= vm_wait) < 0)
+ current_timeo = 0;
+ vm_wait = 0;
+ }
+ *timeo_p = current_timeo;
+ }
+
+ if (!ssk->qp_active) {
+ sdp_warn(sk, "qp is not active\n");
+ }
+
+ finish_wait(sk->sk_sleep, &wait);
+
+ sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d bytes, abort: %d\n",
+ tx_sa->bytes_completed, tx_sa->abort);
+
+ if (!ssk->qp_active) {
+ sdp_warn(sk, "QP destroyed while waiting\n");
+ return -EINVAL;
+ }
+ return err;
+}
+
+static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
+{
+ struct sock *sk = &ssk->isk.sk;
+ int err = 0;
+ long current_timeo = *timeo_p;
+ DEFINE_WAIT(wait);
+
+ sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
+ while (1) {
+ prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+ if (unlikely(!*timeo_p)) {
+ err = -EAGAIN;
+ sdp_warn(sk, "timedout\n");
+ break;
+ }
+
+ if (unlikely(signal_pending(current))) {
+ err = sock_intr_errno(*timeo_p);
+ sdp_warn(sk, "signalled\n");
+ break;
+ }
+
+ if (!ssk->tx_ring.rdma_inflight->busy) {
+ sdp_dbg_data(sk, "got rdma cqe\n");
+ break;
+ }
+
+ posts_handler_put(ssk);
+
+ sdp_prf1(sk, NULL, "Going to sleep");
+ sk_wait_event(sk, &current_timeo,
+ !ssk->tx_ring.rdma_inflight->busy);
+ sdp_prf1(sk, NULL, "Woke up");
+ sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+ posts_handler_get(ssk);
+
+ *timeo_p = current_timeo;
+ }
+
+ finish_wait(sk->sk_sleep, &wait);
+
+ sdp_dbg_data(sk, "Finished waiting - rdma's inflight=%d\n",
+ ssk->tx_ring.rdma_inflight->busy);
+
+ return err;
+}
+
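+/*
+ * Report to the SrcAvail sender how many bytes were consumed via RDMA
+ * read so it can account for (and eventually complete) the pending
+ * tx_srcavail_state.
+ */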
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied)
+{
+ struct sdp_rrch *rrch;
+ struct sk_buff *skb;
+ gfp_t gfp_page;
+
+ if (unlikely(ssk->isk.sk.sk_allocation))
+ gfp_page = ssk->isk.sk.sk_allocation;
+ else
+ gfp_page = GFP_KERNEL;
+
+ skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh) +
+ sizeof(struct sdp_rrch),
+ gfp_page);
+ /* FIXME */
+ BUG_ON(!skb);
+
+ rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
+ rrch->len = htonl(copied);
+ /* TODO: What if no tx_credits available? */
+ sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL);
+
+ return 0;
+}
+
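+/*
+ * SendSM aborts the peer's SrcAvail: on reception the sender marks its
+ * tx_sa as aborted (see sdp_handle_sendsm()) and falls back to sending
+ * the data as ordinary DATA messages.
+ */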
+int sdp_post_sendsm(struct sdp_sock *ssk)
+{
+ struct sk_buff *skb;
+ gfp_t gfp_page;
+
+ if (unlikely(ssk->isk.sk.sk_allocation))
+ gfp_page = ssk->isk.sk.sk_allocation;
+ else
+ gfp_page = GFP_KERNEL;
+
+ skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh),
+ gfp_page);
+ /* FIXME */
+ BUG_ON(!skb);
+
+ sdp_post_send(ssk, skb, SDP_MID_SENDSM);
+
+ return 0;
+}
+
+static int sdp_update_iov_used(struct sock *sk, struct iovec *iov, int len)
+{
+ sdp_dbg_data(sk, "updating consumed %d bytes from iov\n", len);
+ while (len > 0) {
+ if (iov->iov_len) {
+ int copy = min_t(unsigned int, iov->iov_len, len);
+ len -= copy;
+ iov->iov_len -= copy;
+ iov->iov_base += copy;
+ }
+ iov++;
+ }
+
+ return 0;
+}
+
+static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
+{
+ int bytes = 0;
+
+ while (sge_cnt > 0) {
+ bytes += sge->length;
+ sge++;
+ sge_cnt--;
+ }
+
+ return bytes;
+}
+void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
+{
+ struct sock *sk = &ssk->isk.sk;
+ unsigned long flags;
+
+ spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+ if (!ssk->tx_sa) {
+ sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
+ goto out;
+ }
+
+ if (ssk->tx_sa->mseq < mseq_ack) {
+ sdp_prf1(sk, NULL, "SendSM arrived for old SrcAvail. "
+ "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x",
+ mseq_ack, ssk->tx_sa->mseq);
+ goto out;
+ }
+
+ sdp_prf1(sk, NULL, "Got SendSM - aborting SrcAvail");
+
+ ssk->tx_sa->abort = 1;
+ cancel_delayed_work(&ssk->srcavail_cancel_work);
+
+ wake_up(ssk->isk.sk.sk_sleep);
+ sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+out:
+ spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+}
+
+void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
+ u32 bytes_completed)
+{
+ struct sock *sk = &ssk->isk.sk;
+ unsigned long flags;
+
+ sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
+ sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
+
+ spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+ BUG_ON(!ssk);
+
+ if (!ssk->tx_sa) {
+ sdp_warn(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
+ goto out;
+ }
+
+ if (ssk->tx_sa->mseq < mseq_ack) {
+ sdp_warn(sk, "RdmaRdCompl arrived for old SrcAvail. "
+ "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
+ mseq_ack, ssk->tx_sa->mseq);
+ goto out;
+ }
+
+ if (ssk->tx_sa->bytes_completed > bytes_completed) {
+ sdp_warn(sk, "tx_sa->bytes_total(%d), "
+ "tx_sa->bytes_completed(%d) > bytes_completed(%d)\n",
+ ssk->tx_sa->bytes_total,
+ ssk->tx_sa->bytes_completed, bytes_completed);
+ }
+ if (ssk->tx_sa->bytes_total < bytes_completed) {
+ sdp_warn(sk, "ssk->tx_sa->bytes_total(%d) < bytes_completed(%d)\n",
+ ssk->tx_sa->bytes_total, bytes_completed);
+ }
+
+ ssk->tx_sa->bytes_completed = bytes_completed;
+
+ wake_up(sk->sk_sleep);
+ sdp_dbg_data(sk, "woke up sleepers\n");
+
+out:
+ spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+ return;
+}
+
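+ /*
+ * Pin the pages backing 'addr' (user pages via get_user_pages(), or
+ * kernel pages when called with KERNEL_DS) so they can be DMA mapped.
+ * On failure, every page already taken is released.
+ */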
+int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
+ unsigned long addr)
+{
+ int done_pages = 0;
+
+ sdp_dbg_data(sk, "count: %d addr: 0x%lx\n", page_cnt, addr);
+
+ addr &= PAGE_MASK;
+ if (segment_eq(get_fs(), KERNEL_DS)) {
+ for (done_pages = 0; done_pages < page_cnt; done_pages++) {
+ pages[done_pages] = virt_to_page(addr);
+ if (!pages[done_pages])
+ break;
+ get_page(pages[done_pages]);
+ addr += PAGE_SIZE;
+ }
+ } else {
+ if (current->mm) {
+ down_write(&current->mm->mmap_sem);
+ done_pages = get_user_pages(current, current->mm, addr,
+ page_cnt, 0, 0, pages, NULL);
+ up_write(&current->mm->mmap_sem);
+ }
+ }
+
+ if (unlikely(done_pages != page_cnt))
+ goto err;
+
+ return 0;
+
+err:
+ sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
+ done_pages, page_cnt);
+ for (; done_pages > 0; done_pages--)
+ put_page(pages[done_pages - 1]);
+
+ return -1;
+}
+
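+ /* Release pages pinned by sdp_get_pages(), marking them dirty first. */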
+void sdp_put_pages(struct sock *sk, struct page **pages, int page_cnt)
+{
+ int i;
+ sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+ for (i = 0; i < page_cnt; i++) {
+ set_page_dirty_lock(pages[i]);
+ page_cache_release(pages[i]);
+ }
+}
+
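+ /*
+ * DMA map the pinned pages for the TX (SrcAvail) side. Full pages are
+ * mapped; only the last page is trimmed to the end of the buffer.
+ */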
+static int sdp_map_dma(struct sock *sk, u64 *addrs, int page_cnt,
+ struct page **pages, int offset, int len)
+{
+ struct ib_device *dev = sdp_sk(sk)->ib_device;
+ int i;
+ sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
+
+ for (i = 0; i < page_cnt; i++) {
+ int o = 0;
+ int l = PAGE_SIZE;
+
+ if (i == page_cnt - 1) {
+ /* Last page */
+ l = (len + offset) & (PAGE_SIZE - 1);
+ if (l == 0)
+ l = PAGE_SIZE;
+ }
+
+// sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d\n",
+// i, pages[i], o, l);
+
+ addrs[i] = ib_dma_map_page(dev,
+ pages[i],
+ o,
+ l,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(dev, addrs[i])) {
+ sdp_warn(sk, "Error mapping page %p off: %d len: %d\n",
+ pages[i], o, l);
+ goto err;
+ }
+ }
+
+ return 0;
+err:
+ /* entry i failed to map; undo entries 0..i-1 */
+ for (; i > 0; i--)
+ ib_dma_unmap_page(dev, addrs[i - 1], PAGE_SIZE, DMA_TO_DEVICE);
+ return -1;
+}
+
+static void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt)
+{
+ int i;
+ struct ib_device *dev = sdp_sk(sk)->ib_device;
+
+ sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+ for (i = 0; i < page_cnt; i++) {
+ ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
+ }
+}
+
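+ /*
+ * DMA map the pinned pages for the RX (RDMA read) side and fill in the
+ * SGE list. Each page is mapped from offset 0 and the buffer offset is
+ * added to the first SGE address afterwards (see the comments below).
+ */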
+static int sdp_map_dma_sge(struct sock *sk, struct ib_sge *sge, int page_cnt,
+ struct page **pages, int offset, int len)
+{
+ struct ib_device *dev = sdp_sk(sk)->ib_device;
+ int i;
+ int left = len;
+ sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
+
+ for (i = 0; i < page_cnt; i++) {
+ int o = i == 0 ? offset : 0;
+ int l = min_t(int, left, PAGE_SIZE - o);
+
+ sge[i].lkey = sdp_sk(sk)->sdp_dev->mr->lkey;
+ sge[i].length = l;
+
+ sge[i].addr = ib_dma_map_page(dev,
+ pages[i],
+ 0, /* mapping with a non-zero offset (fbo) doesn't work */
+ l + o, /* so map from 0 and extend the length instead */
+ DMA_FROM_DEVICE);
+ if (ib_dma_mapping_error(dev, sge[i].addr)) {
+ sdp_warn(sk, "Error map page 0x%llx off: %d len: %d\n",
+ sge[i].addr, o, l);
+ goto err;
+ }
+ sge[i].addr += o;
+
+ sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d | addr: 0x%llx length: %d\n",
+ i, pages[i], o, l, sge[i].addr, sge[i].length);
+ left -= l;
+ }
+
+ WARN_ON(left != 0);
+
+ return 0;
+err:
+ /* entry i failed to map; undo entries 0..i-1. Note that sge[0].addr
+ * was advanced by 'offset' after mapping, so back that out here. */
+ while (--i >= 0)
+ ib_dma_unmap_page(dev, i == 0 ? sge[0].addr - offset : sge[i].addr,
+ PAGE_SIZE, DMA_FROM_DEVICE);
+ return -1;
+}
+
+static void sdp_unmap_dma_sge(struct sock *sk, struct ib_sge *sge,
+ int page_cnt, int offset, int len)
+{
+ int i;
+ struct ib_device *dev = sdp_sk(sk)->ib_device;
+
+ sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+ for (i = 0; i < page_cnt; i++) {
+ int l = PAGE_SIZE;
+
+ if (i == page_cnt - 1) {
+ /* Last page */
+ l = (len + offset) & (PAGE_SIZE - 1);
+ if (l == 0)
+ l = PAGE_SIZE;
+ }
+ ib_dma_unmap_page(dev, sge[i].addr, l, DMA_FROM_DEVICE);
+ }
+}
+
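+ /* Map the page DMA addresses through the device's FMR pool. */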
+static struct ib_pool_fmr *sdp_map_fmr(struct sock *sk, int page_cnt,
+ u64 *addrs)
+{
+ struct ib_pool_fmr *fmr;
+ int ret = 0;
+
+ fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, addrs,
+ page_cnt, 0);
+ if (IS_ERR(fmr)) {
+ ret = PTR_ERR(fmr);
+ fmr = NULL;
+ sdp_warn(sk, "Error allocating fmr: %d\n", ret);
+ goto err;
+ }
+
+ return fmr;
+err:
+ return NULL;
+}
+
+enum zcopy_type {
+ SDP_ZCOPY_TYPE_RX,
+ SDP_ZCOPY_TYPE_TX,
+};
+
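+ /*
+ * Allocate a TX SrcAvail context together with its page and DMA address
+ * arrays in a single allocation.
+ */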
+static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int offset,
+ int len)
+{
+ struct tx_srcavail_state *tx_sa;
+ int page_cnt;
+
+ page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
+
+ tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
+ sizeof(struct page *) * page_cnt +
+ sizeof(u64) * page_cnt, GFP_KERNEL);
+ if (!tx_sa)
+ return ERR_PTR(-ENOMEM);
+
+ tx_sa->pages = (struct page **)(tx_sa+1);
+ tx_sa->page_cnt = page_cnt;
+ tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
+
+ return tx_sa;
+}
+
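+ /*
+ * RX side of zero copy: RDMA read the peer's SrcAvail buffer straight
+ * into the user iovec. Pins and maps the user pages, posts the RDMA read
+ * work requests (only the last one signaled) and sleeps until the
+ * completion arrives or the SrcAvail is cancelled.
+ */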
+int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
+ int len)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
+ int rc = 0;
+ struct ib_send_wr *bad_wr;
+ struct ib_send_wr wr = { 0 };
+ long timeo;
+ struct ib_sge *sge;
+ int sge_left;
+ int copied;
+ int offset;
+
+ sdp_dbg_data(&ssk->isk.sk, "preparing RDMA read."
+ " len: %d. buffer len: %ld\n", len, iov->iov_len);
+ if (len > rx_sa->len)
+ len = rx_sa->len;
+
+ offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
+
+ rx_sa->page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
+ sdp_dbg_data(sk, "page_cnt = %d len:%d offset: %d\n", rx_sa->page_cnt, len, offset);
+
+ rx_sa->pages = kzalloc(sizeof(struct page *) * rx_sa->page_cnt +
+ sizeof(struct ib_sge) * rx_sa->page_cnt, GFP_KERNEL);
+ if (!rx_sa->pages) {
+ sdp_warn(sk, "Error allocating zcopy context\n");
+ rc = -ENOMEM;
+ goto err_alloc_zcopy;
+ }
+
+ rx_sa->sge = (struct ib_sge *)(&rx_sa->pages[rx_sa->page_cnt]);
+
+ rc = sdp_get_pages(sk, rx_sa->pages, rx_sa->page_cnt,
+ (unsigned long)iov->iov_base);
+ if (rc)
+ goto err_get_pages;
+
+ rc = sdp_map_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, rx_sa->pages,
+ offset, len);
+ if (rc)
+ goto err_map_dma;
+
+ wr.opcode = IB_WR_RDMA_READ;
+ wr.next = NULL;
+ wr.wr_id = SDP_OP_RDMA;
+ wr.wr.rdma.rkey = rx_sa->rkey;
+ wr.send_flags = 0;
+
+ timeo = sock_sndtimeo(sk, 0);
+
+ ssk->tx_ring.rdma_inflight = rx_sa;
+ copied = 0;
+ sge = rx_sa->sge;
+ sge_left = rx_sa->page_cnt;
+ do {
+ /* Len error when using sge_cnt > 30 ?? */
+ int sge_cnt = min(sge_left, SDP_MAX_SEND_SGES - 2);
+
+ wr.wr.rdma.remote_addr = rx_sa->vaddr + copied + rx_sa->used;
+ wr.num_sge = sge_cnt;
+ wr.sg_list = sge;
+ rx_sa->busy++;
+
+ sdp_dbg_data(sk, "rdma read: sge_cnt: %d vaddr: 0x%llx "
+ "copied: 0x%x rkey: 0x%x in_bytes: %d\n",
+ sge_cnt, wr.wr.rdma.remote_addr, copied, rx_sa->rkey,
+ sge_bytes(sge, sge_cnt));
+ sdp_prf1(sk, NULL, "TX: RDMA read");
+
+ if (sge_left == sge_cnt) {
+ wr.send_flags = IB_SEND_SIGNALED;
+ sdp_dbg_data(sk, "last wr is signaled\n");
+ }
+ rc = ib_post_send(ssk->qp, &wr, &bad_wr);
+ if (unlikely(rc)) {
+ sdp_warn(sk, "ib_post_send failed with status %d.\n",
+ rc);
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+ break;
+ }
+
+ copied += sge_bytes(sge, sge_cnt);
+ sge_left -= sge_cnt;
+ sge += sge_cnt;
+
+ if (unlikely(ssk->srcavail_cancel_mseq > rx_sa->mseq)) {
+ sdp_warn(sk, "got SrcAvailCancel - Aborting RDMA\n");
+ rc = -EAGAIN;
+ }
+ } while (!rc && sge_left > 0);
+
+ if (!rc || rc == -EAGAIN) {
+ int got_srcavail_cancel = (rc == -EAGAIN);
+
+ sdp_arm_tx_cq(sk);
+
+ rc = sdp_wait_rdma_wr_finished(ssk, &timeo);
+
+ /* Ignore any data copied after getting SrcAvailCancel */
+ if (!got_srcavail_cancel && !rc) {
+ sdp_update_iov_used(sk, iov, copied);
+ rx_sa->used += copied;
+ atomic_add(copied, &ssk->rcv_nxt);
+ }
+ }
+
+ if (rc) {
+ /* post rdma, wait_for_compl or post rdma_rd_comp failed -
+ * post sendsm */
+ sdp_warn(sk, "post rdma, wait_for_compl "
+ "or post rdma_rd_comp failed - post sendsm\n");
+ rx_sa->aborted = 1;
+ }
+
+ ssk->tx_ring.rdma_inflight = NULL;
+
+ sdp_unmap_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, offset, len);
+err_map_dma:
+ sdp_put_pages(sk, rx_sa->pages, rx_sa->page_cnt);
+err_get_pages:
+ kfree(rx_sa->pages);
+ rx_sa->pages = NULL;
+ rx_sa->sge = NULL;
+err_alloc_zcopy:
+
+ return rc;
+}
+
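+ /* Flush pending posts and wait until send buffer space is available. */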
+static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ int ret = 0;
+ sdp_dbg_data(sk, "Wait for mem\n");
+
+ set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+
+ SDPSTATS_COUNTER_INC(send_wait_for_mem);
+
+ sdp_do_posts(ssk);
+
+ sdp_xmit_poll(ssk, 1);
+
+ ret = sdp_bzcopy_wait_memory(ssk, timeo_p, &dummy_bz);
+
+ return ret;
+}
+
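+ /*
+ * TX side of zero copy: pin and DMA map the user buffer, advertise it to
+ * the peer with SrcAvail (one FMR-sized chunk at a time) and wait for the
+ * matching RdmaRdCompl. On timeout or signal a SrcAvailCancel is posted
+ * and we wait for the peer's RdmaRdCompl/SendSM before giving up.
+ */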
+int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+ size_t size)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ int iovlen, flags;
+ struct iovec *iov = NULL;
+ int rc = 0;
+ long timeo;
+ struct tx_srcavail_state *tx_sa;
+ int offset;
+ int copied = 0;
+
+ int bytes_left;
+ int pages_left;
+ u64 *addrs;
+ unsigned long lock_flags;
+
+ sdp_dbg_data(sk, "%s\n", __func__);
+
+ lock_sock(sk);
+
+ SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
+
+ posts_handler_get(ssk);
+
+ flags = msg->msg_flags;
+ timeo = sock_sndtimeo(sk, 0);
+
+ /* Wait for a connection to finish. */
+ if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+ if ((rc = sk_stream_wait_connect(sk, &timeo)) != 0)
+ goto err;
+
+ /* This should be in poll */
+ clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+ /* Ok commence sending. */
+ iovlen = msg->msg_iovlen;
+ iov = msg->msg_iov;
+ offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
+ sdp_dbg_data(sk, "Sending iov: %p, iovlen: %ld, size: %ld\n",
+ iov->iov_base, iov->iov_len, size);
+
+ SDPSTATS_HIST(sendmsg_seglen, iov->iov_len);
+
+ if (iovlen > 1) {
+ sdp_warn(sk, "iovlen > 1 not supported\n");
+ rc = -ENOTSUPP;
+ goto err;
+ }
+
+ if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
+ rc = -EPIPE;
+ goto err;
+ }
+
+ tx_sa = sdp_alloc_tx_sa(sk, offset, iov->iov_len);
+ if (IS_ERR(tx_sa)) {
+ rc = PTR_ERR(tx_sa);
+ sdp_warn(sk, "Error allocating zcopy context\n");
+ goto err_alloc_tx_sa;
+ }
+
+ rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
+ (unsigned long)iov->iov_base);
+ if (rc)
+ goto err_get_pages;
+
+ rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
+ offset, iov->iov_len);
+ if (rc)
+ goto err_map_dma;
+
+ bytes_left = iov->iov_len;
+ pages_left = tx_sa->page_cnt;
+ addrs = tx_sa->addrs;
+ do {
+ int page_cnt = min(SDP_FMR_SIZE, pages_left);
+ int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
+ sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
+
+ if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
+ rc = wait_for_sndbuf(sk, &timeo);
+ if (rc) {
+ sdp_warn(sk, "Couldn't get send buffer\n");
+ break;
+ }
+ }
+
+ tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
+ if (!tx_sa->fmr) {
+ rc = -ENOMEM;
+ break;
+ }
+
+ tx_sa->bytes_completed = 0;
+ tx_sa->bytes_total = len;
+ rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+ if (rc)
+ goto err_abort_send;
+
+ rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+ if (unlikely(rc)) {
+ switch (rc) {
+ case -EAGAIN: /* Got SendSM */
+ sdp_warn(sk, "got SendSM. use SEND verb.\n");
+ goto err_abort_send;
+
+ case -ETIME: /* timed out */
+ sdp_warn(sk, "TX srcavail timed out.\n");
+ /* fall through */
+ case -EINTR: /* interrupted */
+ sdp_prf1(sk, NULL, "Aborting send.");
+ sdp_post_srcavail_cancel(sk);
+
+ /* Wait for RdmaRdCompl/SendSM to
+ * finish the transaction */
+ sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+
+ goto err_abort_send;
+
+ default:
+ sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
+ /* Socket destroyed while we waited */
+ goto err_abort_send;
+ }
+ }
+ sdp_prf1(sk, NULL, "got RdmaRdCompl");
+ copied += len;
+ bytes_left -= len;
+ pages_left -= page_cnt;
+ addrs += page_cnt;
+ offset = 0;
+
+ sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
+
+err_abort_send:
+ ib_fmr_pool_unmap(tx_sa->fmr);
+ spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
+ ssk->tx_sa = NULL;
+ spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
+ } while (!rc && !tx_sa->abort && pages_left > 0);
+
+ sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
+err_map_dma:
+ sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
+err_get_pages:
+ kfree(tx_sa);
+err_alloc_tx_sa:
+err:
+
+ sdp_prf1(sk, NULL, "Finshed RDMA rc: %d copied: %d", rc, copied);
+ posts_handler_put(ssk);
+ release_sock(sk);
+
+ return rc ?: copied;
+}
+