return SDP_TX_SIZE - tx_ring_posted(ssk);
}
+/* Return true if a credit update needs to be sent. The rules are:
+ * - at least half of the RX buffers are posted
+ * - 1.5 * c < p
+ * - there are TX credits available
+ * - there is room in the TX queue
+ *
+ * p = number of posted RX buffers
+ * c = the peer's current credit count
+ */
static inline int credit_update_needed(struct sdp_sock *ssk)
{
int c;
+ if (rx_ring_posted(ssk) < (SDP_RX_SIZE >> 1))
+ return 0;
+
c = remote_credits(ssk);
if (likely(c > SDP_MIN_TX_CREDITS))
c += c/2;
u32 send_wait_for_mem;
u32 send_miss_no_credits;
u32 rx_poll_miss;
+ u32 rx_poll_hit;
u32 tx_poll_miss;
u32 tx_poll_hit;
u32 tx_poll_busy;
void sdp_rx_comp_full(struct sdp_sock *ssk);
void sdp_remove_large_sock(const struct sdp_sock *ssk);
void sdp_handle_disconn(struct sock *sk);
+int sdp_process_rx(struct sdp_sock *ssk);
/* sdp_zcopy.c */
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct iovec *iov);
"Enable data path debug tracing if > 0.");
#endif
-SDP_MODPARAM_SINT(recv_poll_hit, -1, "How many times recv poll helped.");
-SDP_MODPARAM_SINT(recv_poll_miss, -1, "How many times recv poll missed.");
-SDP_MODPARAM_SINT(recv_poll, 1000, "How many times to poll recv.");
+SDP_MODPARAM_SINT(recv_poll, 10, "How many msec to poll recv.");
SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
"Default idle time in seconds before keepalive probe sent.");
static int sdp_bzcopy_thresh = 0;
sdp_sk(sk)->sdp_disconnect = 1;
sdp_post_sends(sdp_sk(sk), 0);
+
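+ /* Arm the RX CQ so the peer's side of the disconnect handshake is
+ * processed even when no one is polling the socket.
+ */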
+ sdp_arm_rx_cq(sk);
}
/*
case TCP_KEEPIDLE:
val = (ssk->keepalive_time ? : sdp_keepalive_time) / HZ;
break;
+ case TCP_MAXSEG:
+ val = ssk->xmit_size_goal;
+ break;
case SDP_ZCOPY_THRESH:
val = ssk->zcopy_thresh;
break;
static inline int poll_recv_cq(struct sock *sk)
{
- int i;
- for (i = 0; i < recv_poll; ++i) {
- if (!skb_queue_empty(&sk->sk_receive_queue)) {
- ++recv_poll_hit;
+ unsigned long jiffies_end = jiffies + recv_poll * HZ / 1000;
+
+ sdp_prf(sk, NULL, "polling recv");
+
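+ /* Busy-poll the RX CQ for up to recv_poll milliseconds */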
+ while (time_before_eq(jiffies, jiffies_end)) {
+ if (sdp_process_rx(sdp_sk(sk))) {
+ SDPSTATS_COUNTER_INC(rx_poll_hit);
return 0;
}
}
- ++recv_poll_miss;
+ SDPSTATS_COUNTER_INC(rx_poll_miss);
return 1;
}
posts_handler_put(ssk);
+ /* Before going to sleep, make sure no credit update is missed */
+ sdp_arm_rx_cq(sk);
+
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
sk->sk_write_pending++;
!(sk->sk_shutdown & SEND_SHUTDOWN) &&
!ssk->tx_compl_pending &&
sk_stream_memory_free(sk) &&
+ tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
vm_wait);
}
- sdp_prf1(sk, NULL, "Woke up");
+ sdp_prf(sk, NULL, "Woke up. memfree: %d", sk_stream_memory_free(sk));
sk->sk_write_pending--;
posts_handler_get(ssk);
* we stop sending once we run out of remote
* receive credits.
*/
- if (bz) {
- if (tx_slots_free(ssk) < bz->busy)
- goto wait_for_sndbuf;
- } else {
- if (!sk_stream_memory_free(sk))
+#define can_not_tx(__bz) (\
+ (__bz && tx_slots_free(ssk) < __bz->busy) || \
+ (!__bz && !sk_stream_memory_free(sk)))
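+ /* Cannot transmit right now: poll the RX CQ once, since a completion may
+ * carry a credit update that lets us make progress, and only sleep if we
+ * are still blocked afterwards.
+ */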
+ if (unlikely(can_not_tx(bz))) {
+ if (!poll_recv_cq(sk)) {
+ sdp_do_posts(ssk);
+ }
+ if (can_not_tx(bz)) {
+ sdp_arm_rx_cq(sk);
goto wait_for_sndbuf;
+ }
}
skb = sdp_alloc_skb_data(sk, 0);
wait_for_sndbuf:
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
wait_for_memory:
- sdp_prf(sk, skb, "wait for mem");
+ sdp_prf(sk, skb, "wait for mem. credits: %d", tx_credits(ssk));
SDPSTATS_COUNTER_INC(send_wait_for_mem);
if (copied)
sdp_push(sk, ssk, flags & ~MSG_MORE);
sdp_auto_moderation(ssk);
- posts_handler_put(ssk);
- release_sock(sk);
- sdp_dbg_data(sk, "copied: 0x%x\n", copied);
- return copied;
+ err = copied;
+ sdp_dbg_data(sk, "copied: 0x%x\n", copied);
+ goto fin;
do_fault:
sdp_prf(sk, skb, "prepare fault");
if (bz)
bz = sdp_bz_cleanup(bz);
err = sk_stream_error(sk, flags, err);
+ sdp_dbg_data(sk, "err: %d\n", err);
+fin:
posts_handler_put(ssk);
release_sock(sk);
}
rc = poll_recv_cq(sk);
+ if (!rc) {
+ sdp_do_posts(ssk);
+ }
if (copied >= target && !recv_poll) {
/* Do not sleep, just process backlog. */
lock_sock(sk);
} else if (rc) {
sdp_dbg_data(sk, "sk_wait_data %ld\n", timeo);
+ sdp_prf(sk, NULL, "giving up polling");
+ sdp_arm_rx_cq(sk);
posts_handler_put(ssk);
struct sock *sk = socket->sk;
struct sdp_sock *ssk = sdp_sk(sk);
- sdp_dbg_data(socket->sk, "%s\n", __func__);
+ sdp_dbg_data(sk, "%s\n", __func__);
+
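+ /* On an established connection, first try to reap RX completions directly;
+ * if nothing is pending, re-arm the CQ so an event wakes the poller.
+ */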
+ if (sk->sk_state == TCP_ESTABLISHED) {
+ sdp_prf(sk, NULL, "polling");
+ if (poll_recv_cq(sk)) {
+ sdp_arm_rx_cq(sk);
+ }
+ }
mask = datagram_poll(file, socket, wait);
SDPSTATS_COUNTER_GET(send_miss_no_credits));
seq_printf(seq, "rx_poll_miss \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_miss));
+ seq_printf(seq, "rx_poll_hit \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_hit));
seq_printf(seq, "tx_poll_miss \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_miss));
seq_printf(seq, "tx_poll_busy \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_busy));
seq_printf(seq, "tx_poll_hit \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_hit));
struct sk_buff *skb;
if (unlikely(id != ring_tail(ssk->rx_ring))) {
- printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
+ sdp_warn(&ssk->isk.sk, "Bogus recv completion id %d tail %d\n",
id, ring_tail(ssk->rx_ring));
return NULL;
}
if (mseq_ack >= ssk->nagle_last_unacked)
ssk->nagle_last_unacked = 0;
- sdp_prf1(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
+ sdp_prf1(&ssk->isk.sk, skb, "RX: %s +%d c:%d->%d mseq:%d ack:%d",
mid2str(h->mid), ntohs(h->bufs), credits_before,
tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
return 0;
}
-/* called only from irq */
static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk,
struct ib_wc *wc)
{
}
}
-/* only from interrupt. */
static int sdp_poll_rx_cq(struct sdp_sock *ssk)
{
struct ib_cq *cq = ssk->rx_ring.cq;
return;
}
+ sdp_process_rx(sdp_sk(sk));
+
while ((skb = skb_dequeue(&ssk->rx_ctl_q)))
sdp_process_rx_ctl_skb(ssk, skb);
tasklet_hi_schedule(&ssk->rx_ring.tasklet);
}
-static void sdp_process_rx(unsigned long data)
+int sdp_process_rx(struct sdp_sock *ssk)
{
- struct sdp_sock *ssk = (struct sdp_sock *)data;
struct sock *sk = &ssk->isk.sk;
int wc_processed = 0;
int credits_before;
if (!rx_ring_trylock(&ssk->rx_ring)) {
sdp_dbg(&ssk->isk.sk, "ring destroyed. not polling it\n");
- return;
+ return 0;
}
credits_before = tx_credits(ssk);
wc_processed = sdp_poll_rx_cq(ssk);
- sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
+
+ if (wc_processed)
+ sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
if (wc_processed) {
sdp_prf(&ssk->isk.sk, NULL, "credits: %d -> %d",
queue_work(rx_comp_wq, &ssk->rx_comp_work);
}
}
- sdp_arm_rx_cq(sk);
+
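+ /* The CQ is no longer re-armed unconditionally; sleepers arm it
+ * themselves before blocking. Keep interrupts enabled only while a
+ * disconnect or a zero-copy send is still pending.
+ */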
+ if (ssk->sdp_disconnect || ssk->tx_sa)
+ sdp_arm_rx_cq(sk);
rx_ring_unlock(&ssk->rx_ring);
+
+ return wc_processed;
+}
+
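+/* Tasklet entry point - a thin wrapper around sdp_process_rx() */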
+static void sdp_process_rx_tasklet(unsigned long data)
+{
+ struct sdp_sock *ssk = (struct sdp_sock *)data;
+ sdp_process_rx(ssk);
}
static void sdp_rx_ring_purge(struct sdp_sock *ssk)
sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
- tasklet_init(&ssk->rx_ring.tasklet, sdp_process_rx,
+ tasklet_init(&ssk->rx_ring.tasklet, sdp_process_rx_tasklet,
(unsigned long) ssk);
sdp_arm_rx_cq(&ssk->isk.sk);
h->mseq = htonl(mseq);
h->mseq_ack = htonl(mseq_ack(ssk));
- sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
+ sdp_prf(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d c: %d",
mid2str(h->mid), rx_ring_posted(ssk), mseq,
- ntohl(h->mseq_ack));
+ ntohl(h->mseq_ack), tx_credits(ssk));
SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
ssk->rx_sa &&
(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
vm_wait);
- sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+ sdp_prf(&ssk->isk.sk, NULL, "woke up sleepers");
posts_handler_get(ssk);
goto err_abort_send;
}
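+ /* Arm the RX CQ before going to sleep waiting for the RDMA read to complete */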
+ sdp_arm_rx_cq(sk);
+
rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
if (unlikely(rc)) {
enum tx_sa_flag f = tx_sa->abort_flags;