RDS: Fix slowdown when doing massively parallel workload
author    Bang Nguyen <bang.nguyen@oracle.com>
Sat, 1 Mar 2014 04:56:21 +0000 (20:56 -0800)
committer Mukesh Kacker <mukesh.kacker@oracle.com>
Wed, 8 Jul 2015 20:59:49 +0000 (13:59 -0700)
In shutdown, reap Completion Queue Entries (CQEs)
periodically while waiting for the RX ring to quiesce
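
A minimal sketch of that shutdown loop, mirroring the net/rds/ib_cm.c hunk
below: instead of one unbounded wait, rds_ib_conn_shutdown() now times out
every 5 seconds and drains the receive CQ itself so a stalled tasklet cannot
wedge the teardown.

    /* Sketch: periodically reap RX CQEs while waiting for the ring to drain. */
    while (!wait_event_timeout(rds_ib_ring_empty_wait,
                               rds_ib_ring_empty(&ic->i_recv_ring) &&
                               (atomic_read(&ic->i_signaled_sends) == 0),
                               msecs_to_jiffies(5000))) {
            if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
                    spin_lock(&ic->i_rx_lock);
                    rds_ib_rx(ic);          /* poll the CQ and process completions */
                    spin_unlock(&ic->i_rx_lock);
            }
    }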

Reject new sends while an rds-info request is pending to avoid
an RCU stall
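
In sketch form (mirroring the net/rds/connection.c and net/rds/send.c hunks
below), the connection gains a c_rdsinfo_pending flag that is set under
c_lock while rds-info walks the send/retransmit queues, and rds_sendmsg()
bails out with -EAGAIN while it is set so senders cannot keep growing the
lists being traversed.

    /* rds-info side: mark the connection busy while the queues are walked. */
    spin_lock_irqsave(&conn->c_lock, flags);
    conn->c_rdsinfo_pending = 1;
    /* walk conn->c_send_queue / conn->c_retrans under c_lock */
    conn->c_rdsinfo_pending = 0;
    spin_unlock_irqrestore(&conn->c_lock, flags);

    /* rds_sendmsg() side: refuse new work while the walk is in progress. */
    if (conn->c_rdsinfo_pending) {
            ret = -EAGAIN;
            goto out;
    }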

Break the RX stream into work units of 10k messages each
and schedule them sequentially to avoid RCU stalls
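
A compressed sketch of the new rds_ib_rx() control flow from the
net/rds/ib_cm.c hunk below (ack handling and SRQ refill trimmed): receive
completion processing is capped at RDS_IB_RX_LIMIT (10,000) CQEs per pass,
and when the cap is hit the remainder is deferred to a delayed work item on
rds_aux_wq while the tasklet backs off until that handler has run.

    static void rds_ib_rx(struct rds_ib_connection *ic)
    {
            struct rds_ib_ack_state ack_state;

            memset(&ack_state, 0, sizeof(ack_state));

            ic->i_rx_poll_cq = 0;
            poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
            ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
            poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);

            /* ack processing and SRQ refill elided -- see the full hunk below */

            if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
                    /* more than 10k CQEs this pass: hand the rest to
                     * rds_aux_wq and skip the tasklet path until
                     * rds_ib_rx_handler() has drained it */
                    ic->i_rx_w.ic = ic;
                    queue_delayed_work(rds_aux_wq, &ic->i_rx_w.dlywork,
                                       msecs_to_jiffies(10));
                    ic->i_rx_wait_for_handler = 1;
            }
    }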

Orabug: 18362838

Signed-off-by: Bang Nguyen <bang.nguyen@oracle.com>
Signed-off-by: Mukesh Kacker <mukesh.kacker@oracle.com>
Acked-by: Rama Nichanamatlu <rama.nichanamatlu@oracle.com>
Tested-by: Arvind Shukla <arvind.shukla@oracle.com>
(cherry picked from commit dac771f1e55713b8a42bdffa059e1894e1ecdf17)

net/rds/connection.c
net/rds/ib.h
net/rds/ib_cm.c
net/rds/rds.h
net/rds/send.c

net/rds/connection.c
index 3c3283c3dd90335828fcc5a4c1a18b88aa2992e5..e5337aef1c8d449812c2c494e762b85584ffe1b3 100644
@@ -437,6 +437,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
                                list = &conn->c_retrans;
 
                        spin_lock_irqsave(&conn->c_lock, flags);
+                       conn->c_rdsinfo_pending = 1;
 
                        /* XXX too lazy to maintain counts.. */
                        list_for_each_entry(rm, list, m_conn_item) {
@@ -447,6 +448,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
                                                          conn->c_faddr, 0);
                        }
 
+                       conn->c_rdsinfo_pending = 0;
                        spin_unlock_irqrestore(&conn->c_lock, flags);
                }
        }
net/rds/ib.h
index 823ec806b48f2757d7860462dbdc29e31b175a5c..419b51e5ce669e9831953fa1ff20d26083aa421f 100644
@@ -28,6 +28,8 @@
 
 #define RDS_IB_DEFAULT_NUM_ARPS                100
 
+#define RDS_IB_RX_LIMIT                        10000
+
 #define RDS_IB_DEFAULT_TIMEOUT          16 /* 4.096 * 2 ^ 16 = 260 msec */
 
 #define RDS_IB_SUPPORTED_PROTOCOLS     0x00000003      /* minor versions supported */
@@ -142,6 +144,11 @@ struct rds_ib_migrate_work {
        struct rds_ib_connection        *ic;
 };
 
+struct rds_ib_rx_work {
+       struct delayed_work             dlywork;
+       struct rds_ib_connection        *ic;
+};
+
 struct rds_ib_connection {
 
        struct list_head        ib_node;
@@ -227,6 +234,11 @@ struct rds_ib_connection {
 
        int                     i_scq_vector;
        int                     i_rcq_vector;
+
+       spinlock_t              i_rx_lock;
+       struct rds_ib_rx_work   i_rx_w;
+       unsigned int            i_rx_wait_for_handler;
+       unsigned int            i_rx_poll_cq;
 };
 
 /* This assumes that atomic_t is at least 32 bits */
net/rds/ib_cm.c
index 2c97d639acb0ead95d6e24bf515c9bff324d1ae9..f3892f459f543c35b8d90c45f774d1ae476ad3ca 100644
@@ -323,8 +323,9 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
 }
 
 static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
-                  struct ib_wc *wcs,
-                  struct rds_ib_ack_state *ack_state)
+                   struct ib_wc *wcs,
+                   struct rds_ib_ack_state *ack_state,
+                   unsigned int rx)
 {
        int nr;
        int i;
@@ -332,6 +333,18 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 
        while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
                for (i = 0; i < nr; i++) {
+                       if (rx) {
+                               if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT)
+                                   == 0) {
+                                       rdsdebug("connection "
+                                                "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
+                                                "RX poll_cq processed %d\n",
+                                                NIPQUAD(ic->conn->c_laddr),
+                                                NIPQUAD(ic->conn->c_faddr),
+                                                ic->conn->c_tos,
+                                                ic->i_rx_poll_cq);
+                               }
+                       }
                        wc = wcs + i;
                        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
@@ -342,6 +355,10 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                        else
                                rds_ib_recv_cqe_handler(ic, wc, ack_state);
                }
+
+               if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+                       break;
+
        }
 }
 
@@ -354,9 +371,9 @@ void rds_ib_tasklet_fn_send(unsigned long data)
        memset(&ack_state, 0, sizeof(ack_state));
        rds_ib_stats_inc(s_ib_tasklet_call);
 
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
        ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP);
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
 
        if (rds_conn_up(conn) &&
           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
@@ -364,9 +381,14 @@ void rds_ib_tasklet_fn_send(unsigned long data)
                rds_send_xmit(ic->conn);
 }
 
-void rds_ib_tasklet_fn_recv(unsigned long data)
+/*
+ * Note: rds_ib_rx(): don't call with irqs disabled.
+ * It calls rds_send_drop_acked() which calls other
+ * routines that reach into rds_rdma_free_op()
+ * where irqs_disabled() warning is asserted!
+ */
+static void rds_ib_rx(struct rds_ib_connection *ic)
 {
-       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
        struct rds_connection *conn = ic->conn;
        struct rds_ib_ack_state ack_state;
        struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
@@ -377,9 +399,10 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
 
        memset(&ack_state, 0, sizeof(ack_state));
 
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
+       ic->i_rx_poll_cq = 0;
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
        ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
 
        if (ack_state.ack_next_valid)
                rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
@@ -394,7 +417,41 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
                if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
                                        rds_ib_srq_hwm_refill) &&
                        !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
-                               queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
+                               queue_delayed_work(rds_wq,
+                                       &rds_ibdev->srq->s_refill_w, 0);
+
+       if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+               ic->i_rx_w.ic = ic;
+               queue_delayed_work(rds_aux_wq, &ic->i_rx_w.dlywork,
+                                       msecs_to_jiffies(10));
+               ic->i_rx_wait_for_handler = 1;
+       }
+}
+
+void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
+       spin_lock(&ic->i_rx_lock);
+       if (ic->i_rx_wait_for_handler)
+               goto out;
+       rds_ib_rx(ic);
+out:
+       spin_unlock(&ic->i_rx_lock);
+}
+
+static void rds_ib_rx_handler(struct work_struct *workarg)
+{
+       struct delayed_work *delayedwork =
+               container_of(workarg, struct delayed_work, work);
+       struct rds_ib_rx_work *rirwork =
+               container_of(delayedwork, struct rds_ib_rx_work, dlywork);
+       struct rds_ib_connection *ic = rirwork->ic;
+
+       spin_lock(&ic->i_rx_lock);
+       ic->i_rx_wait_for_handler = 0;
+       rds_ib_rx(ic);
+       spin_unlock(&ic->i_rx_lock);
 }
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
@@ -1069,9 +1126,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                }
 
                /* quiesce tx and rx completion before tearing down */
-               wait_event(rds_ib_ring_empty_wait,
-                          rds_ib_ring_empty(&ic->i_recv_ring) &&
-                          (atomic_read(&ic->i_signaled_sends) == 0));
+               while (!wait_event_timeout(rds_ib_ring_empty_wait,
+                               rds_ib_ring_empty(&ic->i_recv_ring) &&
+                               (atomic_read(&ic->i_signaled_sends) == 0),
+                               msecs_to_jiffies(5000))) {
+
+                       if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
+                               spin_lock(&ic->i_rx_lock);
+                               rds_ib_rx(ic);
+                               spin_unlock(&ic->i_rx_lock);
+                       }
+               }
+
                tasklet_kill(&ic->i_stasklet);
                tasklet_kill(&ic->i_rtasklet);
 
@@ -1207,6 +1273,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
        spin_lock_init(&ic->i_ack_lock);
 #endif
        atomic_set(&ic->i_signaled_sends, 0);
+       spin_lock_init(&ic->i_rx_lock);
 
        /*
         * rds_ib_conn_shutdown() waits for these to be emptied so they
@@ -1221,6 +1288,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
        init_completion(&ic->i_last_wqe_complete);
 
        INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
+       INIT_DELAYED_WORK(&ic->i_rx_w.dlywork, rds_ib_rx_handler);
 
        spin_lock_irqsave(&ib_nodev_conns_lock, flags);
        list_add_tail(&ic->ib_node, &ib_nodev_conns);
net/rds/rds.h
index 4c764a75a742aabb6e32561f04f2842ba137ac52..55ca1f7a1fb96b25969d854906ec3dfd553f78e3 100644
@@ -164,6 +164,8 @@ struct rds_connection {
        struct rds_connection   *c_base_conn;
 
        unsigned int            c_route_to_base;
+
+       unsigned int            c_rdsinfo_pending;
 };
 
 #define RDS_FLAG_CONG_BITMAP   0x01
net/rds/send.c
index 399957e369f4dce99cdca1835a6ba3e007a998cc..4376e0a459b67f4b79f0ec77e8b597f0290fbaec 100644
@@ -1308,6 +1308,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                goto out;
        }
 
+       if (conn->c_rdsinfo_pending) {
+               ret = -EAGAIN;
+               goto out;
+       }
+
        while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
                                  dport, &queued)) {
                rds_stats_inc(s_send_queue_full);