rds: Misc Async Send fixes
author		Bang Nguyen <bang.nguyen@oracle.com>
		Thu, 27 Dec 2012 18:23:05 +0000 (10:23 -0800)
committer	Mukesh Kacker <mukesh.kacker@oracle.com>
		Wed, 8 Jul 2015 20:12:35 +0000 (13:12 -0700)
Async send fixes to support the new rds-stress option "--async"

Signed-off-by: Bang Nguyen <bang.nguyen@oracle.com>
net/rds/connection.c
net/rds/ib.c
net/rds/ib_cm.c
net/rds/rdma.c
net/rds/rds.h
net/rds/recv.c
net/rds/send.c
net/rds/threads.c

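As context for the new flag, a hypothetical rds-stress invocation (addresses and queue parameters are illustrative, not from this commit; see rds-stress(1) for the actual syntax):

    rds-stress -r 192.168.10.1 -s 192.168.10.2 -q 1m -D 1m -t 4 --async
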
diff --git a/net/rds/connection.c b/net/rds/connection.c
index d5d2deb9218041706a7405cad073f74b578799c1..71edccf40fc51fca98086d116893a728a8130407 100644
@@ -527,7 +527,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
        rds_conn_info_set(cinfo->flags,
                          atomic_read(&conn->c_state) == RDS_CONN_UP,
                          CONNECTED);
-       rds_conn_info_set(cinfo->flags, conn->c_last_failed_op != NULL,
+       rds_conn_info_set(cinfo->flags, conn->c_pending_flush,
                          ERROR);
        return 1;
 }
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 62f0364edf5389e6c498019b01a494e55ec0a554..903e9fa847a4300fb76b5800a9512a7ea6a47bda 100644
@@ -1014,13 +1014,13 @@ void rds_ib_add_one(struct ib_device *device)
 
        rds_ibdev->max_1m_fmrs = dev_attr->max_fmr ?
                min_t(unsigned int, dev_attr->max_fmr,
-                       RDS_FMR_1M_POOL_SIZE) :
-                       RDS_FMR_1M_POOL_SIZE;
+                       rds_ib_fmr_1m_pool_size) :
+                       rds_ib_fmr_1m_pool_size;
 
        rds_ibdev->max_8k_fmrs = dev_attr->max_fmr ?
                min_t(unsigned int, dev_attr->max_fmr,
-                       RDS_FMR_8K_POOL_SIZE) :
-                       RDS_FMR_8K_POOL_SIZE;
+                       rds_ib_fmr_8k_pool_size) :
+                       rds_ib_fmr_8k_pool_size;
 
        rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom;
        rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom;
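
The hunk above drops the compile-time RDS_FMR_1M_POOL_SIZE/RDS_FMR_8K_POOL_SIZE constants in favor of the rds_ib_fmr_*_pool_size tunables. The clamp pattern in isolation (a sketch; "tunable_pool_size" is an illustrative stand-in):

        /* Honor the HCA's max_fmr cap when it reports one; otherwise
         * take the tunable pool size as-is. */
        pool_size = dev_attr->max_fmr ?
                min_t(unsigned int, dev_attr->max_fmr, tunable_pool_size) :
                tunable_pool_size;
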
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index bdc64c48b8410fcd3bc936cf1f9e76c5de763657..228c21138caef0ea1f1dd0a5eef95196cba54d8a 100644
@@ -461,13 +461,13 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 static int rds_ib_find_least_loaded_vector(struct rds_ib_device *rds_ibdev)
 {
        int i;
-       int index = 0;
-       int min = rds_ibdev->vector_load[0];
+       int index = rds_ibdev->dev->num_comp_vectors - 1;
+       int min = rds_ibdev->vector_load[rds_ibdev->dev->num_comp_vectors - 1];
 
        if (!rds_ib_cq_balance_enabled)
                return IB_CQ_VECTOR_LEAST_ATTACHED;
 
-       for (i = 1; i < rds_ibdev->dev->num_comp_vectors; i++) {
+       for (i = rds_ibdev->dev->num_comp_vectors - 1; i >= 0; i--) {
                if (rds_ibdev->vector_load[i] < min) {
                        index = i;
                        min = rds_ibdev->vector_load[i];
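
A standalone sketch of the selection logic after this change (illustrative helper, not part of the patch). Note the behavioral shift: with a strict less-than, scanning from the top vector down means ties among equally loaded vectors now resolve to the highest-numbered vector, where the old forward scan favored vector 0:

        /* Sketch: return the index of the least-loaded completion
         * vector, biased toward high-numbered vectors on ties. */
        static int least_loaded_vector(const int *load, int nvec)
        {
                int i, index = nvec - 1, min = load[nvec - 1];

                for (i = nvec - 1; i >= 0; i--) {
                        if (load[i] < min) {
                                index = i;
                                min = load[i];
                        }
                }
                return index;
        }
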
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index c5f630bc3fe1485a4f4312e6ce0f76eb9be4e06f..6dd7bb7e9ea9284a1d55b66ae32c9859c9e29728 100644
@@ -565,7 +565,7 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
        WARN_ON(!nr_pages);
        op->op_sg = rds_message_alloc_sgs(rm, nr_pages);
 
-       if (op->op_notify || op->op_recverr) {
+       if (op->op_notify || op->op_recverr || rds_async_send_enabled) {
                /* We allocate an uninitialized notifier here, because
                 * we don't want to do that in the completion handler. We
                 * would have to use GFP_ATOMIC there, and don't want to deal
@@ -775,7 +775,7 @@ int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
 
        sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr));
 
-       if (rm->atomic.op_notify || rm->atomic.op_recverr) {
+       if (rm->atomic.op_notify || rm->atomic.op_recverr || rds_async_send_enabled) {
                /* We allocate an uninitialized notifier here, because
                 * we don't want to do that in the completion handler. We
                 * would have to use GFP_ATOMIC there, and don't want to deal
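
The comment in both hunks refers to the pattern sketched below (field names as used elsewhere in this patch; the error path is illustrative): the notifier is allocated with GFP_KERNEL at submission time precisely so the completion handler never needs a GFP_ATOMIC allocation:

        op->op_notifier = kmalloc(sizeof(*op->op_notifier), GFP_KERNEL);
        if (!op->op_notifier)
                return -ENOMEM;                 /* illustrative */
        op->op_notifier->n_status = 0;          /* completion fills this in */
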
diff --git a/net/rds/rds.h b/net/rds/rds.h
index b3ec12a769a49cd86f269f3aa226154b5e6fe949..038e809c19638e5b8044445e22721e93d72bc221 100644
@@ -145,7 +145,7 @@ struct rds_connection {
        /* Qos support */
        u8                      c_tos;
 
-       struct rds_notifier     *c_last_failed_op;
+       unsigned int            c_pending_flush;
 
        unsigned long           c_hb_start;
 
@@ -764,6 +764,8 @@ int rds_send_hb(struct rds_connection *conn, int response);
 struct rds_message *rds_send_get_message(struct rds_connection *,
                                         struct rm_rdma_op *);
 
+extern unsigned int rds_async_send_enabled;
+
 /* rdma.c */
 void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force);
 int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index e4e7cdd466f722ed5204cf0c371b8fa1bf196c3c..a298aadc2fd45ca0aee04b6c9dfadd16af9b9c0d 100644
@@ -194,6 +194,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
         * XXX we could spend more on the wire to get more robust failure
         * detection, arguably worth it to avoid data corruption.
         */
+
        if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq
         && (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
                rds_stats_inc(s_recv_drop_old_seq);
@@ -349,9 +350,14 @@ int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
 
                /* If this is the last failed op, re-open the connection for
                   traffic */
-               if (notifier->n_conn &&
-                       notifier->n_conn->c_last_failed_op == notifier)
-                               notifier->n_conn->c_last_failed_op = NULL;
+               if (notifier->n_conn) {
+                       spin_lock_irqsave(&notifier->n_conn->c_lock, flags);
+                       if (notifier->n_conn->c_pending_flush)
+                               notifier->n_conn->c_pending_flush--;
+                       else
+                               printk(KERN_ERR "rds_notify_queue_get: OOPS!\n");
+                       spin_unlock_irqrestore(&notifier->n_conn->c_lock, flags);
+               }
 
                list_del_init(&notifier->n_list);
                kfree(notifier);
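
The c_pending_flush decrement above runs when userspace reaps a send-status notification. A hypothetical reaper, assuming the stock RDS control-message interface from <linux/rds.h> (SOL_RDS, RDS_CMSG_RDMA_STATUS, struct rds_rdma_notify); needs <sys/socket.h> and <string.h>:

        /* Drain queued send-status notifications from an RDS socket. */
        static void reap_send_notifications(int fd)
        {
                char ctl[1024];
                struct msghdr msg = { .msg_control = ctl,
                                      .msg_controllen = sizeof(ctl) };
                struct cmsghdr *cmsg;

                if (recvmsg(fd, &msg, MSG_DONTWAIT) < 0)
                        return;
                for (cmsg = CMSG_FIRSTHDR(&msg); cmsg;
                     cmsg = CMSG_NXTHDR(&msg, cmsg)) {
                        struct rds_rdma_notify notify;

                        if (cmsg->cmsg_level != SOL_RDS ||
                            cmsg->cmsg_type != RDS_CMSG_RDMA_STATUS)
                                continue;
                        memcpy(&notify, CMSG_DATA(cmsg), sizeof(notify));
                        /* notify.user_token names the op; notify.status
                         * is 0 on success or an RDS_RDMA_* error. */
                }
        }
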
diff --git a/net/rds/send.c b/net/rds/send.c
index 156b686574c0d8485aca85980ce597cc6484e8dd..a3b5ecc617fb4f5da61355b144ca8487d7118efe 100644
@@ -51,7 +51,7 @@ static int send_batch_count = 64;
 module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
-static unsigned int rds_async_send_enabled = 0;
+unsigned int rds_async_send_enabled = 0;
 module_param(rds_async_send_enabled, int, 0444);
 MODULE_PARM_DESC(rds_async_send_enabled, "Set to enable Async Send");
 
@@ -72,8 +72,10 @@ void rds_send_reset(struct rds_connection *conn)
                 * transport. This isn't entirely true (it's flushed out
                 * independently) but as the connection is down, there's
                 * no ongoing RDMA to/from that memory */
-               rds_message_unmapped(rm);
-               rds_message_put(rm);
+               if (!rds_async_send_enabled) {
+                       rds_message_unmapped(rm);
+                       rds_message_put(rm);
+               }
        }
 
        conn->c_xmit_sg = 0;
@@ -112,17 +114,31 @@ void rds_send_reset(struct rds_connection *conn)
                                m_conn_item) {
                        if (rm->rdma.op_active) {
                                if (rm->rdma.op_notifier) {
-                                       conn->c_last_failed_op =
-                                               rm->rdma.op_notifier;
-                                       rm->rdma.op_notifier->n_conn = conn;
+                                       struct rds_notifier *notifier;
+
+                                       notifier = rm->rdma.op_notifier;
+                                       notifier->n_conn = conn;
+                                       if (test_bit(RDS_MSG_RETRANSMITTED,
+                                               &rm->m_flags) &&
+                                               !notifier->n_status) {
+                                                       notifier->n_status =
+                                                       RDS_RDMA_REMOTE_ERROR;
+                                       }
+
+                                       if (!test_bit(RDS_MSG_FLUSH,
+                                               &rm->m_flags)) {
+                                               conn->c_pending_flush++;
+                                       }
                                }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
                        if (rm->data.op_active && rm->data.op_async) {
                                if (rm->data.op_notifier) {
-                                       conn->c_last_failed_op =
-                                               rm->data.op_notifier;
                                        rm->data.op_notifier->n_conn = conn;
+                                       if (!test_bit(RDS_MSG_FLUSH,
+                                               &rm->m_flags)) {
+                                               conn->c_pending_flush++;
+                                       }
                                }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
@@ -299,12 +315,15 @@ restart:
                         * with RDMA ops.
                         */
 
-                       if ((rm->rdma.op_active
-                        && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
-                        || test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
+                       if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
+                               (rm->rdma.op_active &&
+                               test_bit(RDS_MSG_RETRANSMITTED,
+                                       &rm->m_flags))) {
                                spin_lock_irqsave(&conn->c_lock, flags);
-                               if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
-                                       list_move_tail(&rm->m_conn_item, &to_be_dropped);
+                               if (test_and_clear_bit(RDS_MSG_ON_CONN,
+                                       &rm->m_flags))
+                                       list_move_tail(&rm->m_conn_item,
+                                               &to_be_dropped);
                                spin_unlock_irqrestore(&conn->c_lock, flags);
                                continue;
                        }
@@ -445,6 +464,7 @@ over_batch:
                list_for_each_entry(rm, &to_be_dropped, m_conn_item) {
                        if (rds_async_send_enabled && rm->rdma.op_implicit_mr)
                                rds_rdma_unuse(rm->m_rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
+                       rds_message_unmapped(rm);
                        rds_message_put(rm);
                }
                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_SEND_DROPPED);
@@ -519,11 +539,12 @@ void rds_asend_complete(struct rds_message *rm, int status)
                debug_sock_hold(rds_rs_to_sk(rs));
 
                notifier->n_status = status;
-               spin_lock(&rs->rs_lock);
-               list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-               spin_unlock(&rs->rs_lock);
-
-               so->op_notifier = NULL;
+               if (!status) {
+                       spin_lock(&rs->rs_lock);
+                       list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+                       spin_unlock(&rs->rs_lock);
+                       so->op_notifier = NULL;
+               }
        }
 
        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -560,10 +581,13 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
                notifier->n_status = status;
 
                if (!ro->op_remote_complete) {
-                       spin_lock(&rs->rs_lock);
-                       list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-                       spin_unlock(&rs->rs_lock);
-                       ro->op_notifier = NULL;
+                       if (rds_async_send_enabled && !status) {
+                               spin_lock(&rs->rs_lock);
+                               list_add_tail(&notifier->n_list,
+                                       &rs->rs_notify_queue);
+                               spin_unlock(&rs->rs_lock);
+                               ro->op_notifier = NULL;
+                       }
                }
        }
 
@@ -596,11 +620,13 @@ void rds_atomic_send_complete(struct rds_message *rm, int status)
                debug_sock_hold(rds_rs_to_sk(rs));
 
                notifier->n_status = status;
-               spin_lock(&rs->rs_lock);
-               list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-               spin_unlock(&rs->rs_lock);
-
-               ao->op_notifier = NULL;
+               if (rds_async_send_enabled && !status) {
+                       spin_lock(&rs->rs_lock);
+                       list_add_tail(&notifier->n_list,
+                               &rs->rs_notify_queue);
+                       spin_unlock(&rs->rs_lock);
+                       ao->op_notifier = NULL;
+               }
        }
 
        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -747,6 +773,19 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
                                } else
                                        kfree(rm->rdma.op_notifier);
                                rm->rdma.op_notifier = NULL;
+                       } else if (rm->atomic.op_active && rm->atomic.op_notifier) {
+                               struct rm_atomic_op *ao = &rm->atomic;
+                               struct rds_notifier *notifier;
+
+                               if (ao->op_notify || status) {
+                                       notifier = ao->op_notifier;
+                                       list_add_tail(&notifier->n_list,
+                                               &rs->rs_notify_queue);
+                                       if (!notifier->n_status)
+                                               notifier->n_status = status;
+                               } else
+                                       kfree(rm->atomic.op_notifier);
+                               rm->atomic.op_notifier = NULL;
                        } else if (rm->data.op_active && rm->data.op_notifier) {
                                struct rm_data_op *so = &rm->data;
                                struct rds_notifier *notifier;
@@ -966,20 +1005,18 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
                /* This can race with rds_send_reset. If an async op sneaked
                 * in after resetting the send state, flush it too.
                 */
-               if (conn->c_last_failed_op) {
+               if (conn->c_pending_flush) {
                        if (rm->rdma.op_active) {
                                if (rm->rdma.op_notifier) {
-                                       conn->c_last_failed_op =
-                                               rm->rdma.op_notifier;
                                        rm->rdma.op_notifier->n_conn = conn;
+                                       conn->c_pending_flush++;
                                }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
                        if (rm->data.op_active && rm->data.op_async) {
                                if (rm->data.op_notifier) {
-                                       conn->c_last_failed_op =
-                                               rm->data.op_notifier;
                                        rm->data.op_notifier->n_conn = conn;
+                                       conn->c_pending_flush++;
                                }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
@@ -1129,6 +1166,11 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
        return ret;
 }
 
+struct user_hdr {
+       u32     seq;
+       u8      op;
+};
+
 int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                size_t payload_len)
 {
@@ -1187,6 +1229,8 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 
        /* Attach data to the rm */
        if (payload_len) {
+               struct user_hdr *uhdr = msg->msg_iov->iov_base;
+
                rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
                ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
                if (ret)
@@ -1225,7 +1269,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
        */
 
        /* Not accepting new sends until all the failed ops have been reaped */
-       if (conn->c_last_failed_op) {
+       if (rds_async_send_enabled && conn->c_pending_flush) {
                ret = -EAGAIN;
                goto out;
        }
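
Together with the recv.c change, the -EAGAIN above implies a retry pattern along these lines in the caller (hypothetical userspace loop, reusing the reap_send_notifications() sketch from the recv.c notes):

        while (sendmsg(fd, &msg, 0) < 0) {
                if (errno != EAGAIN)
                        break;          /* hard error */
                /* Failed async ops are still pending reap: drain the
                 * notification queue, then retry the send. */
                reap_send_notifications(fd);
        }
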
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 975364addcdc0e06635fe076ffd73c9db3a30d59..96b9c99b7aa2d01302be75c803026b29a6e111a0 100644
@@ -78,8 +78,8 @@ EXPORT_SYMBOL_GPL(rds_wq);
 void rds_connect_complete(struct rds_connection *conn)
 {
        if (!rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_UP)) {
-               printk(KERN_WARNING "%s: Cannot transition to state UP"
-                               "current state is %d\n",
+               printk(KERN_WARNING "%s: Cannot transition to state UP, "
+                               "current state is %d\n",
                                __func__,
                                atomic_read(&conn->c_state));
                atomic_set(&conn->c_state, RDS_CONN_ERROR);