#undef CONFIG_INFINIBAND_SDP_DEBUG
#define SDPSTATS_ON
-#define SDP_PROFILING
-#define CONFIG_INFINIBAND_SDP_DEBUG_DATA
-#define CONFIG_INFINIBAND_SDP_DEBUG
+//#define SDP_PROFILING
+//#define CONFIG_INFINIBAND_SDP_DEBUG_DATA
+//#define CONFIG_INFINIBAND_SDP_DEBUG
#define _sdp_printk(func, line, level, sk, format, arg...) \
printk(level "%s:%d sdp_sock(%5d:%d %d:%d): " format, \
u32 memcpy_count;
u32 credits_before_update[64];
u32 send_interval[25];
+
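+	/* Accumulated rdtsc cycle deltas feeding the per-stage sendmsg profiling below */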
+ u32 bz_clean_sum;
+ u32 bz_setup_sum;
+ u32 tx_copy_sum;
+ u32 sendmsg_sum;
};
extern struct sdpstats sdpstats;
#define SDP_TX_SIZE 0x40
#define SDP_RX_SIZE 0x40
-#define SDP_MAX_SEND_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
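+/* A send skb may need one more fragment than a receive to cover an unaligned user buffer */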
+#define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
+#define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
#define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
#define SDP_NUM_WC 4
#define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
/* sdp_tx.c */
int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
void sdp_tx_ring_destroy(struct sdp_sock *ssk);
-int sdp_xmit_poll(struct sdp_sock *ssk, int force);
+int _sdp_xmit_poll(const char *func, int line, struct sdp_sock *ssk, int force);
+#define sdp_xmit_poll(ssk, force) _sdp_xmit_poll(__func__, __LINE__, ssk, force)
void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle);
#define sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle)
ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
}
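+/* Arm the TX CQ so the next send completion raises an interrupt */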
+static inline void sdp_arm_tx_cq(struct sock *sk)
+{
+ sdp_prf(sk, NULL, "Arming TX cq");
+ sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
+ tx_credits(sdp_sk(sk)), ring_posted(sdp_sk(sk)->tx_ring));
+
+ ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
/* utilities */
static inline char *mid2str(int mid)
{
gfp_page);
/* FIXME */
BUG_ON(!skb);
- ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
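+	/* SDP_MAX_SEND_SKB_FRAGS - 1 == SDP_MAX_RECV_SKB_FRAGS: request only what a receive can hold */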
+ ssk->sent_request = (SDP_MAX_SEND_SKB_FRAGS-1) * PAGE_SIZE;
ssk->sent_request_head = ring_head(ssk->tx_ring);
req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
req_size->size = htonl(ssk->sent_request);
struct ib_qp_init_attr qp_init_attr = {
.event_handler = sdp_qp_event_handler,
.cap.max_send_wr = SDP_TX_SIZE,
- .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+ .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS,
.cap.max_recv_wr = SDP_RX_SIZE,
- .cap.max_recv_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+ .cap.max_recv_sge = SDP_MAX_RECV_SKB_FRAGS + 1,
.sq_sig_type = IB_SIGNAL_REQ_WR,
.qp_type = IB_QPT_RC,
};
sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
sizeof(struct sdp_bsdh);
sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
- PAGE_SIZE;
+		PAGE_SIZE + 1; /* The +1 compensates for unaligned buffers */
sdp_init_buffers(sdp_sk(child), rcvbuf_initial_size);
sdp_dbg(child, "%s recv_frags: %d tx credits %d xmit_size_goal %d send trigger %d\n",
sdp_sk(sk)->max_bufs = ntohs(h->bsdh.bufs);
atomic_set(&sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->max_bufs);
sdp_sk(sk)->min_bufs = tx_credits(sdp_sk(sk)) / 4;
- sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
- sizeof(struct sdp_bsdh);
+ sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) - SDP_HEAD_SIZE;
sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
- PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS);
+		PAGE_SIZE, MAX_SKB_FRAGS) + 1; /* The +1 compensates for unaligned buffers */
sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal,
sdp_sk(sk)->send_frags * PAGE_SIZE);
return copy;
}
-static inline int slots_free(struct sdp_sock *ssk)
+/* Return the min of:
+ * - tx credits
+ * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS)
+ */
+static inline int tx_slots_free(struct sdp_sock *ssk)
{
int min_free;
static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
struct bzcopy_state *bz)
{
- return slots_free(ssk) > bz->busy;
+ return tx_slots_free(ssk) > bz->busy;
}
/* like sk_stream_wait_memory - except waits on remote credits */
sdp_prf1(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
tx_credits(ssk), ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring),
bz->busy);
+
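+	/* With credits still available, arm the TX CQ so completions are processed while we wait */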
+	if (tx_credits(ssk) > SDP_MIN_TX_CREDITS)
+		sdp_arm_tx_cq(sk);
+
	sk_wait_event(sk, &current_timeo,
sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
sk->sk_write_pending--;
return err;
}
+//#undef rdtscll
+//#define rdtscll(x) ({ x = current_nsec(); })
+
/* Like tcp_sendmsg */
/* TODO: check locking */
static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
int err, copied;
long timeo;
struct bzcopy_state *bz = NULL;
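+	/* rdtsc timestamps used to feed the per-stage sdpstats profiling counters */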
+ unsigned long long a, b, c;
+ unsigned long long start, end;
SDPSTATS_COUNTER_INC(sendmsg);
lock_sock(sk);
sdp_dbg_data(sk, "%s\n", __func__);
+ rdtscll(start);
+
posts_handler_get(ssk);
flags = msg->msg_flags;
SDPSTATS_HIST(sendmsg_seglen, seglen);
+ rdtscll(a);
if (bz)
sdp_bz_cleanup(bz);
+ rdtscll(b);
bz = sdp_bz_setup(ssk, from, seglen, size_goal);
+ rdtscll(c);
+ SDPSTATS_COUNTER_ADD(bz_clean_sum, b - a);
+ SDPSTATS_COUNTER_ADD(bz_setup_sum, c - b);
if (IS_ERR(bz)) {
bz = NULL;
err = PTR_ERR(bz);
sdp_dbg_data(sk, "adding to existing skb: %p"
" len = %d, sk_send_head: %p copy: %d\n",
skb, skb->len, sk->sk_send_head, copy);
-
}
/* Try to append data to the end of skb. */
goto new_segment;
}
+ rdtscll(a);
copy = (bz) ? sdp_bzcopy_get(sk, skb, from, copy, bz) :
sdp_bcopy_get(sk, skb, from, copy);
+ rdtscll(b);
+ if (copy > 0)
+ SDPSTATS_COUNTER_ADD(tx_copy_sum, b - a);
if (unlikely(copy < 0)) {
if (!++copy)
goto wait_for_memory;
err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
} else {
posts_handler_put(ssk);
+ sdp_prf1(sk, NULL, "wait for mem. credits: %d, head: %d, tail: %d",
+ tx_credits(ssk), ring_head(ssk->tx_ring),
+ ring_tail(ssk->tx_ring));
+
+ sdp_arm_tx_cq(sk);
err = sk_stream_wait_memory(sk, &timeo);
if (bz)
bz = sdp_bz_cleanup(bz);
- else
- if (size > send_poll_thresh)
- poll_send_cq(sk);
}
posts_handler_put(ssk);
+ rdtscll(end);
+ SDPSTATS_COUNTER_ADD(sendmsg_sum, end - start);
release_sock(sk);
return copied;
offset = 0;
if (!(flags & MSG_PEEK)) {
+ struct sdp_bsdh *h;
+ h = (struct sdp_bsdh *)skb_transport_header(skb);
+ sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
+ ntohl(h->mseq), ntohl(h->mseq_ack));
skb_unlink(skb, &sk->sk_receive_queue);
__kfree_skb(skb);
}
/*
* Adjust for memory in later kernels
*/
- if (!sk_stream_memory_free(sk) || !slots_free(ssk))
+ if (!sk_stream_memory_free(sk) || !tx_slots_free(ssk))
mask &= ~(POLLOUT | POLLWRNORM | POLLWRBAND);
/* TODO: Slightly ugly: it would be nicer if there was function
seq_printf(seq, "CQ stats:\n");
seq_printf(seq, "- RX interrupts\t\t: %d\n", sdpstats.rx_int_count);
seq_printf(seq, "- TX interrupts\t\t: %d\n", sdpstats.tx_int_count);
+
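+	/* Average cycles per sendmsg call, per stage (skipped until sendmsg has been called) */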
+	if (sdpstats.sendmsg) {
+		seq_printf(seq, "bz_clean \t\t: %d\n", sdpstats.bz_clean_sum / sdpstats.sendmsg);
+		seq_printf(seq, "bz_setup \t\t: %d\n", sdpstats.bz_setup_sum / sdpstats.sendmsg);
+		seq_printf(seq, "tx_copy \t\t: %d\n", sdpstats.tx_copy_sum / sdpstats.sendmsg);
+		seq_printf(seq, "sendmsg \t\t: %d\n", sdpstats.sendmsg_sum / sdpstats.sendmsg);
+	}
return 0;
}
u64 addr;
struct ib_device *dev;
struct ib_recv_wr rx_wr = { 0 };
- struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+ struct ib_sge ibsge[SDP_MAX_RECV_SKB_FRAGS + 1];
struct ib_sge *sge = ibsge;
struct ib_recv_wr *bad_wr;
struct sk_buff *skb;
}
SDPSTATS_COUNTER_INC(post_recv);
- atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+ atomic_add(ssk->recv_frags, &sdp_current_mem_usage);
return ret;
}
int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{
ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
- if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
- ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
+ if (ssk->recv_frags > SDP_MAX_RECV_SKB_FRAGS)
+ ssk->recv_frags = SDP_MAX_RECV_SKB_FRAGS;
ssk->rcvbuf_scale = rcvbuf_scale;
sdp_post_recvs(ssk);
/* for huge PAGE_SIZE systems, aka IA64, limit buffers size
[re-]negotiation to a known+working size that will not
trigger a HW error/rc to be interpreted as a IB_WC_LOC_LEN_ERR */
- u32 max_size = (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE) <=
+ u32 max_size = (SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE) <=
32784 ?
- (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE): 32784;
+ (SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE): 32784;
#else
- u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
+ u32 max_size = SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE;
#endif
if (new_size > curr_size && new_size <= max_size &&
sdp_get_large_socket(ssk)) {
ssk->rcvbuf_scale = rcvbuf_scale;
ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
- if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
- ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
+ if (ssk->recv_frags > SDP_MAX_RECV_SKB_FRAGS)
+ ssk->recv_frags = SDP_MAX_RECV_SKB_FRAGS;
return 0;
} else
return -1;
ssk->sent_request = -1;
ssk->xmit_size_goal = new_size;
ssk->send_frags =
- PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE;
+ PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE + 1;
} else
ssk->sent_request = 0;
}
if (unlikely(!skb))
return NULL;
- atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
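+	/* Recv accounting now uses the number of fragments actually posted, not the maximum */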
+ atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
if (unlikely(wc->status)) {
if (wc->status != IB_WC_WR_FLUSH_ERR) {
skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
if (!skb)
break;
- atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+ atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
__kfree_skb(skb);
}
}
static int sdp_process_tx_cq(struct sdp_sock *ssk);
-int sdp_xmit_poll(struct sdp_sock *ssk, int force)
+int _sdp_xmit_poll(const char *func, int line, struct sdp_sock *ssk, int force)
{
int wc_processed = 0;
+ sdp_prf(&ssk->isk.sk, NULL, "called from %s:%d", func, line);
+
/* If we don't have a pending timer, set one up to catch our recent
post in case the interface becomes idle */
if (!timer_pending(&ssk->tx_ring.timer))
struct sock *sk = cq_context;
struct sdp_sock *ssk = sdp_sk(sk);
- sdp_warn(sk, "Got tx comp interrupt\n");
+ sdp_prf1(sk, NULL, "Got tx comp interrupt");
- mod_timer(&ssk->tx_ring.timer, jiffies + 1);
+ mod_timer(&ssk->tx_ring.timer, jiffies);
}
void sdp_tx_ring_purge(struct sdp_sock *ssk)