/* Interval between successive polls in the Tx routine when polling is used
instead of interrupts (in per-core Tx rings) - should be power of 2 */
#define SDP_TX_POLL_MODER 16
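/* e.g. with SDP_TX_POLL_MODER == 16 the CQ is polled only when
   (++poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0, i.e. every 16th packet */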
-#define SDP_TX_POLL_TIMEOUT (HZ / 4)
+#define SDP_TX_POLL_TIMEOUT (HZ / 20)
#define SDP_NAGLE_TIMEOUT (HZ / 10)
#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
#define SDP_ROUTE_TIMEOUT 1000
#define SDP_RETRY_COUNT 5
#define SDP_KEEPALIVE_TIME (120 * 60 * HZ)
-#define SDP_FIN_WAIT_TIMEOUT (60 * HZ)
+#define SDP_FIN_WAIT_TIMEOUT (10 * HZ)
#define SDP_TX_SIZE 0x40
#define SDP_RX_SIZE 0x40
struct delayed_work dreq_wait_work;
struct work_struct destroy_work;
+ int tx_compl_pending; /* tx completion arrived but not yet polled for */
atomic_t somebody_is_doing_posts;
/* Like tcp_sock */
return min_free - SDP_MIN_TX_CREDITS;
}
-/* like sk_stream_memory_free - except measures remote credits */
-static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
- struct bzcopy_state *bz)
-{
- return tx_slots_free(ssk) > bz->busy;
-}
-
/* utilities */
static inline char *mid2str(int mid)
{
void sdp_destroy_work(struct work_struct *work);
void sdp_reset_sk(struct sock *sk, int rc);
void sdp_reset(struct sock *sk);
-int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
- struct bzcopy_state *bz);
+int sdp_tx_wait_memory(struct sdp_sock *ssk, long *timeo_p, int *credits_needed);
void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb);
/* sdp_proc.c */
"Default idle time in seconds before keepalive probe sent.");
SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 65536,
"Zero copy send using SEND threshold; 0=0ff.");
-SDP_MODPARAM_SINT(sdp_zcopy_thresh, 128*1024,
+SDP_MODPARAM_SINT(sdp_zcopy_thresh, 0, /* was 128*1024 */
"Zero copy using RDMA threshold; 0=0ff.");
#define SDP_RX_COAL_TIME_HIGH 128
SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
ssk->destruct_in_process = 0;
spin_lock_init(&ssk->lock);
spin_lock_init(&ssk->tx_sa_lock);
+ ssk->tx_compl_pending = 0;
atomic_set(&ssk->somebody_is_doing_posts, 0);
return copy;
}
-/* like sk_stream_wait_memory - except waits on remote credits */
-int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
- struct bzcopy_state *bz)
+/* like sk_stream_wait_memory - except:
+ * - if credits_needed is provided, wait until that many remote credits
+ *   are available
+ * - the TX irq wakes this waiter, so the actual tx completion poll and
+ *   post are done here, in sendmsg context
+ */
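+/* The sendmsg path passes &bz->busy when bzcopy is in use and NULL otherwise;
+ * the zcopy wait_for_sndbuf() helper below passes a single needed credit.
+ */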
+int sdp_tx_wait_memory(struct sdp_sock *ssk, long *timeo_p, int *credits_needed)
{
struct sock *sk = &ssk->isk.sk;
int err = 0;
long current_timeo = *timeo_p;
DEFINE_WAIT(wait);
- BUG_ON(!bz);
-
- if (sdp_bzcopy_slots_avail(ssk, bz))
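+ /* mirrors sk_stream_wait_memory(): if memory is already free, arm only a
+ * short randomized wait (vm_wait) instead of the full timeout
+ */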
+ if (sk_stream_memory_free(sk))
current_timeo = vm_wait = (net_random() % (HZ / 5)) + 2;
while (1) {
prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
- if (unlikely(sk->sk_err | (sk->sk_shutdown & SEND_SHUTDOWN))) {
- err = -EPIPE;
- break;
- }
+ if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
+ goto do_error;
+ if (!*timeo_p)
+ goto do_nonblock;
+ if (signal_pending(current))
+ goto do_interrupted;
+ clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
- if (unlikely(!*timeo_p)) {
- err = -EAGAIN;
- break;
- }
+ sdp_do_posts(ssk);
- if (unlikely(signal_pending(current))) {
- err = sock_intr_errno(*timeo_p);
- break;
+ if (credits_needed) {
+ if (tx_slots_free(ssk) >= *credits_needed)
+ break;
+ } else {
+ if (sk_stream_memory_free(sk) && !vm_wait)
+ break;
}
- clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
-
posts_handler_put(ssk);
- if (sdp_bzcopy_slots_avail(ssk, bz))
- break;
-
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
sk->sk_write_pending++;
+ sdp_prf1(sk, NULL, "Going to sleep");
+
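+ /* arm the tx cq before sleeping so that a completion interrupt can wake
+ * the sender (see sdp_tx_handler_select() below)
+ */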
if (tx_credits(ssk) > SDP_MIN_TX_CREDITS)
sdp_arm_tx_cq(sk);
- sk_wait_event(sk, &current_timeo,
- sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
+ if (credits_needed) {
+ sk_wait_event(sk, &current_timeo,
+ !sk->sk_err &&
+ !(sk->sk_shutdown & SEND_SHUTDOWN) &&
+ !ssk->tx_compl_pending &&
+ tx_slots_free(ssk) >= *credits_needed &&
+ vm_wait);
+ } else {
+ sk_wait_event(sk, &current_timeo,
+ !sk->sk_err &&
+ !(sk->sk_shutdown & SEND_SHUTDOWN) &&
+ !ssk->tx_compl_pending &&
+ sk_stream_memory_free(sk) &&
+ vm_wait);
+ }
+
+ sdp_prf1(sk, NULL, "Woke up");
sk->sk_write_pending--;
- sdp_prf1(sk, NULL, "finished wait for mem");
posts_handler_get(ssk);
- sdp_do_posts(ssk);
+
+ if (!ssk->qp_active)
+ goto do_error;
if (vm_wait) {
vm_wait -= current_timeo;
}
*timeo_p = current_timeo;
}
-
+out:
finish_wait(sk->sk_sleep, &wait);
return err;
+
+do_error:
+ err = -EPIPE;
+ goto out;
+do_nonblock:
+ err = -EAGAIN;
+ goto out;
+do_interrupted:
+ err = sock_intr_errno(*timeo_p);
+ goto out;
}
/* Like tcp_sendmsg */
* receive credits.
*/
if (bz) {
- if (!sdp_bzcopy_slots_avail(ssk, bz))
+ if (tx_slots_free(ssk) < bz->busy)
goto wait_for_sndbuf;
} else {
if (!sk_stream_memory_free(sk))
if (copied)
sdp_push(sk, ssk, flags & ~MSG_MORE);
- sdp_xmit_poll(ssk, 1);
-
- if (bz) {
- err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
- } else {
- posts_handler_put(ssk);
-
- sdp_arm_tx_cq(sk);
-
- err = sk_stream_wait_memory(sk, &timeo);
-
- posts_handler_get(ssk);
- sdp_do_posts(ssk);
- }
-
+ err = sdp_tx_wait_memory(ssk, &timeo,
+ bz ? &bz->busy : NULL);
if (err)
goto do_error;
if (!timer_pending(&ssk->tx_ring.timer))
mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
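+ /* the cq will be polled below (or by the timer just armed), so clear the
+ * deferred completion hint set by the tx interrupt (see below)
+ */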
+ ssk->tx_compl_pending = 0;
+
/* Poll the CQ every SDP_TX_POLL_MODER packets */
if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
wc_processed = sdp_process_tx_cq(ssk);
if (wc_processed) {
struct sock *sk = &ssk->isk.sk;
sdp_post_sends(ssk, 0);
+ sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d",
+ (u32) tx_ring_posted(ssk));
+ sk_stream_write_space(&ssk->isk.sk);
+ if (sk->sk_write_pending &&
+ test_bit(SOCK_NOSPACE, &sk->sk_socket->flags) &&
+ tx_ring_posted(ssk)) {
+ /* a write is pending and there is still no room in the
+ * tx queue - rearm the tx cq
+ */
+ sdp_prf(&ssk->isk.sk, NULL, "pending tx - rearming");
+ sdp_arm_tx_cq(sk);
+ }
- if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
- sk_stream_write_space(&ssk->isk.sk);
}
return wc_processed;
}
+/* Select who will handle the tx completion:
+ * - a write is pending - wake the sender and let it do the poll + post
+ * - the posts handler is taken - its holder will do the poll + post
+ * - otherwise return 1 and let the caller do it
+ */
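+/* Called from the tx poll timer (when the socket is owned by the user) and
+ * from the tx completion interrupt path (see below).
+ */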
+static int sdp_tx_handler_select(struct sdp_sock *ssk)
+{
+ struct sock *sk = &ssk->isk.sk;
+
+ if (sk->sk_write_pending) {
+ /* Do the TX posts from sender context */
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+ sdp_prf1(sk, NULL, "Waking up pending sendmsg");
+ wake_up_interruptible(sk->sk_sleep);
+ return 0;
+ } else
+ sdp_prf1(sk, NULL, "Unexpected: sk_sleep=%p, "
+ "waitqueue_active: %d",
+ sk->sk_sleep, sk->sk_sleep ?
+ waitqueue_active(sk->sk_sleep) : 0);
+ }
+
+ if (posts_handler(ssk)) {
+ /* Somebody else available to check for completion */
+ sdp_prf1(sk, NULL, "Somebody else will call do_posts");
+ return 0;
+ }
+
+ return 1;
+}
+
static void sdp_poll_tx_timeout(unsigned long data)
{
struct sdp_sock *ssk = (struct sdp_sock *)data;
struct sock *sk = &ssk->isk.sk;
u32 inflight, wc_processed;
- sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d",
- (u32) tx_ring_posted(ssk));
+ sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
+ (u32) tx_ring_posted(ssk),
+ ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
/* Only process if the socket is not in use */
bh_lock_sock(sk);
if (sock_owned_by_user(sk)) {
- mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
- sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy\n");
+ sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy");
+
+ if (sdp_tx_handler_select(ssk)) {
+ sdp_prf1(sk, NULL, "schedule a timer");
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+ }
+
SDPSTATS_COUNTER_INC(tx_poll_busy);
goto out;
}
SDPSTATS_COUNTER_INC(tx_poll_hit);
inflight = (u32) tx_ring_posted(ssk);
+ sdp_prf1(&ssk->isk.sk, NULL, "finished tx proccessing. inflight = %d",
+ tx_ring_posted(ssk));
/* If there are still packets in flight and the timer has not already
* been scheduled by the Tx routine then schedule it here to guarantee
SDPSTATS_COUNTER_INC(tx_int_count);
- mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
- tasklet_schedule(&ssk->tx_ring.tasklet);
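+ /* defer the cq poll: mark a completion as pending and let
+ * sdp_tx_handler_select() decide who reaps it
+ */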
+ ssk->tx_compl_pending = 1;
+
+ if (sdp_tx_handler_select(ssk)) {
+ sdp_prf1(sk, NULL, "poll and post from tasklet");
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+ tasklet_schedule(&ssk->tx_ring.tasklet);
+ }
}
void sdp_tx_ring_purge(struct sdp_sock *ssk)
#include <linux/delay.h>
#include "sdp.h"
-static struct bzcopy_state dummy_bz = {
-busy: 1,
-};
-
static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
int page_idx, int off, size_t len)
{
{
struct sdp_sock *ssk = sdp_sk(sk);
int ret = 0;
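+ /* the zcopy path only needs a single free tx slot here (e.g. to post a
+ * SrcAvail), so wait for one credit rather than for sndbuf space
+ */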
+ int credits_needed = 1;
sdp_dbg_data(sk, "Wait for mem\n");
sdp_xmit_poll(ssk, 1);
- ret = sdp_bzcopy_wait_memory(ssk, timeo_p, &dummy_bz);
+ ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
return ret;
}
sdp_dbg_data(sk, "off: 0x%x len: 0x%x page_cnt: 0x%x\n",
offset, len, page_cnt);
- if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
+ if (tx_slots_free(ssk) == 0) {
rc = wait_for_sndbuf(sk, &timeo);
if (rc) {
sdp_warn(sk, "Couldn't get send buffer\n");