www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
RDS Quality Of Service
authorBang Nguyen <bang.nguyen@oracle.com>
Fri, 3 Feb 2012 16:10:06 +0000 (11:10 -0500)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 7 Jul 2015 23:41:34 +0000 (16:41 -0700)
RDS QoS is an extension of IB QoS to provide clients the ability to
segregate traffic flows and define policy to regulate them.
Internally, each traffic flow is represented by a connection with all of its
independent resources like that of a normal connection, and is
differentiated by service type. In other words, there can be multiple
connections between an IP pair and each supports a unique service type.
Service type (TOS) is user-defined and can be configured to satisfy certain
traffic requirements. For example, one service type may be configured for
high-priority low-latency traffic, another for low-priority high-bandwidth
traffic, and so on.

TOS is socket-based. Clients can set the TOS on a socket via an IOCTL and must
do so before initiating any traffic. Once set, the TOS cannot be changed.

        ioctl(fd, RDS_IOC_SET_TOS=1, (uint8_t *)<TOS ptr>)

All outgoing traffic from the socket will be associated with its TOS.

Signed-off-by: Venkat Venkatsubra <venkat.x.venkatsubra@oracle.com>
Signed-off-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Bang Nguyen <bang.nguyen@oracle.com>
15 files changed:
include/linux/rds.h
net/rds/af_rds.c
net/rds/connection.c
net/rds/ib.c
net/rds/ib.h
net/rds/ib_cm.c
net/rds/ib_recv.c
net/rds/ib_stats.c
net/rds/iw_cm.c
net/rds/rdma_transport.c
net/rds/rds.h
net/rds/recv.c
net/rds/send.c
net/rds/tcp_listen.c
net/rds/threads.c

index 0833ae73d06207c2f247fe79ccefc8bb45782197..4a649f4a0927d607a1beb2c3dc48bec67dace613 100644 (file)
 #define RDS_RECVERR                    5
 #define RDS_CONG_MONITOR               6
 #define RDS_GET_MR_FOR_DEST            7
+#define RDS_CONN_RESET                  8
+
+/*
+ * ioctl commands for SOL_RDS
+*/
+#define RDS_IOC_SET_TOS                 1
+
+typedef u_int8_t         rds_tos_t;
+
 
 /*
  * Control message types for SOL_RDS.
@@ -117,6 +126,7 @@ struct rds_info_connection {
        __be32          faddr;
        u_int8_t        transport[TRANSNAMSIZ];         /* null term ascii */
        u_int8_t        flags;
+        u_int8_t        tos;
 } __attribute__((packed));
 
 struct rds_info_flow {
@@ -138,6 +148,7 @@ struct rds_info_message {
        __be16          lport;
        __be16          fport;
        u_int8_t        flags;
+        u_int8_t        tos;
 } __attribute__((packed));
 
 struct rds_info_socket {
@@ -174,6 +185,9 @@ struct rds_info_rdma_connection {
        uint32_t        max_send_sge;
        uint32_t        rdma_mr_max;
        uint32_t        rdma_mr_size;
+        uint8_t         tos;
+        uint8_t         sl;
+        uint32_t        cache_allocs;
 };
 
 /*
@@ -258,6 +272,12 @@ struct rds_atomic_args {
        u_int64_t       user_token;
 };
 
+struct rds_reset {
+        u_int8_t        tos;
+        struct in_addr  src;
+        struct in_addr  dst;
+};
+
 struct rds_rdma_notify {
        u_int64_t       user_token;
        int32_t         status;
index 33ca8713858089f34718f3d1b6df0099d4bb9cc7..b13b922fdcf16e84ebfc8950b992358f3b53f940 100644 (file)
@@ -200,7 +200,28 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
 
 static int rds_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
 {
-       return -ENOIOCTLCMD;
+        struct rds_sock *rs = rds_sk_to_rs(sock->sk);
+        rds_tos_t tos;
+        unsigned long flags;
+
+        if (get_user(tos, (rds_tos_t __user *)arg))
+                return -EFAULT;
+
+        switch (cmd) {
+        case RDS_IOC_SET_TOS:
+                spin_lock_irqsave(&rds_sock_lock, flags);
+                if (rs->rs_tos || rs->rs_conn) {
+                        spin_unlock_irqrestore(&rds_sock_lock, flags);
+                        return -EINVAL;
+                }
+                rs->rs_tos = tos;
+                spin_unlock_irqrestore(&rds_sock_lock, flags);
+                break;
+        default:
+                return -ENOPROTOOPT;
+        }
+
+        return 0;
 }
 
 static int rds_cancel_sent_to(struct rds_sock *rs, char __user *optval,
@@ -261,6 +282,32 @@ static int rds_cong_monitor(struct rds_sock *rs, char __user *optval,
        return ret;
 }
 
+static int rds_user_reset(struct rds_sock *rs, char __user *optval, int optlen)
+{
+        struct rds_reset reset;
+        struct rds_connection *conn;
+
+        if (optlen != sizeof(struct rds_reset))
+                return -EINVAL;
+
+        if (copy_from_user(&reset, (struct rds_reset __user *)optval,
+                                sizeof(struct rds_reset)))
+                return -EFAULT;
+
+        conn = rds_conn_find(reset.src.s_addr, reset.dst.s_addr,
+                        rs->rs_transport, reset.tos);
+
+        if (conn) {
+                printk(KERN_NOTICE "Resetting RDS/IB connection "
+                                "<%u.%u.%u.%u,%u.%u.%u.%u,%d>\n",
+                                NIPQUAD(reset.src.s_addr),
+                                NIPQUAD(reset.dst.s_addr), conn->c_tos);
+                rds_conn_drop(conn);
+        }
+
+        return 0;
+}
+
 static int rds_setsockopt(struct socket *sock, int level, int optname,
                          char __user *optval, unsigned int optlen)
 {
@@ -291,6 +338,9 @@ static int rds_setsockopt(struct socket *sock, int level, int optname,
        case RDS_CONG_MONITOR:
                ret = rds_cong_monitor(rs, optval, optlen);
                break;
+        case RDS_CONN_RESET:
+                ret = rds_user_reset(rs, optval, optlen);
+                break;
        default:
                ret = -ENOPROTOOPT;
        }
@@ -425,6 +475,8 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
        spin_lock_init(&rs->rs_rdma_lock);
        rs->rs_rdma_keys = RB_ROOT;
        rs->poison = 0xABABABAB;
+       rs->rs_tos = 0;
+       rs->rs_conn = 0;
 
        if (rs->rs_bound_addr) {
 printk(KERN_CRIT "bound addr %x at create\n", rs->rs_bound_addr);
index af13c524edc8a7a2574ce89ae2fdc180edb92637..d99b958aa3dcc40526e7e3270712e539f3fdbba0 100644 (file)
@@ -64,13 +64,15 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
 /* rcu read lock must be held or the connection spinlock */
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
                                              __be32 laddr, __be32 faddr,
-                                             struct rds_transport *trans)
+                                              struct rds_transport *trans,
+                                              u8 tos)
 {
        struct rds_connection *conn, *ret = NULL;
        struct hlist_node *pos;
 
        hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
                if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
+                               conn->c_tos == tos &&
                                conn->c_trans == trans) {
                        ret = conn;
                        break;
@@ -112,6 +114,7 @@ void rds_conn_reset(struct rds_connection *conn)
  */
 static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
                                       struct rds_transport *trans, gfp_t gfp,
+                                      u8 tos,
                                       int is_outgoing)
 {
        struct rds_connection *conn, *parent = NULL;
@@ -121,7 +124,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
        int ret;
 
        rcu_read_lock();
-       conn = rds_conn_lookup(head, laddr, faddr, trans);
+       conn = rds_conn_lookup(head, laddr, faddr, trans, tos);
        if (conn
         && conn->c_loopback
         && conn->c_trans != &rds_loop_transport
@@ -156,6 +159,8 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
        INIT_LIST_HEAD(&conn->c_send_queue);
        INIT_LIST_HEAD(&conn->c_retrans);
 
+       conn->c_tos = tos;
+
        ret = rds_cong_get_maps(conn);
        if (ret) {
                kmem_cache_free(rds_conn_slab, conn);
@@ -233,7 +238,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
                /* Creating normal conn */
                struct rds_connection *found;
 
-               found = rds_conn_lookup(head, laddr, faddr, trans);
+               found = rds_conn_lookup(head, laddr, faddr, trans, tos);
                if (found) {
                        trans->conn_free(conn->c_transport_data);
                        kmem_cache_free(rds_conn_slab, conn);
@@ -251,19 +256,35 @@ out:
 }
 
 struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr,
-                                      struct rds_transport *trans, gfp_t gfp)
+                                       struct rds_transport *trans,
+                                       u8 tos, gfp_t gfp)
 {
-       return __rds_conn_create(laddr, faddr, trans, gfp, 0);
+       return __rds_conn_create(laddr, faddr, trans, gfp, tos, 0);
 }
 EXPORT_SYMBOL_GPL(rds_conn_create);
 
 struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
-                                      struct rds_transport *trans, gfp_t gfp)
+                                       struct rds_transport *trans,
+                                       u8 tos, gfp_t gfp)
 {
-       return __rds_conn_create(laddr, faddr, trans, gfp, 1);
+       return __rds_conn_create(laddr, faddr, trans, gfp, tos, 1);
 }
 EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
 
