From: Bang Nguyen
Date: Thu, 27 Dec 2012 18:23:05 +0000 (-0800)
Subject: rds: Misc Async Send fixes
X-Git-Tag: v4.1.12-92~319^2^2~2^2~7
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=b5d11f7bab5b8da509837b080036d35f3d782f13;p=users%2Fjedix%2Flinux-maple.git

rds: Misc Async Send fixes

Async send fixes to support new rds-stress option "--async"

Signed-off-by: Bang Nguyen
---

diff --git a/net/rds/connection.c b/net/rds/connection.c
index d5d2deb921804..71edccf40fc51 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -527,7 +527,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
 	rds_conn_info_set(cinfo->flags, atomic_read(&conn->c_state) == RDS_CONN_UP,
 			  CONNECTED);
-	rds_conn_info_set(cinfo->flags, conn->c_last_failed_op != NULL,
+	rds_conn_info_set(cinfo->flags, conn->c_pending_flush,
 			  ERROR);
 	return 1;
 }
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 62f0364edf538..903e9fa847a43 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -1014,13 +1014,13 @@ void rds_ib_add_one(struct ib_device *device)
 	rds_ibdev->max_1m_fmrs = dev_attr->max_fmr ?
 			min_t(unsigned int, dev_attr->max_fmr,
-			      RDS_FMR_1M_POOL_SIZE) :
-			RDS_FMR_1M_POOL_SIZE;
+			      rds_ib_fmr_1m_pool_size) :
+			rds_ib_fmr_1m_pool_size;
 
 	rds_ibdev->max_8k_fmrs = dev_attr->max_fmr ?
 			min_t(unsigned int, dev_attr->max_fmr,
-			      RDS_FMR_8K_POOL_SIZE) :
-			RDS_FMR_8K_POOL_SIZE;
+			      rds_ib_fmr_8k_pool_size) :
+			rds_ib_fmr_8k_pool_size;
 
 	rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom;
 	rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index bdc64c48b8410..228c21138caef 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -461,13 +461,13 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 static int rds_ib_find_least_loaded_vector(struct rds_ib_device *rds_ibdev)
 {
 	int i;
-	int index = 0;
-	int min = rds_ibdev->vector_load[0];
+	int index = rds_ibdev->dev->num_comp_vectors - 1;
+	int min = rds_ibdev->vector_load[rds_ibdev->dev->num_comp_vectors - 1];
 
 	if (!rds_ib_cq_balance_enabled)
 		return IB_CQ_VECTOR_LEAST_ATTACHED;
 
-	for (i = 1; i < rds_ibdev->dev->num_comp_vectors; i++) {
+	for (i = rds_ibdev->dev->num_comp_vectors - 1; i >= 0; i--) {
 		if (rds_ibdev->vector_load[i] < min) {
 			index = i;
 			min = rds_ibdev->vector_load[i];
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index c5f630bc3fe14..6dd7bb7e9ea92 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -565,7 +565,7 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 	WARN_ON(!nr_pages);
 	op->op_sg = rds_message_alloc_sgs(rm, nr_pages);
 
-	if (op->op_notify || op->op_recverr) {
+	if (op->op_notify || op->op_recverr || rds_async_send_enabled) {
 		/* We allocate an uninitialized notifier here, because
 		 * we don't want to do that in the completion handler. We
 		 * would have to use GFP_ATOMIC there, and don't want to deal
@@ -775,7 +775,7 @@ int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
 	sg_set_page(rm->atomic.op_sg, page, 8,
 		    offset_in_page(args->local_addr));
 
-	if (rm->atomic.op_notify || rm->atomic.op_recverr) {
+	if (rm->atomic.op_notify || rm->atomic.op_recverr || rds_async_send_enabled) {
 		/* We allocate an uninitialized notifier here, because
 		 * we don't want to do that in the completion handler. We
 		 * would have to use GFP_ATOMIC there, and don't want to deal
diff --git a/net/rds/rds.h b/net/rds/rds.h
index b3ec12a769a49..038e809c19638 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -145,7 +145,7 @@ struct rds_connection {
 	/* Qos support */
 	u8			c_tos;
 
-	struct rds_notifier	*c_last_failed_op;
+	unsigned int		c_pending_flush;
 
 	unsigned long		c_hb_start;
 
@@ -764,6 +764,8 @@ int rds_send_hb(struct rds_connection *conn, int response);
 struct rds_message *rds_send_get_message(struct rds_connection *,
 					 struct rm_rdma_op *);
 
+extern unsigned int rds_async_send_enabled;
+
 /* rdma.c */
 void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force);
 int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index e4e7cdd466f72..a298aadc2fd45 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -194,6 +194,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 	 * XXX we could spend more on the wire to get more robust failure
 	 * detection, arguably worth it to avoid data corruption.
 	 */
+
 	if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
 	    (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
 		rds_stats_inc(s_recv_drop_old_seq);
@@ -349,9 +350,14 @@ int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
 
 		/* If this is the last failed op, re-open the connection for traffic */
-		if (notifier->n_conn &&
-		    notifier->n_conn->c_last_failed_op == notifier)
-			notifier->n_conn->c_last_failed_op = NULL;
+		if (notifier->n_conn) {
+			spin_lock_irqsave(&notifier->n_conn->c_lock, flags);
+			if (notifier->n_conn->c_pending_flush)
+				notifier->n_conn->c_pending_flush--;
+			else
+				printk(KERN_ERR "rds_notify_queue_get: OOPS!\n");
+			spin_unlock_irqrestore(&notifier->n_conn->c_lock, flags);
+		}
 
 		list_del_init(&notifier->n_list);
 		kfree(notifier);
diff --git a/net/rds/send.c b/net/rds/send.c
index 156b686574c0d..a3b5ecc617fb4 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -51,7 +51,7 @@ static int send_batch_count = 64;
 module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
-static unsigned int rds_async_send_enabled = 0;
+unsigned int rds_async_send_enabled = 0;
 module_param(rds_async_send_enabled, int, 0444);
 MODULE_PARM_DESC(rds_async_send_enabled, "Set to enable Async Send");
 
@@ -72,8 +72,10 @@ void rds_send_reset(struct rds_connection *conn)
 		 * transport. This isn't entirely true (it's flushed out
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
-		rds_message_unmapped(rm);
-		rds_message_put(rm);
+		if (!rds_async_send_enabled) {
+			rds_message_unmapped(rm);
+			rds_message_put(rm);
+		}
 	}
 
 	conn->c_xmit_sg = 0;
@@ -112,17 +114,31 @@
 				 m_conn_item) {
 		if (rm->rdma.op_active) {
 			if (rm->rdma.op_notifier) {
-				conn->c_last_failed_op =
-					rm->rdma.op_notifier;
-				rm->rdma.op_notifier->n_conn = conn;
+				struct rds_notifier *notifier;
+
+				notifier = rm->rdma.op_notifier;
+				notifier->n_conn = conn;
+				if (test_bit(RDS_MSG_RETRANSMITTED,
+					     &rm->m_flags) &&
+				    !notifier->n_status) {
+					notifier->n_status =
+						RDS_RDMA_REMOTE_ERROR;
+				}
+
+				if (!test_bit(RDS_MSG_FLUSH,
+					      &rm->m_flags)) {
+					conn->c_pending_flush++;
+				}
 			}
 			set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 		}
 		if (rm->data.op_active && rm->data.op_async) {
 			if (rm->data.op_notifier) {
-				conn->c_last_failed_op =
-					rm->data.op_notifier;
 				rm->data.op_notifier->n_conn = conn;
+				if (!test_bit(RDS_MSG_FLUSH,
+					      &rm->m_flags)) {
+					conn->c_pending_flush++;
+				}
 			}
 			set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 		}
@@ -299,12 +315,15 @@ restart:
 		 * with RDMA ops.
 		 */
-		if ((rm->rdma.op_active
-		     && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
-		    || test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
+		if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
+		    (rm->rdma.op_active &&
+		     test_bit(RDS_MSG_RETRANSMITTED,
+			      &rm->m_flags))) {
 			spin_lock_irqsave(&conn->c_lock, flags);
-			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
-				list_move_tail(&rm->m_conn_item, &to_be_dropped);
+			if (test_and_clear_bit(RDS_MSG_ON_CONN,
+					       &rm->m_flags))
+				list_move_tail(&rm->m_conn_item,
+					       &to_be_dropped);
 			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
 		}
@@ -445,6 +464,7 @@ over_batch:
 	list_for_each_entry(rm, &to_be_dropped, m_conn_item) {
 		if (rds_async_send_enabled && rm->rdma.op_implicit_mr)
 			rds_rdma_unuse(rm->m_rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
+		rds_message_unmapped(rm);
 		rds_message_put(rm);
 	}
 	rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_SEND_DROPPED);
@@ -519,11 +539,12 @@ void rds_asend_complete(struct rds_message *rm, int status)
 		debug_sock_hold(rds_rs_to_sk(rs));
 
 		notifier->n_status = status;
-		spin_lock(&rs->rs_lock);
-		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-		spin_unlock(&rs->rs_lock);
-
-		so->op_notifier = NULL;
+		if (!status) {
+			spin_lock(&rs->rs_lock);
+			list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+			spin_unlock(&rs->rs_lock);
+			so->op_notifier = NULL;
+		}
 	}
 
 	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -560,10 +581,13 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
 
 		notifier->n_status = status;
 		if (!ro->op_remote_complete) {
-			spin_lock(&rs->rs_lock);
-			list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-			spin_unlock(&rs->rs_lock);
-			ro->op_notifier = NULL;
+			if (rds_async_send_enabled && !status) {
+				spin_lock(&rs->rs_lock);
+				list_add_tail(&notifier->n_list,
+					      &rs->rs_notify_queue);
+				spin_unlock(&rs->rs_lock);
+				ro->op_notifier = NULL;
+			}
 		}
 	}
 
@@ -596,11 +620,13 @@ void rds_atomic_send_complete(struct rds_message *rm, int status)
 		debug_sock_hold(rds_rs_to_sk(rs));
 
 		notifier->n_status = status;
-		spin_lock(&rs->rs_lock);
-		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-		spin_unlock(&rs->rs_lock);
-
-		ao->op_notifier = NULL;
+		if (rds_async_send_enabled && !status) {
+			spin_lock(&rs->rs_lock);
+			list_add_tail(&notifier->n_list,
+				      &rs->rs_notify_queue);
+			spin_unlock(&rs->rs_lock);
+			ao->op_notifier = NULL;
+		}
 	}
 
 	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -747,6 +773,19 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
 				} else
 					kfree(rm->rdma.op_notifier);
 				rm->rdma.op_notifier = NULL;
+			} else if (rm->atomic.op_active && rm->atomic.op_notifier) {
+				struct rm_atomic_op *ao = &rm->atomic;
+				struct rds_notifier *notifier;
+
+				if (ao->op_notify || status) {
+					notifier = ao->op_notifier;
+					list_add_tail(&notifier->n_list,
+						      &rs->rs_notify_queue);
+					if (!notifier->n_status)
+						notifier->n_status = status;
+				} else
+					kfree(rm->atomic.op_notifier);
+				rm->atomic.op_notifier = NULL;
 			} else if (rm->data.op_active && rm->data.op_notifier) {
 				struct rm_data_op *so = &rm->data;
 				struct rds_notifier *notifier;
@@ -966,20 +1005,18 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
 		/* This can race with rds_send_reset. If an async op sneaked
 		 * in after resetting the send state, flush it too.
 		 */
-		if (conn->c_last_failed_op) {
+		if (conn->c_pending_flush) {
 			if (rm->rdma.op_active) {
 				if (rm->rdma.op_notifier) {
-					conn->c_last_failed_op =
-						rm->rdma.op_notifier;
 					rm->rdma.op_notifier->n_conn = conn;
+					conn->c_pending_flush++;
 				}
 				set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 			}
 			if (rm->data.op_active && rm->data.op_async) {
 				if (rm->data.op_notifier) {
-					conn->c_last_failed_op =
-						rm->data.op_notifier;
 					rm->data.op_notifier->n_conn = conn;
+					conn->c_pending_flush++;
 				}
 				set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 			}
@@ -1129,6 +1166,11 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 	return ret;
 }
 
+struct user_hdr {
+	u32	seq;
+	u8	op;
+};
+
 int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		size_t payload_len)
 {
@@ -1187,6 +1229,8 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 
 	/* Attach data to the rm */
 	if (payload_len) {
+		struct user_hdr *uhdr = msg->msg_iov->iov_base;
+
 		rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
 		ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
 		if (ret)
@@ -1225,7 +1269,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 	 */
 
 	/* Not accepting new sends until all the failed ops have been reaped */
-	if (conn->c_last_failed_op) {
+	if (rds_async_send_enabled && conn->c_pending_flush) {
 		ret = -EAGAIN;
 		goto out;
 	}
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 975364addcdc0..96b9c99b7aa2d 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -78,8 +78,8 @@ EXPORT_SYMBOL_GPL(rds_wq);
 void rds_connect_complete(struct rds_connection *conn)
 {
 	if (!rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_UP)) {
-		printk(KERN_WARNING "%s: Cannot transition to state UP, "
-		       "current state is %d\n",
+		printk(KERN_WARNING "%s: Cannot transition to state UP"
+		       ", current state is %d\n",
 		       __func__,
 		       atomic_read(&conn->c_state));
 		atomic_set(&conn->c_state, RDS_CONN_ERROR);
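[Note, not part of the patch: the reaping path touched above (rds_notify_queue_get() feeding RDS_CMSG_RDMA_STATUS control messages) is what an rds-stress-style "--async" client uses to collect send completions. Below is a minimal userspace sketch, assuming only the upstream RDS uapi in <linux/rds.h>; the fallback SOL_RDS define and the function/variable names are illustrative, not from this patch.]

/* Illustrative only -- not part of this patch.  Drain async-send/RDMA
 * completion notifications, which the kernel delivers as
 * RDS_CMSG_RDMA_STATUS control messages on recvmsg().
 */
#include <linux/rds.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>

#ifndef SOL_RDS
#define SOL_RDS 276		/* same fallback definition rds-tools uses */
#endif

static void reap_send_notifications(int fd)
{
	char ctl[1024];
	struct msghdr msg;
	struct cmsghdr *cmsg;

	memset(&msg, 0, sizeof(msg));
	msg.msg_control = ctl;
	msg.msg_controllen = sizeof(ctl);

	/* No data iovec: RDS services the notification queue, if any,
	 * before returning payload. */
	if (recvmsg(fd, &msg, MSG_DONTWAIT) < 0)
		return;

	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
		struct rds_rdma_notify notify;

		if (cmsg->cmsg_level != SOL_RDS ||
		    cmsg->cmsg_type != RDS_CMSG_RDMA_STATUS)
			continue;

		memcpy(&notify, CMSG_DATA(cmsg), sizeof(notify));
		printf("token %llu completed, status %d\n",
		       (unsigned long long)notify.user_token,
		       (int)notify.status);
	}
}

With this patch, once a connection has failed async ops outstanding (conn->c_pending_flush != 0), rds_sendmsg() returns -EAGAIN until those notifications are reaped, so a drain loop along these lines is needed for the sender to make forward progress.]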