RDS: Add support for fast registration work request
author     Avinash Repaka <avinash.repaka@oracle.com>
           Thu, 17 Aug 2017 21:02:47 +0000 (14:02 -0700)
committer  Chuck Anderson <chuck.anderson@oracle.com>
           Tue, 19 Sep 2017 05:43:39 +0000 (22:43 -0700)
This patch adds support for MR registration through work requests in RDS,
commonly referred to as FRWR/fastreg/FRMR.

With this patch, RDS chooses the registration method, FMR or FRWR, based on
the preference given through the 'prefer_frwr' module parameter and on the
support offered by the underlying device.
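
A simplified sketch of that selection, as added to rds_ib_add_one() in
net/rds/ib.c below (field and flag names are taken from the patch itself):

        /* FRWR needs IB_DEVICE_MEM_MGT_EXTENSIONS; FMR needs the four fmr
         * verbs.  FRWR is used when FMR is unavailable or prefer_frwr is set.
         */
        has_frwr = (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS);
        has_fmr = (device->alloc_fmr && device->dealloc_fmr &&
                   device->map_phys_fmr && device->unmap_fmr);
        rds_ibdev->use_fastreg = (has_frwr && (!has_fmr || prefer_frwr));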

Note that this patch adds support only for MR registrations done through
CMSG. Support for registrations through the RDS_GET_MR socket option will be
added in a later patch.

Orabug: 22145384

Suggested-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: Avinash Repaka <avinash.repaka@oracle.com>
Tested-by: Gerald Gibson <gerald.gibson@oracle.com>
Tested-by: Efrain Galaviz <efrain.galaviz@oracle.com>
Reviewed-by: Wei Lin Guay <wei.lin.guay@oracle.com>
net/rds/connection.c
net/rds/ib.c
net/rds/ib.h
net/rds/ib_cm.c
net/rds/ib_rdma.c
net/rds/ib_send.c
net/rds/rdma.c
net/rds/rds.h
net/rds/send.c

diff --git a/net/rds/connection.c b/net/rds/connection.c
index d6d3d1dcb04b63a59692e29f4216d1f2d0dc655c..a5d695189706968333ec5daa690213fba2699676 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -160,6 +160,7 @@ static void __rds_conn_path_init(struct rds_connection *conn,
        INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
        mutex_init(&cp->cp_cm_lock);
        cp->cp_flags = 0;
+       atomic_set(&cp->cp_rdma_map_pending, 0);
 }
 
 /*
@@ -400,6 +401,8 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
                           !test_bit(RDS_IN_XMIT, &cp->cp_flags));
                wait_event(cp->cp_waitq,
                           !test_bit(RDS_RECV_REFILL, &cp->cp_flags));
+               wait_event(cp->cp_waitq,
+                          (atomic_read(&cp->cp_rdma_map_pending) == 0));
 
                conn->c_trans->conn_path_shutdown(cp);
                rds_conn_path_reset(cp);
diff --git a/net/rds/ib.c b/net/rds/ib.c
index f1df0a9659210304767f8ef0dff52389517d40bd..0226f14d8b14ce17c9dcbdf2d507718b618804b6 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -54,6 +54,7 @@
 unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE;
 unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE;
 unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT;
+bool prefer_frwr;
 unsigned int rds_ib_active_bonding_enabled = 0;
 unsigned int rds_ib_active_bonding_fallback = 1;
 unsigned int rds_ib_active_bonding_trigger_delay_max_msecs; /* = 0; */
@@ -69,6 +70,8 @@ module_param(rds_ib_fmr_8k_pool_size, int, 0444);
 MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8k fmr per HCA");
 module_param(rds_ib_retry_count, int, 0444);
 MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");
+module_param(prefer_frwr, bool, 0444);
+MODULE_PARM_DESC(prefer_frwr, "Preference of FRWR over FMR for memory registration(Y/N)");
 module_param(rds_ib_active_bonding_enabled, int, 0444);
 MODULE_PARM_DESC(rds_ib_active_bonding_enabled, " Active Bonding enabled");
 module_param(rds_ib_rnr_retry_count, int, 0444);
@@ -1985,6 +1988,7 @@ void rds_ib_add_one(struct ib_device *device)
 {
        struct rds_ib_device *rds_ibdev;
        struct ib_device_attr *dev_attr;
+       bool has_frwr, has_fmr;
 
        /* Only handle IB (no iWARP) devices */
        if (device->node_type != RDMA_NODE_IB_CA)
@@ -2060,6 +2064,14 @@ void rds_ib_add_one(struct ib_device *device)
                goto put_dev;
        }
 
+       has_frwr = (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS);
+       has_fmr = (device->alloc_fmr && device->dealloc_fmr &&
+                  device->map_phys_fmr && device->unmap_fmr);
+       rds_ibdev->use_fastreg = (has_frwr && (!has_fmr || prefer_frwr));
+
+       pr_info("RDS/IB: %s will be used for ib_device: %s\n",
+               rds_ibdev->use_fastreg ? "FRWR" : "FMR", device->name);
+
        rds_ibdev->mr_1m_pool =
                rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_1M_POOL);
        if (IS_ERR(rds_ibdev->mr_1m_pool)) {
diff --git a/net/rds/ib.h b/net/rds/ib.h
index f595f358d8d77fcb83f6862817122719474148af..7f0bd52dddfb853cb67d036e6305c1a45ff56783 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -18,6 +18,7 @@
 
 #define RDS_IB_DEFAULT_RECV_WR         1024
 #define RDS_IB_DEFAULT_SEND_WR         256
+#define RDS_IB_DEFAULT_FREG_WR         256
 #define RDS_IB_DEFAULT_SRQ_MAX_WR       4096
 #define RDS_IB_DEFAULT_SRQ_HWM_REFILL  (RDS_IB_DEFAULT_SRQ_MAX_WR/2)
 #define RDS_IB_DEFAULT_SRQ_LWM_REFILL  (RDS_IB_DEFAULT_SRQ_MAX_WR/10)
@@ -161,6 +162,9 @@ struct rds_ib_connection {
        struct ib_wc            i_send_wc[RDS_WC_MAX];
        struct ib_wc            i_recv_wc[RDS_WC_MAX];
 
+       /* Number of wrs available for MR registration(frwr) */
+       atomic_t                i_fastreg_wrs;
+
        /* interrupt handling */
        struct tasklet_struct   i_stasklet;
        struct tasklet_struct   i_rtasklet;
@@ -414,6 +418,7 @@ struct rds_ib_device {
        struct list_head        conn_list;
        struct ib_device        *dev;
        struct ib_pd            *pd;
+       bool                    use_fastreg;
        struct ib_mr            *mr;
        struct rds_ib_mr_pool   *mr_1m_pool;
        struct rds_ib_mr_pool   *mr_8k_pool;
@@ -452,7 +457,6 @@ struct rds_ib_device {
 #define IB_ACK_IN_FLIGHT       0
 #define IB_ACK_REQUESTED       1
 
-#define RDS_IB_SEND_OP         (1ULL << 63)
 /* Magic WR_ID for ACKs */
 #define RDS_IB_ACK_WR_ID       (~(u64) 0)
 
@@ -550,6 +554,7 @@ extern struct ib_client rds_ib_client;
 
 extern unsigned int rds_ib_fmr_1m_pool_size;
 extern unsigned int rds_ib_fmr_8k_pool_size;
+extern bool prefer_frwr;
 extern unsigned int rds_ib_retry_count;
 extern unsigned int rds_ib_rnr_retry_count;
 extern unsigned int rds_ib_active_bonding_enabled;
@@ -587,12 +592,14 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev, int
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo);
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-                   struct rds_sock *rs, u32 *key_ret);
+                   struct rds_sock *rs, u32 *key_ret,
+                   struct rds_connection *conn);
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
 int rds_ib_fmr_init(void);
 void rds_ib_fmr_exit(void);
+void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
 
 /* ib_recv.c */
 int rds_ib_recv_init(void);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 69f81c2a550b220cfe3d4cd2af96c3bb38384641..d433836d023101c8a49487ce50ca7ae0d61314f6 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -422,42 +422,54 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
        tasklet_schedule(&ic->i_rtasklet);
 }
 
-static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
-                   struct ib_wc *wcs,
-                   struct rds_ib_ack_state *ack_state,
-                   unsigned int rx)
+static void poll_scq(struct rds_ib_connection *ic, struct ib_cq *cq,
+                    struct ib_wc *wcs)
 {
-       int nr;
-       int i;
+       int nr, i;
        struct ib_wc *wc;
 
        while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
                for (i = 0; i < nr; i++) {
-                       if (rx) {
-                               if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0) {
-                                       rdsdebug("connection "
-                                                "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
-                                                "RX poll_cq processed %d\n",
-                                                NIPQUAD(ic->conn->c_laddr),
-                                                NIPQUAD(ic->conn->c_faddr),
-                                                ic->conn->c_tos,
-                                                ic->i_rx_poll_cq);
-                               }
-                       }
                        wc = wcs + i;
                        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
                                 be32_to_cpu(wc->ex.imm_data));
 
-                       if (wc->wr_id & RDS_IB_SEND_OP)
+                       if (wc->wr_id < (u64)ic->i_send_ring.w_nr ||
+                           wc->wr_id == RDS_IB_ACK_WR_ID)
                                rds_ib_send_cqe_handler(ic, wc);
                        else
-                               rds_ib_recv_cqe_handler(ic, wc, ack_state);
+                               rds_ib_mr_cqe_handler(ic, wc);
                }
+       }
+}
 
-               if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
-                       break;
+static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
+                    struct ib_wc *wcs,
+                    struct rds_ib_ack_state *ack_state)
+{
+       int nr, i;
+       struct ib_wc *wc;
+
+       while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
+               for (i = 0; i < nr; i++) {
+                       if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0) {
+                               rdsdebug("connection <%u.%u.%u.%u,%u.%u.%u.%u,%d>"
+                                        " RX poll_cq processed %d\n",
+                                        NIPQUAD(ic->conn->c_laddr),
+                                        NIPQUAD(ic->conn->c_faddr),
+                                        ic->conn->c_tos,
+                                        ic->i_rx_poll_cq);
+                       }
+                       wc = wcs + i;
+                       rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+                                (unsigned long long)wc->wr_id, wc->status,
+                                wc->byte_len, be32_to_cpu(wc->ex.imm_data));
+                       rds_ib_recv_cqe_handler(ic, wc, ack_state);
+               }
 
+               if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+                       break;
        }
 }
 
@@ -465,18 +477,16 @@ void rds_ib_tasklet_fn_send(unsigned long data)
 {
        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
        struct rds_connection *conn = ic->conn;
-       struct rds_ib_ack_state ack_state;
 
-       memset(&ack_state, 0, sizeof(ack_state));
        rds_ib_stats_inc(s_ib_tasklet_call);
 
        /* if cq has been already reaped, ignore incoming cq event */
         if (atomic_read(&ic->i_cq_quiesce))
                return;
 
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
+       poll_scq(ic, ic->i_scq, ic->i_send_wc);
        ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP);
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
+       poll_scq(ic, ic->i_scq, ic->i_send_wc);
 
        if (rds_conn_up(conn) &&
           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
@@ -507,9 +517,9 @@ static void rds_ib_rx(struct rds_ib_connection *ic)
        memset(&ack_state, 0, sizeof(ack_state));
 
        ic->i_rx_poll_cq = 0;
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
+       poll_rcq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
        ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
+       poll_rcq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
 
        if (ack_state.ack_next_valid)
                rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
@@ -626,6 +636,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        struct ib_qp_init_attr attr;
        struct rds_ib_device *rds_ibdev;
        int ret;
+       int mr_reg, mr_inv;
 
        /*
         * It's normal to see a null device if an incoming connection races
@@ -635,11 +646,25 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        if (!rds_ibdev)
                return -EOPNOTSUPP;
 
+       /* In the case of FRWR, mr registration and invalidation wrs use the
+        * same work queue as the send wrs. To make sure that we are not
+        * overflowing the workqueue, we allocate separately for each operation.
+        * mr_reg and mr_inv are the wr numbers allocated for reg and inv.
+        */
+       if (rds_ibdev->use_fastreg) {
+               mr_reg = RDS_IB_DEFAULT_FREG_WR;
+               mr_inv = 1;
+       } else {
+               mr_reg = 0;
+               mr_inv = 0;
+       }
+
        /* add the conn now so that connection establishment has the dev */
        rds_ib_add_conn(rds_ibdev, conn);
 
-       if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
-               rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
+       if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1 + mr_reg + mr_inv)
+               rds_ib_ring_resize(&ic->i_send_ring,
+                                  rds_ibdev->max_wrs - 1 - mr_reg - mr_inv);
        if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1)
                rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1);
 
@@ -650,7 +675,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        ic->i_scq_vector = ibdev_get_unused_vector(rds_ibdev);
        ic->i_scq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
                                rds_ib_cq_event_handler, conn,
-                               ic->i_send_ring.w_nr + 1,
+                               ic->i_send_ring.w_nr + 1 + mr_reg + mr_inv,
                                ic->i_scq_vector);
        if (IS_ERR(ic->i_scq)) {
                ret = PTR_ERR(ic->i_scq);
@@ -696,7 +721,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        attr.event_handler = rds_ib_qp_event_handler;
        attr.qp_context = conn;
        /* + 1 to allow for the single ack message */
-       attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1;
+       attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1 + mr_reg + mr_inv;
        attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1;
        attr.cap.max_send_sge = rds_ibdev->max_sge;
        attr.cap.max_recv_sge = RDS_IB_RECV_SGE;
@@ -1159,7 +1184,9 @@ void rds_ib_conn_path_shutdown(struct rds_conn_path *cp)
                /* quiesce tx and rx completion before tearing down */
                while (!wait_event_timeout(rds_ib_ring_empty_wait,
                                rds_ib_ring_empty(&ic->i_recv_ring) &&
-                               (atomic_read(&ic->i_signaled_sends) == 0),
+                               (atomic_read(&ic->i_signaled_sends) == 0) &&
+                               (atomic_read(&ic->i_fastreg_wrs) ==
+                                RDS_IB_DEFAULT_FREG_WR),
                                msecs_to_jiffies(5000))) {
 
                        /* Try to reap pending RX completions every 5 secs */
@@ -1310,6 +1337,12 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
         */
        rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr);
        rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr);
+
+       /* Might want to change this hard-coded value to a variable in future.
+        * Updating this atomic counter will need an update to qp/cq size too.
+        */
+       atomic_set(&ic->i_fastreg_wrs, RDS_IB_DEFAULT_FREG_WR);
+
        rds_ib_init_ic_frag(ic);
 
        ic->conn = conn;
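
With the defaults in this patch (RDS_IB_DEFAULT_SEND_WR = 256,
RDS_IB_DEFAULT_FREG_WR = 256), the send-side sizing for a fastreg-capable
connection works out roughly as below; the actual ring size is taken from
rds_ib_sysctl_max_send_wr, so the numbers are illustrative only:

        /* Send-queue / send-CQ budget on an FRWR connection:
         *   send ring entries       256  (i_send_ring.w_nr)
         *   single ACK message        1
         *   MR registration WRs     256  (mr_reg = RDS_IB_DEFAULT_FREG_WR)
         *   MR invalidation WR        1  (mr_inv)
         * => max_send_wr = 256 + 1 + 256 + 1 = 514
         */
        attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1 + mr_reg + mr_inv;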
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index e8b003a7a9c3bd38d16c8527f946375e33a8c52a..88da05f8f56c8cfe4543002eec9d07d2145be84c 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -43,13 +43,25 @@ struct workqueue_struct *rds_ib_fmr_wq;
 static DEFINE_PER_CPU(unsigned long, clean_list_grace);
 #define CLEAN_LIST_BUSY_BIT 0
 
+enum rds_ib_fr_state {
+       MR_IS_INVALID,          /* mr ready to be used */
+       MR_IS_VALID,            /* mr in use, marked before posting reg. wr */
+       MR_IS_STALE,            /* mr is possibly corrupt, marked if failure */
+};
+
 /*
  * This is stored as mr->r_trans_private.
  */
 struct rds_ib_mr {
        struct rds_ib_device    *device;
        struct rds_ib_mr_pool   *pool;
+       struct rds_ib_connection *ic;
+
        struct ib_fmr           *fmr;
+       struct ib_mr            *mr;
+       struct ib_fast_reg_page_list    *page_list;
+       enum rds_ib_fr_state    fr_state;
+       struct completion       wr_comp;
 
        struct xlist_head       xlist;
 
@@ -62,6 +74,9 @@ struct rds_ib_mr {
        u64                     *dma;
        int                     sg_dma_len;
 
+       unsigned int            dma_npages;
+       unsigned int            sg_byte_len;
+
        struct rds_sock         *rs;
        struct list_head        pool_list;
 };
@@ -88,6 +103,7 @@ struct rds_ib_mr_pool {
        unsigned long           max_free_pinned;
        unsigned                unmap_fmr_cpu;
        struct ib_fmr_attr      fmr_attr;
+       bool                    use_fastreg;
 
        spinlock_t              busy_lock; /* protect ops on 'busy_list' */
        /* All in use MRs allocated from this pool are listed here. This list
@@ -101,6 +117,11 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struc
 static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
 static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
 
+static int rds_ib_map_fastreg_mr(struct rds_ib_device *rds_ibdev,
+                                struct rds_ib_mr *ibmr,
+                                struct scatterlist *sg, unsigned int sg_len);
+static int rds_ib_fastreg_inv(struct rds_ib_mr *ibmr);
+
 static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
 {
        struct rds_ib_device *rds_ibdev;
@@ -279,6 +300,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
        pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
        pool->fmr_attr.page_shift = PAGE_SHIFT;
        atomic_set(&pool->max_items_soft, pool->max_items);
+       pool->use_fastreg = rds_ibdev->use_fastreg;
 
        return pool;
 }
@@ -359,8 +381,60 @@ static inline void wait_clean_list_grace(void)
        }
 }
 
-static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
-                                       int npages)
+static int rds_ib_init_fastreg_mr(struct rds_ib_device *rds_ibdev,
+                                 struct rds_ib_mr_pool *pool,
+                                 struct rds_ib_mr *ibmr)
+{
+       struct ib_fast_reg_page_list *page_list = NULL;
+       struct ib_mr *mr = NULL;
+       int err;
+
+       mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->fmr_attr.max_pages);
+       if (IS_ERR(mr)) {
+               err = PTR_ERR(mr);
+               pr_warn("RDS/IB: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+               return err;
+       }
+
+       page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev,
+                                               pool->fmr_attr.max_pages);
+       if (IS_ERR(page_list)) {
+               err = PTR_ERR(page_list);
+
+               pr_warn("RDS/IB: ib_alloc_fast_reg_page_list failed (err=%d)\n",
+                       err);
+               ib_dereg_mr(mr);
+               return err;
+       }
+
+       ibmr->page_list = page_list;
+       ibmr->mr = mr;
+       return 0;
+}
+
+static int rds_ib_init_fmr(struct rds_ib_device *rds_ibdev,
+                          struct rds_ib_mr_pool *pool,
+                          struct rds_ib_mr *ibmr)
+{
+       int err;
+
+       ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
+                       (IB_ACCESS_LOCAL_WRITE |
+                        IB_ACCESS_REMOTE_READ |
+                        IB_ACCESS_REMOTE_WRITE |
+                        IB_ACCESS_REMOTE_ATOMIC),
+                       &pool->fmr_attr);
+       if (IS_ERR(ibmr->fmr)) {
+               err = PTR_ERR(ibmr->fmr);
+               ibmr->fmr = NULL;
+               return err;
+       }
+
+       return 0;
+}
+
+static struct rds_ib_mr *rds_ib_alloc_ibmr(struct rds_ib_device *rds_ibdev,
+                                          int npages)
 {
        struct rds_ib_mr_pool *pool;
        struct rds_ib_mr *ibmr = NULL;
@@ -423,19 +497,16 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
                goto out_no_cigar;
        }
 
-       ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
-                       (IB_ACCESS_LOCAL_WRITE |
-                        IB_ACCESS_REMOTE_READ |
-                        IB_ACCESS_REMOTE_WRITE|
-                        IB_ACCESS_REMOTE_ATOMIC),
-                       &pool->fmr_attr);
-       if (IS_ERR(ibmr->fmr)) {
+       if (rds_ibdev->use_fastreg)
+               err = rds_ib_init_fastreg_mr(rds_ibdev, pool, ibmr);
+       else
+               err = rds_ib_init_fmr(rds_ibdev, pool, ibmr);
+
+       if (err) {
                int total_pool_size;
                int prev_8k_max;
                int prev_1m_max;
 
-               err = PTR_ERR(ibmr->fmr);
-               ibmr->fmr = NULL;
                if (err != -ENOMEM)
                        goto out_no_cigar;
 
@@ -481,6 +552,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
        list_add(&ibmr->pool_list, &pool->busy_list);
        spin_unlock_bh(&pool->busy_lock);
 
+       init_completion(&ibmr->wr_comp);
+       ibmr->fr_state = MR_IS_INVALID; /* not needed bcas of kzalloc */
+
        ibmr->pool = pool;
        if (pool->pool_type == RDS_IB_MR_8K_POOL)
                rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
@@ -493,11 +567,8 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
        return ibmr;
 
 out_no_cigar:
-       if (ibmr) {
-               if (ibmr->fmr)
-                       ib_dealloc_fmr(ibmr->fmr);
+       if (ibmr)
                kfree(ibmr);
-       }
        atomic_dec(&pool->item_count);
        return ERR_PTR(err);
 }
@@ -776,25 +847,47 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
        if (list_empty(&unmap_list))
                goto out;
 
-       /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
-       list_for_each_entry(ibmr, &unmap_list, unmap_list)
-               list_add(&ibmr->fmr->list, &fmr_list);
-
-       ret = ib_unmap_fmr(&fmr_list);
-       if (ret)
-               printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+       if (!pool->use_fastreg) {
+               /* String all ib_mrs onto one list and hand them to
+                * ib_unmap_fmr
+                */
+               list_for_each_entry(ibmr, &unmap_list, unmap_list)
+                       list_add(&ibmr->fmr->list, &fmr_list);
+
+               ret = ib_unmap_fmr(&fmr_list);
+               if (ret)
+                       pr_warn("RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+       } else {
+               list_for_each_entry(ibmr, &unmap_list, unmap_list) {
+                       ret = rds_ib_fastreg_inv(ibmr);
+                       if (ret)
+                               pr_warn_ratelimited(
+                                       "RDS/IB: rds_ib_fastreg_inv failed (err=%d)\n",
+                                       ret);
+               }
+       }
 
        /* Now we can destroy the DMA mapping and unpin any pages */
        list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
                unpinned += ibmr->sg_len;
                __rds_ib_teardown_mr(ibmr);
-               if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
+               if (nfreed < free_goal ||
+                   (!pool->use_fastreg &&
+                    ibmr->remap_count >= pool->fmr_attr.max_maps) ||
+                   (pool->use_fastreg && ibmr->fr_state == MR_IS_STALE)) {
                        if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
                                rds_ib_stats_inc(s_ib_rdma_mr_8k_free);
                        else
                                rds_ib_stats_inc(s_ib_rdma_mr_1m_free);
                        list_del(&ibmr->unmap_list);
-                       ib_dealloc_fmr(ibmr->fmr);
+                       if (pool->use_fastreg) {
+                               if (ibmr->page_list)
+                                       ib_free_fast_reg_page_list(ibmr->page_list);
+                               if (ibmr->mr)
+                                       ib_dereg_mr(ibmr->mr);
+                       } else {
+                               ib_dealloc_fmr(ibmr->fmr);
+                       }
                        kfree(ibmr);
                        nfreed++;
                }
@@ -877,7 +970,9 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
        spin_unlock_bh(&pool->busy_lock);
 
        /* Return it to the pool's free list */
-       if (ibmr->remap_count >= pool->fmr_attr.max_maps)
+       if ((!rds_ibdev->use_fastreg &&
+            ibmr->remap_count >= pool->fmr_attr.max_maps) ||
+           (pool->use_fastreg && ibmr->fr_state == MR_IS_STALE))
                xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->drop_list);
        else
                xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->free_list);
@@ -922,10 +1017,12 @@ void rds_ib_flush_mrs(void)
 }
 
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-                   struct rds_sock *rs, u32 *key_ret)
+                   struct rds_sock *rs, u32 *key_ret,
+                   struct rds_connection *conn)
 {
        struct rds_ib_device *rds_ibdev;
        struct rds_ib_mr *ibmr = NULL;
+       struct rds_ib_connection *ic = NULL;
        int ret;
 
        rds_ibdev = rds_ib_get_device(rs->rs_bound_addr);
@@ -934,22 +1031,38 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
                goto out;
        }
 
+       if (conn) {
+               ic = conn->c_transport_data;
+       } else if (rds_ibdev->use_fastreg) {
+               /* TODO: Add FRWR support for RDS_GET_MR */
+               ret = -EOPNOTSUPP;
+               goto out;
+       }
+
        if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) {
                ret = -ENODEV;
                goto out;
        }
 
-       ibmr = rds_ib_alloc_fmr(rds_ibdev, nents);
+       ibmr = rds_ib_alloc_ibmr(rds_ibdev, nents);
        if (IS_ERR(ibmr)) {
                rds_ib_dev_put(rds_ibdev);
                return ibmr;
        }
 
-       ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
-       if (ret == 0)
-               *key_ret = ibmr->fmr->rkey;
-       else
-               printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
+       ibmr->ic = ic;
+
+       if (rds_ibdev->use_fastreg) {
+               ret = rds_ib_map_fastreg_mr(rds_ibdev, ibmr, sg, nents);
+               if (ret == 0)
+                       *key_ret = ibmr->mr->rkey;
+       } else {
+               ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
+               if (ret == 0)
+                       *key_ret = ibmr->fmr->rkey;
+               else
+                       pr_warn("RDS/IB: map_fmr failed (errno=%d)\n", ret);
+       }
 
        ibmr->rs = rs;
        ibmr->device = rds_ibdev;
@@ -966,3 +1079,218 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
        return ibmr;
 }
 
+/* Fastreg related functions */
+
+static int rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev,
+                                 struct rds_ib_mr *ibmr)
+{
+       struct ib_device *dev = rds_ibdev->dev;
+       int i, j, ret, page_cnt;
+       u32 len;
+
+       ibmr->sg_dma_len = ib_dma_map_sg(dev, ibmr->sg, ibmr->sg_len,
+                                        DMA_BIDIRECTIONAL);
+       if (unlikely(!ibmr->sg_dma_len)) {
+               pr_warn("RDS/IB: dma_map_sg failed!\n");
+               return -EBUSY;
+       }
+
+       ibmr->sg_byte_len = 0;
+       ibmr->dma_npages = 0;
+       len = 0;
+
+       ret = -EINVAL;
+       for (i = 0; i < ibmr->sg_dma_len; ++i) {
+               unsigned int dma_len = ib_sg_dma_len(dev, &ibmr->sg[i]);
+               u64 dma_addr = ib_sg_dma_address(dev, &ibmr->sg[i]);
+
+               ibmr->sg_byte_len += dma_len;
+               if (dma_addr & ~PAGE_MASK) {
+                       if (i > 0)
+                               goto out_unmap;
+                       else
+                               ++ibmr->dma_npages;
+               }
+
+               if ((dma_addr + dma_len) & ~PAGE_MASK) {
+                       if (i < ibmr->sg_dma_len - 1)
+                               goto out_unmap;
+                       else
+                               ++ibmr->dma_npages;
+               }
+
+               len += dma_len;
+       }
+       ibmr->dma_npages += len >> PAGE_SHIFT;
+
+       /* Now gather the dma addrs into one list */
+       if (ibmr->dma_npages > ibmr->pool->fmr_attr.max_pages) {
+               ret = -EMSGSIZE;
+               goto out_unmap;
+       }
+
+       page_cnt = 0;
+       for (i = 0; i < ibmr->sg_dma_len; ++i) {
+               unsigned int dma_len = ib_sg_dma_len(dev, &ibmr->sg[i]);
+               u64 dma_addr = ib_sg_dma_address(dev, &ibmr->sg[i]);
+
+               for (j = 0; j < dma_len; j += PAGE_SIZE)
+                       ibmr->page_list->page_list[page_cnt++] =
+                               (dma_addr & PAGE_MASK) + j;
+       }
+
+       ibmr->dma_npages = page_cnt;
+       return 0;
+
+out_unmap:
+       return ret;
+}
+
+static int rds_ib_rdma_build_fastreg(struct rds_ib_mr *ibmr)
+{
+       struct ib_send_wr f_wr, *failed_wr;
+       int ret = 0;
+
+       while (atomic_dec_return(&ibmr->ic->i_fastreg_wrs) <= 0) {
+               atomic_inc(&ibmr->ic->i_fastreg_wrs);
+               /* Depending on how many times schedule() is called,
+                * we could replace it with wait_event() in future.
+                */
+               schedule();
+       }
+
+       ib_update_fast_reg_key(ibmr->mr, ibmr->remap_count++);
+       WARN_ON(ibmr->fr_state != MR_IS_INVALID);
+       ibmr->fr_state = MR_IS_VALID;
+
+       memset(&f_wr, 0, sizeof(f_wr));
+       f_wr.wr_id = (u64)ibmr;
+       f_wr.opcode = IB_WR_FAST_REG_MR;
+       f_wr.wr.fast_reg.length = ibmr->sg_byte_len;
+       f_wr.wr.fast_reg.rkey = ibmr->mr->rkey;
+       f_wr.wr.fast_reg.page_list = ibmr->page_list;
+       f_wr.wr.fast_reg.page_list_len = ibmr->dma_npages;
+       f_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+       f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+                                       IB_ACCESS_REMOTE_READ |
+                                       IB_ACCESS_REMOTE_WRITE;
+       f_wr.wr.fast_reg.iova_start = 0;
+       f_wr.send_flags = IB_SEND_SIGNALED;
+
+       failed_wr = &f_wr;
+       ret = ib_post_send(ibmr->ic->i_cm_id->qp, &f_wr, &failed_wr);
+       BUG_ON(failed_wr != &f_wr);
+       if (ret) {
+               atomic_inc(&ibmr->ic->i_fastreg_wrs);
+               ibmr->fr_state = MR_IS_INVALID;
+               pr_warn_ratelimited("RDS/IB: %s:%d ib_post_send returned %d\n",
+                                   __func__, __LINE__, ret);
+               goto out;
+       }
+
+       wait_for_completion(&ibmr->wr_comp);
+       if (ibmr->fr_state == MR_IS_STALE) {
+               /* Registration request failed */
+               ret = -EAGAIN;
+       }
+
+out:
+       return ret;
+}
+
+static int rds_ib_map_fastreg_mr(struct rds_ib_device *rds_ibdev,
+                                struct rds_ib_mr *ibmr,
+                                struct scatterlist *sg, unsigned int sg_len)
+{
+       int ret = 0;
+
+       /* We want to teardown old ibmr values here and fill it up with
+        * new sg values
+        */
+       rds_ib_teardown_mr(ibmr);
+
+       ibmr->sg = sg;
+       ibmr->sg_len = sg_len;
+
+       ret = rds_ib_map_scatterlist(rds_ibdev, ibmr);
+       if (ret)
+               goto out;
+
+       ret = rds_ib_rdma_build_fastreg(ibmr);
+       if (ret)
+               goto out;
+
+       if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+               rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
+       else
+               rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
+
+       return ret;
+
+out:
+       if (ibmr->sg_dma_len) {
+               ib_dma_unmap_sg(rds_ibdev->dev, ibmr->sg, ibmr->sg_len,
+                               DMA_BIDIRECTIONAL);
+               ibmr->sg_dma_len = 0;
+       }
+       ibmr->sg = NULL;
+       ibmr->sg_len = 0;
+       return ret;
+}
+
+static int rds_ib_fastreg_inv(struct rds_ib_mr *ibmr)
+{
+       struct ib_send_wr s_wr, *failed_wr;
+       int ret = 0;
+
+       if (ibmr->fr_state != MR_IS_VALID)
+               goto out;
+
+       ibmr->fr_state = MR_IS_INVALID;
+
+       memset(&s_wr, 0, sizeof(s_wr));
+       s_wr.wr_id = (u64)ibmr;
+       s_wr.opcode = IB_WR_LOCAL_INV;
+       s_wr.ex.invalidate_rkey = ibmr->mr->rkey;
+       s_wr.send_flags = IB_SEND_SIGNALED;
+
+       failed_wr = &s_wr;
+       ret = ib_post_send(ibmr->ic->i_cm_id->qp, &s_wr, &failed_wr);
+       BUG_ON(failed_wr != &s_wr);
+       if (ret) {
+               ibmr->fr_state = MR_IS_STALE;
+               pr_warn_ratelimited("RDS/IB: %s:%d ib_post_send returned %d\n",
+                                   __func__, __LINE__, ret);
+               goto out;
+       }
+
+       wait_for_completion(&ibmr->wr_comp);
+
+out:
+       return ret;
+}
+
+void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
+{
+       struct rds_ib_mr *ibmr = (struct rds_ib_mr *)wc->wr_id;
+       enum rds_ib_fr_state fr_state = ibmr->fr_state;
+
+       WARN_ON(ibmr->fr_state == MR_IS_STALE);
+
+       if (wc->status != IB_WC_SUCCESS) {
+               if (rds_conn_up(ic->conn)) {
+                       pr_warn("RDS: IB: MR completion <%pI4,%pI4,%d> status %u "
+                               "vendor_err %u, disconnecting and reconnecting\n",
+                               &ic->conn->c_laddr, &ic->conn->c_faddr,
+                               ic->conn->c_tos, wc->status, wc->vendor_err);
+               }
+               ibmr->fr_state = MR_IS_STALE;
+       }
+
+       if (fr_state == MR_IS_INVALID) {
+               complete(&ibmr->wr_comp);
+       } else if (fr_state == MR_IS_VALID) {
+               atomic_inc(&ic->i_fastreg_wrs);
+               complete(&ibmr->wr_comp);
+       }
+}
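
Taken together, the fastreg additions above give each MR a small life cycle
around the fr_state field; a condensed view (function names as in the patch,
error handling omitted):

        /* rds_ib_get_mr() (CMSG path, conn != NULL)
         *   rds_ib_alloc_ibmr()           - ib_alloc_fast_reg_mr() + page list,
         *                                   fr_state = MR_IS_INVALID
         *   rds_ib_map_fastreg_mr()
         *     rds_ib_map_scatterlist()    - DMA-map and page-align the sg list
         *     rds_ib_rdma_build_fastreg() - take a slot from i_fastreg_wrs,
         *                                   post IB_WR_FAST_REG_MR,
         *                                   fr_state = MR_IS_VALID,
         *                                   wait_for_completion(&ibmr->wr_comp)
         *
         * rds_ib_flush_mr_pool()
         *   rds_ib_fastreg_inv()          - post IB_WR_LOCAL_INV,
         *                                   fr_state = MR_IS_INVALID, wait
         *
         * rds_ib_mr_cqe_handler()         - on WC error marks MR_IS_STALE so
         *                                   the MR is dropped rather than reused
         */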
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 6992cd105246183cda8ce0d82e6541cf56df49a5..c80b08a03f4d0136d4ad94b17ab33405d64b8fb7 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -244,7 +244,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 
                send->s_op = NULL;
 
-               send->s_wr.wr_id = i | RDS_IB_SEND_OP;
+               send->s_wr.wr_id = i;
                send->s_wr.sg_list = send->s_sge;
                send->s_wr.ex.imm_data = 0;
 
@@ -308,8 +308,7 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 
        oldest = rds_ib_ring_oldest(&ic->i_send_ring);
 
-       completed = rds_ib_ring_completed(&ic->i_send_ring,
-                                 (wc->wr_id & ~RDS_IB_SEND_OP), oldest);
+       completed = rds_ib_ring_completed(&ic->i_send_ring, wc->wr_id, oldest);
 
        for (i = 0; i < completed; i++) {
                struct rds_message *rm;
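
With the RDS_IB_SEND_OP tag bit gone, completions on the send CQ are now
classified by wr_id value instead; the dispatch in poll_scq() above boils
down to:

        /* wr_id values on the send CQ after this patch:
         *   0 .. w_nr-1        - data/RDMA sends (ring index)
         *   RDS_IB_ACK_WR_ID   - the single ACK message (~0ULL)
         *   anything else      - a struct rds_ib_mr pointer from a fastreg
         *                        registration or invalidation WR
         */
        if (wc->wr_id < (u64)ic->i_send_ring.w_nr ||
            wc->wr_id == RDS_IB_ACK_WR_ID)
                rds_ib_send_cqe_handler(ic, wc);
        else
                rds_ib_mr_cqe_handler(ic, wc);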
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index e874c076e896b81f97ec1b7c19b565f8959cce2d..8930b55639823c209defdb8dddc63a1976f1558b 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -177,7 +177,8 @@ static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages,
 }
 
 static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
-                               u64 *cookie_ret, struct rds_mr **mr_ret)
+                         u64 *cookie_ret, struct rds_mr **mr_ret,
+                         struct rds_conn_path *cp)
 {
        struct rds_mr *mr = NULL, *found;
        unsigned int nr_pages;
@@ -268,7 +269,8 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
         * Note that dma_map() implies that pending writes are
         * flushed to RAM, so no dma_sync is needed here. */
        trans_private = rs->rs_transport->get_mr(sg, nents, rs,
-                                                &mr->r_key);
+                                                &mr->r_key,
+                                                cp ? cp->cp_conn : NULL);
 
        if (IS_ERR(trans_private)) {
                for (i = 0 ; i < nents; i++)
@@ -329,7 +331,7 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen)
                           sizeof(struct rds_get_mr_args)))
                return -EFAULT;
 
-       return __rds_rdma_map(rs, &args, NULL, NULL);
+       return __rds_rdma_map(rs, &args, NULL, NULL, NULL);
 }
 
 int rds_get_mr_for_dest(struct rds_sock *rs, char __user *optval, int optlen)
@@ -353,7 +355,7 @@ int rds_get_mr_for_dest(struct rds_sock *rs, char __user *optval, int optlen)
        new_args.cookie_addr = args.cookie_addr;
        new_args.flags = args.flags;
 
-       return __rds_rdma_map(rs, &new_args, NULL, NULL);
+       return __rds_rdma_map(rs, &new_args, NULL, NULL, NULL);
 }
 
 /*
@@ -724,6 +726,18 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
        return err;
 }
 
+static void inc_rdma_map_pending(struct rds_conn_path *cp)
+{
+       atomic_inc(&cp->cp_rdma_map_pending);
+}
+
+static void dec_rdma_map_pending(struct rds_conn_path *cp)
+{
+       if (atomic_dec_and_test(&cp->cp_rdma_map_pending))
+               if (waitqueue_active(&cp->cp_waitq))
+                       wake_up_all(&cp->cp_waitq);
+}
+
 /*
  * The application passes us an address range it wants to enable RDMA
  * to/from. We map the area, and save the <R_Key,offset> pair
@@ -738,9 +752,19 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
         || rm->m_rdma_cookie != 0)
                return -EINVAL;
 
-       ret = __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr);
+       inc_rdma_map_pending(rm->m_conn_path);
+       if (!rds_conn_path_up(rm->m_conn_path)) {
+               dec_rdma_map_pending(rm->m_conn_path);
+               return -EAGAIN;
+       }
+
+       ret = __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie,
+                            &rm->rdma.op_rdma_mr, rm->m_conn_path);
        if (!ret)
                rm->rdma.op_implicit_mr = 1;
+
+       dec_rdma_map_pending(rm->m_conn_path);
+
        return ret;
 }
 
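The cp_rdma_map_pending counter added above pairs with the new wait in
rds_conn_shutdown() (net/rds/connection.c at the top of this patch): the
connection path is not torn down while a CMSG-driven mapping is still in
flight. A minimal sketch of the pattern:

        /* mapping side, rds_cmsg_rdma_map(): */
        inc_rdma_map_pending(cp);        /* atomic_inc(&cp->cp_rdma_map_pending) */
        if (!rds_conn_path_up(cp)) {
                dec_rdma_map_pending(cp);        /* wakes cp_waitq at zero */
                return -EAGAIN;
        }
        ret = __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie,
                             &rm->rdma.op_rdma_mr, cp);
        dec_rdma_map_pending(cp);

        /* shutdown side, rds_conn_shutdown(): */
        wait_event(cp->cp_waitq,
                   atomic_read(&cp->cp_rdma_map_pending) == 0);
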
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 76c8f190d7f93fc301519c3e6d7fa8340aec5aee..3f50b738f3db4d7a927633722efaa9d77e2cd387 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -245,6 +245,7 @@ struct rds_conn_path {
        atomic_t                cp_state;
        unsigned long           cp_send_gen;
        unsigned long           cp_flags;
+       atomic_t                cp_rdma_map_pending;
        unsigned long           cp_reconnect_jiffies;
        struct delayed_work     cp_send_w;
        struct delayed_work     cp_recv_w;
@@ -613,6 +614,8 @@ struct rds_message {
                        struct scatterlist      *op_sg;
                } data;
        };
+
+       struct rds_conn_path *m_conn_path;
 };
 
 /*
@@ -692,7 +695,8 @@ struct rds_transport {
                                        unsigned int avail);
        void (*exit)(void);
        void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg,
-                       struct rds_sock *rs, u32 *key_ret);
+                       struct rds_sock *rs, u32 *key_ret,
+                       struct rds_connection *conn);
        void (*sync_mr)(void *trans_private, int direction);
        void (*free_mr)(void *trans_private, int invalidate);
        void (*flush_mrs)(void);
diff --git a/net/rds/send.c b/net/rds/send.c
index 325cae8f55d083b6a2a28ba1ad0ebeee02e84a11..bd71f388b231af66e16959c9343390b645c51426 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -1308,6 +1308,13 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                rs->rs_conn = conn;
        }
 
+       if (conn->c_trans->t_mp_capable)
+               cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
+       else
+               cpath = &conn->c_path[0];
+
+       rm->m_conn_path = cpath;
+
        /* Parse any control messages the user may have included. */
        ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
        if (ret) {
@@ -1317,11 +1324,6 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                goto out;
        }
 
-       if (conn->c_trans->t_mp_capable)
-               cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
-       else
-               cpath = &conn->c_path[0];
-
        if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) {
                ret = -EAGAIN;
                goto out;