+struct rds_connection *rds_conn_find(__be32 laddr, __be32 faddr,
+                                        struct rds_transport *trans, u8 tos)
+{
+        struct rds_connection *conn;
+        struct hlist_head *head = rds_conn_bucket(laddr, faddr);
+
+        rcu_read_lock();
+        conn = rds_conn_lookup(head, laddr, faddr, trans, tos);
+        rcu_read_unlock();
+
+        return conn;
+}
+EXPORT_SYMBOL_GPL(rds_conn_find);
+
 void rds_conn_shutdown(struct rds_connection *conn)
 {
        /* shut it down unless it's down already */
@@ -490,6 +511,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
        cinfo->next_rx_seq = conn->c_next_rx_seq;
        cinfo->laddr = conn->c_laddr;
        cinfo->faddr = conn->c_faddr;
+       cinfo->tos = conn->c_tos;
        strncpy(cinfo->transport, conn->c_trans->t_name,
                sizeof(cinfo->transport));
        cinfo->flags = 0;
@@ -562,7 +584,7 @@ void rds_conn_drop(struct rds_connection *conn)
        } else if ((conn->c_reconnect_warn) &&
                   (now - conn->c_reconnect_start > 60)) {
                printk(KERN_INFO "RDS/IB: re-connect to %u.%u.%u.%u is "
-                       "stalling for more than 1 min...(drops=%d err=%d)\n",
+                       "stalling for more than 1 min...(drops=%u err=%d)\n",
                        NIPQUAD(conn->c_faddr), conn->c_reconnect_drops,
                        conn->c_reconnect_err);
                conn->c_reconnect_warn = 0;
index 4d8c00f797b21f56929b8333cb5173753b96d40c..89736d1a0b12be889e07bbf8ef4d91a031256384 100644 (file)
@@ -101,6 +101,8 @@ static void rds_ib_dev_free(struct work_struct *work)
                ib_dereg_mr(rds_ibdev->mr);
        if (rds_ibdev->pd)
                ib_dealloc_pd(rds_ibdev->pd);
+        if (rds_ibdev->srq)
+                kfree(rds_ibdev->srq);
 
        list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
                list_del(&i_ipaddr->list);
@@ -174,6 +176,10 @@ void rds_ib_add_one(struct ib_device *device)
                goto put_dev;
        }
 
+        rds_ibdev->srq = kmalloc(sizeof(struct rds_ib_srq), GFP_KERNEL);
+        if (!rds_ibdev->srq)
+                goto free_attr;
+
        INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
        INIT_LIST_HEAD(&rds_ibdev->conn_list);
 
@@ -291,6 +297,9 @@ static int rds_ib_conn_info_visitor(struct rds_connection *conn,
                iinfo->max_recv_wr = ic->i_recv_ring.w_nr;
                iinfo->max_send_sge = rds_ibdev->max_sge;
                rds_ib_get_mr_info(rds_ibdev, iinfo);
+                iinfo->tos = ic->conn->c_tos;
+                iinfo->sl = ic->i_sl;
+                iinfo->cache_allocs = atomic_read(&ic->i_cache_allocs);
        }
        return 1;
 }
@@ -361,6 +370,7 @@ void rds_ib_exit(void)
        rds_ib_unregister_client();
        rds_ib_destroy_nodev_conns();
        rds_ib_sysctl_exit();
+       rds_ib_srqs_exit();
        rds_ib_recv_exit();
        rds_trans_unregister(&rds_ib_transport);
        rds_ib_fmr_exit();
@@ -415,14 +425,22 @@ int rds_ib_init(void)
        if (ret)
                goto out_sysctl;
 
+        ret = rds_ib_srqs_init();
+        if (ret) {
+                printk(KERN_ERR "rds_ib_srqs_init failed.\n");
+                goto out_recv;
+        }
+
        ret = rds_trans_register(&rds_ib_transport);
        if (ret)
-               goto out_recv;
+               goto out_srq;
 
        rds_info_register_func(RDS_INFO_IB_CONNECTIONS, rds_ib_ic_info);
 
        goto out;
 
+out_srq:
+        rds_ib_srqs_exit();
 out_recv:
        rds_ib_recv_exit();
 out_sysctl:
index f0b7c4e1bc6e0cd65f54916a9f9242052d4e3ba8..234e078407fb5f385e2c01463f360cf7e3715e23 100644 (file)
 
 #define RDS_IB_DEFAULT_RECV_WR         1024
 #define RDS_IB_DEFAULT_SEND_WR         256
+#define RDS_IB_DEFAULT_SRQ_MAX_WR       4096
+#define RDS_IB_DEFAULT_SRQ_REFILL_WR    RDS_IB_DEFAULT_SRQ_MAX_WR/2
+#define RDS_IB_DEFAULT_SRQ_LOW_WR       RDS_IB_DEFAULT_SRQ_MAX_WR/10
 
 #define RDS_IB_DEFAULT_RETRY_COUNT     1
 
-#define RDS_IB_SUPPORTED_PROTOCOLS     0x00000003      /* minor versions supported */
+#define RDS_IB_SUPPORTED_PROTOCOLS     0x00000007      /* minor versions supported */
 
 #define RDS_IB_RECYCLE_BATCH_COUNT     32
+
+#define RDS_IB_SRQ_POST_BATCH_COUNT     64
+
 #define RDS_WC_MAX 32
 
 extern struct rw_semaphore rds_ib_devices_lock;
