RDS: use different cq handlers for send and recv
author	Andy Grover <andy.grover@oracle.com>
	Fri, 3 Feb 2012 16:07:39 +0000 (11:07 -0500)
committer	Mukesh Kacker <mukesh.kacker@oracle.com>
	Tue, 7 Jul 2015 23:41:24 +0000 (16:41 -0700)
Signed-off-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Bang Nguyen <bang.nguyen@oracle.com>
net/rds/ib.h
net/rds/ib_cm.c
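
The patch splits the single completion queue into a send CQ and a receive CQ, each drained by its own tasklet using a poll/re-arm/poll sequence. Below is a minimal sketch of that drain pattern, assuming the 2012-era kernel verbs API (ib_poll_cq(), ib_req_notify_cq()); the drain_cq() and handle_wc() names are illustrative and not part of the patch.

#include <rdma/ib_verbs.h>

#define WC_BATCH 32	/* mirrors RDS_WC_MAX introduced below */

/* Pull completions in batches of WC_BATCH until the CQ is empty.
 * The caller must pass a wcs array with at least WC_BATCH entries. */
static void drain_cq(struct ib_cq *cq, struct ib_wc *wcs,
		     void (*handle_wc)(struct ib_wc *wc))
{
	int nr, i;

	while ((nr = ib_poll_cq(cq, WC_BATCH, wcs)) > 0)
		for (i = 0; i < nr; i++)
			handle_wc(&wcs[i]);
}

/* Body of a per-CQ tasklet: drain, re-arm, then drain once more to
 * catch completions that arrived while ib_req_notify_cq() was re-arming. */
static void cq_tasklet_body(struct ib_cq *cq, struct ib_wc *wcs,
			    void (*handle_wc)(struct ib_wc *wc))
{
	drain_cq(cq, wcs, handle_wc);
	ib_req_notify_cq(cq, IB_CQ_SOLICITED);
	drain_cq(cq, wcs, handle_wc);
}
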

diff --git a/net/rds/ib.h b/net/rds/ib.h
index 5cf8b6188e5cefe807b0e1d7c9af32254027ba05..e9b103d2abb784fba618211828d35c29a67737f7 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -22,6 +22,7 @@
 #define RDS_IB_SUPPORTED_PROTOCOLS     0x00000003      /* minor versions supported */
 
 #define RDS_IB_RECYCLE_BATCH_COUNT     32
+#define RDS_WC_MAX 32
 
 extern struct list_head rds_ib_devices;
 
@@ -113,10 +114,14 @@ struct rds_ib_connection {
        struct rdma_cm_id       *i_cm_id;
        struct ib_pd            *i_pd;
        struct ib_mr            *i_mr;
-       struct ib_cq            *i_cq;
+       struct ib_cq            *i_scq;
+       struct ib_cq            *i_rcq;
+       struct ib_wc            i_send_wc[RDS_WC_MAX];
+       struct ib_wc            i_recv_wc[RDS_WC_MAX];
 
        /* interrupt handling */
-       struct tasklet_struct   i_tasklet;
+       struct tasklet_struct   i_stasklet;
+       struct tasklet_struct   i_rtasklet;
 
        /* tx */
        struct rds_ib_work_ring i_send_ring;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 4e30ee595f30207ccaa8d7a9c38db742bdcc5db0..b91b6dacaf12ad47dd26c4601800041d4e9c61e4 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -204,7 +204,7 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
        rdsdebug("event %u data %p\n", event->event, data);
 }
 
-static void rds_ib_cq_comp_handler(struct ib_cq *cq, void *context)
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
 {
        struct rds_connection *conn = context;
        struct rds_ib_connection *ic = conn->c_transport_data;
@@ -213,32 +213,76 @@ static void rds_ib_cq_comp_handler(struct ib_cq *cq, void *context)
 
        rds_ib_stats_inc(s_ib_evt_handler_call);
 
-       tasklet_schedule(&ic->i_tasklet);
+       tasklet_schedule(&ic->i_stasklet);
 }
 
-void rds_ib_tasklet_fn(unsigned long data)
+static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
+{
+       struct rds_connection *conn = context;
+       struct rds_ib_connection *ic = conn->c_transport_data;
+
+       rdsdebug("conn %p cq %p\n", conn, cq);
+
+       rds_ib_stats_inc(s_ib_evt_handler_call);
+
+       tasklet_schedule(&ic->i_rtasklet);
+}
+
+static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
+                  struct ib_wc *wcs,
+                  struct rds_ib_ack_state *ack_state)
+{
+       int nr;
+       int i;
+       struct ib_wc *wc;
+
+       while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
+               for (i = 0; i < nr; i++) {
+                       wc = wcs + i;
+                       rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+                                (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
+                                be32_to_cpu(wc->ex.imm_data));
+
+                       if (wc->wr_id & RDS_IB_SEND_OP)
+                               rds_ib_send_cqe_handler(ic, wc);
+                       else
+                               rds_ib_recv_cqe_handler(ic, wc, ack_state);
+               }
+       }
+}
+
+void rds_ib_tasklet_fn_send(unsigned long data)
 {
        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
        struct rds_connection *conn = ic->conn;
-       struct rds_ib_ack_state ack_state = { 0, };
-       struct ib_wc wc;
+       struct rds_ib_ack_state ack_state;
 
+       memset(&ack_state, 0, sizeof(ack_state));
        rds_ib_stats_inc(s_ib_tasklet_call);
 
-       /*
-        * Poll in a loop before and after enabling the next event
-        */
-       while (ib_poll_cq(ic->i_cq, 1, &wc) > 0) {
-               rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
-                        (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
-                        be32_to_cpu(wc.ex.imm_data));
-
-               if (wc.wr_id & RDS_IB_SEND_OP)
-                       rds_ib_send_cqe_handler(ic, &wc);
-               else
-                       rds_ib_recv_cqe_handler(ic, &wc, &ack_state);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+       ib_req_notify_cq(ic->i_scq, IB_CQ_SOLICITED);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+
+       if (rds_conn_up(conn)) {
+               clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
+               rds_send_xmit(ic->conn);
        }
-       ib_req_notify_cq(ic->i_cq, IB_CQ_SOLICITED);
+}
+
+void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_ack_state ack_state;
+
+       rds_ib_stats_inc(s_ib_tasklet_call);
+
+       memset(&ack_state, 0, sizeof(ack_state));
+
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
+       ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
 
        if (ack_state.ack_next_valid)
                rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
@@ -246,11 +290,8 @@ void rds_ib_tasklet_fn(unsigned long data)
                rds_send_drop_acked(conn, ack_state.ack_recv, NULL);
                ic->i_ack_recv = ack_state.ack_recv;
        }
-       if (rds_conn_up(conn)) {
-               if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-                       rds_send_xmit(ic->conn);
+       if (rds_conn_up(conn))
                rds_ib_attempt_ack(ic);
-       }
 }
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
@@ -305,18 +346,35 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        ic->i_pd = rds_ibdev->pd;
        ic->i_mr = rds_ibdev->mr;
 
-       ic->i_cq = ib_create_cq(dev, rds_ib_cq_comp_handler,
+       ic->i_scq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
                                rds_ib_cq_event_handler, conn,
-                               ic->i_recv_ring.w_nr + ic->i_send_ring.w_nr + 1,
+                               ic->i_send_ring.w_nr + 1,
                                IB_CQ_VECTOR_LEAST_ATTACHED);
-       if (IS_ERR(ic->i_cq)) {
-               ret = PTR_ERR(ic->i_cq);
-               ic->i_cq = NULL;
+       if (IS_ERR(ic->i_scq)) {
+               ret = PTR_ERR(ic->i_scq);
+               ic->i_scq = NULL;
                rdsdebug("ib_create_cq send failed: %d\n", ret);
                goto out;
        }
 
-       ret = ib_req_notify_cq(ic->i_cq, IB_CQ_SOLICITED);
+       ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
+                               rds_ib_cq_event_handler, conn,
+                               ic->i_recv_ring.w_nr,
+                               IB_CQ_VECTOR_LEAST_ATTACHED);
+       if (IS_ERR(ic->i_rcq)) {
+               ret = PTR_ERR(ic->i_rcq);
+               ic->i_rcq = NULL;
+               rdsdebug("ib_create_cq recv failed: %d\n", ret);
+               goto out;
+       }
+
+       ret = ib_req_notify_cq(ic->i_scq, IB_CQ_SOLICITED);
+       if (ret) {
+               rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
+               goto out;
+       }
+
+       ret = ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
        if (ret) {
                rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
                goto out;
@@ -333,8 +391,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
        attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        attr.qp_type = IB_QPT_RC;
-       attr.send_cq = ic->i_cq;
-       attr.recv_cq = ic->i_cq;
+       attr.send_cq = ic->i_scq;
+       attr.recv_cq = ic->i_rcq;
 
        /*
         * XXX this can fail if max_*_wr is too large?  Are we supposed
@@ -394,7 +452,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 
        rds_ib_recv_init_ack(ic);
 
-       rdsdebug("conn %p pd %p mr %p cq %p\n", conn, ic->i_pd, ic->i_mr, ic->i_cq);
+       rdsdebug("conn %p pd %p mr %p cq %p\n", conn, ic->i_pd, ic->i_mr, ic->i_rcq);
 
 out:
        rds_ib_dev_put(rds_ibdev);
@@ -629,7 +687,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
        int err = 0;
 
        rdsdebug("cm %p pd %p cq %p qp %p\n", ic->i_cm_id,
-                ic->i_pd, ic->i_cq, ic->i_cm_id ? ic->i_cm_id->qp : NULL);
+                ic->i_pd, ic->i_rcq, ic->i_cm_id ? ic->i_cm_id->qp : NULL);
 
        if (ic->i_cm_id) {
                struct ib_device *dev = ic->i_cm_id->device;
@@ -648,7 +706,8 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                wait_event(rds_ib_ring_empty_wait,
                           rds_ib_ring_empty(&ic->i_recv_ring) &&
                           (atomic_read(&ic->i_signaled_sends) == 0));
-               tasklet_kill(&ic->i_tasklet);
+               tasklet_kill(&ic->i_stasklet);
+               tasklet_kill(&ic->i_rtasklet);
 
                if (ic->i_send_hdrs)
                        ib_dma_free_coherent(dev,
@@ -675,8 +734,10 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 
                if (ic->i_cm_id->qp)
                        rdma_destroy_qp(ic->i_cm_id);
-               if (ic->i_cq)
-                       ib_destroy_cq(ic->i_cq);
+               if (ic->i_rcq)
+                       ib_destroy_cq(ic->i_rcq);
+               if (ic->i_scq)
+                       ib_destroy_cq(ic->i_scq);
                rdma_destroy_id(ic->i_cm_id);
 
                /*
@@ -688,7 +749,8 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                ic->i_cm_id = NULL;
                ic->i_pd = NULL;
                ic->i_mr = NULL;
-               ic->i_cq = NULL;
+               ic->i_scq = NULL;
+               ic->i_rcq = NULL;
                ic->i_send_hdrs = NULL;
                ic->i_recv_hdrs = NULL;
                ic->i_ack = NULL;
@@ -749,7 +811,8 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
        }
 
        INIT_LIST_HEAD(&ic->ib_node);
-       tasklet_init(&ic->i_tasklet, rds_ib_tasklet_fn, (unsigned long) ic);
+       tasklet_init(&ic->i_stasklet, rds_ib_tasklet_fn_send, (unsigned long) ic);
+       tasklet_init(&ic->i_rtasklet, rds_ib_tasklet_fn_recv, (unsigned long) ic);
        mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);
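
Even with separate CQs, the shared poll_cq() above still tells send completions from receive completions by a flag bit in wr_id. A minimal sketch of that tagging scheme follows, assuming a high-bit flag; DEMO_SEND_OP and the demo_* helpers are illustrative stand-ins for the real RDS_IB_SEND_OP logic in net/rds/ib.h.

#include <linux/types.h>
#include <rdma/ib_verbs.h>

/* Illustrative flag value; the real one is RDS_IB_SEND_OP in net/rds/ib.h. */
#define DEMO_SEND_OP	(1ULL << 63)

/* Tag a send work request so its completion can be routed later. */
static void demo_tag_send_wr(struct ib_send_wr *wr, u64 token)
{
	wr->wr_id = token | DEMO_SEND_OP;
}

/* Route a polled completion the way poll_cq() does. */
static bool demo_wc_is_send(const struct ib_wc *wc)
{
	return wc->wr_id & DEMO_SEND_OP;
}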