#include <rdma/rdma_cm.h>
#include "sdp.h"
+#define SDP_RESIZE_WAIT 16
+
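+/*
+ * Payload of SDP_MID_CHRCVBUF / SDP_MID_CHRCVBUF_ACK messages:
+ * the receive buffer size being requested or acknowledged.
+ */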
+struct sdp_chrecvbuf {
+ u32 size;
+};
+
static int rcvbuf_scale = 0x10;
+
module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644);
MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor.");
module_param_named(top_mem_usage, top_mem_usage, int, 0644);
MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB).");
-atomic_t current_mem_usage;
+#ifdef CONFIG_PPC
+static int max_large_sockets = 100;
+#else
+static int max_large_sockets = 1000;
+#endif
+module_param_named(max_large_sockets, max_large_sockets, int, 0644);
+MODULE_PARM_DESC(max_large_sockets, "Max number of large sockets (32k buffers).");
+
+static int curr_large_sockets;
+atomic_t sdp_current_mem_usage;
+spinlock_t sdp_large_sockets_lock;
+
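+/*
+ * Reserve a "large socket" slot: returns nonzero and bumps the global count
+ * if we are still below max_large_sockets, zero otherwise.
+ */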
+static int sdp_can_resize(void)
+{
+ int ret;
+
+ spin_lock_irq(&sdp_large_sockets_lock);
+ ret = curr_large_sockets < max_large_sockets;
+ if (ret)
+ curr_large_sockets++;
+ spin_unlock_irq(&sdp_large_sockets_lock);
+
+ return ret;
+}
+
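+/* Release a large-socket slot reserved by sdp_can_resize(). */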
+void sdp_remove_large_sock(void)
+{
+ spin_lock_irq(&sdp_large_sockets_lock);
+ curr_large_sockets--;
+ spin_unlock_irq(&sdp_large_sockets_lock);
+}
/* Like tcp_fin */
static void sdp_fin(struct sock *sk)
h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
else
h->flags = 0;
+
h->bufs = htons(ssk->rx_head - ssk->rx_tail);
h->len = htonl(skb->len);
h->mseq = htonl(mseq);
/* FIXME */
BUG_ON(!skb);
h = (struct sdp_bsdh *)skb->head;
- for (i = 0; i < SDP_MAX_SEND_SKB_FRAGS; ++i) {
+ for (i = 0; i < ssk->recv_frags; ++i) {
page = alloc_pages(GFP_HIGHUSER, 0);
BUG_ON(!page);
frag = &skb_shinfo(skb)->frags[i];
sdp_reset(&ssk->isk.sk);
}
- atomic_add(SDP_MAX_SEND_SKB_FRAGS, &current_mem_usage);
+ atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
}
void sdp_post_recvs(struct sdp_sock *ssk)
{
- int scale = rcvbuf_scale;
+ int scale = ssk->rcvbuf_scale;
if (unlikely(!ssk->id))
return;
if (top_mem_usage &&
- (top_mem_usage * 0x100000) < atomic_read(&current_mem_usage) * PAGE_SIZE)
+ (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
scale = 1;
while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
(ssk->rx_head - ssk->rx_tail - SDP_MIN_BUFS) *
- (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE) +
+ (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
ssk->rcv_nxt - ssk->copied_seq <
ssk->isk.sk.sk_rcvbuf * scale) ||
unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_BUFS))
return;
}
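+ /*
+ * A buffer-resize request from the peer is pending and the newly
+ * sized receive buffers have been posted: acknowledge with the
+ * buffer size we are actually using.
+ */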
+ if (ssk->recv_request &&
+ ssk->rx_tail >= ssk->recv_request_head &&
+ ssk->bufs >= SDP_MIN_BUFS &&
+ ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) {
+ struct sdp_chrecvbuf *resp_size;
+ ssk->recv_request = 0;
+ skb = sk_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh) +
+ sizeof(*resp_size),
+ GFP_KERNEL);
+ /* FIXME */
+ BUG_ON(!skb);
+ resp_size = (struct sdp_chrecvbuf *)skb_push(skb, sizeof *resp_size);
+ resp_size->size = htons(ssk->recv_frags * PAGE_SIZE);
+ sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+ }
+
while (ssk->bufs > SDP_MIN_BUFS &&
ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE &&
(skb = ssk->isk.sk.sk_send_head) &&
__skb_dequeue(&ssk->isk.sk.sk_write_queue);
sdp_post_send(ssk, skb, SDP_MID_DATA);
}
+
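+ /*
+ * Running low on send credits: ask the peer to switch to large
+ * receive buffers, but wait at least SDP_RESIZE_WAIT sends after
+ * the previous request.
+ */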
+ if (ssk->bufs == SDP_MIN_BUFS &&
+ !ssk->sent_request &&
+ ssk->tx_head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
+ ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) {
+ struct sdp_chrecvbuf *req_size;
+ skb = sk_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh) +
+ sizeof(*req_size),
+ GFP_KERNEL);
+ /* FIXME */
+ BUG_ON(!skb);
+ ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
+ ssk->sent_request_head = ssk->tx_head;
+ req_size = (struct sdp_chrecvbuf *)skb_push(skb, sizeof *req_size);
+ req_size->size = htons(ssk->sent_request);
+ sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
+ }
+
c = ssk->remote_credits;
if (likely(c > SDP_MIN_BUFS))
c *= 2;
}
}
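+/*
+ * Translate a negotiated buffer size into a per-skb page count,
+ * capped at SDP_MAX_SEND_SKB_FRAGS.
+ */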
+static inline void sdp_resize(struct sdp_sock *ssk, u32 new_size)
+{
+ ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
+ if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
+ ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
+}
+
static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
struct sk_buff *skb;
if (unlikely(!skb))
return;
- atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &current_mem_usage);
+ atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
if (unlikely(wc->status)) {
if (wc->status != IB_WC_WR_FLUSH_ERR) {
}
__kfree_skb(skb);
} else {
+ int frags;
+
sdp_dbg_data(&ssk->isk.sk,
"Recv completion. ID %d Length %d\n",
(int)wc->wr_id, wc->byte_len);
ssk->bufs = ntohl(h->mseq_ack) - ssk->tx_head + 1 +
ntohs(h->bufs);
+ frags = skb_shinfo(skb)->nr_frags;
pagesz = PAGE_ALIGN(skb->data_len);
skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
for (i = skb_shinfo(skb)->nr_frags;
- i < SDP_MAX_SEND_SKB_FRAGS; ++i) {
+ i < frags; ++i) {
put_page(skb_shinfo(skb)->frags[i].page);
skb->truesize -= PAGE_SIZE;
}
/* this will wake recvmsg */
sdp_sock_queue_rcv_skb(&ssk->isk.sk, skb);
sdp_fin(&ssk->isk.sk);
+ } else if (h->mid == SDP_MID_CHRCVBUF) {
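+ /*
+ * Peer asks us to enlarge our receive buffers. Resize only if a
+ * large-socket slot is available (or a request is already
+ * pending); the ack is deferred until the new buffers are posted.
+ */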
+ u32 new_size = *(u32 *)skb->data;
+
+ if (ssk->recv_request || sdp_can_resize()) {
+ ssk->rcvbuf_scale = rcvbuf_scale;
+ sdp_resize(ssk, ntohs(new_size));
+ ssk->recv_request_head = ssk->rx_head + 1;
+ } else
+ ssk->recv_request_head = ssk->rx_tail;
+ ssk->recv_request = 1;
+ __kfree_skb(skb);
+ } else if (h->mid == SDP_MID_CHRCVBUF_ACK) {
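+ /*
+ * Peer acknowledged our resize request: adopt the granted size if
+ * it is larger than the current send goal, otherwise drop the
+ * request.
+ */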
+ u32 new_size = *(u32 *)skb->data;
+ new_size = ntohs(new_size);
+
+ if (new_size > ssk->xmit_size_goal) {
+ ssk->sent_request = -1;
+ ssk->xmit_size_goal = new_size;
+ ssk->send_frags =
+ PAGE_ALIGN(ssk->xmit_size_goal) /
+ PAGE_SIZE;
+ } else
+ ssk->sent_request = 0;
+ __kfree_skb(skb);
} else {
/* TODO: Handle other messages */
printk("SDP: FIXME MID %d\n", h->mid);
init_waitqueue_head(&sdp_sk(sk)->wq);
+ sdp_sk(sk)->recv_frags = 0;
+ sdp_sk(sk)->rcvbuf_scale = 1;
sdp_post_recvs(sdp_sk(sk));
sdp_dbg(sk, "%s done\n", __func__);
sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
sizeof(struct sdp_bsdh);
+ sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
+ PAGE_SIZE;
sdp_dbg(child, "%s bufs %d xmit_size_goal %d\n", __func__,
sdp_sk(child)->bufs,
sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
sizeof(struct sdp_bsdh);
+ sdp_sk(sk)->send_frags = PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
+ PAGE_SIZE;
sdp_dbg(sk, "%s bufs %d xmit_size_goal %d\n", __func__,
sdp_sk(sk)->bufs,
hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
hh.max_adverts = 1;
hh.majv_minv = SDP_MAJV_MINV;
- hh.localrcvsz = hh.desremrcvsz = htonl(SDP_MAX_SEND_SKB_FRAGS *
+ hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
PAGE_SIZE + SDP_HEAD_SIZE);
hh.max_adverts = 0x1;
inet_sk(sk)->saddr = inet_sk(sk)->rcv_saddr =
hah.majv_minv = SDP_MAJV_MINV;
hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
but just in case */
- hah.actrcvsz = htonl(SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE + SDP_HEAD_SIZE);
+ hah.actrcvsz = htonl(sdp_sk(child)->recv_frags * PAGE_SIZE + SDP_HEAD_SIZE);
memset(&conn_param, 0, sizeof conn_param);
conn_param.private_data_len = sizeof hah;
conn_param.private_data = &hah;
static struct list_head sock_list;
static spinlock_t sock_list_lock;
-extern atomic_t current_mem_usage;
-
DEFINE_RWLOCK(device_removal_lock);
inline void sdp_add_sock(struct sdp_sock *ssk)
skb = sdp_recv_completion(ssk, ssk->rx_tail);
if (!skb)
break;
- atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &current_mem_usage);
+ atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
__kfree_skb(skb);
}
while (ssk->tx_head != ssk->tx_tail) {
if (pd)
ib_dealloc_pd(pd);
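+ /* Sockets that grew their receive buffers hold a large-socket slot. */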
+ if (ssk->recv_frags)
+ sdp_remove_large_sock();
+
kfree(ssk->rx_ring);
kfree(ssk->tx_ring);
}
/* We can extend the last page
* fragment. */
merge = 1;
- } else if (i == SDP_MAX_SEND_SKB_FRAGS ||
+ } else if (i == ssk->send_frags ||
(!i &&
!(sk->sk_route_caps & NETIF_F_SG))) {
/* Need to add new fragment and cannot
sdp_seq_afinfo.seq_fops->llseek = seq_lseek;
sdp_seq_afinfo.seq_fops->release = seq_release_private;
p = proc_net_fops_create(sdp_seq_afinfo.name, S_IRUGO, sdp_seq_afinfo.seq_fops);
if (p)
p->data = &sdp_seq_afinfo;
INIT_LIST_HEAD(&sock_list);
spin_lock_init(&sock_list_lock);
+ spin_lock_init(&sdp_large_sockets_lock);
sdp_workqueue = create_singlethread_workqueue("sdp");
if (!sdp_workqueue) {
sdp_proc_init();
- atomic_set(&current_mem_usage, 0);
+ atomic_set(&sdp_current_mem_usage, 0);
ib_register_client(&sdp_client);
BUG_ON(!list_empty(&sock_list));
- if (atomic_read(&current_mem_usage))
+ if (atomic_read(&sdp_current_mem_usage))
printk(KERN_WARNING "%s: current mem usage %d\n", __func__,
- atomic_read(&current_mem_usage));
+ atomic_read(&sdp_current_mem_usage));
sdp_proc_unregister();