@@ -65,6 +71,7 @@ struct rds_ib_connect_private {
        __be32                  dp_reserved1;
        __be64                  dp_ack_seq;
        __be32                  dp_credit;              /* non-zero enables flow ctl */
+        u8                      dp_tos;
 };
 
 struct rds_ib_send_work {
@@ -79,6 +86,8 @@ struct rds_ib_recv_work {
        struct rds_page_frag    *r_frag;
        struct ib_recv_wr       r_wr;
        struct ib_sge           r_sge[2];
+        struct rds_ib_connection *r_ic;
+        int                     r_posted;
 };
 
 struct rds_ib_work_ring {
@@ -174,6 +183,11 @@ struct rds_ib_connection {
 
        /* Batched completions */
        unsigned int            i_unsignaled_wrs;
+        u8                      i_sl;
+
+        atomic_t                i_cache_allocs;
+
+        struct completion       i_last_wqe_complete;
 };
 
 /* This assumes that atomic_t is at least 32 bits */
@@ -188,6 +202,20 @@ struct rds_ib_ipaddr {
        struct rcu_head         rcu_head;
 };
 
+struct rds_ib_srq {
+        struct rds_ib_device       *rds_ibdev;
+        struct ib_srq              *s_srq;
+        struct ib_event_handler    s_event_handler;
+        struct rds_ib_recv_work    *s_recvs;
+        u32                        s_n_wr;
+        struct rds_header          *s_recv_hdrs;
+        u64                        s_recv_hdrs_dma;
+        atomic_t                   s_num_posted;
+        unsigned long              s_refill_gate;
+        struct delayed_work        s_refill_w;
+        struct delayed_work        s_rearm_w;
+};
+
 struct rds_ib_device {
        struct list_head        list;
        struct list_head        ipaddr_list;
@@ -205,6 +233,7 @@ struct rds_ib_device {
        spinlock_t              spinlock;       /* protect the above */
        atomic_t                refcount;
        struct work_struct      free_work;
+        struct rds_ib_srq       *srq;
 };
 
 #define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus)
@@ -235,6 +264,8 @@ struct rds_ib_statistics {
        uint64_t        s_ib_rx_refill_from_cq;
        uint64_t        s_ib_rx_refill_from_thread;
        uint64_t        s_ib_rx_alloc_limit;
+        uint64_t        s_ib_rx_total_frags;
+        uint64_t        s_ib_rx_total_incs;
        uint64_t        s_ib_rx_credit_updates;
        uint64_t        s_ib_ack_sent;
        uint64_t        s_ib_ack_send_failure;
@@ -249,6 +280,9 @@ struct rds_ib_statistics {
        uint64_t        s_ib_rdma_mr_pool_depleted;
        uint64_t        s_ib_atomic_cswp;
        uint64_t        s_ib_atomic_fadd;
+        uint64_t        s_ib_srq_lows;
+        uint64_t        s_ib_srq_refills;
+        uint64_t        s_ib_srq_empty_refills;
 };
 
 extern struct workqueue_struct *rds_ib_wq;
@@ -339,6 +373,8 @@ void rds_ib_fmr_exit(void);
 /* ib_recv.c */
 int rds_ib_recv_init(void);
 void rds_ib_recv_exit(void);
+int rds_ib_srqs_init(void);
+void rds_ib_srqs_exit(void);
 int rds_ib_recv(struct rds_connection *conn);
 int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
@@ -356,6 +392,8 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
 void rds_ib_attempt_ack(struct rds_ib_connection *ic);
 void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
+void rds_ib_srq_refill(struct work_struct *work);
+void rds_ib_srq_rearm(struct work_struct *work);
 void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required);
 
 
@@ -392,6 +430,11 @@ DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
 unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter,
                                    unsigned int avail);
 
+/* ib_recv.c */
+extern unsigned int rds_ib_srq_max_wr;
+extern unsigned int rds_ib_srq_refill_wr;
+extern unsigned int rds_ib_srq_low_wr;
+
 /* ib_sysctl.c */
 int rds_ib_sysctl_init(void);
 void rds_ib_sysctl_exit(void);
index 12ccb1447e565268e57bb4c206e7fcdd20a77060..a936a5940891c9914a4c3057a357059ea4afddb1 100644 (file)
@@ -141,31 +141,59 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
                }
        }
 
-       if (conn->c_version < RDS_PROTOCOL(3,1)) {
-               printk(KERN_NOTICE "RDS/IB: Connection to %pI4 version %u.%u failed,"
-                      " no longer supported\n",
-                      &conn->c_faddr,
-                      RDS_PROTOCOL_MAJOR(conn->c_version),
-                      RDS_PROTOCOL_MINOR(conn->c_version));
-               rds_conn_destroy(conn);
-               return;
-       } else {
-               printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
-                      &conn->c_faddr,
-                      RDS_PROTOCOL_MAJOR(conn->c_version),
-                      RDS_PROTOCOL_MINOR(conn->c_version),
-                      ic->i_flowctl ? ", flow control" : "");
-       }
+        if (conn->c_version < RDS_PROTOCOL(3,2)) {
+                if (conn->c_version == RDS_PROTOCOL(3,1)) {
+                        if (conn->c_tos) {
+                                printk(KERN_NOTICE "RDS: Connection to"
+                                        " %u.%u.%u.%u version %u.%u Tos %d"
+                                        " failed, not supporting QoS\n",
+                                        NIPQUAD(conn->c_faddr),
+                                        RDS_PROTOCOL_MAJOR(conn->c_version),
+                                        RDS_PROTOCOL_MINOR(conn->c_version),
+                                        conn->c_tos);
+                                rds_conn_drop(conn);
+                                return;
+                        }
+                } else {
+                        /*
+                         * BUG: destroying connection here can deadlock with
+                         * the CM event handler on the c_cm_lock.
+                         */
+                        printk(KERN_NOTICE "RDS/IB: Connection to"
+                                " %u.%u.%u.%u version %u.%u failed,"
+                                " no longer supported\n",
+                                NIPQUAD(conn->c_faddr),
+                                RDS_PROTOCOL_MAJOR(conn->c_version),
+                                RDS_PROTOCOL_MINOR(conn->c_version));
+                        rds_conn_destroy(conn);
+                        return;
+                }
+        }
+
+        printk(KERN_NOTICE
+               "RDS/IB: connected to %u.%u.%u.%u version %u.%u%s Tos %d\n",
+               NIPQUAD(conn->c_faddr),
+               RDS_PROTOCOL_MAJOR(conn->c_version),
+               RDS_PROTOCOL_MINOR(conn->c_version),
+               ic->i_flowctl ? ", flow control" : "",
+               conn->c_tos);
+
+        ic->i_sl = ic->i_cm_id->route.path_rec->sl;
 
        /*
         * Init rings and fill recv. this needs to wait until protocol negotiation
         * is complete, since ring layout is different from 3.0 to 3.1.
         */
        rds_ib_send_init_ring(ic);
-       rds_ib_recv_init_ring(ic);
+
+        if (!ic->conn->c_tos)
+                rds_ib_recv_init_ring(ic);
+
        /* Post receive buffers - as a side effect, this will update
         * the posted credit count. */
-       rds_ib_recv_refill(conn, 1, GFP_KERNEL);
+        if (!ic->conn->c_tos) {
+                rds_ib_recv_refill(conn, 1, GFP_KERNEL);
+        }
 
        /* Tune RNR behavior */
        rds_ib_tune_rnr(ic, &qp_attr);
@@ -216,6 +244,7 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
                dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version);
                dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
                dp->dp_ack_seq = rds_ib_piggyb_ack(ic);
+               dp->dp_tos = conn->c_tos;
 
                /* Advertise flow control */
                if (ic->i_flowctl) {
@@ -308,6 +337,9 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
        struct rds_connection *conn = ic->conn;
        struct rds_ib_ack_state ack_state;
+       struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
+
+       BUG_ON(conn->c_tos && !rds_ibdev);
 
        rds_ib_stats_inc(s_ib_tasklet_call);
 
@@ -325,6 +357,14 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
        }
        if (rds_conn_up(conn))
                rds_ib_attempt_ack(ic);
+
+        if (conn->c_tos) {
+                if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
+                                        rds_ib_srq_refill_wr) &&
+                     !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
+                                queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w,0);
+
+        }
 }
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
@@ -339,6 +379,9 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
        case IB_EVENT_COMM_EST:
                rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
                break;
+        case IB_EVENT_QP_LAST_WQE_REACHED:
+                complete(&ic->i_last_wqe_complete);
+                break;
        default:
                rdsdebug("Fatal QP Event %u (%s) "
                        "- connection %pI4->%pI4, reconnecting\n",
@@ -392,10 +435,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
                goto out;
        }
 
