#define COMPLETION_MSG(status) \
((status) < ARRAY_SIZE(wc_status) ? \
wc_status[(status)] : "unexpected completion error")
static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
+rpcrdma_sendcq_process_wc(struct rpcrdma_ep *ep, struct ib_wc *wc)
{
/* WARNING: Only wr_id and status are reliable at this point */
+ if (wc->wr_id == RPCRDMA_LAST_COMPLETION)
+ goto out_last;
+
if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
if (wc->status != IB_WC_SUCCESS &&
wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: SEND: %s\n",
__func__, COMPLETION_MSG(wc->status));
} else {
struct rpcrdma_mw *r;
r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
r->mw_sendcompletion(wc);
}
+ return;
+
+out_last:
+ dprintk("RPC: %s: completion sentinel for ep=%p\n",
+ __func__, ep);
+ if (atomic_dec_and_test(&ep->rep_count))
+ complete(&ep->rep_done);
}
/* The wc array is on stack: automatic memory is always CPU-local. */
count = rc;
while (count-- > 0)
- rpcrdma_sendcq_process_wc(pos++);
+ rpcrdma_sendcq_process_wc(ep, pos++);
} while (rc == ARRAY_SIZE(wcs));
return;
}
static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct rpcrdma_ep *ep, struct ib_wc *wc,
+ struct list_head *sched_list)
{
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
/* WARNING: Only wr_id and status are reliable at this point */
+ if (wc->wr_id == RPCRDMA_LAST_COMPLETION)
+ goto out_last;
if (wc->status != IB_WC_SUCCESS)
goto out_fail;
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
__func__, rep, COMPLETION_MSG(wc->status));
rep->rr_len = ~0U;
goto out_schedule;
+
+out_last:
+ dprintk("RPC: %s: completion sentinel for ep=%p\n",
+ __func__, ep);
+ if (atomic_dec_and_test(&ep->rep_count))
+ complete(&ep->rep_done);
}
/* The wc array is on stack: automatic memory is always CPU-local. */
count = rc;
while (count-- > 0)
- rpcrdma_recvcq_process_wc(pos++, &sched_list);
+ rpcrdma_recvcq_process_wc(ep, pos++, &sched_list);
} while (rc == ARRAY_SIZE(wcs));
out_schedule:
LIST_HEAD(sched_list);
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
- rpcrdma_recvcq_process_wc(&wc, &sched_list);
+ rpcrdma_recvcq_process_wc(ep, &wc, &sched_list);
if (!list_empty(&sched_list))
rpcrdma_schedule_tasklet(&sched_list);
while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
- rpcrdma_sendcq_process_wc(&wc);
+ rpcrdma_sendcq_process_wc(ep, &wc);
}
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
return rc;
}
+/* Post sentinel Work Requests
+ *
+ * Caller has already invoked rdma_disconnect(), so the QP has
+ * transitioned to the error state. All pending WRs, including
+ * posted receive buffers, send WRs, and the sentinels, are
+ * flushed in order.
+ *
+ * Once the sentinel requests come through the upcall handlers,
+ * all posted WRs are guaranteed to have completed, and it is
+ * safe to tear down the QP.
+ */
+static void
+rpcrdma_destroy_qp(struct rpcrdma_ep *ep, struct rdma_cm_id *id)
+{
+ static struct ib_send_wr send_wr = {
+ .wr_id = RPCRDMA_LAST_COMPLETION
+ };
+ struct ib_send_wr *send_wr_fail;
+ static struct ib_recv_wr recv_wr = {
+ .wr_id = RPCRDMA_LAST_COMPLETION
+ };
+ struct ib_recv_wr *recv_wr_fail;
+ int rc;
+
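+ /* One sentinel per completion queue: send and receive */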
+ atomic_set(&ep->rep_count, 2);
+ init_completion(&ep->rep_done);
+
+ rc = ib_post_send(id->qp, &send_wr, &send_wr_fail);
+ if (rc) {
+ pr_err("RPC: %s: ib_post_send returned %i\n",
+ __func__, rc);
+ goto out;
+ }
+
+ rc = ib_post_recv(id->qp, &recv_wr, &recv_wr_fail);
+ if (rc) {
+ pr_err("RPC: %s: ib_post_recv returned %i\n",
+ __func__, rc);
+ goto out;
+ }
+
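+ /* Both sentinels must be reaped before the QP can be torn down */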
+ wait_for_completion(&ep->rep_done);
+
+out:
+ dprintk("RPC: %s: destroying QP for ep=%p\n",
+ __func__, ep);
+ rdma_destroy_qp(id);
+ id->qp = NULL;
+}
+
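Note: ep->rep_count, ep->rep_done, and the RPCRDMA_LAST_COMPLETION wr_id value are assumed to be added to xprt_rdma.h in hunks not shown here. Based on how they are used above, the struct rpcrdma_ep additions would look roughly like:

	atomic_t rep_count;		/* sentinel completions still outstanding */
	struct completion rep_done;	/* signaled once both sentinels have flushed */

RPCRDMA_LAST_COMPLETION only needs to be a wr_id value that cannot collide with RPCRDMA_IGNORE_COMPLETION or with a valid buffer address.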
/*
* Clean up/close an IA.
* o if event handles and PD have been initialized, free them.
* o close the IA
*/
void
-rpcrdma_ia_close(struct rpcrdma_ia *ia)
+rpcrdma_ia_close(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
dprintk("RPC: %s: entering\n", __func__);
if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
- if (ia->ri_id->qp)
- rdma_destroy_qp(ia->ri_id);
rpcrdma_destroy_id(ia->ri_id);
ia->ri_id = NULL;
}
{
struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_cq *sendcq, *recvcq;
+ unsigned int max_qp_wr;
int rc, err;
if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
return -ENOMEM;
}
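+ /* Leave room on each work queue for the sentinel WR posted at disconnect */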
+ max_qp_wr = devattr->max_qp_wr - 1;
/* check provider's send/recv wr limits */
- if (cdata->max_requests > devattr->max_qp_wr)
- cdata->max_requests = devattr->max_qp_wr;
+ if (cdata->max_requests > max_qp_wr)
+ cdata->max_requests = max_qp_wr;
ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
ep->rep_attr.qp_context = ep;
ep->rep_attr.srq = NULL;
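+ /* One extra slot per queue for the sentinel WR */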
- ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_send_wr = cdata->max_requests + 1;
rc = ia->ri_ops->ro_open(ia, ep, cdata);
if (rc)
return rc;
- ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_recv_wr = cdata->max_requests + 1;
ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
if (ia->ri_id->qp) {
rpcrdma_ep_disconnect(ep, ia);
- rdma_destroy_qp(ia->ri_id);
+ rpcrdma_destroy_qp(ep, ia->ri_id);
ia->ri_id->qp = NULL;
}
ia->ri_id = id;
write_unlock(&ia->ri_qplock);
- rdma_destroy_qp(old);
+ rpcrdma_destroy_qp(ep, old);
rpcrdma_destroy_id(old);
} else {
dprintk("RPC: %s: connecting...\n", __func__);