ssk->snd_una += TCP_SKB_CB(skb)->end_seq;
++ssk->tx_tail;
+
+ /* TODO: AIO and real zcopy code; add their context support here */
+ if (ssk->zcopy_context && skb->data_len) {
+ struct bzcopy_state *bz;
+ struct sdp_bsdh *h;
+
+ h = (struct sdp_bsdh *)skb->data;
+ if (h->mid == SDP_MID_DATA) {
+ bz = (struct bzcopy_state *)ssk->zcopy_context;
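+ /* a bzcopy send completed; release the slot to blocked writers */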
+ bz->busy--;
+ }
+ }
+
return skb;
}
wake_up(&ssk->wq);
}
}
-
- sk_stream_write_space(&ssk->isk.sk);
} else {
sdp_cnt(sdp_keepalive_probes_sent);
return;
}
- if (likely(!wc->status)) {
- sdp_post_recvs(ssk);
- sdp_post_sends(ssk, 0);
- }
-
if (ssk->time_wait && !ssk->isk.sk.sk_send_head &&
ssk->tx_head == ssk->tx_tail) {
sdp_dbg(&ssk->isk.sk, "%s: destroy in time wait state\n",
ret = 0;
}
} while (n == SDP_NUM_WC);
+
+ if (!ret) {
+ struct sock *sk = &ssk->isk.sk;
+
+ sdp_post_recvs(ssk);
+ sdp_post_sends(ssk, 0);
+
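+ /* wake writers only if someone is waiting; bzcopy waiters need
+ * the credit-aware wakeup rather than the sndbuf-based one
+ */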
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+ if (ssk->zcopy_context)
+ sdp_bzcopy_write_space(ssk);
+ else
+ sk_stream_write_space(&ssk->isk.sk);
+ }
+ }
+
return ret;
}
return rc;
}
- sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
+ sdp_sk(child)->max_bufs = sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
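+ /* min_bufs is the send trigger: credit-blocked writers are woken
+ * once a quarter of the peer's receive buffers are available again
+ */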
+ sdp_sk(child)->min_bufs = sdp_sk(child)->bufs / 4;
sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
sizeof(struct sdp_bsdh);
sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
PAGE_SIZE;
sdp_resize_buffers(sdp_sk(child), ntohl(h->desremrcvsz));
- sdp_dbg(child, "%s bufs %d xmit_size_goal %d\n", __func__,
+ sdp_dbg(child, "%s bufs %d xmit_size_goal %d send trigger %d\n",
+ __func__,
sdp_sk(child)->bufs,
- sdp_sk(child)->xmit_size_goal);
+ sdp_sk(child)->xmit_size_goal,
+ sdp_sk(child)->min_bufs);
id->context = child;
sdp_sk(child)->id = id;
return 0;
h = event->param.conn.private_data;
- sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
+ sdp_sk(sk)->max_bufs = sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
+ sdp_sk(sk)->min_bufs = sdp_sk(sk)->bufs / 4;
sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
sizeof(struct sdp_bsdh);
sdp_sk(sk)->send_frags = PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
PAGE_SIZE;
- sdp_dbg(sk, "%s bufs %d xmit_size_goal %d\n", __func__,
+ sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send trigger %d\n",
+ __func__,
sdp_sk(sk)->bufs,
- sdp_sk(sk)->xmit_size_goal);
+ sdp_sk(sk)->xmit_size_goal,
+ sdp_sk(sk)->min_bufs);
ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
#include "sdp.h"
#include <linux/delay.h>
-struct bzcopy_state {
- unsigned char __user *u_base;
- int u_len;
- int left;
- int page_cnt;
- int cur_page;
- int cur_offset;
- struct page **pages;
-};
-
MODULE_AUTHOR("Michael S. Tsirkin");
MODULE_DESCRIPTION("InfiniBand SDP module");
MODULE_LICENSE("Dual BSD/GPL");
module_param_named(sdp_keepalive_time, sdp_keepalive_time, uint, 0644);
MODULE_PARM_DESC(sdp_keepalive_time, "Default idle time in seconds before keepalive probe sent.");
-static int sdp_zcopy_thresh = 2048;
+static int sdp_zcopy_thresh = 8192;
module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=off.");
{
}
-static struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
+static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
{
int i;
+ struct sdp_sock *ssk = (struct sdp_sock *)bz->ssk;
+
+ ssk->zcopy_context = NULL;
if (bz->pages) {
for (i = bz->cur_page; i < bz->page_cnt; i++)
bz->u_len = len;
bz->left = len;
bz->cur_offset = addr & ~PAGE_MASK;
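+ /* busy tracks bzcopy chunks queued for send but not yet completed */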
+ bz->busy = 0;
+ bz->ssk = ssk;
bz->page_cnt = PAGE_ALIGN(len + bz->cur_offset) >> PAGE_SHIFT;
bz->pages = kcalloc(bz->page_cnt, sizeof(struct page *), GFP_KERNEL);
}
up_write(&current->mm->mmap_sem);
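+ /* publish the context only after the page bookkeeping is set up */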
+ ssk->zcopy_context = bz;
return bz;
int this_page, left;
struct sdp_sock *ssk = sdp_sk(sk);
+ /* Push the first chunk to page-align all that follow - TODO: review */
if (skb_shinfo(skb)->nr_frags == ssk->send_frags) {
sdp_mark_push(ssk, skb);
return SDP_NEW_SEG;
}
bz->left -= copy;
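+ /* one more bzcopy chunk is now in flight */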
+ bz->busy++;
return copy;
}
+static inline int slots_free(struct sdp_sock *ssk)
+{
+ int min_free;
+
+ min_free = SDP_TX_SIZE - (ssk->tx_head - ssk->tx_tail);
+ if (ssk->bufs < min_free)
+ min_free = ssk->bufs;
+ min_free -= (min_free < SDP_MIN_BUFS) ? min_free : SDP_MIN_BUFS;
+
+ return min_free;
+}
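+
+/*
+ * Worked example (illustrative values): with SDP_TX_SIZE = 64 and 10
+ * unreaped local sends, the ring has 54 free slots; if the peer has
+ * advertised only 20 credits (ssk->bufs), min_free is capped at 20,
+ * and subtracting the SDP_MIN_BUFS reserve (here assumed 4) leaves 16
+ * usable slots.
+ */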
+
+/* like sk_stream_memory_free - except it measures remote credits */
+static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk)
+{
+ struct bzcopy_state *bz = (struct bzcopy_state *)ssk->zcopy_context;
+
+ BUG_ON(!bz);
+ return slots_free(ssk) > bz->busy;
+}
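+
+/*
+ * Note the strict inequality: a sender may proceed only while the free
+ * slots outnumber the bzcopy chunks already in flight.
+ */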
+
+/* like sk_stream_wait_memory - except it waits on remote credits */
+static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p)
+{
+ struct sock *sk = &ssk->isk.sk;
+ struct bzcopy_state *bz = (struct bzcopy_state *)ssk->zcopy_context;
+ int err = 0;
+ long vm_wait = 0;
+ long current_timeo = *timeo_p;
+ DEFINE_WAIT(wait);
+
+ BUG_ON(!bz);
+
+ if (sdp_bzcopy_slots_avail(ssk))
+ current_timeo = vm_wait = (net_random() % (HZ / 5)) + 2;
+
+ while (1) {
+ set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+ prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+ if (unlikely(sk->sk_err | (sk->sk_shutdown & SEND_SHUTDOWN))) {
+ err = -EPIPE;
+ break;
+ }
+
+ if (unlikely(!*timeo_p)) {
+ err = -EAGAIN;
+ break;
+ }
+
+ if (unlikely(signal_pending(current))) {
+ err = sock_intr_errno(*timeo_p);
+ break;
+ }
+
+ clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+ if (sdp_bzcopy_slots_avail(ssk))
+ break;
+
+ set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+ sk->sk_write_pending++;
+ sk_wait_event(sk, &current_timeo,
+ sdp_bzcopy_slots_avail(ssk) && vm_wait);
+ sk->sk_write_pending--;
+
+ if (vm_wait) {
+ vm_wait -= current_timeo;
+ current_timeo = *timeo_p;
+ if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
+ (current_timeo -= vm_wait) < 0)
+ current_timeo = 0;
+ vm_wait = 0;
+ }
+ *timeo_p = current_timeo;
+ }
+
+ finish_wait(sk->sk_sleep, &wait);
+ return err;
+}
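+
+/*
+ * As in sk_stream_wait_memory, vm_wait turns the wait into a short,
+ * jittered backoff when slots are already available rather than
+ * consuming the caller's full timeout.
+ */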
+
+/* like sk_stream_write_space - except it measures remote credits */
+void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+ struct sock *sk = &ssk->isk.sk;
+ struct socket *sock = sk->sk_socket;
+
+ if (ssk->bufs >= ssk->min_bufs &&
+ ssk->tx_head == ssk->tx_tail &&
+ sock != NULL) {
+ clear_bit(SOCK_NOSPACE, &sock->flags);
+
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+ wake_up_interruptible(sk->sk_sleep);
+ if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+ sock_wake_async(sock, 2, POLL_OUT);
+ }
+}
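+
+/* Called from the send-completion poll path above, which prefers this
+ * over sk_stream_write_space whenever a bzcopy context is attached.
+ */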
+
/* Like tcp_sendmsg */
/* TODO: check locking */
(copy = size_goal - skb->len) <= 0) {
new_segment:
- /* Allocate new segment. If the interface is SG,
- * allocate skb fitting to single page.
+ /*
+ * Allocate a new segment
+ * For bcopy, we stop sending once we have
+ * SO_SNDBUF bytes in flight. For bzcopy
+ * we stop sending once we run out of remote
+ * receive credits.
*/
- if (!sk_stream_memory_free(sk))
- goto wait_for_sndbuf;
+ if (bz) {
+ if (!sdp_bzcopy_slots_avail(ssk))
+ goto wait_for_sndbuf;
+ } else {
+ if (!sk_stream_memory_free(sk))
+ goto wait_for_sndbuf;
+ }
skb = sk_stream_alloc_pskb(sk, select_size(sk, ssk),
0, sk->sk_allocation);
if (copied)
sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
- if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
+ err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo) :
+ sk_stream_wait_memory(sk, &timeo);
+ if (err)
goto do_error;
mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
}
out:
- if (bz)
- bz = sdp_bz_cleanup(bz);
- if (copied)
+ if (copied) {
sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
- if (size > send_poll_thresh)
- poll_send_cq(sk);
+ if (bz) {
+ int max_retry;
+
+ /* Wait for in-flight sends; should be quick */
+ for (max_retry = 0; max_retry < 10000; max_retry++) {
+ if (!bz->busy)
+ break;
+
+ poll_send_cq(sk);
+ }
+
+ if (bz->busy)
+ sdp_warn(sk,
+ "Could not reap %d in-flight sends\n",
+ bz->busy);
+
+ bz = sdp_bz_cleanup(bz);
+ } else if (size > send_poll_thresh)
+ poll_send_cq(sk);
+ }
+
release_sock(sk);
return copied;