-       ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
-                               rds_ib_cq_event_handler, conn,
-                               ic->i_recv_ring.w_nr,
-                               IB_CQ_VECTOR_LEAST_ATTACHED);
+        if (ic->conn->c_tos)
+                ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
+                                        rds_ib_cq_event_handler, conn,
+                                        rds_ib_srq_max_wr - 1,
+                                        IB_CQ_VECTOR_LEAST_ATTACHED);
+        else
+                ic->i_rcq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
+                                     rds_ib_cq_event_handler, conn,
+                                     ic->i_recv_ring.w_nr,
+                                     IB_CQ_VECTOR_LEAST_ATTACHED);
        if (IS_ERR(ic->i_rcq)) {
                ret = PTR_ERR(ic->i_rcq);
                ic->i_rcq = NULL;
@@ -429,6 +478,11 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        attr.send_cq = ic->i_scq;
        attr.recv_cq = ic->i_rcq;
 
+        if (ic->conn->c_tos) {
+                attr.cap.max_recv_wr = 0;
+                attr.srq = rds_ibdev->srq->s_srq;
+        }
+
        /*
         * XXX this can fail if max_*_wr is too large?  Are we supposed
         * to back off until we get a value that the hardware can support?
@@ -449,15 +503,17 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
                goto out;
        }
 
-       ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
-                                          ic->i_recv_ring.w_nr *
-                                               sizeof(struct rds_header),
-                                          &ic->i_recv_hdrs_dma, GFP_KERNEL);
-       if (!ic->i_recv_hdrs) {
-               ret = -ENOMEM;
-               rdsdebug("ib_dma_alloc_coherent recv failed\n");
-               goto out;
-       }
+        if (!ic->conn->c_tos) {
+                ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
+                                           ic->i_recv_ring.w_nr *
+                                                sizeof(struct rds_header),
+                                           &ic->i_recv_hdrs_dma, GFP_KERNEL);
+                if (!ic->i_recv_hdrs) {
+                        ret = -ENOMEM;
+                        rdsdebug("ib_dma_alloc_coherent recv failed\n");
+                        goto out;
+                }
+        }
 
        ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
                                       &ic->i_ack_dma, GFP_KERNEL);
@@ -476,14 +532,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        }
        memset(ic->i_sends, 0, ic->i_send_ring.w_nr * sizeof(struct rds_ib_send_work));
 
-       ic->i_recvs = vmalloc_node(ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work),
-                                  ibdev_to_node(dev));
-       if (!ic->i_recvs) {
-               ret = -ENOMEM;
-               rdsdebug("recv allocation failed\n");
-               goto out;
-       }
-       memset(ic->i_recvs, 0, ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
+        if (!ic->conn->c_tos) {
+                ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr *
+                        sizeof(struct rds_ib_recv_work));
+                if (!ic->i_recvs) {
+                        ret = -ENOMEM;
+                        rdsdebug("recv allocation failed\n");
+                        goto out;
+                }
+                memset(ic->i_recvs, 0, ic->i_recv_ring.w_nr * sizeof(struct rds_ib_recv_work));
+        }
 
        rds_ib_recv_init_ack(ic);
 
@@ -563,7 +621,7 @@ int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
                 (unsigned long long)be64_to_cpu(fguid));
 
        conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport,
-                              GFP_KERNEL);
+                              dp->dp_tos, GFP_KERNEL);
        if (IS_ERR(conn)) {
                rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
                conn = NULL;
@@ -765,6 +823,13 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                         */
                        rdsdebug("failed to disconnect, cm: %p err %d\n",
                                ic->i_cm_id, err);
+                } else if (ic->conn->c_tos && ic->rds_ibdev) {
+                        /*
+                         * wait for the last wqe to complete, then schedule
+                         * the recv tasklet to drain the RX CQ.
+                         */
+                        wait_for_completion(&ic->i_last_wqe_complete);
+                        tasklet_schedule(&ic->i_rtasklet);
                }
 
                /* quiesce tx and rx completion before tearing down */
@@ -857,8 +922,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 
        vfree(ic->i_sends);
        ic->i_sends = NULL;
-       vfree(ic->i_recvs);
+        if (!ic->conn->c_tos)
+                vfree(ic->i_recvs);
+
        ic->i_recvs = NULL;
+       
+       INIT_COMPLETION(ic->i_last_wqe_complete);
 }
 
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
@@ -896,6 +965,8 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 
        ic->conn = conn;
        conn->c_transport_data = ic;
+       
+       init_completion(&ic->i_last_wqe_complete);
 
        spin_lock_irqsave(&ib_nodev_conns_lock, flags);
        list_add_tail(&ic->ib_node, &ib_nodev_conns);
index fc6aa07609ab0e75e169854078ffc265723d28fc..eb6c64623207c82413d541bfab7982d9ca5e0fa4 100644 (file)
 #include "rds.h"
 #include "ib.h"
 
+unsigned int rds_ib_srq_max_wr = RDS_IB_DEFAULT_SRQ_MAX_WR;
+unsigned int rds_ib_srq_refill_wr = RDS_IB_DEFAULT_SRQ_REFILL_WR;
+unsigned int rds_ib_srq_low_wr = RDS_IB_DEFAULT_SRQ_LOW_WR;
+
+module_param(rds_ib_srq_max_wr, int, 0444);
+MODULE_PARM_DESC(rds_ib_srq_max_wr, "Max number of SRQ WRs");
+module_param(rds_ib_srq_refill_wr, int, 0444);
+MODULE_PARM_DESC(rds_ib_srq_refill_wr, "SRQ refill watermark");
+module_param(rds_ib_srq_low_wr, int, 0444);
+MODULE_PARM_DESC(rds_ib_srq_low_wr, "SRQ low watermark");
+
 static struct kmem_cache *rds_ib_incoming_slab;
 static struct kmem_cache *rds_ib_frag_slab;
 static atomic_t        rds_ib_allocation = ATOMIC_INIT(0);
@@ -193,6 +204,7 @@ static void rds_ib_frag_free(struct rds_ib_connection *ic,
        rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
 
        rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
+       atomic_inc(&ic->i_cache_allocs);
 }
 
 /* Recycle inc after freeing attached frags */
@@ -252,6 +264,7 @@ static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *i
                if (!ibinc) {
                        return NULL;
                }
+               rds_ib_stats_inc(s_ib_rx_total_incs);
        }
        INIT_LIST_HEAD(&ibinc->ii_frags);
        rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
@@ -270,6 +283,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic
        cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
        if (cache_item) {
                frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
+               atomic_dec(&ic->i_cache_allocs);
        } else {
                frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask);
                if (!frag)
@@ -296,6 +310,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic
                        atomic_dec(&rds_ib_allocation);
                        return NULL;
                }
+               rds_ib_stats_inc(s_ib_rx_total_frags);
        }
 
        INIT_LIST_HEAD(&frag->f_item);
@@ -354,6 +369,139 @@ out:
        return ret;
 }
 
