From: Sowmini Varadhan Date: Tue, 12 Apr 2016 01:49:12 +0000 (-0700) Subject: [PATCH 2/2] Avoid redundant call to rds_bind_lookup() in recv path. X-Git-Tag: v4.1.12-92~175^2 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=a60d347e2a984e92ab1f7026d4d49eae070a8b04;p=users%2Fjedix%2Flinux-maple.git [PATCH 2/2] Avoid redundant call to rds_bind_lookup() in recv path. Orabug 20930687 When RAC tries to scale RDS-TCP, they are hitting bottlenecks due to inefficiencies in rds_bind_lookup. Each call to rds_bind_lookup results in an irqsave/irqrestore sequence, and when the list of RDS sockets is large, we end up having IRQs suppressed for long intervals. This triggers flow-control assertions and causes TX queue watchdog hangs in the sender. The current implementation makes this even worse, by superfluously calling rds_bind_lookup(). This patch set takes the first step to solving this problem by avoiding one of the redundant calls to rds_bind_lookup. Signed-off-by: Sowmini Varadhan Acked-by: Santosh Shilimkar --- diff --git a/net/rds/recv.c b/net/rds/recv.c index c00d28759d385..ec25e24266e33 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -56,7 +56,7 @@ rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc, static void rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr, - struct rds_incoming *inc, gfp_t gfp); + struct rds_incoming *inc, gfp_t gfp, struct rds_sock *rs); static int rds_recv_ok(struct sock *sk, struct sk_buff *skb) @@ -231,11 +231,12 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, /* pass it on locally if there is no socket bound, or if netfilter is * disabled for this socket */ if (NULL == rs || !rs->rs_netfilter_enabled) { + rds_recv_local(conn, saddr, daddr, inc, gfp, rs); + /* drop the reference if we had taken one */ if (NULL != rs) rds_sock_put(rs); - rds_recv_local(conn, saddr, daddr, inc, gfp); return; } @@ -249,7 +250,10 @@ void rds_recv_incoming(struct 
rds_connection *conn, __be32 saddr, __be32 daddr, rds_rtd(RDS_RTD_ERR, "failure to allocate space for inc %p, %u.%u.%u.%u -> %u.%d.%u.%u tos %d\n", inc, NIPQUAD(saddr), NIPQUAD(daddr), conn->c_tos); - rds_recv_local(conn, saddr, daddr, inc, gfp); + rds_recv_local(conn, saddr, daddr, inc, gfp, rs); + /* drop the reference if we had taken one */ + if (NULL != rs) + rds_sock_put(rs); return; } @@ -279,6 +283,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, /* cleanup any references taken */ if (NULL != rs) rds_sock_put(rs); + rs = NULL; /* the original info is just a copy */ memcpy(org, dst, sizeof(struct rds_nf_hdr)); @@ -310,7 +315,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, /* check the original header and if changed do the needful */ if (dst->saddr == org->saddr && dst->daddr == org->daddr && conn->c_trans->skb_local(skb)) { - rds_recv_local(conn, saddr, daddr, inc, gfp); + rds_recv_local(conn, saddr, daddr, inc, gfp, NULL); } /* the send both case does both a local recv and a reroute */ else if (dst->flags & RDS_NF_HDR_FLAG_BOTH) { @@ -319,7 +324,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, rds_inc_addref(inc); /* send it up the stream locally */ - rds_recv_local(conn, saddr, daddr, inc, gfp); + rds_recv_local(conn, saddr, daddr, inc, gfp, NULL); /* and also reroute the request */ rds_recv_route(conn, inc, gfp); @@ -384,7 +389,7 @@ rds_recv_route(struct rds_connection *conn, struct rds_incoming *inc, /* this is a request for our local node, but potentially a different source * either way we process it locally */ else if (conn->c_trans->skb_local(inc->i_skb)) { - rds_recv_local(nconn, dst->saddr, dst->daddr, inc, gfp); + rds_recv_local(nconn, dst->saddr, dst->daddr, inc, gfp, NULL); } /* looks like this request is going out to another node */ else { @@ -449,17 +454,17 @@ out: NF_HOOK(PF_RDS_HOOK, NF_RDS_FORWARD_ERROR, sk, inc->i_skb, NULL, NULL, 
rds_recv_ok); /* then hand the request off to normal local processing on the old connection */ - rds_recv_local(inc->i_oconn, org->saddr, org->daddr, inc, gfp); + rds_recv_local(inc->i_oconn, org->saddr, org->daddr, inc, gfp, NULL); } static void rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr, - struct rds_incoming *inc, gfp_t gfp) + struct rds_incoming *inc, gfp_t gfp, struct rds_sock *rs) { - struct rds_sock *rs = NULL; struct sock *sk; unsigned long flags; u64 inc_hdr_h_sequence = 0; + bool rs_local = (!rs); inc->i_conn = conn; inc->i_rx_jiffies = jiffies; @@ -524,7 +529,8 @@ rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr, goto out; } - rs = rds_find_bound(daddr, inc->i_hdr.h_dport); + if (!rs) + rs = rds_find_bound(daddr, inc->i_hdr.h_dport); if (!rs) { rds_stats_inc(s_recv_drop_no_sock); goto out; @@ -568,7 +574,7 @@ rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr, write_unlock_irqrestore(&rs->rs_recv_lock, flags); out: - if (rs) + if (rs_local && rs) rds_sock_put(rs); }