}
static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
- struct ib_wc *wcs,
- struct rds_ib_ack_state *ack_state)
+ struct ib_wc *wcs,
+ struct rds_ib_ack_state *ack_state,
+ unsigned int rx)
{
int nr;
int i;
struct ib_wc *wc;
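+ /*
+ * Drain the CQ in batches of up to RDS_WC_MAX entries. For the
+ * receive CQ (rx != 0), count each completion against
+ * RDS_IB_RX_LIMIT so a steady stream of RX completions cannot
+ * monopolize the tasklet.
+ */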
while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
for (i = 0; i < nr; i++) {
+ if (rx) {
+ if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT)
+ == 0) {
+ rdsdebug("connection "
+ "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
+ "RX poll_cq processed %d\n",
+ NIPQUAD(ic->conn->c_laddr),
+ NIPQUAD(ic->conn->c_faddr),
+ ic->conn->c_tos,
+ ic->i_rx_poll_cq);
+ }
+ }
wc = wcs + i;
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc->wr_id, wc->status, wc->byte_len,
be32_to_cpu(wc->ex.imm_data));
if (wc->wr_id & RDS_IB_SEND_OP)
rds_ib_send_cqe_handler(ic, wc);
else
rds_ib_recv_cqe_handler(ic, wc, ack_state);
}
+
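+ /* RX budget exhausted; stop reaping and let rds_ib_rx() defer
+ * the rest of the work to the worker thread. */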
+ if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+ break;
}
}
memset(&ack_state, 0, sizeof(ack_state));
rds_ib_stats_inc(s_ib_tasklet_call);
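+ /* Poll, re-arm the CQ, then poll once more: completions that
+ * arrive between the last poll and ib_req_notify_cq() would
+ * otherwise go unnoticed until the next event. */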
- poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+ poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP);
- poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+ poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
if (rds_conn_up(conn) &&
(!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued)))
rds_send_xmit(ic->conn);
}
-void rds_ib_tasklet_fn_recv(unsigned long data)
+/*
+ * Note: rds_ib_rx() must not be called with irqs disabled.
+ * It calls rds_send_drop_acked(), which in turn calls routines
+ * that reach rds_rdma_free_op(), where an irqs_disabled()
+ * assertion would fire.
+ */
+static void rds_ib_rx(struct rds_ib_connection *ic)
{
- struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
struct rds_connection *conn = ic->conn;
struct rds_ib_ack_state ack_state;
struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
memset(&ack_state, 0, sizeof(ack_state));
- poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
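+ /* Start each pass with a fresh RX completion budget. */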
+ ic->i_rx_poll_cq = 0;
+ poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
- poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
+ poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
if (ack_state.ack_next_valid)
rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
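+ /* Refill the SRQ from the worker once posted receives drop below
+ * the high-water mark; s_refill_gate keeps a single refill queued. */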
if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
rds_ib_srq_hwm_refill) &&
!test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
- queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
+ queue_delayed_work(rds_wq,
+ &rds_ibdev->srq->s_refill_w, 0);
+
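+ /* Too many RX completions in one run: back off for 10 msecs and
+ * let rds_ib_rx_handler() resume reaping from the workqueue. */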
+ if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+ ic->i_rx_w.ic = ic;
+ queue_delayed_work(rds_aux_wq, &ic->i_rx_w.dlywork,
+ msecs_to_jiffies(10));
+ ic->i_rx_wait_for_handler = 1;
+ }
+}
+
+void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+ struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
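+ /* Skip this run if the deferred worker currently owns RX
+ * processing for this connection. */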
+ spin_lock(&ic->i_rx_lock);
+ if (ic->i_rx_wait_for_handler)
+ goto out;
+ rds_ib_rx(ic);
+out:
+ spin_unlock(&ic->i_rx_lock);
+}
+
+static void rds_ib_rx_handler(struct work_struct *workarg)
+{
+ struct delayed_work *delayedwork =
+ container_of(workarg, struct delayed_work, work);
+ struct rds_ib_rx_work *rirwork =
+ container_of(delayedwork, struct rds_ib_rx_work, dlywork);
+ struct rds_ib_connection *ic = rirwork->ic;
+
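+ /* Workqueue context: irqs are enabled here, as rds_ib_rx()
+ * requires. */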
+ spin_lock(&ic->i_rx_lock);
+ ic->i_rx_wait_for_handler = 0;
+ rds_ib_rx(ic);
+ spin_unlock(&ic->i_rx_lock);
}
static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
}
/* quiesce tx and rx completion before tearing down */
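+ /* Wait with a timeout rather than blocking forever: if RX work was
+ * deferred to the worker, drive rds_ib_rx() here so the recv ring
+ * can still drain during shutdown. */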
- wait_event(rds_ib_ring_empty_wait,
- rds_ib_ring_empty(&ic->i_recv_ring) &&
- (atomic_read(&ic->i_signaled_sends) == 0));
+ while (!wait_event_timeout(rds_ib_ring_empty_wait,
+ rds_ib_ring_empty(&ic->i_recv_ring) &&
+ (atomic_read(&ic->i_signaled_sends) == 0),
+ msecs_to_jiffies(5000))) {
+ if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
+ spin_lock(&ic->i_rx_lock);
+ rds_ib_rx(ic);
+ spin_unlock(&ic->i_rx_lock);
+ }
+ }
+
tasklet_kill(&ic->i_stasklet);
tasklet_kill(&ic->i_rtasklet);
spin_lock_init(&ic->i_ack_lock);
#endif
atomic_set(&ic->i_signaled_sends, 0);
+ spin_lock_init(&ic->i_rx_lock); /* tasklet vs. rx-worker serialization */
/*
* rds_ib_conn_shutdown() waits for these to be emptied so they
init_completion(&ic->i_last_wqe_complete);
INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
+ INIT_DELAYED_WORK(&ic->i_rx_w.dlywork, rds_ib_rx_handler);
spin_lock_irqsave(&ib_nodev_conns_lock, flags);
list_add_tail(&ic->ib_node, &ib_nodev_conns);