+static void rds_ib_srq_clear_one(struct rds_ib_srq *srq,
+                                struct rds_ib_connection *ic,
+                                struct rds_ib_recv_work *recv)
+{
+        /* Release the incoming message and receive fragment (if any)
+         * still attached to this SRQ work request. */
+        if (recv->r_ibinc) {
+                rds_inc_put(&recv->r_ibinc->ii_inc);
+                recv->r_ibinc = NULL;
+        }
+        if (recv->r_frag) {
+                ib_dma_unmap_sg(srq->rds_ibdev->dev, &recv->r_frag->f_sg,
+                                1, DMA_FROM_DEVICE);
+                rds_ib_frag_free(ic, recv->r_frag);
+                recv->r_frag = NULL;
+        }
+        /* Reset bookkeeping unconditionally so rds_ib_srq_refill() will
+         * consider this slot again.  Previously this happened only when a
+         * fragment was attached, leaving r_posted stale for recvs whose
+         * fragment had already been consumed. */
+        recv->r_ic = ic;
+        recv->r_posted = 0;
+}
+
+/* Allocate (or recycle from the per-connection caches) the incoming
+ * message and page fragment for one SRQ receive work request and
+ * populate its scatter/gather list.  Returns 0 on success, -ENOMEM on
+ * allocation failure.
+ */
+static int rds_ib_srq_refill_one(struct rds_ib_srq *srq,
+                                 struct rds_ib_connection *ic,
+                                 struct rds_ib_recv_work *recv, gfp_t gfp)
+{
+        struct ib_sge *sge;
+        int ret = -ENOMEM;
+        gfp_t slab_mask = GFP_NOWAIT;
+        gfp_t page_mask = GFP_NOWAIT;
+
+        /* Only sleep for memory when the caller said it may. */
+        if (gfp & __GFP_WAIT) {
+                slab_mask = GFP_KERNEL;
+                page_mask = GFP_HIGHUSER;
+        }
+
+        if (!ic->i_cache_incs.ready)
+                rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
+        if (!ic->i_cache_frags.ready)
+                rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
+
+        /*
+         * ibinc was taken from recv if recv contained the start of a message.
+         * recvs that were continuations will still have this allocated.
+         */
+        if (!recv->r_ibinc) {
+                recv->r_ibinc = rds_ib_refill_one_inc(ic, slab_mask);
+                if (!recv->r_ibinc)
+                        goto out;
+        }
+
+        WARN_ON_ONCE(recv->r_frag); /* leak! */
+        recv->r_frag = rds_ib_refill_one_frag(ic, slab_mask, page_mask);
+        if (!recv->r_frag)
+                goto out;
+
+        ret = ib_dma_map_sg(srq->rds_ibdev->dev, &recv->r_frag->f_sg,
+                            1, DMA_FROM_DEVICE);
+
+        WARN_ON(ret != 1);
+
+        /* sge[0] receives the RDS header into the SRQ's coherent header
+         * array, indexed by this recv's position in s_recvs. */
+        sge = &recv->r_sge[0];
+
+        sge->addr = srq->s_recv_hdrs_dma +
+                (recv - srq->s_recvs) *
+                sizeof(struct rds_header);
+
+        sge->length = sizeof(struct rds_header);
+
+        /* sge[1] receives the payload fragment.
+         * NOTE(review): unlike rds_ib_srq_prefill_one(), neither sge lkey
+         * is refreshed here - this appears to rely on the lkeys set at
+         * prefill time remaining valid.  TODO confirm. */
+        sge = &recv->r_sge[1];
+        sge->addr = sg_dma_address(&recv->r_frag->f_sg);
+        sge->length = sg_dma_len(&recv->r_frag->f_sg);
+
+        ret = 0;
+out:
+        return ret;
+}
+
+/* Like rds_ib_srq_refill_one() but used at SRQ creation time: allocates
+ * straight from the slabs (no per-connection cache exists yet) and sets
+ * the lkeys on both scatter/gather entries.  Returns 0 on success,
+ * nonzero on failure.
+ */
+static int rds_ib_srq_prefill_one(struct rds_ib_device *rds_ibdev,
+                                 struct rds_ib_recv_work *recv, int prefill)
+{
+        struct ib_sge *sge;
+        int ret = -ENOMEM;
+        gfp_t slab_mask = GFP_NOWAIT;
+        gfp_t page_mask = GFP_NOWAIT;
+
+        /* Prefill runs from process context and may sleep for memory. */
+        if (prefill) {
+                slab_mask = GFP_KERNEL;
+                page_mask = GFP_HIGHUSER;
+        }
+
+        if (!recv->r_ibinc) {
+              recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab, slab_mask);
+              if (!recv->r_ibinc)
+                      goto out;
+              rds_ib_stats_inc(s_ib_rx_total_incs);
+              INIT_LIST_HEAD(&recv->r_ibinc->ii_frags);
+        }
+
+        WARN_ON_ONCE(recv->r_frag); /* leak! */
+        recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask);
+        if (!recv->r_frag)
+                goto out;
+        sg_init_table(&recv->r_frag->f_sg, 1);
+        ret = rds_page_remainder_alloc(&recv->r_frag->f_sg,
+                        RDS_FRAG_SIZE, page_mask);
+        if (ret) {
+                kmem_cache_free(rds_ib_frag_slab, recv->r_frag);
+                /* Fix: clear the dangling pointer, otherwise a later
+                 * rds_ib_srq_clear_one() would unmap and free it again. */
+                recv->r_frag = NULL;
+                goto out;
+        }
+        rds_ib_stats_inc(s_ib_rx_total_frags);
+        INIT_LIST_HEAD(&recv->r_frag->f_item);
+
+        ret = ib_dma_map_sg(rds_ibdev->dev, &recv->r_frag->f_sg,
+                                1, DMA_FROM_DEVICE);
+        WARN_ON(ret != 1);
+
+        /* sge[0]: RDS header slot in the SRQ's coherent header array,
+         * indexed by this recv's position in s_recvs. */
+        sge = &recv->r_sge[0];
+        sge->addr = rds_ibdev->srq->s_recv_hdrs_dma +
+                                (recv - rds_ibdev->srq->s_recvs) *
+                                sizeof(struct rds_header);
+        sge->length = sizeof(struct rds_header);
+        sge->lkey = rds_ibdev->mr->lkey;
+
+        /* sge[1]: the payload fragment. */
+        sge = &recv->r_sge[1];
+        sge->addr = sg_dma_address(&recv->r_frag->f_sg);
+        sge->length = sg_dma_len(&recv->r_frag->f_sg);
+        sge->lkey = rds_ibdev->mr->lkey;
+
+        ret = 0;
+
+out:
+        return ret;
+}
+
+
+
 static int acquire_refill(struct rds_connection *conn)
 {
        return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0;
@@ -1007,25 +1155,126 @@ static void rds_ib_process_recv(struct rds_connection *conn,
        }
 }
 
+/* SRQ analogue of rds_ib_process_recv(): validate one completed receive,
+ * fold its ACK information into @state, and collect fragments into the
+ * connection's current incoming message, delivering it up the stack once
+ * the final fragment arrives.  On any header problem the fragment is
+ * dropped and the connection is left to be reset by the caller.
+ */
+void rds_ib_srq_process_recv(struct rds_connection *conn,
+                                    struct rds_ib_recv_work *recv, u32 data_len,
+                                    struct rds_ib_ack_state *state)
+{
+        struct rds_ib_connection *ic = conn->c_transport_data;
+        struct rds_ib_incoming *ibinc = ic->i_ibinc;
+        struct rds_header *ihdr, *hdr;
+
+        if (data_len < sizeof(struct rds_header)) {
+                /* Fix: "inclue" -> "include" in the warning text. */
+                printk(KERN_WARNING "RDS: from %pI4 didn't include a "
+                        "header, disconnecting and "
+                        "reconnecting\n",
+                        &conn->c_faddr);
+                rds_ib_frag_free(ic, recv->r_frag);
+                recv->r_frag = NULL;
+                return;
+        }
+        data_len -= sizeof(struct rds_header);
+
+        /* The header was received into the SRQ's coherent header array,
+         * indexed by the wr_id assigned in rds_ib_srq_prefill_ring(). */
+        ihdr = &ic->rds_ibdev->srq->s_recv_hdrs[recv->r_wr.wr_id];
+
+        /* Validate the checksum. */
+        if (!rds_message_verify_checksum(ihdr)) {
+                printk(KERN_WARNING "RDS: from %pI4 has corrupted header - "
+                        "forcing a reconnect\n",
+                        &conn->c_faddr);
+                rds_stats_inc(s_recv_drop_bad_checksum);
+                rds_ib_frag_free(ic, recv->r_frag);
+                recv->r_frag = NULL;
+                return;
+        }
+
+        /* Process the ACK sequence which comes with every packet */
+        state->ack_recv = be64_to_cpu(ihdr->h_ack);
+        state->ack_recv_valid = 1;
+
+        /* A zero-length datagram on port 0 is a pure ACK: nothing to queue. */
+        if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
+                rds_ib_stats_inc(s_ib_ack_received);
+                rds_ib_frag_free(ic, recv->r_frag);
+                recv->r_frag = NULL;
+                return;
+        }
+
+        if (!ibinc) {
+                /* Start of a new message: take ownership of the inc that was
+                 * allocated with this recv and record how much payload to
+                 * expect. */
+                ibinc = recv->r_ibinc;
+                rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
+                recv->r_ibinc = NULL;
+                ic->i_ibinc = ibinc;
+                hdr = &ibinc->ii_inc.i_hdr;
+                memcpy(hdr, ihdr, sizeof(*hdr));
+                ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
+        } else {
+                hdr = &ibinc->ii_inc.i_hdr;
+                /* Every fragment of a message must carry the same header. */
+                if (hdr->h_sequence != ihdr->h_sequence
+                   || hdr->h_len != ihdr->h_len
+                   || hdr->h_sport != ihdr->h_sport
+                   || hdr->h_dport != ihdr->h_dport) {
+                        printk(KERN_WARNING "RDS: fragment header mismatch; "
+                                           "forcing reconnect\n");
+                        rds_ib_frag_free(ic, recv->r_frag);
+                        recv->r_frag = NULL;
+                        return;
+                }
+        }
+
+        /* The inc now owns the fragment. */
+        list_add_tail(&recv->r_frag->f_item, &ibinc->ii_frags);
+
+        recv->r_frag = NULL;
+
+        if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
+                ic->i_recv_data_rem -= RDS_FRAG_SIZE;
+        else {
+                /* Last fragment: hand the complete message to the recv path
+                 * (or the congestion-map handler) and drop our reference. */
+                ic->i_recv_data_rem = 0;
+                ic->i_ibinc = NULL;
+
+                if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
+                        rds_ib_cong_recv(conn, ibinc);
+                else {
+                        rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
+                                          &ibinc->ii_inc, GFP_ATOMIC,
+                                          KM_SOFTIRQ0);
+
+                        state->ack_next = be64_to_cpu(hdr->h_sequence);
+                        state->ack_next_valid = 1;
+                }
+                if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
+                        rds_stats_inc(s_recv_ack_required);
+                        state->ack_required = 1;
+                }
+                rds_inc_put(&ibinc->ii_inc);
+        }
+}
+
 void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
                             struct ib_wc *wc,
                             struct rds_ib_ack_state *state)
 {
        struct rds_connection *conn = ic->conn;
        struct rds_ib_recv_work *recv;
+       struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
 
        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
                 be32_to_cpu(wc->ex.imm_data));
 
        rds_ib_stats_inc(s_ib_rx_cq_event);
