From: Avinash Repaka Date: Thu, 17 Aug 2017 21:02:47 +0000 (-0700) Subject: RDS: Add support for fast registration work request X-Git-Tag: v4.1.12-111.0.20170918_2215~2 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=e7b7c2cffbb9b4acd688d4d1425385d5f51e8bfa;p=users%2Fjedix%2Flinux-maple.git RDS: Add support for fast registration work request This patch adds support for MR registration through work request in RDS, commonly referred as FRWR/fastreg/FRMR. With this patch added, RDS chooses the registration method, between FMR and FRWR, based on the preference given through 'prefer_frwr' module parameter and the support offered by the underlying device. Please note that this patch is adding support for MR registration done only through CMSG. Support for registrations through RDS_GET_MR socket option will be added through another patch. Orabug: 22145384 Suggested-by: Santosh Shilimkar Signed-off-by: Avinash Repaka Tested-by: Gerald Gibson Tested-by: Efrain Galaviz Reviewed-by: Wei Lin Guay --- diff --git a/net/rds/connection.c b/net/rds/connection.c index d6d3d1dcb04b6..a5d6951897069 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -160,6 +160,7 @@ static void __rds_conn_path_init(struct rds_connection *conn, INIT_WORK(&cp->cp_down_w, rds_shutdown_worker); mutex_init(&cp->cp_cm_lock); cp->cp_flags = 0; + atomic_set(&cp->cp_rdma_map_pending, 0); } /* @@ -400,6 +401,8 @@ void rds_conn_shutdown(struct rds_conn_path *cp) !test_bit(RDS_IN_XMIT, &cp->cp_flags)); wait_event(cp->cp_waitq, !test_bit(RDS_RECV_REFILL, &cp->cp_flags)); + wait_event(cp->cp_waitq, + (atomic_read(&cp->cp_rdma_map_pending) == 0)); conn->c_trans->conn_path_shutdown(cp); rds_conn_path_reset(cp); diff --git a/net/rds/ib.c b/net/rds/ib.c index f1df0a9659210..0226f14d8b14c 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -54,6 +54,7 @@ unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE; unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE; unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT; +bool prefer_frwr; unsigned int rds_ib_active_bonding_enabled = 0; unsigned int rds_ib_active_bonding_fallback = 1; unsigned int rds_ib_active_bonding_trigger_delay_max_msecs; /* = 0; */ @@ -69,6 +70,8 @@ module_param(rds_ib_fmr_8k_pool_size, int, 0444); MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8k fmr per HCA"); module_param(rds_ib_retry_count, int, 0444); MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error"); +module_param(prefer_frwr, bool, 0444); +MODULE_PARM_DESC(prefer_frwr, "Preference of FRWR over FMR for memory registration(Y/N)"); module_param(rds_ib_active_bonding_enabled, int, 0444); MODULE_PARM_DESC(rds_ib_active_bonding_enabled, " Active Bonding enabled"); module_param(rds_ib_rnr_retry_count, int, 0444); @@ -1985,6 +1988,7 @@ void rds_ib_add_one(struct ib_device *device) { struct rds_ib_device *rds_ibdev; struct ib_device_attr *dev_attr; + bool has_frwr, has_fmr; /* Only handle IB (no iWARP) devices */ if (device->node_type != RDMA_NODE_IB_CA) @@ -2060,6 +2064,14 @@ void rds_ib_add_one(struct ib_device *device) goto put_dev; } + has_frwr = (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS); + has_fmr = (device->alloc_fmr && device->dealloc_fmr && + device->map_phys_fmr && device->unmap_fmr); + rds_ibdev->use_fastreg = (has_frwr && (!has_fmr || prefer_frwr)); + + pr_info("RDS/IB: %s will be used for ib_device: %s\n", + rds_ibdev->use_fastreg ? 
"FRWR" : "FMR", device->name); + rds_ibdev->mr_1m_pool = rds_ib_create_mr_pool(rds_ibdev, RDS_IB_MR_1M_POOL); if (IS_ERR(rds_ibdev->mr_1m_pool)) { diff --git a/net/rds/ib.h b/net/rds/ib.h index f595f358d8d77..7f0bd52dddfb8 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -18,6 +18,7 @@ #define RDS_IB_DEFAULT_RECV_WR 1024 #define RDS_IB_DEFAULT_SEND_WR 256 +#define RDS_IB_DEFAULT_FREG_WR 256 #define RDS_IB_DEFAULT_SRQ_MAX_WR 4096 #define RDS_IB_DEFAULT_SRQ_HWM_REFILL (RDS_IB_DEFAULT_SRQ_MAX_WR/2) #define RDS_IB_DEFAULT_SRQ_LWM_REFILL (RDS_IB_DEFAULT_SRQ_MAX_WR/10) @@ -161,6 +162,9 @@ struct rds_ib_connection { struct ib_wc i_send_wc[RDS_WC_MAX]; struct ib_wc i_recv_wc[RDS_WC_MAX]; + /* Number of wrs available for MR registration(frwr) */ + atomic_t i_fastreg_wrs; + /* interrupt handling */ struct tasklet_struct i_stasklet; struct tasklet_struct i_rtasklet; @@ -414,6 +418,7 @@ struct rds_ib_device { struct list_head conn_list; struct ib_device *dev; struct ib_pd *pd; + bool use_fastreg; struct ib_mr *mr; struct rds_ib_mr_pool *mr_1m_pool; struct rds_ib_mr_pool *mr_8k_pool; @@ -452,7 +457,6 @@ struct rds_ib_device { #define IB_ACK_IN_FLIGHT 0 #define IB_ACK_REQUESTED 1 -#define RDS_IB_SEND_OP (1ULL << 63) /* Magic WR_ID for ACKs */ #define RDS_IB_ACK_WR_ID (~(u64) 0) @@ -550,6 +554,7 @@ extern struct ib_client rds_ib_client; extern unsigned int rds_ib_fmr_1m_pool_size; extern unsigned int rds_ib_fmr_8k_pool_size; +extern bool prefer_frwr; extern unsigned int rds_ib_retry_count; extern unsigned int rds_ib_rnr_retry_count; extern unsigned int rds_ib_active_bonding_enabled; @@ -587,12 +592,14 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev, int void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo); void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *); void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, - struct rds_sock *rs, u32 *key_ret); + struct rds_sock *rs, u32 *key_ret, + struct rds_connection *conn); void rds_ib_sync_mr(void *trans_private, int dir); void rds_ib_free_mr(void *trans_private, int invalidate); void rds_ib_flush_mrs(void); int rds_ib_fmr_init(void); void rds_ib_fmr_exit(void); +void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc); /* ib_recv.c */ int rds_ib_recv_init(void); diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 69f81c2a550b2..d433836d02310 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -422,42 +422,54 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context) tasklet_schedule(&ic->i_rtasklet); } -static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, - struct ib_wc *wcs, - struct rds_ib_ack_state *ack_state, - unsigned int rx) +static void poll_scq(struct rds_ib_connection *ic, struct ib_cq *cq, + struct ib_wc *wcs) { - int nr; - int i; + int nr, i; struct ib_wc *wc; while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) { for (i = 0; i < nr; i++) { - if (rx) { - if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0) { - rdsdebug("connection " - "<%u.%u.%u.%u,%u.%u.%u.%u,%d> " - "RX poll_cq processed %d\n", - NIPQUAD(ic->conn->c_laddr), - NIPQUAD(ic->conn->c_faddr), - ic->conn->c_tos, - ic->i_rx_poll_cq); - } - } wc = wcs + i; rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", (unsigned long long)wc->wr_id, wc->status, wc->byte_len, be32_to_cpu(wc->ex.imm_data)); - if (wc->wr_id & RDS_IB_SEND_OP) + if (wc->wr_id < (u64)ic->i_send_ring.w_nr || + wc->wr_id == RDS_IB_ACK_WR_ID) rds_ib_send_cqe_handler(ic, wc); else - 
rds_ib_recv_cqe_handler(ic, wc, ack_state); + rds_ib_mr_cqe_handler(ic, wc); } + } +} - if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) - break; +static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq, + struct ib_wc *wcs, + struct rds_ib_ack_state *ack_state) +{ + int nr, i; + struct ib_wc *wc; + + while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) { + for (i = 0; i < nr; i++) { + if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0) { + rdsdebug("connection <%u.%u.%u.%u,%u.%u.%u.%u,%d>" + " RX poll_cq processed %d\n", + NIPQUAD(ic->conn->c_laddr), + NIPQUAD(ic->conn->c_faddr), + ic->conn->c_tos, + ic->i_rx_poll_cq); + } + wc = wcs + i; + rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", + (unsigned long long)wc->wr_id, wc->status, + wc->byte_len, be32_to_cpu(wc->ex.imm_data)); + rds_ib_recv_cqe_handler(ic, wc, ack_state); + } + if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) + break; } } @@ -465,18 +477,16 @@ void rds_ib_tasklet_fn_send(unsigned long data) { struct rds_ib_connection *ic = (struct rds_ib_connection *) data; struct rds_connection *conn = ic->conn; - struct rds_ib_ack_state ack_state; - memset(&ack_state, 0, sizeof(ack_state)); rds_ib_stats_inc(s_ib_tasklet_call); /* if cq has been already reaped, ignore incoming cq event */ if (atomic_read(&ic->i_cq_quiesce)) return; - poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0); + poll_scq(ic, ic->i_scq, ic->i_send_wc); ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP); - poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0); + poll_scq(ic, ic->i_scq, ic->i_send_wc); if (rds_conn_up(conn) && (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || @@ -507,9 +517,9 @@ static void rds_ib_rx(struct rds_ib_connection *ic) memset(&ack_state, 0, sizeof(ack_state)); ic->i_rx_poll_cq = 0; - poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1); + poll_rcq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state); ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED); - poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1); + poll_rcq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state); if (ack_state.ack_next_valid) rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required); @@ -626,6 +636,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) struct ib_qp_init_attr attr; struct rds_ib_device *rds_ibdev; int ret; + int mr_reg, mr_inv; /* * It's normal to see a null device if an incoming connection races @@ -635,11 +646,25 @@ static int rds_ib_setup_qp(struct rds_connection *conn) if (!rds_ibdev) return -EOPNOTSUPP; + /* In the case of FRWR, mr registration and invalidation wrs use the + * same work queue as the send wrs. To make sure that we are not + * overflowing the workqueue, we allocate separately for each operation. + * mr_reg and mr_inv are the wr numbers allocated for reg and inv. 
+ */ + if (rds_ibdev->use_fastreg) { + mr_reg = RDS_IB_DEFAULT_FREG_WR; + mr_inv = 1; + } else { + mr_reg = 0; + mr_inv = 0; + } + /* add the conn now so that connection establishment has the dev */ rds_ib_add_conn(rds_ibdev, conn); - if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1) - rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1); + if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1 + mr_reg + mr_inv) + rds_ib_ring_resize(&ic->i_send_ring, + rds_ibdev->max_wrs - 1 - mr_reg - mr_inv); if (rds_ibdev->max_wrs < ic->i_recv_ring.w_nr + 1) rds_ib_ring_resize(&ic->i_recv_ring, rds_ibdev->max_wrs - 1); @@ -650,7 +675,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) ic->i_scq_vector = ibdev_get_unused_vector(rds_ibdev); ic->i_scq = ib_create_cq(dev, rds_ib_cq_comp_handler_send, rds_ib_cq_event_handler, conn, - ic->i_send_ring.w_nr + 1, + ic->i_send_ring.w_nr + 1 + mr_reg + mr_inv, ic->i_scq_vector); if (IS_ERR(ic->i_scq)) { ret = PTR_ERR(ic->i_scq); @@ -696,7 +721,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn) attr.event_handler = rds_ib_qp_event_handler; attr.qp_context = conn; /* + 1 to allow for the single ack message */ - attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1; + attr.cap.max_send_wr = ic->i_send_ring.w_nr + 1 + mr_reg + mr_inv; attr.cap.max_recv_wr = ic->i_recv_ring.w_nr + 1; attr.cap.max_send_sge = rds_ibdev->max_sge; attr.cap.max_recv_sge = RDS_IB_RECV_SGE; @@ -1159,7 +1184,9 @@ void rds_ib_conn_path_shutdown(struct rds_conn_path *cp) /* quiesce tx and rx completion before tearing down */ while (!wait_event_timeout(rds_ib_ring_empty_wait, rds_ib_ring_empty(&ic->i_recv_ring) && - (atomic_read(&ic->i_signaled_sends) == 0), + (atomic_read(&ic->i_signaled_sends) == 0) && + (atomic_read(&ic->i_fastreg_wrs) == + RDS_IB_DEFAULT_FREG_WR), msecs_to_jiffies(5000))) { /* Try to reap pending RX completions every 5 secs */ @@ -1310,6 +1337,12 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) */ rds_ib_ring_init(&ic->i_send_ring, rds_ib_sysctl_max_send_wr); rds_ib_ring_init(&ic->i_recv_ring, rds_ib_sysctl_max_recv_wr); + + /* Might want to change this hard-coded value to a variable in future. + * Updating this atomic counter will need an update to qp/cq size too. + */ + atomic_set(&ic->i_fastreg_wrs, RDS_IB_DEFAULT_FREG_WR); + rds_ib_init_ic_frag(ic); ic->conn = conn; diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c index e8b003a7a9c3b..88da05f8f56c8 100644 --- a/net/rds/ib_rdma.c +++ b/net/rds/ib_rdma.c @@ -43,13 +43,25 @@ struct workqueue_struct *rds_ib_fmr_wq; static DEFINE_PER_CPU(unsigned long, clean_list_grace); #define CLEAN_LIST_BUSY_BIT 0 +enum rds_ib_fr_state { + MR_IS_INVALID, /* mr ready to be used */ + MR_IS_VALID, /* mr in use, marked before posting reg. wr */ + MR_IS_STALE, /* mr is possibly corrupt, marked if failure */ +}; + /* * This is stored as mr->r_trans_private. 
*/ struct rds_ib_mr { struct rds_ib_device *device; struct rds_ib_mr_pool *pool; + struct rds_ib_connection *ic; + struct ib_fmr *fmr; + struct ib_mr *mr; + struct ib_fast_reg_page_list *page_list; + enum rds_ib_fr_state fr_state; + struct completion wr_comp; struct xlist_head xlist; @@ -62,6 +74,9 @@ struct rds_ib_mr { u64 *dma; int sg_dma_len; + unsigned int dma_npages; + unsigned int sg_byte_len; + struct rds_sock *rs; struct list_head pool_list; }; @@ -88,6 +103,7 @@ struct rds_ib_mr_pool { unsigned long max_free_pinned; unsigned unmap_fmr_cpu; struct ib_fmr_attr fmr_attr; + bool use_fastreg; spinlock_t busy_lock; /* protect ops on 'busy_list' */ /* All in use MRs allocated from this pool are listed here. This list @@ -101,6 +117,11 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struc static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr); static void rds_ib_mr_pool_flush_worker(struct work_struct *work); +static int rds_ib_map_fastreg_mr(struct rds_ib_device *rds_ibdev, + struct rds_ib_mr *ibmr, + struct scatterlist *sg, unsigned int sg_len); +static int rds_ib_fastreg_inv(struct rds_ib_mr *ibmr); + static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr) { struct rds_ib_device *rds_ibdev; @@ -279,6 +300,7 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev, pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps; pool->fmr_attr.page_shift = PAGE_SHIFT; atomic_set(&pool->max_items_soft, pool->max_items); + pool->use_fastreg = rds_ibdev->use_fastreg; return pool; } @@ -359,8 +381,60 @@ static inline void wait_clean_list_grace(void) } } -static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, - int npages) +static int rds_ib_init_fastreg_mr(struct rds_ib_device *rds_ibdev, + struct rds_ib_mr_pool *pool, + struct rds_ib_mr *ibmr) +{ + struct ib_fast_reg_page_list *page_list = NULL; + struct ib_mr *mr = NULL; + int err; + + mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->fmr_attr.max_pages); + if (IS_ERR(mr)) { + err = PTR_ERR(mr); + pr_warn("RDS/IB: ib_alloc_fast_reg_mr failed (err=%d)\n", err); + return err; + } + + page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, + pool->fmr_attr.max_pages); + if (IS_ERR(page_list)) { + err = PTR_ERR(page_list); + + pr_warn("RDS/IB: ib_alloc_fast_reg_page_list failed (err=%d)\n", + err); + ib_dereg_mr(mr); + return err; + } + + ibmr->page_list = page_list; + ibmr->mr = mr; + return 0; +} + +static int rds_ib_init_fmr(struct rds_ib_device *rds_ibdev, + struct rds_ib_mr_pool *pool, + struct rds_ib_mr *ibmr) +{ + int err; + + ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd, + (IB_ACCESS_LOCAL_WRITE | + IB_ACCESS_REMOTE_READ | + IB_ACCESS_REMOTE_WRITE | + IB_ACCESS_REMOTE_ATOMIC), + &pool->fmr_attr); + if (IS_ERR(ibmr->fmr)) { + err = PTR_ERR(ibmr->fmr); + ibmr->fmr = NULL; + return err; + } + + return 0; +} + +static struct rds_ib_mr *rds_ib_alloc_ibmr(struct rds_ib_device *rds_ibdev, + int npages) { struct rds_ib_mr_pool *pool; struct rds_ib_mr *ibmr = NULL; @@ -423,19 +497,16 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, goto out_no_cigar; } - ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd, - (IB_ACCESS_LOCAL_WRITE | - IB_ACCESS_REMOTE_READ | - IB_ACCESS_REMOTE_WRITE| - IB_ACCESS_REMOTE_ATOMIC), - &pool->fmr_attr); - if (IS_ERR(ibmr->fmr)) { + if (rds_ibdev->use_fastreg) + err = rds_ib_init_fastreg_mr(rds_ibdev, pool, ibmr); + else + err = rds_ib_init_fmr(rds_ibdev, pool, ibmr); + + if (err) { int total_pool_size; int prev_8k_max; int prev_1m_max; - err = 
PTR_ERR(ibmr->fmr); - ibmr->fmr = NULL; if (err != -ENOMEM) goto out_no_cigar; @@ -481,6 +552,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, list_add(&ibmr->pool_list, &pool->busy_list); spin_unlock_bh(&pool->busy_lock); + init_completion(&ibmr->wr_comp); + ibmr->fr_state = MR_IS_INVALID; /* not needed bcas of kzalloc */ + ibmr->pool = pool; if (pool->pool_type == RDS_IB_MR_8K_POOL) rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc); @@ -493,11 +567,8 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, return ibmr; out_no_cigar: - if (ibmr) { - if (ibmr->fmr) - ib_dealloc_fmr(ibmr->fmr); + if (ibmr) kfree(ibmr); - } atomic_dec(&pool->item_count); return ERR_PTR(err); } @@ -776,25 +847,47 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, if (list_empty(&unmap_list)) goto out; - /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */ - list_for_each_entry(ibmr, &unmap_list, unmap_list) - list_add(&ibmr->fmr->list, &fmr_list); - - ret = ib_unmap_fmr(&fmr_list); - if (ret) - printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret); + if (!pool->use_fastreg) { + /* String all ib_mrs onto one list and hand them to + * ib_unmap_fmr + */ + list_for_each_entry(ibmr, &unmap_list, unmap_list) + list_add(&ibmr->fmr->list, &fmr_list); + + ret = ib_unmap_fmr(&fmr_list); + if (ret) + pr_warn("RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret); + } else { + list_for_each_entry(ibmr, &unmap_list, unmap_list) { + ret = rds_ib_fastreg_inv(ibmr); + if (ret) + pr_warn_ratelimited( + "RDS/IB: rds_ib_fastreg_inv failed (err=%d)\n", + ret); + } + } /* Now we can destroy the DMA mapping and unpin any pages */ list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) { unpinned += ibmr->sg_len; __rds_ib_teardown_mr(ibmr); - if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) { + if (nfreed < free_goal || + (!pool->use_fastreg && + ibmr->remap_count >= pool->fmr_attr.max_maps) || + (pool->use_fastreg && ibmr->fr_state == MR_IS_STALE)) { if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL) rds_ib_stats_inc(s_ib_rdma_mr_8k_free); else rds_ib_stats_inc(s_ib_rdma_mr_1m_free); list_del(&ibmr->unmap_list); - ib_dealloc_fmr(ibmr->fmr); + if (pool->use_fastreg) { + if (ibmr->page_list) + ib_free_fast_reg_page_list(ibmr->page_list); + if (ibmr->mr) + ib_dereg_mr(ibmr->mr); + } else { + ib_dealloc_fmr(ibmr->fmr); + } kfree(ibmr); nfreed++; } @@ -877,7 +970,9 @@ void rds_ib_free_mr(void *trans_private, int invalidate) spin_unlock_bh(&pool->busy_lock); /* Return it to the pool's free list */ - if (ibmr->remap_count >= pool->fmr_attr.max_maps) + if ((!rds_ibdev->use_fastreg && + ibmr->remap_count >= pool->fmr_attr.max_maps) || + (pool->use_fastreg && ibmr->fr_state == MR_IS_STALE)) xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->drop_list); else xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->free_list); @@ -922,10 +1017,12 @@ void rds_ib_flush_mrs(void) } void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, - struct rds_sock *rs, u32 *key_ret) + struct rds_sock *rs, u32 *key_ret, + struct rds_connection *conn) { struct rds_ib_device *rds_ibdev; struct rds_ib_mr *ibmr = NULL; + struct rds_ib_connection *ic = NULL; int ret; rds_ibdev = rds_ib_get_device(rs->rs_bound_addr); @@ -934,22 +1031,38 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, goto out; } + if (conn) { + ic = conn->c_transport_data; + } else if (rds_ibdev->use_fastreg) { + /* TODO: Add FRWR support for RDS_GET_MR */ + ret = -EOPNOTSUPP; + goto 
out; + } + if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) { ret = -ENODEV; goto out; } - ibmr = rds_ib_alloc_fmr(rds_ibdev, nents); + ibmr = rds_ib_alloc_ibmr(rds_ibdev, nents); if (IS_ERR(ibmr)) { rds_ib_dev_put(rds_ibdev); return ibmr; } - ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents); - if (ret == 0) - *key_ret = ibmr->fmr->rkey; - else - printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret); + ibmr->ic = ic; + + if (rds_ibdev->use_fastreg) { + ret = rds_ib_map_fastreg_mr(rds_ibdev, ibmr, sg, nents); + if (ret == 0) + *key_ret = ibmr->mr->rkey; + } else { + ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents); + if (ret == 0) + *key_ret = ibmr->fmr->rkey; + else + pr_warn("RDS/IB: map_fmr failed (errno=%d)\n", ret); + } ibmr->rs = rs; ibmr->device = rds_ibdev; @@ -966,3 +1079,218 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, return ibmr; } +/* Fastreg related functions */ + +static int rds_ib_map_scatterlist(struct rds_ib_device *rds_ibdev, + struct rds_ib_mr *ibmr) +{ + struct ib_device *dev = rds_ibdev->dev; + int i, j, ret, page_cnt; + u32 len; + + ibmr->sg_dma_len = ib_dma_map_sg(dev, ibmr->sg, ibmr->sg_len, + DMA_BIDIRECTIONAL); + if (unlikely(!ibmr->sg_dma_len)) { + pr_warn("RDS/IB: dma_map_sg failed!\n"); + return -EBUSY; + } + + ibmr->sg_byte_len = 0; + ibmr->dma_npages = 0; + len = 0; + + ret = -EINVAL; + for (i = 0; i < ibmr->sg_dma_len; ++i) { + unsigned int dma_len = ib_sg_dma_len(dev, &ibmr->sg[i]); + u64 dma_addr = ib_sg_dma_address(dev, &ibmr->sg[i]); + + ibmr->sg_byte_len += dma_len; + if (dma_addr & ~PAGE_MASK) { + if (i > 0) + goto out_unmap; + else + ++ibmr->dma_npages; + } + + if ((dma_addr + dma_len) & ~PAGE_MASK) { + if (i < ibmr->sg_dma_len - 1) + goto out_unmap; + else + ++ibmr->dma_npages; + } + + len += dma_len; + } + ibmr->dma_npages += len >> PAGE_SHIFT; + + /* Now gather the dma addrs into one list */ + if (ibmr->dma_npages > ibmr->pool->fmr_attr.max_pages) { + ret = -EMSGSIZE; + goto out_unmap; + } + + page_cnt = 0; + for (i = 0; i < ibmr->sg_dma_len; ++i) { + unsigned int dma_len = ib_sg_dma_len(dev, &ibmr->sg[i]); + u64 dma_addr = ib_sg_dma_address(dev, &ibmr->sg[i]); + + for (j = 0; j < dma_len; j += PAGE_SIZE) + ibmr->page_list->page_list[page_cnt++] = + (dma_addr & PAGE_MASK) + j; + } + + ibmr->dma_npages = page_cnt; + return 0; + +out_unmap: + return ret; +} + +static int rds_ib_rdma_build_fastreg(struct rds_ib_mr *ibmr) +{ + struct ib_send_wr f_wr, *failed_wr; + int ret = 0; + + while (atomic_dec_return(&ibmr->ic->i_fastreg_wrs) <= 0) { + atomic_inc(&ibmr->ic->i_fastreg_wrs); + /* Depending on how many times schedule() is called, + * we could replace it with wait_event() in future. 
+ */ + schedule(); + } + + ib_update_fast_reg_key(ibmr->mr, ibmr->remap_count++); + WARN_ON(ibmr->fr_state != MR_IS_INVALID); + ibmr->fr_state = MR_IS_VALID; + + memset(&f_wr, 0, sizeof(f_wr)); + f_wr.wr_id = (u64)ibmr; + f_wr.opcode = IB_WR_FAST_REG_MR; + f_wr.wr.fast_reg.length = ibmr->sg_byte_len; + f_wr.wr.fast_reg.rkey = ibmr->mr->rkey; + f_wr.wr.fast_reg.page_list = ibmr->page_list; + f_wr.wr.fast_reg.page_list_len = ibmr->dma_npages; + f_wr.wr.fast_reg.page_shift = PAGE_SHIFT; + f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE | + IB_ACCESS_REMOTE_READ | + IB_ACCESS_REMOTE_WRITE; + f_wr.wr.fast_reg.iova_start = 0; + f_wr.send_flags = IB_SEND_SIGNALED; + + failed_wr = &f_wr; + ret = ib_post_send(ibmr->ic->i_cm_id->qp, &f_wr, &failed_wr); + BUG_ON(failed_wr != &f_wr); + if (ret) { + atomic_inc(&ibmr->ic->i_fastreg_wrs); + ibmr->fr_state = MR_IS_INVALID; + pr_warn_ratelimited("RDS/IB: %s:%d ib_post_send returned %d\n", + __func__, __LINE__, ret); + goto out; + } + + wait_for_completion(&ibmr->wr_comp); + if (ibmr->fr_state == MR_IS_STALE) { + /* Registration request failed */ + ret = -EAGAIN; + } + +out: + return ret; +} + +static int rds_ib_map_fastreg_mr(struct rds_ib_device *rds_ibdev, + struct rds_ib_mr *ibmr, + struct scatterlist *sg, unsigned int sg_len) +{ + int ret = 0; + + /* We want to teardown old ibmr values here and fill it up with + * new sg values + */ + rds_ib_teardown_mr(ibmr); + + ibmr->sg = sg; + ibmr->sg_len = sg_len; + + ret = rds_ib_map_scatterlist(rds_ibdev, ibmr); + if (ret) + goto out; + + ret = rds_ib_rdma_build_fastreg(ibmr); + if (ret) + goto out; + + if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL) + rds_ib_stats_inc(s_ib_rdma_mr_8k_used); + else + rds_ib_stats_inc(s_ib_rdma_mr_1m_used); + + return ret; + +out: + if (ibmr->sg_dma_len) { + ib_dma_unmap_sg(rds_ibdev->dev, ibmr->sg, ibmr->sg_len, + DMA_BIDIRECTIONAL); + ibmr->sg_dma_len = 0; + } + ibmr->sg = NULL; + ibmr->sg_len = 0; + return ret; +} + +static int rds_ib_fastreg_inv(struct rds_ib_mr *ibmr) +{ + struct ib_send_wr s_wr, *failed_wr; + int ret = 0; + + if (ibmr->fr_state != MR_IS_VALID) + goto out; + + ibmr->fr_state = MR_IS_INVALID; + + memset(&s_wr, 0, sizeof(s_wr)); + s_wr.wr_id = (u64)ibmr; + s_wr.opcode = IB_WR_LOCAL_INV; + s_wr.ex.invalidate_rkey = ibmr->mr->rkey; + s_wr.send_flags = IB_SEND_SIGNALED; + + failed_wr = &s_wr; + ret = ib_post_send(ibmr->ic->i_cm_id->qp, &s_wr, &failed_wr); + BUG_ON(failed_wr != &s_wr); + if (ret) { + ibmr->fr_state = MR_IS_STALE; + pr_warn_ratelimited("RDS/IB: %s:%d ib_post_send returned %d\n", + __func__, __LINE__, ret); + goto out; + } + + wait_for_completion(&ibmr->wr_comp); + +out: + return ret; +} + +void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) +{ + struct rds_ib_mr *ibmr = (struct rds_ib_mr *)wc->wr_id; + enum rds_ib_fr_state fr_state = ibmr->fr_state; + + WARN_ON(ibmr->fr_state == MR_IS_STALE); + + if (wc->status != IB_WC_SUCCESS) { + if (rds_conn_up(ic->conn)) { + pr_warn("RDS: IB: MR completion <%pI4,%pI4,%d> status %u " + "vendor_err %u, disconnecting and reconnecting\n", + &ic->conn->c_laddr, &ic->conn->c_faddr, + ic->conn->c_tos, wc->status, wc->vendor_err); + } + ibmr->fr_state = MR_IS_STALE; + } + + if (fr_state == MR_IS_INVALID) { + complete(&ibmr->wr_comp); + } else if (fr_state == MR_IS_VALID) { + atomic_inc(&ic->i_fastreg_wrs); + complete(&ibmr->wr_comp); + } +} diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index 6992cd1052461..c80b08a03f4d0 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -244,7 
+244,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic) send->s_op = NULL; - send->s_wr.wr_id = i | RDS_IB_SEND_OP; + send->s_wr.wr_id = i; send->s_wr.sg_list = send->s_sge; send->s_wr.ex.imm_data = 0; @@ -308,8 +308,7 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) oldest = rds_ib_ring_oldest(&ic->i_send_ring); - completed = rds_ib_ring_completed(&ic->i_send_ring, - (wc->wr_id & ~RDS_IB_SEND_OP), oldest); + completed = rds_ib_ring_completed(&ic->i_send_ring, wc->wr_id, oldest); for (i = 0; i < completed; i++) { struct rds_message *rm; diff --git a/net/rds/rdma.c b/net/rds/rdma.c index e874c076e896b..8930b55639823 100644 --- a/net/rds/rdma.c +++ b/net/rds/rdma.c @@ -177,7 +177,8 @@ static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages, } static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args, - u64 *cookie_ret, struct rds_mr **mr_ret) + u64 *cookie_ret, struct rds_mr **mr_ret, + struct rds_conn_path *cp) { struct rds_mr *mr = NULL, *found; unsigned int nr_pages; @@ -268,7 +269,8 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args, * Note that dma_map() implies that pending writes are * flushed to RAM, so no dma_sync is needed here. */ trans_private = rs->rs_transport->get_mr(sg, nents, rs, - &mr->r_key); + &mr->r_key, + cp ? cp->cp_conn : NULL); if (IS_ERR(trans_private)) { for (i = 0 ; i < nents; i++) @@ -329,7 +331,7 @@ int rds_get_mr(struct rds_sock *rs, char __user *optval, int optlen) sizeof(struct rds_get_mr_args))) return -EFAULT; - return __rds_rdma_map(rs, &args, NULL, NULL); + return __rds_rdma_map(rs, &args, NULL, NULL, NULL); } int rds_get_mr_for_dest(struct rds_sock *rs, char __user *optval, int optlen) @@ -353,7 +355,7 @@ int rds_get_mr_for_dest(struct rds_sock *rs, char __user *optval, int optlen) new_args.cookie_addr = args.cookie_addr; new_args.flags = args.flags; - return __rds_rdma_map(rs, &new_args, NULL, NULL); + return __rds_rdma_map(rs, &new_args, NULL, NULL, NULL); } /* @@ -724,6 +726,18 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm, return err; } +static void inc_rdma_map_pending(struct rds_conn_path *cp) +{ + atomic_inc(&cp->cp_rdma_map_pending); +} + +static void dec_rdma_map_pending(struct rds_conn_path *cp) +{ + if (atomic_dec_and_test(&cp->cp_rdma_map_pending)) + if (waitqueue_active(&cp->cp_waitq)) + wake_up_all(&cp->cp_waitq); +} + /* * The application passes us an address range it wants to enable RDMA * to/from. 
We map the area, and save the pair @@ -738,9 +752,19 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm, || rm->m_rdma_cookie != 0) return -EINVAL; - ret = __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr); + inc_rdma_map_pending(rm->m_conn_path); + if (!rds_conn_path_up(rm->m_conn_path)) { + dec_rdma_map_pending(rm->m_conn_path); + return -EAGAIN; + } + + ret = __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, + &rm->rdma.op_rdma_mr, rm->m_conn_path); if (!ret) rm->rdma.op_implicit_mr = 1; + + dec_rdma_map_pending(rm->m_conn_path); + return ret; } diff --git a/net/rds/rds.h b/net/rds/rds.h index 76c8f190d7f93..3f50b738f3db4 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -245,6 +245,7 @@ struct rds_conn_path { atomic_t cp_state; unsigned long cp_send_gen; unsigned long cp_flags; + atomic_t cp_rdma_map_pending; unsigned long cp_reconnect_jiffies; struct delayed_work cp_send_w; struct delayed_work cp_recv_w; @@ -613,6 +614,8 @@ struct rds_message { struct scatterlist *op_sg; } data; }; + + struct rds_conn_path *m_conn_path; }; /* @@ -692,7 +695,8 @@ struct rds_transport { unsigned int avail); void (*exit)(void); void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg, - struct rds_sock *rs, u32 *key_ret); + struct rds_sock *rs, u32 *key_ret, + struct rds_connection *conn); void (*sync_mr)(void *trans_private, int direction); void (*free_mr)(void *trans_private, int invalidate); void (*flush_mrs)(void); diff --git a/net/rds/send.c b/net/rds/send.c index 325cae8f55d08..bd71f388b231a 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -1308,6 +1308,13 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) rs->rs_conn = conn; } + if (conn->c_trans->t_mp_capable) + cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)]; + else + cpath = &conn->c_path[0]; + + rm->m_conn_path = cpath; + /* Parse any control messages the user may have included. */ ret = rds_cmsg_send(rs, rm, msg, &allocated_mr); if (ret) { @@ -1317,11 +1324,6 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } - if (conn->c_trans->t_mp_capable) - cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)]; - else - cpath = &conn->c_path[0]; - if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) { ret = -EAGAIN; goto out;
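---

Note on work-request accounting (an illustrative calculation from the defaults in this patch, assuming rds_ib_sysctl_max_send_wr keeps its default of RDS_IB_DEFAULT_SEND_WR = 256): on an FRWR-capable device the send CQ and QP are now sized for 256 send WRs + 1 ACK WR + RDS_IB_DEFAULT_FREG_WR (256) registration WRs + 1 invalidation WR = 514 entries. ic->i_fastreg_wrs starts at 256, so at most 256 registration WRs can be outstanding per connection; rds_ib_rdma_build_fastreg() spins via schedule() when that budget is exhausted, and rds_ib_mr_cqe_handler() returns a credit when the registration completion arrives.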
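For context, below is a minimal userspace sketch (not part of this patch) of the CMSG-based registration path that this change covers; RDS_GET_MR / RDS_GET_MR_FOR_DEST remain FMR-only until the follow-up patch. The RDS_CMSG_RDMA_MAP control message rides on a normal sendmsg(), so rds_cmsg_rdma_map() can hand rm->m_conn_path down to rds_ib_get_mr() and the FRWR registration WR is posted on that connection's QP; the kernel writes the resulting RDMA cookie (rkey + offset) back to cookie_addr. Structures and constants come from the uapi <linux/rds.h>; the addresses, ports, and lack of error handling are placeholders only.

/* Hypothetical illustration: register an MR via RDS_CMSG_RDMA_MAP on sendmsg(). */
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <linux/rds.h>

#ifndef AF_RDS
#define AF_RDS 21			/* fallback for older libc headers */
#endif

int main(void)
{
	int fd = socket(AF_RDS, SOCK_SEQPACKET, 0);
	struct sockaddr_in laddr = { .sin_family = AF_INET,
				     .sin_port = htons(18634) };
	struct sockaddr_in faddr = { .sin_family = AF_INET,
				     .sin_port = htons(18635) };
	static char rdma_buf[1 << 20];	/* region to register */
	char payload[64] = "hello";
	uint64_t cookie = 0;		/* kernel writes the MR cookie here */

	inet_pton(AF_INET, "192.168.1.10", &laddr.sin_addr);
	inet_pton(AF_INET, "192.168.1.11", &faddr.sin_addr);
	bind(fd, (struct sockaddr *)&laddr, sizeof(laddr));

	struct rds_get_mr_args mr_args = {
		.vec	     = { .addr  = (uint64_t)(uintptr_t)rdma_buf,
				 .bytes = sizeof(rdma_buf) },
		.cookie_addr = (uint64_t)(uintptr_t)&cookie,
		.flags	     = RDS_RDMA_USE_ONCE,
	};

	struct iovec iov = { .iov_base = payload, .iov_len = sizeof(payload) };
	union {
		char buf[CMSG_SPACE(sizeof(struct rds_get_mr_args))];
		struct cmsghdr align;
	} cmsgu;
	struct msghdr msg = {
		.msg_name	= &faddr,
		.msg_namelen	= sizeof(faddr),
		.msg_iov	= &iov,
		.msg_iovlen	= 1,
		.msg_control	= cmsgu.buf,
		.msg_controllen	= sizeof(cmsgu.buf),
	};
	struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);

	cmsg->cmsg_level = SOL_RDS;
	cmsg->cmsg_type	 = RDS_CMSG_RDMA_MAP;	/* register MR on this send */
	cmsg->cmsg_len	 = CMSG_LEN(sizeof(mr_args));
	memcpy(CMSG_DATA(cmsg), &mr_args, sizeof(mr_args));

	if (sendmsg(fd, &msg, 0) < 0)
		perror("sendmsg");
	else
		printf("MR cookie: 0x%llx\n", (unsigned long long)cookie);

	close(fd);
	return 0;
}

With the patch applied, FRWR preference can be requested at module load time, e.g. "modprobe rds_rdma prefer_frwr=Y" (assuming the IB transport is built as rds_rdma.ko); the parameter is 0444, so it is read-only after load, and the per-device choice is still gated on the HCA advertising IB_DEVICE_MEM_MGT_EXTENSIONS.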