From 0450787ea5432e4243c9e751d65ab692bfa6a427 Mon Sep 17 00:00:00 2001 From: Dotan Barak Date: Mon, 24 Sep 2012 20:25:51 +0200 Subject: [PATCH] rds: Add Automatic Path Migration support RDS APM supports automatic connection failover in case of path failure, and connection failback when the path recovers. RDS APM is enabled by module parameter rds_ib_enable_apm (disabled by default). Signed-off-by: Bang Nguyen --- include/linux/rds.h | 2 +- net/rds/af_rds.c | 4 +- net/rds/connection.c | 1 + net/rds/ib.c | 739 ++++++++++++++++++++++++++++++++++----- net/rds/ib.h | 106 +++++- net/rds/ib_cm.c | 220 +++++++++++- net/rds/ib_rdma.c | 90 +++-- net/rds/ib_recv.c | 8 +- net/rds/ib_ring.c | 1 - net/rds/ib_stats.c | 18 +- net/rds/rdma_transport.c | 49 ++- net/rds/rds.h | 13 + net/rds/recv.c | 10 +- net/rds/send.c | 46 +++ net/rds/threads.c | 52 ++- 15 files changed, 1200 insertions(+), 159 deletions(-) diff --git a/include/linux/rds.h b/include/linux/rds.h index ebc35f1631ae5..ec0a19475a3b9 100644 --- a/include/linux/rds.h +++ b/include/linux/rds.h @@ -62,7 +62,7 @@ /* * ioctl commands for SOL_RDS */ -#define RDS_IOC_SET_TOS 1 +#define SIOCRDSSETTOS (SIOCPROTOPRIVATE) typedef u_int8_t rds_tos_t; diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index c39181a0b4446..6909969bbcd48 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -208,7 +208,7 @@ static int rds_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg) return -EFAULT; switch (cmd) { - case RDS_IOC_SET_TOS: + case SIOCRDSSETTOS: spin_lock_irqsave(&rds_sock_lock, flags); if (rs->rs_tos || rs->rs_conn) { spin_unlock_irqrestore(&rds_sock_lock, flags); @@ -218,7 +218,7 @@ static int rds_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg) spin_unlock_irqrestore(&rds_sock_lock, flags); break; default: - return -ENOPROTOOPT; + return -ENOIOCTLCMD; } return 0; diff --git a/net/rds/connection.c b/net/rds/connection.c index ae30085b6246f..a97080e174443 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -206,6 +206,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker); INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker); INIT_DELAYED_WORK(&conn->c_conn_w, rds_connect_worker); + INIT_DELAYED_WORK(&conn->c_hb_w, rds_hb_worker); INIT_WORK(&conn->c_down_w, rds_shutdown_worker); mutex_init(&conn->c_cm_lock); conn->c_flags = 0; diff --git a/net/rds/ib.c b/net/rds/ib.c index 7eb3cab35df56..1d24eebdc04f0 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -37,20 +37,35 @@ #include #include #include +#include +#include +#include #include "rds.h" #include "ib.h" -unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE; -unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */ +unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE; +unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE; unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT; - -module_param(fmr_pool_size, int, 0444); -MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA"); -module_param(fmr_message_size, int, 0444); -MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer"); +unsigned int rds_ib_apm_enable = 0; +unsigned int rds_ib_active_active_enabled = 0; +unsigned int rds_ib_timeout = RDS_IB_DEFAULT_TIMEOUT; +unsigned int rds_ib_rnr_retry_count = RDS_IB_DEFAULT_RNR_RETRY_COUNT; + +module_param(rds_ib_fmr_1m_pool_size, int, 0444); +MODULE_PARM_DESC(rds_ib_fmr_1m_pool_size, " Max number of 1m fmr per HCA"); 
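
As context for the ioctl change above (RDS_IOC_SET_TOS is renumbered to SIOCRDSSETTOS on top of SIOCPROTOPRIVATE, and unknown commands now return -ENOIOCTLCMD), a minimal userspace sketch of setting the QoS value on an RDS socket. Illustrative only; the exact copy-in of the TOS byte is an assumption based on the surrounding af_rds.c code, and the call must happen before the socket has a TOS or a connection, otherwise rds_ioctl() rejects it.

#include <stdint.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/sockios.h>

#ifndef SIOCRDSSETTOS
#define SIOCRDSSETTOS (SIOCPROTOPRIVATE)        /* as defined by this patch */
#endif

/* Set the RDS TOS on an already-created AF_RDS socket fd. */
static int rds_set_tos(int rds_fd, uint8_t tos)
{
        return ioctl(rds_fd, SIOCRDSSETTOS, &tos);
}
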
+module_param(rds_ib_fmr_8k_pool_size, int, 0444); +MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8k fmr per HCA"); module_param(rds_ib_retry_count, int, 0444); MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error"); +module_param(rds_ib_apm_enable, int, 0444); +MODULE_PARM_DESC(rds_ib_apm_enable, " Enable APM"); +module_param(rds_ib_active_active_enabled, int, 0444); +MODULE_PARM_DESC(rds_ib_active_active_enabled, " Active/Active enabled"); +module_param(rds_ib_timeout, int, 0444); +MODULE_PARM_DESC(rds_ib_timeout, " QP timeout"); +module_param(rds_ib_rnr_retry_count, int, 0444); +MODULE_PARM_DESC(rds_ib_timeout, " QP rnr retry count"); /* * we have a clumsy combination of RCU and a rwsem protecting this list @@ -64,6 +79,11 @@ struct list_head rds_ib_devices; DEFINE_SPINLOCK(ib_nodev_conns_lock); LIST_HEAD(ib_nodev_conns); +struct workqueue_struct *rds_aux_wq; +EXPORT_SYMBOL_GPL(rds_aux_wq); + +struct socket *rds_ib_inet_socket; + void rds_ib_nodev_connect(void) { struct rds_ib_connection *ic; @@ -95,8 +115,10 @@ static void rds_ib_dev_free(struct work_struct *work) struct rds_ib_device *rds_ibdev = container_of(work, struct rds_ib_device, free_work); - if (rds_ibdev->mr_pool) - rds_ib_destroy_mr_pool(rds_ibdev->mr_pool); + if (rds_ibdev->mr_8k_pool) + rds_ib_destroy_mr_pool(rds_ibdev->mr_8k_pool); + if (rds_ibdev->mr_1m_pool) + rds_ib_destroy_mr_pool(rds_ibdev->mr_1m_pool); if (rds_ibdev->mr) ib_dereg_mr(rds_ibdev->mr); if (rds_ibdev->pd) @@ -118,86 +140,6 @@ void rds_ib_dev_put(struct rds_ib_device *rds_ibdev) queue_work(rds_wq, &rds_ibdev->free_work); } -void rds_ib_add_one(struct ib_device *device) -{ - struct rds_ib_device *rds_ibdev; - struct ib_device_attr *dev_attr; - - /* Only handle IB (no iWARP) devices */ - if (device->node_type != RDMA_NODE_IB_CA) - return; - - dev_attr = kmalloc(sizeof *dev_attr, GFP_KERNEL); - if (!dev_attr) - return; - - if (ib_query_device(device, dev_attr)) { - rdsdebug("Query device failed for %s\n", device->name); - goto free_attr; - } - - rds_ibdev = kzalloc_node(sizeof(struct rds_ib_device), GFP_KERNEL, - ibdev_to_node(device)); - if (!rds_ibdev) - goto free_attr; - - spin_lock_init(&rds_ibdev->spinlock); - atomic_set(&rds_ibdev->refcount, 1); - INIT_WORK(&rds_ibdev->free_work, rds_ib_dev_free); - - rds_ibdev->max_wrs = dev_attr->max_qp_wr; - rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE); - - rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32; - rds_ibdev->max_fmrs = dev_attr->max_fmr ? 
- min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) : - fmr_pool_size; - - rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom; - rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom; - - INIT_LIST_HEAD(&rds_ibdev->ipaddr_list); - INIT_LIST_HEAD(&rds_ibdev->conn_list); - - rds_ibdev->dev = device; - rds_ibdev->pd = ib_alloc_pd(device); - if (IS_ERR(rds_ibdev->pd)) { - rds_ibdev->pd = NULL; - goto put_dev; - } - - rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, IB_ACCESS_LOCAL_WRITE); - if (IS_ERR(rds_ibdev->mr)) { - rds_ibdev->mr = NULL; - goto put_dev; - } - - rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev); - if (IS_ERR(rds_ibdev->mr_pool)) { - rds_ibdev->mr_pool = NULL; - goto put_dev; - } - - rds_ibdev->srq = kmalloc(sizeof(struct rds_ib_srq), GFP_KERNEL); - if (!rds_ibdev->srq) - goto free_attr; - - down_write(&rds_ib_devices_lock); - list_add_tail_rcu(&rds_ibdev->list, &rds_ib_devices); - up_write(&rds_ib_devices_lock); - atomic_inc(&rds_ibdev->refcount); - - ib_set_client_data(device, &rds_ib_client, rds_ibdev); - atomic_inc(&rds_ibdev->refcount); - - rds_ib_nodev_connect(); - -put_dev: - rds_ib_dev_put(rds_ibdev); -free_attr: - kfree(dev_attr); -} - /* * New connections use this to find the device to associate with the * connection. It's not in the fast path so we're not concerned about the @@ -356,6 +298,601 @@ static int rds_ib_laddr_check(__be32 addr) return ret; } +static int rds_ib_move_ip(char *from_dev, char *to_dev, __be32 addr, int failover) +{ + struct ifreq *ir; + struct sockaddr_in *sin; + __be32 down_ip, down_bcast, down_mask; + struct page *page; + char from_dev2[2*IFNAMSIZ + 1]; + char to_dev2[2*IFNAMSIZ + 1]; + int ret = 0; + + page = alloc_page(GFP_HIGHUSER); + if (!page) { + printk(KERN_ERR "RDS/IB: alloc_page failed .. 
NO MEM\n"); + ret = -ENOMEM; + goto out; + } + + ir = (struct ifreq *)kmap(page); + memset(ir, 0, sizeof(struct ifreq)); + sin = (struct sockaddr_in *)&ir->ifr_addr; + + if (failover) { + strcpy(to_dev2, to_dev); + strcat(to_dev2, ":"); + strcat(to_dev2, from_dev); + to_dev2[IFNAMSIZ-1] = 0; + strcpy(from_dev2, from_dev); + } else { + strcpy(from_dev2, from_dev); + strcat(from_dev2, ":"); + strcat(from_dev2, to_dev); + from_dev2[IFNAMSIZ-1] = 0; + strcpy(to_dev2, to_dev); + } + + strcpy(ir->ifr_ifrn.ifrn_name, from_dev2); + ret = inet_ioctl(rds_ib_inet_socket, SIOCGIFADDR, (unsigned long) ir); + if (ret) { + printk(KERN_ERR + "RDS/IB: inet_ioctl(SIOCGIFADDR) failed (%d)\n", + ret); + goto out; + } + down_ip = sin->sin_addr.s_addr; + if (addr != down_ip) { + printk(KERN_ERR + "RDS/IP: %u.%u.%u.%u not configured on %s\n", + NIPQUAD(addr), ir->ifr_ifrn.ifrn_name); + goto out; + } + + ret = inet_ioctl(rds_ib_inet_socket, SIOCGIFBRDADDR, (unsigned long) ir); + if (ret) { + printk(KERN_ERR + "RDS/IB: inet_ioctl(SIOCGIFBRDADDR) failed (%d)\n", + ret); + goto out; + } + down_bcast = sin->sin_addr.s_addr; + + ret = inet_ioctl(rds_ib_inet_socket, SIOCGIFNETMASK, (unsigned long) ir); + if (ret) { + printk(KERN_ERR + "RDS/IB: inet_ioctl(SIOCGIFNETMASK) failed (%d)\n", + ret); + goto out; + } + down_mask = sin->sin_addr.s_addr; + + /* Clear IP on down Interface */ + sin->sin_addr.s_addr = 0; + ret = inet_ioctl(rds_ib_inet_socket, SIOCSIFADDR, (unsigned long) ir); + if (ret) { + printk(KERN_ERR + "RDS/IB: inet_ioctl(SIOCSIFADDR) failed (%d)\n", + ret); + goto out; + } + + memset(ir, 0, sizeof(struct ifreq)); + strcpy(ir->ifr_ifrn.ifrn_name, to_dev2); + sin->sin_family = AF_INET; + + sin->sin_addr.s_addr = down_ip; + ret = inet_ioctl(rds_ib_inet_socket, SIOCSIFADDR, (unsigned long) ir); + if (ret) { + printk(KERN_ERR + "RDS/IB: inet_ioctl(SIOCSIFADDR) failed (%d)\n", + ret); + goto out; + } + + sin->sin_addr.s_addr = down_bcast; + ret = inet_ioctl(rds_ib_inet_socket, SIOCSIFBRDADDR, (unsigned long) ir); + if (ret) { + printk(KERN_ERR + "RDS/IB: inet_ioctl(SIOCSIFBRDADDR) failed (%d)\n", + ret); + goto out; + } + + sin->sin_addr.s_addr = down_mask; + ret = inet_ioctl(rds_ib_inet_socket, SIOCSIFNETMASK, (unsigned long) ir); + if (ret) { + printk(KERN_ERR + "RDS/IB: inet_ioctl(SIOCSIFBRDADDR) failed (%d)\n", + ret); + goto out; + } + + if (failover) + printk(KERN_NOTICE + "RDS/IB: IP %u.%u.%u.%u migrated over to %s\n", + NIPQUAD(down_ip), ir->ifr_ifrn.ifrn_name); + else + printk(KERN_NOTICE + "RDS/IB: IP %u.%u.%u.%u migrated back to %s\n", + NIPQUAD(down_ip), ir->ifr_ifrn.ifrn_name); +out: + kunmap(page); + __free_page(page); + + return ret; +} + +static void rds_ib_set_port(struct ib_device *ib_dev, struct net_device *net_dev, char *if_name, u8 port_num, __be32 ip_addr) +{ + struct rds_ib_device *rds_ibdev; + u8 active_port; + unsigned int idx; + + active_port = net_dev->operstate == IF_OPER_UP ? port_num : 0; + + list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { + if (rds_ibdev->dev == ib_dev) { + if (!strcmp(net_dev->name, if_name)) { + strcpy(rds_ibdev->ports[port_num].if_name, + if_name); + rds_ibdev->ports[port_num].ip_addr = ip_addr; + rds_ibdev->ports[port_num].active_port = + active_port; + } else { + idx = rds_ibdev->ports[port_num].alias_cnt++; + strcpy(rds_ibdev->ports[port_num]. + aliases[idx].if_name, if_name); + rds_ibdev->ports[port_num]. 
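
rds_ib_move_ip() above performs the usual SIOCGIFADDR/SIOCGIFBRDADDR/SIOCGIFNETMASK reads and SIOCSIF* writes from inside the kernel, via inet_ioctl() on rds_ib_inet_socket, to strip the address from the failed interface and plumb it onto the surviving one. For reference, the read half of that sequence as a plain userspace analog (a sketch, not part of the patch):

#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <net/if.h>

/* Read an interface's IPv4 address the same way rds_ib_move_ip() does
 * in-kernel: fill ifr_name, issue SIOCGIFADDR, read back ifr_addr. */
static int get_ifaddr(const char *ifname, struct in_addr *out)
{
        struct ifreq ir;
        int fd, ret;

        fd = socket(AF_INET, SOCK_DGRAM, 0);
        if (fd < 0)
                return -1;

        memset(&ir, 0, sizeof(ir));
        strncpy(ir.ifr_name, ifname, IFNAMSIZ - 1);

        ret = ioctl(fd, SIOCGIFADDR, &ir);
        if (!ret)
                *out = ((struct sockaddr_in *)&ir.ifr_addr)->sin_addr;

        close(fd);
        return ret;
}
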
+ aliases[idx].ip_addr = ip_addr; + } + break; + } + } +} + +static void rds_ib_do_failover(struct rds_ib_device *rds_ibdev, u8 port) +{ + u8 i, j; + + for (i = 1; i <= rds_ibdev->dev->phys_port_cnt; i++) { + if (port != i && i == rds_ibdev->ports[i].active_port) { + if (rds_ib_move_ip( + rds_ibdev->ports[port].if_name, + rds_ibdev->ports[i].if_name, + rds_ibdev->ports[port].ip_addr, + 1)) { + printk(KERN_ERR "RDS/IP: failed to move IP " + "%u.%u.%u.%u from %s over to %s\n", + NIPQUAD(rds_ibdev->ports[port].ip_addr), rds_ibdev->ports[port].if_name, + rds_ibdev->ports[i].if_name); + } else { + rds_ibdev->ports[port].active_port = i; + + for (j = 0; j < rds_ibdev->ports[port].alias_cnt; j++) { + if (rds_ib_move_ip( + rds_ibdev->ports[port]. + aliases[j].if_name, + rds_ibdev->ports[i].if_name, + rds_ibdev->ports[port]. + aliases[j].ip_addr, + 1)) { + printk(KERN_ERR "RDS/IP: failed to move alias IP " + "%u.%u.%u.%u from %s over to %s\n", + NIPQUAD(rds_ibdev->ports[port].aliases[j].ip_addr), + rds_ibdev->ports[port].aliases[j].if_name, + rds_ibdev->ports[i].if_name); + } + } + break; + } + } + } +} + +static void rds_ib_do_failback(struct rds_ib_device *rds_ibdev, u8 port) +{ + u8 active_port = rds_ibdev->ports[port].active_port; + u8 j; + + if (port != rds_ibdev->ports[port].active_port) { + if (rds_ib_move_ip( + rds_ibdev->ports[active_port].if_name, + rds_ibdev->ports[port].if_name, + rds_ibdev->ports[port].ip_addr, + 0)) { + printk(KERN_ERR "RDS/IP: failed to move IP " + "%u.%u.%u.%u from %s back to %s\n", + NIPQUAD(rds_ibdev->ports[port].ip_addr), + rds_ibdev->ports[active_port].if_name, + rds_ibdev->ports[port].if_name); + } else { + for (j = 0; j < rds_ibdev->ports[port].alias_cnt; j++) { + if (rds_ib_move_ip( + rds_ibdev->ports[active_port].if_name, + rds_ibdev->ports[port]. + aliases[j].if_name, + rds_ibdev->ports[port]. 
+ aliases[j].ip_addr, + 0)) { + printk(KERN_ERR "RDS/IP: failed to move alias IP " + "%u.%u.%u.%u from %s back to %s\n", + NIPQUAD(rds_ibdev->ports[port].aliases[j].ip_addr), + rds_ibdev->ports[active_port].if_name, + rds_ibdev->ports[port].aliases[j].if_name); + } + } + rds_ibdev->ports[port].active_port = port; + if (!rds_ibdev->ports[active_port].active_port) { + if (rds_ib_move_ip( + rds_ibdev->ports[active_port].if_name, + rds_ibdev->ports[port].if_name, + rds_ibdev->ports[active_port].ip_addr, + 1)) { + printk(KERN_ERR "RDS/IP: failed to move IP %u.%u.%u.%u from %s to %s\n", + NIPQUAD(rds_ibdev->ports[active_port].ip_addr), + rds_ibdev->ports[active_port].if_name, + rds_ibdev->ports[port].if_name); + } else { + for (j = 0; j < rds_ibdev->ports[active_port].alias_cnt; j++) { + if (rds_ib_move_ip( + rds_ibdev->ports[active_port].aliases[j].if_name, + rds_ibdev->ports[port].if_name, + rds_ibdev->ports[active_port].aliases[j].ip_addr, + 1)) { + printk(KERN_ERR "RDS/IP: failed to move alias IP %u.%u.%u.%u from %s to %s\n", + NIPQUAD(rds_ibdev->ports[active_port].aliases[j].ip_addr), + rds_ibdev->ports[active_port].aliases[j].if_name, + rds_ibdev->ports[port].if_name); + } + } + rds_ibdev->ports[active_port].active_port = port; + } + } + } + } +} + +static void rds_ib_failover(struct work_struct *_work) +{ + struct rds_ib_port_ud_work *work = + container_of(_work, struct rds_ib_port_ud_work, work.work); + struct rds_ib_device *rds_ibdev = work->rds_ibdev; + + if (rds_ibdev->ports[work->port].ip_addr) + rds_ib_do_failover(rds_ibdev, work->port); + + if (rds_ibdev->ports[work->port].active_port == work->port) + rds_ibdev->ports[work->port].active_port = 0; + + kfree(work); +} + +static void rds_ib_failback(struct work_struct *_work) +{ + struct rds_ib_port_ud_work *work = + container_of(_work, struct rds_ib_port_ud_work, work.work); + struct rds_ib_device *rds_ibdev = work->rds_ibdev; + u8 i; + + if (rds_ibdev->ports[work->port].ip_addr && + rds_ibdev->ports[work->port].active_port) + rds_ib_do_failback(rds_ibdev, work->port); + + rds_ibdev->ports[work->port].active_port = work->port; + + for (i = 1; i <= rds_ibdev->dev->phys_port_cnt; i++) { + if (i != work->port && rds_ibdev->ports[i].ip_addr && + !rds_ibdev->ports[i].active_port) { + rds_ib_do_failover(rds_ibdev, i); + } + } + + kfree(work); +} + +static void rds_ib_event_handler(struct ib_event_handler *handler, struct ib_event *event) +{ + struct rds_ib_device *rds_ibdev = + container_of(handler, typeof(*rds_ibdev), event_handler); + u8 port = event->element.port_num; + struct rds_ib_port_ud_work *work; + + if (!rds_ib_active_active_enabled) + return; + + if (event->event != IB_EVENT_PORT_ACTIVE && + event->event != IB_EVENT_PORT_ERR) + return; + + printk(KERN_NOTICE "RDS/IB: port %s/%d is %s\n", event->device->name, + port, (event->event == IB_EVENT_PORT_ACTIVE) ? 
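
The failover and failback paths above all key off rds_ib_port.active_port. The encoding is never stated explicitly, so here it is spelled out as a small sketch; the interpretation is inferred from the code and the example_* names are illustrative:

#include <linux/types.h>

/* Inferred meaning of rds_ib_port.active_port for port i:
 *   active_port == i  : port i is up and serves its own IP
 *   active_port == j  : port i's IP has been failed over to port j
 *   active_port == 0  : port i is down and its IP is not hosted anywhere
 */
struct example_port_state {
        u8 self;                /* this port's number (1-based) */
        u8 active_port;         /* where this port's IP currently lives */
};

static bool example_port_serves_own_ip(const struct example_port_state *p)
{
        return p->active_port == p->self;
}

static bool example_port_failed_over(const struct example_port_state *p)
{
        return p->active_port && p->active_port != p->self;
}
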
"UP" : "DOWN"); + + work = kzalloc(sizeof *work, GFP_KERNEL); + if (!work) { + printk(KERN_ERR "RDS/IB: failed to allocate port work\n"); + return; + } + + work->rds_ibdev = rds_ibdev; + work->port = port; + + if (event->event == IB_EVENT_PORT_ACTIVE) { + INIT_DELAYED_WORK(&work->work, rds_ib_failback); + queue_delayed_work(rds_wq, &work->work, 0); + } else { + INIT_DELAYED_WORK(&work->work, rds_ib_failover); + queue_delayed_work(rds_wq, &work->work, 0); + } +} + +static void rds_ib_check_down_port(void) +{ + struct rds_ib_device *rds_ibdev; + struct rds_ib_port_ud_work *work; + u8 i; + + list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { + for (i = 1; i <= rds_ibdev->dev->phys_port_cnt; i++) { + if (!rds_ibdev->ports[i].active_port && + rds_ibdev->ports[i].ip_addr) { + printk(KERN_NOTICE + "RDS/IB: port %s/%d is DOWN\n", + rds_ibdev->dev->name, i); + + work = kzalloc(sizeof *work, GFP_KERNEL); + if (!work) { + printk(KERN_ERR + "RDS/IB: failed to allocate port work\n"); + return; + } + + work->rds_ibdev = rds_ibdev; + work->port = i; + INIT_DELAYED_WORK(&work->work, rds_ib_failover); queue_delayed_work(rds_wq, &work->work, 0); + } + } + } + flush_workqueue(rds_wq); +} + +static void rds_ib_print_port(void) +{ + struct rds_ib_device *rds_ibdev; + int i, j; + + list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { + for (i = 1; i <= rds_ibdev->dev->phys_port_cnt; i++) { + if (!rds_ibdev->ports[i].ip_addr) + continue; + rdsdebug("Device %s / Port %d: name %s, " + "IP %d.%d.%d.%d, active_port %d\n", + rds_ibdev->dev->name, i, + rds_ibdev->ports[i].if_name, + NIPQUAD(rds_ibdev->ports[i].ip_addr), + rds_ibdev->ports[i].active_port); + + for (j = 0; j < rds_ibdev->ports[i].alias_cnt; j++) { + rdsdebug("Alias %s IP %d.%d.%d.%d\n", + rds_ibdev->ports[i].aliases[j].if_name, + NIPQUAD(rds_ibdev->ports[i]. 
+ aliases[j].ip_addr)); + } + } + } +} + +static void rds_ib_check_up_port(void) +{ + struct net_device *dev; + int downs; + int retries = 0; + +retry: + downs = 0; + read_lock(&dev_base_lock); + for_each_netdev(&init_net, dev) { + if ((dev->type == ARPHRD_INFINIBAND) && + !(dev->flags & IFF_MASTER)) { + if (dev->operstate != IF_OPER_UP) + downs++; + } + } + read_unlock(&dev_base_lock); + + if (downs) { + if (retries++ <= 60) { + msleep(1000); + goto retry; + } else { + printk(KERN_ERR "RDS/IB: Some port(s) not operational\n"); + } + } +} + +static int rds_ib_init_port(void) +{ + struct net_device *dev; + struct in_ifaddr *ifa; + struct in_ifaddr **ifap; + struct in_device *in_dev; + struct rdma_cm_id *cm_id; + struct sockaddr_in sin; + struct rdma_dev_addr *dev_addr; + union ib_gid gid; + u8 port_num; + int ret = 0; + + if (!rds_ib_active_active_enabled) + return ret; + + rds_ib_check_up_port(); + + read_lock(&dev_base_lock); + for_each_netdev(&init_net, dev) { + in_dev = in_dev_get(dev); + if ((dev->type == ARPHRD_INFINIBAND) && + !(dev->flags & IFF_MASTER) && + in_dev) { + for (ifap = &in_dev->ifa_list; (ifa = *ifap); + ifap = &ifa->ifa_next) { + + cm_id = rdma_create_id(NULL, NULL, + RDMA_PS_TCP); + ret = (IS_ERR(cm_id)); + if (ret) { + printk(KERN_ERR "RDS/IB: rdma_create_id failed\n"); + goto out; + } + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = ifa->ifa_address; + ret = rdma_bind_addr(cm_id, + (struct sockaddr *)&sin); + if (ret) { + printk(KERN_ERR "RDS/IB: rdma_bind_addr failed\n"); + rdma_destroy_id(cm_id); + goto out; + } + dev_addr = &cm_id->route.addr.dev_addr; + memcpy(&gid, dev_addr->src_dev_addr + + rdma_addr_gid_offset(dev_addr), sizeof gid); + ret = ib_find_cached_gid(cm_id->device, &gid, + &port_num, NULL); + if (ret) { + printk(KERN_ERR "RDS/IB: ib_find_cached_gid failed\n"); + rdma_destroy_id(cm_id); + goto out; + } + + rds_ib_set_port(cm_id->device, dev, + ifa->ifa_label, port_num, + ifa->ifa_address); + + rdma_destroy_id(cm_id); + } + } + } + + rds_ib_check_down_port(); + rds_ib_print_port(); +out: + read_unlock(&dev_base_lock); + return ret; +} + +void rds_ib_add_one(struct ib_device *device) +{ + struct rds_ib_device *rds_ibdev; + struct ib_device_attr *dev_attr; + + /* Only handle IB (no iWARP) devices */ + if (device->node_type != RDMA_NODE_IB_CA) + return; + + dev_attr = kmalloc(sizeof *dev_attr, GFP_KERNEL); + if (!dev_attr) + return; + + if (ib_query_device(device, dev_attr)) { + rdsdebug("Query device failed for %s\n", device->name); + goto free_attr; + } + + rds_ibdev = kzalloc_node(sizeof(struct rds_ib_device), GFP_KERNEL, + ibdev_to_node(device)); + if (!rds_ibdev) + goto free_attr; + + spin_lock_init(&rds_ibdev->spinlock); + atomic_set(&rds_ibdev->refcount, 1); + INIT_WORK(&rds_ibdev->free_work, rds_ib_dev_free); + + rds_ibdev->max_wrs = dev_attr->max_qp_wr; + rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE); + + rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32; + + rds_ibdev->max_1m_fmrs = dev_attr->max_fmr ? + min_t(unsigned int, (dev_attr->max_fmr / 2), + rds_ib_fmr_1m_pool_size) : + rds_ib_fmr_1m_pool_size; + + rds_ibdev->max_8k_fmrs = dev_attr->max_fmr ? 
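
rds_ib_init_port() above maps each IPoIB interface address to its HCA and physical port by binding a throwaway cm_id to the address and looking the resulting GID up in the GID cache. The same steps in isolation, as a hedged sketch (it assumes the three-argument rdma_create_id() used in this tree; the example_* name is illustrative):

#include <linux/err.h>
#include <linux/in.h>
#include <linux/socket.h>
#include <linux/string.h>
#include <linux/types.h>
#include <rdma/rdma_cm.h>
#include <rdma/ib_addr.h>
#include <rdma/ib_cache.h>

static int example_addr_to_hca_port(__be32 ip_addr,
                                    struct ib_device **dev, u8 *port)
{
        struct sockaddr_in sin;
        struct rdma_cm_id *cm_id;
        union ib_gid gid;
        int ret;

        cm_id = rdma_create_id(NULL, NULL, RDMA_PS_TCP);
        if (IS_ERR(cm_id))
                return PTR_ERR(cm_id);

        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = ip_addr;

        /* Binding to a local address resolves the owning RDMA device. */
        ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
        if (!ret) {
                struct rdma_dev_addr *da = &cm_id->route.addr.dev_addr;

                memcpy(&gid, da->src_dev_addr + rdma_addr_gid_offset(da),
                       sizeof(gid));
                ret = ib_find_cached_gid(cm_id->device, &gid, port, NULL);
                if (!ret)
                        *dev = cm_id->device;
        }

        rdma_destroy_id(cm_id);
        return ret;
}
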
+ min_t(unsigned int, ((dev_attr->max_fmr / 2) * 128), + rds_ib_fmr_8k_pool_size) : + rds_ib_fmr_8k_pool_size; + + rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom; + rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom; + + rds_ibdev->dev = device; + rds_ibdev->pd = ib_alloc_pd(device); + if (IS_ERR(rds_ibdev->pd)) { + rds_ibdev->pd = NULL; + goto put_dev; + } + + if (rds_ib_active_active_enabled) { + rds_ibdev->ports = kzalloc(sizeof(struct rds_ib_port) * + (device->phys_port_cnt + 1), GFP_KERNEL); + if (!rds_ibdev->ports) { + printk(KERN_ERR + "RDS/IB: failed to allocate ports\n"); + goto put_dev; + } + + INIT_IB_EVENT_HANDLER(&rds_ibdev->event_handler, + rds_ibdev->dev, rds_ib_event_handler); + if (ib_register_event_handler(&rds_ibdev->event_handler)) { + printk(KERN_ERR + "RDS/IB: ib_register_event_handler failed\n"); + goto put_dev; + } + } + + rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, IB_ACCESS_LOCAL_WRITE); + if (IS_ERR(rds_ibdev->mr)) { + rds_ibdev->mr = NULL; + goto put_dev; + } + + rds_ibdev->mr_1m_pool = + rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_1M_POOL); + if (IS_ERR(rds_ibdev->mr_1m_pool)) { + rds_ibdev->mr_1m_pool = NULL; + goto put_dev; + } + + rds_ibdev->mr_8k_pool = + rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_8K_POOL); + if (IS_ERR(rds_ibdev->mr_8k_pool)) { + rds_ibdev->mr_8k_pool = NULL; + goto put_dev; + } + + rds_ibdev->srq = kmalloc(sizeof(struct rds_ib_srq), GFP_KERNEL); + if (!rds_ibdev->srq) + goto free_attr; + + INIT_LIST_HEAD(&rds_ibdev->ipaddr_list); + INIT_LIST_HEAD(&rds_ibdev->conn_list); + + down_write(&rds_ib_devices_lock); + list_add_tail_rcu(&rds_ibdev->list, &rds_ib_devices); + up_write(&rds_ib_devices_lock); + atomic_inc(&rds_ibdev->refcount); + + ib_set_client_data(device, &rds_ib_client, rds_ibdev); + atomic_inc(&rds_ibdev->refcount); + + rds_ib_nodev_connect(); +put_dev: + rds_ib_dev_put(rds_ibdev); +free_attr: + kfree(dev_attr); +} + static void rds_ib_unregister_client(void) { ib_unregister_client(&rds_ib_client); @@ -397,6 +934,7 @@ struct rds_transport rds_ib_transport = { .sync_mr = rds_ib_sync_mr, .free_mr = rds_ib_free_mr, .flush_mrs = rds_ib_flush_mrs, + .check_migration = rds_ib_check_migration, .t_owner = THIS_MODULE, .t_name = "infiniband", .t_type = RDS_TRANS_IB @@ -408,6 +946,15 @@ int rds_ib_init(void) INIT_LIST_HEAD(&rds_ib_devices); + ret = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, + &rds_ib_inet_socket); + if (ret < 0) { + printk(KERN_ERR "RDS/IB: can't create TCP transport socket (%d).\n", -ret); + goto out; + } + + sock_net_set(rds_ib_inet_socket->sk, &init_net); + ret = rds_ib_fmr_init(); if (ret) goto out; @@ -426,7 +973,7 @@ int rds_ib_init(void) ret = rds_ib_srqs_init(); if (ret) { - printk(KERN_ERR "rds_ib_srqs_init failed.\n"); + printk(KERN_ERR "RDS/IB: Failed to init SRQ\n"); goto out_recv; } @@ -436,6 +983,18 @@ int rds_ib_init(void) rds_info_register_func(RDS_INFO_IB_CONNECTIONS, rds_ib_ic_info); + ret = rds_ib_init_port(); + if (ret) { + printk(KERN_ERR "RDS/IB: failed to init port\n"); + goto out_srq; + } + + rds_aux_wq = create_singlethread_workqueue("krdsd_aux"); + if (!rds_aux_wq) { + printk(KERN_ERR "RDS/IB: failed to create aux workqueue\n"); + goto out_srq; + } + goto out; out_srq: diff --git a/net/rds/ib.h b/net/rds/ib.h index 98ff7ea23392c..3c1ef0e4decb3 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -8,8 +8,10 @@ #include "rds.h" #include "rdma_transport.h" -#define RDS_FMR_SIZE 256 -#define RDS_FMR_POOL_SIZE 8192 +#define RDS_FMR_1M_POOL_SIZE (8192 / 2) +#define 
RDS_FMR_1M_MSG_SIZE 256 +#define RDS_FMR_8K_POOL_SIZE 128 * (8192 / 2) +#define RDS_FMR_8K_MSG_SIZE 2 #define RDS_IB_MAX_SGE 8 #define RDS_IB_RECV_SGE 2 @@ -22,12 +24,25 @@ #define RDS_IB_DEFAULT_RETRY_COUNT 1 +#define RDS_IB_DEFAULT_RNR_RETRY_COUNT 7 + +#define RDS_IB_DEFAULT_TIMEOUT 16 /* 4.096 * 2 ^ 16 = 260 msec */ + #define RDS_IB_SUPPORTED_PROTOCOLS 0x00000007 /* minor versions supported */ #define RDS_IB_RECYCLE_BATCH_COUNT 32 #define RDS_IB_SRQ_POST_BATCH_COUNT 64 +#define RDS_IB_GID_FMT "%2.2x%2.2x:%2.2x%2.2x" + +#define RDS_IB_GID_RAW_ARG(gid) ((u8 *)(gid))[12],\ + ((u8 *)(gid))[13],\ + ((u8 *)(gid))[14],\ + ((u8 *)(gid))[15] + +#define RDS_IB_GID_ARG(gid) RDS_IB_GID_RAW_ARG((gid).raw) + #define RDS_WC_MAX 32 extern struct rw_semaphore rds_ib_devices_lock; @@ -114,6 +129,21 @@ struct rds_ib_ack_state { struct rds_ib_device; +struct rds_ib_path { + union ib_gid p_sgid; + union ib_gid p_dgid; +}; + +struct rds_ib_destroy_id_work { + struct delayed_work work; + struct rdma_cm_id *cm_id; +}; + +struct rds_ib_migrate_work { + struct delayed_work work; + struct rds_ib_connection *ic; +}; + struct rds_ib_connection { struct list_head ib_node; @@ -188,6 +218,13 @@ struct rds_ib_connection { atomic_t i_cache_allocs; struct completion i_last_wqe_complete; + + /* APM support */ + struct rds_ib_migrate_work i_migrate_w; + struct rds_ib_path i_pri_path; + struct rds_ib_path i_cur_path; + unsigned int i_alt_path_index; + unsigned int i_active_side; }; /* This assumes that atomic_t is at least 32 bits */ @@ -216,6 +253,31 @@ struct rds_ib_srq { struct delayed_work s_rearm_w; }; +struct rds_ib_alias { + char if_name[IFNAMSIZ]; + __be32 ip_addr; +}; + +#define RDS_IB_MAX_ALIASES 200 +struct rds_ib_port { + char if_name[IFNAMSIZ]; + __be32 ip_addr; + unsigned int active_port; + unsigned int alias_cnt; + struct rds_ib_alias aliases[RDS_IB_MAX_ALIASES]; +}; + +struct rds_ib_port_ud_work { + struct delayed_work work; + struct rds_ib_device *rds_ibdev; + unsigned int port; +}; + +enum { + RDS_IB_MR_8K_POOL, + RDS_IB_MR_1M_POOL, +}; + struct rds_ib_device { struct list_head list; struct list_head ipaddr_list; @@ -223,9 +285,11 @@ struct rds_ib_device { struct ib_device *dev; struct ib_pd *pd; struct ib_mr *mr; - struct rds_ib_mr_pool *mr_pool; + struct rds_ib_mr_pool *mr_1m_pool; + struct rds_ib_mr_pool *mr_8k_pool; unsigned int fmr_max_remaps; - unsigned int max_fmrs; + unsigned int max_8k_fmrs; + unsigned int max_1m_fmrs; int max_sge; unsigned int max_wrs; unsigned int max_initiator_depth; @@ -234,6 +298,8 @@ struct rds_ib_device { atomic_t refcount; struct work_struct free_work; struct rds_ib_srq *srq; + struct rds_ib_port *ports; + struct ib_event_handler event_handler; }; #define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus) @@ -272,12 +338,18 @@ struct rds_ib_statistics { uint64_t s_ib_ack_send_delayed; uint64_t s_ib_ack_send_piggybacked; uint64_t s_ib_ack_received; - uint64_t s_ib_rdma_mr_alloc; - uint64_t s_ib_rdma_mr_free; - uint64_t s_ib_rdma_mr_used; - uint64_t s_ib_rdma_mr_pool_flush; - uint64_t s_ib_rdma_mr_pool_wait; - uint64_t s_ib_rdma_mr_pool_depleted; + uint64_t s_ib_rdma_mr_8k_alloc; + uint64_t s_ib_rdma_mr_8k_free; + uint64_t s_ib_rdma_mr_8k_used; + uint64_t s_ib_rdma_mr_8k_pool_flush; + uint64_t s_ib_rdma_mr_8k_pool_wait; + uint64_t s_ib_rdma_mr_8k_pool_depleted; + uint64_t s_ib_rdma_mr_1m_alloc; + uint64_t s_ib_rdma_mr_1m_free; + uint64_t s_ib_rdma_mr_1m_used; + uint64_t s_ib_rdma_mr_1m_pool_flush; + uint64_t s_ib_rdma_mr_1m_pool_wait; + uint64_t 
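
The RDS_IB_GID_FMT / RDS_IB_GID_ARG helpers added above print only the last four bytes of a GID, which is enough to distinguish ports in the log without dumping the whole 16-byte value. A minimal usage sketch (assumes the macros from net/rds/ib.h in this patch):

#include <linux/kernel.h>
#include <rdma/ib_verbs.h>
#include "ib.h"         /* RDS_IB_GID_FMT / RDS_IB_GID_ARG from this patch */

static void example_log_path(union ib_gid sgid, union ib_gid dgid)
{
        printk(KERN_INFO "RDS/IB: path <" RDS_IB_GID_FMT "," RDS_IB_GID_FMT ">\n",
               RDS_IB_GID_ARG(sgid), RDS_IB_GID_ARG(dgid));
}
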
s_ib_rdma_mr_1m_pool_depleted; uint64_t s_ib_atomic_cswp; uint64_t s_ib_atomic_fadd; uint64_t s_ib_srq_lows; @@ -321,6 +393,7 @@ static inline void rds_ib_dma_sync_sg_for_device(struct ib_device *dev, /* ib.c */ +extern struct workqueue_struct *rds_aux_wq; extern struct rds_transport rds_ib_transport; extern void rds_ib_add_one(struct ib_device *device); extern void rds_ib_remove_one(struct ib_device *device); @@ -328,13 +401,18 @@ struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device); void rds_ib_dev_put(struct rds_ib_device *rds_ibdev); extern struct ib_client rds_ib_client; -extern unsigned int fmr_pool_size; -extern unsigned int fmr_message_size; +extern unsigned int rds_ib_fmr_1m_pool_size; +extern unsigned int rds_ib_fmr_8k_pool_size; extern unsigned int rds_ib_retry_count; +extern unsigned int rds_ib_rnr_retry_count; +extern unsigned int rds_ib_apm_enable; +extern unsigned int rds_ib_timeout; extern spinlock_t ib_nodev_conns_lock; extern struct list_head ib_nodev_conns; +extern struct socket *rds_ib_inet_socket; + /* ib_cm.c */ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp); void rds_ib_conn_free(void *arg); @@ -349,6 +427,8 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id, int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id); void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_event *event); +void rds_ib_check_migration(struct rds_connection *conn, + struct rdma_cm_event *event); #define rds_ib_conn_error(conn, fmt...) \ @@ -359,7 +439,7 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr); void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn); void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn); void rds_ib_destroy_nodev_conns(void); -struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *); +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev, int npages); void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo); void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *); void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 875fb6bb16e49..b3eef17eaab4e 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -33,6 +33,8 @@ #include #include #include +#include +#include #include "rds.h" #include "ib.h" @@ -213,6 +215,33 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even if (dp && dp->dp_ack_seq) rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL); + if (rds_ib_apm_enable && !ic->conn->c_reconnect) { + memcpy(&ic->i_pri_path.p_sgid, + &ic->i_cm_id->route.path_rec[0].sgid, + sizeof(union ib_gid)); + + memcpy(&ic->i_pri_path.p_dgid, + &ic->i_cm_id->route.path_rec[0].dgid, + sizeof(union ib_gid)); + + memcpy(&ic->i_cur_path.p_sgid, + &ic->i_cm_id->route.path_rec[0].sgid, + sizeof(union ib_gid)); + + memcpy(&ic->i_cur_path.p_dgid, + &ic->i_cm_id->route.path_rec[0].dgid, + sizeof(union ib_gid)); + + printk(KERN_NOTICE "RDS/IB: connection " + "<%u.%u.%u.%u,%u.%u.%u.%u,%d> primary path " + "<"RDS_IB_GID_FMT","RDS_IB_GID_FMT">\n", + NIPQUAD(conn->c_laddr), + NIPQUAD(conn->c_faddr), + conn->c_tos, + RDS_IB_GID_ARG(ic->i_pri_path.p_sgid), + RDS_IB_GID_ARG(ic->i_pri_path.p_dgid)); + } + rds_connect_complete(conn); } @@ -232,8 +261,9 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn, min_t(u32, rds_ibdev->max_responder_resources, 
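
The APM block added to rds_ib_cm_connect_complete() above snapshots path_rec[0] as both the primary and the current path; later code decides whether the QP is back on its primary path by comparing the stored GIDs. The same bookkeeping as free-standing helpers, purely as a sketch (the patch open-codes the memcpy/memcmp pairs; example_* names are illustrative):

#include <linux/string.h>
#include <linux/types.h>
#include <rdma/ib_verbs.h>
#include <rdma/ib_sa.h>

struct example_path {
        union ib_gid sgid;
        union ib_gid dgid;
};

/* Snapshot one entry of cm_id->route.path_rec[] into an sgid/dgid pair. */
static void example_record_path(struct example_path *p,
                                const struct ib_sa_path_rec *rec)
{
        memcpy(&p->sgid, &rec->sgid, sizeof(union ib_gid));
        memcpy(&p->dgid, &rec->dgid, sizeof(union ib_gid));
}

/* True when a and b describe the same <sgid,dgid> path. */
static bool example_same_path(const struct example_path *a,
                              const struct example_path *b)
{
        return !memcmp(&a->sgid, &b->sgid, sizeof(union ib_gid)) &&
               !memcmp(&a->dgid, &b->dgid, sizeof(union ib_gid));
}
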
max_responder_resources); conn_param->initiator_depth = min_t(u32, rds_ibdev->max_initiator_depth, max_initiator_depth); - conn_param->retry_count = min_t(unsigned int, rds_ib_retry_count, 7); - conn_param->rnr_retry_count = 7; + conn_param->retry_count = + min_t(unsigned int, rds_ib_retry_count, rds_ib_rnr_retry_count); + conn_param->rnr_retry_count = rds_ib_rnr_retry_count; if (dp) { memset(dp, 0, sizeof(*dp)); @@ -379,6 +409,44 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data) case IB_EVENT_QP_LAST_WQE_REACHED: complete(&ic->i_last_wqe_complete); break; + case IB_EVENT_PATH_MIG: + memcpy(&ic->i_cur_path.p_sgid, + &ic->i_cm_id->route.path_rec[ic->i_alt_path_index].sgid, + sizeof(union ib_gid)); + + memcpy(&ic->i_cur_path.p_dgid, + &ic->i_cm_id->route.path_rec[ic->i_alt_path_index].dgid, + sizeof(union ib_gid)); + + if (!memcmp(&ic->i_pri_path.p_sgid, &ic->i_cur_path.p_sgid, + sizeof(union ib_gid)) && + !memcmp(&ic->i_pri_path.p_dgid, &ic->i_cur_path.p_dgid, + sizeof(union ib_gid))) { + printk(KERN_NOTICE + "RDS/IB: connection " + "<%u.%u.%u.%u,%u.%u.%u.%u,%d> migrated back to path " + "<"RDS_IB_GID_FMT","RDS_IB_GID_FMT">\n", + NIPQUAD(conn->c_laddr), + NIPQUAD(conn->c_faddr), + conn->c_tos, + RDS_IB_GID_ARG(ic->i_cur_path.p_sgid), + RDS_IB_GID_ARG(ic->i_cur_path.p_dgid)); + } else { + printk(KERN_NOTICE + "RDS/IB: connection " + "<%u.%u.%u.%u,%u.%u.%u.%u,%d> migrated over to path " + "<"RDS_IB_GID_FMT","RDS_IB_GID_FMT">\n", + NIPQUAD(conn->c_laddr), + NIPQUAD(conn->c_faddr), + conn->c_tos, + RDS_IB_GID_ARG(ic->i_cur_path.p_sgid), + RDS_IB_GID_ARG(ic->i_cur_path.p_dgid)); + } + + break; + case IB_EVENT_PATH_MIG_ERR: + rdsdebug("RDS: Path migration error\n"); + break; default: rdsdebug("Fatal QP Event %u (%s) " "- connection %pI4->%pI4, reconnecting\n", @@ -705,10 +773,18 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id, event->param.conn.responder_resources, event->param.conn.initiator_depth); + if (rds_ib_apm_enable) + rdma_set_timeout(cm_id, rds_ib_timeout); + /* rdma_accept() calls rdma_reject() internally if it fails */ err = rdma_accept(cm_id, &conn_param); if (err) rds_ib_conn_error(conn, "rdma_accept failed (%d)\n", err); + else if (rds_ib_apm_enable && !conn->c_loopback) { + err = rdma_enable_apm(cm_id, RDMA_ALT_PATH_BEST); + if (err) + printk(KERN_WARNING "RDS/IB: APM couldn't be enabled for passive side: %d\n", err); + } out: if (conn) @@ -727,6 +803,12 @@ int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id) struct rds_ib_connect_private dp; int ret; + if (rds_ib_apm_enable && !conn->c_loopback) { + ret = rdma_enable_apm(cm_id, RDMA_ALT_PATH_BEST); + if (ret) + printk(KERN_WARNING "RDS/IB: APM couldn't be enabled for active side: %d\n", ret); + } + /* If the peer doesn't do protocol negotiation, we must * default to RDSv3.0 */ rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0); @@ -752,13 +834,120 @@ out: if (ic->i_cm_id == cm_id) ret = 0; } + + ic->i_active_side = 1; return ret; } +static void rds_ib_migrate(struct work_struct *_work) +{ + struct rds_ib_migrate_work *work = + container_of(_work, struct rds_ib_migrate_work, work.work); + struct rds_ib_connection *ic = work->ic; + struct ib_qp_attr qp_attr; + struct ib_qp_init_attr qp_init_attr; + enum ib_mig_state path_mig_state; + struct rdma_cm_id *cm_id = ic->i_cm_id; + int ret = 0; + + if (!ic->i_active_side) { + ret = ib_query_qp(cm_id->qp, &qp_attr, IB_QP_PATH_MIG_STATE, + &qp_init_attr); + if (ret) { + printk(KERN_ERR "RDS/IB: failed to query QP\n"); + return; + } + + path_mig_state = 
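
rds_ib_migrate() here (continued below) reads the QP's path-migration state and then forces it using raw numeric values; those values are the ib_mig_state constants (IB_MIG_MIGRATED = 0, IB_MIG_REARM = 1, IB_MIG_ARMED = 2). The same operation written with the named constants, as an illustrative sketch only:

#include <rdma/ib_verbs.h>

/* Force a software path migration on an RC QP whose alternate path has
 * already been loaded and armed. */
static int example_force_path_migration(struct ib_qp *qp)
{
        struct ib_qp_attr attr;
        struct ib_qp_init_attr init_attr;
        int ret;

        ret = ib_query_qp(qp, &attr, IB_QP_PATH_MIG_STATE, &init_attr);
        if (ret)
                return ret;

        if (attr.path_mig_state == IB_MIG_MIGRATED)
                return 0;       /* migration already done or in progress */

        attr.path_mig_state = IB_MIG_MIGRATED;  /* switch to the alternate path */
        return ib_modify_qp(qp, &attr, IB_QP_PATH_MIG_STATE);
}
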
qp_attr.path_mig_state; + if (!path_mig_state) { + printk(KERN_NOTICE + "RDS/IB: Migration in progress..skip\n"); + return; + } + + qp_attr.path_mig_state = 0; + ret = ib_modify_qp(cm_id->qp, &qp_attr, IB_QP_PATH_MIG_STATE); + if (ret) { + printk(KERN_ERR "RDS/IB: failed to modify QP from %s" + " to MIGRATED state\n", + (!path_mig_state) ? "MIGRATED" : + (path_mig_state == 1) ? "REARM" : + (path_mig_state == 2) ? "ARMED" : "UNKNOWN"); + } + } +} + +void rds_ib_check_migration(struct rds_connection *conn, + struct rdma_cm_event *event) +{ + struct rds_ib_connection *ic = conn->c_transport_data; + union ib_gid sgid; + union ib_gid dgid; + struct ib_qp_init_attr qp_init_attr; + struct ib_qp_attr qp_attr; + struct rdma_cm_id *cm_id = ic->i_cm_id; + int err; + + if (!rds_ib_apm_enable || !rds_conn_up(ic->conn)) + return ; + + ic->i_alt_path_index = event->param.ud.alt_path_index; + + memcpy(&sgid, &cm_id->route.path_rec[event->param.ud.alt_path_index]. + sgid, sizeof(union ib_gid)); + memcpy(&dgid, &cm_id->route.path_rec[event->param.ud.alt_path_index]. + dgid, sizeof(union ib_gid)); + + printk(KERN_NOTICE + "RDS/IB: connection " + "<%u.%u.%u.%u,%u.%u.%u.%u,%d> loaded alternate path " + "<"RDS_IB_GID_FMT","RDS_IB_GID_FMT">\n", + NIPQUAD(conn->c_laddr), + NIPQUAD(conn->c_faddr), + conn->c_tos, + RDS_IB_GID_ARG(sgid), RDS_IB_GID_ARG(dgid)); + + err = ib_query_qp(cm_id->qp, &qp_attr, IB_QP_ALT_PATH, &qp_init_attr); + if (err) { + printk(KERN_ERR "RDS/IB: ib_query_qp failed (%d)\n", err); + return; + } + qp_attr.alt_timeout = rds_ib_timeout; + err = ib_modify_qp(cm_id->qp, &qp_attr, IB_QP_ALT_PATH); + if (err) { + printk(KERN_ERR "RDS/IB: ib_modify_qp failed (%d)\n", err); + return; + } + + if (!memcmp(&ic->i_pri_path.p_sgid, &sgid, sizeof(union ib_gid)) && + !memcmp(&ic->i_pri_path.p_dgid, &dgid, sizeof(union ib_gid))) { + if (memcmp(&ic->i_cur_path.p_sgid, &ic->i_pri_path.p_sgid, + sizeof(union ib_gid)) || + memcmp(&ic->i_cur_path.p_dgid, &ic->i_pri_path.p_dgid, + sizeof(union ib_gid))) { + + ic->i_migrate_w.ic = ic; + queue_delayed_work(rds_wq, &ic->i_migrate_w.work, 0); + } + } +} + +static void rds_ib_destroy_id(struct work_struct *_work) +{ + struct rds_ib_destroy_id_work *work = + container_of(_work, struct rds_ib_destroy_id_work, work.work); + struct rdma_cm_id *cm_id = work->cm_id; + + rdma_destroy_id(cm_id); + + kfree(work); +} + int rds_ib_conn_connect(struct rds_connection *conn) { struct rds_ib_connection *ic = conn->c_transport_data; struct sockaddr_in src, dest; + struct rds_ib_destroy_id_work *work; int ret; /* XXX I wonder what affect the port space has */ @@ -788,7 +977,14 @@ int rds_ib_conn_connect(struct rds_connection *conn) if (ret) { rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id, ret); - rdma_destroy_id(ic->i_cm_id); + work = kzalloc(sizeof *work, GFP_KERNEL); + if (work) { + work->cm_id = ic->i_cm_id; + INIT_DELAYED_WORK(&work->work, rds_ib_destroy_id); + queue_delayed_work(rds_aux_wq, &work->work, 0); + } else + rdma_destroy_id(ic->i_cm_id); + ic->i_cm_id = NULL; } @@ -804,6 +1000,7 @@ out: void rds_ib_conn_shutdown(struct rds_connection *conn) { struct rds_ib_connection *ic = conn->c_transport_data; + struct rds_ib_destroy_id_work *work; int err = 0; rdsdebug("cm %p pd %p cq %p qp %p\n", ic->i_cm_id, @@ -868,7 +1065,17 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) if (ic->i_recvs) rds_ib_recv_clear_ring(ic); - rdma_destroy_id(ic->i_cm_id); + /* + * rdma_destroy_id may block so offload it to the aux + * thread for processing. 
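
Both rds_ib_conn_connect() above and rds_ib_conn_shutdown() here hand the potentially blocking rdma_destroy_id() off to the rds_aux_wq worker rather than calling it inline. The pattern in isolation, simplified to a plain work_struct (the patch uses a zero-delay delayed_work); example_* names are illustrative:

#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/workqueue.h>
#include <rdma/rdma_cm.h>

struct example_destroy_id_work {
        struct work_struct work;
        struct rdma_cm_id *cm_id;
};

static void example_destroy_id_worker(struct work_struct *w)
{
        struct example_destroy_id_work *d =
                container_of(w, struct example_destroy_id_work, work);

        rdma_destroy_id(d->cm_id);      /* may sleep; safe in worker context */
        kfree(d);
}

static void example_defer_destroy_id(struct workqueue_struct *wq,
                                     struct rdma_cm_id *cm_id)
{
        struct example_destroy_id_work *d = kzalloc(sizeof(*d), GFP_KERNEL);

        if (!d) {
                rdma_destroy_id(cm_id); /* fall back to inline destroy */
                return;
        }
        d->cm_id = cm_id;
        INIT_WORK(&d->work, example_destroy_id_worker);
        queue_work(wq, &d->work);
}
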
+ */ + work = kzalloc(sizeof *work, GFP_KERNEL); + if (work) { + work->cm_id = ic->i_cm_id; + INIT_DELAYED_WORK(&work->work, rds_ib_destroy_id); + queue_delayed_work(rds_aux_wq, &work->work, 0); + } else + rdma_destroy_id(ic->i_cm_id); /* * Move connection back to the nodev list. @@ -925,6 +1132,8 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) ic->i_recvs = NULL; INIT_COMPLETION(ic->i_last_wqe_complete); + + ic->i_active_side = 0; } int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) @@ -965,11 +1174,12 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) init_completion(&ic->i_last_wqe_complete); + INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate); + spin_lock_irqsave(&ib_nodev_conns_lock, flags); list_add_tail(&ic->ib_node, &ib_nodev_conns); spin_unlock_irqrestore(&ib_nodev_conns_lock, flags); - rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data); return 0; } diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index a62622c1b7d01..2141f5759718e 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -66,6 +66,7 @@ struct rds_ib_mr { * Our own little FMR pool */ struct rds_ib_mr_pool { + unsigned int pool_type; struct mutex flush_lock; /* serialize fmr invalidate */ struct delayed_work flush_worker; /* flush worker */ @@ -222,7 +223,8 @@ void rds_ib_destroy_nodev_conns(void) rds_conn_destroy(ic->conn); } -struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev) +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev, + int pool_type) { struct rds_ib_mr_pool *pool; @@ -230,6 +232,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev) if (!pool) return ERR_PTR(-ENOMEM); + pool->pool_type = pool_type; INIT_XLIST_HEAD(&pool->free_list); INIT_XLIST_HEAD(&pool->drop_list); INIT_XLIST_HEAD(&pool->clean_list); @@ -237,28 +240,28 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev) init_waitqueue_head(&pool->flush_wait); INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker); - pool->fmr_attr.max_pages = fmr_message_size; + if (pool_type == RDS_IB_MR_1M_POOL) { + pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE; + pool->max_items = rds_ib_fmr_1m_pool_size; + } else /* pool_type == RDS_IB_MR_8K_POOL */ { + pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE; + pool->max_items = rds_ib_fmr_8k_pool_size; + } + pool->max_free_pinned = + pool->max_items * pool->fmr_attr.max_pages / 4; pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps; pool->fmr_attr.page_shift = PAGE_SHIFT; - pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4; - - /* We never allow more than max_items MRs to be allocated. - * When we exceed more than max_items_soft, we start freeing - * items more aggressively. 
- * Make sure that max_items > max_items_soft > max_items / 2 - */ - pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4; - pool->max_items = rds_ibdev->max_fmrs; + pool->max_items_soft = pool->max_items * 3 / 4; return pool; } void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo) { - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool; - iinfo->rdma_mr_max = pool->max_items; - iinfo->rdma_mr_size = pool->fmr_attr.max_pages; + iinfo->rdma_mr_max = pool_1m->max_items; + iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages; } void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool) @@ -308,12 +311,18 @@ static inline void wait_clean_list_grace(void) } } -static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) +static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, + int npages) { - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + struct rds_ib_mr_pool *pool; struct rds_ib_mr *ibmr = NULL; int err = 0, iter = 0; + if (npages <= RDS_FMR_8K_MSG_SIZE) + pool = rds_ibdev->mr_8k_pool; + else + pool = rds_ibdev->mr_1m_pool; + if (atomic_read(&pool->dirty_count) >= pool->max_items / 10) queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10); @@ -337,12 +346,18 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) atomic_dec(&pool->item_count); if (++iter > 2) { - rds_ib_stats_inc(s_ib_rdma_mr_pool_depleted); + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted); return ERR_PTR(-EAGAIN); } /* We do have some empty MRs. Flush them out. */ - rds_ib_stats_inc(s_ib_rdma_mr_pool_wait); + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait); rds_ib_flush_mr_pool(pool, 0, &ibmr); if (ibmr) return ibmr; @@ -369,7 +384,11 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) goto out_no_cigar; } - rds_ib_stats_inc(s_ib_rdma_mr_alloc); + ibmr->pool = pool; + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc); return ibmr; out_no_cigar: @@ -425,7 +444,7 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm } page_cnt += len >> PAGE_SHIFT; - if (page_cnt > fmr_message_size) + if (page_cnt > ibmr->pool->fmr_attr.max_pages) return -EINVAL; dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC); @@ -456,7 +475,10 @@ static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibm ibmr->sg_dma_len = sg_dma_len; ibmr->remap_count++; - rds_ib_stats_inc(s_ib_rdma_mr_used); + if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_used); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_used); ret = 0; out: @@ -519,8 +541,7 @@ static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr) __rds_ib_teardown_mr(ibmr); if (pinned) { - struct rds_ib_device *rds_ibdev = ibmr->device; - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + struct rds_ib_mr_pool *pool = ibmr->pool; atomic_sub(pinned, &pool->free_pinned); } @@ -598,7 +619,10 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, unsigned int nfreed = 0, ncleaned = 0, free_goal; int ret = 0; - rds_ib_stats_inc(s_ib_rdma_mr_pool_flush); + if (pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush); + else + 
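
rds_ib_alloc_fmr() above now routes small registrations to the 8K pool and everything else to the 1M pool. The selection rule spelled out as a sketch (RDS_FMR_8K_MSG_SIZE is 2 pages, i.e. 8K with 4K pages; the example_* name is illustrative):

#include "ib.h"         /* RDS_FMR_8K_MSG_SIZE and the pool fields from this patch */

static struct rds_ib_mr_pool *example_pick_pool(struct rds_ib_device *rds_ibdev,
                                                int npages)
{
        /* Up to RDS_FMR_8K_MSG_SIZE pages fits an 8K-class MR; anything
         * larger is served from the 1M pool. */
        if (npages <= RDS_FMR_8K_MSG_SIZE)
                return rds_ibdev->mr_8k_pool;
        return rds_ibdev->mr_1m_pool;
}
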
rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush); if (ibmr_ret) { DEFINE_WAIT(wait); @@ -660,7 +684,10 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, unpinned += ibmr->sg_len; __rds_ib_teardown_mr(ibmr); if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) { - rds_ib_stats_inc(s_ib_rdma_mr_free); + if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_free); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_free); list_del(&ibmr->unmap_list); ib_dealloc_fmr(ibmr->fmr); kfree(ibmr); @@ -732,7 +759,7 @@ void rds_ib_free_mr(void *trans_private, int invalidate) { struct rds_ib_mr *ibmr = trans_private; struct rds_ib_device *rds_ibdev = ibmr->device; - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + struct rds_ib_mr_pool *pool = ibmr->pool; rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len); @@ -770,10 +797,11 @@ void rds_ib_flush_mrs(void) down_read(&rds_ib_devices_lock); list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { - struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; + if (rds_ibdev->mr_8k_pool) + rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL); - if (pool) - rds_ib_flush_mr_pool(pool, 0, NULL); + if (rds_ibdev->mr_1m_pool) + rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL); } up_read(&rds_ib_devices_lock); } @@ -791,12 +819,12 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, goto out; } - if (!rds_ibdev->mr_pool) { + if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) { ret = -ENODEV; goto out; } - ibmr = rds_ib_alloc_fmr(rds_ibdev); + ibmr = rds_ib_alloc_fmr(rds_ibdev, nents); if (IS_ERR(ibmr)) return ibmr; diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index 67f61c54a7544..af0893e2f7570 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -300,8 +300,8 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic "recv memory exceeded max_recv_allocation %d\n", atomic_read(&rds_ib_allocation)); } - kmem_cache_free(rds_ib_frag_slab, frag); rds_ib_stats_inc(s_ib_rx_alloc_limit); + kmem_cache_free(rds_ib_frag_slab, frag); return NULL; } @@ -629,7 +629,8 @@ release_out: * if we should requeue. */ if (rds_conn_up(conn) && - (must_wake || (can_wait && ring_low) || ring_empty)) { + (must_wake || (can_wait && ring_low) + || rds_ib_ring_empty(&ic->i_recv_ring))) { queue_delayed_work(rds_wq, &conn->c_recv_w, 1); } if (can_wait) @@ -1075,7 +1076,8 @@ static void rds_ib_process_recv(struct rds_connection *conn, if (ihdr->h_credit) rds_ib_send_add_credits(conn, ihdr->h_credit); - if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) { + if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0 && + ihdr->h_flags == 0) { /* This is an ACK-only packet. The fact that it gets * special treatment here is that historically, ACKs * were rather special beasts. 
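
The extra ihdr->h_flags check added to rds_ib_process_recv() above matters because the heartbeat probes introduced later in this patch also travel with zero ports and no payload; without the check they would be swallowed as pure ACKs. A simplified classification sketch (the real code keeps the ACK handling inline; RDS_FLAG_HB_PING/RDS_FLAG_HB_PONG come from the rds.h hunk further down):

#include <linux/types.h>
#include "rds.h"

enum example_pkt_kind { EXAMPLE_ACK_ONLY, EXAMPLE_HEARTBEAT, EXAMPLE_DATA };

static enum example_pkt_kind example_classify(__be16 sport, __be16 dport,
                                              u32 data_len, u8 flags)
{
        if (sport == 0 && dport == 0 && data_len == 0) {
                if (flags & (RDS_FLAG_HB_PING | RDS_FLAG_HB_PONG))
                        return EXAMPLE_HEARTBEAT;
                if (flags == 0)
                        return EXAMPLE_ACK_ONLY;        /* header-only ACK */
        }
        return EXAMPLE_DATA;
}
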
diff --git a/net/rds/ib_ring.c b/net/rds/ib_ring.c index a24458da957ee..b66cd4244d307 100644 --- a/net/rds/ib_ring.c +++ b/net/rds/ib_ring.c @@ -76,7 +76,6 @@ static inline u32 __rds_ib_ring_used(struct rds_ib_work_ring *ring) /* This assumes that atomic_t has at least as many bits as u32 */ diff = ring->w_alloc_ctr - (u32) atomic_read(&ring->w_free_ctr); - BUG_ON(diff > ring->w_nr); return diff; } diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c index 46d9d9840cae6..80a4c90ac6cfd 100644 --- a/net/rds/ib_stats.c +++ b/net/rds/ib_stats.c @@ -63,12 +63,18 @@ static char *rds_ib_stat_names[] = { "ib_ack_send_delayed", "ib_ack_send_piggybacked", "ib_ack_received", - "ib_rdma_mr_alloc", - "ib_rdma_mr_free", - "ib_rdma_mr_used", - "ib_rdma_mr_pool_flush", - "ib_rdma_mr_pool_wait", - "ib_rdma_mr_pool_depleted", + "ib_rdma_mr_8k_alloc", + "ib_rdma_mr_8k_free", + "ib_rdma_mr_8k_used", + "ib_rdma_mr_8k_pool_flush", + "ib_rdma_mr_8k_pool_wait", + "ib_rdma_mr_8k_pool_depleted", + "ib_rdma_mr_1m_alloc", + "ib_rdma_mr_1m_free", + "ib_rdma_mr_1m_used", + "ib_rdma_mr_1m_pool_flush", + "ib_rdma_mr_1m_pool_wait", + "ib_rdma_mr_1m_pool_depleted", "ib_atomic_cswp", "ib_atomic_fadd", "ib_srq_lows", diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c index c79bb56a85844..1e2f7c39dbf60 100644 --- a/net/rds/rdma_transport.c +++ b/net/rds/rdma_transport.c @@ -34,6 +34,9 @@ #include "rdma_transport.h" #include "ib.h" +#include "net/arp.h" +#include +#include static struct rdma_cm_id *rds_iw_listen_id; @@ -43,6 +46,9 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id, /* this can be null in the listening path */ struct rds_connection *conn = cm_id->context; struct rds_transport *trans; + struct page *page; + struct arpreq *r; + struct sockaddr_in *sin; int ret = 0; rdsdebug("conn %p id %p handling event %u\n", conn, cm_id, @@ -78,6 +84,8 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id, case RDMA_CM_EVENT_ADDR_RESOLVED: rdma_set_service_type(cm_id, conn->c_tos); + if (rds_ib_apm_enable) + rdma_set_timeout(cm_id, rds_ib_timeout); /* XXX do we need to clean up if this fails? */ ret = rdma_resolve_route(cm_id, @@ -105,21 +113,58 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id, ret = trans->cm_initiate_connect(cm_id); break; + case RDMA_CM_EVENT_ALT_PATH_LOADED: + rdsdebug("RDS: alt path loaded\n"); + if (conn) + trans->check_migration(conn, event); + break; + + case RDMA_CM_EVENT_ALT_ROUTE_RESOLVED: + rdsdebug("RDS: alt route resolved\n"); + break; + + case RDMA_CM_EVENT_ALT_ROUTE_ERROR: + rdsdebug("RDS: alt route resolve error\n"); + break; + + case RDMA_CM_EVENT_ROUTE_ERROR: + /* IP might have been moved so flush the ARP entry and retry */ + page = alloc_page(GFP_HIGHUSER); + if (!page) { + printk(KERN_ERR "alloc_page failed .. 
NO MEM\n"); + ret = -ENOMEM; + } else { + r = (struct arpreq *)kmap(page); + memset(r, 0, sizeof(struct arpreq)); + sin = (struct sockaddr_in *)&r->arp_pa; + sin->sin_family = AF_INET; + sin->sin_addr.s_addr = conn->c_faddr; + inet_ioctl(rds_ib_inet_socket, SIOCDARP, (unsigned long) r); + kunmap(page); + __free_page(page); + } + + rds_conn_drop(conn); + break; + case RDMA_CM_EVENT_ESTABLISHED: trans->cm_connect_complete(conn, event); break; case RDMA_CM_EVENT_ADDR_ERROR: - case RDMA_CM_EVENT_ROUTE_ERROR: case RDMA_CM_EVENT_CONNECT_ERROR: case RDMA_CM_EVENT_UNREACHABLE: case RDMA_CM_EVENT_REJECTED: case RDMA_CM_EVENT_DEVICE_REMOVAL: - case RDMA_CM_EVENT_ADDR_CHANGE: if (conn) rds_conn_drop(conn); break; + case RDMA_CM_EVENT_ADDR_CHANGE: + if (conn && !rds_ib_apm_enable) + rds_conn_drop(conn); + break; + case RDMA_CM_EVENT_DISCONNECTED: rdsdebug("DISCONNECT event - dropping connection " "%pI4->%pI4\n", &conn->c_laddr, diff --git a/net/rds/rds.h b/net/rds/rds.h index 9ded5fde23357..51834ff83af87 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -119,6 +119,7 @@ struct rds_connection { struct delayed_work c_send_w; struct delayed_work c_recv_w; struct delayed_work c_conn_w; + struct delayed_work c_hb_w; struct work_struct c_down_w; struct mutex c_cm_lock; /* protect conn state & cm */ wait_queue_head_t c_waitq; @@ -139,15 +140,23 @@ struct rds_connection { int c_reconnect_warn; int c_reconnect_err; + unsigned int c_reconnect; + /* Qos support */ u8 c_tos; struct rds_notifier *c_last_failed_op; + + unsigned long c_hb_start; + + unsigned int c_active_side; }; #define RDS_FLAG_CONG_BITMAP 0x01 #define RDS_FLAG_ACK_REQUIRED 0x02 #define RDS_FLAG_RETRANSMITTED 0x04 +#define RDS_FLAG_HB_PING 0x08 +#define RDS_FLAG_HB_PONG 0x10 #define RDS_MAX_ADV_CREDIT 127 /* @@ -461,6 +470,8 @@ struct rds_transport { void (*sync_mr)(void *trans_private, int direction); void (*free_mr)(void *trans_private, int invalidate); void (*flush_mrs)(void); + void (*check_migration)(struct rds_connection *conn, + struct rdma_cm_event *event); }; struct rds_sock { @@ -749,6 +760,7 @@ void rds_send_drop_acked(struct rds_connection *conn, u64 ack, is_acked_func is_acked); void rds_send_remove_from_sock(struct list_head *messages, int status); int rds_send_pong(struct rds_connection *conn, __be16 dport); +int rds_send_hb(struct rds_connection *conn, int response); struct rds_message *rds_send_get_message(struct rds_connection *, struct rm_rdma_op *); @@ -822,6 +834,7 @@ void rds_connect_worker(struct work_struct *); void rds_shutdown_worker(struct work_struct *); void rds_send_worker(struct work_struct *); void rds_recv_worker(struct work_struct *); +void rds_hb_worker(struct work_struct *); void rds_connect_complete(struct rds_connection *conn); /* transport.c */ diff --git a/net/rds/recv.c b/net/rds/recv.c index 9001fe680070b..90d9f75fc0f2f 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -202,8 +202,14 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1; if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) { - rds_stats_inc(s_recv_ping); - rds_send_pong(conn, inc->i_hdr.h_sport); + if (inc->i_hdr.h_flags & RDS_FLAG_HB_PING) { + rds_send_hb(conn, 1); + } else if (inc->i_hdr.h_flags & RDS_FLAG_HB_PONG) { + conn->c_hb_start = 0; + } else { + rds_stats_inc(s_recv_ping); + rds_send_pong(conn, inc->i_hdr.h_sport); + } goto out; } diff --git a/net/rds/send.c b/net/rds/send.c index 3ea05c59625bf..9334b0a935278 100644 --- a/net/rds/send.c 
+++ b/net/rds/send.c @@ -94,6 +94,11 @@ void rds_send_reset(struct rds_connection *conn) set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags); + /* flush internal HB msgs */ + if ((rm->m_inc.i_hdr.h_flags == RDS_FLAG_HB_PONG) || + (rm->m_inc.i_hdr.h_flags == RDS_FLAG_HB_PING)) + set_bit(RDS_MSG_FLUSH, &rm->m_flags); + /* check for failed op */ if (rds_async_send_enabled && (rm->rdma.op_active || (rm->data.op_active && rm->data.op_async))) @@ -1355,3 +1360,44 @@ out: rds_message_put(rm); return ret; } + +int +rds_send_hb(struct rds_connection *conn, int response) +{ + struct rds_message *rm; + unsigned long flags; + int ret = 0; + + rm = rds_message_alloc(0, GFP_ATOMIC); + if (!rm) + return -ENOMEM; + + rm->m_daddr = conn->c_faddr; + rm->data.op_active = 1; + + spin_lock_irqsave(&conn->c_lock, flags); + list_add_tail(&rm->m_conn_item, &conn->c_send_queue); + set_bit(RDS_MSG_ON_CONN, &rm->m_flags); + rds_message_addref(rm); + rm->m_inc.i_conn = conn; + + rds_message_populate_header(&rm->m_inc.i_hdr, 0, 0, + conn->c_next_tx_seq); + + if (response) + rm->m_inc.i_hdr.h_flags |= RDS_FLAG_HB_PONG; + else + rm->m_inc.i_hdr.h_flags |= RDS_FLAG_HB_PING; + + rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED; + + conn->c_next_tx_seq++; + spin_unlock_irqrestore(&conn->c_lock, flags); + + ret = rds_send_xmit(conn); + if (ret == -ENOMEM || ret == -EAGAIN) + queue_delayed_work(rds_wq, &conn->c_send_w, 1); + + rds_message_put(rm); + return 0; +} diff --git a/net/rds/threads.c b/net/rds/threads.c index 2c4948a989318..e32a34099f260 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -35,6 +35,11 @@ #include "rds.h" +static unsigned int rds_conn_hb_timeout = 0; +module_param(rds_conn_hb_timeout, int, 0444); +MODULE_PARM_DESC(rds_conn_hb_timeout, " Connection heartbeat timeout"); + + /* * All of connection management is simplified by serializing it through * work queues that execute in a connection managing thread. 
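
rds_send_hb() above and rds_hb_worker() in the hunk below implement a simple ping/pong liveness check: c_hb_start records when an unanswered HB_PING was sent, a received HB_PONG (handled in recv.c earlier) clears it, and exceeding rds_conn_hb_timeout drops the connection so it reconnects. The core bookkeeping reduced to a sketch, with locking and worker requeueing omitted (example_* name is illustrative):

#include "rds.h"        /* rds_send_hb() / rds_conn_drop() as used by this patch */

static void example_hb_tick(struct rds_connection *conn, unsigned long now,
                            unsigned int timeout)
{
        if (!conn->c_hb_start) {
                /* No ping outstanding: send one and start the clock. */
                if (!rds_send_hb(conn, 0))
                        conn->c_hb_start = now;
        } else if (now - conn->c_hb_start > timeout) {
                /* Ping never answered within the timeout: force a reconnect. */
                rds_conn_drop(conn);
        }
}
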
@@ -89,8 +94,11 @@ void rds_connect_complete(struct rds_connection *conn) set_bit(0, &conn->c_map_queued); queue_delayed_work(rds_wq, &conn->c_send_w, 0); queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + queue_delayed_work(rds_wq, &conn->c_hb_w, 0); + conn->c_hb_start = 0; conn->c_connection_start = get_seconds(); + conn->c_reconnect = 1; } EXPORT_SYMBOL_GPL(rds_connect_complete); @@ -156,11 +164,15 @@ void rds_connect_worker(struct work_struct *work) conn, &conn->c_laddr, &conn->c_faddr, ret); if (ret) { - if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN)) - rds_queue_reconnect(conn); - else + if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN)) { + if (conn->c_reconnect && conn->c_active_side) + rds_queue_reconnect(conn); + } else rds_conn_error(conn, "RDS: connect failed\n"); } + + if (!conn->c_reconnect) + conn->c_active_side = 1; } } @@ -210,11 +222,45 @@ void rds_recv_worker(struct work_struct *work) } } +void rds_hb_worker(struct work_struct *work) +{ + struct rds_connection *conn = container_of(work, struct rds_connection, c_hb_w.work); + unsigned long now = get_seconds(); + int ret; + + if (!rds_conn_hb_timeout || conn->c_loopback) + return; + + if (rds_conn_state(conn) == RDS_CONN_UP) { + if (!conn->c_hb_start) { + ret = rds_send_hb(conn, 0); + if (ret) { + rdsdebug("RDS/IB: rds_hb_worker: failed %d\n", ret); + return; + } + conn->c_hb_start = now; + } else if (now - conn->c_hb_start > rds_conn_hb_timeout) { + printk(KERN_NOTICE + "RDS/IB: connection <%u.%u.%u.%u,%u.%u.%u.%u,%d> " + "timed out (0x%lx,0x%lx)..disconnecting and reconnecting\n", + NIPQUAD(conn->c_laddr), + NIPQUAD(conn->c_faddr), conn->c_tos, + conn->c_hb_start, now); + rds_conn_drop(conn); + return; + } + queue_delayed_work(rds_wq, &conn->c_hb_w, HZ); + } +} + void rds_shutdown_worker(struct work_struct *work) { struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w); rds_conn_shutdown(conn); + + if (!conn->c_reconnect) + conn->c_active_side = 0; } void rds_threads_exit(void) -- 2.50.1