#include <rdma/rdma_cm.h>
#include <rdma/ib_verbs.h>
#include <rdma/ib_fmr_pool.h>
-#include <linux/pagemap.h>
+#include <rdma/ib_umem.h>
#include <net/tcp.h> /* for memcpy_toiovec */
#include <asm/io.h>
#include <asm/uaccess.h>
#include <linux/delay.h>
#include "sdp.h"
-static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
- int page_idx, int off, size_t len)
+static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa)
{
struct sdp_sock *ssk = sdp_sk(sk);
struct sk_buff *skb;
int payload_len;
+ struct page *payload_pg;
+ int off, len;
+ struct ib_umem_chunk *chunk;
WARN_ON(ssk->tx_sa);
BUG_ON(!tx_sa);
BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
+ BUG_ON(!tx_sa->umem);
+ BUG_ON(!tx_sa->umem->chunk_list.next);
+
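+ /* The pinned user buffer is described by the umem's chunk list;
+  * its first page supplies the payload inlined in the SrcAvail. */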
+ chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
+ BUG_ON(!chunk->nmap);
+
+ off = tx_sa->umem->offset;
+ len = tx_sa->umem->length;
tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
if (!skb) {
return -ENOMEM;
}
- sdp_prf1(sk, skb, "sending SrcAvail");
+ sdp_dbg_data(sk, "sending SrcAvail\n");
TX_SRCAVAIL_STATE(skb) = tx_sa; /* tx_sa is hung on the skb
* but continues to live after the skb is freed */
ssk->tx_sa = tx_sa;
/* must have payload inlined in SrcAvail packet in combined mode */
- payload_len = min(len, PAGE_SIZE - off);
- get_page(tx_sa->pages[page_idx]);
+ payload_len = min(len, tx_sa->umem->page_size - off);
+ payload_pg = sg_page(&chunk->page_list[0]);
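+ /* The skb frag drops a page reference when the skb is freed,
+  * so take one here; the umem holds its own pin. */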
+ get_page(payload_pg);
+
+ sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
+ off, payload_pg, payload_len);
+
skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
- tx_sa->pages[page_idx], off, payload_len);
+ payload_pg, off, payload_len);
skb->len += payload_len;
skb->data_len = payload_len;
ssk->write_seq += payload_len;
SDP_SKB_CB(skb)->end_seq += payload_len;
- tx_sa->bytes_sent = len;
+ tx_sa->bytes_sent = tx_sa->umem->length;
tx_sa->bytes_acked = payload_len;
/* TODO: pushing the skb into the tx_queue should be enough */
release_sock(sk);
}
-static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
+static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
int ignore_signals)
{
struct sock *sk = &ssk->isk.sk;
return err;
}
-static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
+static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
{
struct sock *sk = &ssk->isk.sk;
- int err = 0;
- long current_timeo = *timeo_p;
+ long timeo = HZ * 5; /* Timeout for RDMA read */
DEFINE_WAIT(wait);
sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
while (1) {
- prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
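+ /* Sleep uninterruptibly: the HCA may still be writing into the
+  * pinned user pages, so bailing out on a signal could corrupt memory. */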
+ prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);
- if (unlikely(!ssk->qp_active)) {
- err = -EPIPE;
- sdp_warn(sk, "Socket closed\n");
- break;
- }
-
- if (unlikely(!*timeo_p)) {
- err = -EAGAIN;
- sdp_warn(sk, "Timedout\n");
+ if (!ssk->tx_ring.rdma_inflight->busy) {
+ sdp_dbg_data(sk, "got rdma cqe\n");
break;
}
- if (unlikely(signal_pending(current))) {
- err = sock_intr_errno(*timeo_p);
- sdp_warn(sk, "Signalled\n");
+ if (!ssk->qp_active) {
+ sdp_warn(sk, "QP destroyed\n");
break;
}
- if (!ssk->tx_ring.rdma_inflight->busy) {
- sdp_dbg_data(sk, "got rdma cqe\n");
+ if (!timeo) {
+ sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
+ WARN_ON(1);
break;
}
posts_handler_put(ssk);
sdp_prf1(sk, NULL, "Going to sleep");
- sk_wait_event(sk, &current_timeo,
+ sk_wait_event(sk, &timeo,
!ssk->tx_ring.rdma_inflight->busy);
sdp_prf1(sk, NULL, "Woke up");
sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
posts_handler_get(ssk);
-
- *timeo_p = current_timeo;
}
finish_wait(sk->sk_sleep, &wait);
- if (ssk->tx_ring.rdma_inflight->busy) {
- sdp_warn(sk, "RDMA reads are in the air: %d\n",
- ssk->tx_ring.rdma_inflight->busy);
- } else
- sdp_dbg_data(sk, "Finished waiting - no rdma's inflight\n");
-
- return err;
+ sdp_dbg_data(sk, "Finished waiting\n");
}
int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
return;
}
-static int sdp_get_user_pages(struct page **pages, const unsigned int nr_pages,
- unsigned long uaddr, int rw)
-{
- int res, i;
-
- /* Try to fault in all of the necessary pages */
- down_read(&current->mm->mmap_sem);
- /* rw==READ means read from drive, write into memory area */
- res = get_user_pages(
- current,
- current->mm,
- uaddr,
- nr_pages,
- rw == READ,
- 0, /* don't force */
- pages,
- NULL);
- up_read(&current->mm->mmap_sem);
-
- /* Errors and no page mapped should return here */
- if (res < nr_pages)
- return res;
-
- for (i=0; i < nr_pages; i++) {
- /* FIXME: flush superflous for rw==READ,
- * probably wrong function for rw==WRITE
- */
- flush_dcache_page(pages[i]);
- }
-
- return nr_pages;
-}
-
-int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
- unsigned long addr)
+static int sdp_alloc_fmr(struct sock *sk, void *uaddr, size_t len,
+ struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
- int done_pages = 0;
-
- sdp_dbg_data(sk, "count: 0x%x addr: 0x%lx\n", page_cnt, addr);
+ struct ib_pool_fmr *fmr;
+ struct ib_umem *umem;
+ u64 *pages;
+ struct ib_umem_chunk *chunk;
+ int n, j, k;
+ int rc = 0;
- addr &= PAGE_MASK;
- if (segment_eq(get_fs(), KERNEL_DS)) {
- for (done_pages = 0; done_pages < page_cnt; done_pages++) {
- pages[done_pages] = virt_to_page(addr);
- if (!pages[done_pages])
- break;
- get_page(pages[done_pages]);
- addr += PAGE_SIZE;
- }
- } else {
- done_pages = sdp_get_user_pages(pages, page_cnt, addr, WRITE);
+ if (len > SDP_MAX_RDMA_READ_LEN) {
+ sdp_dbg_data(sk, "len:0x%lx > SDP_MAX_RDMA_READ_LEN: 0x%lx\n",
+ len, SDP_MAX_RDMA_READ_LEN);
+ len = SDP_MAX_RDMA_READ_LEN;
}
- if (unlikely(done_pages != page_cnt))
- goto err;
+ sdp_dbg_data(sk, "user buf: %p, len:0x%lx\n", uaddr, len);
- return 0;
+ umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
+ IB_ACCESS_REMOTE_WRITE, 0);
-err:
- sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
- done_pages, page_cnt);
- for (; done_pages > 0; done_pages--)
- page_cache_release(pages[done_pages - 1]);
+ if (IS_ERR(umem)) {
+ rc = PTR_ERR(umem);
+ sdp_warn(sk, "Error getting umem: %d\n", rc);
+ goto err_umem_get;
+ }
- return -1;
-}
+ sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
+ umem->offset, umem->length);
-static void sdp_put_pages(struct sock *sk, struct page **pages, int page_cnt)
-{
- int i;
- sdp_dbg_data(sk, "count: %d\n", page_cnt);
+ pages = (u64 *) __get_free_page(GFP_KERNEL);
+ if (!pages) {
+ rc = -ENOMEM;
+ goto err_pages_alloc;
+ }
- for (i = 0; i < page_cnt; i++) {
- set_page_dirty_lock(pages[i]);
- page_cache_release(pages[i]);
- }
-}
+ n = 0;
-static int sdp_map_dma(struct sock *sk, u64 *addrs, struct page **pages,
- int nr_pages, size_t offset, size_t count)
-{
- struct ib_device *dev = sdp_sk(sk)->ib_device;
- int i = 0;
- sdp_dbg_data(sk, "map dma offset: 0x%lx count: 0x%lx\n", offset, count);
-
-#define map_page(p, o, l) ({\
- u64 addr = ib_dma_map_page(dev, p, o, l, DMA_TO_DEVICE); \
- if (ib_dma_mapping_error(dev, addr)) { \
- sdp_warn(sk, "Error mapping page %p off: 0x%lx len: 0x%lx\n", \
- p, o, l); \
- goto err; \
- } \
- addr; \
-})
-
- if (nr_pages > 1) {
- size_t length = PAGE_SIZE - offset;
- addrs[0] = map_page(pages[0], offset, length);
- count -= length;
- for (i=1; i < nr_pages ; i++) {
- length = count < PAGE_SIZE ? count : PAGE_SIZE;
- addrs[i] = map_page(pages[i], 0UL, length);
- count -= PAGE_SIZE;
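+ /* Flatten the umem chunk list into an array of page-sized
+  * DMA addresses that the FMR pool can map. */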
+ list_for_each_entry(chunk, &umem->chunk_list, list) {
+ for (j = 0; j < chunk->nmap; ++j) {
+ len = sg_dma_len(&chunk->page_list[j]) >> PAGE_SHIFT;
+ for (k = 0; k < len; ++k) {
+ pages[n++] = sg_dma_address(&chunk->page_list[j]) +
+ umem->page_size * k;
+ }
}
- } else
- addrs[0] = map_page(pages[0], offset, count);
+ }
- return 0;
-err:
- for (; i > 0; i--) {
- ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
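+ /* Map the page array at io virtual address 0; the user data then
+  * starts at umem->offset within the mapped region. */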
+ fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
+ if (IS_ERR(fmr)) {
+ rc = PTR_ERR(fmr);
+ sdp_warn(sk, "Error allocating fmr: %d\n", rc);
+ goto err_fmr_alloc;
+ }
- return -1;
-}
-void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt)
-{
- int i;
- struct ib_device *dev = sdp_sk(sk)->ib_device;
+ free_page((unsigned long) pages);
- sdp_dbg_data(sk, "count: %d\n", page_cnt);
+ *_umem = umem;
+ *_fmr = fmr;
- for (i = 0; i < page_cnt; i++)
- ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
-}
+ return 0;
-static int sdp_map_dma_sge(struct sock *sk, struct ib_sge *sge, int page_cnt,
- struct page **pages, int offset, int len)
-{
- struct ib_device *dev = sdp_sk(sk)->ib_device;
- int i;
- int left = len;
- sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
-
- for (i = 0; i < page_cnt; i++) {
- int o = i == 0 ? offset : 0;
- int l = MIN(left, PAGE_SIZE - o);
-
- sge[i].lkey = sdp_sk(sk)->sdp_dev->mr->lkey;
- sge[i].length = l;
-
- sge[i].addr = ib_dma_map_page(dev,
- pages[i],
- 0, /* map page with offset (fbo) not working */
- l + o, /* compensate on 0 offset */
- DMA_FROM_DEVICE);
- if (ib_dma_mapping_error(dev, sge[i].addr)) {
- sdp_warn(sk, "Error map page 0x%llx off: %d len: %d\n",
- sge[i].addr, o, l);
- goto err;
- }
- sge[i].addr += o;
+err_fmr_alloc:
+ free_page((unsigned long) pages);
- sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d | "
- "addr: 0x%llx length: %d\n",
- i, pages[i], o, l, sge[i].addr, sge[i].length);
- left -= l;
- }
+err_pages_alloc:
+ ib_umem_release(umem);
- WARN_ON(left != 0);
+err_umem_get:
- return 0;
-err:
- for (; i > 0; i--) {
- ib_dma_unmap_page(dev, sge[i].addr, PAGE_SIZE, DMA_FROM_DEVICE);
- }
- return -1;
+ return rc;
}
-static void sdp_unmap_dma_sge(struct sock *sk, struct ib_sge *sge,
- int page_cnt, int offset, int len)
+void sdp_free_fmr(struct sock *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
- int i;
- struct ib_device *dev = sdp_sk(sk)->ib_device;
+ if (!sdp_sk(sk)->qp_active)
+ return;
- sdp_dbg_data(sk, "count: %d\n", page_cnt);
+ ib_fmr_pool_unmap(*_fmr);
+ *_fmr = NULL;
- for (i = 0; i < page_cnt; i++) {
- int l = PAGE_SIZE;
-
- if (i == page_cnt - 1) {
- /* Last page */
- l = (len + offset) & (PAGE_SIZE - 1);
- if (l == 0)
- l = PAGE_SIZE;
- }
- ib_dma_unmap_page(dev, sge[i].addr, l, DMA_FROM_DEVICE);
- }
+ ib_umem_release(*_umem);
+ *_umem = NULL;
}
-static struct ib_pool_fmr *sdp_map_fmr(struct sock *sk, int page_cnt,
- u64 *addrs)
+static int sdp_post_rdma_read(struct sock *sk, struct rx_srcavail_state *rx_sa)
{
- struct ib_pool_fmr *fmr;
- int ret = 0;
-
- fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, addrs,
- page_cnt, 0);
- if (IS_ERR(fmr)) {
- ret = PTR_ERR(fmr);
- fmr = NULL;
- sdp_warn(sk, "Error allocating fmr: %d\n", ret);
- goto err;
- }
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct ib_send_wr *bad_wr;
+ struct ib_send_wr wr = { NULL };
+ struct ib_sge sge;
- return fmr;
-err:
- return NULL;
-}
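+ /* A single FMR-backed SGE covers the whole pinned buffer, so one
+  * RDMA read work request replaces the old per-page SGE chain. */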
+ wr.opcode = IB_WR_RDMA_READ;
+ wr.next = NULL;
+ wr.wr_id = SDP_OP_RDMA;
+ wr.wr.rdma.rkey = rx_sa->rkey;
+ wr.send_flags = 0;
-enum zcopy_type {
- SDP_ZCOPY_TYPE_RX,
- SDP_ZCOPY_TYPE_TX,
-};
+ ssk->tx_ring.rdma_inflight = rx_sa;
-static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int page_cnt)
-{
- struct tx_srcavail_state *tx_sa;
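+ /* The FMR mapping starts at io virtual address 0, so the local
+  * address is just the offset into the first page. */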
+ sge.addr = rx_sa->umem->offset;
+ sge.length = rx_sa->umem->length;
+ sge.lkey = rx_sa->fmr->fmr->lkey;
+ wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
+ wr.num_sge = 1;
+ wr.sg_list = &sge;
+ rx_sa->busy++;
- tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
- sizeof(struct page *) * page_cnt +
- sizeof(u64) * page_cnt, GFP_KERNEL);
- if (!tx_sa)
- return ERR_PTR(-ENOMEM);
-
- tx_sa->pages = (struct page **)(tx_sa+1);
- tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
+ wr.send_flags = IB_SEND_SIGNALED;
- return tx_sa;
+ return ib_post_send(ssk->qp, &wr, &bad_wr);
}
int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
{
struct sdp_sock *ssk = sdp_sk(sk);
struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
+ int got_srcavail_cancel;
int rc = 0;
- struct ib_send_wr *bad_wr;
- struct ib_send_wr wr = { NULL };
- long timeo;
- struct ib_sge *sge;
- int sge_left;
- int copied;
- int offset;
sdp_dbg_data(&ssk->isk.sk, "preparing RDMA read."
" len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
- if (len > rx_sa->len)
- len = rx_sa->len;
-
- offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
- rx_sa->page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
- sdp_dbg_data(sk, "page_cnt = 0x%x len:0x%x offset: 0x%x\n",
- rx_sa->page_cnt, len, offset);
+ sock_hold(sk, SOCK_REF_RDMA_RD);
- rx_sa->pages =
- (struct page **) kzalloc(sizeof(struct page *) * rx_sa->page_cnt +
- sizeof(struct ib_sge) * rx_sa->page_cnt, GFP_KERNEL);
- if (!rx_sa->pages) {
- sdp_warn(sk, "Error allocating zcopy context\n");
- goto err_alloc_zcopy;
+ if (len > rx_sa->len) {
+ sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
+ WARN_ON(1);
+ len = rx_sa->len;
}
- rx_sa->sge = (struct ib_sge *)(&rx_sa->pages[rx_sa->page_cnt]);
-
- rc = sdp_get_pages(sk, rx_sa->pages, rx_sa->page_cnt,
- (unsigned long)iov->iov_base);
- if (rc)
- goto err_get_pages;
-
- rc = sdp_map_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, rx_sa->pages,
- offset, len);
- if (rc)
- goto err_map_dma;
-
- wr.opcode = IB_WR_RDMA_READ;
- wr.next = NULL;
- wr.wr_id = SDP_OP_RDMA;
- wr.wr.rdma.rkey = rx_sa->rkey;
- wr.send_flags = 0;
+ rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
+ if (rc) {
+ sdp_warn(sk, "Error allocating fmr: %d\n", rc);
+ goto err_alloc_fmr;
+ }
- timeo = sock_sndtimeo(sk, 0);
+ rc = sdp_post_rdma_read(sk, rx_sa);
+ if (unlikely(rc)) {
+ sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+ goto err_post_send;
+ }
- ssk->tx_ring.rdma_inflight = rx_sa;
- copied = 0;
- sge = rx_sa->sge;
- sge_left = rx_sa->page_cnt;
- do {
- /* Len error when using sge_cnt > 30 ?? */
- int sge_cnt = min(sge_left, ssk->max_sge - 2);
-
- wr.wr.rdma.remote_addr = rx_sa->vaddr + copied + rx_sa->used;
- wr.num_sge = sge_cnt;
- wr.sg_list = sge;
- rx_sa->busy++;
-
- sdp_dbg_data(sk, "rdma read: sge_cnt: %d vaddr: 0x%llx "
- "copied: 0x%x rkey: 0x%x in_bytes: 0x%x\n",
- sge_cnt, wr.wr.rdma.remote_addr, copied, rx_sa->rkey,
- sge_bytes(sge, sge_cnt));
- if (sge_left == sge_cnt) {
- wr.send_flags = IB_SEND_SIGNALED;
- sdp_dbg_data(sk, "last wr is signaled\n");
- }
- sdp_prf1(sk, NULL, "TX: RDMA read 0x%x bytes %s",
- sge_bytes(sge, sge_cnt),
- wr.send_flags & IB_SEND_SIGNALED ? "Signalled" : "");
-
- rc = ib_post_send(ssk->qp, &wr, &bad_wr);
- if (unlikely(rc)) {
- sdp_warn(sk, "ib_post_send failed with status %d.\n",
- rc);
- sdp_set_error(&ssk->isk.sk, -ECONNRESET);
- wake_up(&ssk->wq);
- break;
- }
+ sdp_prf(sk, skb, "Finished posting(rc=%d), now to wait", rc);
- copied += sge_bytes(sge, sge_cnt);
- sge_left -= sge_cnt;
- sge += sge_cnt;
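+ /* Note whether a SrcAvailCancel overtook this SrcAvail; anything
+  * read after the cancel must not be counted as copied. */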
+ got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
- if (unlikely(ssk->srcavail_cancel_mseq > rx_sa->mseq)) {
- sdp_warn(sk, "got SrcAvailCancel - Aborting RDMA\n");
- rc = -EAGAIN;
- }
- } while (!rc && sge_left > 0);
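+ /* Re-arm the TX CQ so the signaled RDMA read completion raises an
+  * event while we sleep. */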
+ sdp_arm_tx_cq(sk);
- if (!rc || rc == -EAGAIN) {
- int got_srcavail_cancel = (rc == -EAGAIN);
+ sdp_wait_rdma_wr_finished(ssk);
- sdp_arm_tx_cq(sk);
+ sdp_prf(sk, skb, "Finished waiting(rc=%d)", rc);
+ if (!ssk->qp_active) {
+ sdp_warn(sk, "QP destroyed during RDMA read\n");
+ rc = -EPIPE;
+ goto err_post_send;
+ }
- rc = sdp_wait_rdma_wr_finished(ssk, &timeo);
+ /* Ignore any data copied after getting SrcAvailCancel */
+ if (!got_srcavail_cancel) {
+ int copied = rx_sa->umem->length;
- /* Ignore any data copied after getting SrcAvailCancel */
- if (!got_srcavail_cancel && !rc) {
- sdp_update_iov_used(sk, iov, copied);
- rx_sa->used += copied;
- atomic_add(copied, &ssk->rcv_nxt);
- }
+ sdp_update_iov_used(sk, iov, copied);
+ rx_sa->used += copied;
+ atomic_add(copied, &ssk->rcv_nxt);
}
- if (ssk->tx_ring.rdma_inflight->busy) {
- sdp_warn(sk, "Aborted waiting for RDMA completion before finished!"
- "Memory might be corrupted.\n");
- WARN_ON(1);
- }
+ ssk->tx_ring.rdma_inflight = NULL;
+
+err_post_send:
+ sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
+err_alloc_fmr:
if (rc && ssk->qp_active) {
- /* post rdma, wait_for_compl or post rdma_rd_comp failed -
- * post sendsm */
- sdp_warn(sk, "post rdma, wait_for_compl "
- "or post rdma_rd_comp failed - post sendsm\n");
+ sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
rx_sa->flags |= RX_SA_ABORTED;
- ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
- the dirty logic from recvmsg */
}
- ssk->tx_ring.rdma_inflight = NULL;
-
- sdp_unmap_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, offset, len);
-err_map_dma:
- sdp_put_pages(sk, rx_sa->pages, rx_sa->page_cnt);
-err_get_pages:
- kfree(rx_sa->pages);
- rx_sa->pages = NULL;
- rx_sa->sge = NULL;
-err_alloc_zcopy:
+ sock_put(sk, SOCK_REF_RDMA_RD);
return rc;
}
return ret;
}
-static int sdp_rdma_adv_single(struct sock *sk,
- struct tx_srcavail_state *tx_sa, struct iovec *iov,
- int page_cnt, int offset, int len)
+static int do_sdp_sendmsg_zcopy(struct sock *sk, struct tx_srcavail_state *tx_sa,
+ struct iovec *iov, long *timeo)
{
struct sdp_sock *ssk = sdp_sk(sk);
- long timeo = SDP_SRCAVAIL_ADV_TIMEOUT;
- unsigned long lock_flags;
int rc = 0;
+ unsigned long lock_flags;
- sdp_dbg_data(sk, "off: 0x%x len: 0x%x page_cnt: 0x%x\n",
- offset, len, page_cnt);
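+ /* Pin and DMA-map the user buffer before advertising it to the
+  * peer in a SrcAvail. */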
+ rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
+ &tx_sa->fmr, &tx_sa->umem);
+ if (rc) {
+ sdp_warn(sk, "Error allocating fmr: %d\n", rc);
+ goto err_alloc_fmr;
+ }
if (tx_slots_free(ssk) == 0) {
- rc = wait_for_sndbuf(sk, &timeo);
+ rc = wait_for_sndbuf(sk, timeo);
if (rc) {
sdp_warn(sk, "Couldn't get send buffer\n");
- return rc;
+ goto err_no_tx_slots;
}
}
- tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[0]);
- if (!tx_sa->fmr) {
- sdp_warn(sk, "Error allocating fmr\n");
- return -ENOMEM;
- }
-
- rc = sdp_post_srcavail(sk, tx_sa, 0, offset, len);
+ rc = sdp_post_srcavail(sk, tx_sa);
if (rc) {
sdp_dbg(sk, "Error posting SrcAvail\n");
goto err_abort_send;
}
- rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+ rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
if (unlikely(rc)) {
enum tx_sa_flag f = tx_sa->abort_flags;
if (f & TX_SA_SENDSM) {
- sdp_dbg_data(sk, "got SendSM. use SEND verb.\n");
+ sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
} else if (f & TX_SA_ERROR) {
sdp_dbg_data(sk, "SrcAvail error completion\n");
sdp_reset(sk);
} else if (ssk->qp_active) {
- if (f & TX_SA_INTRRUPTED)
- sdp_dbg_data(sk, "SrcAvail error completion\n");
- else
- sdp_warn(sk, "SrcAvail send aborted flag = 0x%x.\n", f);
+ if (!(f & TX_SA_INTRRUPTED))
+ sdp_warn(sk, "abort_flag = 0x%x.\n", f);
sdp_post_srcavail_cancel(sk);
/* Wait for RdmaRdCompl/SendSM to
* finish the transaction */
- timeo = 2 * HZ;
+ *timeo = 2 * HZ;
sdp_dbg_data(sk, "Waiting for SendSM\n");
- sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+ sdp_wait_rdmardcompl(ssk, timeo, 1);
sdp_dbg_data(sk, "finished waiting\n");
} else {
sdp_warn(sk, "QP was destroyed while waiting\n");
}
-
- goto err_abort_send;
+ } else {
+ sdp_dbg_data(sk, "got RdmaRdCompl\n");
}
- sdp_prf1(sk, NULL, "got RdmaRdCompl");
-
-err_abort_send:
- sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
-
- ib_fmr_pool_unmap(tx_sa->fmr);
spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
ssk->tx_sa = NULL;
spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
- return rc;
-}
+err_abort_send:
+ sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
-static inline size_t get_page_count(unsigned long uaddr, size_t count)
-{
- unsigned long end = (uaddr + count + PAGE_SIZE - 1) >> PAGE_SHIFT;
- unsigned long start = uaddr >> PAGE_SHIFT;
- return end - start;
+err_no_tx_slots:
+ sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
+
+err_alloc_fmr:
+ return rc;
}
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
long timeo;
struct tx_srcavail_state *tx_sa;
int offset;
+ size_t bytes_to_copy = 0;
int copied = 0;
- int page_cnt;
-
sdp_dbg_data(sk, "%s\n", __func__);
sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
if (ssk->rx_sa) {
posts_handler_get(ssk);
flags = msg->msg_flags;
+
+ iovlen = msg->msg_iovlen;
+ iov = msg->msg_iov;
+
timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
/* Wait for a connection to finish. */
clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
/* Ok commence sending. */
- iovlen = msg->msg_iovlen;
- iov = msg->msg_iov;
offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
sdp_dbg_data(sk, "Sending iov: %p, iovlen: 0x%lx, size: 0x%lx\n",
iov->iov_base, iov->iov_len, size);
goto err;
}
- page_cnt = min(get_page_count((unsigned long)iov->iov_base,
- iov->iov_len), SDP_FMR_SIZE);
-
- tx_sa = sdp_alloc_tx_sa(sk, page_cnt);
- if (IS_ERR(tx_sa)) {
+ tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
+ if (!tx_sa) {
sdp_warn(sk, "Error allocating zcopy context\n");
rc = -EAGAIN; /* Allocation failed - fall back to bcopy */
goto err_alloc_tx_sa;
}
+ bytes_to_copy = iov->iov_len;
do {
- size_t off = (unsigned long)iov->iov_base & ~PAGE_MASK;
- size_t len = page_cnt * PAGE_SIZE - off;
- if (len > iov->iov_len)
- len = iov->iov_len;
-
- tx_sa->page_cnt = page_cnt;
-
- rc = sdp_get_pages(sk, tx_sa->pages, page_cnt,
- (unsigned long)iov->iov_base);
- if (rc)
- goto err_get_pages;
-
- rc = sdp_map_dma(sk, tx_sa->addrs,
- tx_sa->pages, page_cnt,
- off, len);
- if (rc)
- goto err_map_dma;
-
- rc = sdp_rdma_adv_single(sk, tx_sa, iov, page_cnt, off, len);
- if (rc)
- sdp_dbg_data(sk, "Error sending SrcAvail. rc = %d\n", rc);
-
-
- if (tx_sa->addrs)
- sdp_unmap_dma(sk, tx_sa->addrs, page_cnt);
-err_map_dma:
- sdp_put_pages(sk, tx_sa->pages, page_cnt);
-err_get_pages:
- page_cnt = min(get_page_count((unsigned long)iov->iov_base,
- iov->iov_len), SDP_FMR_SIZE);
- copied += tx_sa->bytes_acked;
tx_sa_reset(tx_sa);
+
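+ /* Each pass advertises one chunk (sdp_alloc_fmr caps it at
+  * SDP_MAX_RDMA_READ_LEN); sdp_update_iov_used() advances the
+  * iovec by the bytes the peer acked. */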
+ rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
+
+ if (iov->iov_len < sdp_zcopy_thresh) {
+ sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
+ iov->iov_len);
+ break;
+ }
} while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
+
kfree(tx_sa);
err_alloc_tx_sa:
err:
+ copied = bytes_to_copy - iov->iov_len;
sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
posts_handler_put(ssk);
+
release_sock(sk);
+
sock_put(&ssk->isk.sk, SOCK_REF_ZCOPY);
return rc ?: copied;
}
+void sdp_abort_srcavail(struct sock *sk)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct tx_srcavail_state *tx_sa = ssk->tx_sa;
+ unsigned long flags;
+
+ if (!tx_sa)
+ return;
+
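+ /* Make sure srcavail_cancel_work is neither running nor pending
+  * before tx_sa is torn down. */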
+ cancel_delayed_work(&ssk->srcavail_cancel_work);
+ flush_scheduled_work();
+
+ spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+ sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
+
+ ssk->tx_sa = NULL;
+
+ spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+}
+
+void sdp_abort_rdma_read(struct sock *sk)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct rx_srcavail_state *rx_sa = ssk->rx_sa;
+
+ if (!rx_sa)
+ return;
+
+ sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
+
+ ssk->rx_sa = NULL;
+}