#define RDS_IB_RECYCLE_BATCH_COUNT     32
 
 #define RDS_IB_WC_MAX                  32
+#define RDS_IB_SEND_OP                 BIT_ULL(63) /* ORed into send wr_ids; poll_cq dispatches on it */
 
 extern struct rw_semaphore rds_ib_devices_lock;
 extern struct list_head rds_ib_devices;
        struct ib_pd            *i_pd;
        struct ib_cq            *i_send_cq;
        struct ib_cq            *i_recv_cq;
+       struct ib_wc            i_send_wc[RDS_IB_WC_MAX];
        struct ib_wc            i_recv_wc[RDS_IB_WC_MAX];
 
        /* interrupt handling */
+       struct tasklet_struct   i_send_tasklet;
        struct tasklet_struct   i_recv_tasklet;
 
        /* tx */
 struct rds_ib_statistics {
        uint64_t        s_ib_connect_raced;
        uint64_t        s_ib_listen_closed_stale;
-       uint64_t        s_ib_tx_cq_call;
        uint64_t        s_ib_evt_handler_call;
        uint64_t        s_ib_tasklet_call;
        uint64_t        s_ib_tx_cq_event;
 void rds_ib_xmit_complete(struct rds_connection *conn);
 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                unsigned int hdr_off, unsigned int sg, unsigned int off);
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
 
                        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                 (unsigned long long)wc->wr_id, wc->status,
                                 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
-                       rds_ib_recv_cqe_handler(ic, wc, ack_state);
+
+                       if (wc->wr_id & RDS_IB_SEND_OP)
+                               rds_ib_send_cqe_handler(ic, wc);
+                       else
+                               rds_ib_recv_cqe_handler(ic, wc, ack_state);
                }
        }
 }
 
+/*
+ * Tasklet body for the send CQ, scheduled from
+ * rds_ib_cq_comp_handler_send().  @data is the rds_ib_connection that
+ * was handed to tasklet_init().
+ */
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_ack_state state;
+
+       rds_ib_stats_inc(s_ib_tasklet_call);
+
+       /*
+        * poll_cq() takes an ack state argument; nothing in this
+        * function reads it back, it is only zeroed before being
+        * passed down.
+        */
+       memset(&state, 0, sizeof(state));
+
+       /*
+        * Drain, re-arm, drain again: the second pass reaps completions
+        * queued after the first poll but before the CQ was re-armed.
+        */
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+       ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+       /*
+        * Kick the transmit path when the connection is up and either
+        * RDS_LL_SEND_FULL is clear or bit 0 of c_map_queued is set.
+        */
+       if (rds_conn_up(conn) &&
+           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+           test_bit(0, &conn->c_map_queued)))
+               rds_send_xmit(conn);
+}
+
 static void rds_ib_tasklet_fn_recv(unsigned long data)
 {
        struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
        }
 }
 
+/*
+ * Completion callback installed on the send CQ by ib_create_cq(); the
+ * rds_connection is passed back as @context.  It polls nothing itself:
+ * it counts the event and hands the work to the send tasklet.
+ */
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+       struct rds_connection *conn = context;
+       struct rds_ib_connection *ic;
+
+       rdsdebug("conn %p cq %p\n", conn, cq);
+       rds_ib_stats_inc(s_ib_evt_handler_call);
+
+       /* CQ polling happens in rds_ib_tasklet_fn_send() */
+       ic = conn->c_transport_data;
+       tasklet_schedule(&ic->i_send_tasklet);
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
        ic->i_pd = rds_ibdev->pd;
 
        cq_attr.cqe = ic->i_send_ring.w_nr + 1;
-       ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+       ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
                                     rds_ib_cq_event_handler, conn,
                                     &cq_attr);
        if (IS_ERR(ic->i_send_cq)) {
                wait_event(rds_ib_ring_empty_wait,
                           rds_ib_ring_empty(&ic->i_recv_ring) &&
                           (atomic_read(&ic->i_signaled_sends) == 0));
+               tasklet_kill(&ic->i_send_tasklet);
                tasklet_kill(&ic->i_recv_tasklet);
 
                /* first destroy the ib state that generates callbacks */
        }
 
        INIT_LIST_HEAD(&ic->ib_node);
+       tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+                    (unsigned long)ic);
        tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
-                    (unsigned long) ic);
+                    (unsigned long)ic);
        mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);
 
 
                send->s_op = NULL;
 
