From: Bang Nguyen
Date: Fri, 3 Feb 2012 16:10:06 +0000 (-0500)
Subject: RDS Quality Of Service
X-Git-Tag: v4.1.12-92~319^2^2~2^2~28
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=d4186fe15283f639a99bffdc1b97a24468b0b6f8;p=users%2Fjedix%2Flinux-maple.git

RDS Quality Of Service

RDS QoS is an extension of IB QoS that gives clients the ability to segregate traffic flows and to define policies that regulate them. Internally, each traffic flow is represented by a connection with all of the independent resources of a normal connection, and is differentiated by service type. In other words, there can be multiple connections between an IP pair, each supporting a unique service type.

The service type (TOS) is user-defined and can be configured to satisfy particular traffic requirements. For example, one service type may be configured for high-priority, low-latency traffic, another for low-priority, high-bandwidth traffic, and so on.

TOS is socket-based. A client sets the TOS on a socket via an ioctl and must do so before initiating any traffic; once set, the TOS cannot be changed:

    ioctl(fd, RDS_IOC_SET_TOS=1, (uint8_t *))

All outgoing traffic from the socket is then associated with its TOS.

Signed-off-by: Venkat Venkatsubra
Signed-off-by: Chris Mason
Signed-off-by: Bang Nguyen
---
diff --git a/include/linux/rds.h b/include/linux/rds.h index 0833ae73d062..4a649f4a0927 100644 --- a/include/linux/rds.h +++ b/include/linux/rds.h @@ -57,6 +57,15 @@ #define RDS_RECVERR 5 #define RDS_CONG_MONITOR 6 #define RDS_GET_MR_FOR_DEST 7 +#define RDS_CONN_RESET 8 + +/* + * ioctl commands for SOL_RDS + */ +#define RDS_IOC_SET_TOS 1 + +typedef u_int8_t rds_tos_t; + /* * Control message types for SOL_RDS. @@ -117,6 +126,7 @@ struct rds_info_connection { __be32 faddr; u_int8_t transport[TRANSNAMSIZ]; /* null term ascii */ u_int8_t flags; + u_int8_t tos; } __attribute__((packed)); struct rds_info_flow { @@ -138,6 +148,7 @@ struct rds_info_message { __be16 lport; __be16 fport; u_int8_t flags; + u_int8_t tos; } __attribute__((packed)); struct rds_info_socket { @@ -174,6 +185,9 @@ struct rds_info_rdma_connection { uint32_t max_send_sge; uint32_t rdma_mr_max; uint32_t rdma_mr_size; + uint8_t tos; + uint8_t sl; + uint32_t cache_allocs; }; /* @@ -258,6 +272,12 @@ struct rds_atomic_args { u_int64_t user_token; }; +struct rds_reset { + u_int8_t tos; + struct in_addr src; + struct in_addr dst; +}; + struct rds_rdma_notify { u_int64_t user_token; int32_t status; diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index 33ca87138580..b13b922fdcf1 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -200,7 +200,28 @@ static unsigned int rds_poll(struct file *file, struct socket *sock, static int rds_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg) { - return -ENOIOCTLCMD; + struct rds_sock *rs = rds_sk_to_rs(sock->sk); + rds_tos_t tos; + unsigned long flags; + + if (get_user(tos, (rds_tos_t __user *)arg)) + return -EFAULT; + + switch (cmd) { + case RDS_IOC_SET_TOS: + spin_lock_irqsave(&rds_sock_lock, flags); + if (rs->rs_tos || rs->rs_conn) { + spin_unlock_irqrestore(&rds_sock_lock, flags); + return -EINVAL; + } + rs->rs_tos = tos; + spin_unlock_irqrestore(&rds_sock_lock, flags); + break; + default: + return -ENOPROTOOPT; + } + + return 0; } static int rds_cancel_sent_to(struct rds_sock *rs, char __user *optval, @@ -261,6 +282,32 @@ static int rds_cong_monitor(struct rds_sock *rs, char __user *optval, return ret; } +static int rds_user_reset(struct rds_sock *rs, char __user *optval, int optlen) +{ + struct 
rds_reset reset; + struct rds_connection *conn; + + if (optlen != sizeof(struct rds_reset)) + return -EINVAL; + + if (copy_from_user(&reset, (struct rds_reset __user *)optval, + sizeof(struct rds_reset))) + return -EFAULT; + + conn = rds_conn_find(reset.src.s_addr, reset.dst.s_addr, + rs->rs_transport, reset.tos); + + if (conn) { + printk(KERN_NOTICE "Resetting RDS/IB connection " + "<%u.%u.%u.%u,%u.%u.%u.%u,%d>\n", + NIPQUAD(reset.src.s_addr), + NIPQUAD(reset.dst.s_addr), conn->c_tos); + rds_conn_drop(conn); + } + + return 0; +} + static int rds_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen) { @@ -291,6 +338,9 @@ static int rds_setsockopt(struct socket *sock, int level, int optname, case RDS_CONG_MONITOR: ret = rds_cong_monitor(rs, optval, optlen); break; + case RDS_CONN_RESET: + ret = rds_user_reset(rs, optval, optlen); + break; default: ret = -ENOPROTOOPT; } @@ -425,6 +475,8 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) spin_lock_init(&rs->rs_rdma_lock); rs->rs_rdma_keys = RB_ROOT; rs->poison = 0xABABABAB; + rs->rs_tos = 0; + rs->rs_conn = 0; if (rs->rs_bound_addr) { printk(KERN_CRIT "bound addr %x at create\n", rs->rs_bound_addr); diff --git a/net/rds/connection.c b/net/rds/connection.c index af13c524edc8..d99b958aa3dc 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -64,13 +64,15 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr) /* rcu read lock must be held or the connection spinlock */ static struct rds_connection *rds_conn_lookup(struct hlist_head *head, __be32 laddr, __be32 faddr, - struct rds_transport *trans) + struct rds_transport *trans, + u8 tos) { struct rds_connection *conn, *ret = NULL; struct hlist_node *pos; hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) { if (conn->c_faddr == faddr && conn->c_laddr == laddr && + conn->c_tos == tos && conn->c_trans == trans) { ret = conn; break; @@ -112,6 +114,7 @@ void rds_conn_reset(struct rds_connection *conn) */ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, struct rds_transport *trans, gfp_t gfp, + u8 tos, int is_outgoing) { struct rds_connection *conn, *parent = NULL; @@ -121,7 +124,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, int ret; rcu_read_lock(); - conn = rds_conn_lookup(head, laddr, faddr, trans); + conn = rds_conn_lookup(head, laddr, faddr, trans, tos); if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport @@ -156,6 +159,8 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, INIT_LIST_HEAD(&conn->c_send_queue); INIT_LIST_HEAD(&conn->c_retrans); + conn->c_tos = tos; + ret = rds_cong_get_maps(conn); if (ret) { kmem_cache_free(rds_conn_slab, conn); @@ -233,7 +238,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, /* Creating normal conn */ struct rds_connection *found; - found = rds_conn_lookup(head, laddr, faddr, trans); + found = rds_conn_lookup(head, laddr, faddr, trans, tos); if (found) { trans->conn_free(conn->c_transport_data); kmem_cache_free(rds_conn_slab, conn); @@ -251,19 +256,35 @@ out: } struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr, - struct rds_transport *trans, gfp_t gfp) + struct rds_transport *trans, + u8 tos, gfp_t gfp) { - return __rds_conn_create(laddr, faddr, trans, gfp, 0); + return __rds_conn_create(laddr, faddr, trans, gfp, tos, 0); } EXPORT_SYMBOL_GPL(rds_conn_create); struct rds_connection 
*rds_conn_create_outgoing(__be32 laddr, __be32 faddr, - struct rds_transport *trans, gfp_t gfp) + struct rds_transport *trans, + u8 tos, gfp_t gfp) { - return __rds_conn_create(laddr, faddr, trans, gfp, 1); + return __rds_conn_create(laddr, faddr, trans, gfp, tos, 1); } EXPORT_SYMBOL_GPL(rds_conn_create_outgoing); +struct rds_connection *rds_conn_find(__be32 laddr, __be32 faddr, + struct rds_transport *trans, u8 tos) +{ + struct rds_connection *conn; + struct hlist_head *head = rds_conn_bucket(laddr, faddr); + + rcu_read_lock(); + conn = rds_conn_lookup(head, laddr, faddr, trans, tos); + rcu_read_unlock(); + + return conn; +} +EXPORT_SYMBOL_GPL(rds_conn_find); + void rds_conn_shutdown(struct rds_connection *conn) { /* shut it down unless it's down already */ @@ -490,6 +511,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn, cinfo->next_rx_seq = conn->c_next_rx_seq; cinfo->laddr = conn->c_laddr; cinfo->faddr = conn->c_faddr; + cinfo->tos = conn->c_tos; strncpy(cinfo->transport, conn->c_trans->t_name, sizeof(cinfo->transport)); cinfo->flags = 0; @@ -562,7 +584,7 @@ void rds_conn_drop(struct rds_connection *conn) } else if ((conn->c_reconnect_warn) && (now - conn->c_reconnect_start > 60)) { printk(KERN_INFO "RDS/IB: re-connect to %u.%u.%u.%u is " - "stalling for more than 1 min...(drops=%d err=%d)\n", + "stalling for more than 1 min...(drops=%u err=%d)\n", NIPQUAD(conn->c_faddr), conn->c_reconnect_drops, conn->c_reconnect_err); conn->c_reconnect_warn = 0; diff --git a/net/rds/ib.c b/net/rds/ib.c index 4d8c00f797b2..89736d1a0b12 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -101,6 +101,8 @@ static void rds_ib_dev_free(struct work_struct *work) ib_dereg_mr(rds_ibdev->mr); if (rds_ibdev->pd) ib_dealloc_pd(rds_ibdev->pd); + if (rds_ibdev->srq) + kfree(rds_ibdev->srq); list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) { list_del(&i_ipaddr->list); @@ -174,6 +176,10 @@ void rds_ib_add_one(struct ib_device *device) goto put_dev; } + rds_ibdev->srq = kmalloc(sizeof(struct rds_ib_srq), GFP_KERNEL); + if (!rds_ibdev->srq) + goto free_attr; + INIT_LIST_HEAD(&rds_ibdev->ipaddr_list); INIT_LIST_HEAD(&rds_ibdev->conn_list); @@ -291,6 +297,9 @@ static int rds_ib_conn_info_visitor(struct rds_connection *conn, iinfo->max_recv_wr = ic->i_recv_ring.w_nr; iinfo->max_send_sge = rds_ibdev->max_sge; rds_ib_get_mr_info(rds_ibdev, iinfo); + iinfo->tos = ic->conn->c_tos; + iinfo->sl = ic->i_sl; + iinfo->cache_allocs = atomic_read(&ic->i_cache_allocs); } return 1; } @@ -361,6 +370,7 @@ void rds_ib_exit(void) rds_ib_unregister_client(); rds_ib_destroy_nodev_conns(); rds_ib_sysctl_exit(); + rds_ib_srqs_exit(); rds_ib_recv_exit(); rds_trans_unregister(&rds_ib_transport); rds_ib_fmr_exit(); @@ -415,14 +425,22 @@ int rds_ib_init(void) if (ret) goto out_sysctl; + ret = rds_ib_srqs_init(); + if (ret) { + printk(KERN_ERR "rds_ib_srqs_init failed.\n"); + goto out_recv; + } + ret = rds_trans_register(&rds_ib_transport); if (ret) - goto out_recv; + goto out_srq; rds_info_register_func(RDS_INFO_IB_CONNECTIONS, rds_ib_ic_info); goto out; +out_srq: + rds_ib_srqs_exit(); out_recv: rds_ib_recv_exit(); out_sysctl: diff --git a/net/rds/ib.h b/net/rds/ib.h index f0b7c4e1bc6e..234e078407fb 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -16,12 +16,18 @@ #define RDS_IB_DEFAULT_RECV_WR 1024 #define RDS_IB_DEFAULT_SEND_WR 256 +#define RDS_IB_DEFAULT_SRQ_MAX_WR 4096 +#define RDS_IB_DEFAULT_SRQ_REFILL_WR RDS_IB_DEFAULT_SRQ_MAX_WR/2 +#define RDS_IB_DEFAULT_SRQ_LOW_WR RDS_IB_DEFAULT_SRQ_MAX_WR/10 #define 
RDS_IB_DEFAULT_RETRY_COUNT 1 -#define RDS_IB_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */ +#define RDS_IB_SUPPORTED_PROTOCOLS 0x00000007 /* minor versions supported */ #define RDS_IB_RECYCLE_BATCH_COUNT 32 + +#define RDS_IB_SRQ_POST_BATCH_COUNT 64 + #define RDS_WC_MAX 32 extern struct rw_semaphore rds_ib_devices_lock; @@ -65,6 +71,7 @@ struct rds_ib_connect_private { __be32 dp_reserved1; __be64 dp_ack_seq; __be32 dp_credit; /* non-zero enables flow ctl */ + u8 dp_tos; }; struct rds_ib_send_work { @@ -79,6 +86,8 @@ struct rds_ib_recv_work { struct rds_page_frag *r_frag; struct ib_recv_wr r_wr; struct ib_sge r_sge[2]; + struct rds_ib_connection *r_ic; + int r_posted; }; struct rds_ib_work_ring { @@ -174,6 +183,11 @@ struct rds_ib_connection { /* Batched completions */ unsigned int i_unsignaled_wrs; + u8 i_sl; + + atomic_t i_cache_allocs; + + struct completion i_last_wqe_complete; }; /* This assumes that atomic_t is at least 32 bits */ @@ -188,6 +202,20 @@ struct rds_ib_ipaddr { struct rcu_head rcu_head; }; +struct rds_ib_srq { + struct rds_ib_device *rds_ibdev; + struct ib_srq *s_srq; + struct ib_event_handler s_event_handler; + struct rds_ib_recv_work *s_recvs; + u32 s_n_wr; + struct rds_header *s_recv_hdrs; + u64 s_recv_hdrs_dma; + atomic_t s_num_posted; + unsigned long s_refill_gate; + struct delayed_work s_refill_w; + struct delayed_work s_rearm_w; +}; + struct rds_ib_device { struct list_head list; struct list_head ipaddr_list; @@ -205,6 +233,7 @@ struct rds_ib_device { spinlock_t spinlock; /* protect the above */ atomic_t refcount; struct work_struct free_work; + struct rds_ib_srq *srq; }; #define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus) @@ -235,6 +264,8 @@ struct rds_ib_statistics { uint64_t s_ib_rx_refill_from_cq; uint64_t s_ib_rx_refill_from_thread; uint64_t s_ib_rx_alloc_limit; + uint64_t s_ib_rx_total_frags; + uint64_t s_ib_rx_total_incs; uint64_t s_ib_rx_credit_updates; uint64_t s_ib_ack_sent; uint64_t s_ib_ack_send_failure; @@ -249,6 +280,9 @@ struct rds_ib_statistics { uint64_t s_ib_rdma_mr_pool_depleted; uint64_t s_ib_atomic_cswp; uint64_t s_ib_atomic_fadd; + uint64_t s_ib_srq_lows; + uint64_t s_ib_srq_refills; + uint64_t s_ib_srq_empty_refills; }; extern struct workqueue_struct *rds_ib_wq; @@ -339,6 +373,8 @@ void rds_ib_fmr_exit(void); /* ib_recv.c */ int rds_ib_recv_init(void); void rds_ib_recv_exit(void); +int rds_ib_srqs_init(void); +void rds_ib_srqs_exit(void); int rds_ib_recv(struct rds_connection *conn); int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic); void rds_ib_recv_free_caches(struct rds_ib_connection *ic); @@ -356,6 +392,8 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic); void rds_ib_attempt_ack(struct rds_ib_connection *ic); void rds_ib_ack_send_complete(struct rds_ib_connection *ic); u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic); +void rds_ib_srq_refill(struct work_struct *work); +void rds_ib_srq_rearm(struct work_struct *work); void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required); @@ -392,6 +430,11 @@ DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter, unsigned int avail); +/* ib_recv.c */ +extern unsigned int rds_ib_srq_max_wr; +extern unsigned int rds_ib_srq_refill_wr; +extern unsigned int rds_ib_srq_low_wr; + /* ib_sysctl.c */ int rds_ib_sysctl_init(void); void rds_ib_sysctl_exit(void); diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 12ccb1447e56..a936a5940891 100644 --- a/net/rds/ib_cm.c +++ 
b/net/rds/ib_cm.c @@ -141,31 +141,59 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even } } - if (conn->c_version < RDS_PROTOCOL(3,1)) { - printk(KERN_NOTICE "RDS/IB: Connection to %pI4 version %u.%u failed," - " no longer supported\n", - &conn->c_faddr, - RDS_PROTOCOL_MAJOR(conn->c_version), - RDS_PROTOCOL_MINOR(conn->c_version)); - rds_conn_destroy(conn); - return; - } else { - printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n", - &conn->c_faddr, - RDS_PROTOCOL_MAJOR(conn->c_version), - RDS_PROTOCOL_MINOR(conn->c_version), - ic->i_flowctl ? ", flow control" : ""); - } + if (conn->c_version < RDS_PROTOCOL(3,2)) { + if (conn->c_version == RDS_PROTOCOL(3,1)) { + if (conn->c_tos) { + printk(KERN_NOTICE "RDS: Connection to" + " %u.%u.%u.%u version %u.%u Tos %d" + " failed, not supporting QoS\n", + NIPQUAD(conn->c_faddr), + RDS_PROTOCOL_MAJOR(conn->c_version), + RDS_PROTOCOL_MINOR(conn->c_version), + conn->c_tos); + rds_conn_drop(conn); + return; + } + } else { + /* + * BUG: destroying connection here can deadlock with + * the CM event handler on the c_cm_lock. + */ + printk(KERN_NOTICE "RDS/IB: Connection to" + " %u.%u.%u.%u version %u.%u failed," + " no longer supported\n", + NIPQUAD(conn->c_faddr), + RDS_PROTOCOL_MAJOR(conn->c_version), + RDS_PROTOCOL_MINOR(conn->c_version)); + rds_conn_destroy(conn); + return; + } + } + + printk(KERN_NOTICE + "RDS/IB: connected to %u.%u.%u.%u version %u.%u%s Tos %d\n", + NIPQUAD(conn->c_faddr), + RDS_PROTOCOL_MAJOR(conn->c_version), + RDS_PROTOCOL_MINOR(conn->c_version), + ic->i_flowctl ? ", flow control" : "", + conn->c_tos); + + ic->i_sl = ic->i_cm_id->route.path_rec->sl; /* * Init rings and fill recv. this needs to wait until protocol negotiation * is complete, since ring layout is different from 3.0 to 3.1. */ rds_ib_send_init_ring(ic); - rds_ib_recv_init_ring(ic); + + if (!ic->conn->c_tos) + rds_ib_recv_init_ring(ic); + /* Post receive buffers - as a side effect, this will update * the posted credit count. 
*/ - rds_ib_recv_refill(conn, 1, GFP_KERNEL); + if (!ic->conn->c_tos) { + rds_ib_recv_refill(conn, 1, GFP_KERNEL); + } /* Tune RNR behavior */ rds_ib_tune_rnr(ic, &qp_attr); @@ -216,6 +244,7 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn, dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version); dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS); dp->dp_ack_seq = rds_ib_piggyb_ack(ic); + dp->dp_tos = conn->c_tos; /* Advertise flow control */ if (ic->i_flowctl) { @@ -308,6 +337,9 @@ void rds_ib_tasklet_fn_recv(unsigned long data) struct rds_ib_connection *ic = (struct rds_ib_connection *) data; struct rds_connection *conn = ic->conn; struct rds_ib_ack_state ack_state; + struct rds_ib_device *rds_ibdev = ic->rds_ibdev; + + BUG_ON(conn->c_tos && !rds_ibdev); rds_ib_stats_inc(s_ib_tasklet_call); @@ -325,6 +357,14 @@ void rds_ib_tasklet_fn_recv(unsigned long data) } if (rds_conn_up(conn)) rds_ib_attempt_ack(ic); + + if (conn->c_tos) { + if ((atomic_read(&rds_ibdev->srq->s_num_posted) < + rds_ib_srq_refill_wr) && + !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate)) + queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w,0); + + } } static void rds_ib_qp_event_handler(struct ib_event *event, void *data) @@ -339,6 +379,9 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data) case IB_EVENT_COMM_EST: rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST); break; + case IB_EVENT_QP_LAST_WQE_REACHED: + complete(&ic->i_last_wqe_complete); + break; default: rdsdebug("Fatal QP Event %u (%s) " "- connection %pI4->%pI4, reconnecting\n", @@ -392,10 +435,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn) goto out; } - ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv, - rds_ib_cq_event_handler, conn, - ic->i_recv_ring.w_nr, - IB_CQ_VECTOR_LEAST_ATTACHED); + if (ic->conn->c_tos) + ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv, + rds_ib_cq_event_handler, conn, + rds_ib_srq_max_wr - 1, + IB_CQ_VECTOR_LEAST_ATTACHED); + else + ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv, + rds_ib_cq_event_handler, conn, + ic->i_recv_ring.w_nr, + IB_CQ_VECTOR_LEAST_ATTACHED); if (IS_ERR(ic->i_rcq)) { ret = PTR_ERR(ic->i_rcq); ic->i_rcq = NULL; @@ -429,6 +478,11 @@ static int rds_ib_setup_qp(struct rds_connection *conn) attr.send_cq = ic->i_scq; attr.recv_cq = ic->i_rcq; + if (ic->conn->c_tos) { + attr.cap.max_recv_wr = 0; + attr.srq = rds_ibdev->srq->s_srq; + } + /* * XXX this can fail if max_*_wr is too large? Are we supposed * to back off until we get a value that the hardware can support? 
@@ -449,15 +503,17 @@ static int rds_ib_setup_qp(struct rds_connection *conn) goto out; } - ic->i_recv_hdrs = ib_dma_alloc_coherent(dev, - ic->i_recv_ring.w_nr * - sizeof(struct rds_header), - &ic->i_recv_hdrs_dma, GFP_KERNEL); - if (!ic->i_recv_hdrs) { - ret = -ENOMEM; - rdsdebug("ib_dma_alloc_coherent recv failed\n"); - goto out; - } + if (!ic->conn->c_tos) { + ic->i_recv_hdrs = ib_dma_alloc_coherent(dev, + ic->i_recv_ring.w_nr * + sizeof(struct rds_header), + &ic->i_recv_hdrs_dma, GFP_KERNEL); + if (!ic->i_recv_hdrs) { + ret = -ENOMEM; + rdsdebug("ib_dma_alloc_coherent recv failed\n"); + goto out; + } + } ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header), &ic->i_ack_dma, GFP_KERNEL); @@ -476,14 +532,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn) } memset(ic->i_sends, 0, ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work)); - ic->i_recvs = vmalloc_node(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work), - ibdev_to_node(dev)); - if (!ic->i_recvs) { - ret = -ENOMEM; - rdsdebug("recv allocation failed\n"); - goto out; - } - memset(ic->i_recvs, 0, ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work)); + if (!ic->conn->c_tos) { + ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * + sizeof(struct rds_ib_recv_work)); + if (!ic->i_recvs) { + ret = -ENOMEM; + rdsdebug("recv allocation failed\n"); + goto out; + } + memset(ic->i_recvs, 0, ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work)); + } rds_ib_recv_init_ack(ic); @@ -563,7 +621,7 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id, (unsigned long long)be64_to_cpu(fguid)); conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport, - GFP_KERNEL); + dp->dp_tos, GFP_KERNEL); if (IS_ERR(conn)) { rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn)); conn = NULL; @@ -765,6 +823,13 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) */ rdsdebug("failed to disconnect, cm: %p err %d\n", ic->i_cm_id, err); + } else if (ic->conn->c_tos && ic->rds_ibdev) { + /* + * wait for the last wqe to complete, then schedule + * the recv tasklet to drain the RX CQ. 
+ */ + wait_for_completion(&ic->i_last_wqe_complete); + tasklet_schedule(&ic->i_rtasklet); } /* quiesce tx and rx completion before tearing down */ @@ -857,8 +922,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) vfree(ic->i_sends); ic->i_sends = NULL; - vfree(ic->i_recvs); + if (!ic->conn->c_tos) + vfree(ic->i_recvs); + ic->i_recvs = NULL; + + INIT_COMPLETION(ic->i_last_wqe_complete); } int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) @@ -896,6 +965,8 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) ic->conn = conn; conn->c_transport_data = ic; + + init_completion(&ic->i_last_wqe_complete); spin_lock_irqsave(&ib_nodev_conns_lock, flags); list_add_tail(&ic->ib_node, &ib_nodev_conns); diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index fc6aa07609ab..eb6c64623207 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -38,6 +38,17 @@ #include "rds.h" #include "ib.h" +unsigned int rds_ib_srq_max_wr = RDS_IB_DEFAULT_SRQ_MAX_WR; +unsigned int rds_ib_srq_refill_wr = RDS_IB_DEFAULT_SRQ_REFILL_WR; +unsigned int rds_ib_srq_low_wr = RDS_IB_DEFAULT_SRQ_LOW_WR; + +module_param(rds_ib_srq_max_wr, int, 0444); +MODULE_PARM_DESC(rds_ib_srq_max_wr, "Max number of SRQ WRs"); +module_param(rds_ib_srq_refill_wr, int, 0444); +MODULE_PARM_DESC(rds_ib_srq_refill_wr, "SRQ refill watermark"); +module_param(rds_ib_srq_low_wr, int, 0444); +MODULE_PARM_DESC(rds_ib_srq_low_wr, "SRQ low watermark"); + static struct kmem_cache *rds_ib_incoming_slab; static struct kmem_cache *rds_ib_frag_slab; static atomic_t rds_ib_allocation = ATOMIC_INIT(0); @@ -193,6 +204,7 @@ static void rds_ib_frag_free(struct rds_ib_connection *ic, rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg)); rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags); + atomic_inc(&ic->i_cache_allocs); } /* Recycle inc after freeing attached frags */ @@ -252,6 +264,7 @@ static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *i if (!ibinc) { return NULL; } + rds_ib_stats_inc(s_ib_rx_total_incs); } INIT_LIST_HEAD(&ibinc->ii_frags); rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr); @@ -270,6 +283,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags); if (cache_item) { frag = container_of(cache_item, struct rds_page_frag, f_cache_entry); + atomic_dec(&ic->i_cache_allocs); } else { frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask); if (!frag) @@ -296,6 +310,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic atomic_dec(&rds_ib_allocation); return NULL; } + rds_ib_stats_inc(s_ib_rx_total_frags); } INIT_LIST_HEAD(&frag->f_item); @@ -354,6 +369,139 @@ out: return ret; } +static void rds_ib_srq_clear_one(struct rds_ib_srq *srq, + struct rds_ib_connection *ic, + struct rds_ib_recv_work *recv) +{ + if (recv->r_ibinc) { + rds_inc_put(&recv->r_ibinc->ii_inc); + recv->r_ibinc = NULL; + } + if (recv->r_frag) { + ib_dma_unmap_sg(srq->rds_ibdev->dev, &recv->r_frag->f_sg, + 1, DMA_FROM_DEVICE); + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; + recv->r_ic = ic; + recv->r_posted = 0; + } +} + +static int rds_ib_srq_refill_one(struct rds_ib_srq *srq, + struct rds_ib_connection *ic, + struct rds_ib_recv_work *recv, gfp_t gfp) +{ + struct ib_sge *sge; + int ret = -ENOMEM; + gfp_t slab_mask = GFP_NOWAIT; + gfp_t page_mask = GFP_NOWAIT; + + if (gfp & __GFP_WAIT) { + slab_mask = GFP_KERNEL; + page_mask = GFP_HIGHUSER; + } + + if (!ic->i_cache_incs.ready) + 
rds_ib_cache_xfer_to_ready(&ic->i_cache_incs); + if (!ic->i_cache_frags.ready) + rds_ib_cache_xfer_to_ready(&ic->i_cache_frags); + + /* + * ibinc was taken from recv if recv contained the start of a message. + * recvs that were continuations will still have this allocated. + */ + if (!recv->r_ibinc) { + recv->r_ibinc = rds_ib_refill_one_inc(ic, slab_mask); + if (!recv->r_ibinc) + goto out; + } + + WARN_ON_ONCE(recv->r_frag); /* leak! */ + recv->r_frag = rds_ib_refill_one_frag(ic, slab_mask, page_mask); + if (!recv->r_frag) + goto out; + + ret = ib_dma_map_sg(srq->rds_ibdev->dev, &recv->r_frag->f_sg, + 1, DMA_FROM_DEVICE); + + WARN_ON(ret != 1); + + sge = &recv->r_sge[0]; + + sge->addr = srq->s_recv_hdrs_dma + + (recv - srq->s_recvs) * + sizeof(struct rds_header); + + sge->length = sizeof(struct rds_header); + + sge = &recv->r_sge[1]; + sge->addr = sg_dma_address(&recv->r_frag->f_sg); + sge->length = sg_dma_len(&recv->r_frag->f_sg); + + ret = 0; +out: + return ret; +} + +static int rds_ib_srq_prefill_one(struct rds_ib_device *rds_ibdev, + struct rds_ib_recv_work *recv, int prefill) +{ + struct ib_sge *sge; + int ret = -ENOMEM; + gfp_t slab_mask = GFP_NOWAIT; + gfp_t page_mask = GFP_NOWAIT; + + if (prefill) { + slab_mask = GFP_KERNEL; + page_mask = GFP_HIGHUSER; + } + + if (!recv->r_ibinc) { + recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab, slab_mask); + if (!recv->r_ibinc) + goto out; + rds_ib_stats_inc(s_ib_rx_total_incs); + INIT_LIST_HEAD(&recv->r_ibinc->ii_frags); + } + + WARN_ON_ONCE(recv->r_frag); /* leak! */ + recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask); + if (!recv->r_frag) + goto out; + sg_init_table(&recv->r_frag->f_sg, 1); + ret = rds_page_remainder_alloc(&recv->r_frag->f_sg, + RDS_FRAG_SIZE, page_mask); + if (ret) { + kmem_cache_free(rds_ib_frag_slab, recv->r_frag); + goto out; + } + rds_ib_stats_inc(s_ib_rx_total_frags); + INIT_LIST_HEAD(&recv->r_frag->f_item); + + ret = ib_dma_map_sg(rds_ibdev->dev, &recv->r_frag->f_sg, + 1, DMA_FROM_DEVICE); + WARN_ON(ret != 1); + + sge = &recv->r_sge[0]; + sge->addr = rds_ibdev->srq->s_recv_hdrs_dma + + (recv - rds_ibdev->srq->s_recvs) * + sizeof(struct rds_header); + sge->length = sizeof(struct rds_header); + sge->lkey = rds_ibdev->mr->lkey; + + sge = &recv->r_sge[1]; + sge->addr = sg_dma_address(&recv->r_frag->f_sg); + sge->length = sg_dma_len(&recv->r_frag->f_sg); + sge->lkey = rds_ibdev->mr->lkey; + + ret = 0; + +out: + return ret; +} + + + static int acquire_refill(struct rds_connection *conn) { return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0; @@ -1007,25 +1155,126 @@ static void rds_ib_process_recv(struct rds_connection *conn, } } +void rds_ib_srq_process_recv(struct rds_connection *conn, + struct rds_ib_recv_work *recv, u32 data_len, + struct rds_ib_ack_state *state) +{ + struct rds_ib_connection *ic = conn->c_transport_data; + struct rds_ib_incoming *ibinc = ic->i_ibinc; + struct rds_header *ihdr, *hdr; + + if (data_len < sizeof(struct rds_header)) { + printk(KERN_WARNING "RDS: from %pI4 didn't include a " + "header, disconnecting and " + "reconnecting\n", + &conn->c_faddr); + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; + return; + } + data_len -= sizeof(struct rds_header); + + ihdr = &ic->rds_ibdev->srq->s_recv_hdrs[recv->r_wr.wr_id]; + + /* Validate the checksum. 
*/ + if (!rds_message_verify_checksum(ihdr)) { + printk(KERN_WARNING "RDS: from %pI4 has corrupted header - " + "forcing a reconnect\n", + &conn->c_faddr); + rds_stats_inc(s_recv_drop_bad_checksum); + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; + return; + } + + /* Process the ACK sequence which comes with every packet */ + state->ack_recv = be64_to_cpu(ihdr->h_ack); + state->ack_recv_valid = 1; + + if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) { + rds_ib_stats_inc(s_ib_ack_received); + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; + return; + } + + if (!ibinc) { + ibinc = recv->r_ibinc; + rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr); + recv->r_ibinc = NULL; + ic->i_ibinc = ibinc; + hdr = &ibinc->ii_inc.i_hdr; + memcpy(hdr, ihdr, sizeof(*hdr)); + ic->i_recv_data_rem = be32_to_cpu(hdr->h_len); + } else { + hdr = &ibinc->ii_inc.i_hdr; + if (hdr->h_sequence != ihdr->h_sequence + || hdr->h_len != ihdr->h_len + || hdr->h_sport != ihdr->h_sport + || hdr->h_dport != ihdr->h_dport) { + printk(KERN_WARNING "RDS: fragment header mismatch; " + "forcing reconnect\n"); + rds_ib_frag_free(ic, recv->r_frag); + recv->r_frag = NULL; + return; + } + } + + list_add_tail(&recv->r_frag->f_item, &ibinc->ii_frags); + + recv->r_frag = NULL; + + if (ic->i_recv_data_rem > RDS_FRAG_SIZE) + ic->i_recv_data_rem -= RDS_FRAG_SIZE; + else { + ic->i_recv_data_rem = 0; + ic->i_ibinc = NULL; + + if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP) + rds_ib_cong_recv(conn, ibinc); + else { + rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr, + &ibinc->ii_inc, GFP_ATOMIC, + KM_SOFTIRQ0); + + state->ack_next = be64_to_cpu(hdr->h_sequence); + state->ack_next_valid = 1; + } + if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) { + rds_stats_inc(s_recv_ack_required); + state->ack_required = 1; + } + rds_inc_put(&ibinc->ii_inc); + } +} + void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc, struct rds_ib_ack_state *state) { struct rds_connection *conn = ic->conn; struct rds_ib_recv_work *recv; + struct rds_ib_device *rds_ibdev = ic->rds_ibdev; rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", (unsigned long long)wc->wr_id, wc->status, wc->byte_len, be32_to_cpu(wc->ex.imm_data)); rds_ib_stats_inc(s_ib_rx_cq_event); - - recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)]; + + if (conn->c_tos) { + recv = &rds_ibdev->srq->s_recvs[wc->wr_id]; + atomic_dec(&rds_ibdev->srq->s_num_posted); + } else + recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)]; ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE); if (wc->status == IB_WC_SUCCESS) { - rds_ib_process_recv(conn, recv, wc->byte_len, state); + if (ic->conn->c_tos) + rds_ib_srq_process_recv(conn, recv, wc->byte_len, state); + else + rds_ib_process_recv(conn, recv, wc->byte_len, state); } else { /* We expect errors as the qp is drained during shutdown */ if (rds_conn_up(conn) || rds_conn_connecting(conn)) @@ -1047,11 +1296,132 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, rds_ib_frag_free(ic, recv->r_frag); recv->r_frag = NULL; } - rds_ib_ring_free(&ic->i_recv_ring, 1); - rds_ib_recv_refill(conn, 0, GFP_NOWAIT); + if (!ic->conn->c_tos) { + rds_ib_ring_free(&ic->i_recv_ring, 1); + rds_ib_recv_refill(conn, 0, GFP_NOWAIT); + } else { + recv->r_ic = ic; + recv->r_posted = 0; + } +} + +void rds_ib_srq_refill(struct work_struct *work) +{ + struct rds_ib_srq *srq = container_of(work, struct rds_ib_srq, s_refill_w.work); + struct rds_ib_recv_work 
*prv = NULL, *cur = NULL, *tmp; + struct ib_recv_wr *bad_wr; + int i, refills = 0, total_refills = 0; + + if (!test_bit(0, &srq->s_refill_gate)) + return; + + rds_ib_stats_inc(s_ib_srq_refills); + + for (i = 0; i < srq->s_n_wr; i++) { + tmp = &srq->s_recvs[i]; + if (tmp->r_posted) + continue; + + if (rds_ib_srq_refill_one(srq, tmp->r_ic, tmp, GFP_NOWAIT)) { + printk(KERN_ERR "rds_ib_srq_refill_one failed\n"); + break; + } + cur = tmp; + + if (!prv) { + prv = cur; + prv->r_wr.next = NULL; + } else { + cur->r_wr.next = &prv->r_wr; + prv = cur; + } + cur->r_posted = 1; + + total_refills++; + if (++refills == RDS_IB_SRQ_POST_BATCH_COUNT) { + if (ib_post_srq_recv(srq->s_srq, &cur->r_wr, &bad_wr)) { + struct ib_recv_wr *wr; + struct rds_ib_recv_work *recv; + + for (wr = &cur->r_wr; wr; wr = wr->next) { + recv = container_of(wr, struct rds_ib_recv_work, r_wr); + rds_ib_srq_clear_one(srq, recv->r_ic, recv); + } + printk(KERN_ERR "ib_post_srq_recv failed\n"); + goto out; + } + + atomic_add(refills, &srq->s_num_posted); + prv = NULL; + refills = 0; + cur = NULL; + } + } + if (cur) { + if (ib_post_srq_recv(srq->s_srq, &cur->r_wr, &bad_wr)) { + struct ib_recv_wr *wr; + struct rds_ib_recv_work *recv; + + for (wr = &cur->r_wr; wr; wr = wr->next) { + recv = container_of(wr, struct rds_ib_recv_work, r_wr); + rds_ib_srq_clear_one(srq, recv->r_ic, recv); + } + printk(KERN_ERR "ib_post_srq_recv failed\n"); + goto out; + } + atomic_add(refills, &srq->s_num_posted); + } + + if (!total_refills) + rds_ib_stats_inc(s_ib_srq_empty_refills); +out: + clear_bit(0, &srq->s_refill_gate); +} + +int rds_ib_srq_prefill_ring(struct rds_ib_device *rds_ibdev) +{ + struct rds_ib_recv_work *recv; + struct ib_recv_wr *bad_wr; + u32 i; + int ret; + + for (i = 0, recv = rds_ibdev->srq->s_recvs; + i < rds_ibdev->srq->s_n_wr; i++, recv++) { + recv->r_wr.next = NULL; + recv->r_wr.wr_id = i; + recv->r_wr.sg_list = recv->r_sge; + recv->r_wr.num_sge = RDS_IB_RECV_SGE; + recv->r_ibinc = NULL; + recv->r_frag = NULL; + recv->r_ic = NULL; + + if (rds_ib_srq_prefill_one(rds_ibdev, recv, 1)) + return 1; + + ret = ib_post_srq_recv(rds_ibdev->srq->s_srq, + &recv->r_wr, &bad_wr); + if (ret) { + printk(KERN_WARNING "RDS: ib_post_srq_recv failed %d\n", ret); + return 1; + } + atomic_inc(&rds_ibdev->srq->s_num_posted); + recv->r_posted = 1; + } + return 0; +} + +static void rds_ib_srq_clear_ring(struct rds_ib_device *rds_ibdev) +{ + u32 i; + struct rds_ib_recv_work *recv; + + for (i = 0, recv = rds_ibdev->srq->s_recvs; + i < rds_ibdev->srq->s_n_wr; i++, recv++) + rds_ib_srq_clear_one(rds_ibdev->srq, recv->r_ic, recv); } + int rds_ib_recv(struct rds_connection *conn) { struct rds_ib_connection *ic = conn->c_transport_data; @@ -1096,3 +1466,141 @@ void rds_ib_recv_exit(void) kmem_cache_destroy(rds_ib_incoming_slab); kmem_cache_destroy(rds_ib_frag_slab); } + +void rds_ib_srq_rearm(struct work_struct *work) +{ + struct rds_ib_srq *srq = container_of(work, struct rds_ib_srq, s_rearm_w.work); + struct ib_srq_attr srq_attr; + + srq_attr.srq_limit = rds_ib_srq_low_wr; + if (ib_modify_srq(srq->s_srq, &srq_attr, IB_SRQ_LIMIT)) { + printk(KERN_ERR "RDS: ib_modify_srq failed\n"); + return; + } +} + +static void rds_ib_srq_event(struct ib_event *event, + void *ctx) +{ + struct ib_srq_attr srq_attr; + struct rds_ib_device *rds_ibdev = ctx; + + switch (event->event) { + case IB_EVENT_SRQ_ERR: + printk(KERN_ERR "RDS: event %u (IB_EVENT_SRQ_ERR) unhandled\n", + event->event); + break; + case IB_EVENT_SRQ_LIMIT_REACHED: + rds_ib_stats_inc(s_ib_srq_lows); + queue_delayed_work(rds_wq, 
&rds_ibdev->srq->s_rearm_w, HZ); + + if (!test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate)) + queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0); + break; + default: + break; + } +} + +/* Setup SRQ for a device */ +int rds_ib_srq_init(struct rds_ib_device *rds_ibdev) +{ + struct ib_srq_init_attr srq_init_attr = { + .event_handler = rds_ib_srq_event, + .srq_context = (void *)rds_ibdev, + .attr = { + .max_wr = rds_ib_srq_max_wr - 1, + .max_sge = rds_ibdev->max_sge + } + }; + + rds_ibdev->srq->rds_ibdev = rds_ibdev; + + rds_ibdev->srq->s_n_wr = rds_ib_srq_max_wr - 1; + rds_ibdev->srq->s_srq = ib_create_srq(rds_ibdev->pd, + &srq_init_attr); + + if (IS_ERR(rds_ibdev->srq->s_srq)) { + printk(KERN_WARNING "RDS: ib_create_srq failed %ld\n", + PTR_ERR(rds_ibdev->srq->s_srq)); + return 1; + } + + rds_ibdev->srq->s_recv_hdrs = ib_dma_alloc_coherent(rds_ibdev->dev, + rds_ibdev->srq->s_n_wr * + sizeof(struct rds_header), + &rds_ibdev->srq->s_recv_hdrs_dma, GFP_KERNEL); + if (!rds_ibdev->srq->s_recv_hdrs) { + printk(KERN_WARNING "ib_dma_alloc_coherent failed\n"); + return 1; + } + + rds_ibdev->srq->s_recvs = vmalloc(rds_ibdev->srq->s_n_wr * + sizeof(struct rds_ib_recv_work)); + + if (!rds_ibdev->srq->s_recvs) { + printk(KERN_WARNING "RDS: vmalloc failed\n"); + return 1; + } + + memset(rds_ibdev->srq->s_recvs, 0, rds_ibdev->srq->s_n_wr * + sizeof(struct rds_ib_recv_work)); + + atomic_set(&rds_ibdev->srq->s_num_posted, 0); + clear_bit(0, &rds_ibdev->srq->s_refill_gate); + + if (rds_ib_srq_prefill_ring(rds_ibdev)) + return 1; + + INIT_DELAYED_WORK(&rds_ibdev->srq->s_refill_w, rds_ib_srq_refill); + + INIT_DELAYED_WORK(&rds_ibdev->srq->s_rearm_w, rds_ib_srq_rearm); + + queue_delayed_work(rds_wq, &rds_ibdev->srq->s_rearm_w, 0); + + return 0; +} + +int rds_ib_srqs_init(void) +{ + struct rds_ib_device *rds_ibdev; + int ret; + + list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { + ret = rds_ib_srq_init(rds_ibdev); + if (ret) + return ret; + } + + return 0; +} + +void rds_ib_srq_exit(struct rds_ib_device *rds_ibdev) +{ + int ret; + + ret = ib_destroy_srq(rds_ibdev->srq->s_srq); + if (ret) { + printk(KERN_WARNING "RDS: ib_destroy_srq failed %d\n", ret); + } + rds_ibdev->srq->s_srq = NULL; + + if (rds_ibdev->srq->s_recv_hdrs) + ib_dma_free_coherent(rds_ibdev->dev, + rds_ibdev->srq->s_n_wr * + sizeof(struct rds_header), + rds_ibdev->srq->s_recv_hdrs, + rds_ibdev->srq->s_recv_hdrs_dma); + + rds_ib_srq_clear_ring(rds_ibdev); + vfree(rds_ibdev->srq->s_recvs); + rds_ibdev->srq->s_recvs = NULL; +} + +void rds_ib_srqs_exit(void) +{ + struct rds_ib_device *rds_ibdev; + + list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { + rds_ib_srq_exit(rds_ibdev); + } +} diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c index 10959bf155a4..80ce6dcee2b6 100644 --- a/net/rds/ib_stats.c +++ b/net/rds/ib_stats.c @@ -55,6 +55,8 @@ static char *rds_ib_stat_names[] = { "ib_rx_refill_from_cq", "ib_rx_refill_from_thread", "ib_rx_alloc_limit", + "ib_rx_total_frags", + "ib_rx_total_incs", "ib_rx_credit_updates", "ib_ack_sent", "ib_ack_send_failure", @@ -69,6 +71,9 @@ static char *rds_ib_stat_names[] = { "ib_rdma_mr_pool_depleted", "ib_atomic_cswp", "ib_atomic_fadd", + "ib_srq_lows", + "ib_srq_refills", + "ib_srq_empty_refills", }; unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter, diff --git a/net/rds/iw_cm.c b/net/rds/iw_cm.c index 00bf3e6c9311..f80dac1ff1b1 100644 --- a/net/rds/iw_cm.c +++ b/net/rds/iw_cm.c @@ -396,7 +396,7 @@ int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id, RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version)); 
conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_iw_transport, - GFP_KERNEL); + 0, GFP_KERNEL); if (IS_ERR(conn)) { rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn)); conn = NULL; diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c index ed82a8d47cc9..646ba079288a 100644 --- a/net/rds/rdma_transport.c +++ b/net/rds/rdma_transport.c @@ -76,6 +76,9 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id, break; case RDMA_CM_EVENT_ADDR_RESOLVED: + rdma_set_service_type(cm_id, conn->c_tos); + + /* XXX do we need to clean up if this fails? */ ret = rdma_resolve_route(cm_id, RDS_RDMA_RESOLVE_TIMEOUT_MS); diff --git a/net/rds/rds.h b/net/rds/rds.h index 3aec13d2c766..a4179429a0e3 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -15,7 +15,8 @@ */ #define RDS_PROTOCOL_3_0 0x0300 #define RDS_PROTOCOL_3_1 0x0301 -#define RDS_PROTOCOL_VERSION RDS_PROTOCOL_3_1 +#define RDS_PROTOCOL_3_2 0x0302 +#define RDS_PROTOCOL_VERSION RDS_PROTOCOL_3_2 #define RDS_PROTOCOL_MAJOR(v) ((v) >> 8) #define RDS_PROTOCOL_MINOR(v) ((v) & 255) #define RDS_PROTOCOL(maj, min) (((maj) << 8) | min) @@ -134,9 +135,12 @@ struct rds_connection { /* Re-connect stall diagnostics */ unsigned long c_reconnect_start; - unsigned long c_reconnect_drops; + unsigned int c_reconnect_drops; int c_reconnect_warn; int c_reconnect_err; + + /* Qos support */ + u8 c_tos; }; #define RDS_FLAG_CONG_BITMAP 0x01 @@ -514,6 +518,8 @@ struct rds_sock { unsigned char rs_recverr, rs_cong_monitor; int poison; + + u8 rs_tos; }; static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) @@ -615,9 +621,13 @@ struct rds_message *rds_cong_update_alloc(struct rds_connection *conn); int rds_conn_init(void); void rds_conn_exit(void); struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr, - struct rds_transport *trans, gfp_t gfp); + struct rds_transport *trans, + u8 tos, gfp_t gfp); struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr, - struct rds_transport *trans, gfp_t gfp); + struct rds_transport *trans, + u8 tos, gfp_t gfp); +struct rds_connection *rds_conn_find(__be32 laddr, __be32 faddr, + struct rds_transport *trans, u8 tos); void rds_conn_shutdown(struct rds_connection *conn); void rds_conn_destroy(struct rds_connection *conn); void rds_conn_reset(struct rds_connection *conn); diff --git a/net/rds/recv.c b/net/rds/recv.c index d25b52a5c9ba..2c6ce877f03b 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -530,6 +530,7 @@ void rds_inc_info_copy(struct rds_incoming *inc, minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence); minfo.len = be32_to_cpu(inc->i_hdr.h_len); + minfo.tos = inc->i_conn->c_tos; if (flip) { minfo.laddr = daddr; diff --git a/net/rds/send.c b/net/rds/send.c index ff7428cc9ac1..4f8199e517d1 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -1048,11 +1048,12 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, /* rds_conn_create has a spinlock that runs with IRQ off. * Caching the conn in the socket helps a lot. 
*/ - if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) + if (rs->rs_conn && rs->rs_conn->c_faddr == daddr && + rs->rs_tos == rs->rs_conn->c_tos) conn = rs->rs_conn; else { conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr, - rs->rs_transport, + rs->rs_transport, rs->rs_tos, sock->sk->sk_allocation); if (IS_ERR(conn)) { ret = PTR_ERR(conn); diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index d2ef06bad6a6..9a2ae298581a 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -71,7 +71,7 @@ static int rds_tcp_accept_one(struct socket *sock) NIPQUAD(inet->daddr), ntohs(inet->dport)); conn = rds_conn_create(inet->saddr, inet->daddr, &rds_tcp_transport, - GFP_KERNEL); + 0, GFP_KERNEL); if (IS_ERR(conn)) { ret = PTR_ERR(conn); goto out; diff --git a/net/rds/threads.c b/net/rds/threads.c index 84a06c6c1964..dc1896e731b1 100644 --- a/net/rds/threads.c +++ b/net/rds/threads.c @@ -88,7 +88,9 @@ void rds_connect_complete(struct rds_connection *conn) conn->c_reconnect_jiffies = 0; set_bit(0, &conn->c_map_queued); queue_delayed_work(rds_wq, &conn->c_send_w, 0); - queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + if (!conn->c_tos) + queue_delayed_work(rds_wq, &conn->c_recv_w, 0); + conn->c_connection_start = get_seconds(); } EXPORT_SYMBOL_GPL(rds_connect_complete);