}
}
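+ /* The connection is now fully established: clear i_destroying so the
+ * send/recv CQ handlers are allowed to poll again.
+ */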
+ atomic_set(&ic->i_destroying, 0);
+
if (conn->c_version < RDS_PROTOCOL_VERSION) {
if (conn->c_version != RDS_PROTOCOL_COMPAT_VERSION) {
printk(KERN_NOTICE "RDS/IB: Connection to"
static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
struct ib_wc *wcs,
struct rds_ib_ack_state *ack_state,
- unsigned int rx)
+ unsigned int rx, int no_break)
{
int nr;
int i;
rds_ib_recv_cqe_handler(ic, wc, ack_state);
}
- if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
- break;
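+ /* On the final reap (no_break) keep draining instead of
+ * deferring to the RX worker: just reset the budget counter.
+ */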
+ if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+ if (no_break)
+ ic->i_rx_poll_cq = 0;
+ else
+ break;
+ }
}
}
-void rds_ib_tasklet_fn_send(unsigned long data)
+static void rds_ib_tx(struct rds_ib_connection *ic, int skip_state)
{
- struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
struct rds_connection *conn = ic->conn;
struct rds_ib_ack_state ack_state;
memset(&ack_state, 0, sizeof(ack_state));
rds_ib_stats_inc(s_ib_tasklet_call);
- /* if send cq has been destroyed, ignore incoming cq event */
- if (!ic->i_scq)
+ /* If the connection is being destroyed or has already been destroyed,
+ * ignore the incoming CQ event (unless this is the final reap).
+ */
+ if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state)
return;
- poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
+ poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state);
ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP);
- poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
+ poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state);
if (rds_conn_up(conn) &&
(!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
rds_send_xmit(ic->conn);
}
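+/* Final reap of the send CQ during shutdown. i_destroying is already
+ * positive at this point, so skip_state is passed to bypass the early
+ * return in rds_ib_tx().
+ */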
+static void rds_ib_final_tx(struct rds_ib_connection *ic)
+{
+ if (ic->i_scq)
+ rds_ib_tx(ic, 1);
+}
+
+void rds_ib_tasklet_fn_send(unsigned long data)
+{
+ struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
+ rds_ib_tx(ic, 0);
+}
+
/*
* Note: rds_ib_rx(): don't call with irqs disabled.
* It calls rds_send_drop_acked() which calls other
* routines that reach into rds_rdma_free_op()
* where irqs_disabled() warning is asserted!
+ *
+ * skip_state is set true only just before we destroy the CQs, in order to do
+ * a final reap on the receive CQ. In that case i_destroying is expected to be
+ * positive, which keeps other threads from polling the same CQ in parallel.
*/
-static void rds_ib_rx(struct rds_ib_connection *ic)
+static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state)
{
struct rds_connection *conn = ic->conn;
struct rds_ib_ack_state ack_state;
rds_ib_stats_inc(s_ib_tasklet_call);
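+ /* As on the send side, bail out while the connection is being
+ * destroyed, unless this is the final reap (skip_state).
+ */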
+ if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state)
+ return;
+
memset(&ack_state, 0, sizeof(ack_state));
ic->i_rx_poll_cq = 0;
- poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
+ poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state);
ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
- poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
+ poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state);
if (ack_state.ack_next_valid)
rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
queue_delayed_work(rds_wq,
&rds_ibdev->srq->s_refill_w, 0);
+ /* with skip_state set, poll_cq() resets i_rx_poll_cq, so the deferral below cannot trigger */
if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
ic->i_rx_w.ic = ic;
/* Delay 10 msecs until the RX worker starts reaping again */
}
}
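+/* Final reap of the receive CQ during shutdown; skip_state bypasses the
+ * i_destroying check in rds_ib_rx().
+ */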
+static void rds_ib_final_rx(struct rds_ib_connection *ic)
+{
+ if (ic->i_rcq)
+ rds_ib_rx(ic, 1);
+}
+
void rds_ib_tasklet_fn_recv(unsigned long data)
{
struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
spin_lock_bh(&ic->i_rx_lock);
if (ic->i_rx_wait_for_handler)
goto out;
- rds_ib_rx(ic);
+ rds_ib_rx(ic, 0);
out:
spin_unlock_bh(&ic->i_rx_lock);
}
spin_lock_bh(&ic->i_rx_lock);
ic->i_rx_wait_for_handler = 0;
- rds_ib_rx(ic);
+ rds_ib_rx(ic, 0);
spin_unlock_bh(&ic->i_rx_lock);
}
if (ic->i_cm_id) {
struct ib_device *dev = ic->i_cm_id->device;
+ /* ic->i_cm_id being non-NULL is not enough to say that the
+ * connection was fully completed before this shutdown call.
+ * i_cm_id can be created while attempting to connect, so there is
+ * a possibility that other resources such as i_rcq and i_scq are
+ * not set up yet when the shutdown comes (again).
+ * We use ic->i_destroying to tell whether it was fully up.
+ */
+
+ if (atomic_read(&ic->i_destroying)) {
+ /* Not fully up yet; only i_cm_id has been created. */
+ goto destroy;
+ }
+
rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
err = rdma_disconnect(ic->i_cm_id);
if (err) {
/* Try to reap pending RX completions every 5 secs */
if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
spin_lock_bh(&ic->i_rx_lock);
- rds_ib_rx(ic);
+ rds_ib_rx(ic, 0);
spin_unlock_bh(&ic->i_rx_lock);
}
}
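+ /* Mark the connection as being destroyed. Paired with the
+ * atomic_read() checks in rds_ib_tx()/rds_ib_rx(), this keeps the
+ * tasklet and worker paths off the CQs from here on.
+ */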
+ atomic_inc(&ic->i_destroying);
+ smp_mb__after_atomic_inc();
+
tasklet_kill(&ic->i_stasklet);
tasklet_kill(&ic->i_rtasklet);
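+ /* The RX worker may still be queued to reap a backlogged receive
+ * CQ; make sure it has finished before the QP and CQs go away.
+ */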
+ flush_delayed_work(&ic->i_rx_w.work);
/* first destroy the ib state that generates callbacks */
if (ic->i_cm_id->qp)
rdma_destroy_qp(ic->i_cm_id);
+ /* At this point no thread (tasklet path or kworker path) will
+ * touch the CQs anymore, since they all bail out on seeing
+ * i_destroying positive. Do the final reap on the send/recv CQs;
+ * the final reaps take care of flush errors of pending requests.
+ */
+ rds_ib_final_tx(ic);
+ rds_ib_final_rx(ic);
+
+destroy:
+
if (ic->i_rcq) {
if (ic->rds_ibdev)
ibdev_put_vector(ic->rds_ibdev, ic->i_rcq_vector);