-               send->s_wr.wr_id = i;
+               send->s_wr.wr_id = i | RDS_IB_SEND_OP;
                send->s_wr.sg_list = send->s_sge;
                send->s_wr.ex.imm_data = 0;
 
  * unallocs the next free entry in the ring it doesn't alter which is
  * the next to be freed, which is what this is concerned with.
  */
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 {
-       struct rds_connection *conn = context;
-       struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_message *rm = NULL;
-       struct ib_wc wc;
+       struct rds_connection *conn = ic->conn;
        struct rds_ib_send_work *send;
        u32 completed;
        u32 oldest;
        u32 i = 0;
-       int ret;
        int nr_sig = 0;
 
-       rdsdebug("cq %p conn %p\n", cq, conn);
-       rds_ib_stats_inc(s_ib_tx_cq_call);
-       ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
-       if (ret)
-               rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
-
-       while (ib_poll_cq(cq, 1, &wc) > 0) {
-               rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-                        (unsigned long long)wc.wr_id, wc.status,
-                        ib_wc_status_msg(wc.status), wc.byte_len,
-                        be32_to_cpu(wc.ex.imm_data));
-               rds_ib_stats_inc(s_ib_tx_cq_event);
-
-               if (wc.wr_id == RDS_IB_ACK_WR_ID) {
-                       if (time_after(jiffies, ic->i_ack_queued + HZ/2))
-                               rds_ib_stats_inc(s_ib_tx_stalled);
-                       rds_ib_ack_send_complete(ic);
-                       continue;
-               }
 
-               oldest = rds_ib_ring_oldest(&ic->i_send_ring);
+       rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+                (unsigned long long)wc->wr_id, wc->status,
+                ib_wc_status_msg(wc->status), wc->byte_len,
+                be32_to_cpu(wc->ex.imm_data));
+       rds_ib_stats_inc(s_ib_tx_cq_event);
 
-               completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
+       /* ACK completions finish here and never reach the ring code below */
+       if (wc->wr_id == RDS_IB_ACK_WR_ID) {
+               if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
+                       rds_ib_stats_inc(s_ib_tx_stalled);
+               rds_ib_ack_send_complete(ic);
+               return;
+       }
 
-               for (i = 0; i < completed; i++) {
-                       send = &ic->i_sends[oldest];
-                       if (send->s_wr.send_flags & IB_SEND_SIGNALED)
-                               nr_sig++;
+       oldest = rds_ib_ring_oldest(&ic->i_send_ring);
 
-                       rm = rds_ib_send_unmap_op(ic, send, wc.status);
+       /* Mask off the RDS_IB_SEND_OP tag to get back the ring index */
+       completed = rds_ib_ring_completed(&ic->i_send_ring,
+                                         (wc->wr_id & ~RDS_IB_SEND_OP),
+                                         oldest);
 
-                       if (time_after(jiffies, send->s_queued + HZ/2))
-                               rds_ib_stats_inc(s_ib_tx_stalled);
+       for (i = 0; i < completed; i++) {
+               send = &ic->i_sends[oldest];
+               if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                       nr_sig++;
 
-                       if (send->s_op) {
-                               if (send->s_op == rm->m_final_op) {
-                                       /* If anyone waited for this message to get flushed out, wake
-                                        * them up now */
-                                       rds_message_unmapped(rm);
-                               }
-                               rds_message_put(rm);
-                               send->s_op = NULL;
-                       }
+               rm = rds_ib_send_unmap_op(ic, send, wc->status);
 
-                       oldest = (oldest + 1) % ic->i_send_ring.w_nr;
-               }
+               if (time_after(jiffies, send->s_queued + HZ / 2))
+                       rds_ib_stats_inc(s_ib_tx_stalled);
 
-               rds_ib_ring_free(&ic->i_send_ring, completed);
-               rds_ib_sub_signaled(ic, nr_sig);
-               nr_sig = 0;
-
-               if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
-                   test_bit(0, &conn->c_map_queued))
-                       queue_delayed_work(rds_wq, &conn->c_send_w, 0);
-
-               /* We expect errors as the qp is drained during shutdown */
-               if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
-                       rds_ib_conn_error(conn, "send completion on %pI4 had status "
-                                         "%u (%s), disconnecting and reconnecting\n",
-                                         &conn->c_faddr, wc.status,
-                                         ib_wc_status_msg(wc.status));
+               if (send->s_op) {
+                       if (send->s_op == rm->m_final_op) {
+                               /* If anyone waited for this message to get
+                                * flushed out, wake them up now
+                                */
+                               rds_message_unmapped(rm);
+                       }
+                       rds_message_put(rm);
+                       send->s_op = NULL;
                }
+
+               oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+       }
+
+       rds_ib_ring_free(&ic->i_send_ring, completed);
+       rds_ib_sub_signaled(ic, nr_sig);
+
+       if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+           test_bit(0, &conn->c_map_queued))
+               queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+       /* We expect errors as the qp is drained during shutdown */
+       if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
+               rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+                                 &conn->c_faddr, wc->status,
+                                 ib_wc_status_msg(wc->status));
        }
 }