complete(rm, notify_status);
}
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+ struct rm_rdma_op *op,
+ int wc_status);
+
static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
struct rm_data_op *op,
int wc_status)
{
+ struct rds_message *rm;
+
+ rm = container_of(op, struct rds_message, data);
+
if (op->op_nents)
ib_dma_unmap_sg(ic->i_cm_id->device,
op->op_sg, op->op_nents,
DMA_TO_DEVICE);
- if (op->op_notifier)
- rds_ib_send_complete(container_of(op, struct rds_message, data),
- wc_status, rds_asend_complete);
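+ /* Data send completed: signal async data sends to the caller.
+ * Otherwise, if the message carries an RDMA op that requested
+ * remote completion, finish (unmap and notify) that op now.
+ */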
+ if (rm->data.op_async)
+ rds_ib_send_complete(rm, wc_status, rds_asend_complete);
+ else if (rm->rdma.op_active && rm->rdma.op_remote_complete)
+ rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status);
}
static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
if (scat == &rm->data.op_sg[rm->data.op_count]) {
prev->s_op = ic->i_data_op;
prev->s_wr.send_flags |= IB_SEND_SOLICITED;
- if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED)) {
+ if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED) ||
+ (rm->rdma.op_active && rm->rdma.op_remote_complete)) {
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
prev->s_wr.send_flags |= IB_SEND_SIGNALED;
nr_sig++;
send->s_queued = jiffies;
send->s_op = NULL;
- nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+ if (!op->op_remote_complete)
+ nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
send->s_wr.wr.rdma.remote_addr = remote_addr;
send = ic->i_sends;
}
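+ /* Give back any unused ring entries now, before the
+ * remote-completion read below allocates an extra one.
+ */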
+ if (i < work_alloc) {
+ rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+ work_alloc = i;
+ }
+
/* give a reference to the last op */
if (scat == &op->op_sg[op->op_count]) {
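+ /* Remote completion for a silent RDMA write: chain a signaled
+ * one-byte RDMA READ of the last byte of the written region,
+ * so that its completion indicates the preceding write has been
+ * placed at the remote node before we notify the sender.
+ */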
+ if (op->op_write && op->op_silent && op->op_remote_complete) {
+ int rcomp_alloc, rcomp_pos;
+ struct rds_ib_send_work *rcomp;
+
+ rcomp_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1,
+ &rcomp_pos);
+ if (rcomp_alloc != 1) {
+ ib_dma_unmap_sg(ic->i_cm_id->device,
+ op->op_sg, op->op_nents,
+ op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+ op->op_mapped = 0;
+ rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_stats_inc(s_ib_tx_ring_full);
+ ret = -ENOMEM;
+ goto out;
+ }
+ rcomp = &ic->i_sends[rcomp_pos];
+ rcomp->s_sge[0] = prev->s_sge[prev->s_wr.num_sge-1];
+ rcomp->s_sge[0].addr +=
+ (rcomp->s_sge[0].length - sizeof(u8));
+ rcomp->s_sge[0].length = sizeof(u8);
+
+ rcomp->s_wr.num_sge = 1;
+ rcomp->s_wr.opcode = IB_WR_RDMA_READ;
+ rcomp->s_wr.next = NULL;
+ rcomp->s_wr.wr.rdma.remote_addr =
+ remote_addr - sizeof(u8);
+ rcomp->s_wr.wr.rdma.rkey = op->op_rkey;
+ prev->s_wr.next = &rcomp->s_wr;
+ prev = rcomp;
+ rcomp->s_wr.send_flags = IB_SEND_SIGNALED;
+ nr_sig++;
+ }
+
prev->s_op = op;
rds_message_addref(container_of(op, struct rds_message, rdma));
}
- if (i < work_alloc) {
- rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
- work_alloc = i;
- }
-
if (nr_sig)
atomic_add(nr_sig, &ic->i_signaled_sends);
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
+static unsigned int rds_async_send_enabled = 0;
+module_param(rds_async_send_enabled, uint, 0444);
+MODULE_PARM_DESC(rds_async_send_enabled, "Set to enable Async Send");
+
/*
* Reset the send state. Callers must ensure that this doesn't race with
* rds_send_xmit().
set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
/* check for failed op */
- if (!failed_op && (rm->rdma.op_active ||
- (rm->data.op_active && rm->data.op_notifier)))
+ if (rds_async_send_enabled && (rm->rdma.op_active ||
+ (rm->data.op_active && rm->data.op_async)))
failed_op = 1;
}
list_splice_init(&conn->c_retrans, &conn->c_send_queue);
if (failed_op) {
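+ /* Flush every message carrying an RDMA op or an async data op
+ * and record its notifier on the connection; new sends are
+ * refused until these failed ops have been reaped.
+ */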
list_for_each_entry_safe(rm, tmp, &conn->c_send_queue,
m_conn_item) {
- if (rm->rdma.op_active && rm->rdma.op_notifier) {
- conn->c_last_failed_op =
- rm->rdma.op_notifier;
- rm->rdma.op_notifier->n_conn = conn;
+ if (rm->rdma.op_active) {
+ if (rm->rdma.op_notifier) {
+ conn->c_last_failed_op =
+ rm->rdma.op_notifier;
+ rm->rdma.op_notifier->n_conn = conn;
+ }
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
-
- if (rm->data.op_active && rm->data.op_notifier) {
- conn->c_last_failed_op =
- rm->data.op_notifier;
- rm->data.op_notifier->n_conn = conn;
+ if (rm->data.op_active && rm->data.op_async) {
+ if (rm->data.op_notifier) {
+ conn->c_last_failed_op =
+ rm->data.op_notifier;
+ rm->data.op_notifier->n_conn = conn;
+ }
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
}
}
-
spin_unlock_irqrestore(&conn->c_lock, flags);
}
if (!rm)
break;
- if (test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
+ /* Unfortunately, the way Infiniband deals with
+ * RDMA to a bad MR key is by moving the entire
+ * queue pair to error state. We could possibly
+ * recover from that, but right now we drop the
+ * connection. Therefore, we never retransmit messages
+ * with RDMA ops.
+ */
+
+ if ((rm->rdma.op_active &&
+ test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) ||
+ test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
spin_lock_irqsave(&conn->c_lock, flags);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move_tail(&rm->m_conn_item, &to_be_dropped);
/* Nuke any messages we decided not to retransmit. */
if (!list_empty(&to_be_dropped)) {
/* irqs on here, so we can put(), unlike above */
- list_for_each_entry(rm, &to_be_dropped, m_conn_item)
+ list_for_each_entry(rm, &to_be_dropped, m_conn_item) {
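+ /* If a dropped message used an implicitly registered MR,
+ * release it via its rdma cookie before putting the message.
+ */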
+ if (rds_async_send_enabled && rm->rdma.op_implicit_mr)
+ rds_rdma_unuse(rm->m_rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
rds_message_put(rm);
+ }
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_SEND_DROPPED);
}
spin_lock_irqsave(&rm->m_rs_lock, flags);
so = &rm->data;
- if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
- && so->op_active && so->op_notifier) {
+ if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+ so->op_active && so->op_notifier && so->op_notify) {
notifier = so->op_notifier;
rs = rm->m_rs;
debug_sock_hold(rds_rs_to_sk(rs));
spin_lock_irqsave(&rm->m_rs_lock, flags);
ro = &rm->rdma;
- if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
- && ro->op_active && ro->op_notify && ro->op_notifier) {
+ if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+ ro->op_active && ro->op_notifier && ro->op_notify) {
notifier = ro->op_notifier;
rs = rm->m_rs;
debug_sock_hold(rds_rs_to_sk(rs));
notifier->n_status = status;
- spin_lock(&rs->rs_lock);
- list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
- spin_unlock(&rs->rs_lock);
- ro->op_notifier = NULL;
+ if (!ro->op_remote_complete) {
+ spin_lock(&rs->rs_lock);
+ list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+ spin_unlock(&rs->rs_lock);
+ ro->op_notifier = NULL;
+ }
}
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
spin_lock_irqsave(&rm->m_rs_lock, flags);
ao = &rm->atomic;
- if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
- && ao->op_active && ao->op_notify && ao->op_notifier) {
+ if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+ ao->op_active && ao->op_notify && ao->op_notifier) {
notifier = ao->op_notifier;
rs = rm->m_rs;
debug_sock_hold(rds_rs_to_sk(rs));
* while we're messing with it. It does not prevent the
* message from being removed from the socket, though.
*/
+
spin_lock_irqsave(&rm->m_rs_lock, flags);
if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
goto unlock_and_drop;
struct rm_rdma_op *ro = &rm->rdma;
struct rds_notifier *notifier;
- if (ro->op_notify
- || (ro->op_recverr && status)) {
+ if (ro->op_notify || status) {
notifier = ro->op_notifier;
list_add_tail(&notifier->n_list,
&rs->rs_notify_queue);
if (!notifier->n_status)
notifier->n_status = status;
- rm->rdma.op_notifier = NULL;
- }
+ } else
+ kfree(rm->rdma.op_notifier);
+ rm->rdma.op_notifier = NULL;
} else if (rm->data.op_active && rm->data.op_notifier) {
struct rm_data_op *so = &rm->data;
struct rds_notifier *notifier;
- notifier = so->op_notifier;
- list_add_tail(&notifier->n_list,
+ if (so->op_notify || status) {
+ notifier = so->op_notifier;
+ list_add_tail(&notifier->n_list,
&rs->rs_notify_queue);
- if (!notifier->n_status)
- notifier->n_status = status;
+ if (!notifier->n_status)
+ notifier->n_status = status;
+ } else
+ kfree(rm->data.op_notifier);
rm->data.op_notifier = NULL;
}
* in after resetting the send state, flush it too.
*/
if (conn->c_last_failed_op) {
- if (rm->rdma.op_active && rm->rdma.op_notifier) {
- conn->c_last_failed_op =
- rm->rdma.op_notifier;
- rm->rdma.op_notifier->n_conn = conn;
+ if (rm->rdma.op_active) {
+ if (rm->rdma.op_notifier) {
+ conn->c_last_failed_op =
+ rm->rdma.op_notifier;
+ rm->rdma.op_notifier->n_conn = conn;
+ }
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
-
- if (rm->data.op_active && rm->data.op_notifier) {
- conn->c_last_failed_op =
- rm->data.op_notifier;
- rm->data.op_notifier->n_conn = conn;
+ if (rm->data.op_active && rm->data.op_async) {
+ if (rm->data.op_notifier) {
+ conn->c_last_failed_op =
+ rm->data.op_notifier;
+ rm->data.op_notifier->n_conn = conn;
+ }
set_bit(RDS_MSG_FLUSH, &rm->m_flags);
}
}
case RDS_CMSG_RDMA_DEST:
case RDS_CMSG_RDMA_MAP:
+ case RDS_CMSG_ASYNC_SEND:
cmsg_groups |= 2;
/* these are valid but do not add any size */
break;
size += sizeof(struct scatterlist);
break;
- case RDS_CMSG_ASYNC_SEND:
- cmsg_groups |= 4;
- break;
-
default:
return -EINVAL;
}
size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
- if (cmsg_groups == 3 || cmsg_groups > 4)
+ if (cmsg_groups == 3)
return -EINVAL;
return size;
{
struct rds_asend_args *args;
+ if (!rds_async_send_enabled)
+ return -EINVAL;
+
if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_asend_args)))
return -EINVAL;
if (!rm->data.op_notifier)
return -ENOMEM;
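+ /* Record whether the caller asked to be notified and mark the
+ * data op asynchronous; its completion status is reported through
+ * the socket's notification queue.
+ */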
+ rm->data.op_notify = !!(args->flags & RDS_SEND_NOTIFY_ME);
rm->data.op_notifier->n_user_token = args->user_token;
rm->data.op_notifier->n_status = RDS_RDMA_SEND_SUCCESS;
+ rm->data.op_async = 1;
return 0;
}
rs->rs_conn = conn;
}
+ /*
+ if (allocated_mr && conn->c_cleanup_stale_mrs) {
+ rds_rdma_cleanup_stale_mrs(rs, conn);
+ conn->c_cleanup_stale_mrs = 0;
+ }
+ */
+
/* Not accepting new sends until all the failed ops have been reaped */
if (conn->c_last_failed_op) {
ret = -EAGAIN;