The side that sent the SrcAvail will give up and send a SrcAvailCancel.
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
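
As a reference for review (not part of the diff), a condensed sketch of the
sender's give-up path that this change introduces; the surrounding
declarations and error handling are assumed:

	rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
	if (unlikely(rc)) {
		if (tx_sa->abort_flags & TX_SA_SENDSM) {
			/* peer already replied with SendSM - fall back to SEND */
		} else if (ssk->qp_active) {
			/* we gave up (timeout, signal or crossing SrcAvail):
			 * queue a SrcAvailCancel and keep waiting for the
			 * peer's RdmaRdCompl/SendSM before reusing the FMR */
			sdp_post_srcavail_cancel(sk);
			sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
		}
	}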
#define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
#define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
#define SDP_MAX_SEND_SGES 32
+
+/* payload len - rest will be rx'ed into frags */
#define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
#define SDP_NUM_WC 4
#define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
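/*
 * Illustrative figures only (assuming PAGE_SIZE == 4096 and a 16-byte BSDH,
 * not stated in this patch): SDP_HEAD_SIZE = 4096/2 + 16 = 2064 bytes are
 * received into the linear skb head, SDP_MAX_RECV_SKB_FRAGS = 0x8000/4096 = 8
 * page frags hold the rest, and SDP_MAX_PAYLOAD = (1 << 16) - 2064 = 63472.
 */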
struct page **pages;
};
+enum rx_sa_flag {
+ RX_SA_ABORTED = 2,
+};
+
struct rx_srcavail_state {
/* Advertised buffer stuff */
u32 mseq;
/* Utility */
u8 busy;
- u8 aborted;
+ enum rx_sa_flag flags;
+};
+
+enum tx_sa_flag {
+ TX_SA_SENDSM = 0x01,
+ TX_SA_CROSS_SEND = 0x02,
+ TX_SA_INTRRUPTED = 0x04,
+ TX_SA_TIMEDOUT = 0x08,
+ TX_SA_ERROR = 0x10,
};
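/*
 * Illustrative example: abort_flags is a bitmask, so several abort reasons
 * may accumulate on one SrcAvail; e.g. a transfer that times out and is later
 * answered by a SendSM ends with abort_flags == (TX_SA_TIMEDOUT | TX_SA_SENDSM).
 */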
struct tx_srcavail_state {
u32 bytes_sent;
u32 bytes_acked;
- u8 abort;
+ enum tx_sa_flag abort_flags;
+ u8 posted;
+
u32 mseq;
};
int qp_active;
struct tx_srcavail_state *tx_sa;
+ struct rx_srcavail_state *rx_sa;
spinlock_t tx_sa_lock;
int max_send_sge;
- int srcavail_cancel;
struct delayed_work srcavail_cancel_work;
int srcavail_cancel_mseq;
int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
void sdp_tx_ring_destroy(struct sdp_sock *ssk);
int sdp_xmit_poll(struct sdp_sock *ssk, int force);
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb);
void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
void sdp_nagle_timeout(unsigned long data);
void sdp_post_keepalive(struct sdp_sock *ssk);
unsigned long addr);
int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
struct rx_srcavail_state *rx_sa);
-int sdp_post_sendsm(struct sdp_sock *ssk);
+int sdp_post_sendsm(struct sock *sk);
void srcavail_cancel_timeout(struct work_struct *work);
-static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk)
-{
- return SDP_TX_SIZE - tx_ring_posted(ssk);
-}
-
static inline void sdp_arm_rx_cq(struct sock *sk)
{
sdp_prf(sk, NULL, "Arming RX cq");
return NULL;
}
+static inline struct sk_buff *sdp_alloc_skb(struct sock *sk, u8 mid, int size)
+{
+ struct sdp_bsdh *h;
+ struct sk_buff *skb;
+ gfp_t gfp;
+
+ if (unlikely(sk->sk_allocation))
+ gfp = sk->sk_allocation;
+ else
+ gfp = GFP_KERNEL;
+
+ skb = sk_stream_alloc_skb(sk, sizeof(struct sdp_bsdh) + size, gfp);
+ BUG_ON(!skb);
+
+ skb_header_release(skb);
+
+ h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+ h->mid = mid;
+
+ skb_reset_transport_header(skb);
+
+ return skb;
+}
+
+static inline struct sk_buff *sdp_alloc_skb_data(struct sock *sk)
+{
+ return sdp_alloc_skb(sk, SDP_MID_DATA, 0);
+}
+
+static inline struct sk_buff *sdp_alloc_skb_disconnect(struct sock *sk)
+{
+ return sdp_alloc_skb(sk, SDP_MID_DISCONN, 0);
+}
+
+static inline struct sk_buff *sdp_alloc_skb_chrcvbuf_ack(struct sock *sk,
+ int size)
+{
+ struct sk_buff *skb;
+ struct sdp_chrecvbuf *resp_size;
+
+ skb = sdp_alloc_skb(sk, SDP_MID_CHRCVBUF_ACK, sizeof(*resp_size));
+
+ resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size);
+ resp_size->size = htonl(size);
+
+ return skb;
+}
+
+static inline struct sk_buff *sdp_alloc_skb_srcavail(struct sock *sk,
+ u32 len, u32 rkey, u64 vaddr)
+{
+ struct sk_buff *skb;
+ struct sdp_srcah *srcah;
+
+ skb = sdp_alloc_skb(sk, SDP_MID_SRCAVAIL, sizeof(*srcah));
+
+ srcah = (struct sdp_srcah *)skb_put(skb, sizeof(*srcah));
+ srcah->len = htonl(len);
+ srcah->rkey = htonl(rkey);
+ srcah->vaddr = cpu_to_be64(vaddr);
+
+ return skb;
+}
+
+static inline struct sk_buff *sdp_alloc_skb_srcavail_cancel(struct sock *sk)
+{
+ return sdp_alloc_skb(sk, SDP_MID_SRCAVAIL_CANCEL, 0);
+}
+
+static inline struct sk_buff *sdp_alloc_skb_rdmardcompl(struct sock *sk,
+ u32 len)
+{
+ struct sk_buff *skb;
+ struct sdp_rrch *rrch;
+
+ skb = sdp_alloc_skb(sk, SDP_MID_RDMARDCOMPL, sizeof(*rrch));
+
+ rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
+ rrch->len = htonl(len);
+
+ return skb;
+}
+
+static inline struct sk_buff *sdp_alloc_skb_sendsm(struct sock *sk)
+{
+ return sdp_alloc_skb(sk, SDP_MID_SENDSM, 0);
+}
+
+static inline int sdp_tx_ring_slots_left(struct sdp_sock *ssk)
+{
+ return SDP_TX_SIZE - tx_ring_posted(ssk);
+}
+
#endif
int len = 0;
char buf[256];
len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x "
- "bufs: %d len: %d mseq: %d mseq_ack: %d | ",
+ "bufs: 0x%x len: 0x%x mseq: 0x%x mseq_ack: 0x%x | ",
str, skb, h->mid, mid2str(h->mid), h->flags,
ntohs(h->bufs), ntohl(h->len), ntohl(h->mseq),
ntohl(h->mseq_ack));
hh = (struct sdp_hh *)h;
len += snprintf(buf + len, 255-len,
"max_adverts: %d majv_minv: 0x%x "
- "localrcvsz: %d desremrcvsz: %d |",
+ "localrcvsz: 0x%x desremrcvsz: 0x%x |",
hh->max_adverts, hh->majv_minv,
ntohl(hh->localrcvsz),
ntohl(hh->desremrcvsz));
break;
case SDP_MID_HELLO_ACK:
hah = (struct sdp_hah *)h;
- len += snprintf(buf + len, 255-len, "actrcvz: %d |",
+ len += snprintf(buf + len, 255-len, "actrcvz: 0x%x |",
ntohl(hah->actrcvsz));
break;
case SDP_MID_CHRCVBUF:
case SDP_MID_CHRCVBUF_ACK:
req_size = (struct sdp_chrecvbuf *)(h+1);
- len += snprintf(buf + len, 255-len, "req_size: %d |",
+ len += snprintf(buf + len, 255-len, "req_size: 0x%x |",
ntohl(req_size->size));
break;
case SDP_MID_DATA:
- len += snprintf(buf + len, 255-len, "data_len: %ld |",
+ len += snprintf(buf + len, 255-len, "data_len: 0x%lx |",
ntohl(h->len) - sizeof(struct sdp_bsdh));
break;
case SDP_MID_RDMARDCOMPL:
rrch = (struct sdp_rrch *)(h+1);
- len += snprintf(buf + len, 255-len, " | len: %d |",
+ len += snprintf(buf + len, 255-len, " | len: 0x%x |",
ntohl(rrch->len));
break;
case SDP_MID_SRCAVAIL:
srcah = (struct sdp_srcah *)(h+1);
- len += snprintf(buf + len, 255-len, " | payload: %ld, "
- "len: %d, rkey: 0x%x, vaddr: 0x%llx |",
+ len += snprintf(buf + len, 255-len, " | payload: 0x%lx, "
+ "len: 0x%x, rkey: 0x%x, vaddr: 0x%llx |",
ntohl(h->len) - sizeof(struct sdp_bsdh) -
sizeof(struct sdp_srcah),
ntohl(srcah->len), ntohl(srcah->rkey),
mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
}
-int sdp_post_credits(struct sdp_sock *ssk)
-{
- int post_count = 0;
-
- sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
- "tx ring slots left: %d send_head: %p\n",
- tx_credits(ssk), remote_credits(ssk),
- sdp_tx_ring_slots_left(ssk),
- ssk->isk.sk.sk_send_head);
-
- if (likely(tx_credits(ssk) > 1) &&
- likely(sdp_tx_ring_slots_left(ssk))) {
- struct sk_buff *skb;
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh),
- GFP_KERNEL);
- if (!skb)
- return -ENOMEM;
- sdp_post_send(ssk, skb, SDP_MID_DATA);
- post_count++;
- }
-
- if (post_count)
- sdp_xmit_poll(ssk, 0);
- return post_count;
-}
-
void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
{
/* TODO: nonagle? */
int c;
gfp_t gfp_page;
int post_count = 0;
+ struct sock *sk = &ssk->isk.sk;
if (unlikely(!ssk->id)) {
if (ssk->isk.sk.sk_send_head) {
sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed);
}
- if (ssk->tx_sa && ssk->srcavail_cancel &&
- tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
- sdp_tx_ring_slots_left(ssk)) {
- sdp_prf1(&ssk->isk.sk, NULL, "Going to send srcavail cancel");
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh),
- gfp_page);
- /* FIXME */
- BUG_ON(!skb);
- TX_SRCAVAIL_STATE(skb) = ssk->tx_sa;
- sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL_CANCEL);
- post_count++;
- ssk->srcavail_cancel = 0;
- }
-
if (ssk->recv_request &&
ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
sdp_tx_ring_slots_left(ssk)) {
- struct sdp_chrecvbuf *resp_size;
ssk->recv_request = 0;
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh) +
- sizeof(*resp_size),
- gfp_page);
- /* FIXME */
- BUG_ON(!skb);
- resp_size = (struct sdp_chrecvbuf *)skb_put(skb,
- sizeof *resp_size);
- resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE);
- sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+
+ skb = sdp_alloc_skb_chrcvbuf_ack(sk,
+ ssk->recv_frags * PAGE_SIZE);
+
+ sdp_post_send(ssk, skb);
post_count++;
}
sdp_tx_ring_slots_left(ssk) &&
(skb = ssk->isk.sk.sk_send_head) &&
sdp_nagle_off(ssk, skb)) {
- struct tx_srcavail_state *tx_sa;
update_send_head(&ssk->isk.sk, skb);
__skb_dequeue(&ssk->isk.sk.sk_write_queue);
- tx_sa = TX_SRCAVAIL_STATE(skb);
- if (unlikely(tx_sa)) {
- if (ssk->tx_sa != tx_sa) {
- sdp_warn(&ssk->isk.sk, "SrcAvail cancelled "
- "before being sent!\n");
- __kfree_skb(skb);
- } else {
- if (likely(!tx_sa->abort))
- sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
- else
- sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");
- }
- } else {
- sdp_post_send(ssk, skb, SDP_MID_DATA);
- }
+ sdp_post_send(ssk, skb);
+
post_count++;
}
likely(sdp_tx_ring_slots_left(ssk)) &&
likely((1 << ssk->isk.sk.sk_state) &
(TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh),
- GFP_KERNEL);
- /* FIXME */
- BUG_ON(!skb);
+
+ skb = sdp_alloc_skb_data(&ssk->isk.sk);
+ sdp_post_send(ssk, skb);
+
SDPSTATS_COUNTER_INC(post_send_credits);
- sdp_post_send(ssk, skb, SDP_MID_DATA);
post_count++;
}
!ssk->isk.sk.sk_send_head &&
tx_credits(ssk) > 1) {
ssk->sdp_disconnect = 0;
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh),
- gfp_page);
- /* FIXME */
- BUG_ON(!skb);
- sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+
+ skb = sdp_alloc_skb_disconnect(sk);
+ sdp_post_send(ssk, skb);
+
post_count++;
}
* reader process may not have drained the data yet!
*/
while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
- data_was_unread = 1;
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
+ if (h->mid == SDP_MID_DISCONN) {
+ sdp_handle_disconn(sk);
+ } else {
+ sdp_warn(sk, "Data was unread. skb: %p\n", skb);
+ data_was_unread = 1;
+ }
__kfree_skb(skb);
}
ssk->sdp_disconnect = 0;
ssk->destructed_already = 0;
ssk->destruct_in_process = 0;
- ssk->srcavail_cancel = 0;
spin_lock_init(&ssk->lock);
spin_lock_init(&ssk->tx_sa_lock);
void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb)
{
- skb_header_release(skb);
__skb_queue_tail(&sk->sk_write_queue, skb);
sk->sk_wmem_queued += skb->truesize;
sk_mem_charge(sk, skb->truesize);
left -= this_page;
skb->len += this_page;
- skb->data_len = skb->len;
+ skb->data_len += this_page;
skb->truesize += this_page;
sk->sk_wmem_queued += this_page;
sk->sk_forward_alloc -= this_page;
goto wait_for_sndbuf;
}
- skb = sdp_stream_alloc_skb(sk, 0,
- sk->sk_allocation);
+ skb = sdp_alloc_skb_data(sk);
if (!skb)
goto wait_for_memory;
case SDP_MID_SRCAVAIL:
rx_sa = RX_SRCAVAIL_STATE(skb);
- if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
- rx_sa->aborted = 1;
- sdp_dbg_data(sk, "Ignoring src avail "
- "due to SrcAvailCancel\n");
- goto skb_cleanup;
- }
/* if has payload - handle as if MID_DATA */
if (rx_sa->used < skb->len) {
sdp_dbg_data(sk, "Finished payload. "
"RDMAing: %d/%d\n",
rx_sa->used, rx_sa->len);
+
+ if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
+ rx_sa->flags |= RX_SA_ABORTED;
+ sdp_dbg_data(sk, "Ignoring src avail "
+ "due to SrcAvailCancel\n");
+ }
+
+ if (rx_sa->flags & RX_SA_ABORTED) {
+ sdp_warn(sk, "rx_sa aborted. not rdmaing\n");
+ goto skb_cleanup;
+ }
+
avail_bytes_count = rx_sa->len;
}
}
if (before(*seq, SDP_SKB_CB(skb)->seq)) {
- sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
- *seq, SDP_SKB_CB(skb)->seq);
+ sdp_warn(sk, "skb: %p recvmsg bug: copied %X seq %X\n",
+ skb, *seq, SDP_SKB_CB(skb)->seq);
sdp_reset(sk);
break;
}
if (rx_sa) {
- if (ssk->srcavail_cancel_mseq < rx_sa->mseq) {
- rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
- BUG_ON(rc);
- }
- if (rx_sa->aborted) {
- sdp_warn(sk, "RDMA aborted. Sending SendSM\n");
- rc = sdp_post_sendsm(ssk);
- BUG_ON(rc);
- }
+ rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
+ BUG_ON(rc);
+
}
- if ((!rx_sa && used + offset < skb->len) ||
- (rx_sa && !rx_sa->aborted && rx_sa->used < rx_sa->len))
+ if (!rx_sa && used + offset < skb->len)
+ continue;
+
+ if (rx_sa && !(rx_sa->flags & RX_SA_ABORTED) &&
+ rx_sa->used < rx_sa->len)
continue;
+
offset = 0;
skb_cleanup:
- if (!(flags & MSG_PEEK) || (rx_sa && rx_sa->aborted)) {
+ if (!(flags & MSG_PEEK) ||
+ (rx_sa && (rx_sa->flags & RX_SA_ABORTED))) {
struct sdp_bsdh *h;
h = (struct sdp_bsdh *)skb_transport_header(skb);
sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
ntohl(h->mseq), ntohl(h->mseq_ack));
+
+ if (rx_sa) {
+ /* if flags are set, ssk->rx_sa might already point to another rx_sa */
+ if (!rx_sa->flags)
+ ssk->rx_sa = NULL;
+
+ kfree(rx_sa);
+ rx_sa = NULL;
+
+ }
+
skb_unlink(skb, &sk->sk_receive_queue);
__kfree_skb(skb);
-
- kfree(rx_sa);
- rx_sa = NULL;
}
continue;
found_fin_ok:
SDP_SKB_CB(skb)->seq = rcv_nxt(ssk);
if (h->mid == SDP_MID_SRCAVAIL) {
struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
- struct rx_srcavail_state *sa;
+ struct rx_srcavail_state *rx_sa;
ssk->srcavail_cancel_mseq = 0;
- sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
+ ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(skb) = kzalloc(
sizeof(struct rx_srcavail_state), GFP_ATOMIC);
- sa->mseq = ntohl(h->mseq);
- sa->aborted = 0;
- sa->used = 0;
- sa->len = skb_len = ntohl(srcah->len);
- sa->rkey = ntohl(srcah->rkey);
- sa->vaddr = be64_to_cpu(srcah->vaddr);
+ rx_sa->mseq = ntohl(h->mseq);
+ rx_sa->used = 0;
+ rx_sa->len = skb_len = ntohl(srcah->len);
+ rx_sa->rkey = ntohl(srcah->rkey);
+ rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
+ rx_sa->flags = 0;
+
+ if (ssk->tx_sa) {
+ sdp_warn(&ssk->isk.sk, "got RX SrcAvail while waiting "
+ "for TX SrcAvail. Waking up TX SrcAvail "
+ "to be aborted\n");
+ wake_up(sk->sk_sleep);
+ }
atomic_add(skb->len, &ssk->rcv_nxt);
sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n",
- skb_len, sa->vaddr);
+ skb_len, rx_sa->vaddr);
} else {
skb_len = skb->len;
__kfree_skb(skb);
break;
case SDP_MID_SRCAVAIL_CANCEL:
- sdp_dbg_data(sk, "Handling SrcAvailCancel - sending SendSM\n");
+ sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
sdp_prf(sk, NULL, "Handling SrcAvailCancel");
- ssk->srcavail_cancel_mseq = ntohl(h->mseq);
- sdp_post_sendsm(ssk);
+ if (ssk->rx_sa) {
+ ssk->srcavail_cancel_mseq = ntohl(h->mseq);
+ ssk->rx_sa->flags |= RX_SA_ABORTED;
+ ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
+ the dirty logic from recvmsg */
+ sdp_post_sendsm(sk);
+ } else {
+ sdp_warn(sk, "Got SrcAvailCancel - "
+ "but no SrcAvail in process\n");
+ }
break;
+ case SDP_MID_SINKAVAIL:
+ sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
+ sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
+ __kfree_skb(skb);
+ break;
case SDP_MID_ABORT:
sdp_dbg_data(sk, "Handling ABORT\n");
sdp_prf(sk, NULL, "Handling ABORT");
sk_send_sigurg(sk);*/
skb_pull(skb, sizeof(struct sdp_bsdh));
- if (h->mid == SDP_MID_SRCAVAIL) {
+
+ if (h->mid == SDP_MID_SRCAVAIL)
skb_pull(skb, sizeof(struct sdp_srcah));
- }
if (unlikely(h->mid == SDP_MID_DATA && skb->len == 0)) {
/* Credit update is valid even after RCV_SHUTDOWN */
else
skb->data_len = 0;
-
- if (h->mid == SDP_MID_SRCAVAIL)
- skb->data_len -= sizeof(struct sdp_srcah);
-
#ifdef NET_SKBUFF_DATA_USES_OFFSET
skb->tail = skb_headlen(skb);
#else
static unsigned long last_send;
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb)
{
struct sdp_buf *tx_req;
- struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
unsigned long mseq = ring_head(ssk->tx_ring);
int i, rc, frags;
u64 addr;
struct ib_sge *sge = ibsge;
struct ib_send_wr tx_wr = { 0 };
- SDPSTATS_COUNTER_MID_INC(post_send, mid);
+ SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
SDPSTATS_HIST(send_size, skb->len);
ssk->tx_packets++;
ssk->tx_bytes += skb->len;
- h->mid = mid;
+ if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
+ struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(skb);
+ if (ssk->tx_sa != tx_sa) {
+ sdp_warn(&ssk->isk.sk, "SrcAvail cancelled "
+ "before being sent!\n");
+ WARN_ON(1);
+ __kfree_skb(skb);
+ return;
+ }
+ TX_SRCAVAIL_STATE(skb)->mseq = mseq;
+ }
+
if (unlikely(SDP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
else
h->bufs = htons(rx_ring_posted(ssk));
h->len = htonl(skb->len);
h->mseq = htonl(mseq);
- if (TX_SRCAVAIL_STATE(skb))
- TX_SRCAVAIL_STATE(skb)->mseq = mseq;
h->mseq_ack = htonl(mseq_ack(ssk));
sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
- mid2str(mid), rx_ring_posted(ssk), mseq,
+ mid2str(h->mid), rx_ring_posted(ssk), mseq,
ntohl(h->mseq_ack));
SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
int page_idx, struct iovec *iov, int off, size_t len)
{
struct sdp_sock *ssk = sdp_sk(sk);
- struct sdp_srcah *srcah;
struct sk_buff *skb;
int payload_len;
BUG_ON(!tx_sa);
BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
- skb = sk_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh) +
- sizeof(struct sdp_srcah) +
- SDP_SRCAVAIL_PAYLOAD_LEN,
- GFP_KERNEL);
+ tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
+
+ skb = sdp_alloc_skb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off);
if (!skb) {
return -ENOMEM;
}
* but continue to live after skb is freed */
ssk->tx_sa = tx_sa;
- srcah = (struct sdp_srcah *)skb_push(skb, sizeof(*srcah));
- srcah->len = htonl(len);
- srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
- srcah->vaddr = cpu_to_be64(off);
-
if (0) {
void *payload;
payload = skb_put(skb, SDP_SRCAVAIL_PAYLOAD_LEN);
skb_entail(sk, ssk, skb);
+ ssk->write_seq += payload_len;
+ SDP_SKB_CB(skb)->end_seq += payload_len;
+
tx_sa->bytes_sent = len;
tx_sa->bytes_acked = payload_len;
static int sdp_post_srcavail_cancel(struct sock *sk)
{
struct sdp_sock *ssk = sdp_sk(sk);
+ struct sk_buff *skb;
- if (!ssk->tx_sa && !ssk->srcavail_cancel)
- return 0; /* srcavail already serviced */
+ sdp_warn(&ssk->isk.sk, "Posting srcavail cancel\n");
- ssk->srcavail_cancel = 1;
+ skb = sdp_alloc_skb_srcavail_cancel(sk);
+ skb_entail(sk, ssk, skb);
sdp_post_sends(ssk, 1);
if (unlikely(!*timeo_p)) {
err = -ETIME;
+ tx_sa->abort_flags |= TX_SA_TIMEDOUT;
sdp_prf1(sk, NULL, "timeout");
break;
}
- if (unlikely(!ignore_signals && signal_pending(current))) {
- err = -EINTR;
- sdp_prf1(sk, NULL, "signalled");
- break;
- }
-
if (tx_sa->bytes_acked == tx_sa->bytes_sent)
break;
+
else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
err = -EINVAL;
sdp_warn(sk, "acked bytes > sent bytes\n");
+ tx_sa->abort_flags |= TX_SA_ERROR;
break;
}
- if (tx_sa->abort) {
+ if (tx_sa->abort_flags & TX_SA_SENDSM) {
sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
err = -EAGAIN;
break ;
}
+ if (!ignore_signals) {
+ if (signal_pending(current)) {
+ err = -EINTR;
+ sdp_prf1(sk, NULL, "signalled");
+ tx_sa->abort_flags |= TX_SA_INTRRUPTED;
+ break;
+ }
+
+ if (ssk->rx_sa) {
+ sdp_warn(sk, "Crossing SrcAvail - aborting this\n");
+ tx_sa->abort_flags |= TX_SA_CROSS_SEND;
+ err = -ETIME;
+ break ;
+ }
+ }
+
posts_handler_put(ssk);
sk_wait_event(sk, &current_timeo,
- tx_sa->abort &&
+ tx_sa->abort_flags &&
+ ssk->rx_sa &&
(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
vm_wait);
sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
finish_wait(sk->sk_sleep, &wait);
- sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, abort: %d\n",
- tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort);
+ sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
+ tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);
if (!ssk->qp_active) {
sdp_warn(sk, "QP destroyed while waiting\n");
int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
struct rx_srcavail_state *rx_sa)
{
- struct sdp_rrch *rrch;
struct sk_buff *skb;
- gfp_t gfp_page;
int copied = rx_sa->used - rx_sa->reported;
if (rx_sa->used <= rx_sa->reported)
return 0;
- if (unlikely(ssk->isk.sk.sk_allocation))
- gfp_page = ssk->isk.sk.sk_allocation;
- else
- gfp_page = GFP_KERNEL;
+ skb = sdp_alloc_skb_rdmardcompl(&ssk->isk.sk, copied);
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh) +
- sizeof(struct sdp_rrch),
- gfp_page);
- /* FIXME */
- BUG_ON(!skb);
-
- rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
- rrch->len = htonl(copied);
rx_sa->reported += copied;
+
/* TODO: What if no tx_credits available? */
- sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL);
+ sdp_post_send(ssk, skb);
return 0;
}
-int sdp_post_sendsm(struct sdp_sock *ssk)
+int sdp_post_sendsm(struct sock *sk)
{
- struct sk_buff *skb;
- gfp_t gfp_page;
+ struct sk_buff *skb = sdp_alloc_skb_sendsm(sk);
- if (unlikely(ssk->isk.sk.sk_allocation))
- gfp_page = ssk->isk.sk.sk_allocation;
- else
- gfp_page = GFP_KERNEL;
-
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh),
- gfp_page);
- /* FIXME */
- BUG_ON(!skb);
-
- sdp_post_send(ssk, skb, SDP_MID_SENDSM);
+ sdp_post_send(sdp_sk(sk), skb);
return 0;
}
goto out;
}
- if (ssk->tx_sa->mseq < mseq_ack) {
- sdp_prf1(sk, NULL, "SendSM arrived for old SrcAvail. "
- "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x",
+ if (mseq_ack < ssk->tx_sa->mseq) {
+ sdp_warn(sk, "SendSM arrived for old SrcAvail. "
+ "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
mseq_ack, ssk->tx_sa->mseq);
goto out;
}
- sdp_prf1(sk, NULL, "Got SendSM - aborting SrcAvail");
+ sdp_warn(sk, "Got SendSM - aborting SrcAvail\n");
- ssk->tx_sa->abort = 1;
+ ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
cancel_delayed_work(&ssk->srcavail_cancel_work);
wake_up(sk->sk_sleep);
- sdp_dbg_data(sk, "woke up sleepers\n");
+ sdp_warn(sk, "woke up sleepers\n");
out:
spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
* post sendsm */
sdp_warn(sk, "post rdma, wait_for_compl "
"or post rdma_rd_comp failed - post sendsm\n");
- rx_sa->aborted = 1;
+ rx_sa->flags |= RX_SA_ABORTED;
+ ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
+ the dirty logic from recvmsg */
}
ssk->tx_ring.rdma_inflight = NULL;
if (rc)
goto err_abort_send;
-
rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
if (unlikely(rc)) {
- switch (rc) {
- case -EAGAIN: /* Got SendSM */
+ enum tx_sa_flag f = tx_sa->abort_flags;
+
+ if (f & TX_SA_SENDSM) {
sdp_warn(sk, "got SendSM. use SEND verb.\n");
- break;
+ } else if (f & TX_SA_ERROR) {
+ sdp_warn(sk, "SrcAvail error completion\n");
+ sdp_reset(sk);
+ } else if (ssk->qp_active) {
+ sdp_warn(sk, "Aborting send. abort_flag = 0x%x.\n", f);
- case -ETIME: /* Timedout */
- sdp_warn(sk, "TX srcavail timedout.\n");
- case -EINTR: /* interrupted */
- sdp_prf1(sk, NULL, "Aborting send.");
sdp_post_srcavail_cancel(sk);
/* Wait for RdmaRdCompl/SendSM to
sdp_warn(sk, "Waiting for SendSM\n");
sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
sdp_warn(sk, "finished waiting\n");
-
- break;
-
- default:
- sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
- /* Socked destroyed while waited */
- break;
+ } else {
+ sdp_warn(sk, "QP was destroyed while waiting\n");
}
goto err_abort_send;
}
sdp_prf1(sk, NULL, "got RdmaRdCompl");
- sdp_update_iov_used(sk, iov, tx_sa->bytes_sent);
-
err_abort_send:
+ sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
+
ib_fmr_pool_unmap(tx_sa->fmr);
spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
int p_idx;
sdp_dbg_data(sk, "%s\n", __func__);
+ if (ssk->rx_sa) {
+ sdp_warn(sk, "Deadlock prevent: crossing SrcAvail\n");
+ return -EAGAIN;
+ }
lock_sock(sk);
sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
rc = sdp_rdma_adv_single(sk, tx_sa, iov,
p_idx, p_cnt, offset, len);
-// offset, p_cnt, len, addrs);
copied += len;
bytes_left -= len;
pages_left -= p_cnt;
p_idx += p_cnt;
offset = 0;
- } while (!rc && !tx_sa->abort && pages_left > 0);
+ } while (!rc && !tx_sa->abort_flags && pages_left > 0);
sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
err_map_dma: