sdp: Add support for ZCopy combined mode - RDMA Read
author Amir Vadai <amirv@mellanox.co.il>
Sun, 12 Jul 2009 12:32:36 +0000 (15:32 +0300)
committer Mukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:35 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/Makefile
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_cma.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_proc.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_tx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c [new file with mode: 0644]
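
This patch adds a second zero-copy send path next to the existing BZCopy one: payloads above sdp_zcopy_thresh are advertised to the peer with a SrcAvail message (length, rkey, vaddr) and pulled by the receiver with an RDMA Read, which then answers with RdmaRdCompl or rejects the transfer with SendSM. A minimal sketch of the resulting send-side dispatch, using only names introduced in this diff (the real logic is in sdp_sendmsg() and sdp_bz_setup(); this is an illustration, not the implementation):

/*
 * Send-path selection after this patch (sketch):
 *   size > sdp_zcopy_thresh   -> ZCopy: SrcAvail + peer RDMA Read
 *   size >= sdp_bzcopy_thresh -> BZCopy: pin user pages, post them by SEND
 *   otherwise                 -> BCopy: copy into kernel skbs
 */
static int sdp_send_dispatch_sketch(struct kiocb *iocb, struct sock *sk,
                                    struct msghdr *msg, size_t size)
{
        int err;

        if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
                err = sdp_sendmsg_zcopy(iocb, sk, msg, size);
                if (err != -EAGAIN)
                        return err;
                /* Peer sent SendSM or the SrcAvail timed out - fall back */
        }
        return 0;       /* continue with the regular bcopy/bzcopy path */
}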

index b14a16a4407a772c19d23d1b59aeb8aaca7455b1..5c250e9d9dac4057d164f0e8b92f6e4bface9060 100644 (file)
@@ -3,4 +3,4 @@ EXTRA_CFLAGS += -ggdb
 
 obj-$(CONFIG_INFINIBAND_SDP) += ib_sdp.o
 
-ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o
+ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o sdp_zcopy.o
index ed05c0e571ee50aa4e0f78745aeb62842fbb0358..0bb249bffc79fc7ea223573d6a31df36168118a0 100644 (file)
@@ -9,7 +9,7 @@
 #include <linux/sched.h>
 
 #define SDPSTATS_ON
-/* #define SDP_PROFILING */
+#define SDP_PROFILING
 
 #define _sdp_printk(func, line, level, sk, format, arg...) do {               \
        preempt_disable(); \
 #define sdp_warn(sk, format, arg...)                         \
        sdp_printk(KERN_WARNING, sk, format , ## arg)
 
-#define rx_ring_lock(ssk, f) do { \
-       spin_lock_irqsave(&ssk->rx_ring.lock, f); \
-} while (0)
-
-#define rx_ring_unlock(ssk, f) do { \
-       spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
-} while (0)
-
 #define SDP_MODPARAM_SINT(var, def_val, msg) \
        static int var = def_val; \
        module_param_named(var, var, int, 0644); \
@@ -88,8 +80,8 @@ static inline unsigned long long current_nsec(void)
        preempt_enable(); \
        1; \
 })
-#define sdp_prf(sk, s, format, arg...)
-/* #define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg) */
+//#define sdp_prf(sk, s, format, arg...)
+#define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg)
 
 #else
 #define sdp_prf1(sk, s, format, arg...)
@@ -102,7 +94,7 @@ extern int sdp_debug_level;
 #define sdp_dbg(sk, format, arg...)                          \
        do {                                                 \
                if (sdp_debug_level > 0)                     \
-               sdp_printk(KERN_DEBUG, sk, format , ## arg); \
+               sdp_printk(KERN_WARNING, sk, format , ## arg); \
        } while (0)
 
 #define sock_ref(sk, msg, sock_op) ({ \
@@ -136,7 +128,7 @@ extern int sdp_data_debug_level;
 #define sdp_dbg_data(sk, format, arg...)                               \
        do {                                                            \
                if (sdp_data_debug_level & 0x2)                         \
-                       sdp_printk(KERN_DEBUG, sk, format , ## arg);    \
+                       sdp_printk(KERN_WARNING, sk, format , ## arg);  \
        } while (0)
 #define SDP_DUMP_PACKET(sk, str, skb, h)                               \
        do {                                                            \
@@ -148,12 +140,42 @@ extern int sdp_data_debug_level;
 #define SDP_DUMP_PACKET(sk, str, skb, h)
 #endif
 
+#if 0
+#define lock_sock(sk) do { \
+       sdp_dbg_data(sk, "lock_sock: before lock\n"); \
+       lock_sock(sk); \
+       sdp_dbg_data(sk, "lock_sock: locked\n"); \
+} while (0)
+
+#define release_sock(sk) do { \
+       sdp_dbg_data(sk, "release_sock\n"); \
+       release_sock(sk); \
+} while (0)
+
+
+#undef sk_wait_event
+
+#define sk_wait_event(__sk, __timeo, __condition)              \
+({     int rc;                                                 \
+       release_sock(__sk);                                     \
+       rc = __condition;                                       \
+       if (!rc) {                                              \
+               *(__timeo) = schedule_timeout(*(__timeo));      \
+       }                                                       \
+       lock_sock(__sk);                                        \
+       rc = __condition;                                       \
+       rc;                                                     \
+})
+
+#endif
+
 #ifdef SDPSTATS_ON
 
 struct sdpstats {
        u32 post_send[256];
        u32 sendmsg_bcopy_segment;
        u32 sendmsg_bzcopy_segment;
+       u32 sendmsg_zcopy_segment;
        u32 sendmsg;
        u32 post_send_credits;
        u32 sendmsg_nagle_skip;
@@ -219,6 +241,8 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define SDP_TX_POLL_TIMEOUT    (HZ / 4)
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
+#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
+
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
@@ -228,8 +252,13 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define SDP_TX_SIZE 0x40
 #define SDP_RX_SIZE 0x40
 
+#define SDP_FMR_SIZE           256
+#define SDP_FMR_POOL_SIZE      1024
+#define SDP_FMR_DIRTY_SIZE     ( SDP_FMR_POOL_SIZE / 4 )
+
 #define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
 #define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
+#define SDP_MAX_SEND_SGES 32
 #define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
 #define SDP_NUM_WC 4
 #define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
@@ -239,6 +268,8 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 
 #define SDP_OP_RECV 0x800000000LL
 #define SDP_OP_SEND 0x400000000LL
+#define SDP_OP_RDMA 0x200000000LL
+#define SDP_OP_NOP  0x100000000LL
 
 /* how long (in jiffies) to block sender till tx completion*/
 #define SDP_BZCOPY_POLL_TIMEOUT (HZ / 10)
@@ -246,7 +277,25 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define SDP_AUTO_CONF  0xffff
 #define AUTO_MOD_DELAY (HZ / 4)
 
-#define BZCOPY_STATE(skb) (*(struct bzcopy_state **)(skb->cb))
+struct bzcopy_state;
+struct rx_srcavail_state;
+
+struct sdp_skb_cb {
+       __u32           seq;            /* Starting sequence number     */
+       __u32           end_seq;        /* SEQ + FIN + SYN + datalen    */
+       __u8            flags;          /* TCP header flags.            */
+       struct bzcopy_state      *bz;
+       struct rx_srcavail_state *rx_sa;
+       struct tx_srcavail_state *tx_sa;
+};
+
+#define BZCOPY_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->bz)
+#define RX_SRCAVAIL_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->rx_sa)
+#define TX_SRCAVAIL_STATE(skb) (((struct sdp_skb_cb *)(skb->cb))->tx_sa)
+
+#define SDP_SKB_CB(__skb)      ((struct sdp_skb_cb *)&((__skb)->cb[0]))
+#undef TCP_SKB_CB
+
 #ifndef MIN
 #define MIN(a, b) (a < b ? a : b)
 #endif
@@ -259,8 +308,13 @@ enum sdp_mid {
        SDP_MID_HELLO = 0x0,
        SDP_MID_HELLO_ACK = 0x1,
        SDP_MID_DISCONN = 0x2,
+       SDP_MID_SENDSM = 0x4,
+       SDP_MID_RDMARDCOMPL = 0x6,
+       SDP_MID_SRCAVAIL_CANCEL = 0x8,
        SDP_MID_CHRCVBUF = 0xB,
        SDP_MID_CHRCVBUF_ACK = 0xC,
+       SDP_MID_SRCAVAIL = 0xFD,
+       SDP_MID_SINKAVAIL = 0xFE,
        SDP_MID_DATA = 0xFF,
 };
 
@@ -324,27 +378,88 @@ struct sdp_hah {
        __u32 actrcvsz;
 };
 
+struct sdp_rrch {
+       __u32 len;
+};
+
+struct sdp_srcah {
+       __u32 len;
+       __u32 rkey;
+       __u64 vaddr;
+};
+
 struct sdp_buf {
         struct sk_buff *skb;
         u64             mapping[SDP_MAX_SEND_SKB_FRAGS + 1];
 };
 
+/* Context used for synchronous zero copy bcopy (BZCOPY) */
+struct bzcopy_state {
+       unsigned char __user  *u_base;
+       int                    u_len;
+       int                    left;
+       int                    page_cnt;
+       int                    cur_page;
+       int                    cur_offset;
+       int                    busy;
+       struct sdp_sock      *ssk;
+       struct page         **pages;
+};
+
+struct rx_srcavail_state {
+       /* Advertised buffer stuff */
+       u32 mseq;
+       u32 used;
+       u32 len;
+       u32 rkey;
+       u64 vaddr;
+
+       /* Dest buff info */
+       u32             page_cnt;
+       struct page     **pages;
+       struct ib_sge   *sge;
+
+       /* Utility */
+       u8  busy;
+       u8  aborted;
+};
+
+struct tx_srcavail_state {
+       u8              busy;
+
+       u32             page_cnt;
+       struct page     **pages;
+       u64             *addrs;
+       struct ib_pool_fmr *fmr;
+       u32             bytes_completed;
+       u32             bytes_total;
+
+       u8              abort;
+       u32             mseq;
+};
+
 #define ring_head(ring)   (atomic_read(&(ring).head))
 #define ring_tail(ring)   (atomic_read(&(ring).tail))
 #define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
 
-struct sdp_tx_ring {
-       struct sdp_buf   *buffer;
-       atomic_t          head;
-       atomic_t          tail;
-       struct ib_cq     *cq;
+#define rx_ring_posted(ssk) ring_posted(ssk->rx_ring)
+#define tx_ring_posted(ssk) (ring_posted(ssk->tx_ring) + \
+       (ssk->tx_ring.rdma_inflight ? ssk->tx_ring.rdma_inflight->busy : 0))
 
-       int               una_seq;
-       atomic_t          credits;
+struct sdp_tx_ring {
+       struct rx_srcavail_state *rdma_inflight;
+       struct sdp_buf          *buffer;
+       atomic_t                head;
+       atomic_t                tail;
+       struct ib_cq            *cq;
+
+       int                     una_seq;
+       atomic_t                credits;
 #define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits))
 
-       struct timer_list timer;
-       u16               poll_cnt;
+       struct timer_list       timer;
+       struct tasklet_struct   tasklet;
+       u16                     poll_cnt;
 };
 
 struct sdp_rx_ring {
@@ -353,12 +468,34 @@ struct sdp_rx_ring {
        atomic_t          tail;
        struct ib_cq     *cq;
 
-       spinlock_t       lock;
+       int              destroyed;
+       rwlock_t         destroyed_lock;
 };
 
-static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
+static inline void rx_ring_unlock(struct sdp_rx_ring *rx_ring,
+               unsigned long *flags)
+{
+       read_unlock_irqrestore(&rx_ring->destroyed_lock, *flags);
+}
+
+static inline int rx_ring_trylock(struct sdp_rx_ring *rx_ring,
+               unsigned long *flags)
 {
-       return SDP_TX_SIZE - ring_posted(*tx_ring);
+       read_lock_irqsave(&rx_ring->destroyed_lock, *flags);
+       if (rx_ring->destroyed) {
+               rx_ring_unlock(rx_ring, flags);
+               return 0;
+       }
+       return 1;
+}
+
+static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring)
+{
+       unsigned long flags;
+
+       write_lock_irqsave(&rx_ring->destroyed_lock, flags);
+       rx_ring->destroyed = 1;
+       write_unlock_irqrestore(&rx_ring->destroyed_lock, flags);
 }
 
 struct sdp_chrecvbuf {
@@ -393,6 +530,14 @@ struct sdp_moderation {
        int moder_time;
 };
 
+struct sdp_device {
+       struct ib_pd            *pd;
+       struct ib_mr            *mr;
+       struct ib_fmr_pool      *fmr_pool;
+};
+
+extern struct ib_client sdp_client;
+
 struct sdp_sock {
        /* sk has to be the first member of inet_sock */
        struct inet_sock isk;
@@ -401,6 +546,15 @@ struct sdp_sock {
        struct list_head backlog_queue;
        struct sk_buff_head rx_ctl_q;
        struct sock *parent;
+       struct sdp_device *sdp_dev;
+
+       int qp_active;
+       struct tx_srcavail_state *tx_sa;
+       spinlock_t tx_sa_lock;
+       int max_send_sge;
+       int srcavail_cancel;
+       struct delayed_work srcavail_cancel_work;
+       int srcavail_cancel_mseq;
 
        struct work_struct rx_comp_work;
        wait_queue_head_t wq;
@@ -458,7 +612,6 @@ struct sdp_sock {
 
        /* rdma specific */
        struct ib_qp *qp;
-       struct ib_mr *mr;
 
        /* SDP slow start */
        int rcvbuf_scale;       /* local recv buf scale for each socket */
@@ -479,20 +632,7 @@ struct sdp_sock {
        struct sdp_moderation auto_mod;
 
        /* BZCOPY data */
-       int   zcopy_thresh;
-};
-
-/* Context used for synchronous zero copy bcopy (BZCOY) */
-struct bzcopy_state {
-       unsigned char __user  *u_base;
-       int                    u_len;
-       int                    left;
-       int                    page_cnt;
-       int                    cur_page;
-       int                    cur_offset;
-       int                    busy;
-       struct sdp_sock      *ssk;
-       struct page         **pages;
+       int   bzcopy_thresh;
 };
 
 extern int rcvbuf_initial_size;
@@ -594,6 +734,9 @@ void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
 void sdp_destroy_work(struct work_struct *work);
 void sdp_reset_sk(struct sock *sk, int rc);
 void sdp_reset(struct sock *sk);
+int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
+                                 struct bzcopy_state *bz);
+void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb);
 
 /* sdp_proc.c */
 int __init sdp_proc_init(void);
@@ -602,6 +745,9 @@ void sdp_proc_unregister(void);
 /* sdp_cma.c */
 int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 
+/* sdp_bcopy.c */
+int sdp_post_credits(struct sdp_sock *ssk);
+
 /* sdp_tx.c */
 int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_tx_ring_destroy(struct sdp_sock *ssk);
@@ -620,11 +766,33 @@ int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
 void sdp_do_posts(struct sdp_sock *ssk);
 void sdp_rx_comp_full(struct sdp_sock *ssk);
 void sdp_remove_large_sock(struct sdp_sock *ssk);
+void sdp_handle_disconn(struct sock *sk);
+
+/* sdp_zcopy.c */
+int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+               size_t size);
+int sdp_handle_srcavail(struct sdp_sock *ssk, struct sdp_srcah *srcah);
+void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack);
+void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
+               u32 bytes_completed);
+int sdp_handle_rdma_read_cqe(struct sdp_sock *ssk);
+int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
+               int len);
+int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
+               unsigned long addr);
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied);
+int sdp_post_sendsm(struct sdp_sock *ssk);
+void srcavail_cancel_timeout(struct work_struct *work);
+
+static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk)
+{
+       return SDP_TX_SIZE - tx_ring_posted(ssk);
+}
 
 static inline void sdp_arm_rx_cq(struct sock *sk)
 {
        sdp_prf(sk, NULL, "Arming RX cq");
-       sdp_dbg_data(sk, "Arming RX cq\n");
+//     sdp_dbg_data(sk, "Arming RX cq\n");
 
        ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
 }
@@ -633,11 +801,34 @@ static inline void sdp_arm_tx_cq(struct sock *sk)
 {
        sdp_prf(sk, NULL, "Arming TX cq");
        sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
-               tx_credits(sdp_sk(sk)), ring_posted(sdp_sk(sk)->tx_ring));
+               tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk)));
 
        ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP);
 }
 
+/* return the min of:
+ * - tx credits
+ * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS
+ */
+static inline int tx_slots_free(struct sdp_sock *ssk)
+{
+       int min_free;
+
+       min_free = MIN(tx_credits(ssk),
+                       SDP_TX_SIZE - tx_ring_posted(ssk));
+       if (min_free < SDP_MIN_TX_CREDITS)
+               return 0;
+
+       return min_free - SDP_MIN_TX_CREDITS;
+};
+
+/* like sk_stream_memory_free - except measures remote credits */
+static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
+                                        struct bzcopy_state *bz)
+{
+       return tx_slots_free(ssk) > bz->busy;
+}
+
 /* utilities */
 static inline char *mid2str(int mid)
 {
@@ -646,9 +837,14 @@ static inline char *mid2str(int mid)
                ENUM2STR(SDP_MID_HELLO),
                ENUM2STR(SDP_MID_HELLO_ACK),
                ENUM2STR(SDP_MID_DISCONN),
+               ENUM2STR(SDP_MID_SENDSM),
+               ENUM2STR(SDP_MID_RDMARDCOMPL),
+               ENUM2STR(SDP_MID_SRCAVAIL_CANCEL),
                ENUM2STR(SDP_MID_CHRCVBUF),
                ENUM2STR(SDP_MID_CHRCVBUF_ACK),
                ENUM2STR(SDP_MID_DATA),
+               ENUM2STR(SDP_MID_SRCAVAIL),
+               ENUM2STR(SDP_MID_SINKAVAIL),
        };
 
        if (mid >= ARRAY_SIZE(mid2str))
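
The rx_ring spinlock is replaced above by a destroyed flag guarded by an rwlock: teardown takes the write lock once via rx_ring_destroy_lock(), while completion-side code takes the read lock and backs off if the ring is already gone. A hypothetical caller (the real users live in sdp_rx.c and are not fully visible in this listing) would look roughly like:

static void sdp_rx_event_sketch(struct sdp_sock *ssk)
{
        unsigned long flags;

        if (!rx_ring_trylock(&ssk->rx_ring, &flags))
                return;         /* ring already torn down - drop the event */

        /* rx_ring_destroy_lock() cannot complete while the read lock is
         * held, so touching rx_ring.cq here is safe. */
        ib_req_notify_cq(ssk->rx_ring.cq, IB_CQ_NEXT_COMP);

        rx_ring_unlock(&ssk->rx_ring, &flags);
}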
index 8ccd583bfd42b6cc332101038c9e1ca2699053e1..b743c4cf26c7c8d94f4380a544f192697a2ea069 100644 (file)
@@ -46,6 +46,8 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
        struct sdp_hh *hh;
        struct sdp_hah *hah;
        struct sdp_chrecvbuf *req_size;
+       struct sdp_rrch *rrch;
+       struct sdp_srcah *srcah;
        int len = 0;
        char buf[256];
        len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x "
@@ -78,6 +80,25 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
        case SDP_MID_DATA:
                len += snprintf(buf + len, 255-len, "data_len: %ld |",
                        ntohl(h->len) - sizeof(struct sdp_bsdh));
+               break;
+       case SDP_MID_RDMARDCOMPL:
+               rrch = (struct sdp_rrch *)(h+1);
+
+               len += snprintf(buf + len, 255-len, " | len: %d |",
+                               ntohl(rrch->len));
+               break;
+       case SDP_MID_SRCAVAIL:
+               srcah = (struct sdp_srcah *)(h+1);
+
+               len += snprintf(buf + len, 255-len, " | data_len: %ld |",
+                               ntohl(h->len) - sizeof(struct sdp_bsdh) - 
+                               sizeof(struct sdp_srcah));
+
+               len += snprintf(buf + len, 255-len,
+                               " | len: %d, rkey: 0x%x, vaddr: 0x%llx |",
+                               ntohl(srcah->len), ntohl(srcah->rkey),
+                               be64_to_cpu(srcah->vaddr));
+               break;
        default:
                break;
        }
@@ -104,11 +125,12 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
 {
        int send_now =
                BZCOPY_STATE(skb) ||
-                (ssk->nonagle & TCP_NAGLE_OFF) ||
+               TX_SRCAVAIL_STATE(skb) ||
+               (ssk->nonagle & TCP_NAGLE_OFF) ||
                !ssk->nagle_last_unacked ||
                skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
                skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
-               (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
+               (SDP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
 
        if (send_now) {
                unsigned long mseq = ring_head(ssk->tx_ring);
@@ -160,6 +182,33 @@ out2:
                mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
 }
 
+int sdp_post_credits(struct sdp_sock *ssk)
+{
+       int post_count = 0;
+
+       sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
+                       "tx ring slots left: %d send_head: %p\n",
+               tx_credits(ssk), remote_credits(ssk),
+               sdp_tx_ring_slots_left(ssk),
+               ssk->isk.sk.sk_send_head);
+
+       if (likely(tx_credits(ssk) > 1) &&
+           likely(sdp_tx_ring_slots_left(ssk))) {
+               struct sk_buff *skb;
+               skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+                                         sizeof(struct sdp_bsdh),
+                                         GFP_KERNEL);
+               if (!skb)
+                       return -ENOMEM;
+               sdp_post_send(ssk, skb, SDP_MID_DATA);
+               post_count++;
+       }
+
+       if (post_count)
+               sdp_xmit_poll(ssk, 0);
+       return post_count;
+}
+
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 {
        /* TODO: nonagle? */
@@ -183,15 +232,30 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
        else
                gfp_page = GFP_KERNEL;
 
-       if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) {
+       if (sdp_tx_ring_slots_left(ssk) < SDP_TX_SIZE / 2) {
                int wc_processed = sdp_xmit_poll(ssk,  1);
                sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed);
        }
 
+       if (ssk->tx_sa && ssk->srcavail_cancel &&
+               tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
+           sdp_tx_ring_slots_left(ssk)) {
+               sdp_prf1(&ssk->isk.sk, NULL, "Going to send srcavail cancel");
+               skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+                                         sizeof(struct sdp_bsdh),
+                                         gfp_page);
+               /* FIXME */
+               BUG_ON(!skb);
+               TX_SRCAVAIL_STATE(skb) = ssk->tx_sa;
+               sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL_CANCEL);
+               post_count++;
+               ssk->srcavail_cancel = 0;
+       }
+
        if (ssk->recv_request &&
            ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
            tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
-           sdp_tx_ring_slots_left(&ssk->tx_ring)) {
+           sdp_tx_ring_slots_left(ssk)) {
                struct sdp_chrecvbuf *resp_size;
                ssk->recv_request = 0;
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -208,19 +272,29 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
        }
 
        if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS &&
-              sdp_tx_ring_slots_left(&ssk->tx_ring) &&
+              sdp_tx_ring_slots_left(ssk) &&
               ssk->isk.sk.sk_send_head &&
                sdp_nagle_off(ssk, ssk->isk.sk.sk_send_head)) {
                SDPSTATS_COUNTER_INC(send_miss_no_credits);
        }
 
        while (tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
-              sdp_tx_ring_slots_left(&ssk->tx_ring) &&
+              sdp_tx_ring_slots_left(ssk) &&
               (skb = ssk->isk.sk.sk_send_head) &&
                sdp_nagle_off(ssk, skb)) {
+               struct tx_srcavail_state *tx_sa;
                update_send_head(&ssk->isk.sk, skb);
                __skb_dequeue(&ssk->isk.sk.sk_write_queue);
-               sdp_post_send(ssk, skb, SDP_MID_DATA);
+
+               tx_sa = TX_SRCAVAIL_STATE(skb);
+               if (unlikely(tx_sa)) {
+                       if (likely(!tx_sa->abort))
+                               sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
+                       else
+                               sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");       
+               } else {
+                       sdp_post_send(ssk, skb, SDP_MID_DATA);
+               }
                post_count++;
        }
 
@@ -228,9 +302,9 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
        if (likely(c > SDP_MIN_TX_CREDITS))
                c *= 2;
 
-       if (unlikely(c < ring_posted(ssk->rx_ring)) &&
+       if (unlikely(c < rx_ring_posted(ssk)) &&
            likely(tx_credits(ssk) > 1) &&
-           likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
+           likely(sdp_tx_ring_slots_left(ssk)) &&
            likely((1 << ssk->isk.sk.sk_state) &
                    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
index f8a730390e5585046134f89c8afa9da9620f5ce3..ab5ec58b4332d56a60c9f8c666570c139f4747f6 100644 (file)
@@ -62,34 +62,18 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
        struct ib_qp_init_attr qp_init_attr = {
                .event_handler = sdp_qp_event_handler,
                .cap.max_send_wr = SDP_TX_SIZE,
-               .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS,
+               .cap.max_send_sge = SDP_MAX_SEND_SGES /*SDP_MAX_SEND_SKB_FRAGS*/,
                .cap.max_recv_wr = SDP_RX_SIZE,
                .cap.max_recv_sge = SDP_MAX_RECV_SKB_FRAGS + 1,
                .sq_sig_type = IB_SIGNAL_REQ_WR,
                .qp_type = IB_QPT_RC,
        };
        struct ib_device *device = id->device;
-       struct ib_mr *mr;
-       struct ib_pd *pd;
        int rc;
 
        sdp_dbg(sk, "%s\n", __func__);
 
-       pd = ib_alloc_pd(device);
-       if (IS_ERR(pd)) {
-               rc = PTR_ERR(pd);
-               sdp_warn(sk, "Unable to allocate PD: %d.\n", rc);
-               goto err_pd;
-       }
-
-        mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE);
-        if (IS_ERR(mr)) {
-                rc = PTR_ERR(mr);
-               sdp_warn(sk, "Unable to get dma MR: %d.\n", rc);
-                goto err_mr;
-        }
-
-       sdp_sk(sk)->mr = mr;
+       sdp_sk(sk)->sdp_dev = ib_get_client_data(device, &sdp_client);
 
        rc = sdp_rx_ring_create(sdp_sk(sk), device);
        if (rc)
@@ -102,13 +86,27 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
        qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq;
        qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq;
 
-       rc = rdma_create_qp(id, pd, &qp_init_attr);
+       rc = rdma_create_qp(id, sdp_sk(sk)->sdp_dev->pd, &qp_init_attr);
        if (rc) {
                sdp_warn(sk, "Unable to create QP: %d.\n", rc);
                goto err_qp;
        }
        sdp_sk(sk)->qp = id->qp;
        sdp_sk(sk)->ib_device = device;
+       sdp_sk(sk)->qp_active = 1;
+
+{
+       struct ib_qp_attr qp_attr;
+       struct ib_qp_init_attr qp_init_attr;
+
+       rc = ib_query_qp(sdp_sk(sk)->qp, 
+               &qp_attr,
+               0,
+               &qp_init_attr);
+
+       sdp_sk(sk)->max_send_sge = qp_attr.cap.max_send_sge;
+       sdp_dbg(sk, "max_send_sge = %d\n", sdp_sk(sk)->max_send_sge);
+}
 
        init_waitqueue_head(&sdp_sk(sk)->wq);
 
@@ -120,10 +118,6 @@ err_qp:
 err_tx:
        sdp_rx_ring_destroy(sdp_sk(sk));
 err_rx:
-       ib_dereg_mr(sdp_sk(sk)->mr);
-err_mr:
-       ib_dealloc_pd(pd);
-err_pd:
        return rc;
 }
 
@@ -333,14 +327,14 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                if (rc)
                        break;
                atomic_set(&sdp_sk(sk)->remote_credits,
-                               ring_posted(sdp_sk(sk)->rx_ring));
+                               rx_ring_posted(sdp_sk(sk)));
                memset(&hh, 0, sizeof hh);
                hh.bsdh.mid = SDP_MID_HELLO;
+               hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
                hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
                hh.max_adverts = 1;
                hh.majv_minv = SDP_MAJV_MINV;
                sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size);
-               hh.bsdh.bufs = htons(ring_posted(sdp_sk(sk)->rx_ring));
                hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
                                PAGE_SIZE + sizeof(struct sdp_bsdh));
                hh.max_adverts = 0x1;
@@ -354,6 +348,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                conn_param.retry_count = SDP_RETRY_COUNT;
                SDP_DUMP_PACKET(NULL, "TX", NULL, &hh.bsdh);
                rc = rdma_connect(id, &conn_param);
+//             sdp_sk(sk)->qp_active = 1;
                break;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                sdp_dbg(sk, "RDMA_CM_EVENT_ROUTE_ERROR : %p\n", id);
@@ -363,15 +358,16 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_REQUEST\n");
                rc = sdp_connect_handler(sk, id, event);
                if (rc) {
+                       sdp_warn(sk, "Destroy qp !!!!\n");
                        rdma_reject(id, NULL, 0);
                        break;
                }
                child = id->context;
                atomic_set(&sdp_sk(child)->remote_credits,
-                               ring_posted(sdp_sk(child)->rx_ring));
+                               rx_ring_posted(sdp_sk(child)));
                memset(&hah, 0, sizeof hah);
                hah.bsdh.mid = SDP_MID_HELLO_ACK;
-               hah.bsdh.bufs = htons(ring_posted(sdp_sk(child)->rx_ring));
+               hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
                hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
                hah.majv_minv = SDP_MAJV_MINV;
                hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
@@ -391,15 +387,24 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                        id->qp = NULL;
                        id->context = NULL;
                        parent = sdp_sk(child)->parent; /* TODO: hold ? */
+               } else {
+//                     sdp_sk(child)->qp_active = 1;
                }
                break;
        case RDMA_CM_EVENT_CONNECT_RESPONSE:
                sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_RESPONSE\n");
                rc = sdp_response_handler(sk, id, event);
-               if (rc)
+               if (rc) {
+                       sdp_warn(sk, "Destroy qp !!!!\n");
                        rdma_reject(id, NULL, 0);
+               }
                else
                        rc = rdma_accept(id, NULL);
+
+               if (!rc) {
+//                     sdp_sk(sk)->qp_active = 1;
+                       rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0;
+               }
                break;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n");
@@ -431,6 +436,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                                __func__);
                }
 
+               sdp_sk(sk)->qp_active = 0;
                rdma_disconnect(id);
 
                if (sk->sk_state != TCP_TIME_WAIT) {
index 9c5a8ed9b2e4e95d6991dd2601321d55f07f85b3..c547ff18bfd7fcb6c77716b8560d0adc743f46d0 100644 (file)
@@ -68,6 +68,7 @@ unsigned int csum_partial_copy_from_user_new (const char *src, char *dst,
 #include <net/protocol.h>
 #include <net/inet_common.h>
 #include <rdma/rdma_cm.h>
+#include <rdma/ib_fmr_pool.h>
 #include <rdma/ib_verbs.h>
 /* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */
 #include "sdp_socket.h"
@@ -81,7 +82,7 @@ MODULE_LICENSE("Dual BSD/GPL");
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG
 SDP_MODPARAM_INT(sdp_debug_level, 0, "Enable debug tracing if > 0.");
 #endif
-#ifdef CONFIG_INFINIBAND_SDP_DEBUG
+#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
 SDP_MODPARAM_INT(sdp_data_debug_level, 0,
                "Enable data path debug tracing if > 0.");
 #endif
@@ -91,8 +92,10 @@ SDP_MODPARAM_SINT(recv_poll_miss, -1, "How many times recv poll missed.");
 SDP_MODPARAM_SINT(recv_poll, 1000, "How many times to poll recv.");
 SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
        "Default idle time in seconds before keepalive probe sent.");
-SDP_MODPARAM_SINT(sdp_zcopy_thresh, 65536, "Zero copy send threshold; 0=0ff.");
-
+SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 65536,
+       "Zero copy send using SEND threshold; 0=0ff.");
+SDP_MODPARAM_SINT(sdp_zcopy_thresh, 128*1024,
+       "Zero copy using RDMA threshold; 0=0ff.");
 #define SDP_RX_COAL_TIME_HIGH 128
 SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
        "Target number of bytes to coalesce with interrupt moderation.");
@@ -168,6 +171,7 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
 
        rc = rdma_bind_addr(ssk->id, (struct sockaddr *)&addr);
        if (rc) {
+               sdp_warn(sk, "Destroy qp !!!!\n");
                rdma_destroy_id(ssk->id);
                ssk->id = NULL;
                return rc;
@@ -180,32 +184,21 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
 
 static void sdp_destroy_qp(struct sdp_sock *ssk)
 {
-       struct ib_pd *pd = NULL;
-
-
        sdp_dbg(&ssk->isk.sk, "destroying qp\n");
        sdp_prf(&ssk->isk.sk, NULL, "destroying qp");
 
-       del_timer(&ssk->tx_ring.timer);
+       ssk->qp_active = 0;
 
+       del_timer(&ssk->tx_ring.timer);
 
        sdp_rx_ring_destroy(ssk);
        sdp_tx_ring_destroy(ssk);
 
        if (ssk->qp) {
-               pd = ssk->qp->pd;
                ib_destroy_qp(ssk->qp);
                ssk->qp = NULL;
        }
 
-       if (ssk->mr) {
-               ib_dereg_mr(ssk->mr);
-               ssk->mr = NULL;
-       }
-
-       if (pd)
-               ib_dealloc_pd(pd);
-
        sdp_remove_large_sock(ssk);
 }
 
@@ -523,10 +516,12 @@ static void sdp_destruct(struct sock *sk)
 
        sdp_dbg(sk, "%s\n", __func__);
        if (ssk->destructed_already) {
-               sdp_warn(sk, "redestructing sk!");
+               sdp_warn(sk, "redestructing sk!\n");
                return;
        }
 
+       cancel_delayed_work(&ssk->srcavail_cancel_work);
+
        ssk->destructed_already = 1;
 
        sdp_remove_sock(ssk);
@@ -749,8 +744,10 @@ static int sdp_disconnect(struct sock *sk, int flags)
        sdp_dbg(sk, "%s\n", __func__);
 
        if (old_state != TCP_LISTEN) {
-               if (ssk->id)
+               if (ssk->id) {
+                       sdp_sk(sk)->qp_active = 0;
                        rc = rdma_disconnect(ssk->id);
+               }
 
                return rc;
        }
@@ -973,6 +970,8 @@ void sdp_destroy_work(struct work_struct *work)
 
        sdp_cancel_dreq_wait_timeout(ssk);
 
+       cancel_delayed_work(&ssk->srcavail_cancel_work);
+
        if (sk->sk_state == TCP_TIME_WAIT)
                sock_put(sk, SOCK_REF_CM_TW);
 
@@ -1009,15 +1008,25 @@ void sdp_dreq_wait_timeout_work(struct work_struct *work)
 
        release_sock(sk);
 
-       if (sdp_sk(sk)->id)
+       if (sdp_sk(sk)->id) {
+               sdp_warn(sk, "Destroyed QP!!!!\n");
+               sdp_sk(sk)->qp_active = 0;
                rdma_disconnect(sdp_sk(sk)->id);
-       else
+       } else
                sock_put(sk, SOCK_REF_CM_TW);
 
 out:
        sock_put(sk, SOCK_REF_DREQ_TO);
 }
 
+/*
+ * Only SDP interact with this receive queue. Don't want
+ * lockdep warnings that using spinlock irqsave
+ */
+static struct lock_class_key ib_sdp_sk_receive_queue_lock_key;
+
+static struct lock_class_key ib_sdp_sk_callback_lock_key;
+
 int sdp_init_sock(struct sock *sk)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
@@ -1027,8 +1036,15 @@ int sdp_init_sock(struct sock *sk)
        INIT_LIST_HEAD(&ssk->accept_queue);
        INIT_LIST_HEAD(&ssk->backlog_queue);
        INIT_DELAYED_WORK(&ssk->dreq_wait_work, sdp_dreq_wait_timeout_work);
+       INIT_DELAYED_WORK(&ssk->srcavail_cancel_work, srcavail_cancel_timeout);
        INIT_WORK(&ssk->destroy_work, sdp_destroy_work);
 
+       lockdep_set_class(&sk->sk_receive_queue.lock,
+                                       &ib_sdp_sk_receive_queue_lock_key);
+
+       lockdep_set_class(&sk->sk_callback_lock,
+                                       &ib_sdp_sk_callback_lock_key);
+
        sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
 
        skb_queue_head_init(&ssk->rx_ctl_q);
@@ -1040,10 +1056,14 @@ int sdp_init_sock(struct sock *sk)
        ssk->sdp_disconnect = 0;
        ssk->destructed_already = 0;
        ssk->destruct_in_process = 0;
+       ssk->srcavail_cancel = 0;
        spin_lock_init(&ssk->lock);
+       spin_lock_init(&ssk->tx_sa_lock);
 
        atomic_set(&ssk->somebody_is_doing_posts, 0);
 
+       ssk->tx_ring.rdma_inflight = NULL;
+
        return 0;
 }
 
@@ -1079,7 +1099,7 @@ static void sdp_shutdown(struct sock *sk, int how)
 
 static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb)
 {
-       TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
+       SDP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
        ssk->pushed_seq = ssk->write_seq;
        sdp_do_posts(ssk);
 }
@@ -1187,7 +1207,7 @@ static int sdp_setsockopt(struct sock *sk, int level, int optname,
                if (val < SDP_MIN_ZCOPY_THRESH || val > SDP_MAX_ZCOPY_THRESH)
                        err = -EINVAL;
                else
-                       ssk->zcopy_thresh = val;
+                       ssk->bzcopy_thresh = val;
                break;
        default:
                err = -ENOPROTOOPT;
@@ -1231,7 +1251,7 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
                val = (ssk->keepalive_time ? : sdp_keepalive_time) / HZ;
                break;
        case SDP_ZCOPY_THRESH:
-               val = ssk->zcopy_thresh ? ssk->zcopy_thresh : sdp_zcopy_thresh;
+               val = ssk->bzcopy_thresh ? ssk->bzcopy_thresh : sdp_bzcopy_thresh;
                break;
        default:
                return -ENOPROTOOPT;
@@ -1316,7 +1336,7 @@ static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk,
 {
        if (unlikely(flags & MSG_OOB)) {
                struct sk_buff *skb = sk->sk_write_queue.prev;
-               TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_URG;
+               SDP_SKB_CB(skb)->flags |= TCPCB_FLAG_URG;
        }
 }
 
@@ -1327,8 +1347,7 @@ static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags)
        sdp_do_posts(sdp_sk(sk));
 }
 
-static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
-                              struct sk_buff *skb)
+void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb)
 {
         skb_header_release(skb);
         __skb_queue_tail(&sk->sk_write_queue, skb);
@@ -1363,8 +1382,9 @@ static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
        }
 
        if (bz->pages) {
-               for (i = 0; i < bz->cur_page; i++)
+               for (i = 0; i < bz->cur_page; i++) {
                        put_page(bz->pages[i]);
+               }
 
                kfree(bz->pages);
        }
@@ -1382,11 +1402,11 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 {
        struct bzcopy_state *bz;
        unsigned long addr;
-       int done_pages = 0;
        int thresh;
        mm_segment_t cur_fs;
+       int rc = 0;
 
-       thresh = ssk->zcopy_thresh ? : sdp_zcopy_thresh;
+       thresh = ssk->bzcopy_thresh ? : sdp_bzcopy_thresh;
        if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK)) {
                SDPSTATS_COUNTER_INC(sendmsg_bcopy_segment);
                return NULL;
@@ -1425,36 +1445,18 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
                return ERR_PTR(-ENOMEM);
        }
 
+       rc = sdp_get_pages(&ssk->isk.sk, bz->pages, bz->page_cnt,
+                       (unsigned long)base);
 
-       addr &= PAGE_MASK;
-       if (segment_eq(cur_fs, KERNEL_DS)) {
-               for (done_pages = 0; done_pages < bz->page_cnt; done_pages++) {
-                       bz->pages[done_pages] = virt_to_page(addr);
-                       if (!bz->pages[done_pages])
-                               break;
-                       get_page(bz->pages[done_pages]);
-                       addr += PAGE_SIZE;
-               }
-       } else {
-               if (current->mm) {
-                       down_write(&current->mm->mmap_sem);
-                       done_pages = get_user_pages(current, current->mm, addr,
-                                       bz->page_cnt, 0, 0, bz->pages, NULL);
-                       up_write(&current->mm->mmap_sem);
-               }
-       }
-       if (unlikely(done_pages != bz->page_cnt)){
-               int i;
-               if (done_pages > 0) {
-                       for (i = 0; i < done_pages; i++)
-                               put_page(bz->pages[i]);
-               }
-               kfree(bz->pages);
-               kfree(bz);
-               bz = ERR_PTR(-EFAULT);
-       }
+       if (unlikely(rc))
+               goto err;
 
        return bz;
+
+err:
+       kfree(bz->pages);
+       kfree(bz);
+       return ERR_PTR(-EFAULT);
 }
 
 #define TCP_PAGE(sk)   (sk->sk_sndmsg_page)
@@ -1606,31 +1608,8 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
        return copy;
 }
 
-/* return the min of:
- * - tx credits
- * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS
- */
-static inline int tx_slots_free(struct sdp_sock *ssk)
-{
-       int min_free;
-
-       min_free = MIN(tx_credits(ssk),
-                       SDP_TX_SIZE - ring_posted(ssk->tx_ring));
-       if (min_free < SDP_MIN_TX_CREDITS)
-               return 0;
-
-       return min_free - SDP_MIN_TX_CREDITS;
-};
-
-/* like sk_stream_memory_free - except measures remote credits */
-static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
-                                        struct bzcopy_state *bz)
-{
-       return tx_slots_free(ssk) > bz->busy;
-}
-
 /* like sk_stream_wait_memory - except waits on remote credits */
-static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
+int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
                                  struct bzcopy_state *bz)
 {
        struct sock *sk = &ssk->isk.sk;
@@ -1714,6 +1693,15 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        long timeo;
        struct bzcopy_state *bz = NULL;
        SDPSTATS_COUNTER_INC(sendmsg);
+
+       if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
+               err = sdp_sendmsg_zcopy(iocb, sk, msg, size);
+               if (err != -EAGAIN)
+                       return err;
+
+               /* Got SendSM/Timedout - fallback to regular send */    
+       }
+
        lock_sock(sk);
        sdp_dbg_data(sk, "%s\n", __func__);
 
@@ -1816,7 +1804,7 @@ new_segment:
 
                        /* OOB data byte should be the last byte of
                           the data payload */
-                       if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG) &&
+                       if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG) &&
                            !(flags & MSG_OOB)) {
                                sdp_mark_push(ssk, skb);
                                goto new_segment;
@@ -1835,10 +1823,10 @@ new_segment:
                        }
 
                        if (!copied)
-                               TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
+                               SDP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
 
                        ssk->write_seq += copy;
-                       TCP_SKB_CB(skb)->end_seq += copy;
+                       SDP_SKB_CB(skb)->end_seq += copy;
                        /*unused: skb_shinfo(skb)->gso_segs = 0;*/
 
                        from += copy;
@@ -1888,10 +1876,10 @@ out:
                        bz = sdp_bz_cleanup(bz);
        }
 
-       posts_handler_put(ssk);
-
        sdp_auto_moderation(ssk);
 
+       posts_handler_put(ssk);
+
        release_sock(sk);
        return copied;
 
@@ -1916,6 +1904,7 @@ out_err:
        posts_handler_put(ssk);
 
        release_sock(sk);
+
        return err;
 }
 
@@ -1936,9 +1925,13 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        u32 *seq;
        int copied = 0;
        int rc;
+       int avail_bytes_count = 0;      /* Could be inlined in skb */
+                                       /* or advertised for RDMA  */
 
        lock_sock(sk);
-       sdp_dbg_data(sk, "%s\n", __func__);
+       sdp_dbg_data(sk, "iovlen: %ld iov_len: %ld flags: 0x%x peek: 0x%x\n",
+                       msg->msg_iovlen, msg->msg_iov[0].iov_len, flags,
+                       MSG_PEEK);
 
        posts_handler_get(ssk);
 
@@ -1962,6 +1955,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
 
        do {
+               struct rx_srcavail_state *rx_sa = NULL;
                u32 offset;
 
                /* Are we at urgent data? Stop if we have read anything or have
@@ -1978,21 +1972,51 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
                skb = skb_peek(&sk->sk_receive_queue);
                do {
+                       struct sdp_bsdh *h;
                        if (!skb)
                                break;
 
-                       if ((skb_transport_header(skb))[0] == SDP_MID_DISCONN)
+                       avail_bytes_count = 0;
+
+                       h = (struct sdp_bsdh *)skb_transport_header(skb);
+
+                       switch (h->mid) {
+                       case SDP_MID_DISCONN:
+                               sdp_dbg(sk, "Handle RX SDP_MID_DISCONN\n");
+                               sdp_prf(sk, NULL, "Handle RX SDP_MID_DISCONN");
+                               sdp_handle_disconn(sk);
                                goto found_fin_ok;
 
-                       if (before(*seq, TCP_SKB_CB(skb)->seq)) {
+                       case SDP_MID_SRCAVAIL:
+                               rx_sa = RX_SRCAVAIL_STATE(skb);
+                               if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
+                                       rx_sa->aborted = 1;
+                                       sdp_warn(sk, "Ignoring src avail - "
+                                               "due to SrcAvailCancel\n");
+                                       goto skb_cleanup;
+                               }
+                               avail_bytes_count = rx_sa->len;
+                               break;
+
+                       case SDP_MID_DATA:
+                               rx_sa = NULL;
+                               avail_bytes_count = skb->len;
+                               break;
+                       default:
+                               break;
+                       }
+
+                       if (before(*seq, SDP_SKB_CB(skb)->seq)) {
                                sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
-                                       *seq, TCP_SKB_CB(skb)->seq);
+                                       *seq, SDP_SKB_CB(skb)->seq);
                                sdp_reset(sk);
                                break;
                        }
 
-                       offset = *seq - TCP_SKB_CB(skb)->seq;
-                       if (offset < skb->len)
+                       offset = *seq - SDP_SKB_CB(skb)->seq;
+//                     sdp_warn(sk, "offset = 0x%x, avail_byte_count = 0x%x\n",
+//                                     offset, avail_bytes_count);
+                       if (offset < avail_bytes_count)
                                goto found_ok_skb;
 
                        WARN_ON(!(flags & MSG_PEEK));
@@ -2066,10 +2090,10 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                continue;
 
        found_ok_skb:
-               sdp_dbg_data(sk, "found_ok_skb len %d\n", skb->len);
-               sdp_dbg_data(sk, "len %Zd offset %d\n", len, offset);
+               sdp_dbg_data(sk, "bytes avail: %d\n", avail_bytes_count);
+               sdp_dbg_data(sk, "buf len %Zd offset %d\n", len, offset);
                sdp_dbg_data(sk, "copied %d target %d\n", copied, target);
-               used = skb->len - offset;
+               used = avail_bytes_count - offset;
                if (len < used)
                        used = len;
 
@@ -2091,9 +2115,20 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        }
                }
                if (!(flags & MSG_TRUNC)) {
-                       err = skb_copy_datagram_iovec(skb, offset,
+                       if (rx_sa) {
+                               sdp_dbg_data(sk, "Got srcavail - using RDMA\n");
+                               err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb,
+                                               used);
+                               if (err == -EAGAIN) {
+                                       sdp_warn(sk, "RDMA Read aborted\n");
+                                       used = 0;
+                                       goto skb_cleanup;
+                               }
+                       } else {
+                               err = skb_copy_datagram_iovec(skb, offset,
                                                      /* TODO: skip header? */
                                                      msg->msg_iov, used);
+                       }
                        if (err) {
                                sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
                                        "offset %d size %ld status %d\n",
@@ -2115,17 +2150,36 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 skip_copy:
                if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
                        ssk->urg_data = 0;
-               if (used + offset < skb->len)
+
+
+               if (rx_sa) {
+                       if (ssk->srcavail_cancel_mseq < rx_sa->mseq) {
+                               rc = sdp_post_rdma_rd_compl(ssk, rx_sa->used);
+                               BUG_ON(rc);
+                       }
+                       if (rx_sa->aborted) {
+                               sdp_warn(sk, "RDMA aborted. Sending SendSM\n");
+                               rc = sdp_post_sendsm(ssk);
+                               BUG_ON(rc);
+                       }
+               }
+
+               if ((!rx_sa && used + offset < skb->len) ||
+                       (rx_sa && !rx_sa->aborted && rx_sa->used < rx_sa->len))
                        continue;
                offset = 0;
 
-               if (!(flags & MSG_PEEK)) {
+skb_cleanup:
+               if (!(flags & MSG_PEEK) || (rx_sa && rx_sa->aborted)) {
                        struct sdp_bsdh *h;
                        h = (struct sdp_bsdh *)skb_transport_header(skb);
                        sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
                                ntohl(h->mseq), ntohl(h->mseq_ack));
                        skb_unlink(skb, &sk->sk_receive_queue);
                        __kfree_skb(skb);
+
+                       kfree(rx_sa);
+                       rx_sa = NULL;
                }
                continue;
 found_fin_ok:
@@ -2261,7 +2315,7 @@ void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb)
        u8 tmp;
        u32 ptr = skb->len - 1;
 
-       ssk->urg_seq = TCP_SKB_CB(skb)->seq + ptr;
+       ssk->urg_seq = SDP_SKB_CB(skb)->seq + ptr;
 
        if (skb_copy_bits(skb, ptr, &tmp, 1))
                BUG();
@@ -2376,6 +2430,53 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
 
 static void sdp_add_device(struct ib_device *device)
 {
+       struct sdp_device *sdp_dev;
+       struct ib_fmr_pool_param fmr_param;
+
+       sdp_dev = kmalloc(sizeof *sdp_dev, GFP_KERNEL);
+       if (!sdp_dev)
+               return;
+
+       sdp_dev->pd = ib_alloc_pd(device);
+       if (IS_ERR(sdp_dev->pd)) {
+               printk(KERN_WARNING "Unable to allocate PD: %ld.\n",
+                               PTR_ERR(sdp_dev->pd));
+               goto err_pd_alloc;
+       }
+
+        sdp_dev->mr = ib_get_dma_mr(sdp_dev->pd, IB_ACCESS_LOCAL_WRITE);
+        if (IS_ERR(sdp_dev->mr)) {
+               printk(KERN_WARNING "Unable to get dma MR: %ld.\n",
+                               PTR_ERR(sdp_dev->mr));
+                goto err_mr;
+        }
+
+       memset(&fmr_param, 0, sizeof fmr_param);
+       fmr_param.pool_size         = SDP_FMR_POOL_SIZE;
+       fmr_param.dirty_watermark   = SDP_FMR_DIRTY_SIZE;
+       fmr_param.cache             = 0;
+       fmr_param.max_pages_per_fmr = SDP_FMR_SIZE;
+       fmr_param.page_shift        = PAGE_SHIFT;
+       fmr_param.access            = (IB_ACCESS_LOCAL_WRITE |
+                                      IB_ACCESS_REMOTE_READ);
+
+       sdp_dev->fmr_pool = ib_create_fmr_pool(sdp_dev->pd, &fmr_param);
+       if (IS_ERR(sdp_dev->fmr_pool)) {
+               printk(KERN_WARNING "Error creating fmr pool\n");
+               sdp_dev->fmr_pool = NULL;
+               goto err_fmr_create;
+       }
+
+       ib_set_client_data(device, &sdp_client, sdp_dev);
+
+       return;
+
+err_fmr_create:
+       ib_dereg_mr(sdp_dev->mr);
+err_mr:
+       ib_dealloc_pd(sdp_dev->pd);
+err_pd_alloc:
+       kfree(sdp_dev);
 }
 
 static void sdp_remove_device(struct ib_device *device)
@@ -2384,6 +2485,7 @@ static void sdp_remove_device(struct ib_device *device)
        struct sdp_sock   *ssk;
        struct sock       *sk;
        struct rdma_cm_id *id;
+       struct sdp_device *sdp_dev;
 
 do_next:
        write_lock(&device_removal_lock);
@@ -2416,6 +2518,7 @@ kill_socks:
                        sk = &ssk->isk.sk;
 
                        sdp_cancel_dreq_wait_timeout(ssk);
+                       cancel_delayed_work(&ssk->srcavail_cancel_work);
 
                        spin_unlock_irq(&sock_list_lock);
 
@@ -2438,6 +2541,17 @@ kill_socks:
        spin_unlock_irq(&sock_list_lock);
 
        write_unlock(&device_removal_lock);
+
+       sdp_dev = ib_get_client_data(device, &sdp_client);
+
+       ib_flush_fmr_pool(sdp_dev->fmr_pool);
+       ib_destroy_fmr_pool(sdp_dev->fmr_pool);
+
+       ib_dereg_mr(sdp_dev->mr);
+
+       ib_dealloc_pd(sdp_dev->pd);
+
+       kfree(sdp_dev);
 }
 
 static struct net_proto_family sdp_net_proto = {
@@ -2446,7 +2560,7 @@ static struct net_proto_family sdp_net_proto = {
        .owner  = THIS_MODULE,
 };
 
-static struct ib_client sdp_client = {
+struct ib_client sdp_client = {
        .name   = "sdp",
        .add    = sdp_add_device,
        .remove = sdp_remove_device
@@ -2468,6 +2582,9 @@ static int __init sdp_init(void)
        if (!orphan_count)
                goto no_mem_orphan_count;
 
+       INIT_LIST_HEAD(&sockets_allocated->list);
+       INIT_LIST_HEAD(&orphan_count->list);
+
        percpu_counter_init(sockets_allocated, 0);
        percpu_counter_init(orphan_count, 0);
 
index 939a143ceede5b4de3cd183bcdf5273e59196858..791030ffc21e54d6f467ba9c62bd3cfad78c9b64 100644 (file)
@@ -250,6 +250,8 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
                sdpstats.sendmsg_bcopy_segment);
        seq_printf(seq, "bzcopy segments    \t\t: %d\n",
                sdpstats.sendmsg_bzcopy_segment);
+       seq_printf(seq, "zcopy segments    \t\t: %d\n",
+               sdpstats.sendmsg_zcopy_segment);
        seq_printf(seq, "post_send_credits  \t\t: %d\n",
                sdpstats.post_send_credits);
        seq_printf(seq, "memcpy_count       \t\t: %u\n",
index 38417c4947222f2a852f6f018a4a405bb16ddaf7..80c62e753a9513d40c590ee8b52afc57886676f9 100644 (file)
@@ -81,7 +81,7 @@ void sdp_remove_large_sock(struct sdp_sock *ssk)
 }
 
 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
-static void sdp_fin(struct sock *sk)
+void sdp_handle_disconn(struct sock *sk)
 {
        sdp_dbg(sk, "%s\n", __func__);
 
@@ -105,6 +105,7 @@ static void sdp_fin(struct sock *sk)
                sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
 
                if (sdp_sk(sk)->id) {
+                       sdp_sk(sk)->qp_active = 0;
                        rdma_disconnect(sdp_sk(sk)->id);
                } else {
                        sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
@@ -200,7 +201,7 @@ static int sdp_post_recv(struct sdp_sock *ssk)
        /* TODO: proper error handling */
        sge->addr = (u64)addr;
        sge->length = SDP_HEAD_SIZE;
-       sge->lkey = ssk->mr->lkey;
+       sge->lkey = ssk->sdp_dev->mr->lkey;
        frags = skb_shinfo(skb)->nr_frags;
        for (i = 0; i < frags; ++i) {
                ++sge;
@@ -212,7 +213,7 @@ static int sdp_post_recv(struct sdp_sock *ssk)
                rx_req->mapping[i + 1] = addr;
                sge->addr = addr;
                sge->length = skb_shinfo(skb)->frags[i].size;
-               sge->lkey = ssk->mr->lkey;
+               sge->lkey = ssk->sdp_dev->mr->lkey;
        }
 
        rx_wr.next = NULL;
@@ -244,6 +245,11 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
        int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
        unsigned long max_bytes;
 
+       if (!ssk->qp_active) {
+//             sdp_warn(sk, "post rx while qp is not active\n");
+               return 0;
+       }
+
        if (top_mem_usage && (top_mem_usage * 0x100000) <
                        atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) {
                scale = 1;
@@ -251,12 +257,12 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
 
        max_bytes = sk->sk_rcvbuf * scale;
 
-       if  (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE))
+       if  (unlikely(rx_ring_posted(ssk) >= SDP_RX_SIZE))
                return 0;
 
-       if (likely(ring_posted(ssk->rx_ring) > SDP_MIN_TX_CREDITS)) {
+       if (likely(rx_ring_posted(ssk) > SDP_MIN_TX_CREDITS)) {
                unsigned long bytes_in_process =
-                       (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
+                       (rx_ring_posted(ssk) - SDP_MIN_TX_CREDITS) *
                        buffer_size;
                bytes_in_process += rcv_nxt(ssk) - ssk->copied_seq;
 
@@ -292,15 +298,36 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
 {
        int skb_len;
        struct sdp_sock *ssk = sdp_sk(sk);
+       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
 
        /* not needed since sk_rmem_alloc is not currently used
         * TODO - remove this?
        skb_set_owner_r(skb, sk); */
 
-       skb_len = skb->len;
+       SDP_SKB_CB(skb)->seq = rcv_nxt(ssk);
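+       /* SrcAvail: the peer advertises an RDMA-readable buffer. Stash its
+        * rkey/vaddr/len on the skb so recvmsg can RDMA Read the payload
+        * directly into the user buffer (see sdp_rdma_to_iovec()). */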
+       if (h->mid == SDP_MID_SRCAVAIL) {
+               struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
+               struct rx_srcavail_state *sa;
+               
+               ssk->srcavail_cancel_mseq = 0;
+
+               sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
+                               sizeof(struct rx_srcavail_state), GFP_ATOMIC);
+
+               sa->mseq = ntohl(h->mseq);
+               sa->aborted = 0;
+               sa->used = 0;
+               sa->len = skb_len = ntohl(srcah->len);
+               sa->rkey = ntohl(srcah->rkey);
+               sa->vaddr = be64_to_cpu(srcah->vaddr);
+
+               sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n",
+                       skb_len, sa->vaddr);
+       } else {
+               skb_len = skb->len;
 
-       TCP_SKB_CB(skb)->seq = rcv_nxt(ssk);
-       atomic_add(skb_len, &ssk->rcv_nxt);
+               atomic_add(skb_len, &ssk->rcv_nxt);
+       }
 
        skb_queue_tail(&sk->sk_receive_queue, skb);
 
@@ -379,9 +406,9 @@ static inline int credit_update_needed(struct sdp_sock *ssk)
        if (likely(c > SDP_MIN_TX_CREDITS))
                c += c/2;
 
-       return unlikely(c < ring_posted(ssk->rx_ring)) &&
+       return unlikely(c < rx_ring_posted(ssk)) &&
            likely(tx_credits(ssk) > 1) &&
-           likely(sdp_tx_ring_slots_left(&ssk->tx_ring));
+           likely(sdp_tx_ring_slots_left(ssk));
 }
 
 
@@ -421,6 +448,7 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 
        switch (h->mid) {
        case SDP_MID_DATA:
+       case SDP_MID_SRCAVAIL:
                WARN_ON(!(sk->sk_shutdown & RCV_SHUTDOWN));
 
                sdp_warn(sk, "DATA after socket rcv was shutdown\n");
@@ -436,14 +464,26 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                }
                __kfree_skb(skb);
 
+               break;
+       case SDP_MID_RDMARDCOMPL:
+               {
+                       struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
+                       sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
+                       sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
+                                       ntohl(rrch->len));
+                       __kfree_skb(skb);
+               } break;
+       case SDP_MID_SENDSM:
+               sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
+               __kfree_skb(skb);
+               break;
+       case SDP_MID_SRCAVAIL_CANCEL:
+               sdp_dbg_data(sk, "Handling SrcAvailCancel - sending SendSM\n");
+               sdp_prf(sk, NULL, "Handling SrcAvailCancel");
+               ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+               sdp_post_sendsm(ssk);
                break;
        case SDP_MID_DISCONN:
-               sdp_dbg_data(sk, "Handling RX disconnect\n");
-               sdp_prf(sk, NULL, "Handling RX disconnect");
-               sdp_fin(sk);
-               sdp_prf(sk, NULL, "Queueing fin skb - release recvmsg");
-               /* Enqueue fin skb to release sleeping recvmsg */
-               sdp_sock_queue_rcv_skb(sk, skb);
                break;
        case SDP_MID_CHRCVBUF:
                sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
@@ -502,20 +542,27 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 
        skb_pull(skb, sizeof(struct sdp_bsdh));
 
-       if (h->mid != SDP_MID_DATA ||
+       if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL &&
+                               h->mid != SDP_MID_DISCONN) ||
                        unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
                sdp_prf(sk, NULL, "Control skb - queuing to control queue");
+               if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
+                       sdp_warn(sk, "Got SrcAvailCancel. "
+                                       "seq: 0x%x seq_ack: 0x%x\n",
+                                       ntohl(h->mseq), ntohl(h->mseq_ack));
+                       ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+               }
                skb_queue_tail(&ssk->rx_ctl_q, skb);
+
                return 0;
        }
 
-       if (unlikely(skb->len <= 0)) {
+       if (unlikely(h->mid == SDP_MID_DATA && skb->len <= 0)) {
                __kfree_skb(skb);
                return 0;
        }
 
-       sdp_prf(sk, NULL, "queueing a %s skb",
-                       (h->mid == SDP_MID_DATA ? "data" : "disconnect"));
+       sdp_prf(sk, NULL, "queueing %s skb", mid2str(h->mid));
        skb = sdp_sock_queue_rcv_skb(sk, skb);
 
 /*     if (unlikely(h->flags & SDP_OOB_PRES))
@@ -540,10 +587,12 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk,
        atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
 
        if (unlikely(wc->status)) {
-               if (wc->status != IB_WC_WR_FLUSH_ERR) {
-                       sdp_warn(sk, "Recv completion with error. Status %d\n",
-                               wc->status);
+               if (ssk->qp_active) {
+                       sdp_warn(sk, "Recv completion with error. "
+                                       "Status %d, vendor: %d\n",
+                               wc->status, wc->vendor_err);
                        sdp_reset(sk);
+                       ssk->qp_active = 0;
                }
                __kfree_skb(skb);
                return NULL;
@@ -619,8 +668,12 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 
        do {
                n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
+//             sdp_warn(&ssk->isk.sk, "polling: %d\n", n);
                for (i = 0; i < n; ++i) {
                        struct ib_wc *wc = &ibwc[i];
+/*                     sdp_warn(&ssk->isk.sk, "wr_id=0x%lx len %d opcode: 0x%x status: 0x%x\n",
+                                       wc->wr_id, wc->byte_len,
+                                       wc->opcode, wc->status);*/
 
                        BUG_ON(!(wc->wr_id & SDP_OP_RECV));
                        skb = sdp_process_rx_wc(ssk, wc);
@@ -686,7 +739,7 @@ void sdp_do_posts(struct sdp_sock *ssk)
 
        sdp_post_recvs(ssk);
 
-       if (ring_posted(ssk->tx_ring))
+       if (tx_ring_posted(ssk))
                sdp_xmit_poll(ssk, 1);
 
        sdp_post_sends(ssk, 0);
@@ -714,7 +767,8 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
        int wc_processed = 0;
        int credits_before;
 
-       sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
+//     sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
+//     sdp_warn(&ssk->isk.sk, "rx irq called\n");
 
        if (cq != ssk->rx_ring.cq) {
                sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
@@ -725,16 +779,13 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 
        sdp_prf(sk, NULL, "rx irq");
 
-       rx_ring_lock(ssk, flags);
+       if (!rx_ring_trylock(&ssk->rx_ring, &flags)) {
+               sdp_warn(&ssk->isk.sk, "ring destroyed. not polling it\n");
+               return;
+       }
 
        credits_before = tx_credits(ssk);
 
-       if (!ssk->rx_ring.cq) {
-               sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
-
-               goto out;
-       }
-
        wc_processed = sdp_poll_rx_cq(ssk);
        sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
 
@@ -757,13 +808,12 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
        }
        sdp_arm_rx_cq(sk);
 
-out:
-       rx_ring_unlock(ssk, flags);
+       rx_ring_unlock(&ssk->rx_ring, &flags);
 }
 
 static void sdp_rx_ring_purge(struct sdp_sock *ssk)
 {
-       while (ring_posted(ssk->rx_ring) > 0) {
+       while (rx_ring_posted(ssk) > 0) {
                struct sk_buff *skb;
                skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
                if (!skb)
@@ -776,7 +826,8 @@ static void sdp_rx_ring_purge(struct sdp_sock *ssk)
 void sdp_rx_ring_init(struct sdp_sock *ssk)
 {
        ssk->rx_ring.buffer = NULL;
-       spin_lock_init(&ssk->rx_ring.lock);
+       ssk->rx_ring.destroyed = 0;
+       rwlock_init(&ssk->rx_ring.destroyed_lock);
 }
 
 static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
@@ -828,6 +879,8 @@ err_cq:
 
 void sdp_rx_ring_destroy(struct sdp_sock *ssk)
 {
+       rx_ring_destroy_lock(&ssk->rx_ring);
+
        if (ssk->rx_ring.buffer) {
                sdp_rx_ring_purge(ssk);
 
index d53e838fd1704bdead7e521c66f3ea2d59533d17..7eb7d31fe38878d72bae6f555a42653af29d7a72 100644 (file)
@@ -46,7 +46,7 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
 {
        int wc_processed = 0;
 
-       sdp_prf(&ssk->isk.sk, NULL, "called from %s:%d", func, line);
+       sdp_prf(&ssk->isk.sk, NULL, "%s", __func__);
 
        /* If we don't have a pending timer, set one up to catch our recent
           post in case the interface becomes idle */
@@ -84,18 +84,20 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
        ssk->tx_bytes += skb->len;
 
        h->mid = mid;
-       if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+       if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
                h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
        else
                h->flags = 0;
 
-       h->bufs = htons(ring_posted(ssk->rx_ring));
+       h->bufs = htons(rx_ring_posted(ssk));
        h->len = htonl(skb->len);
        h->mseq = htonl(mseq);
+       if (TX_SRCAVAIL_STATE(skb))
+               TX_SRCAVAIL_STATE(skb)->mseq = mseq;
        h->mseq_ack = htonl(mseq_ack(ssk));
 
        sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
-                       mid2str(mid), ring_posted(ssk->rx_ring), mseq,
+                       mid2str(mid), rx_ring_posted(ssk), mseq,
                        ntohl(h->mseq_ack));
 
        SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
@@ -112,7 +114,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 
        sge->addr = addr;
        sge->length = skb->len - skb->data_len;
-       sge->lkey = ssk->mr->lkey;
+       sge->lkey = ssk->sdp_dev->mr->lkey;
        frags = skb_shinfo(skb)->nr_frags;
        for (i = 0; i < frags; ++i) {
                ++sge;
@@ -124,7 +126,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
                tx_req->mapping[i + 1] = addr;
                sge->addr = addr;
                sge->length = skb_shinfo(skb)->frags[i].size;
-               sge->lkey = ssk->mr->lkey;
+               sge->lkey = ssk->sdp_dev->mr->lkey;
        }
 
        tx_wr.next = NULL;
@@ -133,7 +135,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
        tx_wr.num_sge = frags + 1;
        tx_wr.opcode = IB_WR_SEND;
        tx_wr.send_flags = IB_SEND_SIGNALED;
-       if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+       if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
                tx_wr.send_flags |= IB_SEND_SOLICITED;
 
        delta = jiffies - last_send;
@@ -144,7 +146,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
        rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
        atomic_inc(&ssk->tx_ring.head);
        atomic_dec(&ssk->tx_ring.credits);
-       atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring));
+       atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));
        if (unlikely(rc)) {
                sdp_dbg(&ssk->isk.sk,
                                "ib_post_send failed with status %d.\n", rc);
@@ -158,7 +160,6 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
        struct ib_device *dev;
        struct sdp_buf *tx_req;
        struct sk_buff *skb = NULL;
-       struct bzcopy_state *bz;
        int i, frags;
        struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
        if (unlikely(mseq != ring_tail(*tx_ring))) {
@@ -179,12 +180,11 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
                                  DMA_TO_DEVICE);
        }
 
-       tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq;
+       tx_ring->una_seq += SDP_SKB_CB(skb)->end_seq;
 
        /* TODO: AIO and real zcopy code; add their context support here */
-       bz = BZCOPY_STATE(skb);
-       if (bz)
-               bz->busy--;
+       if (BZCOPY_STATE(skb))
+               BZCOPY_STATE(skb)->busy--;
 
        atomic_inc(&tx_ring->tail);
 
@@ -195,6 +195,7 @@ out:
 static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 {
        struct sk_buff *skb = NULL;
+       struct sdp_bsdh *h;
 
        skb = sdp_send_completion(ssk, wc->wr_id);
        if (unlikely(!skb))
@@ -214,12 +215,10 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
                }
        }
 
-#ifdef SDP_PROFILING
-{
-       struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+       h = (struct sdp_bsdh *)skb->data;
+
        sdp_prf1(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
-}
-#endif
+
        sk_wmem_free_skb(&ssk->isk.sk, skb);
 
        return 0;
@@ -227,11 +226,37 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 
 static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
 {
+       struct sock *sk = &ssk->isk.sk;
+
        if (likely(wc->wr_id & SDP_OP_SEND)) {
                sdp_handle_send_comp(ssk, wc);
                return;
        }
 
+       if (wc->wr_id & SDP_OP_RDMA) {
+               sdp_dbg_data(sk, "TX comp: RDMA read. status: %d\n", wc->status);
+               sdp_prf1(sk, NULL, "TX comp: RDMA read");
+
+               if (!ssk->tx_ring.rdma_inflight) {
+                       sdp_warn(sk, "ERROR: unexpected RDMA read\n");
+                       return;
+               }
+
+               if (!ssk->tx_ring.rdma_inflight->busy) {
+                       sdp_warn(sk, "ERROR: too many RDMA read completions\n");
+                       return;
+               }
+
+               /* Only the last RDMA read WR is signalled. Ordering is
+                * guaranteed, so once the last WR has completed, all earlier
+                * ones have completed as well. */
+               ssk->tx_ring.rdma_inflight->busy = 0;
+               if (!ssk->tx_ring.rdma_inflight->busy) {
+                       wake_up(ssk->isk.sk.sk_sleep);
+                       sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+               }
+               return;
+       }
+
        /* Keepalive probe sent cleanup */
        sdp_cnt(sdp_keepalive_probes_sent);
 
@@ -267,7 +292,7 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
                }
        } while (n == SDP_NUM_WC);
 
-       sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
+//     sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
 
        if (wc_processed) {
                struct sock *sk = &ssk->isk.sk;
@@ -286,17 +311,18 @@ static void sdp_poll_tx_timeout(unsigned long data)
        struct sock *sk = &ssk->isk.sk;
        u32 inflight, wc_processed;
 
-       sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
-               (u32) ring_posted(ssk->tx_ring));
+//     sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+//             (u32) tx_ring_posted(ssk));
 
-       sdp_prf(&ssk->isk.sk, NULL, "%s. inflight=%d", __func__,
-               (u32) ring_posted(ssk->tx_ring));
+       sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d", 
+               (u32) tx_ring_posted(ssk));
 
        /* Only process if the socket is not in use */
        bh_lock_sock(sk);
        if (sock_owned_by_user(sk)) {
                mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-               sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+//             sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+               sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy\n");
                SDPSTATS_COUNTER_INC(tx_poll_busy);
                goto out;
        }
@@ -310,17 +336,22 @@ static void sdp_poll_tx_timeout(unsigned long data)
        else
                SDPSTATS_COUNTER_INC(tx_poll_hit);
 
-       inflight = (u32) ring_posted(ssk->tx_ring);
+       inflight = (u32) tx_ring_posted(ssk);
 
        /* If there are still packets in flight and the timer has not already
         * been scheduled by the Tx routine then schedule it here to guarantee
         * completion processing of these packets */
        if (inflight) { /* TODO: make sure socket is not closed */
-               sdp_dbg_data(sk, "arming timer for more polling\n");
+//             sdp_dbg_data(sk, "arming timer for more polling\n");
                mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
        }
 
 out:
+       if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
+               sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
+               sdp_arm_tx_cq(sk);
+       }
+
        bh_unlock_sock(sk);
 }
 
@@ -329,14 +360,18 @@ static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
        struct sock *sk = cq_context;
        struct sdp_sock *ssk = sdp_sk(sk);
 
-       sdp_prf1(sk, NULL, "Got tx comp interrupt");
+       sdp_prf1(sk, NULL, "tx irq");
+       sdp_dbg_data(sk, "Got tx comp interrupt\n");
 
-       mod_timer(&ssk->tx_ring.timer, jiffies);
+       SDPSTATS_COUNTER_INC(tx_int_count);
+
+       mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+       tasklet_schedule(&ssk->tx_ring.tasklet);
 }
 
 void sdp_tx_ring_purge(struct sdp_sock *ssk)
 {
-       while (ring_posted(ssk->tx_ring)) {
+       while (tx_ring_posted(ssk)) {
                struct sk_buff *skb;
                skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
                if (!skb)
@@ -373,6 +408,10 @@ void sdp_post_keepalive(struct sdp_sock *ssk)
 
 static void sdp_tx_cq_event_handler(struct ib_event *event, void *data)
 {
+       printk(KERN_WARNING "%s: unexpected TX CQ event: %d\n",
+                       __func__, event->event);
 }
 
 int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
@@ -409,6 +448,9 @@ int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
        ssk->tx_ring.timer.data = (unsigned long) ssk;
        ssk->tx_ring.poll_cnt = 0;
 
+       tasklet_init(&ssk->tx_ring.tasklet, sdp_poll_tx_timeout,
+                       (unsigned long) ssk);
+
        init_timer(&ssk->nagle_timer);
        ssk->nagle_timer.function = sdp_nagle_timeout;
        ssk->nagle_timer.data = (unsigned long) ssk;
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
new file mode 100644 (file)
index 0000000..bac3f71
--- /dev/null
@@ -0,0 +1,913 @@
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#include <linux/tcp.h>
+#include <asm/ioctls.h>
+#include <linux/workqueue.h>
+#include <linux/net.h>
+#include <linux/socket.h>
+#include <net/protocol.h>
+#include <net/inet_common.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/ib_fmr_pool.h>
+#include <linux/dmaengine.h>
+#include <linux/pagemap.h>
+#include <net/tcp.h> /* for memcpy_toiovec */
+#include <asm/io.h>
+#include <asm/uaccess.h>
+#include <linux/delay.h>
+#include "sdp.h"
+
+static struct bzcopy_state dummy_bz = {
+       .busy = 1,
+};
+
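+/*
+ * Build and queue a SrcAvail message: it carries the data length, the FMR
+ * key (advertised as the rkey the peer uses for its RDMA Read) and the
+ * offset of the data within the FMR-mapped region.
+ */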
+static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
+               int off, size_t len)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+       struct sdp_srcah *srcah;
+       struct sk_buff *skb;
+
+       WARN_ON(ssk->tx_sa);
+
+       BUG_ON(!tx_sa);
+       BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
+
+       skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+                       sizeof(struct sdp_bsdh) +
+                       sizeof(struct sdp_srcah),
+                       GFP_KERNEL);
+       if (!skb) {
+               return -ENOMEM;
+       }
+       sdp_prf1(sk, skb, "sending SrcAvail");
+               
+       TX_SRCAVAIL_STATE(skb) = tx_sa; /* tx_sa is attached to the skb but
+                                        * must stay alive after the skb is freed */
+       ssk->tx_sa = tx_sa;
+
+       srcah = (struct sdp_srcah *)skb_push(skb, sizeof(*srcah));
+       srcah->len = htonl(len);
+       srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
+       srcah->vaddr = cpu_to_be64(off);
+
+       skb_entail(sk, ssk, skb);
+       
+       /* TODO: pushing the skb into the tx_queue should be enough */
+
+       return 0;
+}
+
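+/*
+ * Ask the peer to abort an outstanding SrcAvail: set the cancel flag so the
+ * next round of post_sends emits a SrcAvailCancel, and arm a delayed work
+ * that gives up on the connection if the peer never answers.
+ */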
+static int sdp_post_srcavail_cancel(struct sock *sk)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+
+       if (!ssk->tx_sa && !ssk->srcavail_cancel)
+               return 0; /* srcavail already serviced */
+
+       ssk->srcavail_cancel = 1;
+
+       sdp_post_sends(ssk, 1);
+
+       schedule_delayed_work(&ssk->srcavail_cancel_work,
+                       SDP_SRCAVAIL_CANCEL_TIMEOUT);
+
+       return 0;
+}
+
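+/* Delayed-work handler: neither RdmaRdCompl nor SendSM arrived after the
+ * SrcAvailCancel, so reset the connection. */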
+void srcavail_cancel_timeout(struct work_struct *work)
+{
+       struct sdp_sock *ssk =
+               container_of(work, struct sdp_sock, srcavail_cancel_work.work);
+       struct sock *sk = &ssk->isk.sk;
+
+       lock_sock(sk);
+
+       sdp_warn(sk, "both SrcAvail and SrcAvailCancel timed out."
+                       " closing connection\n");
+       sdp_set_error(sk, -ECONNRESET);
+       wake_up(&ssk->wq);
+
+       release_sock(sk);
+}
+
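+/*
+ * Sleep until the peer acknowledges at least 'len' bytes via RdmaRdCompl,
+ * or until timeout, a pending signal or a SendSM abort ends the wait.
+ */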
+static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
+               int ignore_signals)
+{
+       struct sock *sk = &ssk->isk.sk;
+       int err = 0;
+       long vm_wait = 0;
+       long current_timeo = *timeo_p;
+       struct tx_srcavail_state *tx_sa = ssk->tx_sa;
+       DEFINE_WAIT(wait);
+
+       sdp_dbg_data(sk, "Going to sleep until RdmaRdCompl arrives.\n");
+       sdp_prf1(sk, NULL, "Going to sleep");
+       while (ssk->qp_active) {
+               prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+               if (unlikely(!*timeo_p)) {
+                       err = -ETIME;
+                       sdp_prf1(sk, NULL, "timeout");
+                       break;
+               }
+
+               if (unlikely(!ignore_signals && signal_pending(current))) {
+                       err = -EINTR;
+                       sdp_prf1(sk, NULL, "signalled");
+                       break;
+               }
+
+               if (tx_sa->bytes_completed >= len)
+                       break;
+
+               if (tx_sa->abort) {
+                       sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
+                       err = -EAGAIN;
+                       break;
+               }
+
+               posts_handler_put(ssk);
+
+               sk_wait_event(sk, &current_timeo, tx_sa->abort && 
+                               (tx_sa->bytes_completed >= len) && vm_wait);
+               sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+               posts_handler_get(ssk);
+
+               if (vm_wait) {
+                       vm_wait -= current_timeo;
+                       current_timeo = *timeo_p;
+                       if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
+                           (current_timeo -= vm_wait) < 0)
+                               current_timeo = 0;
+                       vm_wait = 0;
+               }
+               *timeo_p = current_timeo;
+       }
+
+       if (!ssk->qp_active) {
+               sdp_warn(sk, "qp is not active\n");
+       }
+
+       finish_wait(sk->sk_sleep, &wait);
+
+       sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d bytes, abort: %d\n",
+                       tx_sa->bytes_completed, tx_sa->abort);
+
+       if (!ssk->qp_active) {
+               sdp_warn(sk, "QP destroyed while waiting\n");
+               return -EINVAL;
+       }
+       return err;
+}
+
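+/* Sleep until the last (signalled) RDMA Read WR completes, i.e. until the
+ * TX completion handler clears rdma_inflight->busy. */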
+static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
+{
+       struct sock *sk = &ssk->isk.sk;
+       int err = 0;
+       long current_timeo = *timeo_p;
+       DEFINE_WAIT(wait);
+
+       sdp_dbg_data(sk, "Sleeping until the RDMA read WR finishes.\n");
+       while (1) {
+               prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+               if (unlikely(!*timeo_p)) {
+                       err = -EAGAIN;
+                       sdp_warn(sk, "timed out\n");
+                       break;
+               }
+
+               if (unlikely(signal_pending(current))) {
+                       err = sock_intr_errno(*timeo_p);
+                       sdp_warn(sk, "signalled\n");
+                       break;
+               }
+
+               if (!ssk->tx_ring.rdma_inflight->busy) {
+                       sdp_dbg_data(sk, "got rdma cqe\n");
+                       break;
+               }
+
+               posts_handler_put(ssk);
+
+               sdp_prf1(sk, NULL, "Going to sleep");
+               sk_wait_event(sk, &current_timeo, 
+                       !ssk->tx_ring.rdma_inflight->busy);
+               sdp_prf1(sk, NULL, "Woke up");
+               sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+               posts_handler_get(ssk);
+
+               *timeo_p = current_timeo;
+       }
+
+       finish_wait(sk->sk_sleep, &wait);
+
+       sdp_dbg_data(sk, "Finished waiting - rdma's inflight=%d\n",
+                       ssk->tx_ring.rdma_inflight->busy);
+
+       return err;
+}
+
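+/* Report to the peer how many bytes were pulled by RDMA Read so it can
+ * complete its side of the SrcAvail transaction. */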
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied)
+{
+       struct sdp_rrch *rrch;
+       struct sk_buff *skb;
+       gfp_t gfp_page;
+
+       if (unlikely(ssk->isk.sk.sk_allocation))
+               gfp_page = ssk->isk.sk.sk_allocation;
+       else
+               gfp_page = GFP_KERNEL;
+
+       skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+                       sizeof(struct sdp_bsdh) +
+                       sizeof(struct sdp_rrch),
+                       gfp_page);
+       /* FIXME */
+       BUG_ON(!skb);
+
+       rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
+       rrch->len = htonl(copied);
+       /* TODO: What if no tx_credits available? */
+       sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL);
+
+       return 0;
+}
+
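+/* Send a SendSM message: rejects/aborts the peer's SrcAvail so it falls back
+ * to regular sends. */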
+int sdp_post_sendsm(struct sdp_sock *ssk)
+{
+       struct sk_buff *skb;
+       gfp_t gfp_page;
+
+       if (unlikely(ssk->isk.sk.sk_allocation))
+               gfp_page = ssk->isk.sk.sk_allocation;
+       else
+               gfp_page = GFP_KERNEL;
+
+       skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+                       sizeof(struct sdp_bsdh),
+                       gfp_page);
+       /* FIXME */
+       BUG_ON(!skb);
+
+       sdp_post_send(ssk, skb, SDP_MID_SENDSM);
+
+       return 0;
+}
+
+static int sdp_update_iov_used(struct sock *sk, struct iovec *iov, int len)
+{
+       sdp_dbg_data(sk, "updating consumed %d bytes from iov\n", len);
+       while (len > 0) {
+               if (iov->iov_len) {
+                       int copy = min_t(unsigned int, iov->iov_len, len);
+                       len -= copy;
+                       iov->iov_len -= copy;
+                       iov->iov_base += copy;
+               }
+               iov++;
+       }
+
+       return 0;
+}
+
+static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
+{
+       int bytes = 0;
+
+       while (sge_cnt > 0) {
+               bytes += sge->length;
+               sge++;
+               sge_cnt--;
+       }
+
+       return bytes;
+}
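+
+/* Peer sent SendSM: abort our outstanding SrcAvail (unless the message is
+ * stale) and wake up the sender. */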
+void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
+{
+       struct sock *sk = &ssk->isk.sk;
+       unsigned long flags;
+
+       spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+       if (!ssk->tx_sa) {
+               sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
+               goto out;
+       }
+
+       if (ssk->tx_sa->mseq < mseq_ack) {
+               sdp_prf1(sk, NULL, "SendSM arrived for old SrcAvail. "
+                       "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x",
+                       mseq_ack, ssk->tx_sa->mseq);
+               goto out;
+       }
+
+       sdp_prf1(sk, NULL, "Got SendSM - aborting SrcAvail");
+
+       ssk->tx_sa->abort = 1;
+       cancel_delayed_work(&ssk->srcavail_cancel_work);
+
+       wake_up(ssk->isk.sk.sk_sleep);
+       sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+
+out:
+       spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+}
+
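+/* Peer sent RdmaRdCompl: record how many bytes it has read and wake up the
+ * sender sleeping in sdp_wait_rdmardcompl(). */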
+void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
+               u32 bytes_completed)
+{
+       struct sock *sk = &ssk->isk.sk;
+       unsigned long flags;
+
+       sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
+       sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
+
+       spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+       BUG_ON(!ssk);
+
+       if (!ssk->tx_sa) {
+               sdp_warn(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
+               goto out;
+       }
+
+       if (ssk->tx_sa->mseq < mseq_ack) {
+               sdp_warn(sk, "RdmaRdCompl arrived for old SrcAvail. "
+                       "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
+                       mseq_ack, ssk->tx_sa->mseq);
+               goto out;
+       }
+
+       if (ssk->tx_sa->bytes_completed > bytes_completed) {
+               sdp_warn(sk, "tx_sa->bytes_total(%d), "
+                       "tx_sa->bytes_completed(%d) > bytes_completed(%d)\n",
+                               ssk->tx_sa->bytes_total, 
+                               ssk->tx_sa->bytes_completed, bytes_completed);
+       }
+       if (ssk->tx_sa->bytes_total < bytes_completed) {
+               sdp_warn(sk, "ssk->tx_sa->bytes_total(%d) < bytes_completed(%d)\n",
+                               ssk->tx_sa->bytes_total, bytes_completed);
+       }
+
+       ssk->tx_sa->bytes_completed = bytes_completed;
+
+       wake_up(sk->sk_sleep);
+       sdp_dbg_data(sk, "woke up sleepers\n");
+
+out:
+       spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+       return;
+}
+
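+/* Pin the pages backing 'addr' (user or kernel space) so they can be
+ * DMA-mapped for zero-copy transfers. */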
+int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
+               unsigned long addr)
+{
+       int done_pages = 0;
+
+       sdp_dbg_data(sk, "count: %d addr: 0x%lx\n", page_cnt, addr);
+
+       addr &= PAGE_MASK;
+       if (segment_eq(get_fs(), KERNEL_DS)) {
+               for (done_pages = 0; done_pages < page_cnt; done_pages++) {
+                       pages[done_pages] = virt_to_page(addr);
+                       if (!pages[done_pages])
+                               break;
+                       get_page(pages[done_pages]);
+                       addr += PAGE_SIZE;
+               }
+       } else {
+               if (current->mm) {
+                       down_write(&current->mm->mmap_sem);
+                       done_pages = get_user_pages(current, current->mm, addr,
+                                       page_cnt, 0, 0, pages, NULL);
+                       up_write(&current->mm->mmap_sem);
+               }
+       }
+
+       if (unlikely(done_pages != page_cnt))
+               goto err;
+
+       return 0;
+
+err:
+       sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
+                       done_pages, page_cnt);
+       for (; done_pages > 0; done_pages--)
+               put_page(pages[done_pages - 1]);
+
+       return -1;
+}
+
+void sdp_put_pages(struct sock *sk, struct page **pages, int page_cnt)
+{
+       int i;
+       sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+       for (i = 0; i < page_cnt; i++) {
+               set_page_dirty_lock(pages[i]);
+               page_cache_release(pages[i]);
+       }
+}
+
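+/* DMA-map the pinned pages towards the device (TX/SrcAvail side); the
+ * resulting addresses are later fed into the FMR pool. */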
+static int sdp_map_dma(struct sock *sk, u64 *addrs, int page_cnt,
+               struct page **pages, int offset, int len)
+{
+       struct ib_device *dev = sdp_sk(sk)->ib_device;
+       int i;
+       sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
+
+       for (i = 0; i < page_cnt; i++) {
+               int o = 0;
+               int l = PAGE_SIZE;
+
+               if (i == page_cnt - 1) {
+                       /* Last page */
+                       l = (len + offset) & (PAGE_SIZE - 1);
+                       if (l == 0)
+                               l = PAGE_SIZE;
+               }
+
+//             sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d\n",
+//                             i, pages[i], o, l);
+
+               addrs[i] = ib_dma_map_page(dev,
+                               pages[i],
+                               o,
+                               l,
+                               DMA_TO_DEVICE);
+               if (ib_dma_mapping_error(dev, addrs[i])) {
+                       sdp_warn(sk, "Error mapping page %p off: %d len: %d\n",
+                                       pages[i], o, l);
+                       goto err;
+               }
+       }
+
+       return 0;
+err:
+       for (; i > 0; i--) {
+               ib_dma_unmap_page(dev, addrs[i - 1], PAGE_SIZE, DMA_TO_DEVICE);
+       }
+       return -1;
+}
+
+static void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt)
+{
+       int i;
+       struct ib_device *dev = sdp_sk(sk)->ib_device;
+
+       sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+       for (i = 0; i < page_cnt; i++) {
+               ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
+       }
+}
+
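+/* DMA-map the pinned pages from the device into an sge list (RX side of the
+ * RDMA Read). */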
+static int sdp_map_dma_sge(struct sock *sk, struct ib_sge *sge, int page_cnt,
+               struct page **pages, int offset, int len)
+{
+       struct ib_device *dev = sdp_sk(sk)->ib_device;
+       int i;
+       int left = len;
+       sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
+
+       for (i = 0; i < page_cnt; i++) {
+               int o = i == 0 ? offset : 0;
+               int l = MIN(left, PAGE_SIZE - o);
+
+               sge[i].lkey = sdp_sk(sk)->sdp_dev->mr->lkey;
+               sge[i].length = l;
+
+               sge[i].addr = ib_dma_map_page(dev,
+                               pages[i],
+                               0, /* mapping with a first-byte offset (fbo) doesn't work - map from 0 */
+                               l + o, /* map o extra bytes to compensate for the zero offset */
+                               DMA_FROM_DEVICE);
+               if (ib_dma_mapping_error(dev, sge[i].addr)) {
+                       sdp_warn(sk, "Error map page 0x%llx off: %d len: %d\n",
+                                       sge[i].addr, o, l);
+                       goto err;
+               }
+               sge[i].addr += o;
+
+               sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d | addr: 0x%llx length: %d\n",
+                               i, pages[i], o, l, sge[i].addr, sge[i].length);
+               left -= l;
+       }
+
+       WARN_ON(left != 0);
+
+       return 0;
+err:
+       for (; i > 0; i--) {
+               ib_dma_unmap_page(dev, sge[i - 1].addr, PAGE_SIZE, DMA_FROM_DEVICE);
+       }
+       return -1;
+}
+
+static void sdp_unmap_dma_sge(struct sock *sk, struct ib_sge *sge,
+               int page_cnt, int offset, int len)
+{
+       int i;
+       struct ib_device *dev = sdp_sk(sk)->ib_device;
+
+       sdp_dbg_data(sk, "count: %d\n", page_cnt);
+
+       for (i = 0; i < page_cnt; i++) {
+               int l = PAGE_SIZE;
+
+               if (i == page_cnt - 1) {
+                       /* Last page */
+                       l = (len + offset) & (PAGE_SIZE - 1);
+                       if (l == 0)
+                               l = PAGE_SIZE;
+               }
+               ib_dma_unmap_page(dev, sge[i].addr, l, DMA_FROM_DEVICE);
+       }
+}
+
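+/* Map the DMA addresses through the device's FMR pool; the resulting lkey is
+ * what gets advertised to the peer in the SrcAvail header. */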
+static struct ib_pool_fmr *sdp_map_fmr(struct sock *sk, int page_cnt,
+               u64 *addrs)
+{
+       struct ib_pool_fmr *fmr;
+       int ret = 0;
+
+       fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, addrs,
+                       page_cnt, 0);
+       if (IS_ERR(fmr)) {
+               ret = PTR_ERR(fmr);
+               fmr = NULL;
+               sdp_warn(sk, "Error allocating fmr: %d\n", ret);
+               goto err;
+       }
+
+       return fmr;
+err:
+       return NULL;
+}
+
+enum zcopy_type {
+       SDP_ZCOPY_TYPE_RX,
+       SDP_ZCOPY_TYPE_TX,
+};
+
+static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int offset,
+               int len)
+{
+       struct tx_srcavail_state *tx_sa;
+       int page_cnt;
+
+       page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
+
+       tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
+                       sizeof(struct page *) * page_cnt + 
+                       sizeof(u64) * page_cnt, GFP_KERNEL);
+       if (!tx_sa)
+               return ERR_PTR(-ENOMEM);
+       
+       tx_sa->pages = (struct page **)(tx_sa+1);
+       tx_sa->page_cnt = page_cnt;
+       tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
+
+       return tx_sa;
+}
+
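+/*
+ * Receiver side of ZCopy: pin and DMA-map the user iovec, issue RDMA Reads
+ * against the rkey/vaddr advertised in the SrcAvail, wait for the signalled
+ * last WR to complete, then account the bytes that were copied. On failure
+ * the SrcAvail is marked aborted so the caller can answer with SendSM.
+ */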
+int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
+               int len)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+       struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
+       int rc = 0;
+       struct ib_send_wr *bad_wr;
+       struct ib_send_wr wr = { 0 };
+       long timeo;
+       struct ib_sge *sge;
+       int sge_left;
+       int copied;
+       int offset;
+
+       sdp_dbg_data(&ssk->isk.sk, "preparing RDMA read."
+               " len: %d. buffer len: %ld\n", len, iov->iov_len);
+       if (len > rx_sa->len)
+               len = rx_sa->len;
+
+       offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
+
+       rx_sa->page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
+       sdp_dbg_data(sk, "page_cnt = %d len:%d offset: %d\n", rx_sa->page_cnt, len, offset);
+
+       rx_sa->pages = kzalloc(sizeof(struct page *) * rx_sa->page_cnt +
+               sizeof(struct ib_sge) * rx_sa->page_cnt, GFP_KERNEL);
+       if (!rx_sa->pages) {
+               sdp_warn(sk, "Error allocating zcopy context\n");
+               rc = -ENOMEM;
+               goto err_alloc_zcopy;
+       }
+
+       rx_sa->sge = (struct ib_sge *)(&rx_sa->pages[rx_sa->page_cnt]);
+
+       rc = sdp_get_pages(sk, rx_sa->pages, rx_sa->page_cnt,
+                       (unsigned long)iov->iov_base);
+       if (rc)
+               goto err_get_pages;
+
+       rc = sdp_map_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, rx_sa->pages,
+                       offset, len);
+       if (rc)
+               goto err_map_dma;
+
+       wr.opcode = IB_WR_RDMA_READ;
+       wr.next = NULL;
+       wr.wr_id = SDP_OP_RDMA;
+       wr.wr.rdma.rkey = rx_sa->rkey;
+       wr.send_flags = 0;
+
+       timeo = sock_sndtimeo(sk, 0);
+
+       ssk->tx_ring.rdma_inflight = rx_sa;
+       copied = 0;
+       sge = rx_sa->sge;
+       sge_left = rx_sa->page_cnt;
+       do {
+               /* Len error when using sge_cnt > 30 ?? */
+               int sge_cnt = min(sge_left, SDP_MAX_SEND_SGES - 2);
+
+               wr.wr.rdma.remote_addr = rx_sa->vaddr + copied + rx_sa->used;
+               wr.num_sge = sge_cnt;
+               wr.sg_list = sge;
+               rx_sa->busy++;
+
+               sdp_dbg_data(sk, "rdma read: sge_cnt: %d vaddr: 0x%llx "
+                       "copied: 0x%x rkey: 0x%x in_bytes: %d\n",
+                       sge_cnt, wr.wr.rdma.remote_addr, copied, rx_sa->rkey,
+                       sge_bytes(sge, sge_cnt));
+               sdp_prf1(sk, NULL, "TX: RDMA read");
+
+               if (sge_left == sge_cnt) {
+                       wr.send_flags = IB_SEND_SIGNALED;
+                       sdp_dbg_data(sk, "last wr is signaled\n");
+               }
+               rc = ib_post_send(ssk->qp, &wr, &bad_wr);
+               if (unlikely(rc)) {
+                       sdp_warn(sk, "ib_post_send failed with status %d.\n",
+                                       rc);
+                       sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+                       wake_up(&ssk->wq);
+                       break;
+               }
+
+               copied += sge_bytes(sge, sge_cnt);
+               sge_left -= sge_cnt;
+               sge += sge_cnt;
+
+               if (unlikely(ssk->srcavail_cancel_mseq > rx_sa->mseq)) {
+                       sdp_warn(sk, "got SrcAvailCancel - Aborting RDMA\n");
+                       rc = -EAGAIN;
+               }
+       } while (!rc && sge_left > 0);
+
+       if (!rc || rc == -EAGAIN) {
+               int got_srcavail_cancel = (rc == -EAGAIN);
+
+               sdp_arm_tx_cq(sk);
+
+               rc = sdp_wait_rdma_wr_finished(ssk, &timeo);
+
+               /* Ignore any data copied after getting SrcAvailCancel */
+               if (!got_srcavail_cancel && !rc) {
+                       sdp_update_iov_used(sk, iov, copied);
+                       rx_sa->used += copied;
+                       atomic_add(copied, &ssk->rcv_nxt);
+               }
+       }
+
+       if (rc) {
+               /* posting the RDMA read, waiting for its completion or posting
+                * RdmaRdCompl failed - mark the SrcAvail aborted so SendSM is sent */
+               sdp_warn(sk, "post rdma, wait_for_compl "
+                       "or post rdma_rd_comp failed - post sendsm\n");
+               rx_sa->aborted = 1;
+       }
+
+       ssk->tx_ring.rdma_inflight = NULL;
+
+       sdp_unmap_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, offset, len);
+err_map_dma:   
+       sdp_put_pages(sk, rx_sa->pages, rx_sa->page_cnt);
+err_get_pages:
+       kfree(rx_sa->pages);
+       rx_sa->pages = NULL;
+       rx_sa->sge = NULL;
+err_alloc_zcopy:       
+
+       return rc;
+}
+
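+/* Wait for send buffer/credits to free up; reuses the bzcopy wait path with
+ * a dummy state. */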
+static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+       int ret = 0;
+       sdp_dbg_data(sk, "Wait for mem\n");
+
+       set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+
+       SDPSTATS_COUNTER_INC(send_wait_for_mem);
+
+       sdp_do_posts(ssk);
+
+       sdp_xmit_poll(ssk, 1);
+
+       ret = sdp_bzcopy_wait_memory(ssk, timeo_p, &dummy_bz);
+
+       return ret;
+}
+
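+/*
+ * Sender side of ZCopy: pin the user buffer, FMR-map it in chunks of up to
+ * 256 pages, post a SrcAvail per chunk and wait for the matching RdmaRdCompl.
+ * On SendSM, timeout or signal the SrcAvail is cancelled and the caller is
+ * expected to fall back to regular sends.
+ */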
+int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+               size_t size)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+       int iovlen, flags;
+       struct iovec *iov = NULL;
+       int rc = 0;
+       long timeo;
+       struct tx_srcavail_state *tx_sa;
+       int offset;
+       int copied = 0;
+
+       int bytes_left;
+       int pages_left;
+       u64 *addrs;
+       unsigned long lock_flags;
+
+       sdp_dbg_data(sk, "%s\n", __func__);
+
+       lock_sock(sk);
+
+       SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
+
+       posts_handler_get(ssk);
+
+       flags = msg->msg_flags;
+       timeo = sock_sndtimeo(sk, 0);
+
+       /* Wait for a connection to finish. */
+       if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+               if ((rc = sk_stream_wait_connect(sk, &timeo)) != 0)
+                       goto err;
+
+       /* This should be in poll */
+       clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+       /* Ok commence sending. */
+       iovlen = msg->msg_iovlen;
+       iov = msg->msg_iov;
+       offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
+       sdp_dbg_data(sk, "Sending iov: %p, iovlen: %ld, size: %ld\n",
+                       iov->iov_base, iov->iov_len, size);
+
+       SDPSTATS_HIST(sendmsg_seglen, iov->iov_len);
+
+       if (iovlen > 1) {
+               sdp_warn(sk, "iovlen > 1 not supported\n");
+               rc = -ENOTSUPP;
+               goto err;
+       }
+
+       if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
+               rc = -EPIPE;
+               goto err;
+       }
+
+       tx_sa = sdp_alloc_tx_sa(sk, offset, iov->iov_len);
+       if (IS_ERR(tx_sa)) {
+               sdp_warn(sk, "Error allocating zcopy context\n");
+               rc = PTR_ERR(tx_sa);
+               goto err_alloc_tx_sa;
+       }
+
+       rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
+                       (unsigned long)iov->iov_base);
+       if (rc)
+               goto err_get_pages;
+
+       rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
+                       offset, iov->iov_len);
+       if (rc)
+               goto err_map_dma;
+
+       bytes_left = iov->iov_len;
+       pages_left = tx_sa->page_cnt;
+       addrs = tx_sa->addrs;
+       do {
+               int page_cnt = min(256, pages_left);
+               int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
+               sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
+
+               if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
+                       rc = wait_for_sndbuf(sk, &timeo);
+                       if (rc) {
+                               sdp_warn(sk, "Couldn't get send buffer\n");
+                               break;
+                       }
+               }
+
+               tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
+               if (!tx_sa->fmr) {
+                       rc = -ENOMEM;
+                       break;
+               }
+
+               tx_sa->bytes_completed = 0;
+               tx_sa->bytes_total = len;
+               rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+               if (rc)
+                       goto err_abort_send;
+
+               rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+               if (unlikely(rc)) {
+                       switch (rc) {
+                       case -EAGAIN: /* Got SendSM */
+                               sdp_warn(sk, "got SendSM. use SEND verb.\n");
+                               goto err_abort_send;
+
+                       case -ETIME: /* timed out */
+                               sdp_warn(sk, "TX srcavail timed out.\n");
+                               /* fall through */
+                       case -EINTR: /* interrupted */
+                               sdp_prf1(sk, NULL, "Aborting send.");
+                               sdp_post_srcavail_cancel(sk);
+
+                               /* Wait for RdmaRdCompl/SendSM to
+                                * finish the transaction */
+                               sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+
+                               goto err_abort_send;
+
+                       default:
+                               sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
+                               /* Socket destroyed while waiting */
+                               goto err_abort_send;
+                       }
+               }
+               sdp_prf1(sk, NULL, "got RdmaRdCompl");
+               copied += len;
+               bytes_left -= len;
+               pages_left -= page_cnt;
+               addrs += page_cnt;
+               offset = 0;
+
+               sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
+
+err_abort_send:
+               ib_fmr_pool_unmap(tx_sa->fmr);
+               spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
+               ssk->tx_sa = NULL;
+               spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
+       } while (!rc && !tx_sa->abort && pages_left > 0);
+
+       sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
+err_map_dma:   
+       sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
+err_get_pages:
+       kfree(tx_sa);
+err_alloc_tx_sa:
+err:
+
+       sdp_prf1(sk, NULL, "Finished RDMA rc: %d copied: %d", rc, copied);
+       posts_handler_put(ssk);
+       release_sock(sk);
+
+       return rc ?: copied;
+}
+