-
-       recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
+       
+       if (conn->c_tos) {
+               recv = &rds_ibdev->srq->s_recvs[wc->wr_id];
+               atomic_dec(&rds_ibdev->srq->s_num_posted); 
+       } else
+               recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
 
        ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
 
        if (wc->status == IB_WC_SUCCESS) {
-               rds_ib_process_recv(conn, recv, wc->byte_len, state);
+               if (ic->conn->c_tos)
+                       rds_ib_srq_process_recv(conn, recv, wc->byte_len, state);
+               else
+                       rds_ib_process_recv(conn, recv, wc->byte_len, state);
        } else {
                /* We expect errors as the qp is drained during shutdown */
                if (rds_conn_up(conn) || rds_conn_connecting(conn))
@@ -1047,11 +1296,132 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
                rds_ib_frag_free(ic, recv->r_frag);
                recv->r_frag = NULL;
        }
-       rds_ib_ring_free(&ic->i_recv_ring, 1);
 
-       rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
+       if (!ic->conn->c_tos) {
+               rds_ib_ring_free(&ic->i_recv_ring, 1);
+               rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
+       } else {
+               recv->r_ic = ic;
+               recv->r_posted = 0;
+       }
+}
+
+/* Work-queue handler that re-arms unposted SRQ receive work requests.
+ * Scans the whole ring, chains refilled WRs together and posts them in
+ * batches of RDS_IB_SRQ_POST_BATCH_COUNT.  The caller must have taken
+ * bit 0 of s_refill_gate (see rds_ib_srq_event()); it is released on
+ * exit.
+ */
+void rds_ib_srq_refill(struct work_struct *work)
+{
+        struct rds_ib_srq *srq = container_of(work, struct rds_ib_srq, s_refill_w.work);
+        struct rds_ib_recv_work *prv=NULL, *cur=NULL, *tmp;
+        struct ib_recv_wr *bad_wr;
+        int i,refills=0,total_refills=0;
+
+        if (!test_bit(0,&srq->s_refill_gate))
+                return;
+
+        rds_ib_stats_inc(s_ib_srq_refills);
+
+        for (i=0; i<srq->s_n_wr; i++) {
+                tmp = &srq->s_recvs[i];
+                if (tmp->r_posted)
+                        continue;
+
+                /* NOTE(review): tmp->r_ic is assumed non-NULL here - it is
+                 * set when the recv last completed in the CQE handler.
+                 * TODO confirm it cannot still be NULL for a slot whose
+                 * initial prefill post failed. */
+                if (rds_ib_srq_refill_one(srq, tmp->r_ic, tmp, GFP_NOWAIT)) {
+                        printk(KERN_ERR "rds_ib_srq_refill_one failed\n");
+                        break;
+                }
+                cur = tmp;
+
+                /* Chain the WR onto the batch being built; the chain is
+                 * linked newest-first, so 'cur' is always the head. */
+                if (!prv) {
+                        prv = cur;
+                        prv->r_wr.next = NULL;
+                } else {
+                        cur->r_wr.next = &prv->r_wr;
+                        prv = cur;
+                }
+                cur->r_posted = 1;
+
+                total_refills++;
+                if (++refills == RDS_IB_SRQ_POST_BATCH_COUNT) {
+                        if (ib_post_srq_recv(srq->s_srq, &cur->r_wr, &bad_wr)) {
+                                struct ib_recv_wr *wr;
+                                struct rds_ib_recv_work *recv;
+
+                                /* Post failed: release every WR in the chain
+                                 * (clear_one marks them unposted again). */
+                                for (wr = &cur->r_wr; wr; wr = wr->next) {
+                                        recv = container_of(wr, struct rds_ib_recv_work, r_wr);
+                                        rds_ib_srq_clear_one(srq, recv->r_ic, recv);
+                                }
+                                printk(KERN_ERR "ib_post_srq_recv failed\n");
+                                goto out;
+                        }
+
+                        atomic_add(refills,&srq->s_num_posted);
+                        prv = NULL;
+                        refills = 0;
+                        cur = NULL;
+                }
+        }
+        /* Post the final partial batch, if any. */
+        if (cur) {
+                if (ib_post_srq_recv(srq->s_srq, &cur->r_wr, &bad_wr)) {
+                        struct ib_recv_wr *wr;
+                        struct rds_ib_recv_work *recv;
+
+                        for (wr = &cur->r_wr; wr; wr = wr->next) {
+                                recv = container_of(wr, struct rds_ib_recv_work, r_wr);
+                                rds_ib_srq_clear_one(srq, recv->r_ic, recv);
+                        }
+                        printk(KERN_ERR "ib_post_srq_recv failed\n");
+                        goto out;
+                }
+                atomic_add(refills,&srq->s_num_posted);
+        }
+
+        if (!total_refills)
+                rds_ib_stats_inc(s_ib_srq_empty_refills);
+out:
+        clear_bit(0,&srq->s_refill_gate);
+}
+
+/* Post an initial receive work request for every slot in the SRQ ring.
+ * Called once from rds_ib_srq_init().  Returns 0 on success, 1 on the
+ * first allocation or post failure (earlier WRs remain posted).
+ */
+int rds_ib_srq_prefill_ring(struct rds_ib_device *rds_ibdev)
+{
+        struct rds_ib_recv_work *recv;
+        struct ib_recv_wr *bad_wr;
+        u32 i;
+        int ret;
+
+        for (i = 0, recv = rds_ibdev->srq->s_recvs;
+                i < rds_ibdev->srq->s_n_wr; i++, recv++) {
+                /* wr_id is the ring index; the CQE handler and
+                 * rds_ib_srq_process_recv() use it to find this recv and
+                 * its header slot. */
+                recv->r_wr.next = NULL;
+                recv->r_wr.wr_id = i;
+                recv->r_wr.sg_list = recv->r_sge;
+                recv->r_wr.num_sge = RDS_IB_RECV_SGE;
+                recv->r_ibinc = NULL;
+                recv->r_frag = NULL;
+                recv->r_ic = NULL;
+
+                if (rds_ib_srq_prefill_one(rds_ibdev, recv, 1))
+                        return 1;
+
+                ret = ib_post_srq_recv(rds_ibdev->srq->s_srq,
+                                &recv->r_wr, &bad_wr);
+                if (ret) {
+                        printk(KERN_WARNING "RDS: ib_post_srq_recv failed %d\n", ret);
+                        return 1;
+                }
+                atomic_inc(&rds_ibdev->srq->s_num_posted);
+                recv->r_posted = 1;
+        }
+        return 0;
+}
+
+/* Release the buffers attached to every WR in the SRQ ring; used during
+ * device teardown after the SRQ itself has been destroyed. */
+static void rds_ib_srq_clear_ring(struct rds_ib_device *rds_ibdev)
+{
+        u32 i;
+        struct rds_ib_recv_work *recv;
+
+        for (i = 0, recv = rds_ibdev->srq->s_recvs;
+                i < rds_ibdev->srq->s_n_wr; i++, recv++)
+                        rds_ib_srq_clear_one(rds_ibdev->srq, recv->r_ic, recv);
+}
 
