#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
#define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
+#define SDP_SRCAVAIL_PAYLOAD_LEN 1
#define MAX_ZCOPY_SEND_SIZE (512 * 1024)
SDP_MID_HELLO = 0x0,
SDP_MID_HELLO_ACK = 0x1,
SDP_MID_DISCONN = 0x2,
+ SDP_MID_ABORT = 0x3,
SDP_MID_SENDSM = 0x4,
SDP_MID_RDMARDCOMPL = 0x6,
SDP_MID_SRCAVAIL_CANCEL = 0x8,
SDP_MID_CHRCVBUF = 0xB,
SDP_MID_CHRCVBUF_ACK = 0xC,
- SDP_MID_SRCAVAIL = 0xFD,
- SDP_MID_SINKAVAIL = 0xFE,
+ SDP_MID_SINKAVAIL = 0xFD,
+ SDP_MID_SRCAVAIL = 0xFE,
SDP_MID_DATA = 0xFF,
};
/* Advertised buffer stuff */
u32 mseq;
u32 used;
+ u32 reported;
u32 len;
u32 rkey;
u64 vaddr;
u8 busy;
struct ib_pool_fmr *fmr;
- u32 bytes_completed;
- u32 bytes_total;
+ u32 bytes_sent;
+ u32 bytes_acked;
u8 abort;
u32 mseq;
/* sdp_cma.c */
int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
-/* sdp_bcopy.c */
-int sdp_post_credits(struct sdp_sock *ssk);
-
/* sdp_tx.c */
int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
void sdp_tx_ring_destroy(struct sdp_sock *ssk);
int len);
int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
unsigned long addr);
-int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied);
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
+ struct rx_srcavail_state *rx_sa);
int sdp_post_sendsm(struct sdp_sock *ssk);
void srcavail_cancel_timeout(struct work_struct *work);
static char *mid2str[] = {
ENUM2STR(SDP_MID_HELLO),
ENUM2STR(SDP_MID_HELLO_ACK),
+ ENUM2STR(SDP_MID_ABORT),
ENUM2STR(SDP_MID_DISCONN),
ENUM2STR(SDP_MID_SENDSM),
ENUM2STR(SDP_MID_RDMARDCOMPL),
case SDP_MID_HELLO:
hh = (struct sdp_hh *)h;
len += snprintf(buf + len, 255-len,
- "max_adverts: %d majv_minv: %d "
+ "max_adverts: %d majv_minv: 0x%x "
"localrcvsz: %d desremrcvsz: %d |",
hh->max_adverts, hh->majv_minv,
ntohl(hh->localrcvsz),
case SDP_MID_SRCAVAIL:
srcah = (struct sdp_srcah *)(h+1);
- len += snprintf(buf + len, 255-len, " | data_len: %ld |",
+ len += snprintf(buf + len, 255-len, " | payload: %ld, "
+ "len: %d, rkey: 0x%x, vaddr: 0x%llx |",
ntohl(h->len) - sizeof(struct sdp_bsdh) -
- sizeof(struct sdp_srcah));
-
- len += snprintf(buf + len, 255-len,
- " | len: %d, rkey: 0x%x, vaddr: 0x%llx |",
+ sizeof(struct sdp_srcah),
ntohl(srcah->len), ntohl(srcah->rkey),
be64_to_cpu(srcah->vaddr));
break;
#include "sdp_socket.h"
#include "sdp.h"
-#define SDP_MAJV_MINV 0x22
+#define SDP_MAJV_MINV 0x11
enum {
SDP_HH_SIZE = 76,
rx_ring_posted(sdp_sk(sk)));
memset(&hh, 0, sizeof hh);
hh.bsdh.mid = SDP_MID_HELLO;
- hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
hh.max_adverts = 1;
hh.majv_minv = SDP_MAJV_MINV;
sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size);
+ hh.bsdh.bufs = htons(rx_ring_posted(sdp_sk(sk)));
hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
PAGE_SIZE + sizeof(struct sdp_bsdh));
hh.max_adverts = 0x1;
rx_ring_posted(sdp_sk(child)));
memset(&hah, 0, sizeof hah);
hah.bsdh.mid = SDP_MID_HELLO_ACK;
- hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
+ hah.bsdh.bufs = htons(rx_ring_posted(sdp_sk(child)));
hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
hah.majv_minv = SDP_MAJV_MINV;
hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
if (rc) {
sdp_warn(sk, "Destroy qp !!!!\n");
rdma_reject(id, NULL, 0);
- }
- else
+ } else
rc = rdma_accept(id, NULL);
-
- if (!rc) {
-// sdp_sk(sk)->qp_active = 1;
- rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0;
- }
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n");
rx_sa = RX_SRCAVAIL_STATE(skb);
if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
rx_sa->aborted = 1;
- sdp_warn(sk, "Ignoring src avail - "
+ sdp_dbg_data(sk, "Ignoring src avail "
"due to SrcAvailCancel\n");
goto skb_cleanup;
}
- avail_bytes_count = rx_sa->len;
+
+ /* If the SrcAvail still has unread inline payload, handle it like MID_DATA */
+ if (rx_sa->used < skb->len) {
+ sdp_dbg_data(sk, "SrcAvail has "
+ "payload: %d/%d\n",
+ rx_sa->used,
+ skb->len);
+ avail_bytes_count = skb->len;
+ } else {
+ sdp_dbg_data(sk, "Finished payload. "
+ "RDMAing: %d/%d\n",
+ rx_sa->used, rx_sa->len);
+ avail_bytes_count = rx_sa->len;
+ }
+
break;
case SDP_MID_DATA:
}
offset = *seq - SDP_SKB_CB(skb)->seq;
-// sdp_warn(sk, "offset = 0x%x, avail_byte_count = 0x%x\n",
-// offset, avail_bytes_count);
if (offset < avail_bytes_count)
goto found_ok_skb;
}
}
if (!(flags & MSG_TRUNC)) {
- if (rx_sa) {
- sdp_dbg_data(sk, "Got srcavail - using RDMA\n");
+ if (rx_sa && rx_sa->used >= skb->len) {
+ /* No more payload - start rdma copy */
+ sdp_dbg_data(sk, "RDMA copy of %ld bytes\n", used);
err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb,
used);
if (err == -EAGAIN) {
}
} else {
err = skb_copy_datagram_iovec(skb, offset,
- /* TODO: skip header? */
- msg->msg_iov, used);
+ /* TODO: skip header? */
+ msg->msg_iov, used);
+ if (rx_sa) {
+ rx_sa->used += used;
+ rx_sa->reported += used;
+ sdp_dbg_data(sk, "copied %ld from "
+ "payload\n", used);
+ }
}
if (err) {
sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
if (rx_sa) {
if (ssk->srcavail_cancel_mseq < rx_sa->mseq) {
- rc = sdp_post_rdma_rd_compl(ssk, rx_sa->used);
+ rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
BUG_ON(rc);
}
if (rx_sa->aborted) {
{
}
-
-const struct seq_operations sdpprf_ops = {
+struct seq_operations sdpprf_ops = {
.start = sdpprf_start,
.stop = sdpprf_stop,
.next = sdpprf_next,
return count;
}
-static const struct file_operations sdpprf_fops = {
+static struct file_operations sdpprf_fops = {
.open = sdpprf_open,
.read = seq_read,
.llseek = seq_lseek,
int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
unsigned long max_bytes;
- if (!ssk->qp_active) {
-// sdp_warn(sk, "post rx while qp is not active\n");
+ if (!ssk->qp_active)
return 0;
- }
if (top_mem_usage && (top_mem_usage * 0x100000) <
atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) {
sa->rkey = ntohl(srcah->rkey);
sa->vaddr = be64_to_cpu(srcah->vaddr);
+ atomic_add(skb->len, &ssk->rcv_nxt);
sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n",
skb_len, sa->vaddr);
} else {
ssk->srcavail_cancel_mseq = ntohl(h->mseq);
sdp_post_sendsm(ssk);
break;
+ case SDP_MID_ABORT:
+ sdp_dbg_data(sk, "Handling ABORT\n");
+ sdp_prf(sk, NULL, "Handling ABORT");
+ sdp_reset(sk);
+ __kfree_skb(skb);
+ break;
case SDP_MID_DISCONN:
sdp_dbg_data(sk, "Handling DISCONN\n");
sdp_prf(sk, NULL, "Handling DISCONN");
sk_send_sigurg(sk);*/
skb_pull(skb, sizeof(struct sdp_bsdh));
+ if (h->mid == SDP_MID_SRCAVAIL) {
+ skb_pull(skb, sizeof(struct sdp_srcah));
+ }
if (unlikely(h->mid == SDP_MID_DATA && skb->len == 0)) {
/* Credit update is valid even after RCV_SHUTDOWN */
return NULL;
}
skb->len = wc->byte_len;
+ skb->data = skb->head;
+
+ h = (struct sdp_bsdh *)skb->data;
+
if (likely(wc->byte_len > SDP_HEAD_SIZE))
skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
else
skb->data_len = 0;
- skb->data = skb->head;
+
+
+ if (h->mid == SDP_MID_SRCAVAIL)
+ skb->data_len -= sizeof(struct sdp_srcah);
+
#ifdef NET_SKBUFF_DATA_USES_OFFSET
skb->tail = skb_headlen(skb);
#else
skb->tail = skb->head + skb_headlen(skb);
#endif
- h = (struct sdp_bsdh *)skb->data;
SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
skb_reset_transport_header(skb);
busy: 1,
};
+static int sdp_update_iov_used(struct sock *sk, struct iovec *iov, int len);
+
static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
- int off, size_t len)
+ int page_idx, struct iovec *iov, int off, size_t len)
{
struct sdp_sock *ssk = sdp_sk(sk);
struct sdp_srcah *srcah;
struct sk_buff *skb;
+ int payload_len;
WARN_ON(ssk->tx_sa);
BUG_ON(!tx_sa);
BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+ skb = sk_stream_alloc_skb(&ssk->isk.sk,
sizeof(struct sdp_bsdh) +
- sizeof(struct sdp_srcah),
+ sizeof(struct sdp_srcah) +
+ SDP_SRCAVAIL_PAYLOAD_LEN,
GFP_KERNEL);
if (!skb) {
return -ENOMEM;
srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
srcah->vaddr = cpu_to_be64(off);
+ if (0) {
+ void *payload;
+ payload = skb_put(skb, SDP_SRCAVAIL_PAYLOAD_LEN);
+ memcpy(payload, iov->iov_base, SDP_SRCAVAIL_PAYLOAD_LEN);
+ payload_len = SDP_SRCAVAIL_PAYLOAD_LEN;
+ } else {
+ /* must have payload inlined in SrcAvail packet in combined mode */
+ payload_len = min(len, PAGE_SIZE - off);
+ get_page(tx_sa->pages[page_idx]);
+ skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
+ tx_sa->pages[page_idx], off, payload_len);
+
+ skb->len += payload_len;
+ skb->data_len = payload_len;
+ skb->truesize += payload_len;
+// sk->sk_wmem_queued += payload_len;
+// sk->sk_forward_alloc -= payload_len;
+
+ }
+
skb_entail(sk, ssk, skb);
+ tx_sa->bytes_sent = len;
+ tx_sa->bytes_acked = payload_len;
+
/* TODO: pushing the skb into the tx_queue should be enough */
return 0;
break;
}
- if (tx_sa->bytes_completed >= len)
+ if (tx_sa->bytes_acked == tx_sa->bytes_sent)
break;
+ else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
+ err = -EINVAL;
+ sdp_warn(sk, "acked bytes > sent bytes\n");
+ break;
+ }
if (tx_sa->abort) {
sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
posts_handler_put(ssk);
- sk_wait_event(sk, &current_timeo, tx_sa->abort &&
- (tx_sa->bytes_completed >= len) && vm_wait);
+ sk_wait_event(sk, &current_timeo,
+ tx_sa->abort &&
+ (tx_sa->bytes_acked < tx_sa->bytes_sent) &&
+ vm_wait);
sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
posts_handler_get(ssk);
finish_wait(sk->sk_sleep, &wait);
- sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d bytes, abort: %d\n",
- tx_sa->bytes_completed, tx_sa->abort);
+ sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, abort: %d\n",
+ tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort);
if (!ssk->qp_active) {
sdp_warn(sk, "QP destroyed while waiting\n");
return err;
}
-int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied)
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
+ struct rx_srcavail_state *rx_sa)
{
struct sdp_rrch *rrch;
struct sk_buff *skb;
gfp_t gfp_page;
+ int copied = rx_sa->used - rx_sa->reported;
+
+ if (rx_sa->used <= rx_sa->reported)
+ return 0;
if (unlikely(ssk->isk.sk.sk_allocation))
gfp_page = ssk->isk.sk.sk_allocation;
rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
rrch->len = htonl(copied);
+ rx_sa->reported += copied;
/* TODO: What if no tx_credits available? */
sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL);
goto out;
}
- if (ssk->tx_sa->bytes_completed > bytes_completed) {
- sdp_warn(sk, "tx_sa->bytes_total(%d), "
- "tx_sa->bytes_completed(%d) > bytes_completed(%d)\n",
- ssk->tx_sa->bytes_total,
- ssk->tx_sa->bytes_completed, bytes_completed);
- }
- if (ssk->tx_sa->bytes_total < bytes_completed) {
- sdp_warn(sk, "ssk->tx_sa->bytes_total(%d) < bytes_completed(%d)\n",
- ssk->tx_sa->bytes_total, bytes_completed);
- }
-
- ssk->tx_sa->bytes_completed = bytes_completed;
+ ssk->tx_sa->bytes_acked += bytes_completed;
wake_up(sk->sk_sleep);
sdp_dbg_data(sk, "woke up sleepers\n");
static int sdp_rdma_adv_single(struct sock *sk,
struct tx_srcavail_state *tx_sa, struct iovec *iov,
- int offset, int page_cnt, int len, u64 *addrs)
+ int p_idx, int page_cnt, int offset, int len)
{
struct sdp_sock *ssk = sdp_sk(sk);
long timeo = SDP_SRCAVAIL_ADV_TIMEOUT;
}
}
- tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
+ tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[p_idx]);
if (!tx_sa->fmr) {
return -ENOMEM;
}
- tx_sa->bytes_completed = 0;
- tx_sa->bytes_total = len;
- rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+ rc = sdp_post_srcavail(sk, tx_sa, p_idx, iov, offset, len);
if (rc)
goto err_abort_send;
+
rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
if (unlikely(rc)) {
switch (rc) {
}
sdp_prf1(sk, NULL, "got RdmaRdCompl");
- sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
+ sdp_update_iov_used(sk, iov, tx_sa->bytes_sent);
err_abort_send:
ib_fmr_pool_unmap(tx_sa->fmr);
int page_cnt;
int bytes_left;
int pages_left;
- u64 *addrs;
+ int p_idx;
sdp_dbg_data(sk, "%s\n", __func__);
bytes_left = iov->iov_len;
pages_left = tx_sa->page_cnt;
- addrs = tx_sa->addrs;
+ p_idx = 0;
do {
int p_cnt = min(256, pages_left);
int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
rc = sdp_rdma_adv_single(sk, tx_sa, iov,
- offset, p_cnt, len, addrs);
+ p_idx, p_cnt, offset, len);
+// offset, p_cnt, len, addrs);
copied += len;
bytes_left -= len;
pages_left -= p_cnt;
- addrs += p_cnt;
+ p_idx += p_cnt;
offset = 0;
} while (!rc && !tx_sa->abort && pages_left > 0);