while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
for (i = 0; i < nr; i++) {
- if (rx) {
- if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT)
- == 0) {
- rdsdebug("connection "
- "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
- "RX poll_cq processed %d\n",
- NIPQUAD(ic->conn->c_laddr),
- NIPQUAD(ic->conn->c_faddr),
- ic->conn->c_tos,
- ic->i_rx_poll_cq);
- }
- }
+
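+			/*
+			 * A busy receive CQ can keep this loop running for a
+			 * long time; yield the CPU every RDS_IB_RX_LIMIT rx
+			 * completions instead of breaking out and deferring
+			 * the remainder to a worker.
+			 */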
+ if (rx &&
+ (++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0)
+ cond_resched();
+
wc = wcs + i;
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
				 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
				 be32_to_cpu(wc->ex.imm_data));

			if (wc->wr_id & RDS_IB_SEND_OP)
				rds_ib_send_cqe_handler(ic, wc);
			else
rds_ib_recv_cqe_handler(ic, wc, ack_state);
}
-
- if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
- break;
-
}
}
rds_send_xmit(ic->conn);
}
-/*
- * Note: rds_ib_rx(): don't call with irqs disabled.
- * It calls rds_send_drop_acked() which calls other
- * routines that reach into rds_rdma_free_op()
- * where irqs_disabled() warning is asserted!
- */
-static void rds_ib_rx(struct rds_ib_connection *ic)
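+/*
+ * Runs from the rx tasklet: process receive completions, update the
+ * ACK state, and top up the SRQ when it runs low.
+ */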
+void rds_ib_tasklet_fn_recv(unsigned long data)
{
+ struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
struct rds_connection *conn = ic->conn;
struct rds_ib_ack_state ack_state;
struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
if (ack_state.ack_next_valid)
rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
+
if (ack_state.ack_recv_valid && ack_state.ack_recv > ic->i_ack_recv) {
rds_send_drop_acked(conn, ack_state.ack_recv, NULL);
ic->i_ack_recv = ack_state.ack_recv;
}
+
if (rds_conn_up(conn))
rds_ib_attempt_ack(ic);
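+	/*
+	 * Refill the SRQ once the number of posted receive buffers drops
+	 * below the refill watermark, unless a refill is already in flight.
+	 */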
if (rds_ib_srq_enabled)
if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
- rds_ib_srq_hwm_refill) &&
- !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
- queue_delayed_work(rds_wq,
- &rds_ibdev->srq->s_refill_w, 0);
-
- if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
- ic->i_rx_w.ic = ic;
- queue_delayed_work(rds_aux_wq, &ic->i_rx_w.dlywork,
- msecs_to_jiffies(10));
- ic->i_rx_wait_for_handler = 1;
- }
+ rds_ib_srq_hwm_refill) &&
+ !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
+ queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
}
-void rds_ib_tasklet_fn_recv(unsigned long data)
-{
- struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
-
- spin_lock(&ic->i_rx_lock);
- if (ic->i_rx_wait_for_handler)
- goto out;
- rds_ib_rx(ic);
-out:
- spin_unlock(&ic->i_rx_lock);
-}
-
-static void rds_ib_rx_handler(struct work_struct *workarg)
-{
- struct delayed_work *delayedwork =
- container_of(workarg, struct delayed_work, work);
- struct rds_ib_rx_work *rirwork =
- container_of(delayedwork, struct rds_ib_rx_work, dlywork);
- struct rds_ib_connection *ic = rirwork->ic;
-
- spin_lock(&ic->i_rx_lock);
- ic->i_rx_wait_for_handler = 0;
- rds_ib_rx(ic);
- spin_unlock(&ic->i_rx_lock);
-}
static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
{
}
/* quiesce tx and rx completion before tearing down */
- while (!wait_event_timeout(rds_ib_ring_empty_wait,
- rds_ib_ring_empty(&ic->i_recv_ring) &&
- (atomic_read(&ic->i_signaled_sends) == 0),
- msecs_to_jiffies(5000))) {
-
- if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
- spin_lock(&ic->i_rx_lock);
- rds_ib_rx(ic);
- spin_unlock(&ic->i_rx_lock);
- }
- }
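+	/* Wait for the recv ring to drain and all signaled sends to finish. */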
+ wait_event(rds_ib_ring_empty_wait,
+ rds_ib_ring_empty(&ic->i_recv_ring) &&
+ (atomic_read(&ic->i_signaled_sends) == 0));
tasklet_kill(&ic->i_stasklet);
tasklet_kill(&ic->i_rtasklet);
spin_lock_init(&ic->i_ack_lock);
#endif
atomic_set(&ic->i_signaled_sends, 0);
- spin_lock_init(&ic->i_rx_lock);
/*
* rds_ib_conn_shutdown() waits for these to be emptied so they
init_completion(&ic->i_last_wqe_complete);
INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
- INIT_DELAYED_WORK(&ic->i_rx_w.dlywork, rds_ib_rx_handler);
spin_lock_irqsave(&ib_nodev_conns_lock, flags);
list_add_tail(&ic->ib_node, &ib_nodev_conns);