+
 int rds_ib_recv(struct rds_connection *conn)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
@@ -1096,3 +1466,141 @@ void rds_ib_recv_exit(void)
        kmem_cache_destroy(rds_ib_incoming_slab);
        kmem_cache_destroy(rds_ib_frag_slab);
 }
+
+/* Work-queue handler that re-arms the SRQ limit event, so that
+ * IB_EVENT_SRQ_LIMIT_REACHED fires again once the number of posted WRs
+ * drops below rds_ib_srq_low_wr. */
+void rds_ib_srq_rearm(struct work_struct *work)
+{
+        struct rds_ib_srq *srq = container_of(work, struct rds_ib_srq, s_rearm_w.work);
+        struct ib_srq_attr srq_attr;
+
+        srq_attr.srq_limit = rds_ib_srq_low_wr;
+        if (ib_modify_srq(srq->s_srq, &srq_attr, IB_SRQ_LIMIT)) {
+                printk(KERN_ERR "RDS: ib_modify_srq failed\n");
+                return;
+        }
+}
+
+/* Asynchronous SRQ event handler.  On SRQ_LIMIT_REACHED (the number of
+ * posted WRs fell below the armed limit) schedule a limit re-arm and,
+ * if no refill is already in flight, a ring refill. */
+static void rds_ib_srq_event(struct ib_event *event,
+                             void *ctx)
+{
+        struct rds_ib_device *rds_ibdev = ctx;
+
+        switch (event->event) {
+        case IB_EVENT_SRQ_ERR:
+                /* Fix: the format string had no conversion specifier for
+                 * the event->event argument (also dropped the unused
+                 * srq_attr local). */
+                printk(KERN_ERR "RDS: event %d (IB_EVENT_SRQ_ERR) unhandled\n",
+                                event->event);
+                break;
+        case IB_EVENT_SRQ_LIMIT_REACHED:
+                rds_ib_stats_inc(s_ib_srq_lows);
+                queue_delayed_work(rds_wq, &rds_ibdev->srq->s_rearm_w,HZ);
+
+                /* The gate bit is released by rds_ib_srq_refill(). */
+                if (!test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
+                        queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
+                break;
+        default:
+                break;
+        }
+}
+
+/* Setup SRQ for a device */
+/* Create the shared receive queue used by all QoS (TOS) connections on
+ * @rds_ibdev, allocate its coherent header array and receive-work ring,
+ * prefill and post every WR, and arm the low-watermark event.
+ * Returns 0 on success, 1 on failure; partially acquired resources are
+ * released on the error paths (the original leaked them). */
+int rds_ib_srq_init(struct rds_ib_device *rds_ibdev)
+{
+        struct ib_srq_init_attr srq_init_attr = {
+                rds_ib_srq_event,
+                (void *)rds_ibdev,
+                .attr = {
+                        .max_wr = rds_ib_srq_max_wr - 1,
+                        .max_sge = rds_ibdev->max_sge
+                }
+        };
+
+        rds_ibdev->srq->rds_ibdev = rds_ibdev;
+
+        rds_ibdev->srq->s_n_wr = rds_ib_srq_max_wr - 1;
+        rds_ibdev->srq->s_srq = ib_create_srq(rds_ibdev->pd,
+                                              &srq_init_attr);
+
+        if (IS_ERR(rds_ibdev->srq->s_srq)) {
+                /* Fix: PTR_ERR (not ERR_PTR) extracts the error code,
+                 * and it is a long, so print it with %ld. */
+                printk(KERN_WARNING "RDS: ib_create_srq failed %ld\n",
+                       PTR_ERR(rds_ibdev->srq->s_srq));
+                return 1;
+        }
+
+        rds_ibdev->srq->s_recv_hdrs = ib_dma_alloc_coherent(rds_ibdev->dev,
+                                rds_ibdev->srq->s_n_wr *
+                                sizeof(struct rds_header),
+                                &rds_ibdev->srq->s_recv_hdrs_dma, GFP_KERNEL);
+        if (!rds_ibdev->srq->s_recv_hdrs) {
+                printk(KERN_WARNING "ib_dma_alloc_coherent failed\n");
+                goto err_destroy_srq;
+        }
+
+        rds_ibdev->srq->s_recvs = vmalloc(rds_ibdev->srq->s_n_wr *
+                                        sizeof(struct rds_ib_recv_work));
+
+        if (!rds_ibdev->srq->s_recvs) {
+                printk(KERN_WARNING "RDS: vmalloc failed\n");
+                goto err_free_hdrs;
+        }
+
+        memset(rds_ibdev->srq->s_recvs, 0, rds_ibdev->srq->s_n_wr *
+                                        sizeof(struct rds_ib_recv_work));
+
+        atomic_set(&rds_ibdev->srq->s_num_posted, 0);
+        clear_bit(0, &rds_ibdev->srq->s_refill_gate);
+
+        if (rds_ib_srq_prefill_ring(rds_ibdev))
+                goto err_teardown;
+
+        INIT_DELAYED_WORK(&rds_ibdev->srq->s_refill_w, rds_ib_srq_refill);
+
+        INIT_DELAYED_WORK(&rds_ibdev->srq->s_rearm_w, rds_ib_srq_rearm);
+
+        queue_delayed_work(rds_wq, &rds_ibdev->srq->s_rearm_w, 0);
+
+        return 0;
+
+err_teardown:
+        /* Some WRs may already be posted: destroy the SRQ before freeing
+         * the buffers it references, mirroring rds_ib_srq_exit(). */
+        ib_destroy_srq(rds_ibdev->srq->s_srq);
+        rds_ibdev->srq->s_srq = NULL;
+        rds_ib_srq_clear_ring(rds_ibdev);
+        vfree(rds_ibdev->srq->s_recvs);
+        rds_ibdev->srq->s_recvs = NULL;
+        ib_dma_free_coherent(rds_ibdev->dev,
+                             rds_ibdev->srq->s_n_wr *
+                             sizeof(struct rds_header),
+                             rds_ibdev->srq->s_recv_hdrs,
+                             rds_ibdev->srq->s_recv_hdrs_dma);
+        rds_ibdev->srq->s_recv_hdrs = NULL;
+        return 1;
+
+err_free_hdrs:
+        ib_dma_free_coherent(rds_ibdev->dev,
+                             rds_ibdev->srq->s_n_wr *
+                             sizeof(struct rds_header),
+                             rds_ibdev->srq->s_recv_hdrs,
+                             rds_ibdev->srq->s_recv_hdrs_dma);
+        rds_ibdev->srq->s_recv_hdrs = NULL;
+err_destroy_srq:
+        ib_destroy_srq(rds_ibdev->srq->s_srq);
+        rds_ibdev->srq->s_srq = NULL;
+        return 1;
+}
+
+/* Create an SRQ for every IB device known to RDS.  Stops at the first
+ * failure and returns its error code; returns 0 when all succeed. */
+int rds_ib_srqs_init(void)
+{
+        struct rds_ib_device *rds_ibdev;
+        int ret = 0;
+
+        list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
+                ret = rds_ib_srq_init(rds_ibdev);
+                if (ret)
+                        break;
+        }
+
+        return ret;
+}
+
+/* Tear down the per-device SRQ: destroy the queue, free the coherent
+ * header array, then release every receive buffer in the ring.  The SRQ
+ * is destroyed first so no WR still references the buffers. */
+void rds_ib_srq_exit(struct rds_ib_device *rds_ibdev)
+{
+        int ret;
+
+        ret = ib_destroy_srq(rds_ibdev->srq->s_srq);
+        if (ret) {
+                printk(KERN_WARNING "RDS: ib_destroy_srq failed %d\n", ret);
+        }
+        rds_ibdev->srq->s_srq = NULL;
+
+        if (rds_ibdev->srq->s_recv_hdrs)
+                ib_dma_free_coherent(rds_ibdev->dev,
+                                     rds_ibdev->srq->s_n_wr *
+                                     sizeof(struct rds_header),
+                                     rds_ibdev->srq->s_recv_hdrs,
+                                     rds_ibdev->srq->s_recv_hdrs_dma);
+
+        rds_ib_srq_clear_ring(rds_ibdev);
+        vfree(rds_ibdev->srq->s_recvs);
+        rds_ibdev->srq->s_recvs = NULL;
+}
+
+/* Tear down the SRQ on every IB device known to RDS. */
+void rds_ib_srqs_exit(void)
+{
+        struct rds_ib_device *rds_ibdev;
+
+        list_for_each_entry(rds_ibdev, &rds_ib_devices, list)
+                rds_ib_srq_exit(rds_ibdev);
+}
index 10959bf155a4ba04098c61eb4df18eaeed61210e..80ce6dcee2b6d9d15ee548dacefa80cc97a86eea 100644 (file)
@@ -55,6 +55,8 @@ static char *rds_ib_stat_names[] = {
        "ib_rx_refill_from_cq",
        "ib_rx_refill_from_thread",
        "ib_rx_alloc_limit",
+        "ib_rx_total_frags",
+        "ib_rx_total_incs",
        "ib_rx_credit_updates",
        "ib_ack_sent",
        "ib_ack_send_failure",
@@ -69,6 +71,9 @@ static char *rds_ib_stat_names[] = {
        "ib_rdma_mr_pool_depleted",
        "ib_atomic_cswp",
        "ib_atomic_fadd",
+        "ib_srq_lows",
+        "ib_srq_refills",
+        "ib_srq_empty_refills",
 };
 
 unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter,
index 00bf3e6c93119c5f02fb6428f7a3b2c01af46015..f80dac1ff1b16e86dcbe89fb4efc94d46aa306ff 100644 (file)
@@ -396,7 +396,7 @@ int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id,
                 RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version));
 
        conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_iw_transport,
