RDS Async send support revised
author	Bang Nguyen <bang.nguyen@oracle.com>
Sat, 14 Apr 2012 00:16:31 +0000 (17:16 -0700)
committer	Mukesh Kacker <mukesh.kacker@oracle.com>
Tue, 7 Jul 2015 23:41:35 +0000 (16:41 -0700)
Signed-off-by: Bang Nguyen <bang.nguyen@oracle.com>
include/linux/rds.h
net/rds/connection.c
net/rds/ib_send.c
net/rds/rdma.c
net/rds/rds.h
net/rds/send.c

index c3322dd7733dee3054064fb5cd5936bbdbff841b..ebc35f1631ae549f730b6fb742b533fe68e4f1a5 100644 (file)
@@ -119,6 +119,7 @@ struct rds_info_counter {
 #define RDS_INFO_CONNECTION_FLAG_SENDING       0x01
 #define RDS_INFO_CONNECTION_FLAG_CONNECTING    0x02
 #define RDS_INFO_CONNECTION_FLAG_CONNECTED     0x04
+#define RDS_INFO_CONNECTION_FLAG_ERROR          0x08
 
 #define TRANSNAMSIZ    16
 
@@ -283,6 +284,7 @@ struct rds_reset {
 
 struct rds_asend_args {
        u_int64_t       user_token;
+       u_int64_t       flags;
 };
 
 struct rds_rdma_send_notify {
@@ -306,5 +308,8 @@ struct rds_rdma_send_notify {
 #define RDS_RDMA_DONTWAIT      0x0010  /* Don't wait in SET_BARRIER */
 #define RDS_RDMA_NOTIFY_ME     0x0020  /* Notify when operation completes */
 #define RDS_RDMA_SILENT                0x0040  /* Do not interrupt remote */
+#define RDS_RDMA_REMOTE_COMPLETE 0x0080 /* Notify when data is available */
+#define RDS_SEND_NOTIFY_ME      0x0100  /* Notify when operation completes */
+
 
 #endif /* IB_RDS_H */
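
The uapi additions above define the userspace side of async send: the application attaches an RDS_CMSG_ASYNC_SEND control message carrying a struct rds_asend_args (a caller-chosen user_token plus the new RDS_SEND_NOTIFY_ME flag), and may set RDS_RDMA_REMOTE_COMPLETE in rds_rdma_args.flags to be notified only once the RDMA data is available at the remote side. A minimal sketch of the send side follows; it is illustrative only and not part of the patch, and it assumes the patched header exports SOL_RDS, RDS_CMSG_ASYNC_SEND and struct rds_asend_args, and that the rds module was loaded with rds_async_send_enabled=1 (otherwise rds_cmsg_asend() below rejects the cmsg with -EINVAL).

/*
 * Illustrative sketch only (not part of the patch): request an async send
 * completion notification via the new control message.  Assumes the patched
 * header provides SOL_RDS, RDS_CMSG_ASYNC_SEND, RDS_SEND_NOTIFY_ME and
 * struct rds_asend_args.
 */
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <linux/rds.h>

static ssize_t rds_send_async(int fd, const struct sockaddr_in *dst,
			      const void *buf, size_t len, uint64_t token)
{
	struct rds_asend_args args = {
		.user_token = token,
		.flags      = RDS_SEND_NOTIFY_ME,	/* queue a notifier on completion */
	};
	char cbuf[CMSG_SPACE(sizeof(args))];
	struct iovec iov = { .iov_base = (void *)buf, .iov_len = len };
	struct msghdr msg = {
		.msg_name       = (void *)dst,
		.msg_namelen    = sizeof(*dst),
		.msg_iov        = &iov,
		.msg_iovlen     = 1,
		.msg_control    = cbuf,
		.msg_controllen = sizeof(cbuf),
	};
	struct cmsghdr *cmsg;

	memset(cbuf, 0, sizeof(cbuf));
	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_RDS;
	cmsg->cmsg_type  = RDS_CMSG_ASYNC_SEND;
	cmsg->cmsg_len   = CMSG_LEN(sizeof(args));
	memcpy(CMSG_DATA(cmsg), &args, sizeof(args));

	return sendmsg(fd, &msg, 0);
}

Per the rds_rm_size() hunk below, RDS_CMSG_ASYNC_SEND now shares a cmsg group with RDS_CMSG_RDMA_DEST/RDS_CMSG_RDMA_MAP, so it still cannot be combined with RDS_CMSG_RDMA_ARGS in a single sendmsg(); the completion notifier is reaped later with recvmsg() (see the sketch at the end of this page).
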
index dc535e605ac85ab22147402f7f1c20faebc35288..ae30085b6246fef5b5e30d09c8b551bcd8027c20 100644 (file)
@@ -525,6 +525,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
        rds_conn_info_set(cinfo->flags,
                          atomic_read(&conn->c_state) == RDS_CONN_UP,
                          CONNECTED);
+       rds_conn_info_set(cinfo->flags, conn->c_last_failed_op != NULL,
+                         ERROR);
        return 1;
 }
 
index faee86e52e59831ae63a36d196ca844cf32026af..a1a5688091066bafc6a42e6a0eae238f7f27db81 100644 (file)
@@ -67,18 +67,27 @@ static void rds_ib_send_complete(struct rds_message *rm,
        complete(rm, notify_status);
 }
 
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+               struct rm_rdma_op *op,
+               int wc_status);
+
 static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
                                   struct rm_data_op *op,
                                   int wc_status)
 {
+       struct rds_message *rm;
+
+       rm = container_of(op, struct rds_message, data);
+
        if (op->op_nents)
                ib_dma_unmap_sg(ic->i_cm_id->device,
                                op->op_sg, op->op_nents,
                                DMA_TO_DEVICE);
 
-       if (op->op_notifier)
-               rds_ib_send_complete(container_of(op, struct rds_message, data),
-                       wc_status, rds_asend_complete);
+       if (rm->data.op_async)
+               rds_ib_send_complete(rm, wc_status, rds_asend_complete);
+       else if (rm->rdma.op_active && rm->rdma.op_remote_complete)
+               rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status);
 }
 
 static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
@@ -699,7 +708,8 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        if (scat == &rm->data.op_sg[rm->data.op_count]) {
                prev->s_op = ic->i_data_op;
                prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-               if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED)) {
+               if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED) ||
+                    (rm->rdma.op_active && rm->rdma.op_remote_complete)) {
                        ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
                        prev->s_wr.send_flags |= IB_SEND_SIGNALED;
                        nr_sig++;
@@ -895,7 +905,8 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                send->s_queued = jiffies;
                send->s_op = NULL;
 
-               nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+               if (!op->op_remote_complete)
+                       nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -935,17 +946,51 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                        send = ic->i_sends;
        }
 
+       if (i < work_alloc) {
+               rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
+               work_alloc = i;
+       }
+
        /* give a reference to the last op */
        if (scat == &op->op_sg[op->op_count]) {
+               if (op->op_write && op->op_silent && op->op_remote_complete) {
+                       int rcomp_alloc, rcomp_pos;
+                       struct rds_ib_send_work *rcomp;
+
+                       rcomp_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1,
+                                       &rcomp_pos);
+                       if (rcomp_alloc != 1) {
+                               ib_dma_unmap_sg(ic->i_cm_id->device,
+                                               op->op_sg, op->op_nents,
+                                               op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+                               op->op_mapped = 0;
+                               rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+                               rds_ib_stats_inc(s_ib_tx_ring_full);
+                               ret = -ENOMEM;
+                               goto out;
+                       }
+                       rcomp = &ic->i_sends[rcomp_pos];
+                       rcomp->s_sge[0] = prev->s_sge[prev->s_wr.num_sge-1];
+                       rcomp->s_sge[0].addr +=
+                               (rcomp->s_sge[0].length - sizeof(u8));
+                       rcomp->s_sge[0].length = sizeof(u8);
+
+                       rcomp->s_wr.num_sge = 1;
+                       rcomp->s_wr.opcode = IB_WR_RDMA_READ;
+                       rcomp->s_wr.next = NULL;
+                       rcomp->s_wr.wr.rdma.remote_addr =
+                               remote_addr - sizeof(u8);
+                       rcomp->s_wr.wr.rdma.rkey = op->op_rkey;
+                       prev->s_wr.next = &rcomp->s_wr;
+                       prev = rcomp;
+                       rcomp->s_wr.send_flags = IB_SEND_SIGNALED;
+                       nr_sig++;
+               }
+
                prev->s_op = op;
                rds_message_addref(container_of(op, struct rds_message, rdma));
        }
 
-       if (i < work_alloc) {
-               rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
-               work_alloc = i;
-       }
-
        if (nr_sig)
                atomic_add(nr_sig, &ic->i_signaled_sends);
 
index b81cf8576e1b93f1879937a238580a311d03929d..466fbeff69f77de7ffe87d7a2d48d4bd69daff42 100644 (file)
@@ -433,9 +433,10 @@ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force)
 
        /* If the MR was marked as invalidate, this will
         * trigger an async flush. */
-       if (zot_me)
+       if (zot_me) {
                rds_destroy_mr(mr);
-       rds_mr_put(mr);
+               rds_mr_put(mr);
+       }
 }
 
 void rds_rdma_free_op(struct rm_rdma_op *ro)
@@ -558,6 +559,7 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
        op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
        op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
        op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
+       op->op_remote_complete = !!(args->flags & RDS_RDMA_REMOTE_COMPLETE);
        op->op_active = 1;
        op->op_recverr = rs->rs_recverr;
        WARN_ON(!nr_pages);
@@ -691,9 +693,10 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 
        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
-       if (!mr)
+       if (!mr) {
+               printk(KERN_ERR "rds_cmsg_rdma_dest: key %Lx\n", r_key);
                err = -EINVAL;  /* invalid r_key */
-       else
+       } else
                atomic_inc(&mr->r_refcount);
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
 
@@ -713,11 +716,15 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
                          struct cmsghdr *cmsg)
 {
+       int ret;
        if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_get_mr_args))
         || rm->m_rdma_cookie != 0)
                return -EINVAL;
 
-       return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr);
+       ret = __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr);
+       if (!ret)
+               rm->rdma.op_implicit_mr = 1;
+       return ret;
 }
 
 /*
index e8b270d0b544986de0d76fc4455a0416fce71fc1..9ded5fde23357f1b905ea2e68656f4724f9a7e11 100644 (file)
@@ -357,16 +357,19 @@ struct rds_message {
                        unsigned int            op_mapped:1;
                        unsigned int            op_silent:1;
                        unsigned int            op_active:1;
+                       unsigned int            op_implicit_mr:1;
+                       unsigned int            op_remote_complete:1;
                        unsigned int            op_bytes;
                        unsigned int            op_nents;
                        unsigned int            op_count;
                        struct scatterlist      *op_sg;
                        struct rds_notifier     *op_notifier;
-
                        struct rds_mr           *op_rdma_mr;
                } rdma;
                struct rm_data_op {
                        unsigned int            op_active:1;
+                       unsigned int            op_notify:1;
+                       unsigned int            op_async:1;
                        struct rds_notifier     *op_notifier;
                        unsigned int            op_nents;
                        unsigned int            op_count;
index b9e18cbfd5d5b532f22a9acb0513d16568659c8e..6245b17bc1d1669fffd9c8bb0179d5349081da99 100644 (file)
@@ -51,6 +51,10 @@ static int send_batch_count = 64;
 module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
+static unsigned int rds_async_send_enabled = 0;
+module_param(rds_async_send_enabled, int, 0444);
+MODULE_PARM_DESC(rds_async_send_enabled, "Set to enable Async Send");
+
 /*
  * Reset the send state.  Callers must ensure that this doesn't race with
  * rds_send_xmit().
@@ -91,8 +95,8 @@ void rds_send_reset(struct rds_connection *conn)
                set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
 
                /* check for failed op */
-               if (!failed_op && (rm->rdma.op_active ||
-                       (rm->data.op_active && rm->data.op_notifier)))
+               if (rds_async_send_enabled && (rm->rdma.op_active ||
+                       (rm->data.op_active && rm->data.op_async)))
                                failed_op = 1;
        }
        list_splice_init(&conn->c_retrans, &conn->c_send_queue);
@@ -101,22 +105,24 @@ void rds_send_reset(struct rds_connection *conn)
        if (failed_op) {
                list_for_each_entry_safe(rm, tmp, &conn->c_send_queue,
                                m_conn_item) {
-                       if (rm->rdma.op_active && rm->rdma.op_notifier) {
-                               conn->c_last_failed_op =
-                                       rm->rdma.op_notifier;
-                               rm->rdma.op_notifier->n_conn = conn;
+                       if (rm->rdma.op_active) {
+                               if (rm->rdma.op_notifier) {
+                                       conn->c_last_failed_op =
+                                               rm->rdma.op_notifier;
+                                       rm->rdma.op_notifier->n_conn = conn;
+                               }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
-
-                       if (rm->data.op_active && rm->data.op_notifier) {
-                               conn->c_last_failed_op =
-                                       rm->data.op_notifier;
-                               rm->data.op_notifier->n_conn = conn;
+                       if (rm->data.op_active && rm->data.op_async) {
+                               if (rm->data.op_notifier) {
+                                       conn->c_last_failed_op =
+                                               rm->data.op_notifier;
+                                       rm->data.op_notifier->n_conn = conn;
+                               }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
                }
        }
-
        spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
@@ -280,7 +286,17 @@ restart:
                        if (!rm)
                                break;
 
-                       if (test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
+                       /* Unfortunately, the way Infiniband deals with
+                        * RDMA to a bad MR key is by moving the entire
+                        * queue pair to error state. We could possibly
+                        * recover from that, but right now we drop the
+                        * connection. Therefore, we never retransmit messages
+                        * with RDMA ops.
+                        */
+
+                       if (rm->rdma.op_active
+                        && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)
+                        || test_bit(RDS_MSG_FLUSH, &rm->m_flags)) {
                                spin_lock_irqsave(&conn->c_lock, flags);
                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
                                        list_move_tail(&rm->m_conn_item, &to_be_dropped);
@@ -421,8 +437,11 @@ over_batch:
        /* Nuke any messages we decided not to retransmit. */
        if (!list_empty(&to_be_dropped)) {
                /* irqs on here, so we can put(), unlike above */
-               list_for_each_entry(rm, &to_be_dropped, m_conn_item)
+               list_for_each_entry(rm, &to_be_dropped, m_conn_item) {
+                       if (rds_async_send_enabled && rm->rdma.op_implicit_mr)
+                               rds_rdma_unuse(rm->m_rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
                        rds_message_put(rm);
+               }
                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_SEND_DROPPED);
        }
 
@@ -488,8 +507,8 @@ void rds_asend_complete(struct rds_message *rm, int status)
        spin_lock_irqsave(&rm->m_rs_lock, flags);
 
        so = &rm->data;
-       if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
-               && so->op_active && so->op_notifier) {
+       if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+               so->op_active && so->op_notifier && so->op_notify) {
                notifier = so->op_notifier;
                rs = rm->m_rs;
                debug_sock_hold(rds_rs_to_sk(rs));
@@ -527,18 +546,20 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
        spin_lock_irqsave(&rm->m_rs_lock, flags);
 
        ro = &rm->rdma;
-       if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
-        && ro->op_active && ro->op_notify && ro->op_notifier) {
+       if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+          ro->op_active && ro->op_notifier && ro->op_notify) {
                notifier = ro->op_notifier;
                rs = rm->m_rs;
                debug_sock_hold(rds_rs_to_sk(rs));
 
                notifier->n_status = status;
-               spin_lock(&rs->rs_lock);
-               list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
-               spin_unlock(&rs->rs_lock);
 
-               ro->op_notifier = NULL;
+               if (!ro->op_remote_complete) {
+                       spin_lock(&rs->rs_lock);
+                       list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+                       spin_unlock(&rs->rs_lock);
+                       ro->op_notifier = NULL;
+               }
        }
 
        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -563,8 +584,8 @@ void rds_atomic_send_complete(struct rds_message *rm, int status)
        spin_lock_irqsave(&rm->m_rs_lock, flags);
 
        ao = &rm->atomic;
-       if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
-           && ao->op_active && ao->op_notify && ao->op_notifier) {
+       if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+           ao->op_active && ao->op_notify && ao->op_notifier) {
                notifier = ao->op_notifier;
                rs = rm->m_rs;
                debug_sock_hold(rds_rs_to_sk(rs));
@@ -689,6 +710,7 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
                 * while we're messing with it. It does not prevent the
                 * message from being removed from the socket, though.
                 */
+
                spin_lock_irqsave(&rm->m_rs_lock, flags);
                if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
                        goto unlock_and_drop;
@@ -711,24 +733,27 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
                                struct rm_rdma_op *ro = &rm->rdma;
                                struct rds_notifier *notifier;
 
-                               if (ro->op_notify
-                                       || (ro->op_recverr && status)) {
+                               if (ro->op_notify || status) {
                                        notifier = ro->op_notifier;
                                        list_add_tail(&notifier->n_list,
                                                        &rs->rs_notify_queue);
                                        if (!notifier->n_status)
                                                notifier->n_status = status;
-                                       rm->rdma.op_notifier = NULL;
-                               }
+                               } else
+                                       kfree(rm->rdma.op_notifier);
+                               rm->rdma.op_notifier = NULL;
                        } else if (rm->data.op_active && rm->data.op_notifier) {
                                struct rm_data_op *so = &rm->data;
                                struct rds_notifier *notifier;
 
-                               notifier = so->op_notifier;
-                               list_add_tail(&notifier->n_list,
+                               if (so->op_notify || status) {
+                                       notifier = so->op_notifier;
+                                       list_add_tail(&notifier->n_list,
                                                &rs->rs_notify_queue);
-                               if (!notifier->n_status)
-                                       notifier->n_status = status;
+                                       if (!notifier->n_status)
+                                               notifier->n_status = status;
+                               } else
+                                       kfree(rm->data.op_notifier);
                                rm->data.op_notifier = NULL;
                        }
 
@@ -937,17 +962,20 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
                 * in after resetting the send state, flush it too.
                 */
                if (conn->c_last_failed_op) {
-                       if (rm->rdma.op_active && rm->rdma.op_notifier) {
-                               conn->c_last_failed_op =
-                                       rm->rdma.op_notifier;
-                               rm->rdma.op_notifier->n_conn = conn;
+                       if (rm->rdma.op_active) {
+                               if (rm->rdma.op_notifier) {
+                                       conn->c_last_failed_op =
+                                               rm->rdma.op_notifier;
+                                       rm->rdma.op_notifier->n_conn = conn;
+                               }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
-
-                       if (rm->data.op_active && rm->data.op_notifier) {
-                               conn->c_last_failed_op =
-                                       rm->data.op_notifier;
-                               rm->data.op_notifier->n_conn = conn;
+                       if (rm->data.op_active && rm->data.op_async) {
+                               if (rm->data.op_notifier) {
+                                       conn->c_last_failed_op =
+                                               rm->data.op_notifier;
+                                       rm->data.op_notifier->n_conn = conn;
+                               }
                                set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                        }
                }
@@ -996,6 +1024,7 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
 
                case RDS_CMSG_RDMA_DEST:
                case RDS_CMSG_RDMA_MAP:
+               case RDS_CMSG_ASYNC_SEND:
                        cmsg_groups |= 2;
                        /* these are valid but do no add any size */
                        break;
@@ -1006,10 +1035,6 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
                        size += sizeof(struct scatterlist);
                        break;
 
-               case RDS_CMSG_ASYNC_SEND:
-                       cmsg_groups |= 4;
-                       break;
-
                default:
                        return -EINVAL;
                }
@@ -1019,7 +1044,7 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
        size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
 
        /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
-       if (cmsg_groups == 3 || cmsg_groups > 4)
+       if (cmsg_groups == 3)
                return -EINVAL;
 
        return size;
@@ -1030,6 +1055,9 @@ static int rds_cmsg_asend(struct rds_sock *rs, struct rds_message *rm,
 {
        struct rds_asend_args *args;
 
+       if (!rds_async_send_enabled)
+               return -EINVAL;
+
        if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_asend_args)))
                return -EINVAL;
 
@@ -1038,8 +1066,10 @@ static int rds_cmsg_asend(struct rds_sock *rs, struct rds_message *rm,
        if (!rm->data.op_notifier)
                return -ENOMEM;
 
+       rm->data.op_notify = !!(args->flags & RDS_SEND_NOTIFY_ME);
        rm->data.op_notifier->n_user_token = args->user_token;
        rm->data.op_notifier->n_status = RDS_RDMA_SEND_SUCCESS;
+       rm->data.op_async = 1;
 
        return 0;
 }
@@ -1182,6 +1212,13 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                rs->rs_conn = conn;
        }
 
+       /*
+       if (allocated_mr && conn->c_cleanup_stale_mrs) {
+               rds_rdma_cleanup_stale_mrs(rs, conn);
+               conn->c_cleanup_stale_mrs = 0;
+       }
+       */
+
        /* Not accepting new sends until all the failed ops have been reaped */
        if (conn->c_last_failed_op) {
                ret = -EAGAIN;
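
For completeness, a sketch of reaping the queued send notifiers on the sending socket. Mainline RDS delivers them as RDS_CMSG_RDMA_STATUS control messages carrying struct rds_rdma_notify { user_token, status }; this tree declares struct rds_rdma_send_notify in the rds.h hunk above and may use a different cmsg type constant, so the names below are assumptions to be checked against the installed header, not part of the patch.

/*
 * Illustrative only: drain pending send-completion notifiers.  Mainline
 * names (RDS_CMSG_RDMA_STATUS, struct rds_rdma_notify) are assumed here;
 * this tree's header may call the payload struct rds_rdma_send_notify.
 */
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <linux/rds.h>

static void rds_reap_notifications(int fd)
{
	char cbuf[256];
	struct msghdr msg = {
		.msg_control    = cbuf,
		.msg_controllen = sizeof(cbuf),
	};
	struct cmsghdr *cmsg;

	/* RDS returns pending notifications ahead of data, even with no iov. */
	if (recvmsg(fd, &msg, MSG_DONTWAIT) < 0)
		return;

	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
		struct rds_rdma_notify note;

		if (cmsg->cmsg_level != SOL_RDS ||
		    cmsg->cmsg_type != RDS_CMSG_RDMA_STATUS)
			continue;
		memcpy(&note, CMSG_DATA(cmsg), sizeof(note));
		printf("send token %llu done, status %d\n",
		       (unsigned long long)note.user_token, (int)note.status);
	}
}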