]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
[PATCH 2/2] Avoid redundant call to rds_bind_lookup() in recv path.
authorSowmini Varadhan <sowmini.varadhan@oracle.com>
Tue, 12 Apr 2016 01:49:12 +0000 (18:49 -0700)
committerChuck Anderson <chuck.anderson@oracle.com>
Thu, 14 Apr 2016 01:43:49 +0000 (18:43 -0700)
Orabug 20930687

When RAC tries to scale RDS-TCP, they are hitting bottlenecks
due to inefficiencies in rds_bind_lookup. Each call to
rds_bind_lookup results in an irqsave/irqrestore sequence, and
when the list of RDS sockets is large, we end up having IRQs
suppressed for long intervals. This trigger flow-control assertions
and causes TX queue watchdog hangs in the sender. The current
implementation makes this even worse, by superfluously calling
rds_bind_lookup(). This patch set takes the first step to solving
this problem by avoiding one of the redundant calls to rds_bind_lookup.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
net/rds/recv.c

index c00d28759d385abda3cbd43cea2c83df5f433eb2..ec25e24266e3399f8322021067ebd4d5bf9d4560 100644 (file)
@@ -56,7 +56,7 @@ rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc,
 
 static void
 rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
-              struct rds_incoming *inc, gfp_t gfp);
+              struct rds_incoming *inc, gfp_t gfp, struct rds_sock *rs);
 
 static int
 rds_recv_ok(struct sock *sk, struct sk_buff *skb)
@@ -231,11 +231,12 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
        /* pass it on locally if there is no socket bound, or if netfilter is
         * disabled for this socket */
        if (NULL == rs || !rs->rs_netfilter_enabled) {
+               rds_recv_local(conn, saddr, daddr, inc, gfp, rs);
+
                /* drop the reference if we had taken one */
                if (NULL != rs)
                        rds_sock_put(rs);
 
-               rds_recv_local(conn, saddr, daddr, inc, gfp);
                return;
        }
 
@@ -249,7 +250,10 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
                rds_rtd(RDS_RTD_ERR,
                        "failure to allocate space for inc %p, %u.%u.%u.%u -> %u.%d.%u.%u tos %d\n",
                        inc, NIPQUAD(saddr), NIPQUAD(daddr), conn->c_tos);
-               rds_recv_local(conn, saddr, daddr, inc, gfp);
+               rds_recv_local(conn, saddr, daddr, inc, gfp, rs);
+               /* drop the reference if we had taken one */
+               if (NULL != rs)
+                       rds_sock_put(rs);
                return;
        }
 
@@ -279,6 +283,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
        /* cleanup any references taken */
        if (NULL != rs)
                rds_sock_put(rs);
+       rs = NULL;
 
        /* the original info is just a copy */
        memcpy(org, dst, sizeof(struct rds_nf_hdr));
@@ -310,7 +315,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
                /* check the original header and if changed do the needful */
                if (dst->saddr == org->saddr && dst->daddr == org->daddr &&
                    conn->c_trans->skb_local(skb)) {
-                       rds_recv_local(conn, saddr, daddr, inc, gfp);
+                       rds_recv_local(conn, saddr, daddr, inc, gfp, NULL);
                }
                /* the send both case does both a local recv and a reroute */
                else if (dst->flags & RDS_NF_HDR_FLAG_BOTH) {
@@ -319,7 +324,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
                        rds_inc_addref(inc);
 
                        /* send it up the stream locally */
-                       rds_recv_local(conn, saddr, daddr, inc, gfp);
+                       rds_recv_local(conn, saddr, daddr, inc, gfp, NULL);
 
                        /* and also reroute the request */
                        rds_recv_route(conn, inc, gfp);
@@ -384,7 +389,7 @@ rds_recv_route(struct rds_connection *conn, struct rds_incoming *inc,
        /* this is a request for our local node, but potentially a different source
         * either way we process it locally */
        else if (conn->c_trans->skb_local(inc->i_skb)) {
-               rds_recv_local(nconn, dst->saddr, dst->daddr, inc, gfp);
+               rds_recv_local(nconn, dst->saddr, dst->daddr, inc, gfp, NULL);
        }
        /* looks like this request is going out to another node */
        else {
@@ -449,17 +454,17 @@ out:
        NF_HOOK(PF_RDS_HOOK, NF_RDS_FORWARD_ERROR, sk, inc->i_skb, NULL, NULL, rds_recv_ok);
 
        /* then hand the request off to normal local processing on the old connection */
-       rds_recv_local(inc->i_oconn, org->saddr, org->daddr, inc, gfp);
+       rds_recv_local(inc->i_oconn, org->saddr, org->daddr, inc, gfp, NULL);
 }
 
 static void
 rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
-              struct rds_incoming *inc, gfp_t gfp)
+              struct rds_incoming *inc, gfp_t gfp, struct rds_sock *rs)
 {
-       struct rds_sock *rs = NULL;
        struct sock *sk;
        unsigned long flags;
        u64 inc_hdr_h_sequence = 0;
+       bool rs_local = (!rs);
 
        inc->i_conn = conn;
        inc->i_rx_jiffies = jiffies;
@@ -524,7 +529,8 @@ rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
                goto out;
        }
 
-       rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
+       if (!rs)
+               rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
        if (!rs) {
                rds_stats_inc(s_recv_drop_no_sock);
                goto out;
@@ -568,7 +574,7 @@ rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 
 out:
-       if (rs)
+       if (rs_local && rs)
                rds_sock_put(rs);
 }