-                              GFP_KERNEL);
+                              0, GFP_KERNEL);
        if (IS_ERR(conn)) {
                rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
                conn = NULL;
index ed82a8d47cc93c2722a773bd88a92fd2bf796e6f..646ba079288adb6f261899aac1e9e77a666997ed 100644 (file)
@@ -76,6 +76,9 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
                break;
 
        case RDMA_CM_EVENT_ADDR_RESOLVED:
+               rdma_set_service_type(cm_id, conn->c_tos);
+
+
                /* XXX do we need to clean up if this fails? */
                ret = rdma_resolve_route(cm_id,
                                         RDS_RDMA_RESOLVE_TIMEOUT_MS);
index 3aec13d2c7666a0ccf743e56959a93bfa5fd80a5..a4179429a0e3f5b3d54040d83b57d3ac647b4bb0 100644 (file)
@@ -15,7 +15,8 @@
  */
 #define RDS_PROTOCOL_3_0       0x0300
 #define RDS_PROTOCOL_3_1       0x0301
-#define RDS_PROTOCOL_VERSION   RDS_PROTOCOL_3_1
+#define RDS_PROTOCOL_3_2        0x0302
+#define RDS_PROTOCOL_VERSION    RDS_PROTOCOL_3_2
 #define RDS_PROTOCOL_MAJOR(v)  ((v) >> 8)
 #define RDS_PROTOCOL_MINOR(v)  ((v) & 255)
 #define RDS_PROTOCOL(maj, min) (((maj) << 8) | min)
@@ -134,9 +135,12 @@ struct rds_connection {
 
        /* Re-connect stall diagnostics */
        unsigned long           c_reconnect_start;
-       unsigned long           c_reconnect_drops;
+       unsigned int            c_reconnect_drops;
        int                     c_reconnect_warn;
        int                     c_reconnect_err;
+
+        /* Qos support */
+        u8                      c_tos;
 };
 
 #define RDS_FLAG_CONG_BITMAP   0x01
@@ -514,6 +518,8 @@ struct rds_sock {
        unsigned char           rs_recverr,
                                rs_cong_monitor;
        int poison;
+
+       u8                      rs_tos;
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -615,9 +621,13 @@ struct rds_message *rds_cong_update_alloc(struct rds_connection *conn);
 int rds_conn_init(void);
 void rds_conn_exit(void);
 struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr,
-                                      struct rds_transport *trans, gfp_t gfp);
+                                       struct rds_transport *trans,
+                                       u8 tos, gfp_t gfp);
 struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
-                              struct rds_transport *trans, gfp_t gfp);
+                               struct rds_transport *trans,
+                               u8 tos, gfp_t gfp);
+struct rds_connection *rds_conn_find(__be32 laddr, __be32 faddr,
+                                     struct rds_transport *trans, u8 tos);
 void rds_conn_shutdown(struct rds_connection *conn);
 void rds_conn_destroy(struct rds_connection *conn);
 void rds_conn_reset(struct rds_connection *conn);
index d25b52a5c9baaeb9e2d668ee5cd52ac3c8c6d9af..2c6ce877f03be66d1ea10b62a0e665904d80c1a6 100644 (file)
@@ -530,6 +530,7 @@ void rds_inc_info_copy(struct rds_incoming *inc,
 
        minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
        minfo.len = be32_to_cpu(inc->i_hdr.h_len);
+       minfo.tos = inc->i_conn->c_tos;
 
        if (flip) {
                minfo.laddr = daddr;
index ff7428cc9ac1062d9ecd8a26f26aa03cbe781d2b..4f8199e517d14e9ef8eac6f90a0df689a3fb99f6 100644 (file)
@@ -1048,11 +1048,12 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 
        /* rds_conn_create has a spinlock that runs with IRQ off.
         * Caching the conn in the socket helps a lot. */
-       if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
+        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr &&
+                        rs->rs_tos == rs->rs_conn->c_tos)
                conn = rs->rs_conn;
        else {
                conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr,
-                                       rs->rs_transport,
+                                       rs->rs_transport, rs->rs_tos,
                                        sock->sk->sk_allocation);
                if (IS_ERR(conn)) {
                        ret = PTR_ERR(conn);
index d2ef06bad6a6f9d74d1d01cda4da8acb147f1523..9a2ae298581a0886ce55b29060b47f7e0a34af43 100644 (file)
@@ -71,7 +71,7 @@ static int rds_tcp_accept_one(struct socket *sock)
                  NIPQUAD(inet->daddr), ntohs(inet->dport));
 
        conn = rds_conn_create(inet->saddr, inet->daddr, &rds_tcp_transport,
-                              GFP_KERNEL);
+                              0, GFP_KERNEL);
        if (IS_ERR(conn)) {
                ret = PTR_ERR(conn);
                goto out;
index 84a06c6c1964dfcec19e2ae0916c672a4cce63c6..dc1896e731b161ac95cacc66a0e0a86803474e4a 100644 (file)
@@ -88,7 +88,9 @@ void rds_connect_complete(struct rds_connection *conn)
        conn->c_reconnect_jiffies = 0;
        set_bit(0, &conn->c_map_queued);
        queue_delayed_work(rds_wq, &conn->c_send_w, 0);
-       queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
+       if (!conn->c_tos)
+               queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
+
        conn->c_connection_start = get_seconds();
 }
 EXPORT_SYMBOL_GPL(rds_connect_complete);