From: Bang Nguyen
Date: Sat, 14 Apr 2012 00:16:31 +0000 (-0700)
Subject: RDS Async send support revised
X-Git-Tag: v4.1.12-92~319^2^2~2^2~24
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=3db96ade1fc84313a45221a82f45fc76285efed7;p=users%2Fjedix%2Flinux-maple.git

RDS Async send support revised

Signed-off-by: Bang Nguyen
---

diff --git a/include/linux/rds.h b/include/linux/rds.h
index c3322dd7733d..ebc35f1631ae 100644
--- a/include/linux/rds.h
+++ b/include/linux/rds.h
@@ -119,6 +119,7 @@ struct rds_info_counter {
 #define RDS_INFO_CONNECTION_FLAG_SENDING	0x01
 #define RDS_INFO_CONNECTION_FLAG_CONNECTING	0x02
 #define RDS_INFO_CONNECTION_FLAG_CONNECTED	0x04
+#define RDS_INFO_CONNECTION_FLAG_ERROR		0x08
 
 #define TRANSNAMSIZ	16
 
@@ -283,6 +284,7 @@ struct rds_reset {
 
 struct rds_asend_args {
 	u_int64_t	user_token;
+	u_int64_t	flags;
 };
 
 struct rds_rdma_send_notify {
@@ -306,5 +308,8 @@ struct rds_rdma_send_notify {
 #define RDS_RDMA_DONTWAIT	0x0010	/* Don't wait in SET_BARRIER */
 #define RDS_RDMA_NOTIFY_ME	0x0020	/* Notify when operation completes */
 #define RDS_RDMA_SILENT		0x0040	/* Do not interrupt remote */
+#define RDS_RDMA_REMOTE_COMPLETE 0x0080	/* Notify when data is available */
+#define RDS_SEND_NOTIFY_ME	0x0100	/* Notify when operation completes */
+
 #endif /* IB_RDS_H */
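[Note: for context, a hypothetical userspace sketch, not part of this patch, of how the uapi additions above are meant to be driven. An application tags a payload send with a user token and the new RDS_SEND_NOTIFY_ME flag by attaching a struct rds_asend_args control message of type RDS_CMSG_ASYNC_SEND; SOL_RDS and the AF_INET addressing are the usual RDS socket conventions, assumed to come from this tree's <linux/rds.h>.]

#include <linux/rds.h>	/* SOL_RDS, RDS_CMSG_ASYNC_SEND, rds_asend_args */
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

/* Send one RDS datagram and request an async completion
 * notification that carries 'token' back to the caller. */
static ssize_t rds_send_async(int fd, struct sockaddr_in *dest,
			      void *buf, size_t len, uint64_t token)
{
	struct rds_asend_args args = {
		.user_token	= token,
		.flags		= RDS_SEND_NOTIFY_ME,	/* new in this patch */
	};
	char ctl[CMSG_SPACE(sizeof(args))];
	struct iovec iov = { .iov_base = buf, .iov_len = len };
	struct msghdr msg = {
		.msg_name	= dest,
		.msg_namelen	= sizeof(*dest),
		.msg_iov	= &iov,
		.msg_iovlen	= 1,
		.msg_control	= ctl,
		.msg_controllen	= sizeof(ctl),
	};
	struct cmsghdr *cmsg;

	memset(ctl, 0, sizeof(ctl));
	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_RDS;
	cmsg->cmsg_type	 = RDS_CMSG_ASYNC_SEND;
	cmsg->cmsg_len	 = CMSG_LEN(sizeof(args));
	memcpy(CMSG_DATA(cmsg), &args, sizeof(args));

	return sendmsg(fd, &msg, 0);
}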
diff --git a/net/rds/connection.c b/net/rds/connection.c
index dc535e605ac8..ae30085b6246 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -525,6 +525,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
 	rds_conn_info_set(cinfo->flags,
 			  atomic_read(&conn->c_state) == RDS_CONN_UP,
 			  CONNECTED);
+	rds_conn_info_set(cinfo->flags, conn->c_last_failed_op != NULL,
+			  ERROR);
 
 	return 1;
 }
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index faee86e52e59..a1a568809106 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,18 +67,27 @@ static void rds_ib_send_complete(struct rds_message *rm,
 	complete(rm, notify_status);
 }
 
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+				   struct rm_rdma_op *op,
+				   int wc_status);
+
 static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
 				   struct rm_data_op *op,
 				   int wc_status)
 {
+	struct rds_message *rm;
+
+	rm = container_of(op, struct rds_message, data);
+
 	if (op->op_nents)
 		ib_dma_unmap_sg(ic->i_cm_id->device,
 				op->op_sg, op->op_nents,
 				DMA_TO_DEVICE);
 
-	if (op->op_notifier)
-		rds_ib_send_complete(container_of(op, struct rds_message, data),
-				     wc_status, rds_asend_complete);
+	if (rm->data.op_async)
+		rds_ib_send_complete(rm, wc_status, rds_asend_complete);
+	else if (rm->rdma.op_active && rm->rdma.op_remote_complete)
+		rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status);
 }
 
 static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
@@ -699,7 +708,8 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	if (scat == &rm->data.op_sg[rm->data.op_count]) {
 		prev->s_op = ic->i_data_op;
 		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-		if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED)) {
+		if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED) ||
+		    (rm->rdma.op_active && rm->rdma.op_remote_complete)) {
 			ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
 			prev->s_wr.send_flags |= IB_SEND_SIGNALED;
 			nr_sig++;
@@ -895,7 +905,8 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		send->s_queued = jiffies;
 		send->s_op = NULL;
 
-		nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+		if (!op->op_remote_complete)
+			nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
 		send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
 		send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -935,17 +946,51 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 			send = ic->i_sends;
 	}
 
+	if (i < work_alloc) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+		work_alloc = i;
+	}
+
 	/* give a reference to the last op */
 	if (scat == &op->op_sg[op->op_count]) {
+		if (op->op_write && op->op_silent && op->op_remote_complete) {
+			int rcomp_alloc, rcomp_pos;
+			struct rds_ib_send_work *rcomp;
+
+			rcomp_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1,
+							&rcomp_pos);
+			if (rcomp_alloc != 1) {
+				ib_dma_unmap_sg(ic->i_cm_id->device,
+						op->op_sg, op->op_nents,
+						op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+				op->op_mapped = 0;
+				rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+				rds_ib_stats_inc(s_ib_tx_ring_full);
+				ret = -ENOMEM;
+				goto out;
+			}
+			rcomp = &ic->i_sends[rcomp_pos];
+			rcomp->s_sge[0] = prev->s_sge[prev->s_wr.num_sge-1];
+			rcomp->s_sge[0].addr +=
+				(rcomp->s_sge[0].length - sizeof(u8));
+			rcomp->s_sge[0].length = sizeof(u8);
+
+			rcomp->s_wr.num_sge = 1;
+			rcomp->s_wr.opcode = IB_WR_RDMA_READ;
+			rcomp->s_wr.next = NULL;
+			rcomp->s_wr.wr.rdma.remote_addr =
+				remote_addr - sizeof(u8);
+			rcomp->s_wr.wr.rdma.rkey = op->op_rkey;
+			prev->s_wr.next = &rcomp->s_wr;
+			prev = rcomp;
+			rcomp->s_wr.send_flags = IB_SEND_SIGNALED;
+			nr_sig++;
+		}
+
 		prev->s_op = op;
 		rds_message_addref(container_of(op, struct rds_message, rdma));
 	}
 
-	if (i < work_alloc) {
-		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
-		work_alloc = i;
-	}
-
 	if (nr_sig)
 		atomic_add(nr_sig, &ic->i_signaled_sends);
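[Note on the op_remote_complete path above: a local completion for an RDMA WRITE only says the sender's HCA is done with the source buffers, not that the payload has landed in the peer's memory. For a silent WRITE with RDS_RDMA_REMOTE_COMPLETE set, the hunk therefore chains one extra signaled work request, "rcomp": a 1-byte RDMA READ of the last byte just written. Because an RDMA READ on the same QP is not executed until the preceding WRITE's data has been committed at the responder, the READ's completion is a usable proxy for "data is visible at the remote". Condensed and annotated, illustrative only, same names as the hunk:]

/* rcomp reads back exactly the final byte of the WRITE... */
rcomp->s_sge[0]        = prev->s_sge[prev->s_wr.num_sge - 1];
rcomp->s_sge[0].addr  += rcomp->s_sge[0].length - sizeof(u8);
rcomp->s_sge[0].length = sizeof(u8);

rcomp->s_wr.num_sge             = 1;
rcomp->s_wr.opcode              = IB_WR_RDMA_READ;
rcomp->s_wr.wr.rdma.remote_addr = remote_addr - sizeof(u8);
rcomp->s_wr.wr.rdma.rkey        = op->op_rkey;

/* ...chained behind the WRITEs as the only signaled WR, so its
 * completion implies the WRITE data reached the peer:
 *
 *   WRITE -> WRITE -> ... -> WRITE -> 1-byte READ (signaled)
 */
prev->s_wr.next        = &rcomp->s_wr;
rcomp->s_wr.send_flags = IB_SEND_SIGNALED;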
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index b81cf8576e1b..466fbeff69f7 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -433,9 +433,10 @@ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force)
 	/* If the MR was marked as invalidate, this will
 	 * trigger an async flush.
 	 */
-	if (zot_me)
+	if (zot_me) {
 		rds_destroy_mr(mr);
-	rds_mr_put(mr);
+		rds_mr_put(mr);
+	}
 }
 
 void rds_rdma_free_op(struct rm_rdma_op *ro)
@@ -558,6 +559,7 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 	op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
 	op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
 	op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
+	op->op_remote_complete = !!(args->flags & RDS_RDMA_REMOTE_COMPLETE);
 	op->op_active = 1;
 	op->op_recverr = rs->rs_recverr;
 	WARN_ON(!nr_pages);
@@ -691,9 +693,10 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 	spin_lock_irqsave(&rs->rs_rdma_lock, flags);
 	mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
-	if (!mr)
+	if (!mr) {
+		printk(KERN_ERR "rds_cmsg_rdma_dest: key %Lx\n", r_key);
 		err = -EINVAL;	/* invalid r_key */
-	else
+	} else
 		atomic_inc(&mr->r_refcount);
 	spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
@@ -713,11 +716,15 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 			  struct cmsghdr *cmsg)
 {
+	int ret;
 	if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_get_mr_args)) ||
 	    rm->m_rdma_cookie != 0)
 		return -EINVAL;
 
-	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr);
+	ret = __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr);
+	if (!ret)
+		rm->rdma.op_implicit_mr = 1;
+	return ret;
 }
 
 /*
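[Note: on the rdma.c side, the user-visible knob is the RDS_RDMA_REMOTE_COMPLETE bit now parsed in rds_cmsg_rdma_args() above. A hypothetical userspace sketch, not part of this patch, of filling the RDMA args for a silent WRITE with deferred notification; struct rds_rdma_args, struct rds_iovec, rds_rdma_cookie_t and RDS_RDMA_READWRITE are taken from the pre-existing RDS uapi header, not from this diff:]

/* Request a silent RDMA WRITE whose NOTIFY_ME notification is held
 * back until the data is known to be placed at the peer. */
static void fill_remote_complete_args(struct rds_rdma_args *args,
				      rds_rdma_cookie_t cookie,
				      struct rds_iovec *local_vec,
				      uint64_t nr_local,
				      uint64_t token)
{
	memset(args, 0, sizeof(*args));
	args->cookie	     = cookie;	/* peer MR, from RDS_CMSG_RDMA_MAP */
	args->local_vec_addr = (uint64_t)(unsigned long)local_vec;
	args->nr_local	     = nr_local;
	args->user_token     = token;
	args->flags	     = RDS_RDMA_READWRITE |	 /* WRITE to the peer */
			       RDS_RDMA_SILENT |	 /* no remote interrupt */
			       RDS_RDMA_NOTIFY_ME |
			       RDS_RDMA_REMOTE_COMPLETE; /* new in this patch */
	/* args is then attached as an RDS_CMSG_RDMA_ARGS cmsg on sendmsg();
	 * args->remote_vec would be filled in the same way as before. */
}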
diff --git a/net/rds/rds.h b/net/rds/rds.h
index e8b270d0b544..9ded5fde2335 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -357,16 +357,19 @@ struct rds_message {
 			unsigned int		op_mapped:1;
 			unsigned int		op_silent:1;
 			unsigned int		op_active:1;
+			unsigned int		op_implicit_mr:1;
+			unsigned int		op_remote_complete:1;
 			unsigned int		op_bytes;
 			unsigned int		op_nents;
 			unsigned int		op_count;
 			struct scatterlist	*op_sg;
 			struct rds_notifier	*op_notifier;
-
 			struct rds_mr		*op_rdma_mr;
 		} rdma;
 		struct rm_data_op {
 			unsigned int		op_active:1;
+			unsigned int		op_notify:1;
+			unsigned int		op_async:1;
 			struct rds_notifier	*op_notifier;
 			unsigned int		op_nents;
 			unsigned int		op_count;
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e18cbfd5d5..6245b17bc1d1 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -51,6 +51,10 @@ static int send_batch_count = 64;
 module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
+static unsigned int rds_async_send_enabled = 0;
+module_param(rds_async_send_enabled, int, 0444);
+MODULE_PARM_DESC(rds_async_send_enabled, "Set to enable Async Send");
+
 /*
  * Reset the send state. Callers must ensure that this doesn't race with
  * rds_send_xmit().
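[Note: the whole feature is gated. rds_async_send_enabled defaults to 0 and the parameter is read-only at runtime (perms 0444), so it has to be set at module load time, e.g. "modprobe rds rds_async_send_enabled=1". Since rds_cmsg_asend() further down rejects the cmsg with -EINVAL when the gate is off, a caller that wants to run on both configurations could probe and fall back. A hypothetical sketch, reusing the rds_send_async() helper from the earlier note; the EINVAL check is deliberately coarse, since it also fires for genuinely malformed cmsgs:]

#include <errno.h>

/* Try the async-send cmsg first; degrade to a plain synchronous
 * RDS send if the kernel has the feature gated off. */
static ssize_t rds_send_maybe_async(int fd, struct sockaddr_in *dest,
				    void *buf, size_t len, uint64_t token)
{
	ssize_t ret = rds_send_async(fd, dest, buf, len, token);

	if (ret < 0 && errno == EINVAL)		/* async send gated off */
		ret = sendto(fd, buf, len, 0,
			     (struct sockaddr *)dest, sizeof(*dest));
	return ret;
}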
@@ -91,8 +95,8 @@ void rds_send_reset(struct rds_connection *conn)
 		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
 
 		/* check for failed op */
-		if (!failed_op && (rm->rdma.op_active ||
-		   (rm->data.op_active && rm->data.op_notifier)))
+		if (rds_async_send_enabled && (rm->rdma.op_active ||
+		   (rm->data.op_active && rm->data.op_async)))
 			failed_op = 1;
 	}
 	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
@@ -101,22 +105,24 @@ void rds_send_reset(struct rds_connection *conn)
 	if (failed_op) {
 		list_for_each_entry_safe(rm, tmp, &conn->c_send_queue,
 					 m_conn_item) {
-			if (rm->rdma.op_active && rm->rdma.op_notifier) {
-				conn->c_last_failed_op =
-					rm->rdma.op_notifier;
-				rm->rdma.op_notifier->n_conn = conn;
+			if (rm->rdma.op_active) {
+				if (rm->rdma.op_notifier) {
+					conn->c_last_failed_op =
+						rm->rdma.op_notifier;
+					rm->rdma.op_notifier->n_conn = conn;
+				}
 				set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 			}
-
-			if (rm->data.op_active && rm->data.op_notifier) {
-				conn->c_last_failed_op =
-					rm->data.op_notifier;
-				rm->data.op_notifier->n_conn = conn;
+			if (rm->data.op_active && rm->data.op_async) {
+				if (rm->data.op_notifier) {
+					conn->c_last_failed_op =
+						rm->data.op_notifier;
+					rm->data.op_notifier->n_conn = conn;
+				}
 				set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 			}
 		}
 	}
-
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
@@ -280,7 +286,17 @@ restart:
 		if (!rm)
 			break;
 
-		if (test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
+		/* Unfortunately, the way Infiniband deals with
+		 * RDMA to a bad MR key is by moving the entire
+		 * queue pair to error state. We could possibly
+		 * recover from that, but right now we drop the
+		 * connection. Therefore, we never retransmit messages
+		 * with RDMA ops.
+		 */
+
+		if (rm->rdma.op_active
+		    && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)
+		    || test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
 			spin_lock_irqsave(&conn->c_lock, flags);
 			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 				list_move_tail(&rm->m_conn_item, &to_be_dropped);
@@ -421,8 +437,11 @@ over_batch:
 	/* Nuke any messages we decided not to retransmit.
 	 */
 	if (!list_empty(&to_be_dropped)) {
 		/* irqs on here, so we can put(), unlike above */
-		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
+		list_for_each_entry(rm, &to_be_dropped, m_conn_item) {
+			if (rds_async_send_enabled && rm->rdma.op_implicit_mr)
+				rds_rdma_unuse(rm->m_rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
 			rds_message_put(rm);
+		}
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_SEND_DROPPED);
 	}
@@ -488,8 +507,8 @@ void rds_asend_complete(struct rds_message *rm, int status)
 	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
 	so = &rm->data;
-	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
-	    && so->op_active && so->op_notifier) {
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+	    so->op_active && so->op_notifier && so->op_notify) {
 		notifier = so->op_notifier;
 		rs = rm->m_rs;
 		debug_sock_hold(rds_rs_to_sk(rs));
@@ -527,18 +546,20 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
 	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
 	ro = &rm->rdma;
-	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
-	    && ro->op_active && ro->op_notify && ro->op_notifier) {
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+	    ro->op_active && ro->op_notifier && ro->op_notify) {
 		notifier = ro->op_notifier;
 		rs = rm->m_rs;
 		debug_sock_hold(rds_rs_to_sk(rs));
 
 		notifier->n_status = status;
-		spin_lock(&rs->rs_lock);
-		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-		spin_unlock(&rs->rs_lock);
-		ro->op_notifier = NULL;
+		if (!ro->op_remote_complete) {
+			spin_lock(&rs->rs_lock);
+			list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+			spin_unlock(&rs->rs_lock);
+			ro->op_notifier = NULL;
+		}
 	}
 
 	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -563,8 +584,8 @@ void rds_atomic_send_complete(struct rds_message *rm, int status)
 	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
 	ao = &rm->atomic;
-	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
-	    && ao->op_active && ao->op_notify && ao->op_notifier) {
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+	    ao->op_active && ao->op_notify && ao->op_notifier) {
 		notifier = ao->op_notifier;
 		rs = rm->m_rs;
 		debug_sock_hold(rds_rs_to_sk(rs));
@@ -689,6 +710,7 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
 		 * while we're messing with it. It does not prevent the
 		 * message from being removed from the socket, though.
 		 */
+
 		spin_lock_irqsave(&rm->m_rs_lock, flags);
 		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
 			goto unlock_and_drop;
@@ -711,24 +733,27 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
 			struct rm_rdma_op *ro = &rm->rdma;
 			struct rds_notifier *notifier;
 
-			if (ro->op_notify
-			    || (ro->op_recverr && status)) {
+			if (ro->op_notify || status) {
 				notifier = ro->op_notifier;
 				list_add_tail(&notifier->n_list,
 					      &rs->rs_notify_queue);
 				if (!notifier->n_status)
 					notifier->n_status = status;
-				rm->rdma.op_notifier = NULL;
-			}
+			} else
+				kfree(rm->rdma.op_notifier);
+			rm->rdma.op_notifier = NULL;
 		} else if (rm->data.op_active && rm->data.op_notifier) {
 			struct rm_data_op *so = &rm->data;
 			struct rds_notifier *notifier;
 
-			notifier = so->op_notifier;
-			list_add_tail(&notifier->n_list,
-				      &rs->rs_notify_queue);
-			if (!notifier->n_status)
-				notifier->n_status = status;
+			if (so->op_notify || status) {
+				notifier = so->op_notifier;
+				list_add_tail(&notifier->n_list,
+					      &rs->rs_notify_queue);
+				if (!notifier->n_status)
+					notifier->n_status = status;
+			} else
+				kfree(rm->data.op_notifier);
 			rm->data.op_notifier = NULL;
 		}
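[Note: once a notifier has been moved onto rs_notify_queue above, it is reported to the application on a later recvmsg() as a control message carrying the uapi struct rds_rdma_send_notify (a user token plus a status such as RDS_RDMA_SEND_SUCCESS or RDS_RDMA_SEND_DROPPED). The cmsg type constant for that delivery path is defined elsewhere in this tree and does not appear in this diff, so the sketch below uses a made-up placeholder, RDS_CMSG_RDMA_SEND_STATUS; the struct field names are likewise assumed.]

/* Hypothetical reap loop: drain pending send-status notifications.
 * RDS_CMSG_RDMA_SEND_STATUS is a placeholder name, see the note
 * above -- the real constant lives in this tree's <linux/rds.h>. */
static void rds_reap_notifications(int fd)
{
	char ctl[1024], data[128];
	struct iovec iov = { .iov_base = data, .iov_len = sizeof(data) };
	struct msghdr msg = {
		.msg_iov = &iov, .msg_iovlen = 1,
		.msg_control = ctl, .msg_controllen = sizeof(ctl),
	};
	struct cmsghdr *cmsg;

	if (recvmsg(fd, &msg, MSG_DONTWAIT) < 0)
		return;

	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg;
	     cmsg = CMSG_NXTHDR(&msg, cmsg)) {
		struct rds_rdma_send_notify note;

		if (cmsg->cmsg_level != SOL_RDS ||
		    cmsg->cmsg_type != RDS_CMSG_RDMA_SEND_STATUS)
			continue;
		memcpy(&note, CMSG_DATA(cmsg), sizeof(note));
		/* note.user_token identifies the send; note.status is
		 * RDS_RDMA_SEND_SUCCESS or an error such as
		 * RDS_RDMA_SEND_DROPPED. */
	}
}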
@@ -937,17 +962,20 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
 	 * in after resetting the send state, flush it too.
 	 */
 	if (conn->c_last_failed_op) {
-		if (rm->rdma.op_active && rm->rdma.op_notifier) {
-			conn->c_last_failed_op =
-				rm->rdma.op_notifier;
-			rm->rdma.op_notifier->n_conn = conn;
+		if (rm->rdma.op_active) {
+			if (rm->rdma.op_notifier) {
+				conn->c_last_failed_op =
+					rm->rdma.op_notifier;
+				rm->rdma.op_notifier->n_conn = conn;
+			}
 			set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 		}
-
-		if (rm->data.op_active && rm->data.op_notifier) {
-			conn->c_last_failed_op =
-				rm->data.op_notifier;
-			rm->data.op_notifier->n_conn = conn;
+		if (rm->data.op_active && rm->data.op_async) {
+			if (rm->data.op_notifier) {
+				conn->c_last_failed_op =
+					rm->data.op_notifier;
+				rm->data.op_notifier->n_conn = conn;
+			}
 			set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 		}
 	}
@@ -996,6 +1024,7 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
 
 		case RDS_CMSG_RDMA_DEST:
 		case RDS_CMSG_RDMA_MAP:
+		case RDS_CMSG_ASYNC_SEND:
 			cmsg_groups |= 2;
 			/* these are valid but do not add any size */
 			break;
@@ -1006,10 +1035,6 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
 			size += sizeof(struct scatterlist);
 			break;
 
-		case RDS_CMSG_ASYNC_SEND:
-			cmsg_groups |= 4;
-			break;
-
 		default:
 			return -EINVAL;
 		}
@@ -1019,7 +1044,7 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
 	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
 
 	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
-	if (cmsg_groups == 3 || cmsg_groups > 4)
+	if (cmsg_groups == 3)
 		return -EINVAL;
 
 	return size;
@@ -1030,6 +1055,9 @@ static int rds_cmsg_asend(struct rds_sock *rs, struct rds_message *rm,
 {
 	struct rds_asend_args *args;
 
+	if (!rds_async_send_enabled)
+		return -EINVAL;
+
 	if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_asend_args)))
 		return -EINVAL;
 
@@ -1038,8 +1066,10 @@ static int rds_cmsg_asend(struct rds_sock *rs, struct rds_message *rm,
 	if (!rm->data.op_notifier)
 		return -ENOMEM;
 
+	rm->data.op_notify = !!(args->flags & RDS_SEND_NOTIFY_ME);
 	rm->data.op_notifier->n_user_token = args->user_token;
 	rm->data.op_notifier->n_status = RDS_RDMA_SEND_SUCCESS;
+	rm->data.op_async = 1;
 
 	return 0;
 }
@@ -1182,6 +1212,13 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		rs->rs_conn = conn;
 	}
 
+	/*
+	if (allocated_mr && conn->c_cleanup_stale_mrs) {
+		rds_rdma_cleanup_stale_mrs(rs, conn);
+		conn->c_cleanup_stale_mrs = 0;
+	}
+	*/
+
 	/* Not accepting new sends until all the failed ops have been reaped */
 	if (conn->c_last_failed_op) {
 		ret = -EAGAIN;
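[Note: the last hunk above makes rds_sendmsg() refuse new work with -EAGAIN until every failed op on the connection has been reaped, so senders must be prepared to back off, drain their notifications, and retry. A hypothetical caller-side loop, reusing the helpers sketched in the earlier notes:]

/* Keep retrying while the connection still holds a failed op
 * (conn->c_last_failed_op in the kernel): reap, then resend. */
static ssize_t rds_send_with_retry(int fd, struct sockaddr_in *dest,
				   void *buf, size_t len, uint64_t token)
{
	for (;;) {
		ssize_t ret = rds_send_async(fd, dest, buf, len, token);

		if (ret >= 0 || errno != EAGAIN)
			return ret;
		/* failed ops not yet reaped: drain notifications, retry */
		rds_reap_notifications(fd);
	}
}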