#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/sunrpc/addr.h>
+#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"
#define RPCRDMA_MAX_REEST_TO (30U * HZ)
#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ)
-static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */
+static struct rpc_xprt_ops xprt_rdma_procs;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static struct rpc_xprt_ops xprt_rdma_bc_procs;
+#endif
static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
if (req == NULL)
return NULL;
- flags = GFP_NOIO | __GFP_NOWARN;
+ flags = RPCRDMA_DEF_GFP;
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
{
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+
+/* Server-side transport endpoint wants a whole page for its send
+ * buffer. The client RPC code constructs the RPC header in this
+ * buffer before it invokes ->send_request.
+ */
+static void *
+xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct svcxprt_rdma *rdma;
+ struct svc_xprt *sxprt;
+ struct page *page;
+
+ if (size > PAGE_SIZE) {
+ WARN_ONCE(1, "failed to handle buffer allocation (size %zu)\n",
+ size);
+ return NULL;
+ }
+
+ page = alloc_page(RPCRDMA_DEF_GFP);
+ if (!page)
+ return NULL;
+
+ sxprt = rqst->rq_xprt->bc_xprt;
+ rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+ ctxt = svc_rdma_get_context_gfp(rdma, RPCRDMA_DEF_GFP);
+ if (!ctxt) {
+ put_page(page);
+ return NULL;
+ }
+
+ rqst->rq_privdata = ctxt;
+ ctxt->pages[0] = page;
+ ctxt->count = 1;
+ return page_address(page);
+}
+
+static void
+xprt_rdma_bc_free(void *buffer)
+{
+ /* No-op: ctxt and page have already been freed. */
+}
+
+static int
+rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
+ struct svc_rdma_op_ctxt *ctxt;
+ int rc;
+
+ /* Space in the send buffer for an RPC/RDMA header is reserved
+ * via xprt->tsh_size */
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
+ headerp->rm_type = rdma_msg;
+ headerp->rm_body.rm_chunks[0] = xdr_zero;
+ headerp->rm_body.rm_chunks[1] = xdr_zero;
+ headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+ pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
+
+ ctxt = (struct svc_rdma_op_ctxt *)rqst->rq_privdata;
+ rc = svc_rdma_bc_post_send(rdma, ctxt, &rqst->rq_snd_buf);
+ if (rc)
+ goto drop_connection;
+ return rc;
+
+drop_connection:
+ pr_info("Failed to send backwards request\n");
+ svc_rdma_put_context(ctxt, 1);
+ xprt_disconnect_done(xprt);
+ return -ENOTCONN;
+}
+
+/* Take an RPC request and sent it on the passive end of a
+ * transport connection.
+ */
+static int
+xprt_rdma_bc_send_request(struct rpc_task *task)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+ struct svcxprt_rdma *rdma;
+ u32 len;
+
+ pr_info("%s: sending request with xid: %08x\n",
+ __func__, be32_to_cpu(rqst->rq_xid));
+
+ if (!mutex_trylock(&sxprt->xpt_mutex)) {
+ rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
+ if (!mutex_trylock(&sxprt->xpt_mutex))
+ return -EAGAIN;
+ rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
+ }
+
+ len = -ENOTCONN;
+ rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+ if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+ len = rpcrdma_bc_send_request(rdma, rqst);
+
+ mutex_unlock(&sxprt->xpt_mutex);
+
+ if (len < 0)
+ return len;
+ return 0;
+}
+
+static void
+xprt_rdma_bc_close(struct rpc_xprt *xprt)
+{
+ pr_info("RPC: %s: xprt %p\n", __func__, xprt);
+}
+
+static void
+xprt_rdma_bc_dest(struct rpc_xprt *xprt)
+{
+ pr_info("RPC: %s: xprt %p\n", __func__, xprt);
+
+ xprt_free(xprt);
+ module_put(THIS_MODULE);
+}
+
+/* It shouldn't matter if the number of backchannel session slots
+ * doesn't match the number of RPC/RDMA credits. That just means
+ * one or the other will have extra slots that aren't used.
+ */
+static struct rpc_xprt *
+xprt_setup_rdma_bc(struct xprt_create *args)
+{
+ struct rpc_xprt *xprt;
+ struct rpcrdma_xprt *new_xprt;
+
+ if (args->addrlen > sizeof(xprt->addr)) {
+ dprintk("RPC: %s: address too large\n", __func__);
+ return ERR_PTR(-EBADF);
+ }
+
+ xprt = xprt_alloc(args->net, sizeof(*new_xprt),
+ RPCRDMA_MAX_BC_REQUESTS,
+ RPCRDMA_MAX_BC_REQUESTS);
+ if (xprt == NULL) {
+ dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
+ __func__);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ xprt->timeout = &xprt_rdma_default_timeout;
+ xprt_set_bound(xprt);
+ xprt_set_connected(xprt);
+ xprt->bind_timeout = RPCRDMA_BIND_TO;
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+ xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
+
+ xprt->prot = XPRT_TRANSPORT_BC_RDMA;
+ xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
+ xprt->ops = &xprt_rdma_bc_procs;
+
+ memcpy(&xprt->addr, args->dstaddr, args->addrlen);
+ xprt->addrlen = args->addrlen;
+ xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
+ xprt->resvport = 0;
+
+ xprt->max_payload = xprt_rdma_max_inline_read;
+
+ new_xprt = rpcx_to_rdmax(xprt);
+ new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
+
+ xprt_get(xprt);
+ args->bc_xprt->xpt_bc_xprt = xprt;
+ xprt->bc_xprt = args->bc_xprt;
+
+ if (!try_module_get(THIS_MODULE))
+ goto out_fail;
+
+ return xprt;
+
+out_fail:
+ xprt_rdma_free_addresses(xprt);
+ args->bc_xprt->xpt_bc_xprt = NULL;
+ xprt_put(xprt);
+ xprt_free(xprt);
+ return ERR_PTR(-EINVAL);
+}
+
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/*
* Plumbing for rpc transport switch and kernel module
*/
.setup = xprt_setup_rdma,
};
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+
+static struct rpc_xprt_ops xprt_rdma_bc_procs = {
+ .reserve_xprt = xprt_reserve_xprt_cong,
+ .release_xprt = xprt_release_xprt_cong,
+ .alloc_slot = xprt_alloc_slot,
+ .release_request = xprt_release_rqst_cong,
+ .buf_alloc = xprt_rdma_bc_allocate,
+ .buf_free = xprt_rdma_bc_free,
+ .send_request = xprt_rdma_bc_send_request,
+ .set_retrans_timeout = xprt_set_retrans_timeout_def,
+ .close = xprt_rdma_bc_close,
+ .destroy = xprt_rdma_bc_dest,
+ .print_stats = xprt_rdma_print_stats
+};
+
+static struct xprt_class xprt_rdma_bc = {
+ .list = LIST_HEAD_INIT(xprt_rdma_bc.list),
+ .name = "rdma backchannel",
+ .owner = THIS_MODULE,
+ .ident = XPRT_TRANSPORT_BC_RDMA,
+ .setup = xprt_setup_rdma_bc,
+};
+
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
void xprt_rdma_cleanup(void)
{
int rc;
__func__, rc);
frwr_destroy_recovery_wq();
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ rc = xprt_unregister_transport(&xprt_rdma_bc);
+ if (rc)
+ dprintk("RPC: %s: xprt_unregister(bc) returned %i\n",
+ __func__, rc);
+#endif
}
int xprt_rdma_init(void)
return rc;
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ rc = xprt_register_transport(&xprt_rdma_bc);
+ if (rc) {
+ xprt_unregister_transport(&xprt_rdma);
+ frwr_destroy_recovery_wq();
+ return rc;
+ }
+#endif
+
dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
dprintk("Defaults:\n");