There is no need to explicitly arm the CQ; posts_handler_put() will arm it, either immediately or via the rx_ring timer when an interrupt delay is requested.
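
As an illustration (not part of the patch), a minimal sketch of the pattern callers are expected to follow, based on the call sites changed below; the function name example_rx_path is hypothetical:

	static void example_rx_path(struct sdp_sock *ssk)
	{
		/* Defers ssk->rx_ring.timer while this context is polling. */
		posts_handler_get(ssk);

		/* ... post work requests and poll the rx CQ ... */

		/* Pass 0 to arm the rx_cq immediately, or an absolute expiry
		 * (e.g. jiffies + SDP_RX_POLL_TIMEOUT) to keep polling via
		 * the rx_ring timer instead of taking an interrupt. */
		posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT);
	}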
#define SDP_TX_POLL_TIMEOUT (HZ / 20)
#define SDP_NAGLE_TIMEOUT (HZ / 10)
+#define SDP_RX_POLL_TIMEOUT (1 + HZ / 1000)
+
#define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
#define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
#define SDP_SRCAVAIL_PAYLOAD_LEN 1
(ssk->tx_ring.rdma_inflight ? ssk->tx_ring.rdma_inflight->busy : 0))
#define posts_handler(ssk) atomic_read(&ssk->somebody_is_doing_posts)
-#define posts_handler_get(ssk) atomic_inc(&ssk->somebody_is_doing_posts)
-#define posts_handler_put(ssk) do {\
- atomic_dec(&ssk->somebody_is_doing_posts); \
- sdp_do_posts(ssk); \
-} while (0)
+#define posts_handler_get(ssk) \
+ do { \
+ atomic_inc(&ssk->somebody_is_doing_posts); \
+ /* Postpone the rx_ring.timer; there is no need to enable \
+ * interrupts because the CQ will be polled. */ \
+ mod_timer(&ssk->rx_ring.timer, MAX_JIFFY_OFFSET); \
+ } while (0)
+
+#define posts_handler_put(ssk, intr_delay) \
+ do { \
+ sdp_do_posts(ssk); \
+ if (atomic_dec_and_test(&ssk->somebody_is_doing_posts)) { \
+ if (intr_delay) \
+ mod_timer(&ssk->rx_ring.timer, intr_delay); \
+ else \
+ /* There is no point in setting up a timer for \
+ * an immediate CQ arming; arm it now. */ \
+ sdp_arm_rx_cq(&ssk->isk.sk); \
+ } \
+ } while (0)
extern int sdp_zcopy_thresh;
extern struct workqueue_struct *sdp_wq;
rwlock_t destroyed_lock;
spinlock_t lock;
- struct tasklet_struct tasklet;
+ struct timer_list timer;
};
struct sdp_device {
ssk->tx_ring.rdma_inflight = NULL;
+ init_timer(&ssk->rx_ring.timer);
init_timer(&ssk->tx_ring.timer);
init_timer(&ssk->nagle_timer);
init_timer(&sk->sk_timer);
break;
}
- posts_handler_put(ssk);
-
- /* Before going to sleep, make sure no credit update is missed */
- sdp_arm_rx_cq(sk);
+ /* Before going to sleep, make sure no credit update is missed;
+ * a zero delay makes posts_handler_put() arm the rx_cq right away. */
+ posts_handler_put(ssk, 0);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
sk->sk_write_pending++;
sdp_dbg_data(sk, "err: %d\n", err);
fin:
- posts_handler_put(ssk);
+ posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT);
if (!err && !ssk->qp_active) {
err = -EPIPE;
} else if (rc) {
sdp_dbg_data(sk, "sk_wait_data %ld\n", timeo);
sdp_prf(sk, NULL, "giving up polling");
- sdp_arm_rx_cq(sk);
- posts_handler_put(ssk);
+ posts_handler_put(ssk, 0);
/* socket lock is released inside sk_wait_data */
sk_wait_data(sk, &timeo);
err = copied;
out:
- posts_handler_put(ssk);
+ posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT);
sdp_auto_moderation(ssk);
sdp_prf(sk, NULL, "rx irq");
- tasklet_hi_schedule(&ssk->rx_ring.tasklet);
+ mod_timer(&ssk->rx_ring.timer, jiffies);
}
static inline int sdp_should_rearm(struct sock *sk)
}
}
- if (unlikely(sdp_should_rearm(sk)))
+ if (unlikely(sdp_should_rearm(sk) || !posts_handler(ssk)))
sdp_arm_rx_cq(sk);
rx_ring_unlock(&ssk->rx_ring);
return wc_processed;
}
-static void sdp_process_rx_tasklet(unsigned long data)
+static void sdp_process_rx_timer(unsigned long data)
{
struct sdp_sock *ssk = (struct sdp_sock *)data;
sdp_process_rx(ssk);
spin_lock_init(&ssk->rx_ring.lock);
INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
- tasklet_init(&ssk->rx_ring.tasklet, sdp_process_rx_tasklet,
- (unsigned long) ssk);
+ ssk->rx_ring.timer.function = sdp_process_rx_timer;
+ ssk->rx_ring.timer.data = (unsigned long) ssk;
sdp_arm_rx_cq(&ssk->isk.sk);
}
}
- tasklet_kill(&ssk->rx_ring.tasklet);
- /* rx_cq is destroyed, so no more rx_irq, so no one will schedule this
- * tasklet. */
+ /* The timer must be deleted only after the rx_cq is destroyed, so no
+ * further rx interrupts can arrive and the timer can never be rearmed. */
+ del_timer_sync(&ssk->rx_ring.timer);
SDP_WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}
}
}
- posts_handler_put(ssk);
+ posts_handler_put(ssk, 0);
sk_wait_event(sk, &current_timeo,
tx_sa->abort_flags &&
break;
}
- posts_handler_put(ssk);
+ posts_handler_put(ssk, 0);
sdp_prf1(sk, NULL, "Going to sleep");
sk_wait_event(sk, &timeo,