module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
-static unsigned int rds_async_send_enabled = 0;
+unsigned int rds_async_send_enabled = 0;
module_param(rds_async_send_enabled, int, 0444);
MODULE_PARM_DESC(rds_async_send_enabled, "Set to enable Async Send");
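+/* The parameter is read-only at runtime (perms 0444), so async send must be
+ * enabled when the module is loaded, e.g. "modprobe rds rds_async_send_enabled=1"
+ * (or rds.rds_async_send_enabled=1 on the kernel command line when built in).
+ */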
* transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
- rds_message_unmapped(rm);
- rds_message_put(rm);
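+		/* With async send enabled the message is left mapped and the
+		 * reference is kept, so the failed op can still be reported to
+		 * the application through its notifier.
+		 */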
+ if (!rds_async_send_enabled) {
+ rds_message_unmapped(rm);
+ rds_message_put(rm);
+ }
}
conn->c_xmit_sg = 0;
m_conn_item) {
if (rm->rdma.op_active) {
if (rm->rdma.op_notifier) {
- conn->c_last_failed_op =
- rm->rdma.op_notifier;
- rm->rdma.op_notifier->n_conn = conn;
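+				/* A retransmitted RDMA op may already have
+				 * executed on the remote side, so flag it as
+				 * RDS_RDMA_REMOTE_ERROR if it has no status
+				 * yet.  Ops not already marked for flush are
+				 * counted in c_pending_flush; new sends are
+				 * refused until they have been reaped.
+				 */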
+ struct rds_notifier *notifier;
+
+ notifier = rm->rdma.op_notifier;
+ notifier->n_conn = conn;
+ if (test_bit(RDS_MSG_RETRANSMITTED,
+ &rm->m_flags) &&
+ !notifier->n_status) {
+ notifier->n_status =
+ RDS_RDMA_REMOTE_ERROR;
+ }
+
+ if (!test_bit(RDS_MSG_FLUSH,
+ &rm->m_flags)) {
+ conn->c_pending_flush++;
+ }
}
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
if (rm->data.op_active && rm->data.op_async) {
if (rm->data.op_notifier) {
- conn->c_last_failed_op =
- rm->data.op_notifier;
rm->data.op_notifier->n_conn = conn;
+ if (!test_bit(RDS_MSG_FLUSH,
+ &rm->m_flags)) {
+ conn->c_pending_flush++;
+ }
}
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
* with RDMA ops.
*/
- if ((rm->rdma.op_active
- && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
- || test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
+ if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
+ (rm->rdma.op_active &&
+ test_bit(RDS_MSG_RETRANSMITTED,
+ &rm->m_flags))) {
spin_lock_irqsave(&conn->c_lock, flags);
- if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
- list_move_tail(&rm->m_conn_item, &to_be_dropped);
+ if (test_and_clear_bit(RDS_MSG_ON_CONN,
+ &rm->m_flags))
+ list_move_tail(&rm->m_conn_item,
+ &to_be_dropped);
spin_unlock_irqrestore(&conn->c_lock, flags);
continue;
}
list_for_each_entry(rm, &to_be_dropped, m_conn_item) {
if (rds_async_send_enabled && rm->rdma.op_implicit_mr)
rds_rdma_unuse(rm->m_rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
+ rds_message_unmapped(rm);
rds_message_put(rm);
}
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_SEND_DROPPED);
debug_sock_hold(rds_rs_to_sk(rs));
notifier->n_status = status;
- spin_lock(&rs->rs_lock);
-		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
- spin_unlock(&rs->rs_lock);
-
- so->op_notifier = NULL;
+ if (!status) {
+ spin_lock(&rs->rs_lock);
+			list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+ spin_unlock(&rs->rs_lock);
+ so->op_notifier = NULL;
+ }
}
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
notifier->n_status = status;
if (!ro->op_remote_complete) {
- spin_lock(&rs->rs_lock);
-			list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
- spin_unlock(&rs->rs_lock);
- ro->op_notifier = NULL;
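+			/* Queue the notification here only for successful
+			 * async sends.  Otherwise the notifier stays attached
+			 * to the op and the status is reported when the
+			 * message is removed from the socket.
+			 */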
+ if (rds_async_send_enabled && !status) {
+ spin_lock(&rs->rs_lock);
+				list_add_tail(&notifier->n_list,
+ &rs->rs_notify_queue);
+ spin_unlock(&rs->rs_lock);
+ ro->op_notifier = NULL;
+ }
}
}
debug_sock_hold(rds_rs_to_sk(rs));
notifier->n_status = status;
- spin_lock(&rs->rs_lock);
-		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
- spin_unlock(&rs->rs_lock);
-
- ao->op_notifier = NULL;
+ if (rds_async_send_enabled && !status) {
+ spin_lock(&rs->rs_lock);
+			list_add_tail(&notifier->n_list,
+ &rs->rs_notify_queue);
+ spin_unlock(&rs->rs_lock);
+ ao->op_notifier = NULL;
+ }
}
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
} else
kfree(rm->rdma.op_notifier);
rm->rdma.op_notifier = NULL;
+ } else if (rm->atomic.op_active && rm->atomic.op_notifier) {
+ struct rm_atomic_op *ao = &rm->atomic;
+ struct rds_notifier *notifier;
+
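+			/* Queue the atomic completion if notification was
+			 * requested or the op failed; otherwise the notifier
+			 * is no longer needed.
+			 */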
+ if (ao->op_notify || status) {
+ notifier = ao->op_notifier;
+				list_add_tail(&notifier->n_list,
+ &rs->rs_notify_queue);
+ if (!notifier->n_status)
+ notifier->n_status = status;
+ } else
+ kfree(rm->atomic.op_notifier);
+ rm->atomic.op_notifier = NULL;
} else if (rm->data.op_active && rm->data.op_notifier) {
struct rm_data_op *so = &rm->data;
struct rds_notifier *notifier;
/* This can race with rds_send_reset. If an async op sneaked
* in after resetting the send state, flush it too.
*/
- if (conn->c_last_failed_op) {
+ if (conn->c_pending_flush) {
if (rm->rdma.op_active) {
if (rm->rdma.op_notifier) {
- conn->c_last_failed_op =
- rm->rdma.op_notifier;
rm->rdma.op_notifier->n_conn = conn;
+ conn->c_pending_flush++;
}
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
if (rm->data.op_active && rm->data.op_async) {
if (rm->data.op_notifier) {
- conn->c_last_failed_op =
- rm->data.op_notifier;
rm->data.op_notifier->n_conn = conn;
+ conn->c_pending_flush++;
}
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
return ret;
}
+struct user_hdr {
+ u32 seq;
+ u8 op;
+};
+
int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
size_t payload_len)
{
/* Attach data to the rm */
if (payload_len) {
+ struct user_hdr *uhdr = msg->msg_iov->iov_base;
+
rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
if (ret)
*/
/* Not accepting new sends until all the failed ops have been reaped */
- if (conn->c_last_failed_op) {
+ if (rds_async_send_enabled && conn->c_pending_flush) {
ret = -EAGAIN;
goto out;
}