From 041dc3e4d34087d5abe6e8aecfe52531e29c0c51 Mon Sep 17 00:00:00 2001
From: Sowmini Varadhan
Date: Thu, 27 Apr 2017 17:50:42 -0700
Subject: [PATCH] Backport multipath RDS from upstream to UEK4

This commit backports upstream Multipath RDS to UEK4.

Orabug: 26241322

Signed-off-by: Sowmini Varadhan
Acked-by: Santosh Shilimkar
---
 net/rds/af_rds.c          |  39 ++-
 net/rds/cong.c            |   3 +-
 net/rds/connection.c      | 452 ++++++++++++++++++++++-------------
 net/rds/ib.c              |   9 +-
 net/rds/ib.h              |   8 +-
 net/rds/ib_cm.c           |  18 +-
 net/rds/ib_rdma.c         |   1 +
 net/rds/ib_recv.c         |   7 +-
 net/rds/ib_send.c         |   7 +-
 net/rds/loop.c            |  15 +-
 net/rds/message.c         |   2 +
 net/rds/rdma_transport.c  |   8 +-
 net/rds/rds.h             | 240 ++++++++++++-------
 net/rds/rds_single_path.h |  42 ++++
 net/rds/recv.c            | 203 ++++++++++++++--
 net/rds/send.c            | 488 +++++++++++++++++++++-----------------
 net/rds/tcp.c             | 190 +++++++++------
 net/rds/tcp.h             |  24 +-
 net/rds/tcp_connect.c     |  80 ++++---
 net/rds/tcp_listen.c      |  72 ++++--
 net/rds/tcp_recv.c        |  38 +--
 net/rds/tcp_send.c        |  43 ++--
 net/rds/threads.c         | 208 +++++++++-------
 23 files changed, 1413 insertions(+), 784 deletions(-)
 create mode 100644 net/rds/rds_single_path.h

diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index 894b0b23831e..eaed977aa9bf 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -37,6 +37,7 @@
 #include
 #include
 #include
+#include
 #include
 #include "rds.h"
@@ -323,6 +324,24 @@ static int rds_cong_monitor(struct rds_sock *rs, char __user *optval,
 	return ret;
 }

+static void rds_user_conn_paths_drop(struct rds_connection *conn, int reason)
+{
+	int i;
+	struct rds_conn_path *cp;
+
+	if (!conn->c_trans->t_mp_capable || conn->c_npaths == 1) {
+		cp = &conn->c_path[0];
+		cp->cp_drop_source = reason;
+		rds_conn_path_drop(cp, DR_USER_RESET);
+	} else {
+		for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+			cp = &conn->c_path[i];
+			cp->cp_drop_source = reason;
+			rds_conn_path_drop(cp, DR_USER_RESET);
+		}
+	}
+}
+
 static int rds_user_reset(struct rds_sock *rs, char __user *optval, int optlen)
 {
 	struct rds_reset reset;
@@ -341,15 +360,14 @@ static int rds_user_reset(struct rds_sock *rs, char __user *optval, int optlen)
 		pr_info("RDS: Reset ALL conns for Source %pI4\n",
 			&reset.src.s_addr);

-		rds_conn_laddr_list(reset.src.s_addr, &s_addr_conns);
+		rds_conn_laddr_list(sock_net(rds_rs_to_sk(rs)),
+				    reset.src.s_addr, &s_addr_conns);
 		if (list_empty(&s_addr_conns))
 			goto done;

 		list_for_each_entry(conn, &s_addr_conns, c_laddr_node)
-			if (conn) {
-				conn->c_drop_source = 1;
-				rds_conn_drop(conn, DR_USER_RESET);
-			}
+			if (conn)
+				rds_user_conn_paths_drop(conn, 1);
 		goto done;
 	}

@@ -358,11 +376,14 @@ static int rds_user_reset(struct rds_sock *rs, char __user *optval, int optlen)
 			      rs->rs_transport, reset.tos);

 	if (conn) {
-		printk(KERN_NOTICE "Resetting RDS/IB connection "
+		bool is_tcp = conn->c_trans->t_type == RDS_TRANS_TCP;
+
+		printk(KERN_NOTICE "Resetting RDS/%s connection "
 		       "<%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
+		       is_tcp ? "tcp" : "IB",
 		       NIPQUAD(reset.src.s_addr),
 		       NIPQUAD(reset.dst.s_addr), conn->c_tos);
-		rds_conn_drop(conn, DR_USER_RESET);
+		rds_user_conn_paths_drop(conn, DR_USER_RESET);
 	}
 done:
 	return 0;
@@ -898,10 +919,14 @@ static void rds_exit(void)
 }
 module_exit(rds_exit);

+u32 rds_gen_num;
+
 static int rds_init(void)
 {
 	int ret;

+	net_get_random_once(&rds_gen_num, sizeof(rds_gen_num));
+
 	rds_bind_lock_init();

 	ret = rds_conn_init();

diff --git a/net/rds/cong.c b/net/rds/cong.c
index 7ba13755c832..653707e56de9 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -242,7 +242,8 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
 	list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
 		if (!test_and_set_bit(0, &conn->c_map_queued)) {
 			rds_stats_inc(s_cong_update_queued);
-			queue_delayed_work(conn->c_wq, &conn->c_send_w, 0);
+			queue_delayed_work(conn->c_path[0].cp_wq,
+					   &conn->c_path[0].cp_send_w, 0);
 		}
 	}
diff --git a/net/rds/connection.c b/net/rds/connection.c
index e021746e46d0..99556dfdd465 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -91,7 +91,8 @@ static struct rds_connection *rds_conn_lookup(struct net *net,
 	return ret;
 }

-void rds_conn_laddr_list(__be32 laddr, struct list_head *laddr_conns)
+void rds_conn_laddr_list(struct net *net,
+			 __be32 laddr, struct list_head *laddr_conns)
 {
 	struct rds_connection *conn;
 	struct hlist_head *head;
@@ -102,7 +103,8 @@ void rds_conn_laddr_list(__be32 laddr, struct list_head *laddr_conns)
 	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 	     i++, head++) {
 		hlist_for_each_entry_rcu(conn, head, c_hash_node)
-			if (conn->c_laddr == laddr)
+			if (conn->c_laddr == laddr &&
+			    net == rds_conn_net(conn))
 				list_add(&conn->c_laddr_node, laddr_conns);
 	}

@@ -115,14 +117,16 @@
  * and receiving over this connection again in the future. It is up to
  * the transport to have serialized this call with its send and recv.
  */
-void rds_conn_reset(struct rds_connection *conn)
+void rds_conn_path_reset(struct rds_conn_path *cp)
 {
+	struct rds_connection *conn = cp->cp_conn;
+
 	rdsdebug("connection %pI4 to %pI4 reset\n",
 		 &conn->c_laddr, &conn->c_faddr);

 	rds_stats_inc(s_conn_reset);
-	rds_send_reset(conn);
-	conn->c_flags = 0;
+	rds_send_path_reset(cp);
+	cp->cp_flags = 0;

 	/* Do not clear next_rx_seq here, else we cannot distinguish
 	 * retransmitted packets from new packets, and will hand all
@@ -130,6 +134,41 @@
 	 * reliability guarantees of RDS. */
 }

+static void __rds_conn_path_init(struct rds_connection *conn,
+				 struct rds_conn_path *cp, bool is_outgoing)
+{
+	spin_lock_init(&cp->cp_lock);
+	cp->cp_next_tx_seq = 1;
+	init_waitqueue_head(&cp->cp_waitq);
+	INIT_LIST_HEAD(&cp->cp_send_queue);
+	INIT_LIST_HEAD(&cp->cp_retrans);
+
+	cp->cp_conn = conn;
+	atomic_set(&cp->cp_state, RDS_CONN_DOWN);
+	cp->cp_send_gen = 0;
+	/* cp_outgoing is per-path. So we can only set it here
+	 * for the single-path transports.
+	 */
+	if (!conn->c_trans->t_mp_capable)
+		cp->cp_outgoing = (is_outgoing ? 1 : 0);
+	cp->cp_reconnect_jiffies = 0;
+	cp->cp_reconnect_start = get_seconds();
+	cp->cp_reconnect_warn = 1;
+	cp->cp_reconnect_drops = 0;
+	cp->cp_reconnect_err = 0;
+	cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
+	cp->cp_route_resolved = 1;
+	INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker);
+	INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker);
+	INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker);
+	INIT_DELAYED_WORK(&cp->cp_hb_w, rds_hb_worker);
+	INIT_DELAYED_WORK(&cp->cp_reconn_w, rds_reconnect_timeout);
+	INIT_DELAYED_WORK(&cp->cp_reject_w, rds_reject_worker);
+	INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
+	mutex_init(&cp->cp_cm_lock);
+	cp->cp_flags = 0;
+}
+
 /*
  * There is only every one 'conn' for a given pair of addresses in the
  * system at a time. They contain messages to be retransmitted and so
@@ -148,7 +187,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	struct hlist_head *head = rds_conn_bucket(laddr, faddr);
 	struct rds_transport *loop_trans;
 	unsigned long flags;
-	int ret;
+	int ret, i;

 	rcu_read_lock();
 	conn = rds_conn_lookup(net, head, laddr, faddr, trans, tos);
@@ -179,14 +218,8 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	INIT_HLIST_NODE(&conn->c_hash_node);
 	conn->c_laddr = laddr;
 	conn->c_faddr = faddr;
-	spin_lock_init(&conn->c_lock);
-	conn->c_next_tx_seq = 1;
 	rds_conn_net_set(conn, net);

-	init_waitqueue_head(&conn->c_waitq);
-	INIT_LIST_HEAD(&conn->c_send_queue);
-	INIT_LIST_HEAD(&conn->c_retrans);
-
 	conn->c_tos = tos;

 	ret = rds_cong_get_maps(conn);
@@ -215,14 +248,22 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	}

 	conn->c_trans = trans;

-	conn->c_reconnect_retry = rds_sysctl_reconnect_retry_ms;
-	conn->c_reconnect_retry_count = 0;
-	if (conn->c_loopback)
-		conn->c_wq = rds_local_wq;
-	else
-		conn->c_wq = rds_wq;
+	init_waitqueue_head(&conn->c_hs_waitq);
+	for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+		struct rds_conn_path *cp;
+
+		cp = &conn->c_path[i];
+		__rds_conn_path_init(conn, cp, is_outgoing);
+		cp->cp_index = i;
+		cp->cp_reconnect_retry = rds_sysctl_reconnect_retry_ms;
+		cp->cp_reconnect_retry_count = 0;
+		if (conn->c_loopback)
+			cp->cp_wq = rds_local_wq;
+		else
+			cp->cp_wq = rds_wq;
+	}

 	ret = trans->conn_alloc(conn, gfp);
 	if (ret) {
 		kmem_cache_free(rds_conn_slab, conn);
@@ -230,27 +271,6 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 		goto out;
 	}

-	atomic_set(&conn->c_state, RDS_CONN_DOWN);
-	conn->c_send_gen = 0;
-	conn->c_outgoing = (is_outgoing ? 1 : 0);
-	conn->c_reconnect_jiffies = 0;
-	conn->c_reconnect_start = get_seconds();
-	conn->c_reconnect_warn = 1;
-	conn->c_reconnect_drops = 0;
-	conn->c_reconnect_err = 0;
-	conn->c_proposed_version = RDS_PROTOCOL_VERSION;
-	conn->c_route_resolved = 1;
-
-	INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
-	INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
-	INIT_DELAYED_WORK(&conn->c_conn_w, rds_connect_worker);
-	INIT_DELAYED_WORK(&conn->c_hb_w, rds_hb_worker);
-	INIT_DELAYED_WORK(&conn->c_reconn_w, rds_reconnect_timeout);
-	INIT_DELAYED_WORK(&conn->c_reject_w, rds_reject_worker);
-	INIT_WORK(&conn->c_down_w, rds_shutdown_worker);
-	mutex_init(&conn->c_cm_lock);
-	conn->c_flags = 0;
-
 	rdsdebug("allocated conn %p for %pI4 -> %pI4 over %s %s\n",
 		 conn, &laddr, &faddr,
 		 trans->t_name ? trans->t_name : "[unknown]",
@@ -267,7 +287,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	if (parent) {
 		/* Creating passive conn */
 		if (parent->c_passive) {
-			trans->conn_free(conn->c_transport_data);
+			trans->conn_free(conn->c_path[0].cp_transport_data);
 			kmem_cache_free(rds_conn_slab, conn);
 			conn = parent->c_passive;
 		} else {
@@ -281,10 +301,23 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 		found = rds_conn_lookup(net, head, laddr, faddr, trans, tos);
 		if (found) {
-			trans->conn_free(conn->c_transport_data);
+			struct rds_conn_path *cp;
+			int i;
+
+			for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+				cp = &conn->c_path[i];
+				/* The ->conn_alloc invocation may have
+				 * allocated resource for all paths, so all
+				 * of them may have to be freed here.
+				 */
+				if (cp->cp_transport_data)
+					trans->conn_free(cp->cp_transport_data);
+			}
 			kmem_cache_free(rds_conn_slab, conn);
 			conn = found;
 		} else {
+			conn->c_my_gen_num = rds_gen_num;
+			conn->c_peer_gen_num = 0;
 			hlist_add_head_rcu(&conn->c_hash_node, head);
 			rds_cong_add_conn(conn);
 			rds_conn_count++;
@@ -329,12 +362,15 @@ struct rds_connection *rds_conn_find(struct net *net, __be32 laddr,
 }
 EXPORT_SYMBOL_GPL(rds_conn_find);

-void rds_conn_shutdown(struct rds_connection *conn, int restart)
+void rds_conn_shutdown(struct rds_conn_path *cp, int restart)
 {
+	struct rds_connection *conn = cp->cp_conn;
+
 	/* shut it down unless it's down already */
-	if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+	if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
 		rds_rtd(RDS_RTD_CM_EXT,
-			"RDS/IB: shutdown init <%u.%u.%u.%u,%u.%u.%u.%u,%d>, cn %p, cn->c_p %p\n",
+			"RDS/%s: shutdown init <%u.%u.%u.%u,%u.%u.%u.%u,%d>, cn %p, cn->c_p %p\n",
+			conn->c_trans->t_type == RDS_TRANS_TCP ? "TCP" : "IB",
 			NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr),
 			conn->c_tos, conn, conn->c_passive);
 		/*
@@ -344,34 +380,35 @@ void rds_conn_shutdown(struct rds_connection *conn, int restart)
 		 * deadlocking with the CM handler. Instead, the CM event
 		 * handler is supposed to check for state DISCONNECTING
 		 */
-		mutex_lock(&conn->c_cm_lock);
-		if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
-		 && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
-			pr_warn("RDS: shutdown called in state %d\n",
-				atomic_read(&conn->c_state));
-			rds_conn_drop(conn, DR_INV_CONN_STATE);
-			mutex_unlock(&conn->c_cm_lock);
+		mutex_lock(&cp->cp_cm_lock);
+		if (!rds_conn_path_transition(cp, RDS_CONN_UP,
+					      RDS_CONN_DISCONNECTING) &&
+		    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
+					      RDS_CONN_DISCONNECTING)) {
+			rds_conn_path_drop(cp, DR_INV_CONN_STATE);
+			mutex_unlock(&cp->cp_cm_lock);
 			return;
 		}
-		mutex_unlock(&conn->c_cm_lock);
+		mutex_unlock(&cp->cp_cm_lock);

-		wait_event(conn->c_waitq,
-			   !test_bit(RDS_IN_XMIT, &conn->c_flags));
-		wait_event(conn->c_waitq,
-			   !test_bit(RDS_RECV_REFILL, &conn->c_flags));
+		wait_event(cp->cp_waitq,
+			   !test_bit(RDS_IN_XMIT, &cp->cp_flags));
+		wait_event(cp->cp_waitq,
+			   !test_bit(RDS_RECV_REFILL, &cp->cp_flags));

-		conn->c_trans->conn_shutdown(conn);
-		rds_conn_reset(conn);
+		conn->c_trans->conn_path_shutdown(cp);
+		rds_conn_path_reset(cp);

-		if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+		if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING,
+					      RDS_CONN_DOWN)) {
 			/* This can happen - eg when we're in the middle of tearing
 			 * down the connection, and someone unloads the rds module.
 			 * Quite reproduceable with loopback connections.
 			 * Mostly harmless.
 			 */
 			pr_warn("RDS: %s: failed to transition to state DOWN, current state is %d\n",
-				__func__, atomic_read(&conn->c_state));
-			rds_conn_drop(conn, DR_DOWN_TRANSITION_FAIL);
+				__func__, atomic_read(&cp->cp_state));
+			rds_conn_path_drop(cp, DR_DOWN_TRANSITION_FAIL);
 			return;
 		}
 	}
@@ -380,68 +417,47 @@ void rds_conn_shutdown(struct rds_connection *conn, int restart)
	 * The passive side of an IB loopback connection is never added
	 * to the conn hash, so we never trigger a reconnect on this
	 * conn - the reconnect is always triggered by the active peer.
	 */
-	cancel_delayed_work_sync(&conn->c_conn_w);
+	cancel_delayed_work_sync(&cp->cp_conn_w);
 	rcu_read_lock();
 	if (!hlist_unhashed(&conn->c_hash_node) && restart) {
 		rcu_read_unlock();
-		if (conn->c_trans->t_type != RDS_TRANS_TCP ||
-		    conn->c_outgoing == 1) {
-			rds_rtd(RDS_RTD_CM_EXT,
-				"queueing reconnect request... "
-				"<%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
-				NIPQUAD(conn->c_laddr),
-				NIPQUAD(conn->c_faddr),
-				conn->c_tos);
-			rds_queue_reconnect(conn);
-		}
+		rds_queue_reconnect(cp);
 	} else {
 		rcu_read_unlock();
 	}
 }

-/*
- * Stop and free a connection.
- *
- * This can only be used in very limited circumstances. It assumes that once
- * the conn has been shutdown that no one else is referencing the connection.
- * We can only ensure this in the rmmod path in the current code.
+/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
+ * all paths using rds_conn_path_destroy()
  */
-void rds_conn_destroy(struct rds_connection *conn, int shutdown)
+static void rds_conn_path_destroy(struct rds_conn_path *cp, int shutdown)
 {
 	struct rds_message *rm, *rtmp;
-	unsigned long flags;
 	LIST_HEAD(to_be_dropped);

-	rds_rtd(RDS_RTD_CM, "freeing conn %p <%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
-		conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr),
-		conn->c_tos);
-
-	set_bit(RDS_DESTROY_PENDING, &conn->c_flags);
+	cp->cp_drop_source = DR_CONN_DESTROY;
+	set_bit(RDS_DESTROY_PENDING, &cp->cp_flags);

-	/* Ensure conn will not be scheduled for reconnect */
-	spin_lock_irq(&rds_conn_lock);
-	hlist_del_init_rcu(&conn->c_hash_node);
-	spin_unlock_irq(&rds_conn_lock);
-	synchronize_rcu();
+	if (!cp->cp_transport_data)
+		return;

-	/* shut the connection down */
-	rds_conn_drop(conn, DR_CONN_DESTROY);
-	flush_work(&conn->c_down_w);
+	rds_conn_path_drop(cp, DR_CONN_DESTROY);
+	flush_work(&cp->cp_down_w);

	/* now that conn down worker is flushed; there cannot be any
	 * more posting of reconn timeout work. But cancel any already
	 * posted reconn timeout worker as there is a race between rds
	 * module unload and a pending reconn delay work.
	 */
-	cancel_delayed_work_sync(&conn->c_reconn_w);
+	cancel_delayed_work_sync(&cp->cp_reconn_w);

 	/* make sure lingering queued work won't try to ref the conn */
-	cancel_delayed_work_sync(&conn->c_send_w);
-	cancel_delayed_work_sync(&conn->c_recv_w);
+	cancel_delayed_work_sync(&cp->cp_send_w);
+	cancel_delayed_work_sync(&cp->cp_recv_w);

 	/* tear down queued messages */
 	list_for_each_entry_safe(rm, rtmp,
-				 &conn->c_send_queue,
+				 &cp->cp_send_queue,
 				 m_conn_item) {
 		if (shutdown) {
 			list_del_init(&rm->m_conn_item);
@@ -451,14 +467,44 @@ void rds_conn_destroy(struct rds_connection *conn, int shutdown)
 			list_move_tail(&rm->m_conn_item, &to_be_dropped);
 		}
 	}
-	if (!list_empty(&to_be_dropped))
-		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_SEND_DROPPED);
+	rds_send_remove_from_sock(&to_be_dropped,
+				  RDS_RDMA_SEND_DROPPED);
+
+	if (cp->cp_xmit_rm)
+		rds_message_put(cp->cp_xmit_rm);
+
+	cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
+}
+
+/* Stop and free a connection.
+ *
+ * This can only be used in very limited circumstances. It assumes that once
+ * the conn has been shutdown that no one else is referencing the connection.
+ * We can only ensure this in the rmmod path in the current code.
+ */
+void rds_conn_destroy(struct rds_connection *conn, int shutdown)
+{
+	unsigned long flags;
+	int i;
+
+	rds_rtd(RDS_RTD_CM, "freeing conn %p <%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
+		conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr),
+		conn->c_tos);
+
+	/* Ensure conn will not be scheduled for reconnect */
+	spin_lock_irq(&rds_conn_lock);
+	hlist_del_init_rcu(&conn->c_hash_node);
+	spin_unlock_irq(&rds_conn_lock);
+	synchronize_rcu();

-	if (conn->c_xmit_rm)
-		rds_message_put(conn->c_xmit_rm);
+	/* shut the connection down */
+	for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+		struct rds_conn_path *cp;

-	conn->c_trans->conn_free(conn->c_transport_data);
+		cp = &conn->c_path[i];
+		rds_conn_path_destroy(cp, shutdown);
+		BUG_ON(!list_empty(&cp->cp_retrans));
+	}

	/*
	 * The congestion maps aren't freed up here. They're
@@ -467,7 +513,6 @@ void rds_conn_destroy(struct rds_connection *conn, int shutdown)
	 */
 	rds_cong_remove_conn(conn);

-	BUG_ON(!list_empty(&conn->c_retrans));
 	kmem_cache_free(rds_conn_slab, conn);

 	spin_lock_irqsave(&rds_conn_lock, flags);
@@ -488,6 +533,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
 	unsigned int total = 0;
 	unsigned long flags;
 	size_t i;
+	int j;

 	len /= sizeof(struct rds_info_message);

@@ -496,25 +542,34 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
 	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 	     i++, head++) {
 		hlist_for_each_entry_rcu(conn, head, c_hash_node) {
-			if (want_send)
-				list = &conn->c_send_queue;
-			else
-				list = &conn->c_retrans;
-
-			spin_lock_irqsave(&conn->c_lock, flags);
-			conn->c_rdsinfo_pending = 1;
-
-			/* XXX too lazy to maintain counts.. */
-			list_for_each_entry(rm, list, m_conn_item) {
-				total++;
-				if (total <= len)
-					rds_inc_info_copy(&rm->m_inc, iter,
-							  conn->c_laddr,
-							  conn->c_faddr, 0);
+			struct rds_conn_path *cp;
+
+			for (j = 0; j < RDS_MPATH_WORKERS; j++) {
+				cp = &conn->c_path[j];
+				if (want_send)
+					list = &cp->cp_send_queue;
+				else
+					list = &cp->cp_retrans;
+
+				cp->cp_rdsinfo_pending = 1;
+				spin_lock_irqsave(&cp->cp_lock, flags);
+
+				/* XXX too lazy to maintain counts.. */
+				list_for_each_entry(rm, list, m_conn_item) {
+					total++;
+					if (total <= len)
+						rds_inc_info_copy(&rm->m_inc,
+								  iter,
+								  conn->c_laddr,
+								  conn->c_faddr,
+								  0);
+				}
+
+				cp->cp_rdsinfo_pending = 0;
+				spin_unlock_irqrestore(&cp->cp_lock, flags);
+				if (!conn->c_trans->t_mp_capable)
+					break;
 			}
-
-			conn->c_rdsinfo_pending = 0;
-			spin_unlock_irqrestore(&conn->c_lock, flags);
 		}
 	}
 	rcu_read_unlock();
@@ -576,30 +631,75 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 }
 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);

-static int rds_conn_info_visitor(struct rds_connection *conn,
-				 void *buffer)
+void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
+			     struct rds_info_iterator *iter,
+			     struct rds_info_lengths *lens,
+			     int (*visitor)(struct rds_conn_path *, void *),
+			     size_t item_len)
+{
+	u64 buffer[(item_len + 7) / 8];
+	struct hlist_head *head;
+	struct rds_connection *conn;
+	size_t i;
+	int j;
+
+	rcu_read_lock();
+
+	lens->nr = 0;
+	lens->each = item_len;
+
+	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
+	     i++, head++) {
+		hlist_for_each_entry_rcu(conn, head, c_hash_node) {
+			struct rds_conn_path *cp;
+
+			for (j = 0; j < RDS_MPATH_WORKERS; j++) {
+				cp = &conn->c_path[j];
+
+				/* XXX no cp_lock usage.. */
+				if (!visitor(cp, buffer))
+					continue;
+				if (!conn->c_trans->t_mp_capable)
+					break;
+			}
+
+			/* We copy as much as we can fit in the buffer,
+			 * but we count all items so that the caller
+			 * can resize the buffer.
+			 */
+			if (len >= item_len) {
+				rds_info_copy(iter, buffer, item_len);
+				len -= item_len;
+			}
+			lens->nr++;
+		}
+	}
+	rcu_read_unlock();
+}
+
+static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
 {
 	struct rds_info_connection *cinfo = buffer;

-	cinfo->next_tx_seq = conn->c_next_tx_seq;
-	cinfo->next_rx_seq = conn->c_next_rx_seq;
-	cinfo->laddr = conn->c_laddr;
-	cinfo->faddr = conn->c_faddr;
-	cinfo->tos = conn->c_tos;
-	strncpy(cinfo->transport, conn->c_trans->t_name,
+	cinfo->next_tx_seq = cp->cp_next_tx_seq;
+	cinfo->next_rx_seq = cp->cp_next_rx_seq;
+	cinfo->laddr = cp->cp_conn->c_laddr;
+	cinfo->faddr = cp->cp_conn->c_faddr;
+	cinfo->tos = cp->cp_conn->c_tos;
+	strncpy(cinfo->transport, cp->cp_conn->c_trans->t_name,
 		sizeof(cinfo->transport));
 	cinfo->flags = 0;

-	rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+	rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
 			  SENDING);
 	/* XXX Future: return the state rather than these funky bits */
 	rds_conn_info_set(cinfo->flags,
-			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
+			  atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
 			  CONNECTING);
 	rds_conn_info_set(cinfo->flags,
-			  atomic_read(&conn->c_state) == RDS_CONN_UP,
+			  atomic_read(&cp->cp_state) == RDS_CONN_UP,
 			  CONNECTED);
-	rds_conn_info_set(cinfo->flags, conn->c_pending_flush,
+	rds_conn_info_set(cinfo->flags, cp->cp_pending_flush,
 			  ERROR);
 	return 1;
 }
@@ -608,7 +708,7 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
 			  struct rds_info_iterator *iter,
 			  struct rds_info_lengths *lens)
 {
-	rds_for_each_conn_info(sock, len, iter, lens,
+	rds_walk_conn_path_info(sock, len, iter, lens,
 			       rds_conn_info_visitor,
 			       sizeof(struct rds_info_connection));
 }
@@ -718,6 +818,7 @@ static void rds_conn_probe_lanes(struct rds_connection *conn)
 		rds_conn_bucket(conn->c_laddr, conn->c_faddr);
 	struct rds_connection *tmp;

+	/* XXX only do this for IB transport? */
 	rcu_read_lock();
 	hlist_for_each_entry_rcu(tmp, head, c_hash_node) {
 		if (tmp->c_faddr == conn->c_faddr &&
@@ -726,10 +827,11 @@ static void rds_conn_probe_lanes(struct rds_connection *conn)
 		    tmp->c_trans == conn->c_trans) {
 			if (rds_conn_up(tmp))
 				rds_send_hb(tmp, 0);
-			else if (rds_conn_connecting(tmp) && (tmp->c_route_resolved == 0)) {
+			else if (rds_conn_connecting(tmp) &&
+				 (tmp->c_path[0].cp_route_resolved == 0)) {
 				printk(KERN_INFO "RDS/IB: connection "
 				       "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
 				       "connecting, force reset\n",
 				       NIPQUAD(tmp->c_laddr),
 				       NIPQUAD(tmp->c_faddr),
 				       tmp->c_tos);
@@ -744,51 +846,64 @@
 /*
  * Force a disconnect
  */
-void rds_conn_drop(struct rds_connection *conn, int reason)
+void rds_conn_path_drop(struct rds_conn_path *cp, int reason)
 {
 	unsigned long now = get_seconds();
-
-	conn->c_drop_source = reason;
-	if (rds_conn_state(conn) == RDS_CONN_UP) {
-		conn->c_reconnect_start = now;
-		conn->c_reconnect_warn = 1;
-		conn->c_reconnect_drops = 0;
-		conn->c_reconnect_err = 0;
-		printk(KERN_INFO "RDS/IB: connection "
-		       "<%u.%u.%u.%u,%u.%u.%u.%u,%d> dropped due to '%s'\n",
-		       NIPQUAD(conn->c_laddr),
-		       NIPQUAD(conn->c_faddr),
-		       conn->c_tos,
-		       conn_drop_reason_str(reason));
+	struct rds_connection *conn = cp->cp_conn;
+
+	cp->cp_drop_source = reason;
+	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		cp->cp_reconnect_start = now;
+		cp->cp_reconnect_warn = 1;
+		cp->cp_reconnect_drops = 0;
+		cp->cp_reconnect_err = 0;
+		cp->cp_reconnect_racing = 0;
+		if (conn->c_trans->t_type != RDS_TRANS_TCP)
+			printk(KERN_INFO "RDS/IB: connection "
+			       "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
+			       "dropped due to '%s'\n",
+			       NIPQUAD(conn->c_laddr),
+			       NIPQUAD(conn->c_faddr),
+			       conn->c_tos,
+			       conn_drop_reason_str(cp->cp_drop_source));

 		if (conn->c_tos == 0)
 			rds_conn_probe_lanes(conn);
-	} else if ((conn->c_reconnect_warn) &&
-		   (now - conn->c_reconnect_start > 60)) {
-		printk(KERN_INFO "RDS/IB: re-connect "
+	} else if ((cp->cp_reconnect_warn) &&
+		   (now - cp->cp_reconnect_start > 60)) {
+		printk(KERN_INFO "RDS/%s: re-connect "
 		       "<%u.%u.%u.%u,%u.%u.%u.%u,%d> stalling "
 		       "for more than 1 min...(drops=%u err=%d)\n",
+		       conn->c_trans->t_type == RDS_TRANS_TCP ? "TCP" : "IB",
 		       NIPQUAD(conn->c_laddr),
 		       NIPQUAD(conn->c_faddr),
 		       conn->c_tos,
-		       conn->c_reconnect_drops,
-		       conn->c_reconnect_err);
-		conn->c_reconnect_warn = 0;
+		       cp->cp_reconnect_drops,
+		       cp->cp_reconnect_err);
+		cp->cp_reconnect_warn = 0;

 		if (conn->c_tos == 0)
 			rds_conn_probe_lanes(conn);
 	}
-	conn->c_reconnect_drops++;
+	cp->cp_reconnect_drops++;

-	atomic_set(&conn->c_state, RDS_CONN_ERROR);
+	atomic_set(&cp->cp_state, RDS_CONN_ERROR);

 	rds_rtd(RDS_RTD_CM_EXT,
-		"RDS/IB: queueing shutdown work, conn %p, <%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
+		"RDS/%s: queueing shutdown work, conn %p, <%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
+		conn->c_trans->t_type == RDS_TRANS_TCP ? "TCP" : "IB",
 		conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr),
 		conn->c_tos);
-	queue_work(conn->c_wq, &conn->c_down_w);
+	queue_work(cp->cp_wq, &cp->cp_down_w);
+}
+EXPORT_SYMBOL_GPL(rds_conn_path_drop);
+
+void rds_conn_drop(struct rds_connection *conn, int reason)
+{
+	WARN_ON(conn->c_trans->t_mp_capable);
+	rds_conn_path_drop(&conn->c_path[0], reason);
 }
 EXPORT_SYMBOL_GPL(rds_conn_drop);

@@ -796,15 +911,24 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
 * If the connection is down, trigger a connect. We may have scheduled a
 * delayed reconnect however - in this case we should not interfere.
 */
-void rds_conn_connect_if_down(struct rds_connection *conn)
+void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 {
-	if (rds_conn_state(conn) == RDS_CONN_DOWN &&
-	    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) {
+	struct rds_connection *conn = cp->cp_conn;
+
+	if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
+	    !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) {
 		rds_rtd(RDS_RTD_CM_EXT,
 			"queueing connect work, conn %p, <%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
 			conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr),
 			conn->c_tos);
-		queue_delayed_work(conn->c_wq, &conn->c_conn_w, 0);
+		queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0);
 	}
 }
+EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
+
+void rds_conn_connect_if_down(struct rds_connection *conn)
+{
+	WARN_ON(conn->c_trans->t_mp_capable);
+	rds_conn_path_connect_if_down(&conn->c_path[0]);
+}
 EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);

diff --git a/net/rds/ib.c b/net/rds/ib.c
index 7e7d2cd02bd0..d34ca3383a39 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -48,6 +48,7 @@
 #include "rds.h"
 #include "ib.h"
 #include "tcp.h"
+#include "rds_single_path.h"
 #include

 unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE;
@@ -2762,15 +2763,15 @@ void rds_ib_exit(void)

 struct rds_transport rds_ib_transport = {
 	.laddr_check		= rds_ib_laddr_check,
-	.xmit_complete		= rds_ib_xmit_complete,
+	.xmit_path_complete	= rds_ib_xmit_path_complete,
 	.xmit			= rds_ib_xmit,
 	.xmit_rdma		= rds_ib_xmit_rdma,
 	.xmit_atomic		= rds_ib_xmit_atomic,
-	.recv			= rds_ib_recv,
+	.recv_path		= rds_ib_recv_path,
 	.conn_alloc		= rds_ib_conn_alloc,
 	.conn_free		= rds_ib_conn_free,
-	.conn_connect		= rds_ib_conn_connect,
-	.conn_shutdown		= rds_ib_conn_shutdown,
+	.conn_path_connect	= rds_ib_conn_path_connect,
+	.conn_path_shutdown	= rds_ib_conn_path_shutdown,
 	.inc_copy_to_user	= rds_ib_inc_copy_to_user,
 	.inc_free		= rds_ib_inc_free,
 	.inc_to_skb		= rds_ib_inc_to_skb,

diff --git a/net/rds/ib.h b/net/rds/ib.h
index 48c7d6a7fca9..fd705edac6a7 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -561,8 +561,8 @@ extern struct delayed_work riif_dlywork;
 /* ib_cm.c */
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
 void rds_ib_conn_free(void *arg);
-int rds_ib_conn_connect(struct rds_connection *conn);
-void rds_ib_conn_shutdown(struct rds_connection *conn);
+int rds_ib_conn_path_connect(struct rds_conn_path *cp);
+void rds_ib_conn_path_shutdown(struct rds_conn_path *cp);
 void rds_ib_state_change(struct sock *sk);
 int rds_ib_listen_init(void);
 void rds_ib_listen_stop(void);
@@ -593,7 +593,7 @@ void rds_ib_fmr_exit(void);
 /* ib_recv.c */
 int rds_ib_recv_init(void);
 void rds_ib_recv_exit(void);
-int rds_ib_recv(struct rds_connection *conn);
+int rds_ib_recv_path(struct rds_conn_path *cp);
 int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_rebuild_caches(struct rds_ib_connection *ic);
@@ -629,7 +629,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait;
 /* ib_send.c */
 char *rds_ib_wc_status_str(enum ib_wc_status status);
-void rds_ib_xmit_complete(struct rds_connection *conn);
+void rds_ib_xmit_path_complete(struct rds_conn_path *cp);
 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		unsigned int hdr_off, unsigned int sg, unsigned int off);
 void rds_ib_send_cqe_handler(struct rds_ib_connection *ic,
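The transport ops renamed above (conn_path_connect, conn_path_shutdown, recv_path, xmit_path_complete) now take a struct rds_conn_path instead of a struct rds_connection. A hedged sketch of how a multipath-capable transport would advertise and wire them up (the names are illustrative; in this backport only rds_tcp actually sets t_mp_capable):

	static struct rds_transport example_transport = {
		.t_mp_capable		= 1,	/* core may drive all RDS_MPATH_WORKERS paths */
		.conn_path_connect	= example_conn_path_connect,
		.conn_path_shutdown	= example_conn_path_shutdown,
		.recv_path		= example_recv_path,
		.xmit_path_complete	= example_xmit_path_complete,
	};

Single-path transports such as IB and loop keep the same ops but are only ever handed &conn->c_path[0].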
"ib.h" #include "tcp.h" +#include "rds_single_path.h" static unsigned int rds_ib_max_frag = RDS_MAX_FRAG_SIZE; static unsigned int ib_init_frag_size = RDS_FRAG_SIZE; @@ -492,7 +493,7 @@ void rds_ib_tasklet_fn_send(unsigned long data) if (rds_conn_up(conn) && (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || test_bit(0, &conn->c_map_queued))) - rds_send_xmit(ic->conn); + rds_send_xmit(&ic->conn->c_path[0]); } /* @@ -535,7 +536,7 @@ static void rds_ib_rx(struct rds_ib_connection *ic) if ((atomic_read(&rds_ibdev->srq->s_num_posted) < rds_ib_srq_hwm_refill) && !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate)) - queue_delayed_work(conn->c_wq, + queue_delayed_work(conn->c_path[0].cp_wq, &rds_ibdev->srq->s_refill_w, 0); if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) { @@ -928,7 +929,8 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id, rds_ib_stats_inc(s_ib_listen_closed_stale); } else if (rds_conn_state(conn) == RDS_CONN_CONNECTING) { unsigned long now = get_seconds(); - unsigned long retry = conn->c_reconnect_retry; + unsigned long retry = + conn->c_path[0].cp_reconnect_retry; /* after retry seconds, give up on @@ -943,7 +945,7 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id, &conn->c_laddr, &conn->c_faddr, conn->c_tos, retry); set_bit(RDS_RECONNECT_TIMEDOUT, - &conn->c_reconn_flags); + &conn->c_path[0].cp_reconn_flags); rds_conn_drop(conn, DR_RECONNECT_TIMEOUT); rds_ib_stats_inc(s_ib_listen_closed_stale); } else { @@ -1102,8 +1104,9 @@ out: return ret; } -int rds_ib_conn_connect(struct rds_connection *conn) +int rds_ib_conn_path_connect(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; struct sockaddr_in src, dest; int ret; @@ -1153,8 +1156,9 @@ out: * so that it can be called at any point during startup. In fact it * can be called multiple times for a given connection. */ -void rds_ib_conn_shutdown(struct rds_connection *conn) +void rds_ib_conn_path_shutdown(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; int err = 0; @@ -1331,7 +1335,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) spin_lock_init(&ic->i_rx_lock); /* - * rds_ib_conn_shutdown() waits for these to be emptied so they + * rds_ib_conn_path_shutdown() waits for these to be emptied so they * must be initialized before it can be called. 
*/ rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr); diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index dac3a35620c0..e8b003a7a9c3 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -36,6 +36,7 @@ #include "rds.h" #include "ib.h" #include "xlist.h" +#include "rds_single_path.h" struct workqueue_struct *rds_ib_fmr_wq; diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index bdfbb8781c04..8b6b065433dd 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -38,6 +38,7 @@ #include "rds.h" #include "ib.h" +#include "rds_single_path.h" unsigned int rds_ib_srq_max_wr = RDS_IB_DEFAULT_SRQ_MAX_WR; unsigned int rds_ib_srq_hwm_refill = RDS_IB_DEFAULT_SRQ_HWM_REFILL; @@ -710,7 +711,8 @@ release_out: if (rds_conn_up(conn) && (must_wake || (can_wait && ring_low) || rds_ib_ring_empty(&ic->i_recv_ring))) { - queue_delayed_work(conn->c_wq, &conn->c_recv_w, 1); + queue_delayed_work(conn->c_path[0].cp_wq, + &conn->c_path[0].cp_recv_w, 1); } if (can_wait) cond_resched(); @@ -1498,8 +1500,9 @@ static void rds_ib_srq_clear_ring(struct rds_ib_device *rds_ibdev) } -int rds_ib_recv(struct rds_connection *conn) +int rds_ib_recv_path(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; int ret = 0; diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 998e08a0af5c..15e0801ecf53 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -38,6 +38,7 @@ #include "rds.h" #include "ib.h" #include "tcp.h" +#include "rds_single_path.h" static char *rds_ib_wc_status_strings[] = { #define RDS_IB_WC_STATUS_STR(foo) \ @@ -467,7 +468,8 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits) atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits); if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - queue_delayed_work(conn->c_wq, &conn->c_send_w, 0); + queue_delayed_work(conn->c_path[0].cp_wq, + &conn->c_path[0].cp_send_w, 0); WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384); @@ -1084,8 +1086,9 @@ out: return ret; } -void rds_ib_xmit_complete(struct rds_connection *conn) +void rds_ib_xmit_path_complete(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_ib_connection *ic = conn->c_transport_data; /* We may have a pending ACK or window update we were unable diff --git a/net/rds/loop.c b/net/rds/loop.c index be0ff94071f9..49fbd3f9eb3c 100644 --- a/net/rds/loop.c +++ b/net/rds/loop.c @@ -35,6 +35,7 @@ #include "rds.h" #include "loop.h" +#include "rds_single_path.h" static DEFINE_SPINLOCK(loop_conns_lock); static LIST_HEAD(loop_conns); @@ -94,7 +95,7 @@ static void rds_loop_inc_free(struct rds_incoming *inc) } /* we need to at least give the thread something to succeed */ -static int rds_loop_recv(struct rds_connection *conn) +static int rds_loop_recv_path(struct rds_conn_path *cp) { return 0; } @@ -142,13 +143,13 @@ static void rds_loop_conn_free(void *arg) kfree(lc); } -static int rds_loop_conn_connect(struct rds_connection *conn) +static int rds_loop_conn_path_connect(struct rds_conn_path *cp) { - rds_connect_complete(conn); + rds_connect_complete(cp->cp_conn); return 0; } -static void rds_loop_conn_shutdown(struct rds_connection *conn) +static void rds_loop_conn_path_shutdown(struct rds_conn_path *cp) { } @@ -197,11 +198,11 @@ void rds_loop_exit(void) */ struct rds_transport rds_loop_transport = { .xmit = rds_loop_xmit, - .recv = rds_loop_recv, + .recv_path = rds_loop_recv_path, .conn_alloc = rds_loop_conn_alloc, .conn_free = rds_loop_conn_free, - .conn_connect 
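As rds_ib_xmit_path_complete() above shows, a single-path transport adapts to the path-based signature simply by dereferencing cp_conn. A sketch of that shim pattern (names illustrative, not from the patch):

	static void example_xmit_path_complete(struct rds_conn_path *cp)
	{
		/* for a !t_mp_capable transport, cp is always &conn->c_path[0] */
		struct rds_connection *conn = cp->cp_conn;

		example_xmit_complete(conn);	/* unchanged conn-based helper */
	}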
diff --git a/net/rds/loop.c b/net/rds/loop.c
index be0ff94071f9..49fbd3f9eb3c 100644
--- a/net/rds/loop.c
+++ b/net/rds/loop.c
@@ -35,6 +35,7 @@

 #include "rds.h"
 #include "loop.h"
+#include "rds_single_path.h"

 static DEFINE_SPINLOCK(loop_conns_lock);
 static LIST_HEAD(loop_conns);
@@ -94,7 +95,7 @@ static void rds_loop_inc_free(struct rds_incoming *inc)
 }

 /* we need to at least give the thread something to succeed */
-static int rds_loop_recv(struct rds_connection *conn)
+static int rds_loop_recv_path(struct rds_conn_path *cp)
 {
 	return 0;
 }
@@ -142,13 +143,13 @@ static void rds_loop_conn_free(void *arg)
 	kfree(lc);
 }

-static int rds_loop_conn_connect(struct rds_connection *conn)
+static int rds_loop_conn_path_connect(struct rds_conn_path *cp)
 {
-	rds_connect_complete(conn);
+	rds_connect_complete(cp->cp_conn);
 	return 0;
 }

-static void rds_loop_conn_shutdown(struct rds_connection *conn)
+static void rds_loop_conn_path_shutdown(struct rds_conn_path *cp)
 {
 }

@@ -197,11 +198,11 @@ void rds_loop_exit(void)
 */
 struct rds_transport rds_loop_transport = {
 	.xmit			= rds_loop_xmit,
-	.recv			= rds_loop_recv,
+	.recv_path		= rds_loop_recv_path,
 	.conn_alloc		= rds_loop_conn_alloc,
 	.conn_free		= rds_loop_conn_free,
-	.conn_connect		= rds_loop_conn_connect,
-	.conn_shutdown		= rds_loop_conn_shutdown,
+	.conn_path_connect	= rds_loop_conn_path_connect,
+	.conn_path_shutdown	= rds_loop_conn_path_shutdown,
 	.inc_copy_to_user	= rds_message_inc_copy_to_user,
 	.inc_to_skb		= rds_message_inc_to_skb,
 	.skb_local		= rds_message_skb_local,

diff --git a/net/rds/message.c b/net/rds/message.c
index 70697a3942c9..991e87098602 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -40,6 +40,8 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
 [RDS_EXTHDR_RDMA]	= sizeof(struct rds_ext_header_rdma),
 [RDS_EXTHDR_RDMA_DEST]	= sizeof(struct rds_ext_header_rdma_dest),
 [RDS_EXTHDR_RDMA_BYTES]	= sizeof(struct rds_ext_header_rdma_bytes),
+[RDS_EXTHDR_NPATHS]	= sizeof(u16),
+[RDS_EXTHDR_GEN_NUM]	= sizeof(u32),
 };

diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c
index 16aa421a7c8c..8b472640f82d 100644
--- a/net/rds/rdma_transport.c
+++ b/net/rds/rdma_transport.c
@@ -37,6 +37,7 @@
 #include "ib.h"
 #include "net/arp.h"
 #include "tcp.h"
+#include "rds_single_path.h"

 #include
 #include
@@ -271,7 +272,7 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
 				rds_conn_drop(conn, DR_IB_CONSUMER_DEFINED_REJ);
 		} else {
-			queue_delayed_work(conn->c_wq,
+			queue_delayed_work(conn->c_path[0].cp_wq,
 					   &conn->c_reject_w,
 					   msecs_to_jiffies(10));
 		}
@@ -306,8 +307,9 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
 			NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr),
 			conn->c_tos);
 		if (!rds_conn_self_loopback_passive(conn)) {
-			queue_delayed_work(conn->c_wq, &conn->c_reconn_w,
-					   msecs_to_jiffies(conn->c_reconnect_retry));
+			queue_delayed_work(conn->c_path[0].cp_wq,
+					   &conn->c_path[0].cp_reconn_w,
+					   msecs_to_jiffies(conn->c_path[0].cp_reconnect_retry));
 			rds_conn_drop(conn, DR_IB_ADDR_CHANGE);
 		}
 	}
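The two extension-header sizes registered in message.c above carry the multipath handshake payload. A sketch of how the probing side might fill them in, modeled on the shape of upstream's rds_send_probe(), which is not part of this excerpt (rm and the use of host byte order here are assumptions):

	u16 npaths = RDS_MPATH_WORKERS;

	rds_message_add_extension(&rm->m_inc.i_hdr, RDS_EXTHDR_NPATHS,
				  &npaths, sizeof(npaths));
	rds_message_add_extension(&rm->m_inc.i_hdr, RDS_EXTHDR_GEN_NUM,
				  &rds_gen_num, sizeof(u32));

The receiving side of this exchange is rds_recv_hs_exthdrs() in the recv.c hunks further below.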
diff --git a/net/rds/rds.h b/net/rds/rds.h
index abfa291c0296..511f1f583d08 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -143,6 +143,11 @@ enum {
 /* Bits for c_reconn_flags */
 #define RDS_RECONNECT_TIMEDOUT	0

+/* Max number of multipaths per RDS connection. Must be a power of 2 */
+#define RDS_MPATH_WORKERS	8
+#define RDS_MPATH_HASH(rs, n)	(jhash_1word((rs)->rs_bound_port, \
+					     (rs)->rs_hash_initval) & ((n) - 1))
+
 enum rds_conn_drop_src {
	/* rds-core */
	DR_DEFAULT,
@@ -218,95 +223,115 @@ enum rds_conn_drop_src {
	DR_TCP_SEND_FAIL,
 };

+/* Per mpath connection state */
+struct rds_conn_path {
+	struct rds_connection	*cp_conn;
+	struct rds_message	*cp_xmit_rm;
+	unsigned long		cp_xmit_sg;
+	unsigned int		cp_xmit_hdr_off;
+	unsigned int		cp_xmit_data_off;
+	unsigned int		cp_xmit_atomic_sent;
+	unsigned int		cp_xmit_rdma_sent;
+	unsigned int		cp_xmit_data_sent;
+
+	spinlock_t		cp_lock;	/* protect msg queues */
+	u64			cp_next_tx_seq;
+	struct list_head	cp_send_queue;
+	struct list_head	cp_retrans;
+
+	u64			cp_next_rx_seq;
+
+	void			*cp_transport_data;
+
+	struct workqueue_struct	*cp_wq;
+	atomic_t		cp_state;
+	unsigned long		cp_send_gen;
+	unsigned long		cp_flags;
+	unsigned long		cp_reconnect_jiffies;
+	struct delayed_work	cp_send_w;
+	struct delayed_work	cp_recv_w;
+	struct delayed_work	cp_conn_w;
+	struct delayed_work	cp_reject_w;
+	struct delayed_work	cp_hb_w;
+	struct delayed_work	cp_reconn_w;
+	struct work_struct	cp_down_w;
+	struct mutex		cp_cm_lock;	/* protect cp_state & cm */
+	wait_queue_head_t	cp_waitq;
+
+	unsigned int		cp_unacked_packets;
+	unsigned int		cp_unacked_bytes;
+	unsigned int		cp_outgoing:1,
+				cp_pad_to_32:31;
+	unsigned int		cp_index;
+
+	/* when was this connection started */
+	unsigned long		cp_connection_start;
+
+	/* Re-connect stall diagnostics */
+	unsigned long		cp_reconn_flags;
+	unsigned long		cp_reconnect_retry;
+	unsigned int		cp_reconnect_retry_count;
+	unsigned long		cp_reconnect_start;
+	unsigned int		cp_reconnect_drops;
+	int			cp_reconnect_warn;
+	int			cp_reconnect_err;
+	int			cp_to_index;
+
+	unsigned int		cp_reconnect;
+
+	unsigned int		cp_pending_flush;
+
+	unsigned long		cp_hb_start;
+
+	struct rds_connection	*cp_base_conn;
+
+	unsigned int		cp_route_to_base;
+
+	unsigned int		cp_rdsinfo_pending;
+
+	unsigned int		cp_reconnect_racing;
+	unsigned int		cp_route_resolved;
+
+	enum rds_conn_drop_src	cp_drop_source;
+
+	unsigned char		cp_acl_init;
+	unsigned char		cp_acl_en;
+};
+
 struct rds_connection {
	struct hlist_node	c_hash_node;
	__be32			c_laddr;
	__be32			c_faddr;
	unsigned int		c_loopback:1,
-				c_outgoing:1,
+				c_ping_triggered:1,
				c_pad_to_32:30;
+	int			c_npaths;
	struct rds_connection	*c_passive;
+	struct rds_transport	*c_trans;

	struct rds_cong_map	*c_lcong;
	struct rds_cong_map	*c_fcong;

-	struct rds_message	*c_xmit_rm;
-	unsigned long		c_xmit_sg;
-	unsigned int		c_xmit_hdr_off;
-	unsigned int		c_xmit_data_off;
-	unsigned int		c_xmit_atomic_sent;
-	unsigned int		c_xmit_rdma_sent;
-	unsigned int		c_xmit_data_sent;
-
-	spinlock_t		c_lock;		/* protect msg queues */
-	u64			c_next_tx_seq;
-	struct list_head	c_send_queue;
-	struct list_head	c_retrans;
-
-	u64			c_next_rx_seq;
-
-	struct rds_transport	*c_trans;
-	void			*c_transport_data;
-
-	struct workqueue_struct	*c_wq;
-	atomic_t		c_state;
-	unsigned long		c_send_gen;
-	unsigned long		c_flags;
-	unsigned long		c_reconnect_jiffies;
-	struct delayed_work	c_send_w;
-	struct delayed_work	c_recv_w;
-	struct delayed_work	c_conn_w;
-	struct delayed_work	c_reject_w;
-	struct delayed_work	c_hb_w;
-	struct delayed_work	c_reconn_w;
-	struct work_struct	c_down_w;
-	struct mutex		c_cm_lock;	/* protect conn state & cm */
-	wait_queue_head_t	c_waitq;
-
	struct list_head	c_map_item;
	unsigned long		c_map_queued;
-	unsigned long		c_connection_start; /* when was this connection started */
-
-	unsigned int		c_unacked_packets;
-	unsigned int		c_unacked_bytes;

	/* Protocol version */
	unsigned int		c_proposed_version;
	unsigned int		c_version;
	possible_net_t		c_net;

-	/* Re-connect stall diagnostics */
-	unsigned long		c_reconn_flags;
-	unsigned long		c_reconnect_retry;
-	unsigned int		c_reconnect_retry_count;
-	unsigned long		c_reconnect_start;
-	unsigned int		c_reconnect_drops;
-	int			c_reconnect_warn;
-	int			c_reconnect_err;
-	int			c_to_index;
-
-	unsigned int		c_reconnect;
-
	/* Qos support */
	u8			c_tos;

-	unsigned int		c_pending_flush;
+	struct rds_conn_path	c_path[RDS_MPATH_WORKERS];
+	wait_queue_head_t	c_hs_waitq;	/* handshake waitq */

-	unsigned long		c_hb_start;
-
-	struct rds_connection	*c_base_conn;
-
-	unsigned int		c_route_to_base;
-
-	unsigned int		c_rdsinfo_pending;
-
-	unsigned int		c_route_resolved;
-
-	enum rds_conn_drop_src	c_drop_source;
+	/* used by RDS_CONN_RESET */
	struct list_head	c_laddr_node;

-	unsigned char		c_acl_init;
-	unsigned char		c_acl_en;
+	u32			c_my_gen_num;
+	u32			c_peer_gen_num;
 };

 static inline
@@ -329,6 +354,18 @@ void rds_conn_net_set(struct rds_connection *conn, struct net *net)
 #define RDS_FLAG_EXTHDR_EXTENSION	0x20

 #define RDS_MAX_ADV_CREDIT	127

+/* RDS_FLAG_PROBE_PORT is the reserved sport used for sending a ping
+ * probe to exchange control information before establishing a connection.
+ * Currently the control information that is exchanged is the number of
+ * supported paths. If the peer is a legacy (older kernel revision) peer,
+ * it would return a pong message without additional control information
+ * that would then alert the sender that the peer was an older rev.
+ */
+#define RDS_FLAG_PROBE_PORT	1
+#define RDS_HS_PROBE(sport, dport) \
+		((sport == RDS_FLAG_PROBE_PORT && dport == 0) || \
+		 (sport == 0 && dport == RDS_FLAG_PROBE_PORT))
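A sketch of how RDS_MPATH_HASH and the probe macros above cooperate at send time, condensed from the shape of upstream rds_sendmsg() (variable names illustrative): an ordinary message selects its path by hashing the sender's bound port, while handshake probes always travel on path 0:

	struct rds_conn_path *cpath;

	if (conn->c_trans->t_mp_capable)
		cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths)];
	else
		cpath = &conn->c_path[0];

Assuming path counts stay powers of 2, as the comment on RDS_MPATH_WORKERS requires, the mask in RDS_MPATH_HASH distributes flows evenly across the negotiated paths.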
 /*
 * Maximum space available for extension headers.
 */
@@ -401,6 +438,9 @@ struct rds_ext_header_rdma_bytes {
 	u8	h_rflags;	/* direction of RDMA, write or read */
 };

+#define RDS_EXTHDR_NPATHS	5
+#define RDS_EXTHDR_GEN_NUM	6
+
 #define __RDS_EXTHDR_MAX	16 /* for now */
 #define RDS_RX_MAX_TRACES	(RDS_MSG_RX_DGRAM_TRACE_MAX + 1)
 #define	RDS_MSG_RX_HDR		0
@@ -412,6 +452,7 @@ struct rds_incoming {
 	atomic_t		i_refcount;
 	struct list_head	i_item;
 	struct rds_connection	*i_conn;
+	struct rds_conn_path	*i_conn_path;
 	struct rds_header	i_hdr;
 	unsigned long		i_rx_jiffies;
 	__be32			i_saddr;
@@ -628,21 +669,22 @@ struct rds_transport {
 	char			t_name[TRANSNAMSIZ];
 	struct list_head	t_item;
 	struct module		*t_owner;
-	unsigned int		t_prefer_loopback:1;
+	unsigned int		t_prefer_loopback:1,
+				t_mp_capable:1;
 	unsigned int		t_type;

 	int (*laddr_check)(struct net *net, __be32 addr);
 	int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
 	void (*conn_free)(void *data);
-	int (*conn_connect)(struct rds_connection *conn);
-	void (*conn_shutdown)(struct rds_connection *conn);
-	void (*xmit_prepare)(struct rds_connection *conn);
-	void (*xmit_complete)(struct rds_connection *conn);
+	int (*conn_path_connect)(struct rds_conn_path *cp);
+	void (*conn_path_shutdown)(struct rds_conn_path *cp);
+	void (*xmit_path_prepare)(struct rds_conn_path *cp);
+	void (*xmit_path_complete)(struct rds_conn_path *cp);
 	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
 		    unsigned int hdr_off, unsigned int sg, unsigned int off);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
 	int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
-	int (*recv)(struct rds_connection *conn);
+	int (*recv_path)(struct rds_conn_path *cp);
 	int (*inc_copy_to_user)(struct rds_incoming *inc, struct iov_iter *to);
 	void (*inc_free)(struct rds_incoming *inc);
@@ -739,6 +781,8 @@ struct rds_sock {
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];

 	bool			rs_large_page;
+
+	u32			rs_hash_initval;
 };

 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -843,6 +887,7 @@
 void rds_cong_exit(void);
 struct rds_message *rds_cong_update_alloc(struct rds_connection *conn);

 /* conn.c */
+extern u32 rds_gen_num;
 int rds_conn_init(void);
 void rds_conn_exit(void);
 struct rds_connection *rds_conn_create(struct net *net,
@@ -856,12 +901,15 @@ struct rds_connection *rds_conn_create_outgoing(struct net *net,
 struct rds_connection *rds_conn_find(struct net *net, __be32 laddr,
 				     __be32 faddr,
 				     struct rds_transport *trans, u8 tos);
-void rds_conn_shutdown(struct rds_connection *conn, int restart);
+void rds_conn_shutdown(struct rds_conn_path *cp, int restart);
 void rds_conn_destroy(struct rds_connection *conn, int shutdown);
 void rds_conn_reset(struct rds_connection *conn);
 void rds_conn_drop(struct rds_connection *conn, int reason);
-void rds_conn_laddr_list(__be32 laddr, struct list_head *laddr_conns);
+void rds_conn_path_drop(struct rds_conn_path *cp, int reason);
+void rds_conn_laddr_list(struct net *net,
+			 __be32 laddr, struct list_head *laddr_conns);
 void rds_conn_connect_if_down(struct rds_connection *conn);
+void rds_conn_path_connect_if_down(struct rds_conn_path *conn);
 void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 			    struct rds_info_iterator *iter,
 			    struct rds_info_lengths *lens,
@@ -869,28 +917,56 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 			    size_t item_len);
 char *conn_drop_reason_str(enum rds_conn_drop_src reason);

+static inline int
+rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
+{
+	return atomic_cmpxchg(&cp->cp_state, old, new) == old;
+}
+
 static inline int
 rds_conn_transition(struct rds_connection *conn, int old, int new)
 {
-	return atomic_cmpxchg(&conn->c_state, old, new) == old;
+	WARN_ON(conn->c_trans->t_mp_capable);
+	return rds_conn_path_transition(&conn->c_path[0], old, new);
+}
+
+static inline int
+rds_conn_path_state(struct rds_conn_path *cp)
+{
+	return atomic_read(&cp->cp_state);
 }

 static inline int
 rds_conn_state(struct rds_connection *conn)
 {
-	return atomic_read(&conn->c_state);
+	WARN_ON(conn->c_trans->t_mp_capable);
+	return rds_conn_path_state(&conn->c_path[0]);
+}
+
+static inline int
+rds_conn_path_up(struct rds_conn_path *cp)
+{
+	return atomic_read(&cp->cp_state) == RDS_CONN_UP;
 }

 static inline int
 rds_conn_up(struct rds_connection *conn)
 {
-	return atomic_read(&conn->c_state) == RDS_CONN_UP;
+	WARN_ON(conn->c_trans->t_mp_capable);
+	return rds_conn_path_up(&conn->c_path[0]);
+}
+
+static inline int
+rds_conn_path_connecting(struct rds_conn_path *cp)
+{
+	return atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING;
 }

 static inline int
 rds_conn_connecting(struct rds_connection *conn)
 {
-	return atomic_read(&conn->c_state) == RDS_CONN_CONNECTING;
+	WARN_ON(conn->c_trans->t_mp_capable);
+	return rds_conn_path_connecting(&conn->c_path[0]);
 }

 static inline bool
@@ -951,6 +1027,8 @@
 void rds_page_exit(void);

 /* recv.c */
 void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
 		  __be32 saddr);
+void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *conn,
+		       __be32 saddr);
 void rds_inc_addref(struct rds_incoming *inc);
 void rds_inc_put(struct rds_incoming *inc);
 void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
@@ -966,15 +1044,17 @@
 int rds_skb_local(struct sk_buff *skb);

 /* send.c */
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
-void rds_send_reset(struct rds_connection *conn);
-int rds_send_xmit(struct rds_connection *conn);
+void rds_send_path_reset(struct rds_conn_path *cp);
+int rds_send_xmit(struct rds_conn_path *cp);
 struct sockaddr_in;
 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest);
 typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack);
 void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
 			 is_acked_func is_acked);
+void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
+			      is_acked_func is_acked);
 void rds_send_remove_from_sock(struct list_head *messages, int status);
-int rds_send_pong(struct rds_connection *conn, __be16 dport);
+int rds_send_pong(struct rds_conn_path *cp, __be16 dport);
 int rds_send_hb(struct rds_connection *conn, int response);
 struct rds_message *rds_send_get_message(struct rds_connection *,
 					 struct rm_rdma_op *);
@@ -1054,7 +1134,7 @@ int rds_threads_init(void);
 void rds_threads_exit(void);
 extern struct workqueue_struct *rds_wq;
 extern struct workqueue_struct *rds_local_wq;
-void rds_queue_reconnect(struct rds_connection *conn);
+void rds_queue_reconnect(struct rds_conn_path *cp);
 void rds_connect_worker(struct work_struct *);
 void rds_shutdown_worker(struct work_struct *);
 void rds_send_worker(struct work_struct *);
@@ -1062,7 +1142,7 @@
 void rds_reject_worker(struct work_struct *);
 void rds_recv_worker(struct work_struct *);
 void rds_hb_worker(struct work_struct *);
 void rds_reconnect_timeout(struct work_struct *);
-void rds_connect_path_complete(struct rds_connection *conn, int curr);
+void rds_connect_path_complete(struct rds_conn_path *cp, int curr);
 void rds_connect_complete(struct rds_connection *conn);

 /* transport.c */

diff --git a/net/rds/rds_single_path.h b/net/rds/rds_single_path.h
new file mode 100644
index 000000000000..2f06ee548eed
--- /dev/null
+++ b/net/rds/rds_single_path.h
@@ -0,0 +1,42 @@
+#ifndef _RDS_RDS_SINGLE_H
+#define _RDS_RDS_SINGLE_H
+
+#define c_xmit_rm		c_path[0].cp_xmit_rm
+#define c_xmit_sg		c_path[0].cp_xmit_sg
+#define c_xmit_hdr_off		c_path[0].cp_xmit_hdr_off
+#define c_xmit_data_off		c_path[0].cp_xmit_data_off
+#define c_xmit_atomic_sent	c_path[0].cp_xmit_atomic_sent
+#define c_xmit_rdma_sent	c_path[0].cp_xmit_rdma_sent
+#define c_xmit_data_sent	c_path[0].cp_xmit_data_sent
+#define c_lock			c_path[0].cp_lock
+#define c_next_tx_seq		c_path[0].cp_next_tx_seq
+#define c_send_queue		c_path[0].cp_send_queue
+#define c_retrans		c_path[0].cp_retrans
+#define c_next_rx_seq		c_path[0].cp_next_rx_seq
+#define c_transport_data	c_path[0].cp_transport_data
+#define c_state			c_path[0].cp_state
+#define c_send_gen		c_path[0].cp_send_gen
+#define c_flags			c_path[0].cp_flags
+#define c_reconnect_jiffies	c_path[0].cp_reconnect_jiffies
+#define c_send_w		c_path[0].cp_send_w
+#define c_recv_w		c_path[0].cp_recv_w
+#define c_conn_w		c_path[0].cp_conn_w
+#define c_down_w		c_path[0].cp_down_w
+#define c_cm_lock		c_path[0].cp_cm_lock
+#define c_waitq			c_path[0].cp_waitq
+#define c_unacked_packets	c_path[0].cp_unacked_packets
+#define c_unacked_bytes		c_path[0].cp_unacked_bytes
+#define c_route			c_path[0].cp_route_resolved
+#define c_drop_source		c_path[0].cp_drop_source
+#define c_acl_init		c_path[0].cp_acl_init
+#define c_connection_start	c_path[0].cp_connection_start
+#define c_reconnect_racing	c_path[0].cp_reconnect_racing
+#define c_reconnect		c_path[0].cp_reconnect
+#define c_to_index		c_path[0].cp_to_index
+#define c_base_conn		c_path[0].cp_base_conn
+#define c_acl_en		c_path[0].cp_acl_en
+#define c_reconnect_err		c_path[0].cp_reconnect_err
+#define c_route_resolved	c_path[0].cp_route_resolved
+#define c_reject_w		c_path[0].cp_reject_w
+
+#endif /* _RDS_RDS_SINGLE_H */
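The header just added is what lets files that still speak in terms of struct rds_connection compile unchanged: after the include, a legacy field access silently resolves to path 0. An illustrative expansion (not part of the patch):

	#include "rds_single_path.h"

	conn->c_flags;		/* expands to conn->c_path[0].cp_flags */
	conn->c_transport_data;	/* expands to conn->c_path[0].cp_transport_data */

This is why ib.c, ib_rdma.c, loop.c and rdma_transport.c above only needed an #include rather than a wholesale conversion.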
diff --git a/net/rds/recv.c b/net/rds/recv.c
index 67509dea2735..e4ba384a1575 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -51,11 +51,11 @@
 rds_recv_route(struct rds_connection *conn, struct rds_incoming *inc,
 	       gfp_t gfp);

 static void
-rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc,
+rds_recv_forward(struct rds_conn_path *cp, struct rds_incoming *inc,
 		 gfp_t gfp);

 static void
-rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
+rds_recv_local(struct rds_conn_path *cp, __be32 saddr, __be32 daddr,
 	       struct rds_incoming *inc, gfp_t gfp, struct rds_sock *rs);

 static int
@@ -85,6 +85,27 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
 }
 EXPORT_SYMBOL_GPL(rds_inc_init);

+void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
+		       __be32 saddr)
+{
+	int i;
+
+	atomic_set(&inc->i_refcount, 1);
+	INIT_LIST_HEAD(&inc->i_item);
+	inc->i_conn = cp->cp_conn;
+	inc->i_conn_path = cp;
+	inc->i_saddr = saddr;
+	inc->i_rdma_cookie = 0;
+	inc->i_oconn = NULL;
+	inc->i_skb = NULL;
+	inc->i_rx_tstamp.tv_sec = 0;
+	inc->i_rx_tstamp.tv_usec = 0;
+
+	for (i = 0; i < RDS_RX_MAX_TRACES; i++)
+		inc->i_rx_lat_trace[i] = 0;
+}
+EXPORT_SYMBOL_GPL(rds_inc_path_init);
+
 void rds_inc_addref(struct rds_incoming *inc)
 {
 	rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
@@ -189,6 +210,109 @@ static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock
 	}
 }

+static void rds_conn_peer_gen_update(struct rds_connection *conn,
+				     u32 peer_gen_num)
+{
+	int i;
+	struct rds_message *rm, *tmp;
+	unsigned long flags;
+	int flushed;
+
+	WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
+	if (peer_gen_num != 0) {
+		if (conn->c_peer_gen_num != 0 &&
+		    peer_gen_num != conn->c_peer_gen_num) {
+			for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+				struct rds_conn_path *cp;
+
+				cp = &conn->c_path[i];
+				spin_lock_irqsave(&cp->cp_lock, flags);
+				cp->cp_next_tx_seq = 1;
+				cp->cp_next_rx_seq = 0;
+				flushed = 0;
+				list_for_each_entry_safe(rm, tmp,
+							 &cp->cp_retrans,
+							 m_conn_item) {
+					set_bit(RDS_MSG_FLUSH, &rm->m_flags);
+					flushed++;
+				}
+				spin_unlock_irqrestore(&cp->cp_lock, flags);
+				pr_info("%s:%d flushed %d\n",
+					__FILE__, __LINE__, flushed);
+			}
+		}
+		conn->c_peer_gen_num = peer_gen_num;
+		pr_info("peer gen num %x\n", peer_gen_num);
+	}
+}
+
+static void rds_recv_hs_exthdrs(struct rds_header *hdr,
+				struct rds_connection *conn)
+{
+	unsigned int pos = 0, type, len;
+	union {
+		struct rds_ext_header_version version;
+		u16 rds_npaths;
+		u32 rds_gen_num;
+	} buffer;
+	u32 new_peer_gen_num = 0;
+
+	while (1) {
+		len = sizeof(buffer);
+		type = rds_message_next_extension(hdr, &pos, &buffer, &len);
+		if (type == RDS_EXTHDR_NONE)
+			break;
+		/* Process extension header here */
+		switch (type) {
+		case RDS_EXTHDR_NPATHS:
+			conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
+					       buffer.rds_npaths);
+			break;
+		case RDS_EXTHDR_GEN_NUM:
+			new_peer_gen_num = buffer.rds_gen_num;
+			break;
+		default:
+			pr_warn_ratelimited("ignoring unknown exthdr type "
+					    "0x%x\n", type);
+		}
+	}
+	/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
+	conn->c_npaths = max_t(int, conn->c_npaths, 1);
+	rds_conn_peer_gen_update(conn, new_peer_gen_num);
+}
+
+/* rds_start_mprds() will synchronously start multiple paths when appropriate.
+ * The scheme is based on the following rules:
+ *
+ * 1. rds_sendmsg on first connect attempt sends the probe ping, with the
+ *    sender's npaths (s_npaths)
+ * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
+ *    sends back a probe-pong with r_npaths. After that, if rcvr is the
+ *    smaller ip addr, it starts rds_conn_path_connect_if_down on all
+ *    mprds_paths.
+ * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
+ *    If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
+ *    called after reception of the probe-pong on all mprds_paths.
+ *    Otherwise (sender of probe-ping is not the smaller ip addr): just call
+ *    rds_conn_path_connect_if_down on the hashed path. (see rule 4)
+ * 4. when cp_index > 0, rds_connect_worker must only trigger
+ *    a connection if laddr < faddr.
+ * 5. sender may end up queuing the packet on the cp; it will get sent out
+ *    later, when the connection is completed.
+ */
+static void rds_start_mprds(struct rds_connection *conn)
+{
+	int i;
+	struct rds_conn_path *cp;
+
+	if (conn->c_npaths > 1 && conn->c_laddr < conn->c_faddr) {
+		for (i = 1; i < conn->c_npaths; i++) {
+			cp = &conn->c_path[i];
+			rds_conn_path_connect_if_down(cp);
+		}
+	}
+}
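Rules 2-4 in the comment above reduce to one deterministic tie-break: both peers compute the same predicate, so exactly one of them dials paths 1..c_npaths-1. A compact restatement (illustrative helper, not in the patch; the raw __be32 comparison matches what rds_start_mprds() itself does):

	static bool rds_mprds_initiator(const struct rds_connection *conn)
	{
		return conn->c_npaths > 1 && conn->c_laddr < conn->c_faddr;
	}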
+
 /*
  * The transport must make sure that this is serialized against other
  * rx and conn reset on this specific conn.
  */
@@ -213,6 +337,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 	struct sock *sk;
 	struct rds_nf_hdr *dst, *org;
 	int ret;
+	struct rds_conn_path *cp;

 	rdsdebug(KERN_ALERT "incoming: conn %p, inc %p, %u.%u.%u.%u : %d -> %u.%u.%u.%u : %d\n",
 		 conn, inc, NIPQUAD(saddr), inc->i_hdr.h_sport,
 		 NIPQUAD(daddr), inc->i_hdr.h_dport);
@@ -224,6 +349,10 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 	/* save off the original connection against which the request arrived */
 	inc->i_oconn = conn;
 	inc->i_skb = NULL;
+	if (conn->c_trans->t_mp_capable)
+		cp = inc->i_conn_path;
+	else
+		cp = &conn->c_path[0];

 	/* lets find a socket to which this request belongs */
 	rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
@@ -231,7 +360,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 	/* pass it on locally if there is no socket bound, or if netfilter is
	 * disabled for this socket */
 	if (NULL == rs || !rs->rs_netfilter_enabled) {
-		rds_recv_local(conn, saddr, daddr, inc, gfp, rs);
+		rds_recv_local(cp, saddr, daddr, inc, gfp, rs);

 		/* drop the reference if we had taken one */
 		if (NULL != rs)
@@ -250,7 +379,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 		rds_rtd(RDS_RTD_ERR,
 			"failure to allocate space for inc %p, %u.%u.%u.%u -> %u.%u.%u.%u tos %d\n",
 			inc, NIPQUAD(saddr), NIPQUAD(daddr), conn->c_tos);
-		rds_recv_local(conn, saddr, daddr, inc, gfp, rs);
+		rds_recv_local(cp, saddr, daddr, inc, gfp, rs);
 		/* drop the reference if we had taken one */
 		if (NULL != rs)
 			rds_sock_put(rs);
@@ -315,7 +444,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 	/* check the original header and if changed do the needful */
 	if (dst->saddr == org->saddr && dst->daddr == org->daddr &&
 	    conn->c_trans->skb_local(skb)) {
-		rds_recv_local(conn, saddr, daddr, inc, gfp, NULL);
+		rds_recv_local(cp, saddr, daddr, inc, gfp, NULL);
 	}
 	/* the send both case does both a local recv and a reroute */
 	else if (dst->flags & RDS_NF_HDR_FLAG_BOTH) {
@@ -324,7 +453,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 		rds_inc_addref(inc);

 		/* send it up the stream locally */
-		rds_recv_local(conn, saddr, daddr, inc, gfp, NULL);
+		rds_recv_local(cp, saddr, daddr, inc, gfp, NULL);

 		/* and also reroute the request */
 		rds_recv_route(conn, inc, gfp);
@@ -389,22 +518,26 @@
 	/* this is a request for our local node, but potentially a different source
	 * either way we process it locally */
 	else if (conn->c_trans->skb_local(inc->i_skb)) {
-		rds_recv_local(nconn, dst->saddr, dst->daddr, inc, gfp, NULL);
+		WARN_ON(nconn->c_trans->t_mp_capable);
+		rds_recv_local(&nconn->c_path[0],
+			       dst->saddr, dst->daddr, inc, gfp, NULL);
 	}
 	/* looks like this request is going out to another node */
 	else {
-		rds_recv_forward(nconn, inc, gfp);
+		WARN_ON(nconn->c_trans->t_mp_capable);
+		rds_recv_forward(&nconn->c_path[0], inc, gfp);
 	}
 }

 static void
-rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc,
+rds_recv_forward(struct rds_conn_path *cp, struct rds_incoming *inc,
 		 gfp_t gfp)
 {
 	int len, ret;
 	struct rds_nf_hdr *dst, *org;
 	struct rds_sock *rs;
 	struct sock *sk = NULL;
+	struct rds_connection *conn = cp->cp_conn;

 	/* initialize some bits */
 	rs = NULL;
@@ -454,24 +587,27 @@ out:
 	NF_HOOK(PF_RDS_HOOK, NF_RDS_FORWARD_ERROR, sk, inc->i_skb,
 		NULL, NULL, rds_recv_ok);

 	/* then hand the request off to normal local processing on the old connection */
-	rds_recv_local(inc->i_oconn, org->saddr, org->daddr, inc, gfp, NULL);
+	rds_recv_local(&inc->i_oconn->c_path[0], org->saddr, org->daddr,
+		       inc, gfp, NULL);
+
 }

 static void
-rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
+rds_recv_local(struct rds_conn_path *cp, __be32 saddr, __be32 daddr,
 	       struct rds_incoming *inc, gfp_t gfp, struct rds_sock *rs)
 {
 	struct sock *sk;
 	unsigned long flags;
 	u64 inc_hdr_h_sequence = 0;
 	bool rs_local = (!rs);
+	struct rds_connection *conn = cp->cp_conn;

 	inc->i_conn = conn;
 	inc->i_rx_jiffies = jiffies;

 	rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
 		 "flags 0x%x rx_jiffies %lu\n", conn,
-		 (unsigned long long)conn->c_next_rx_seq,
+		 (unsigned long long)cp->cp_next_rx_seq,
 		 inc,
 		 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
 		 be32_to_cpu(inc->i_hdr.h_len),
@@ -502,32 +638,52 @@
	 */
 	inc_hdr_h_sequence = be64_to_cpu(inc->i_hdr.h_sequence);

-	if (inc_hdr_h_sequence != conn->c_next_rx_seq) {
+	if (inc_hdr_h_sequence != cp->cp_next_rx_seq) {
 		rds_rtd(RDS_RTD_RCV,
 			"conn %p <%u.%u.%u.%u,%u.%u.%u.%u,%d> expect seq# %llu, recved seq# %llu, retrans bit %d\n",
 			conn, NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr),
-			conn->c_tos, conn->c_next_rx_seq, inc_hdr_h_sequence,
+			conn->c_tos, cp->cp_next_rx_seq, inc_hdr_h_sequence,
 			inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED);
 	}

-	if (inc_hdr_h_sequence < conn->c_next_rx_seq
+	if (inc_hdr_h_sequence < cp->cp_next_rx_seq
 	 && (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
 		rds_stats_inc(s_recv_drop_old_seq);
 		goto out;
 	}
-	conn->c_next_rx_seq = inc_hdr_h_sequence + 1;
+	cp->cp_next_rx_seq = inc_hdr_h_sequence + 1;

 	if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
+		if (inc->i_hdr.h_sport == 0) {
+			rdsdebug("ignore ping with 0 sport from 0x%x\n", saddr);
+			goto out;
+		}
 		if (inc->i_hdr.h_flags & RDS_FLAG_HB_PING) {
 			rds_send_hb(conn, 1);
 		} else if (inc->i_hdr.h_flags & RDS_FLAG_HB_PONG) {
-			conn->c_hb_start = 0;
+			cp->cp_hb_start = 0;
 		} else {
 			rds_stats_inc(s_recv_ping);
-			rds_send_pong(conn, inc->i_hdr.h_sport);
+			rds_send_pong(cp, inc->i_hdr.h_sport);
+			/* if this is a handshake ping,
+			 * start multipath if necessary
+			 */
+			if (RDS_HS_PROBE(inc->i_hdr.h_sport,
+					 inc->i_hdr.h_dport)) {
+				rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
+				rds_start_mprds(cp->cp_conn);
+			}
 		}
 		goto out;
 	}

+	if (inc->i_hdr.h_dport == RDS_FLAG_PROBE_PORT &&
+	    inc->i_hdr.h_sport == 0) {
+		rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
+		/* if this is a handshake pong, start multipath if necessary */
+		rds_start_mprds(cp->cp_conn);
+		wake_up(&cp->cp_conn->c_hs_waitq);
+		goto out;
+	}
+
 	if (!rs)
 		rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
@@ -681,12 +837,15 @@ int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)

 	/* If this is the last failed op, re-open the connection for traffic */
 	if (notifier->n_conn) {
-		spin_lock_irqsave(&notifier->n_conn->c_lock, flags);
-		if (notifier->n_conn->c_pending_flush)
-			notifier->n_conn->c_pending_flush--;
+		struct rds_conn_path *ncp;
+
+		ncp = &notifier->n_conn->c_path[0];
+		spin_lock_irqsave(&ncp->cp_lock, flags);
+		if (ncp->cp_pending_flush)
+			ncp->cp_pending_flush--;
 		else
 			printk(KERN_ERR "rds_notify_queue_get: OOPS!\n");
-		spin_unlock_irqrestore(&notifier->n_conn->c_lock, flags);
+		spin_unlock_irqrestore(&ncp->cp_lock, flags);
 	}

 	list_del_init(&notifier->n_list);
b/net/rds/send.c @@ -60,15 +60,15 @@ MODULE_PARM_DESC(rds_async_send_enabled, "Set to enable Async Send"); * Reset the send state. Callers must ensure that this doesn't race with * rds_send_xmit(). */ -void rds_send_reset(struct rds_connection *conn) +void rds_send_path_reset(struct rds_conn_path *cp) { struct rds_message *rm, *tmp; unsigned long flags; int failed_op = 0; - if (conn->c_xmit_rm) { - rm = conn->c_xmit_rm; - conn->c_xmit_rm = NULL; + if (cp->cp_xmit_rm) { + rm = cp->cp_xmit_rm; + cp->cp_xmit_rm = NULL; /* Tell the user the RDMA op is no longer mapped by the * transport. This isn't entirely true (it's flushed out * independently) but as the connection is down, there's @@ -79,21 +79,21 @@ void rds_send_reset(struct rds_connection *conn) } } - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_data_sent = 0; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_data_sent = 0; - conn->c_map_queued = 0; + cp->cp_conn->c_map_queued = 0; - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes; /* Mark messages as retransmissions, and move them to the send q */ - spin_lock_irqsave(&conn->c_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + spin_lock_irqsave(&cp->cp_lock, flags); + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags); @@ -107,18 +107,18 @@ void rds_send_reset(struct rds_connection *conn) (rm->data.op_active && rm->data.op_async))) failed_op = 1; } - list_splice_init(&conn->c_retrans, &conn->c_send_queue); + list_splice_init(&cp->cp_retrans, &cp->cp_send_queue); /* if there was a failed op, flush all async ops */ if (failed_op) { - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, - m_conn_item) { + list_for_each_entry_safe(rm, tmp, &cp->cp_send_queue, + m_conn_item) { if (rm->rdma.op_active) { if (rm->rdma.op_notifier) { struct rds_notifier *notifier; notifier = rm->rdma.op_notifier; - notifier->n_conn = conn; + notifier->n_conn = cp->cp_conn; if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags) && !notifier->n_status) { @@ -128,35 +128,36 @@ void rds_send_reset(struct rds_connection *conn) if (!test_bit(RDS_MSG_FLUSH, &rm->m_flags)) { - conn->c_pending_flush++; + cp->cp_pending_flush++; } } set_bit(RDS_MSG_FLUSH, &rm->m_flags); } if (rm->data.op_active && rm->data.op_async) { if (rm->data.op_notifier) { - rm->data.op_notifier->n_conn = conn; + rm->data.op_notifier->n_conn = + cp->cp_conn; if (!test_bit(RDS_MSG_FLUSH, &rm->m_flags)) { - conn->c_pending_flush++; + cp->cp_pending_flush++; } } set_bit(RDS_MSG_FLUSH, &rm->m_flags); } } } - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); } -EXPORT_SYMBOL_GPL(rds_send_reset); +EXPORT_SYMBOL_GPL(rds_send_path_reset); -static int acquire_in_xmit(struct rds_connection *conn) +static int acquire_in_xmit(struct rds_conn_path *cp) { - return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0; + return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0; } -static void release_in_xmit(struct rds_connection *conn) +static void release_in_xmit(struct rds_conn_path *cp) { - 
clear_bit(RDS_IN_XMIT, &conn->c_flags); + clear_bit(RDS_IN_XMIT, &cp->cp_flags); smp_mb__after_atomic(); /* * We don't use wait_on_bit()/wake_up_bit() because our waking is in a @@ -164,8 +165,8 @@ static void release_in_xmit(struct rds_connection *conn) * the system-wide hashed waitqueue buckets in the fast path only to * almost never find waiters. */ - if (waitqueue_active(&conn->c_waitq)) - wake_up_all(&conn->c_waitq); + if (waitqueue_active(&cp->cp_waitq)) + wake_up_all(&cp->cp_waitq); } /* @@ -182,8 +183,9 @@ static void release_in_xmit(struct rds_connection *conn) * - small message latency is higher behind queued large messages * - large message latency isn't starved by intervening small sends */ -int rds_send_xmit(struct rds_connection *conn) +int rds_send_xmit(struct rds_conn_path *cp) { + struct rds_connection *conn = cp->cp_conn; struct rds_message *rm; unsigned long flags; unsigned int tmp; @@ -205,7 +207,7 @@ restart: * avoids blocking the caller and trading per-connection data between * caches per message. */ - if (!acquire_in_xmit(conn)) { + if (!acquire_in_xmit(cp)) { rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; @@ -219,21 +221,21 @@ restart: * we don't need a lock because the counter is only incremented * while we have the in_xmit bit held. */ - conn->c_send_gen++; - send_gen = conn->c_send_gen; + cp->cp_send_gen++; + send_gen = cp->cp_send_gen; /* * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, * we do the opposite to avoid races. */ - if (!rds_conn_up(conn)) { - release_in_xmit(conn); + if (!rds_conn_path_up(cp)) { + release_in_xmit(cp); ret = 0; goto out; } - if (conn->c_trans->xmit_prepare) - conn->c_trans->xmit_prepare(conn); + if (conn->c_trans->xmit_path_prepare) + conn->c_trans->xmit_path_prepare(cp); /* * spin trying to push headers and data down the connection until @@ -241,14 +243,14 @@ restart: */ while (1) { - rm = conn->c_xmit_rm; + rm = cp->cp_xmit_rm; if (!rm) { same_rm = 0; } else { same_rm++; if ((same_rm >= 4096) && printk_ratelimit()) { - printk(KERN_ERR "RDS: Stuck rm\n"); + /* printk(KERN_ERR "RDS: Stuck rm\n"); */ ret = -EAGAIN; break; } @@ -265,14 +267,16 @@ restart: break; } rm->data.op_active = 1; + rm->m_inc.i_conn_path = cp; + rm->m_inc.i_conn = cp->cp_conn; - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* * If not already working on one, grab the next message. * - * c_xmit_rm holds a ref while we're sending this message down + * cp_xmit_rm holds a ref while we're sending this message down * the connction. We can use this ref while holding the * send_sem.. rds_send_reset() is serialized with it. */ @@ -289,10 +293,10 @@ restart: if (batch_count >= send_batch_count) goto over_batch; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - if (!list_empty(&conn->c_send_queue)) { - rm = list_entry(conn->c_send_queue.next, + if (!list_empty(&cp->cp_send_queue)) { + rm = list_entry(cp->cp_send_queue.next, struct rds_message, m_conn_item); rds_message_addref(rm); @@ -301,10 +305,11 @@ restart: * Move the message from the send queue to the retransmit * list right away. 
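
/*
 * Illustrative sketch, not part of the patch: a userspace model of the
 * acquire_in_xmit()/release_in_xmit() pair above. One flag bit per path
 * serializes transmit; release clears it and (in the kernel code) wakes
 * cp_waitq only when waitqueue_active() says someone is blocked, so the
 * fast path never touches the hashed waitqueues. The bit index and the
 * stdatomic translation below are assumptions for illustration only.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define IN_XMIT_BIT 2UL /* stand-in for RDS_IN_XMIT */

struct xmit_flags {
	_Atomic unsigned long flags;
};

static bool acquire_in_xmit(struct xmit_flags *x)
{
	/* like test_and_set_bit(): succeed only if the bit was clear */
	unsigned long old = atomic_fetch_or(&x->flags, 1UL << IN_XMIT_BIT);

	return !(old & (1UL << IN_XMIT_BIT));
}

static void release_in_xmit(struct xmit_flags *x)
{
	atomic_fetch_and(&x->flags, ~(1UL << IN_XMIT_BIT));
	/* kernel: smp_mb__after_atomic(), then conditional wake_up_all() */
}

int main(void)
{
	struct xmit_flags f = { .flags = 0 };

	printf("first acquire:  %d\n", acquire_in_xmit(&f)); /* 1 */
	printf("second acquire: %d\n", acquire_in_xmit(&f)); /* 0: contended */
	release_in_xmit(&f);
	printf("after release:  %d\n", acquire_in_xmit(&f)); /* 1 */
	return 0;
}
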
*/ - list_move_tail(&rm->m_conn_item, &conn->c_retrans); + list_move_tail(&rm->m_conn_item, + &cp->cp_retrans); } - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); if (!rm) break; @@ -321,34 +326,36 @@ restart: (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) { - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move_tail(&rm->m_conn_item, &to_be_dropped); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); continue; } /* Require an ACK every once in a while */ len = ntohl(rm->m_inc.i_hdr.h_len); - if (conn->c_unacked_packets == 0 - || conn->c_unacked_bytes < len) { + if (cp->cp_unacked_packets == 0 || + cp->cp_unacked_bytes < len) { __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); - conn->c_unacked_packets = rds_sysctl_max_unacked_packets; - conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes; + cp->cp_unacked_packets = + rds_sysctl_max_unacked_packets; + cp->cp_unacked_bytes = + rds_sysctl_max_unacked_bytes; rds_stats_inc(s_send_ack_required); } else { - conn->c_unacked_bytes -= len; - conn->c_unacked_packets--; + cp->cp_unacked_bytes -= len; + cp->cp_unacked_packets--; } - conn->c_xmit_rm = rm; + cp->cp_xmit_rm = rm; } /* The transport either sends the whole rdma or none of it */ - if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) { rm->m_final_op = &rm->rdma; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ @@ -359,11 +366,11 @@ restart: wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_rdma_sent = 1; + cp->cp_xmit_rdma_sent = 1; } - if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) { rm->m_final_op = &rm->atomic; /* The transport owns the mapped memory for now. 
* You can't unmap it while it's on the send queue */ @@ -374,7 +381,7 @@ restart: wake_up_interruptible(&rm->m_flush_wait); break; } - conn->c_xmit_atomic_sent = 1; + cp->cp_xmit_atomic_sent = 1; } @@ -400,41 +407,41 @@ restart: rm->data.op_active = 0; } - if (rm->data.op_active && !conn->c_xmit_data_sent) { + if (rm->data.op_active && !cp->cp_xmit_data_sent) { rm->m_final_op = &rm->data; ret = conn->c_trans->xmit(conn, rm, - conn->c_xmit_hdr_off, - conn->c_xmit_sg, - conn->c_xmit_data_off); + cp->cp_xmit_hdr_off, + cp->cp_xmit_sg, + cp->cp_xmit_data_off); if (ret <= 0) break; - if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) { + if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) { tmp = min_t(int, ret, sizeof(struct rds_header) - - conn->c_xmit_hdr_off); - conn->c_xmit_hdr_off += tmp; + cp->cp_xmit_hdr_off); + cp->cp_xmit_hdr_off += tmp; ret -= tmp; } - sg = &rm->data.op_sg[conn->c_xmit_sg]; + sg = &rm->data.op_sg[cp->cp_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - - conn->c_xmit_data_off); - conn->c_xmit_data_off += tmp; + cp->cp_xmit_data_off); + cp->cp_xmit_data_off += tmp; ret -= tmp; - if (conn->c_xmit_data_off == sg->length) { - conn->c_xmit_data_off = 0; + if (cp->cp_xmit_data_off == sg->length) { + cp->cp_xmit_data_off = 0; sg++; - conn->c_xmit_sg++; - BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->data.op_nents); + cp->cp_xmit_sg++; + BUG_ON(ret != 0 && cp->cp_xmit_sg == + rm->data.op_nents); } } - if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && - (conn->c_xmit_sg == rm->data.op_nents)) - conn->c_xmit_data_sent = 1; + if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) && + (cp->cp_xmit_sg == rm->data.op_nents)) + cp->cp_xmit_data_sent = 1; } /* @@ -442,23 +449,23 @@ restart: * if there is a data op. Thus, if the data is sent (or there was * none), then we're done with the rm. */ - if (!rm->data.op_active || conn->c_xmit_data_sent) { - conn->c_xmit_rm = NULL; - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_atomic_sent = 0; - conn->c_xmit_data_sent = 0; + if (!rm->data.op_active || cp->cp_xmit_data_sent) { + cp->cp_xmit_rm = NULL; + cp->cp_xmit_sg = 0; + cp->cp_xmit_hdr_off = 0; + cp->cp_xmit_data_off = 0; + cp->cp_xmit_rdma_sent = 0; + cp->cp_xmit_atomic_sent = 0; + cp->cp_xmit_data_sent = 0; rds_message_put(rm); } } over_batch: - if (conn->c_trans->xmit_complete) - conn->c_trans->xmit_complete(conn); - release_in_xmit(conn); + if (conn->c_trans->xmit_path_complete) + conn->c_trans->xmit_path_complete(cp); + release_in_xmit(cp); /* Nuke any messages we decided not to retransmit. */ if (!list_empty(&to_be_dropped)) { @@ -490,12 +497,12 @@ over_batch: if (ret == 0) { smp_mb(); if ((test_bit(0, &conn->c_map_queued) || - !list_empty(&conn->c_send_queue)) && - send_gen == conn->c_send_gen) { + !list_empty(&cp->cp_send_queue)) && + send_gen == cp->cp_send_gen) { rds_stats_inc(s_send_lock_queue_raced); if (batch_count < send_batch_count) goto restart; - queue_delayed_work(conn->c_wq, &conn->c_send_w, 1); + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1); } } out: @@ -678,42 +685,6 @@ __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) /* No need to wake the app - caller does this */ } -/* - * This is called from the IB send completion when we detect - * a RDMA operation that failed with remote access error. - * So speed is not an issue here. 
- */ -struct rds_message *rds_send_get_message(struct rds_connection *conn, - struct rm_rdma_op *op) -{ - struct rds_message *rm, *tmp, *found = NULL; - unsigned long flags; - - spin_lock_irqsave(&conn->c_lock, flags); - - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - goto out; - } - } - - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (&rm->rdma == op) { - atomic_inc(&rm->m_refcount); - found = rm; - break; - } - } - -out: - spin_unlock_irqrestore(&conn->c_lock, flags); - - return found; -} -EXPORT_SYMBOL_GPL(rds_send_get_message); - /* * This removes messages from the socket's list if they're on it. The list * argument must be private to the caller, we must be able to modify it @@ -834,16 +805,16 @@ unlock_and_drop: * XXX It's not clear to me how this is safely serialized with socket * destruction. Maybe it should bail if it sees SOCK_DEAD. */ -void rds_send_drop_acked(struct rds_connection *conn, u64 ack, - is_acked_func is_acked) +void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack, + is_acked_func is_acked) { struct rds_message *rm, *tmp; unsigned long flags; LIST_HEAD(list); - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { + list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) { if (!rds_send_is_acked(rm, ack, is_acked)) break; @@ -855,17 +826,26 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, if (!list_empty(&list)) smp_mb__after_atomic(); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* now remove the messages from the sock list as needed */ rds_send_remove_from_sock(&list, RDS_RDMA_SEND_SUCCESS); } +EXPORT_SYMBOL_GPL(rds_send_path_drop_acked); + +void rds_send_drop_acked(struct rds_connection *conn, u64 ack, + is_acked_func is_acked) +{ + WARN_ON(conn->c_trans->t_mp_capable); + rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked); +} EXPORT_SYMBOL_GPL(rds_send_drop_acked); void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) { struct rds_message *rm, *tmp; struct rds_connection *conn = NULL; + struct rds_conn_path *cp; unsigned long flags; LIST_HEAD(list); int conn_dropped = 0; @@ -895,19 +875,23 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) list_for_each_entry(rm, &list, m_sock_item) { conn = rm->m_inc.i_conn; + if (conn->c_trans->t_mp_capable) + cp = rm->m_inc.i_conn_path; + else + cp = &conn->c_path[0]; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&cp->cp_lock, flags); /* * Maybe someone else beat us to removing rm from the conn. * If we race with their flag update we'll get the lock and * then really see that the flag has been cleared. */ if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); continue; } list_del_init(&rm->m_conn_item); - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock_irqrestore(&cp->cp_lock, flags); /* * Couldn't grab m_rs_lock in top loop (lock ordering), @@ -967,6 +951,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) * message from the flow with RDS_CANCEL_SENT_TO. 
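
/*
 * Illustrative sketch, not part of the patch: the next hunk adds
 * rds_send_mprds_hash(), which picks the rds_conn_path that a socket's
 * traffic rides on. This userspace model assumes, as the upstream code
 * does, that RDS_MPATH_HASH() buckets on the socket's bound port modulo
 * the path count; the hash function and worker count below are
 * stand-ins, and the handshake ping plus wait_event_interruptible()
 * that the real function performs while c_npaths is still unknown are
 * elided.
 */
#include <stdint.h>
#include <stdio.h>

#define N_MPATH_WORKERS 8 /* stand-in for RDS_MPATH_WORKERS */

static uint32_t hash32(uint32_t v) /* stand-in for jhash_1word() */
{
	v *= 2654435761u; /* Knuth multiplicative hash */
	return v ^ (v >> 16);
}

static int mprds_hash(uint16_t bound_port, int c_npaths)
{
	/* before the handshake (c_npaths == 0) hash across the maximum;
	 * afterwards across the negotiated count (n == 1 maps all to 0) */
	int n = c_npaths ? c_npaths : N_MPATH_WORKERS;

	return (int)(hash32(bound_port) % (uint32_t)n);
}

int main(void)
{
	printf("port 6000, 8 paths -> path %d\n", mprds_hash(6000, 8));
	printf("port 6000, 1 path  -> path %d\n", mprds_hash(6000, 1));
	return 0;
}
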
*/ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, + struct rds_conn_path *cp, struct rds_message *rm, __be16 sport, __be16 dport, int *queued) { @@ -1010,19 +995,20 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, trying to minimize the time we hold c_lock */ rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0); rm->m_inc.i_conn = conn; + rm->m_inc.i_conn_path = cp; rds_message_addref(rm); - spin_lock(&conn->c_lock); - if (conn->c_pending_flush) { - spin_unlock(&conn->c_lock); + spin_lock(&cp->cp_lock); + if (cp->cp_pending_flush) { + spin_unlock(&cp->cp_lock); spin_unlock_irqrestore(&rs->rs_lock, flags); goto out; } - rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); - spin_unlock(&conn->c_lock); + spin_unlock(&cp->cp_lock); rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n", rm, len, rs, rs->rs_snd_bytes, @@ -1192,6 +1178,29 @@ static inline int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes) return 0; } +static void rds_send_ping(struct rds_connection *conn); + +static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn) +{ + int hash; + + if (conn->c_npaths == 0) + hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS); + else + hash = RDS_MPATH_HASH(rs, conn->c_npaths); + if (conn->c_npaths == 0 && hash != 0) { + rds_send_ping(conn); + + if (conn->c_npaths == 0) { + wait_event_interruptible(conn->c_hs_waitq, + (conn->c_npaths != 0)); + } + if (conn->c_npaths == 1) + hash = 0; + } + return hash; +} + int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) { struct sock *sk = sock->sk; @@ -1206,6 +1215,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) int nonblock = msg->msg_flags & MSG_DONTWAIT; long timeo = sock_sndtimeo(sk, nonblock); size_t total_payload_len = payload_len, rdma_payload_len = 0; + struct rds_conn_path *cpath; bool large_page; /* Mirror Linux UDP mirror of BSD error message compatibility */ @@ -1298,32 +1308,37 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - if (rs->rs_tos && !conn->c_base_conn) { - conn->c_base_conn = rds_conn_create_outgoing( + if (rs->rs_tos && !conn->c_path[0].cp_base_conn) { + struct rds_conn_path *cp0 = &conn->c_path[0]; + + WARN_ON(conn->c_trans->t_mp_capable); + cp0->cp_base_conn = rds_conn_create_outgoing( sock_net(sock->sk), rs->rs_bound_addr, daddr, rs->rs_transport, 0, sock->sk->sk_allocation); - if (IS_ERR(conn->c_base_conn)) { - ret = PTR_ERR(conn->c_base_conn); + if (IS_ERR(cp0->cp_base_conn)) { + ret = PTR_ERR(cp0->cp_base_conn); goto out; } rds_rtd(RDS_RTD_CM_EXT, "checking conn %p\n", - conn->c_base_conn); - rds_conn_connect_if_down(conn->c_base_conn); + cp0->cp_base_conn); + rds_conn_connect_if_down(cp0->cp_base_conn); } rs->rs_conn = conn; } if (conn->c_tos && !rds_conn_up(conn)) { - if (!rds_conn_up(conn->c_base_conn)) { + struct rds_conn_path *cp0 = &conn->c_path[0]; + + WARN_ON(conn->c_trans->t_mp_capable); + if (!rds_conn_up(cp0->cp_base_conn)) { ret = -EAGAIN; goto out; - } else if (conn->c_base_conn->c_version == + } else if (cp0->cp_base_conn->c_version == RDS_PROTOCOL_COMPAT_VERSION) { - if (!conn->c_reconnect || - conn->c_route_to_base) - conn = conn->c_base_conn; + if (!cp0->cp_reconnect || 
cp0->cp_route_to_base) + conn = cp0->cp_base_conn; else { ret = -EAGAIN; goto out; @@ -1340,13 +1355,18 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - if (test_bit(RDS_DESTROY_PENDING, &conn->c_flags)) { + if (conn->c_trans->t_mp_capable) + cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)]; + else + cpath = &conn->c_path[0]; + + if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) { ret = -EAGAIN; goto out; } /* Not accepting new sends until all the failed ops have been reaped */ - if (rds_async_send_enabled && conn->c_pending_flush) { + if (rds_async_send_enabled && cpath->cp_pending_flush) { ret = -EAGAIN; goto out; } @@ -1367,10 +1387,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - if (rds_conn_state(conn) == RDS_CONN_DOWN) + if (rds_conn_path_state(cpath) == RDS_CONN_DOWN) rds_rtd(RDS_RTD_CM_EXT, "checking conn in down state %p\n", conn); - rds_conn_connect_if_down(conn); + + rds_conn_path_connect_if_down(cpath); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); if (ret) { @@ -1378,17 +1399,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - if (conn->c_rdsinfo_pending) { + if (cpath->cp_rdsinfo_pending) { ret = -EAGAIN; goto out; } - if (!rds_conn_up(conn)) { + if (!rds_conn_path_up(cpath)) { ret = -EAGAIN; goto out; } - while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, + while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); @@ -1396,13 +1417,13 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) ret = -EAGAIN; goto out; } - if (conn->c_pending_flush) { + if (cpath->cp_pending_flush) { ret = -EAGAIN; goto out; } timeo = wait_event_interruptible_timeout(*sk_sleep(sk), - rds_send_queue_rm(rs, conn, rm, + rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, dport, &queued), @@ -1423,10 +1444,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) */ rds_stats_inc(s_send_queued); - ret = rds_send_xmit(conn); + ret = rds_send_xmit(cpath); if (ret == -ENOMEM || ret == -EAGAIN) - queue_delayed_work(conn->c_wq, &conn->c_send_w, 1); - + queue_delayed_work(cpath->cp_wq, &cpath->cp_send_w, 1); rds_message_put(rm); return payload_len; @@ -1464,6 +1484,8 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, int queued = 0; int i; + WARN_ON(conn->c_trans->t_mp_capable); + /* pull out the destination info */ dst = rds_nf_hdr_dst(skb); @@ -1539,13 +1561,14 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, } /* only take a single pass */ - if (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, - dst->dport, &queued)) { + if (!rds_send_queue_rm(rs, conn, &conn->c_path[0], rm, + rs->rs_bound_port, dst->dport, &queued)) { rds_rtd(RDS_RTD_SND, "cannot block on internal send rs %p", rs); rds_stats_inc(s_send_queue_full); /* force a requeue of the work for later */ - queue_delayed_work(conn->c_wq, &conn->c_send_w, 1); + queue_delayed_work(conn->c_path[0].cp_wq, + &conn->c_path[0].cp_send_w, 1); ret = -EAGAIN; goto out; @@ -1558,7 +1581,8 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, rds_stats_inc(s_send_queued); /* always hand the send off to the worker thread */ - queue_delayed_work(conn->c_wq, &conn->c_send_w, 0); + queue_delayed_work(conn->c_path[0].cp_wq, + &conn->c_path[0].cp_send_w, 1); rdsdebug("message sent for rs %p, conn %p, len %d, %u.%u.%u.%u 
: %u -> %u.%u.%u.%u : %u\n", rs, conn, skb->len, NIPQUAD(dst->saddr), dst->sport, NIPQUAD(dst->daddr), dst->dport); @@ -1581,10 +1605,12 @@ out: } /* - * Reply to a ping packet. + * send out a probe. Can be shared by rds_send_ping, + * rds_send_pong, rds_send_hb. */ int -rds_send_pong(struct rds_connection *conn, __be16 dport) +rds_send_probe(struct rds_conn_path *cp, __be16 sport, + __be16 dport, u8 h_flags) { struct rds_message *rm; unsigned long flags; @@ -1596,32 +1622,46 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) goto out; } - rm->m_daddr = conn->c_faddr; + rm->m_daddr = cp->cp_conn->c_faddr; rm->data.op_active = 1; - rds_rtd(RDS_RTD_CM_EXT, "checking conn %p\n", conn); - rds_conn_connect_if_down(conn); + rds_rtd(RDS_RTD_CM_EXT, "checking conn %p\n", cp->cp_conn); + rds_conn_path_connect_if_down(cp); - ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); + ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL); if (ret) goto out; - spin_lock_irqsave(&conn->c_lock, flags); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + spin_lock_irqsave(&cp->cp_lock, flags); + list_add_tail(&rm->m_conn_item, &cp->cp_send_queue); set_bit(RDS_MSG_ON_CONN, &rm->m_flags); rds_message_addref(rm); - rm->m_inc.i_conn = conn; - - rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport, - conn->c_next_tx_seq); - conn->c_next_tx_seq++; - spin_unlock_irqrestore(&conn->c_lock, flags); + rm->m_inc.i_conn = cp->cp_conn; + rm->m_inc.i_conn_path = cp; + + rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, + cp->cp_next_tx_seq); + rm->m_inc.i_hdr.h_flags |= h_flags; + cp->cp_next_tx_seq++; + + if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) { + u16 npaths = RDS_MPATH_WORKERS; + + rds_message_add_extension(&rm->m_inc.i_hdr, + RDS_EXTHDR_NPATHS, &npaths, + sizeof(npaths)); + rds_message_add_extension(&rm->m_inc.i_hdr, + RDS_EXTHDR_GEN_NUM, + &cp->cp_conn->c_my_gen_num, + sizeof(u32)); + } + spin_unlock_irqrestore(&cp->cp_lock, flags); rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - queue_delayed_work(conn->c_wq, &conn->c_send_w, 0); + if (!test_bit(RDS_LL_SEND_FULL, &cp->cp_flags)) + queue_delayed_work(rds_wq, &cp->cp_send_w, 0); rds_message_put(rm); return 0; @@ -1635,65 +1675,65 @@ out: int rds_send_hb(struct rds_connection *conn, int response) { - struct rds_message *rm; - unsigned long flags; - int ret = 0; + u8 flags = 0; if (conn->c_trans->t_type == RDS_TRANS_TCP) return 0; - rm = rds_message_alloc(0, GFP_ATOMIC); - if (!rm) - return -ENOMEM; - - rm->m_daddr = conn->c_faddr; - rm->data.op_active = 1; - - spin_lock_irqsave(&conn->c_lock, flags); - list_add_tail(&rm->m_conn_item, &conn->c_send_queue); - set_bit(RDS_MSG_ON_CONN, &rm->m_flags); - rds_message_addref(rm); - rm->m_inc.i_conn = conn; - - rds_message_populate_header(&rm->m_inc.i_hdr, 0, 0, - conn->c_next_tx_seq); - if (response) - rm->m_inc.i_hdr.h_flags |= RDS_FLAG_HB_PONG; + flags |= RDS_FLAG_HB_PONG; else - rm->m_inc.i_hdr.h_flags |= RDS_FLAG_HB_PING; + flags |= RDS_FLAG_HB_PING; + flags |= RDS_FLAG_ACK_REQUIRED; - rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED; + rds_send_probe(&conn->c_path[0], 0, 0, flags); - conn->c_next_tx_seq++; - spin_unlock_irqrestore(&conn->c_lock, flags); - - ret = rds_send_xmit(conn); - if (ret == -ENOMEM || ret == -EAGAIN) - queue_delayed_work(conn->c_wq, &conn->c_send_w, 1); - - rds_message_put(rm); return 0; } void rds_route_to_base(struct rds_connection *conn) { struct rds_message *rm, *tmp; - struct 
rds_connection *base_conn = conn->c_base_conn; + struct rds_conn_path *cp = &conn->c_path[0]; + struct rds_connection *base_conn = cp->cp_base_conn; unsigned long flags; + WARN_ON(conn->c_trans->t_mp_capable); BUG_ON(!conn->c_tos || rds_conn_up(conn) || !base_conn || - !list_empty(&conn->c_retrans)); + !list_empty(&cp->cp_retrans)); - spin_lock_irqsave(&base_conn->c_lock, flags); - list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { + spin_lock_irqsave(&base_conn->c_path[0].cp_lock, flags); + list_for_each_entry_safe(rm, tmp, &cp->cp_send_queue, m_conn_item) { list_del_init(&rm->m_conn_item); rm->m_inc.i_conn = base_conn; rm->m_inc.i_hdr.h_sequence = - cpu_to_be64(base_conn->c_next_tx_seq++); - list_add_tail(&rm->m_conn_item, &base_conn->c_send_queue); + cpu_to_be64(base_conn->c_path[0].cp_next_tx_seq++); + list_add_tail(&rm->m_conn_item, + &base_conn->c_path[0].cp_send_queue); + } + spin_unlock_irqrestore(&base_conn->c_path[0].cp_lock, flags); + cp->cp_route_to_base = 1; + queue_delayed_work(rds_wq, &base_conn->c_path[0].cp_send_w, 0); +} + +int +rds_send_pong(struct rds_conn_path *cp, __be16 dport) +{ + return rds_send_probe(cp, 0, dport, 0); +} + +void +rds_send_ping(struct rds_connection *conn) +{ + unsigned long flags; + struct rds_conn_path *cp = &conn->c_path[0]; + + spin_lock_irqsave(&cp->cp_lock, flags); + if (conn->c_ping_triggered) { + spin_unlock_irqrestore(&cp->cp_lock, flags); + return; } - spin_unlock_irqrestore(&base_conn->c_lock, flags); - conn->c_route_to_base = 1; - queue_delayed_work(conn->c_wq, &base_conn->c_send_w, 0); + conn->c_ping_triggered = 1; + spin_unlock_irqrestore(&cp->cp_lock, flags); + rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0); } diff --git a/net/rds/tcp.c b/net/rds/tcp.c index abd6cb91db3f..ced67c57821a 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -35,7 +35,6 @@ #include #include #include -#include #include "rds.h" #include "tcp.h" @@ -43,7 +42,7 @@ /* only for info exporting */ static DEFINE_SPINLOCK(rds_tcp_tc_list_lock); static LIST_HEAD(rds_tcp_tc_list); -unsigned int rds_tcp_tc_count; +static unsigned int rds_tcp_tc_count; /* Track rds_tcp_connection structs so they can be cleaned up */ static DEFINE_SPINLOCK(rds_tcp_conn_lock); @@ -55,8 +54,8 @@ static int rds_tcp_skbuf_handler(struct ctl_table *ctl, int write, void __user *buffer, size_t *lenp, loff_t *fpos); -int rds_tcp_min_sndbuf = SOCK_MIN_SNDBUF; -int rds_tcp_min_rcvbuf = SOCK_MIN_RCVBUF; +static int rds_tcp_min_sndbuf = SOCK_MIN_SNDBUF; +static int rds_tcp_min_rcvbuf = SOCK_MIN_RCVBUF; static struct ctl_table rds_tcp_sysctl_table[] = { #define RDS_TCP_SNDBUF 0 @@ -134,9 +133,9 @@ void rds_tcp_restore_callbacks(struct socket *sock, * from being called while it isn't set. */ void rds_tcp_reset_callbacks(struct socket *sock, - struct rds_connection *conn) + struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *osock = tc->t_sock; if (!osock) @@ -146,8 +145,8 @@ void rds_tcp_reset_callbacks(struct socket *sock, * We have an outstanding SYN to this peer, which may * potentially have transitioned to the RDS_CONN_UP state, * so we must quiesce any send threads before resetting - * c_transport_data. We quiesce these threads by setting - * c_state to something other than RDS_CONN_UP, and then + * cp_transport_data. 
We quiesce these threads by setting + * cp_state to something other than RDS_CONN_UP, and then * waiting for any existing threads in rds_send_xmit to * complete release_in_xmit(). (Subsequent threads entering * rds_send_xmit() will bail on !rds_conn_up(). @@ -162,38 +161,25 @@ * RDS_CONN_RESETTING, to ensure that rds_tcp_state_change * cannot mark rds_conn_path_up() in the window before lock_sock() */ - atomic_set(&conn->c_state, RDS_CONN_RESETTING); - wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags)); + atomic_set(&cp->cp_state, RDS_CONN_RESETTING); + wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags)); lock_sock(osock->sk); /* reset receive side state for rds_tcp_data_recv() for osock */ + cancel_delayed_work_sync(&cp->cp_send_w); + cancel_delayed_work_sync(&cp->cp_recv_w); if (tc->t_tinc) { rds_inc_put(&tc->t_tinc->ti_inc); tc->t_tinc = NULL; } tc->t_tinc_hdr_rem = sizeof(struct rds_header); tc->t_tinc_data_rem = 0; - tc->t_sock = NULL; - - write_lock_bh(&osock->sk->sk_callback_lock); - - osock->sk->sk_user_data = NULL; - osock->sk->sk_data_ready = tc->t_orig_data_ready; - osock->sk->sk_write_space = tc->t_orig_write_space; - osock->sk->sk_state_change = tc->t_orig_state_change; - write_unlock_bh(&osock->sk->sk_callback_lock); + rds_tcp_restore_callbacks(osock, tc); release_sock(osock->sk); sock_release(osock); newsock: - rds_send_reset(conn); + rds_send_path_reset(cp); lock_sock(sock->sk); - write_lock_bh(&sock->sk->sk_callback_lock); - tc->t_sock = sock; - sock->sk->sk_user_data = conn; - sock->sk->sk_data_ready = rds_tcp_data_ready; - sock->sk->sk_write_space = rds_tcp_write_space; - sock->sk->sk_state_change = rds_tcp_state_change; - - write_unlock_bh(&sock->sk->sk_callback_lock); + rds_tcp_set_callbacks(sock, cp); release_sock(sock->sk); } @@ -201,9 +187,9 @@ newsock: * above rds_tcp_reset_callbacks for notes about synchronization * with data path */ -void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) +void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rdsdebug("setting sock %p callbacks to tc %p\n", sock, tc); write_lock_bh(&sock->sk->sk_callback_lock); @@ -219,12 +205,12 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) sock->sk->sk_data_ready = sock->sk->sk_user_data; tc->t_sock = sock; - tc->conn = conn; + tc->t_cpath = cp; tc->t_orig_data_ready = sock->sk->sk_data_ready; tc->t_orig_write_space = sock->sk->sk_write_space; tc->t_orig_state_change = sock->sk->sk_state_change; - sock->sk->sk_user_data = conn; + sock->sk->sk_user_data = cp; sock->sk->sk_data_ready = rds_tcp_data_ready; sock->sk->sk_write_space = rds_tcp_write_space; sock->sk->sk_state_change = rds_tcp_state_change; @@ -232,7 +218,7 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) write_unlock_bh(&sock->sk->sk_callback_lock); } -static void rds_tcp_tc_info(struct socket *sock, unsigned int len, +static void rds_tcp_tc_info(struct socket *rds_sock, unsigned int len, struct rds_info_iterator *iter, struct rds_info_lengths *lens) { @@ -241,6 +227,7 @@ static void rds_tcp_tc_info(struct socket *sock, unsigned int len, unsigned long flags; struct sockaddr_in sin; int sinlen; + struct socket *sock; spin_lock_irqsave(&rds_tcp_tc_list_lock, flags); @@ -249,12 +236,17 @@ static void rds_tcp_tc_info(struct socket *sock,
unsigned int len, list_for_each_entry(tc, &rds_tcp_tc_list, t_list_item) { - sock->ops->getname(sock, (struct sockaddr *)&sin, &sinlen, 0); - tsinfo.local_addr = sin.sin_addr.s_addr; - tsinfo.local_port = sin.sin_port; - sock->ops->getname(sock, (struct sockaddr *)&sin, &sinlen, 1); - tsinfo.peer_addr = sin.sin_addr.s_addr; - tsinfo.peer_port = sin.sin_port; + sock = tc->t_sock; + if (sock) { + sock->ops->getname(sock, (struct sockaddr *)&sin, + &sinlen, 0); + tsinfo.local_addr = sin.sin_addr.s_addr; + tsinfo.local_port = sin.sin_port; + sock->ops->getname(sock, (struct sockaddr *)&sin, + &sinlen, 1); + tsinfo.peer_addr = sin.sin_addr.s_addr; + tsinfo.peer_port = sin.sin_port; + } tsinfo.hdr_rem = tc->t_tinc_hdr_rem; tsinfo.data_rem = tc->t_tinc_data_rem; @@ -282,24 +274,28 @@ static int rds_tcp_laddr_check(struct net *net, __be32 addr) static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp) { struct rds_tcp_connection *tc; + int i; - tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp); - if (!tc) - return -ENOMEM; - - mutex_init(&tc->t_conn_lock); - tc->t_sock = NULL; - tc->t_tinc = NULL; - tc->t_tinc_hdr_rem = sizeof(struct rds_header); - tc->t_tinc_data_rem = 0; + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp); + if (!tc) + return -ENOMEM; - conn->c_transport_data = tc; + mutex_init(&tc->t_conn_path_lock); + tc->t_sock = NULL; + tc->t_tinc = NULL; + tc->t_tinc_hdr_rem = sizeof(struct rds_header); + tc->t_tinc_data_rem = 0; - spin_lock_irq(&rds_tcp_conn_lock); - list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list); - spin_unlock_irq(&rds_tcp_conn_lock); + conn->c_path[i].cp_transport_data = tc; + tc->t_cpath = &conn->c_path[i]; - rdsdebug("alloced tc %p\n", conn->c_transport_data); + spin_lock_irq(&rds_tcp_conn_lock); + list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list); + spin_unlock_irq(&rds_tcp_conn_lock); + rdsdebug("rds_conn_path [%d] tc %p\n", i, + conn->c_path[i].cp_transport_data); + } return 0; } @@ -316,6 +312,17 @@ static void rds_tcp_conn_free(void *arg) kmem_cache_free(rds_tcp_conn_slab, tc); } +static bool list_has_conn(struct list_head *list, struct rds_connection *conn) +{ + struct rds_tcp_connection *tc, *_tc; + + list_for_each_entry_safe(tc, _tc, list, t_tcp_node) { + if (tc->t_cpath->cp_conn == conn) + return true; + } + return false; +} + static void rds_tcp_destroy_conns(void) { struct rds_tcp_connection *tc, *_tc; @@ -323,29 +330,28 @@ static void rds_tcp_destroy_conns(void) /* avoid calling conn_destroy with irqs off */ spin_lock_irq(&rds_tcp_conn_lock); - list_splice(&rds_tcp_conn_list, &tmp_list); - INIT_LIST_HEAD(&rds_tcp_conn_list); + list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { + if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn)) + list_move_tail(&tc->t_tcp_node, &tmp_list); + } spin_unlock_irq(&rds_tcp_conn_lock); - list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) { - if (tc->conn->c_passive) - rds_conn_destroy(tc->conn->c_passive, 1); - rds_conn_destroy(tc->conn, 1); - } + list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) + rds_conn_destroy(tc->t_cpath->cp_conn, 1); } static void rds_tcp_exit(void); struct rds_transport rds_tcp_transport = { .laddr_check = rds_tcp_laddr_check, - .xmit_prepare = rds_tcp_xmit_prepare, - .xmit_complete = rds_tcp_xmit_complete, + .xmit_path_prepare = rds_tcp_xmit_path_prepare, + .xmit_path_complete = rds_tcp_xmit_path_complete, .xmit = rds_tcp_xmit, - .recv = rds_tcp_recv, + .recv_path = rds_tcp_recv_path, .conn_alloc = rds_tcp_conn_alloc, 
.conn_free = rds_tcp_conn_free, - .conn_connect = rds_tcp_conn_connect, - .conn_shutdown = rds_tcp_conn_shutdown, + .conn_path_connect = rds_tcp_conn_path_connect, + .conn_path_shutdown = rds_tcp_conn_path_shutdown, .inc_copy_to_user = rds_tcp_inc_copy_to_user, .inc_free = rds_tcp_inc_free, .skb_local = rds_skb_local, @@ -355,6 +361,7 @@ struct rds_transport rds_tcp_transport = { .t_name = "tcp", .t_type = RDS_TRANS_TCP, .t_prefer_loopback = 1, + .t_mp_capable = 1, }; static int rds_tcp_netid; @@ -488,10 +495,30 @@ static struct pernet_operations rds_tcp_net_ops = { .size = sizeof(struct rds_tcp_net), }; +/* explicitly send a RST on each socket, thereby releasing any socket refcnts + * that may otherwise hold up netns deletion. + */ +static void rds_tcp_conn_paths_destroy(struct rds_connection *conn) +{ + struct rds_conn_path *cp; + struct rds_tcp_connection *tc; + int i; + struct sock *sk; + + for (i = 0; i < RDS_MPATH_WORKERS; i++) { + cp = &conn->c_path[i]; + tc = cp->cp_transport_data; + if (!tc->t_sock) + continue; + sk = tc->t_sock->sk; + sk->sk_prot->disconnect(sk, 0); + tcp_done(sk); + } +} + static void rds_tcp_kill_sock(struct net *net) { struct rds_tcp_connection *tc, *_tc; - struct sock *sk; LIST_HEAD(tmp_list); struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); @@ -500,23 +527,27 @@ static void rds_tcp_kill_sock(struct net *net) flush_work(&rtn->rds_tcp_accept_w); spin_lock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { - struct net *c_net = read_pnet(&tc->conn->c_net); + struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net); if (net != c_net || !tc->t_sock) continue; - list_move_tail(&tc->t_tcp_node, &tmp_list); + if (!list_has_conn(&tmp_list, tc->t_cpath->cp_conn)) + list_move_tail(&tc->t_tcp_node, &tmp_list); } spin_unlock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) { - sk = tc->t_sock->sk; - sk->sk_prot->disconnect(sk, 0); - tcp_done(sk); - if (tc->conn->c_passive) - rds_conn_destroy(tc->conn->c_passive, 1); - rds_conn_destroy(tc->conn, 1); + rds_tcp_conn_paths_destroy(tc->t_cpath->cp_conn); + rds_conn_destroy(tc->t_cpath->cp_conn, 1); } } +void *rds_tcp_listen_sock_def_readable(struct net *net) +{ + struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid); + + return rtn->rds_tcp_listen_sock->sk->sk_user_data; +} + static int rds_tcp_dev_event(struct notifier_block *this, unsigned long event, void *ptr) { @@ -551,13 +582,13 @@ static void rds_tcp_sysctl_reset(struct net *net) spin_lock_irq(&rds_tcp_conn_lock); list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) { - struct net *c_net = read_pnet(&tc->conn->c_net); + struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net); if (net != c_net || !tc->t_sock) continue; /* reconnect with new parameters */ - rds_conn_drop(tc->conn, DR_USER_RESET); + rds_conn_path_drop(tc->t_cpath, DR_USER_RESET); } spin_unlock_irq(&rds_tcp_conn_lock); } @@ -593,7 +624,7 @@ static void rds_tcp_exit(void) } module_exit(rds_tcp_exit); -int rds_tcp_init(void) +static int rds_tcp_init(void) { int ret; @@ -617,7 +648,7 @@ int rds_tcp_init(void) ret = rds_tcp_recv_init(); if (ret) - goto out_slab; + goto out_pernet; ret = rds_trans_register(&rds_tcp_transport); if (ret) @@ -629,8 +660,9 @@ int rds_tcp_init(void) out_recv: rds_tcp_recv_exit(); -out_slab: +out_pernet: unregister_pernet_subsys(&rds_tcp_net_ops); +out_slab: kmem_cache_destroy(rds_tcp_conn_slab); out: return ret; diff --git a/net/rds/tcp.h b/net/rds/tcp.h index 
397fcbc09929..7afb12cae934 100644 --- a/net/rds/tcp.h +++ b/net/rds/tcp.h @@ -11,11 +11,11 @@ struct rds_tcp_incoming { struct rds_tcp_connection { struct list_head t_tcp_node; - struct rds_connection *conn; - /* t_conn_lock synchronizes the connection establishment between - * rds_tcp_accept_one and rds_tcp_conn_connect + struct rds_conn_path *t_cpath; + /* t_conn_path_lock synchronizes the connection establishment between + * rds_tcp_accept_one and rds_tcp_conn_path_connect */ - struct mutex t_conn_lock; + struct mutex t_conn_path_lock; struct socket *t_sock; void *t_orig_write_space; void *t_orig_data_ready; @@ -47,11 +47,10 @@ struct rds_tcp_statistics { }; /* tcp.c */ -int rds_tcp_init(void); void rds_tcp_tune(struct socket *sock); void rds_tcp_nonagle(struct socket *sock); -void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn); -void rds_tcp_reset_callbacks(struct socket *sock, struct rds_connection *conn); +void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp); +void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp); void rds_tcp_restore_callbacks(struct socket *sock, struct rds_tcp_connection *tc); u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc); @@ -61,8 +60,8 @@ extern struct rds_transport rds_tcp_transport; void rds_tcp_accept_work(struct sock *sk); /* tcp_connect.c */ -int rds_tcp_conn_connect(struct rds_connection *conn); -void rds_tcp_conn_shutdown(struct rds_connection *conn); +int rds_tcp_conn_path_connect(struct rds_conn_path *cp); +void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp); void rds_tcp_state_change(struct sock *sk); /* tcp_listen.c */ @@ -71,18 +70,19 @@ void rds_tcp_listen_stop(struct socket *); void rds_tcp_listen_data_ready(struct sock *sk); int rds_tcp_accept_one(struct socket *sock); int rds_tcp_keepalive(struct socket *sock); +void *rds_tcp_listen_sock_def_readable(struct net *net); /* tcp_recv.c */ int rds_tcp_recv_init(void); void rds_tcp_recv_exit(void); void rds_tcp_data_ready(struct sock *sk); -int rds_tcp_recv(struct rds_connection *conn); +int rds_tcp_recv_path(struct rds_conn_path *cp); void rds_tcp_inc_free(struct rds_incoming *inc); int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to); /* tcp_send.c */ -void rds_tcp_xmit_prepare(struct rds_connection *conn); -void rds_tcp_xmit_complete(struct rds_connection *conn); +void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp); +void rds_tcp_xmit_path_complete(struct rds_conn_path *cp); int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); void rds_tcp_write_space(struct sock *sk); diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c index ebffa5b91aff..d2f04abe7d59 100644 --- a/net/rds/tcp_connect.c +++ b/net/rds/tcp_connect.c @@ -40,50 +40,63 @@ void rds_tcp_state_change(struct sock *sk) { void (*state_change)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; read_lock(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { + cp = sk->sk_user_data; + if (!cp) { state_change = sk->sk_state_change; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; state_change = tc->t_orig_state_change; rdsdebug("sock %p state_change to %d\n", tc->t_sock, sk->sk_state); - switch(sk->sk_state) { - /* ignore connecting sockets as they make progress */ - case TCP_SYN_SENT: - case TCP_SYN_RECV: - break; - case TCP_ESTABLISHED: - 
rds_connect_path_complete(conn, RDS_CONN_CONNECTING); - break; - case TCP_CLOSE_WAIT: - case TCP_CLOSE: - rds_conn_drop(conn, DR_TCP_STATE_CLOSE); - default: - break; + switch (sk->sk_state) { + /* ignore connecting sockets as they make progress */ + case TCP_SYN_SENT: + case TCP_SYN_RECV: + break; + case TCP_ESTABLISHED: + if (cp->cp_conn->c_laddr > cp->cp_conn->c_faddr && + rds_conn_path_transition(cp, RDS_CONN_CONNECTING, + RDS_CONN_ERROR)) { + rds_conn_path_drop(cp, DR_TCP_STATE_CLOSE); + } else { + rds_connect_path_complete(cp, RDS_CONN_CONNECTING); + } + break; + case TCP_CLOSE_WAIT: + case TCP_CLOSE: + rds_conn_path_drop(cp, DR_TCP_STATE_CLOSE); + default: + break; } out: read_unlock(&sk->sk_callback_lock); state_change(sk); } -int rds_tcp_conn_connect(struct rds_connection *conn) +int rds_tcp_conn_path_connect(struct rds_conn_path *cp) { struct socket *sock = NULL; struct sockaddr_in src, dest; int ret; - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_connection *conn = cp->cp_conn; + struct rds_tcp_connection *tc = cp->cp_transport_data; - mutex_lock(&tc->t_conn_lock); + /* for multipath rds, we only trigger the connection after + * the handshake probe has determined the number of paths. + */ + if (cp->cp_index > 0 && cp->cp_conn->c_npaths < 2) + return -EAGAIN; + + mutex_lock(&tc->t_conn_path_lock); - if (rds_conn_up(conn)) { - mutex_unlock(&tc->t_conn_lock); + if (rds_conn_path_up(cp)) { + mutex_unlock(&tc->t_conn_path_lock); return 0; } @@ -100,8 +113,8 @@ int rds_tcp_conn_connect(struct rds_connection *conn) ret = sock->ops->bind(sock, (struct sockaddr *)&src, sizeof(src)); if (ret) { - rdsdebug("bind failed with %d at address %u.%u.%u.%u\n", - ret, NIPQUAD(conn->c_laddr)); + rdsdebug("bind failed with %d at address %pI4\n", + ret, &conn->c_laddr); goto out; } @@ -113,11 +126,11 @@ int rds_tcp_conn_connect(struct rds_connection *conn) * once we call connect() we can start getting callbacks and they * own the socket */ - rds_tcp_set_callbacks(sock, conn); + rds_tcp_set_callbacks(sock, cp); ret = sock->ops->connect(sock, (struct sockaddr *)&dest, sizeof(dest), O_NONBLOCK); - rdsdebug("connect to address %u.%u.%u.%u returned %d\n", - NIPQUAD(conn->c_faddr), ret); + cp->cp_outgoing = 1; + rdsdebug("connect to address %pI4 returned %d\n", &conn->c_faddr, ret); if (ret == -EINPROGRESS) ret = 0; @@ -125,10 +138,10 @@ int rds_tcp_conn_connect(struct rds_connection *conn) rds_tcp_keepalive(sock); sock = NULL; } else { - rds_tcp_restore_callbacks(sock, conn->c_transport_data); + rds_tcp_restore_callbacks(sock, cp->cp_transport_data); } out: - mutex_unlock(&tc->t_conn_lock); + mutex_unlock(&tc->t_conn_path_lock); if (sock) sock_release(sock); return ret; @@ -143,12 +156,13 @@ out: * callbacks to those set by TCP. Our callbacks won't execute again once we * hold the sock lock.
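
/*
 * Illustrative sketch, not part of the patch: the two gates that decide
 * whether this end may initiate a TCP connection. The first mirrors the
 * cp_index/c_npaths check in rds_tcp_conn_path_connect() above; the
 * second mirrors the smaller-address rule that rds_queue_reconnect()
 * applies later in this patch to avoid SYN duels. Names and types are
 * illustrative.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* paths beyond 0 wait until the probe has negotiated c_npaths >= 2 */
static bool path_may_connect(int cp_index, int c_npaths)
{
	return !(cp_index > 0 && c_npaths < 2);
}

/* only the numerically smaller address re-initiates, avoiding duels */
static bool may_queue_reconnect(bool is_tcp, uint32_t laddr, uint32_t faddr)
{
	return !(is_tcp && laddr > faddr);
}

int main(void)
{
	printf("path 3 before handshake: %d\n", path_may_connect(3, 0));
	printf("path 0 any time:         %d\n", path_may_connect(0, 0));
	printf("larger-addr reconnect:   %d\n",
	       may_queue_reconnect(true, 0x0a000002, 0x0a000001));
	return 0;
}
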
*/ -void rds_tcp_conn_shutdown(struct rds_connection *conn) +void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; - rdsdebug("shutting down conn %p tc %p sock %p\n", conn, tc, sock); + rdsdebug("shutting down conn %p tc %p sock %p\n", + cp->cp_conn, tc, sock); if (sock) { sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN); @@ -157,7 +171,7 @@ void rds_tcp_conn_shutdown(struct rds_connection *conn) release_sock(sock->sk); sock_release(sock); - }; + } if (tc->t_tinc) { rds_inc_put(&tc->t_tinc->ti_inc); diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index b3826efcdab4..0e179f127081 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -69,6 +69,40 @@ bail: return ret; } +/* rds_tcp_accept_one_path(): if accepting on cp_index > 0, make sure the + * client's ipaddr < server's ipaddr. Otherwise, close the accepted + * socket and force a reconnect from smaller -> larger ip addr. The reason + * we special case cp_index 0 is to allow the rds probe ping from a node + * to itself to get through efficiently. + * Since reconnects are only initiated from the node with the numerically + * smaller ip address, we recycle conns in RDS_CONN_ERROR on the passive side + * by moving them to CONNECTING in this function. + */ +struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn) +{ + int i; + bool peer_is_smaller = (conn->c_faddr < conn->c_laddr); + int npaths = max_t(int, 1, conn->c_npaths); + + if (!peer_is_smaller) { + if (npaths <= 1) + rds_conn_path_connect_if_down(&conn->c_path[0]); + return NULL; + } + + for (i = 0; i < npaths; i++) { + struct rds_conn_path *cp = &conn->c_path[i]; + + if (rds_conn_path_transition(cp, RDS_CONN_DOWN, + RDS_CONN_CONNECTING) || + rds_conn_path_transition(cp, RDS_CONN_ERROR, + RDS_CONN_CONNECTING)) { + return cp->cp_transport_data; + } + } + return NULL; +} + int rds_tcp_accept_one(struct socket *sock) { struct socket *new_sock = NULL; @@ -77,6 +111,7 @@ int rds_tcp_accept_one(struct socket *sock) struct inet_sock *inet; struct rds_tcp_connection *rs_tcp = NULL; int conn_state; + struct rds_conn_path *cp; if (!sock) /* module unload or netns delete in progress */ return -ENETUNREACH; @@ -101,9 +136,9 @@ int rds_tcp_accept_one(struct socket *sock) inet = inet_sk(new_sock->sk); - rdsdebug("accepted tcp %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u\n", - NIPQUAD(inet->inet_saddr), ntohs(inet->inet_sport), - NIPQUAD(inet->inet_daddr), ntohs(inet->inet_dport)); + rdsdebug("accepted tcp %pI4:%u -> %pI4:%u\n", + &inet->inet_saddr, ntohs(inet->inet_sport), + &inet->inet_daddr, ntohs(inet->inet_dport)); conn = rds_conn_create(sock_net(sock->sk), inet->inet_saddr, inet->inet_daddr, @@ -117,11 +152,14 @@ int rds_tcp_accept_one(struct socket *sock) * If the client reboots, this conn will need to be cleaned up.
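
/*
 * Illustrative sketch, not part of the patch: a single-threaded model of
 * rds_tcp_accept_one_path() above. The accept side adopts the new socket
 * only when the peer's address is numerically smaller, then recycles the
 * first DOWN or ERROR path into CONNECTING; returning -1 corresponds to
 * the NULL return that makes rds_tcp_accept_one() reset the socket. The
 * atomic state transition and the cp_index-0 probe special case are
 * elided.
 */
#include <stdint.h>
#include <stdio.h>

enum path_state { P_DOWN, P_CONNECTING, P_UP, P_ERROR };

static int accept_one_path(uint32_t laddr, uint32_t faddr,
			   enum path_state *paths, int npaths)
{
	int i;

	if (!(faddr < laddr))
		return -1; /* larger peer loses the duel and must reconnect */

	for (i = 0; i < npaths; i++) {
		if (paths[i] == P_DOWN || paths[i] == P_ERROR) {
			paths[i] = P_CONNECTING; /* rds_conn_path_transition() */
			return i;
		}
	}
	return -1; /* every path busy: refuse this SYN too */
}

int main(void)
{
	enum path_state paths[8] = { P_UP, P_ERROR, P_DOWN };

	/* peer 10.0.0.1 (smaller) reaching us at 10.0.0.2: path 1 recycled */
	printf("adopted path %d\n",
	       accept_one_path(0x0a000002, 0x0a000001, paths, 8));
	return 0;
}
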
* rds_tcp_state_change() will do that cleanup */ - rs_tcp = (struct rds_tcp_connection *)conn->c_transport_data; - rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING); - mutex_lock(&rs_tcp->t_conn_lock); - conn_state = rds_conn_state(conn); - if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_UP) + rs_tcp = rds_tcp_accept_one_path(conn); + if (!rs_tcp) + goto rst_nsk; + mutex_lock(&rs_tcp->t_conn_path_lock); + cp = rs_tcp->t_cpath; + conn_state = rds_conn_path_state(cp); + BUG_ON(conn_state == RDS_CONN_UP); + if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR) goto rst_nsk; if (rs_tcp->t_sock) { /* Need to resolve a duelling SYN between peers. @@ -131,17 +169,17 @@ int rds_tcp_accept_one(struct socket *sock) * c_transport_data. */ if (ntohl(inet->inet_saddr) < ntohl(inet->inet_daddr) || - !conn->c_outgoing) { + !cp->cp_outgoing) { goto rst_nsk; } else { - rds_tcp_reset_callbacks(new_sock, conn); - conn->c_outgoing = 0; + rds_tcp_reset_callbacks(new_sock, cp); + cp->cp_outgoing = 0; /* rds_connect_path_complete() marks RDS_CONN_UP */ - rds_connect_path_complete(conn, RDS_CONN_RESETTING); + rds_connect_path_complete(cp, RDS_CONN_RESETTING); } } else { - rds_tcp_set_callbacks(new_sock, conn); - rds_connect_path_complete(conn, RDS_CONN_CONNECTING); + rds_tcp_set_callbacks(new_sock, cp); + rds_connect_path_complete(cp, RDS_CONN_CONNECTING); } new_sock = NULL; ret = 0; @@ -152,7 +190,7 @@ rst_nsk: ret = 0; out: if (rs_tcp) - mutex_unlock(&rs_tcp->t_conn_lock); + mutex_unlock(&rs_tcp->t_conn_path_lock); if (new_sock) sock_release(new_sock); return ret; @@ -179,6 +217,8 @@ void rds_tcp_listen_data_ready(struct sock *sk) */ if (sk->sk_state == TCP_LISTEN) rds_tcp_accept_work(sk); + else + ready = rds_tcp_listen_sock_def_readable(sock_net(sk)); out: read_unlock(&sk->sk_callback_lock); @@ -203,7 +243,7 @@ struct socket *rds_tcp_listen_init(struct net *net) sock->sk->sk_data_ready = rds_tcp_listen_data_ready; write_unlock_bh(&sock->sk->sk_callback_lock); - sin.sin_family = PF_INET, + sin.sin_family = PF_INET; sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY); sin.sin_port = (__force u16)htons(RDS_TCP_PORT); diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c index 74b305eb012b..3319a1a95c09 100644 --- a/net/rds/tcp_recv.c +++ b/net/rds/tcp_recv.c @@ -146,7 +146,7 @@ static void rds_tcp_cong_recv(struct rds_connection *conn, } struct rds_tcp_desc_arg { - struct rds_connection *conn; + struct rds_conn_path *conn_path; gfp_t gfp; }; @@ -154,8 +154,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, unsigned int offset, size_t len) { struct rds_tcp_desc_arg *arg = desc->arg.data; - struct rds_connection *conn = arg->conn; - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_conn_path *cp = arg->conn_path; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct rds_tcp_incoming *tinc = tc->t_tinc; struct sk_buff *clone; size_t left = len, to_copy; @@ -177,7 +177,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } tc->t_tinc = tinc; rdsdebug("alloced tinc %p\n", tinc); - rds_inc_init(&tinc->ti_inc, conn, conn->c_faddr); + rds_inc_path_init(&tinc->ti_inc, cp, + cp->cp_conn->c_faddr); tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] = local_clock(); @@ -232,6 +233,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb, } if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) { + struct rds_connection *conn = cp->cp_conn; + if (tinc->ti_inc.i_hdr.h_flags == 
RDS_FLAG_CONG_BITMAP) rds_tcp_cong_recv(conn, tinc); else @@ -254,15 +257,15 @@ out: } /* the caller has to hold the sock lock */ -int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp) +static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; read_descriptor_t desc; struct rds_tcp_desc_arg arg; /* It's like glib in the kernel! */ - arg.conn = conn; + arg.conn_path = cp; arg.gfp = gfp; desc.arg.data = &arg; desc.error = 0; @@ -282,16 +285,17 @@ int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp) * if we fail to allocate we're in trouble.. blindly wait some time before * trying again to see if the VM can free up something for us. */ -int rds_tcp_recv(struct rds_connection *conn) +int rds_tcp_recv_path(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; int ret = 0; - rdsdebug("recv worker conn %p tc %p sock %p\n", conn, tc, sock); + rdsdebug("recv worker path [%d] tc %p sock %p\n", + cp->cp_index, tc, sock); lock_sock(sock->sk); - ret = rds_tcp_read_sock(conn, GFP_KERNEL); + ret = rds_tcp_read_sock(cp, GFP_KERNEL); release_sock(sock->sk); return ret; @@ -300,24 +304,24 @@ int rds_tcp_recv(struct rds_connection *conn) void rds_tcp_data_ready(struct sock *sk) { void (*ready)(struct sock *sk); - struct rds_connection *conn; + struct rds_conn_path *cp; struct rds_tcp_connection *tc; rdsdebug("data ready sk %p\n", sk); read_lock(&sk->sk_callback_lock); - conn = sk->sk_user_data; - if (!conn) { /* check for teardown race */ + cp = sk->sk_user_data; + if (!cp) { /* check for teardown race */ ready = sk->sk_data_ready; goto out; } - tc = conn->c_transport_data; + tc = cp->cp_transport_data; ready = tc->t_orig_data_ready; rds_tcp_stats_inc(s_tcp_data_ready_calls); - if (rds_tcp_read_sock(conn, GFP_ATOMIC) == -ENOMEM) - queue_delayed_work(conn->c_wq, &conn->c_recv_w, 0); + if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) + queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); out: read_unlock(&sk->sk_callback_lock); ready(sk); diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c index aa3a82a918e2..e0ede55bdf3d 100644 --- a/net/rds/tcp_send.c +++ b/net/rds/tcp_send.c @@ -48,22 +48,22 @@ static void rds_tcp_cork(struct socket *sock, int val) set_fs(oldfs); } -void rds_tcp_xmit_prepare(struct rds_connection *conn) +void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rds_tcp_cork(tc->t_sock, 1); } -void rds_tcp_xmit_complete(struct rds_connection *conn) +void rds_tcp_xmit_path_complete(struct rds_conn_path *cp) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct rds_tcp_connection *tc = cp->cp_transport_data; rds_tcp_cork(tc->t_sock, 0); } /* the core send_sem serializes this with other xmit and shutdown */ -int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len) +static int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len) { struct kvec vec = { .iov_base = data, @@ -80,7 +80,8 @@ int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len) int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off) { - struct rds_tcp_connection *tc = conn->c_transport_data; + struct 
rds_conn_path *cp = rm->m_inc.i_conn_path;
+	struct rds_tcp_connection *tc = cp->cp_transport_data;
 	int done = 0;
 	int ret = 0;
 	int more;
@@ -98,6 +99,9 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 		set_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags);
 		tc->t_last_expected_una = rm->m_ack_seq + 1;
 
+		if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
+			rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
+
 	rdsdebug("rm %p tcp nxt %u ack_seq %llu\n", rm, rds_tcp_snd_nxt(tc),
 		 (unsigned long long)rm->m_ack_seq);
 
@@ -149,10 +153,17 @@ out:
 			rds_tcp_stats_inc(s_tcp_sndbuf_full);
 			ret = 0;
 		} else {
-			printk(KERN_WARNING "RDS/tcp: send to %u.%u.%u.%u "
-			       "returned %d, disconnecting and reconnecting\n",
-			       NIPQUAD(conn->c_faddr), ret);
-			rds_conn_drop(conn, DR_TCP_SEND_FAIL);
+			/* No need to disconnect/reconnect if path_drop
+			 * has already been triggered, because, e.g., of
+			 * an incoming RST.
+			 */
+			if (rds_conn_path_up(cp)) {
+				pr_warn("RDS/tcp: send to %pI4 on cp [%d] "
+					"returned %d, "
+					"disconnecting and reconnecting\n",
+					&conn->c_faddr, cp->cp_index, ret);
+				rds_conn_path_drop(cp, DR_TCP_STATE_CLOSE);
+			}
 		}
 	}
 	if (done == 0)
@@ -177,27 +188,27 @@ static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
 void rds_tcp_write_space(struct sock *sk)
 {
 	void (*write_space)(struct sock *sk);
-	struct rds_connection *conn;
+	struct rds_conn_path *cp;
 	struct rds_tcp_connection *tc;
 
 	read_lock(&sk->sk_callback_lock);
-	conn = sk->sk_user_data;
-	if (!conn) {
+	cp = sk->sk_user_data;
+	if (!cp) {
 		write_space = sk->sk_write_space;
 		goto out;
 	}
 
-	tc = conn->c_transport_data;
+	tc = cp->cp_transport_data;
 	rdsdebug("write_space for tc %p\n", tc);
 	write_space = tc->t_orig_write_space;
 	rds_tcp_stats_inc(s_tcp_write_space_calls);
 
 	rdsdebug("tcp una %u\n", rds_tcp_snd_una(tc));
 	tc->t_last_seen_una = rds_tcp_snd_una(tc);
-	rds_send_drop_acked(conn, rds_tcp_snd_una(tc), rds_tcp_is_acked);
+	rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
 
 	if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf)
-		queue_delayed_work(conn->c_wq, &conn->c_send_w, 0);
+		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
 
 out:
 	read_unlock(&sk->sk_callback_lock);
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 48fab2371586..e4ea0c4f7440 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -77,38 +77,40 @@ EXPORT_SYMBOL_GPL(rds_wq);
 struct workqueue_struct *rds_local_wq;
 EXPORT_SYMBOL_GPL(rds_local_wq);
 
-void rds_connect_path_complete(struct rds_connection *conn, int curr)
+void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 {
-	if (!rds_conn_transition(conn, curr, RDS_CONN_UP)) {
+	struct rds_connection *conn = cp->cp_conn;
+
+	if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
 		pr_warn("RDS: Cannot transition conn <%pI4,%pI4,%d> to state UP, current state is %d\n",
 			&conn->c_laddr, &conn->c_faddr, conn->c_tos,
-			atomic_read(&conn->c_state));
-		rds_conn_drop(conn, DR_IB_NOT_CONNECTING_STATE);
+			atomic_read(&cp->cp_state));
+		rds_conn_path_drop(cp, DR_IB_NOT_CONNECTING_STATE);
 		return;
 	}
 
 	rds_rtd(RDS_RTD_CM_EXT, "conn %p for %pI4 to %pI4 tos %d complete\n",
 		conn, &conn->c_laddr, &conn->c_faddr, conn->c_tos);
 
-	conn->c_reconnect_jiffies = 0;
-	conn->c_reconnect_retry = rds_sysctl_reconnect_retry_ms;
-	conn->c_reconnect_retry_count = 0;
+	cp->cp_reconnect_jiffies = 0;
+	cp->cp_reconnect_retry = rds_sysctl_reconnect_retry_ms;
+	cp->cp_reconnect_retry_count = 0;
 	set_bit(0, &conn->c_map_queued);
-	queue_delayed_work(conn->c_wq, &conn->c_send_w, 0);
-	queue_delayed_work(conn->c_wq, &conn->c_recv_w, 0);
-	queue_delayed_work(conn->c_wq, &conn->c_hb_w, 0);
-	conn->c_hb_start = 0;
+	queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
+	queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
+	queue_delayed_work(cp->cp_wq, &cp->cp_hb_w, 0);
+	cp->cp_hb_start = 0;
 
-	conn->c_connection_start = get_seconds();
-	conn->c_reconnect = 1;
+	cp->cp_connection_start = get_seconds();
+	cp->cp_reconnect = 1;
 
 	conn->c_proposed_version = RDS_PROTOCOL_VERSION;
-	conn->c_route_to_base = 0;
+	cp->cp_route_to_base = 0;
 }
 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
 
 void rds_connect_complete(struct rds_connection *conn)
 {
-	rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
+	rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
 }
 EXPORT_SYMBOL_GPL(rds_connect_complete);
 
@@ -130,62 +132,78 @@ EXPORT_SYMBOL_GPL(rds_connect_complete);
  * We should *always* start with a random backoff; otherwise a broken connection
  * will always take several iterations to be re-established.
  */
-void rds_queue_reconnect(struct rds_connection *conn)
+void rds_queue_reconnect(struct rds_conn_path *cp)
 {
 	unsigned long rand;
+	struct rds_connection *conn = cp->cp_conn;
+	bool is_tcp = conn->c_trans->t_type == RDS_TRANS_TCP;
 
 	rds_rtd(RDS_RTD_CM_EXT,
 		"conn %p for %pI4 to %pI4 tos %d reconnect jiffies %lu\n", conn,
 		&conn->c_laddr, &conn->c_faddr, conn->c_tos,
-		conn->c_reconnect_jiffies);
+		cp->cp_reconnect_jiffies);
+
+	/* let peer with smaller addr initiate reconnect, to avoid duels */
+	if (is_tcp && conn->c_laddr > conn->c_faddr)
+		return;
 
-	set_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
-	if (conn->c_reconnect_jiffies == 0 ||
-	    test_and_clear_bit(RDS_RECONNECT_TIMEDOUT, &conn->c_reconn_flags)) {
-		conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
-		queue_delayed_work(conn->c_wq, &conn->c_conn_w, 0);
+	set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
+	if (cp->cp_reconnect_jiffies == 0 ||
+	    test_and_clear_bit(RDS_RECONNECT_TIMEDOUT, &cp->cp_reconn_flags)) {
+		cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
+		queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0);
 		return;
 	}
 
 	get_random_bytes(&rand, sizeof(rand));
 	rds_rtd(RDS_RTD_CM_EXT,
 		"%lu delay %lu ceil conn %p for %pI4 -> %pI4 tos %d\n",
-		rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies,
+		rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
 		conn, &conn->c_laddr, &conn->c_faddr, conn->c_tos);
 
-	queue_delayed_work(conn->c_wq, &conn->c_conn_w,
-			   rand % conn->c_reconnect_jiffies);
+	queue_delayed_work(cp->cp_wq, &cp->cp_conn_w,
+			   rand % cp->cp_reconnect_jiffies);
 
-	conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2,
+	cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
 					rds_sysctl_reconnect_max_jiffies);
 }
 
 void rds_connect_worker(struct work_struct *work)
 {
-	struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work);
+	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_conn_w.work);
+	struct rds_connection *conn = cp->cp_conn;
 	int ret;
+	bool is_tcp = conn->c_trans->t_type == RDS_TRANS_TCP;
 
-	clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
-	if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
+	if (is_tcp && cp->cp_index > 0 &&
+	    cp->cp_conn->c_laddr > cp->cp_conn->c_faddr)
+		return;
+	clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
+	ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
+	if (ret) {
 		/*
 		 * record the time we started trying to connect so that we can
 		 * drop the connection
if it doesn't work out after a while */ - conn->c_connection_start = get_seconds(); - conn->c_drop_source = DR_DEFAULT; + cp->cp_connection_start = get_seconds(); + cp->cp_drop_source = DR_DEFAULT; - ret = conn->c_trans->conn_connect(conn); + ret = conn->c_trans->conn_path_connect(cp); rds_rtd(RDS_RTD_CM_EXT, "conn %p for %pI4 to %pI4 tos %d dispatched, ret %d\n", conn, &conn->c_laddr, &conn->c_faddr, conn->c_tos, ret); if (ret) { - if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN)) { + if (rds_conn_path_transition(cp, + RDS_CONN_CONNECTING, + RDS_CONN_DOWN)) { rds_rtd(RDS_RTD_CM_EXT, "reconnecting..., conn %p\n", conn); - rds_queue_reconnect(conn); + rds_queue_reconnect(cp); } else { - rds_conn_drop(conn, DR_CONN_CONNECT_FAIL); + rds_conn_path_drop(cp, DR_CONN_CONNECT_FAIL); } } } else { @@ -197,22 +215,24 @@ void rds_connect_worker(struct work_struct *work) void rds_send_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_send_w.work); int ret; - if (rds_conn_state(conn) == RDS_CONN_UP) { - clear_bit(RDS_LL_SEND_FULL, &conn->c_flags); - ret = rds_send_xmit(conn); + if (rds_conn_path_state(cp) == RDS_CONN_UP) { + clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags); + ret = rds_send_xmit(cp); cond_resched(); - rds_rtd(RDS_RTD_SND_EXT, "conn %p ret %d\n", conn, ret); + rds_rtd(RDS_RTD_SND_EXT, "conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { case -EAGAIN: rds_stats_inc(s_send_immediate_retry); - queue_delayed_work(conn->c_wq, &conn->c_send_w, 0); + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0); break; case -ENOMEM: rds_stats_inc(s_send_delayed_retry); - queue_delayed_work(conn->c_wq, &conn->c_send_w, 2); + queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 2); default: break; } @@ -221,20 +241,22 @@ void rds_send_worker(struct work_struct *work) void rds_recv_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_recv_w.work); int ret; - if (rds_conn_state(conn) == RDS_CONN_UP) { - ret = conn->c_trans->recv(conn); - rds_rtd(RDS_RTD_RCV_EXT, "conn %p ret %d\n", conn, ret); + if (rds_conn_path_state(cp) == RDS_CONN_UP) { + ret = cp->cp_conn->c_trans->recv_path(cp); + rds_rtd(RDS_RTD_RCV_EXT, "conn %p ret %d\n", cp->cp_conn, ret); switch (ret) { case -EAGAIN: rds_stats_inc(s_recv_immediate_retry); - queue_delayed_work(conn->c_wq, &conn->c_recv_w, 0); + queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0); break; case -ENOMEM: rds_stats_inc(s_recv_delayed_retry); - queue_delayed_work(conn->c_wq, &conn->c_recv_w, 2); + queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 2); default: break; } @@ -243,90 +265,108 @@ void rds_recv_worker(struct work_struct *work) void rds_reject_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_reject_w.work); - - atomic_set(&conn->c_state, RDS_CONN_ERROR); - rds_rtd(RDS_RTD_CM, "calling rds_conn_shutdown, conn %p:0\n", conn); - rds_conn_shutdown(conn, 0); - rds_route_to_base(conn); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_reject_w.work); + + WARN_ON(cp->cp_conn->c_trans->t_mp_capable); + atomic_set(&cp->cp_state, RDS_CONN_ERROR); + rds_rtd(RDS_RTD_CM, "calling rds_conn_shutdown, conn %p:0\n", + cp->cp_conn); + rds_conn_shutdown(cp, 0); + rds_route_to_base(cp->cp_conn); } void 
rds_hb_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_hb_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_hb_w.work); unsigned long now = get_seconds(); int ret; + struct rds_connection *conn = cp->cp_conn; - if (!rds_conn_hb_timeout || conn->c_loopback) + if (!rds_conn_hb_timeout || conn->c_loopback || + conn->c_trans->t_type == RDS_TRANS_TCP) return; - if (rds_conn_state(conn) == RDS_CONN_UP) { - if (!conn->c_hb_start) { - ret = rds_send_hb(conn, 0); + if (rds_conn_path_state(cp) == RDS_CONN_UP) { + if (!cp->cp_hb_start) { + ret = rds_send_hb(cp->cp_conn, 0); if (ret) { rds_rtd(RDS_RTD_ERR_EXT, "RDS/IB: rds_hb_worker: failed %d\n", ret); return; } - conn->c_hb_start = now; - } else if (now - conn->c_hb_start > rds_conn_hb_timeout) { + cp->cp_hb_start = now; + } else if (now - cp->cp_hb_start > rds_conn_hb_timeout) { rds_rtd(RDS_RTD_CM, "RDS/IB: connection <%u.%u.%u.%u,%u.%u.%u.%u,%d> timed out (0x%lx,0x%lx)..discon and recon\n", NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr), - conn->c_tos, conn->c_hb_start, now); - rds_conn_drop(conn, DR_HB_TIMEOUT); + conn->c_tos, cp->cp_hb_start, now); + rds_conn_path_drop(cp, DR_HB_TIMEOUT); return; } - queue_delayed_work(conn->c_wq, &conn->c_hb_w, HZ); + queue_delayed_work(cp->cp_wq, &cp->cp_hb_w, HZ); } } void rds_reconnect_timeout(struct work_struct *work) { - struct rds_connection *conn = - container_of(work, struct rds_connection, c_reconn_w.work); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_reconn_w.work); + struct rds_connection *conn = cp->cp_conn; - if (conn->c_reconnect_retry_count > rds_sysctl_reconnect_max_retries) { + if (cp->cp_reconnect_retry_count > rds_sysctl_reconnect_max_retries) { pr_info("RDS: connection <%pI4,%pI4,%d> reconnect retries(%d) exceeded, stop retry\n", &conn->c_laddr, &conn->c_faddr, conn->c_tos, - conn->c_reconnect_retry_count); + cp->cp_reconnect_retry_count); return; } - if (!rds_conn_up(conn)) { - if (rds_conn_state(conn) == RDS_CONN_DISCONNECTING) { - queue_delayed_work(conn->c_wq, &conn->c_reconn_w, + if (!rds_conn_path_up(cp)) { + if (rds_conn_path_state(cp) == RDS_CONN_DISCONNECTING) { + queue_delayed_work(cp->cp_wq, &cp->cp_reconn_w, msecs_to_jiffies(100)); } else { - conn->c_reconnect_retry_count++; + cp->cp_reconnect_retry_count++; rds_rtd(RDS_RTD_CM, "conn <%pI4,%pI4,%d> not up, retry(%d)\n", &conn->c_laddr, &conn->c_faddr, conn->c_tos, - conn->c_reconnect_retry_count); - queue_delayed_work(conn->c_wq, &conn->c_reconn_w, - msecs_to_jiffies(conn->c_reconnect_retry)); - set_bit(RDS_RECONNECT_TIMEDOUT, &conn->c_reconn_flags); - rds_conn_drop(conn, DR_RECONNECT_TIMEOUT); + cp->cp_reconnect_retry_count); + queue_delayed_work(cp->cp_wq, &cp->cp_reconn_w, + msecs_to_jiffies( + cp->cp_reconnect_retry)); + set_bit(RDS_RECONNECT_TIMEDOUT, &cp->cp_reconn_flags); + rds_conn_path_drop(cp, DR_RECONNECT_TIMEOUT); } } } void rds_shutdown_worker(struct work_struct *work) { - struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w); + struct rds_conn_path *cp = container_of(work, + struct rds_conn_path, + cp_down_w); unsigned long now = get_seconds(); - - if ((now - conn->c_reconnect_start > rds_sysctl_shutdown_trace_start_time) && - (now - conn->c_reconnect_start < rds_sysctl_shutdown_trace_end_time)) - pr_info("RDS/IB: connection <%u.%u.%u.%u,%u.%u.%u.%u,%d> " + bool is_tcp = cp->cp_conn->c_trans->t_type == RDS_TRANS_TCP; + struct rds_connection *conn = 
cp->cp_conn; + + if ((now - cp->cp_reconnect_start > + rds_sysctl_shutdown_trace_start_time) && + (now - cp->cp_reconnect_start < + rds_sysctl_shutdown_trace_end_time)) + pr_info("RDS/%s: connection <%u.%u.%u.%u,%u.%u.%u.%u,%d> " "shutdown init due to '%s'\n", + (is_tcp ? "TCP" : "IB"), NIPQUAD(conn->c_laddr), NIPQUAD(conn->c_faddr), conn->c_tos, - conn_drop_reason_str(conn->c_drop_source)); + conn_drop_reason_str(cp->cp_drop_source)); - rds_conn_shutdown(conn, 1); + rds_conn_shutdown(cp, 1); } void rds_threads_exit(void) -- 2.50.1
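
For reviewers tracing the dueling-SYN handling in rds_tcp_accept_one() above: when both peers connect to each other at the same time on the same path, the backport keeps whichever TCP connection was initiated by the numerically smaller IP address and resets the other socket. The sketch below is a minimal userspace restatement of that decision; reset_accepted_sock() is a hypothetical name, and the host-byte-order comparison mirrors the ntohl() test in the hunk.

#include <stdbool.h>
#include <stdint.h>
#include <arpa/inet.h>	/* ntohl() */

/* Decision taken when an accepted socket races with our own connect on
 * the same path (i.e. t_sock already exists).  saddr_be/daddr_be are
 * the local/peer addresses of the *accepted* socket in network byte
 * order; outgoing reflects cp_outgoing.  Returns true if the accepted
 * socket should be reset (the rst_nsk label), false if it should
 * replace our own via rds_tcp_reset_callbacks().
 */
static bool reset_accepted_sock(uint32_t saddr_be, uint32_t daddr_be,
				bool outgoing)
{
	/* Our address is smaller: our own connect wins the duel.  And if
	 * we have no connect of our own in flight, the existing t_sock
	 * is kept and the new SYN is refused.
	 */
	return ntohl(saddr_be) < ntohl(daddr_be) || !outgoing;
}

With a 10.0.0.1/10.0.0.2 pair, both hosts converge on the connection initiated by 10.0.0.1: host 10.0.0.1 resets the socket it accepted, while host 10.0.0.2 adopts the accepted socket and clears cp_outgoing.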
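
The reconnect policy in rds_queue_reconnect() above is a randomized exponential backoff: the first attempt after a drop is queued immediately, later attempts wait a uniformly random delay below the current ceiling, and the ceiling doubles on every attempt up to rds_sysctl_reconnect_max_jiffies. The randomness keeps the two peers from retrying in lockstep against each other. A condensed sketch of the delay computation, with placeholder constants standing in for the sysctls:

#include <stdlib.h>	/* rand() */

/* Placeholders for rds_sysctl_reconnect_{min,max}_jiffies. */
#define RECONNECT_MIN_JIFFIES	1U
#define RECONNECT_MAX_JIFFIES	128U

/* Per-path backoff state, mirroring cp_reconnect_jiffies (0 means no
 * attempt has been made since the path came up).
 */
struct backoff {
	unsigned int ceil;
};

/* Return the delay in jiffies before the next connect attempt and
 * advance the ceiling, following the shape of rds_queue_reconnect().
 */
static unsigned int next_reconnect_delay(struct backoff *b)
{
	unsigned int delay;

	if (b->ceil == 0) {		/* first attempt: fire at once */
		b->ceil = RECONNECT_MIN_JIFFIES;
		return 0;
	}
	delay = (unsigned int)rand() % b->ceil;
	b->ceil = (b->ceil * 2 < RECONNECT_MAX_JIFFIES) ?
		  b->ceil * 2 : RECONNECT_MAX_JIFFIES;
	return delay;
}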
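
rds_send_worker() and rds_recv_worker() above share one requeue policy, keyed off the transport return code: -EAGAIN means more work is pending and the path worker is requeued immediately, while -ENOMEM means allocation pressure and the worker backs off for two jiffies before retrying. A hypothetical helper making that mapping explicit:

#include <errno.h>

/* Translate a transmit/receive return code into the delay passed to
 * queue_delayed_work() for the next pass; a negative value here means
 * "do not requeue" (the work is done or the path went down).
 */
static int requeue_delay_jiffies(int ret)
{
	switch (ret) {
	case -EAGAIN:	/* more work queued: run again at once */
		return 0;
	case -ENOMEM:	/* allocation failed: let the VM recover */
		return 2;
	default:
		return -1;
	}
}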
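
Finally, the heartbeat logic in rds_hb_worker() above (skipped for loopback and TCP paths) stores in cp_hb_start the seconds timestamp at which the current heartbeat round began; the field is zeroed in rds_connect_path_complete(), and the path is dropped with DR_HB_TIMEOUT once the elapsed time exceeds rds_conn_hb_timeout. The worker reschedules itself every HZ while the path is up. The timeout predicate reduces to the sketch below (hb_timed_out() is a hypothetical name):

#include <stdbool.h>
#include <time.h>

/* True once a heartbeat round has started (hb_start != 0) and has gone
 * unanswered for longer than the configured timeout, in seconds.
 */
static bool hb_timed_out(time_t now, time_t hb_start, time_t timeout)
{
	return hb_start != 0 && (now - hb_start) > timeout;
}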