#define SDP_TX_POLL_TIMEOUT (HZ / 20)
#define SDP_NAGLE_TIMEOUT (HZ / 10)
-#define SDP_RX_POLL_TIMEOUT (1 + HZ / 1000)
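+/* Delay before re-arming the rx cq after the last posts handler finishes */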
+#define SDP_RX_ARMING_DELAY (msecs_to_jiffies(10))
#define SDP_RDMA_READ_TIMEOUT (5 * HZ)
#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
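+/* posts_handler_get/put count the contexts currently doing post processing;
+ * while at least one is active the rx cq is polled rather than armed, and
+ * re-arming is deferred until the last reference is dropped. */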
#define posts_handler_get(ssk) \
do { \
atomic_inc(&ssk->somebody_is_doing_posts); \
- /* postpone the rx_ring.timer, there is no need to enable
- * interrupts because there will be cq-polling. */ \
- if (likely(ssk->qp_active)) \
- mod_timer(&ssk->rx_ring.timer, MAX_JIFFY_OFFSET); \
+ sdp_postpone_rx_timer(ssk); \
} while (0)
#define posts_handler_put(ssk, intr_delay) \
do { \
sdp_do_posts(ssk); \
if (atomic_dec_and_test(&ssk->somebody_is_doing_posts) && \
- likely(ssk->qp_active)) { \
- if (intr_delay) \
- mod_timer(&ssk->rx_ring.timer, intr_delay); \
- else \
- /* There is no point of setting up a timer
- * for an immediate cq-arming, better arm it
- * now. */ \
- sdp_arm_rx_cq(&ssk->isk.sk); \
- } \
+ likely(ssk->qp_active)) \
+ sdp_schedule_arm_rx_cq(ssk, intr_delay);\
} while (0)
#define sdp_common_release(sk) do { \
atomic_t tail;
struct ib_cq *cq;
- int destroyed;
- rwlock_t destroyed_lock;
- spinlock_t lock;
-
- struct timer_list timer;
- struct tasklet_struct tasklet;
+ struct timer_list cq_arm_timer;
};
struct sdp_device {
struct sk_buff_head rx_ctl_q;
struct sock *parent;
struct sdp_device *sdp_dev;
+ int cpu;
int qp_active;
spinlock_t tx_sa_lock;
sizeof(*tx_sa) - offsetof(typeof(*tx_sa), busy));
}
-static inline void rx_ring_unlock(struct sdp_rx_ring *rx_ring)
-{
- read_unlock_bh(&rx_ring->destroyed_lock);
-}
-
-static inline int rx_ring_trylock(struct sdp_rx_ring *rx_ring)
-{
- read_lock_bh(&rx_ring->destroyed_lock);
- if (rx_ring->destroyed) {
- rx_ring_unlock(rx_ring);
- return 0;
- }
- return 1;
-}
-
-static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring)
-{
- write_lock_bh(&rx_ring->destroyed_lock);
- rx_ring->destroyed = 1;
- write_unlock_bh(&rx_ring->destroyed_lock);
-}
-
static inline int sdp_chk_sa_cancel(struct sdp_sock *ssk, struct rx_srcavail_state *rx_sa)
{
- int res;
-
- spin_lock_irq(&ssk->rx_ring.lock);
- res = ssk->sa_cancel_arrived &&
+ return ssk->sa_cancel_arrived &&
before(rx_sa->mseq, ssk->sa_cancel_mseq);
- spin_unlock_irq(&ssk->rx_ring.lock);
-
- return res;
}
static inline struct sdp_sock *sdp_sk(const struct sock *sk)
sk->sk_error_report(sk);
}
-static inline void sdp_arm_rx_cq(struct sock *sk)
-{
- if (unlikely(!sdp_sk(sk)->rx_ring.cq))
- return;
-
- sdp_prf(sk, NULL, "Arming RX cq");
- sdp_dbg_data(sk, "Arming RX cq\n");
-
- ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
-}
-
-static inline void sdp_arm_tx_cq(struct sock *sk)
-{
- if (unlikely(!sdp_sk(sk)->tx_ring.cq))
- return;
-
- sdp_prf(sk, NULL, "Arming TX cq");
- sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
- tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk)));
-
- ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP);
-}
-
/* return the min of:
* - tx credits
* - free slots in tx_ring (not including SDP_MIN_TX_CREDITS
u32 sendmsg;
u32 recvmsg;
u32 post_send_credits;
- u32 sendmsg_nagle_skip;
u32 sendmsg_seglen[25];
u32 send_size[25];
u32 post_recv;
+ u32 rx_int_arm;
+ u32 tx_int_arm;
u32 rx_int_count;
u32 tx_int_count;
+ u32 rx_int_wake_up;
+ u32 rx_int_queue;
+ u32 rx_int_no_op;
+ u32 rx_cq_modified;
+ u32 rx_cq_arm_timer;
u32 rx_wq;
u32 bzcopy_poll_miss;
u32 send_wait_for_mem;
u32 send_miss_no_credits;
u32 rx_poll_miss;
u32 rx_poll_hit;
+ u32 poll_hit_usec[16];
u32 tx_poll_miss;
u32 tx_poll_hit;
u32 tx_poll_busy;
+ u32 tx_poll_no_op;
u32 memcpy_count;
u32 credits_before_update[64];
u32 zcopy_tx_timeout;
u32 zcopy_tx_aborted;
u32 zcopy_tx_error;
u32 fmr_alloc_error;
+ u32 keepalive_timer;
+ u32 nagle_timer;
};
static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
}
}
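+/* Request an interrupt for the next completion on the rx/tx cq; a failure of
+ * ib_req_notify_cq() is only reported, not retried. */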
+static inline void sdp_arm_rx_cq(struct sock *sk)
+{
+ if (unlikely(!sdp_sk(sk)->rx_ring.cq))
+ return;
+
+ SDPSTATS_COUNTER_INC(rx_int_arm);
+ sdp_dbg_data(sk, "Arming RX cq\n");
+
+ if (unlikely(0 > ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq,
+ IB_CQ_NEXT_COMP)))
+ sdp_warn(sk, "error arming rx cq\n");
+}
+
+static inline void sdp_arm_tx_cq(struct sock *sk)
+{
+ if (unlikely(!sdp_sk(sk)->tx_ring.cq))
+ return;
+
+ SDPSTATS_COUNTER_INC(tx_int_arm);
+ sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
+ tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk)));
+
+ if (unlikely(0 > ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq,
+ IB_CQ_NEXT_COMP)))
+ sdp_warn(sk, "error arming tx cq\n");
+}
+
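+/* While posts are being handled the rx cq is polled directly, so push the
+ * cq-arm timer far into the future instead of requesting an interrupt. */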
+static inline void sdp_postpone_rx_timer(struct sdp_sock *ssk)
+{
+ if (timer_pending(&ssk->rx_ring.cq_arm_timer) && ssk->qp_active)
+ mod_timer(&ssk->rx_ring.cq_arm_timer, MAX_JIFFY_OFFSET);
+}
+
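+/* Re-arm the rx cq, either immediately or after 'delay' jiffies via
+ * cq_arm_timer. */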
+static inline void sdp_schedule_arm_rx_cq(struct sdp_sock *ssk,
+ unsigned long delay)
+{
+ if (unlikely(!ssk->rx_ring.cq))
+ return;
+
+	if (delay && ssk->qp_active) {
+		mod_timer(&ssk->rx_ring.cq_arm_timer, jiffies + delay);
+	} else {
+		/* There is no point in setting up a timer for an immediate
+		 * cq-arming; better to arm it now. */
+		sdp_arm_rx_cq(&ssk->isk.sk);
+	}
+}
+
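+/* A process appears to be blocked waiting for data on this socket
+ * (SOCK_ASYNC_WAITDATA is set). */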
+static inline int somebody_is_waiting(struct sock *sk)
+{
+ return sk->sk_socket &&
+ test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
+}
+
/* sdp_main.c */
void sdp_set_default_moderation(struct sdp_sock *ssk);
int sdp_init_sock(struct sock *sk);
void sdp_post_keepalive(struct sdp_sock *ssk);
/* sdp_rx.c */
-void sdp_rx_ring_init(struct sdp_sock *ssk);
int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
void sdp_rx_ring_destroy(struct sdp_sock *ssk);
int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
void sdp_rx_comp_full(struct sdp_sock *ssk);
void sdp_remove_large_sock(const struct sdp_sock *ssk);
void sdp_handle_disconn(struct sock *sk);
-int sdp_process_rx(struct sdp_sock *ssk);
+int sdp_poll_rx_cq(struct sdp_sock *ssk);
/* sdp_zcopy.c */
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct iovec *iov);
srcah = (struct sdp_srcah *)(h+1);
len += snprintf(buf + len, 255-len, " | payload: 0x%zx, "
- "len: 0x%zx, rkey: 0x%x, vaddr: 0x%llx |",
+ "len: 0x%x, rkey: 0x%x, vaddr: 0x%llx |",
ntohl(h->len) - sizeof(struct sdp_bsdh) -
sizeof(struct sdp_srcah),
ntohl(srcah->len), ntohl(srcah->rkey),
struct sdp_sock *ssk = (struct sdp_sock *)data;
struct sock *sk = &ssk->isk.sk;
+ SDPSTATS_COUNTER_INC(nagle_timer);
sdp_dbg_data(sk, "last_unacked = %ld\n", ssk->nagle_last_unacked);
if (!ssk->nagle_last_unacked)
}
}
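+/* The rx cq must be re-armed right away when nobody can be relied on to keep
+ * polling it: connection not established, a zcopy send waiting for
+ * completion, or a process blocked waiting for data. */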
+static inline int sdp_should_rearm(struct sock *sk)
+{
+ return sk->sk_state != TCP_ESTABLISHED || sdp_sk(sk)->tx_sa ||
+ somebody_is_waiting(sk);
+}
+
void sdp_post_sends(struct sdp_sock *ssk, gfp_t gfp)
{
/* TODO: nonagle? */
sdp_xmit_poll(ssk, 1);
/* Run out of credits, check if got a credit update */
- if (unlikely(tx_credits(ssk) <= SDP_MIN_TX_CREDITS))
- sdp_process_rx(ssk);
+ if (unlikely(tx_credits(ssk) <= SDP_MIN_TX_CREDITS)) {
+ sdp_poll_rx_cq(ssk);
+
+ if (unlikely(sdp_should_rearm(sk) || !posts_handler(ssk)))
+ sdp_arm_rx_cq(sk);
+ }
if (ssk->recv_request &&
ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
_sdp_printk(__func__, __LINE__, level, sk, format, ## arg)
#define sdp_warn(sk, format, arg...) \
do { \
- sdp_printk(KERN_WARNING, sk, "\t%lx: " format , jiffies, ## arg); \
+ sdp_printk(KERN_WARNING, sk, format, ## arg); \
sdp_prf(sk, NULL, format , ## arg); \
} while (0)
SDP_MODPARAM_SINT(sdp_fmr_pool_size, 20, "Number of FMRs to allocate for pool");
SDP_MODPARAM_SINT(sdp_fmr_dirty_wm, 5, "Watermark to flush fmr pool");
-SDP_MODPARAM_SINT(recv_poll, 10, "How many msec to poll recv.");
+SDP_MODPARAM_SINT(recv_poll, 700, "usecs to poll recv before arming interrupt.");
SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
"Default idle time in seconds before keepalive probe sent.");
static int sdp_bzcopy_thresh = 0;
ssk->qp_active = 0;
- del_timer(&ssk->tx_ring.timer);
-
if (ssk->qp) {
ib_destroy_qp(ssk->qp);
ssk->qp = NULL;
struct sdp_sock *ssk = sdp_sk(sk);
sdp_dbg(sk, "%s\n", __func__);
+ SDPSTATS_COUNTER_INC(keepalive_timer);
/* Only process if the socket is not in use */
bh_lock_sock(sk);
sock_put(sk, SOCK_REF_KEEPALIVE);
}
-static void sdp_init_keepalive_timer(struct sock *sk)
-{
- sk->sk_timer.function = sdp_keepalive_timer;
- sk->sk_timer.data = (unsigned long)sk;
-}
-
static void sdp_set_keepalive(struct sock *sk, int val)
{
sdp_dbg(sk, "%s %d\n", __func__, val);
if (hw_int_mod_count > 0 && hw_int_mod_usec > 0) {
err = ib_modify_cq(ssk->rx_ring.cq, hw_int_mod_count,
hw_int_mod_usec);
- if (err)
+ if (unlikely(err))
sdp_warn(sk,
- "Failed modifying moderation for cq");
+ "Failed modifying moderation for cq\n");
else
sdp_dbg(sk,
"Using fixed interrupt moderation\n");
+ SDPSTATS_COUNTER_INC(rx_cq_modified);
}
return;
}
if (moder_time != mod->last_moder_time) {
mod->last_moder_time = moder_time;
err = ib_modify_cq(ssk->rx_ring.cq, mod->moder_cnt, moder_time);
- if (err) {
+ if (unlikely(err)) {
sdp_dbg_data(&ssk->isk.sk,
"Failed modifying moderation for cq");
}
+ SDPSTATS_COUNTER_INC(rx_cq_modified);
}
out:
sdp_dbg(sk, "%s\n", __func__);
sdp_prf(sk, NULL, __func__);
+ sdp_sk(sk)->cpu = smp_processor_id();
sdp_delete_keepalive_timer(sk);
sk->sk_shutdown = SHUTDOWN_MASK;
.sin_addr.s_addr = inet_sk(sk)->saddr,
};
int rc;
+
+ ssk->cpu = smp_processor_id();
release_sock(sk);
flush_workqueue(sdp_wq);
lock_sock(sk);
sdp_dbg(sk, "%s\n", __func__);
+ ssk->cpu = smp_processor_id();
if (sk->sk_state != TCP_LISTEN) {
if (ssk->id) {
sdp_sk(sk)->qp_active = 0;
ssk = sdp_sk(sk);
lock_sock(sk);
+ ssk->cpu = smp_processor_id();
/* We need to make sure that this socket is listening,
* and that it has something pending.
return -EINVAL;
lock_sock(sk);
+ ssk->cpu = smp_processor_id();
if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV))
answ = 0;
else if (sock_flag(sk, SOCK_URGINLINE) ||
atomic_set(&ssk->mseq_ack, 0);
- sdp_rx_ring_init(ssk);
+ ssk->rx_ring.buffer = NULL;
ssk->tx_ring.buffer = NULL;
ssk->sdp_disconnect = 0;
ssk->destructed_already = 0;
ssk->tx_compl_pending = 0;
atomic_set(&ssk->somebody_is_doing_posts, 0);
-
+ ssk->cpu = smp_processor_id();
ssk->tx_ring.rdma_inflight = NULL;
- init_timer(&ssk->rx_ring.timer);
+ init_timer(&ssk->rx_ring.cq_arm_timer);
init_timer(&ssk->tx_ring.timer);
init_timer(&ssk->nagle_timer);
init_timer(&sk->sk_timer);
return -EFAULT;
lock_sock(sk);
+ ssk->cpu = smp_processor_id();
/* SOCK_KEEPALIVE is really a SOL_SOCKET level option but there
* is a problem handling it at that level. In order to start
return 0;
}
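+/* Wrap-around safe comparison of two cycle counters, whatever width cycles_t
+ * has on this architecture. */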
+static inline int cycles_before(cycles_t a, cycles_t b)
+{
+	/* cycles_t is unsigned, but its underlying type (int/long/long long)
+	 * varies across architectures. */
+
+ if (sizeof(cycles_t) == 4)
+ return before(a, b);
+ else
+ return (s64)(a - b) < 0;
+}
+
static inline int poll_recv_cq(struct sock *sk)
{
- unsigned long jiffies_end = jiffies + recv_poll * HZ / 1000;
+ cycles_t start = get_cycles();
+ cycles_t end = start + recv_poll * cpu_khz / 1000;
sdp_prf(sk, NULL, "polling recv");
return 0;
do {
- if (sdp_process_rx(sdp_sk(sk))) {
+ if (sdp_poll_rx_cq(sdp_sk(sk))) {
SDPSTATS_COUNTER_INC(rx_poll_hit);
+ SDPSTATS_HIST(poll_hit_usec,
+ (get_cycles() - start) *
+ 1000 / cpu_khz);
return 0;
}
- } while (jiffies < jiffies_end);
+ } while (cycles_before(get_cycles(), end));
+
SDPSTATS_COUNTER_INC(rx_poll_miss);
return 1;
}
SDPSTATS_COUNTER_INC(sendmsg);
lock_sock(sk);
+ ssk->cpu = smp_processor_id();
sdp_dbg_data(sk, "%s size = 0x%zx\n", __func__, size);
posts_handler_get(ssk);
( __bz && tx_slots_free(ssk) < __bz->busy) || \
(!__bz && !sk_stream_memory_free(sk)))
if (unlikely(can_not_tx(bz))) {
- if (!poll_recv_cq(sk)) {
+ if (!poll_recv_cq(sk))
sdp_do_posts(ssk);
- }
- if ((can_not_tx(bz))) {
- sdp_arm_rx_cq(sk);
+		if (can_not_tx(bz))
goto wait_for_sndbuf;
- }
}
skb = sdp_alloc_skb_data(sk, min(seglen, size_goal), 0);
sdp_dbg_data(sk, "err: %d\n", err);
fin:
- posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT);
+ posts_handler_put(ssk, SDP_RX_ARMING_DELAY);
if (!err && !ssk->qp_active) {
err = -EPIPE;
h->mid = SDP_MID_DATA;
- spin_lock_irq(&ssk->rx_ring.lock);
-
RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
kfree(ssk->rx_sa);
ssk->rx_sa = NULL;
- spin_unlock_irq(&ssk->rx_ring.lock);
-
return 0;
}
SDPSTATS_COUNTER_INC(recvmsg);
lock_sock(sk);
+ ssk->cpu = smp_processor_id();
sdp_dbg_data(sk, "iovlen: %zd iov_len: 0x%zx flags: 0x%x peek: 0x%x\n",
msg->msg_iovlen, msg->msg_iov[0].iov_len, flags,
MSG_PEEK);
goto found_fin_ok;
case SDP_MID_SRCAVAIL:
- spin_lock_irq(&ssk->rx_ring.lock);
rx_sa = RX_SRCAVAIL_STATE(skb);
if (unlikely(!rx_sa)) {
/* SrcAvailCancel arrived and handled */
h->mid = SDP_MID_DATA;
- spin_unlock_irq(&ssk->rx_ring.lock);
goto sdp_mid_data;
}
rx_sa->is_treated = 1;
- spin_unlock_irq(&ssk->rx_ring.lock);
if (sdp_chk_sa_cancel(ssk, rx_sa) ||
!ssk->sdp_dev ||
err = copied;
out:
- posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT);
+ posts_handler_put(ssk, SDP_RX_ARMING_DELAY);
sdp_auto_moderation(ssk);
int err;
lock_sock(sk);
+ sdp_sk(sk)->cpu = smp_processor_id();
err = -EINVAL;
if (sock->state != SS_UNCONNECTED)
sdp_dbg_data(sk, "%s\n", __func__);
lock_sock(sk);
+ sdp_sk(sk)->cpu = smp_processor_id();
if (sk->sk_state == TCP_ESTABLISHED) {
sdp_prf(sk, NULL, "polling");
- if (poll_recv_cq(sk))
- sdp_arm_rx_cq(sk);
+ posts_handler_get(sdp_sk(sk));
+ poll_recv_cq(sk);
+ posts_handler_put(sdp_sk(sk), 0);
}
mask = datagram_poll(file, socket, wait);
sk->sk_destruct = sdp_destruct;
- sdp_init_keepalive_timer(sk);
+ setup_timer(&sk->sk_timer, sdp_keepalive_timer, (unsigned long)sk);
sock->ops = &sdp_proto_ops;
sock->state = SS_UNCONNECTED;
sdp_proto.sockets_allocated = sockets_allocated;
sdp_proto.orphan_count = orphan_count;
- rx_comp_wq = create_singlethread_workqueue("rx_comp_wq");
+ rx_comp_wq = create_workqueue("rx_comp_wq");
if (!rx_comp_wq)
goto no_mem_rx_wq;
memset(s, '*', j);
s[j] = '\0';
- seq_printf(seq, "%10d | %-50s - %d\n", val, s, h[i]);
+ seq_printf(seq, "%10d | %-50s - %u\n", val, s, h[i]);
}
}
}
seq_printf(seq, "\n");
+ seq_printf(seq, "sdp_recvmsg() calls\t\t: %d\n",
+ SDPSTATS_COUNTER_GET(recvmsg));
seq_printf(seq, "post_recv \t\t: %d\n",
SDPSTATS_COUNTER_GET(post_recv));
seq_printf(seq, "BZCopy poll miss \t\t: %d\n",
seq_printf(seq, "rx_poll_miss \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_miss));
seq_printf(seq, "rx_poll_hit \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_hit));
+ __sdpstats_seq_hist(seq, "poll_hit_usec", poll_hit_usec, 1);
+ seq_printf(seq, "rx_cq_arm_timer \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_cq_arm_timer));
+
seq_printf(seq, "tx_poll_miss \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_miss));
seq_printf(seq, "tx_poll_busy \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_busy));
seq_printf(seq, "tx_poll_hit \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_hit));
+ seq_printf(seq, "tx_poll_no_op \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_no_op));
+
+ seq_printf(seq, "keepalive timer \t\t: %d\n", SDPSTATS_COUNTER_GET(keepalive_timer));
+ seq_printf(seq, "nagle timer \t\t: %d\n", SDPSTATS_COUNTER_GET(nagle_timer));
seq_printf(seq, "CQ stats:\n");
- seq_printf(seq, "- RX interrupts\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_count));
+ seq_printf(seq, "- RX irq armed \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_arm));
+ seq_printf(seq, "- RX interrupts \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_count));
+ seq_printf(seq, "- RX int wake up\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_wake_up));
+ seq_printf(seq, "- RX int queue \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_queue));
+ seq_printf(seq, "- RX int no op \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_no_op));
+ seq_printf(seq, "- RX cq modified\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_cq_modified));
+
+ seq_printf(seq, "- TX irq armed\t\t: %d\n", SDPSTATS_COUNTER_GET(tx_int_arm));
seq_printf(seq, "- TX interrupts\t\t: %d\n", SDPSTATS_COUNTER_GET(tx_int_count));
seq_printf(seq, "ZCopy stats:\n");
sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
break;
case SDP_MID_SRCAVAIL_CANCEL:
- spin_lock_irq(&ssk->rx_ring.lock);
if (ssk->rx_sa && !ssk->rx_sa->is_treated &&
after(ntohl(h->mseq), ssk->rx_sa->mseq)) {
sdp_dbg(sk, "Handling SrcAvailCancel - post SendSM\n");
RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
kfree(ssk->rx_sa);
ssk->rx_sa = NULL;
- spin_unlock_irq(&ssk->rx_ring.lock);
sdp_post_sendsm(sk);
- break;
}
-
- spin_unlock_irq(&ssk->rx_ring.lock);
break;
case SDP_MID_SINKAVAIL:
case SDP_MID_ABORT:
sock_wake_async(sock, 2, POLL_OUT);
}
-static int sdp_poll_rx_cq(struct sdp_sock *ssk)
+int sdp_poll_rx_cq(struct sdp_sock *ssk)
{
struct ib_cq *cq = ssk->rx_ring.cq;
struct ib_wc ibwc[SDP_NUM_WC];
int n, i;
int wc_processed = 0;
struct sk_buff *skb;
- unsigned long flags;
- spin_lock_irqsave(&ssk->rx_ring.lock, flags);
do {
n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
for (i = 0; i < n; ++i) {
wc_processed++;
}
} while (n == SDP_NUM_WC);
- spin_unlock_irqrestore(&ssk->rx_ring.lock, flags);
- if (wc_processed)
+ if (wc_processed) {
+ sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
sdp_bzcopy_write_space(ssk);
+ }
return wc_processed;
}
lock_sock(sk);
+ posts_handler_get(ssk);
sdp_do_posts(ssk);
-
+ posts_handler_put(ssk, SDP_RX_ARMING_DELAY);
release_sock(sk);
}
}
if (likely(ssk->rx_ring.cq))
- sdp_process_rx(sdp_sk(sk));
+ sdp_poll_rx_cq(ssk);
while ((skb = skb_dequeue(&ssk->rx_ctl_q)))
sdp_process_rx_ctl_skb(ssk, skb);
}
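+/* Wake a sleeper directly from the rx interrupt when a context that is
+ * already handling posts or waiting for data will poll the cq itself;
+ * otherwise the completion work is queued. */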
+static inline int should_wake_up(struct sock *sk)
+{
+ return sk->sk_sleep && waitqueue_active(sk->sk_sleep) &&
+ (posts_handler(sdp_sk(sk)) || somebody_is_waiting(sk));
+}
+
static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
struct sock *sk = cq_context;
struct sdp_sock *ssk = sdp_sk(sk);
- if (cq != ssk->rx_ring.cq) {
- sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
+ if (unlikely(cq != ssk->rx_ring.cq)) {
+ sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
return;
}
sdp_prf(sk, NULL, "rx irq");
- /* We could use rx_ring.timer instead, but mod_timer(..., 0)
- * measured to add 4ms delay.
- */
- tasklet_hi_schedule(&ssk->rx_ring.tasklet);
-}
-
-static inline int sdp_should_rearm(struct sock *sk)
-{
- return sk->sk_state != TCP_ESTABLISHED ||
- sdp_sk(sk)->tx_sa ||
- (sk->sk_socket && test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags));
-}
-
-int sdp_process_rx(struct sdp_sock *ssk)
-{
- struct sock *sk = &ssk->isk.sk;
- int wc_processed;
- int credits_before;
-
- if (!rx_ring_trylock(&ssk->rx_ring)) {
- sdp_dbg(&ssk->isk.sk, "ring destroyed. not polling it\n");
- return 0;
- }
-
- credits_before = tx_credits(ssk);
-
- wc_processed = sdp_poll_rx_cq(ssk);
-
- if (wc_processed) {
- sdp_prf(sk, NULL, "processed %d", wc_processed);
- sdp_prf(sk, NULL, "credits: %d -> %d",
- credits_before, tx_credits(ssk));
-
- if (posts_handler(ssk) || (sk->sk_socket &&
- test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags))) {
-
- sdp_prf(&ssk->isk.sk, NULL,
- "Somebody is doing the post work for me. %d",
- posts_handler(ssk));
-
- } else {
- sdp_prf(&ssk->isk.sk, NULL, "Queuing work. ctl_q: %d",
- !skb_queue_empty(&ssk->rx_ctl_q));
- queue_work(rx_comp_wq, &ssk->rx_comp_work);
- }
+ if (should_wake_up(sk)) {
+ wake_up_interruptible(sk->sk_sleep);
+ SDPSTATS_COUNTER_INC(rx_int_wake_up);
+ } else {
+ if (queue_work_on(ssk->cpu, rx_comp_wq, &ssk->rx_comp_work))
+ SDPSTATS_COUNTER_INC(rx_int_queue);
+ else
+ SDPSTATS_COUNTER_INC(rx_int_no_op);
}
-
- if (unlikely(sdp_should_rearm(sk) || !posts_handler(ssk)))
- sdp_arm_rx_cq(sk);
-
- rx_ring_unlock(&ssk->rx_ring);
-
- return wc_processed;
-}
-
-static void sdp_process_rx_timer(unsigned long data)
-{
- struct sdp_sock *ssk = (struct sdp_sock *)data;
- sdp_process_rx(ssk);
}
static void sdp_rx_ring_purge(struct sdp_sock *ssk)
}
}
-void sdp_rx_ring_init(struct sdp_sock *ssk)
+static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
- ssk->rx_ring.buffer = NULL;
- ssk->rx_ring.destroyed = 0;
- rwlock_init(&ssk->rx_ring.destroyed_lock);
}
-static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
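+/* Timer callback: the arming delay has expired, re-arm the rx cq. */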
+static void sdp_arm_cq_timer(unsigned long data)
{
+ struct sdp_sock *ssk = (struct sdp_sock *)data;
+
+ SDPSTATS_COUNTER_INC(rx_cq_arm_timer);
+ sdp_arm_rx_cq(&ssk->isk.sk);
}
int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
- spin_lock_init(&ssk->rx_ring.lock);
-
INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
- ssk->rx_ring.timer.function = sdp_process_rx_timer;
- ssk->rx_ring.timer.data = (unsigned long) ssk;
- tasklet_init(&ssk->rx_ring.tasklet, sdp_process_rx_timer,
- (unsigned long) ssk);
+ setup_timer(&ssk->rx_ring.cq_arm_timer, sdp_arm_cq_timer,
+ (unsigned long)ssk);
sdp_arm_rx_cq(&ssk->isk.sk);
return 0;
void sdp_rx_ring_destroy(struct sdp_sock *ssk)
{
- rx_ring_destroy_lock(&ssk->rx_ring);
+ del_timer_sync(&ssk->rx_ring.cq_arm_timer);
if (ssk->rx_ring.buffer) {
sdp_rx_ring_purge(ssk);
}
}
- /* the tasklet should be killed only after the rx_cq is destroyed,
- * so there won't be rx_irq any more, meaning the tasklet will never be
- * enabled. */
- del_timer_sync(&ssk->rx_ring.timer);
- tasklet_kill(&ssk->rx_ring.tasklet);
-
SDP_WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}
if (sock_owned_by_user(sk)) {
sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy");
- if (sdp_tx_handler_select(ssk) && sk->sk_state != TCP_CLOSE) {
+ if (sdp_tx_handler_select(ssk) && sk->sk_state != TCP_CLOSE &&
+ likely(ssk->qp_active)) {
sdp_prf1(sk, NULL, "schedule a timer");
mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
}
goto out;
}
- if (unlikely(sk->sk_state == TCP_CLOSE))
+ if (unlikely(!ssk->qp || sk->sk_state == TCP_CLOSE)) {
+ SDPSTATS_COUNTER_INC(tx_poll_no_op);
goto out;
+ }
wc_processed = sdp_process_tx_cq(ssk);
if (!wc_processed)
/* If there are still packets in flight and the timer has not already
* been scheduled by the Tx routine then schedule it here to guarantee
* completion processing of these packets */
- if (inflight)
+ if (inflight && likely(ssk->qp_active))
mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
out:
sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq;
- ssk->tx_ring.timer.function = sdp_poll_tx_timeout;
- ssk->tx_ring.timer.data = (unsigned long) ssk;
+ setup_timer(&ssk->tx_ring.timer, sdp_poll_tx_timeout,
+ (unsigned long)ssk);
ssk->tx_ring.poll_cnt = 0;
tasklet_init(&ssk->tx_ring.tasklet, sdp_poll_tx_timeout,
void sdp_tx_ring_destroy(struct sdp_sock *ssk)
{
+ del_timer_sync(&ssk->tx_ring.timer);
if (ssk->nagle_timer.function)
del_timer_sync(&ssk->nagle_timer);
spin_lock_irqsave(&ssk->tx_sa_lock, flags);
- BUG_ON(!ssk);
-
if (!ssk->tx_sa) {
sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
goto out;
rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
&tx_sa->fmr, &tx_sa->umem);
- if (rc) {
+ if (unlikely(rc)) {
sdp_dbg_data(sk, "Error allocating fmr: %d\n", rc);
goto err_alloc_fmr;
}
if (tx_slots_free(ssk) == 0) {
rc = wait_for_sndbuf(sk, timeo);
- if (rc) {
+ if (unlikely(rc)) {
sdp_warn(sk, "Couldn't get send buffer\n");
goto err_no_tx_slots;
}
}
rc = sdp_post_srcavail(sk, tx_sa);
- if (rc) {
+ if (unlikely(rc)) {
sdp_dbg(sk, "Error posting SrcAvail\n");
goto err_abort_send;
}
- sdp_arm_rx_cq(sk);
-
rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
if (unlikely(rc)) {
enum tx_sa_flag f = tx_sa->abort_flags;
struct sdp_sock *ssk = sdp_sk(sk);
struct rx_srcavail_state *rx_sa;
- spin_lock_irq(&ssk->rx_ring.lock);
rx_sa = ssk->rx_sa;
if (!rx_sa)
- goto out;
+ return;
sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
/* kfree(rx_sa) and posting SendSM will be handled in the normal
* flows.
*/
-out:
- spin_unlock_irq(&ssk->rx_ring.lock);
}