From fccb4f6ed62d3378f08a168d19a08cbd1b0a83fa Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Fri, 3 Feb 2012 11:07:39 -0500 Subject: [PATCH] RDS: use different cq handlers for send and recv Signed-off-by: Chris Mason Signed-off-by: Bang Nguyen --- net/rds/ib.h | 9 +++- net/rds/ib_cm.c | 137 +++++++++++++++++++++++++++++++++++------------- 2 files changed, 107 insertions(+), 39 deletions(-) diff --git a/net/rds/ib.h b/net/rds/ib.h index 5cf8b6188e5c..e9b103d2abb7 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -22,6 +22,7 @@ #define RDS_IB_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */ #define RDS_IB_RECYCLE_BATCH_COUNT 32 +#define RDS_WC_MAX 32 extern struct list_head rds_ib_devices; @@ -113,10 +114,14 @@ struct rds_ib_connection { struct rdma_cm_id *i_cm_id; struct ib_pd *i_pd; struct ib_mr *i_mr; - struct ib_cq *i_cq; + struct ib_cq *i_scq; + struct ib_cq *i_rcq; + struct ib_wc i_send_wc[RDS_WC_MAX]; + struct ib_wc i_recv_wc[RDS_WC_MAX]; /* interrupt handling */ - struct tasklet_struct i_tasklet; + struct tasklet_struct i_stasklet; + struct tasklet_struct i_rtasklet; /* tx */ struct rds_ib_work_ring i_send_ring; diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 4e30ee595f30..b91b6dacaf12 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -204,7 +204,7 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data) rdsdebug("event %u data %p\n", event->event, data); } -static void rds_ib_cq_comp_handler(struct ib_cq *cq, void *context) +static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context) { struct rds_connection *conn = context; struct rds_ib_connection *ic = conn->c_transport_data; @@ -213,32 +213,76 @@ static void rds_ib_cq_comp_handler(struct ib_cq *cq, void *context) rds_ib_stats_inc(s_ib_evt_handler_call); - tasklet_schedule(&ic->i_tasklet); + tasklet_schedule(&ic->i_stasklet); } -void rds_ib_tasklet_fn(unsigned long data) +static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context) +{ + struct rds_connection *conn = context; + struct rds_ib_connection *ic = conn->c_transport_data; + + rdsdebug("conn %p cq %p\n", conn, cq); + + rds_ib_stats_inc(s_ib_evt_handler_call); + + tasklet_schedule(&ic->i_rtasklet); +} + +static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, + struct ib_wc *wcs, + struct rds_ib_ack_state *ack_state) +{ + int nr; + int i; + struct ib_wc *wc; + + while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) { + for (i = 0; i < nr; i++) { + wc = wcs + i; + rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", + (unsigned long long)wc->wr_id, wc->status, wc->byte_len, + be32_to_cpu(wc->ex.imm_data)); + + if (wc->wr_id & RDS_IB_SEND_OP) + rds_ib_send_cqe_handler(ic, wc); + else + rds_ib_recv_cqe_handler(ic, wc, ack_state); + } + } +} + +void rds_ib_tasklet_fn_send(unsigned long data) { struct rds_ib_connection *ic = (struct rds_ib_connection *) data; struct rds_connection *conn = ic->conn; - struct rds_ib_ack_state ack_state = { 0, }; - struct ib_wc wc; + struct rds_ib_ack_state ack_state; + memset(&ack_state, 0, sizeof(ack_state)); rds_ib_stats_inc(s_ib_tasklet_call); - /* - * Poll in a loop before and after enabling the next event - */ - while (ib_poll_cq(ic->i_cq, 1, &wc) > 0) { - rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", - (unsigned long long)wc.wr_id, wc.status, wc.byte_len, - be32_to_cpu(wc.ex.imm_data)); - - if (wc.wr_id & RDS_IB_SEND_OP) - rds_ib_send_cqe_handler(ic, &wc); - else - rds_ib_recv_cqe_handler(ic, &wc, &ack_state); + poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state); + ib_req_notify_cq(ic->i_scq, IB_CQ_SOLICITED); + poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state); + + if (rds_conn_up(conn)) { + clear_bit(RDS_LL_SEND_FULL, &conn->c_flags); + rds_send_xmit(ic->conn); } - ib_req_notify_cq(ic->i_cq, IB_CQ_SOLICITED); +} + +void rds_ib_tasklet_fn_recv(unsigned long data) +{ + struct rds_ib_connection *ic = (struct rds_ib_connection *) data; + struct rds_connection *conn = ic->conn; + struct rds_ib_ack_state ack_state; + + rds_ib_stats_inc(s_ib_tasklet_call); + + memset(&ack_state, 0, sizeof(ack_state)); + + poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state); + ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED); + poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state); if (ack_state.ack_next_valid) rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required); @@ -246,11 +290,8 @@ void rds_ib_tasklet_fn(unsigned long data) rds_send_drop_acked(conn, ack_state.ack_recv, NULL); ic->i_ack_recv = ack_state.ack_recv; } - if (rds_conn_up(conn)) { - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - rds_send_xmit(ic->conn); + if (rds_conn_up(conn)) rds_ib_attempt_ack(ic); - } } static void rds_ib_qp_event_handler(struct ib_event *event, void *data) @@ -305,18 +346,35 @@ static int rds_ib_setup_qp(struct rds_connection *conn) ic->i_pd = rds_ibdev->pd; ic->i_mr = rds_ibdev->mr; - ic->i_cq = ib_create_cq(dev, rds_ib_cq_comp_handler, + ic->i_scq = ib_create_cq(dev, rds_ib_cq_comp_handler_send, rds_ib_cq_event_handler, conn, - ic->i_recv_ring.w_nr + ic->i_send_ring.w_nr + 1, + ic->i_send_ring.w_nr + 1, IB_CQ_VECTOR_LEAST_ATTACHED); - if (IS_ERR(ic->i_cq)) { - ret = PTR_ERR(ic->i_cq); - ic->i_cq = NULL; + if (IS_ERR(ic->i_scq)) { + ret = PTR_ERR(ic->i_scq); + ic->i_scq = NULL; rdsdebug("ib_create_cq send failed: %d\n", ret); goto out; } - ret = ib_req_notify_cq(ic->i_cq, IB_CQ_SOLICITED); + ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv, + rds_ib_cq_event_handler, conn, + ic->i_recv_ring.w_nr, + IB_CQ_VECTOR_LEAST_ATTACHED); + if (IS_ERR(ic->i_rcq)) { + ret = PTR_ERR(ic->i_rcq); + ic->i_rcq = NULL; + rdsdebug("ib_create_cq recv failed: %d\n", ret); + goto out; + } + + ret = ib_req_notify_cq(ic->i_scq, IB_CQ_SOLICITED); + if (ret) { + rdsdebug("ib_req_notify_cq send failed: %d\n", ret); + goto out; + } + + ret = ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED); if (ret) { rdsdebug("ib_req_notify_cq recv failed: %d\n", ret); goto out; @@ -333,8 +391,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn) attr.cap.max_recv_sge = RDS_IB_RECV_SGE; attr.sq_sig_type = IB_SIGNAL_REQ_WR; attr.qp_type = IB_QPT_RC; - attr.send_cq = ic->i_cq; - attr.recv_cq = ic->i_cq; + attr.send_cq = ic->i_scq; + attr.recv_cq = ic->i_rcq; /* * XXX this can fail if max_*_wr is too large? Are we supposed @@ -394,7 +452,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) rds_ib_recv_init_ack(ic); - rdsdebug("conn %p pd %p mr %p cq %p\n", conn, ic->i_pd, ic->i_mr, ic->i_cq); + rdsdebug("conn %p pd %p mr %p cq %p\n", conn, ic->i_pd, ic->i_mr, ic->i_rcq); out: rds_ib_dev_put(rds_ibdev); @@ -629,7 +687,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) int err = 0; rdsdebug("cm %p pd %p cq %p qp %p\n", ic->i_cm_id, - ic->i_pd, ic->i_cq, ic->i_cm_id ? ic->i_cm_id->qp : NULL); + ic->i_pd, ic->i_rcq, ic->i_cm_id ? ic->i_cm_id->qp : NULL); if (ic->i_cm_id) { struct ib_device *dev = ic->i_cm_id->device; @@ -648,7 +706,8 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) wait_event(rds_ib_ring_empty_wait, rds_ib_ring_empty(&ic->i_recv_ring) && (atomic_read(&ic->i_signaled_sends) == 0)); - tasklet_kill(&ic->i_tasklet); + tasklet_kill(&ic->i_stasklet); + tasklet_kill(&ic->i_rtasklet); if (ic->i_send_hdrs) ib_dma_free_coherent(dev, @@ -675,8 +734,10 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) if (ic->i_cm_id->qp) rdma_destroy_qp(ic->i_cm_id); - if (ic->i_cq) - ib_destroy_cq(ic->i_cq); + if (ic->i_rcq) + ib_destroy_cq(ic->i_rcq); + if (ic->i_scq) + ib_destroy_cq(ic->i_scq); rdma_destroy_id(ic->i_cm_id); /* @@ -688,7 +749,8 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) ic->i_cm_id = NULL; ic->i_pd = NULL; ic->i_mr = NULL; - ic->i_cq = NULL; + ic->i_scq = NULL; + ic->i_rcq = NULL; ic->i_send_hdrs = NULL; ic->i_recv_hdrs = NULL; ic->i_ack = NULL; @@ -749,7 +811,8 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) } INIT_LIST_HEAD(&ic->ib_node); - tasklet_init(&ic->i_tasklet, rds_ib_tasklet_fn, (unsigned long) ic); + tasklet_init(&ic->i_stasklet, rds_ib_tasklet_fn_send, (unsigned long) ic); + tasklet_init(&ic->i_rtasklet, rds_ib_tasklet_fn_recv, (unsigned long) ic); mutex_init(&ic->i_recv_mutex); #ifndef KERNEL_HAS_ATOMIC64 spin_lock_init(&ic->i_ack_lock); -- 2.50.1