--- /dev/null
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * $Id$
+ */
+#include <linux/interrupt.h>
+#include <linux/dma-mapping.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "sdp.h"
+
+/* Like tcp_fin */
+static void sdp_fin(struct sock *sk)
+{
+ sdp_dbg(sk, "%s\n", __func__);
+
+ sk->sk_shutdown |= RCV_SHUTDOWN;
+ sock_set_flag(sk, SOCK_DONE);
+
+ sk_stream_mem_reclaim(sk);
+
+ if (!sock_flag(sk, SOCK_DEAD)) {
+ sk->sk_state_change(sk);
+
+ /* Do not send POLL_HUP for half duplex close. */
+ if (sk->sk_shutdown == SHUTDOWN_MASK ||
+ sk->sk_state == TCP_CLOSE)
+ sk_wake_async(sk, 1, POLL_HUP);
+ else
+ sk_wake_async(sk, 1, POLL_IN);
+ }
+}
+
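+/*
+ * Build a BSDH header in front of the skb, DMA-map the linear part and
+ * every page fragment, and post a single IB_WR_SEND on the QP.  The
+ * header advertises our receive credits (rx_head - rx_tail) and acks
+ * the last message sequence number seen from the peer.
+ */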
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+{
+ struct sdp_buf *tx_req;
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+ unsigned mseq = ssk->tx_head;
+ int i, rc, frags;
+ dma_addr_t addr;
+ struct device *hwdev;
+ struct ib_sge *sge;
+ struct ib_send_wr *bad_wr;
+
+ h->mid = mid;
+ h->flags = 0; /* TODO: OOB */
+ h->bufs = htons(ssk->rx_head - ssk->rx_tail);
+ h->len = htonl(skb->len);
+ h->mseq = htonl(mseq);
+ h->mseq_ack = htonl(ssk->mseq_ack);
+
+ tx_req = &ssk->tx_ring[mseq & (SDP_TX_SIZE - 1)];
+ tx_req->skb = skb;
+ hwdev = ssk->dma_device;
+ sge = ssk->ibsge;
+ addr = dma_map_single(hwdev,
+ skb->data, skb->len - skb->data_len,
+ DMA_TO_DEVICE);
+ tx_req->mapping[0] = addr;
+
+ /* TODO: proper error handling */
+ BUG_ON(dma_mapping_error(addr));
+
+ sge->addr = (u64)addr;
+ sge->length = skb->len - skb->data_len;
+ sge->lkey = ssk->mr->lkey;
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i) {
+ ++sge;
+ addr = dma_map_page(hwdev, skb_shinfo(skb)->frags[i].page,
+ skb_shinfo(skb)->frags[i].page_offset,
+ skb_shinfo(skb)->frags[i].size,
+ DMA_TO_DEVICE);
+ BUG_ON(dma_mapping_error(addr));
+ tx_req->mapping[i + 1] = addr;
+ sge->addr = addr;
+ sge->length = skb_shinfo(skb)->frags[i].size;
+ sge->lkey = ssk->mr->lkey;
+ }
+
+ ssk->tx_wr.next = NULL;
+ ssk->tx_wr.wr_id = ssk->tx_head;
+ ssk->tx_wr.sg_list = ssk->ibsge;
+ ssk->tx_wr.num_sge = frags + 1;
+ ssk->tx_wr.opcode = IB_WR_SEND;
+ ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
+ if (unlikely(mid != SDP_MID_DATA))
+ ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
+ rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
+ BUG_ON(rc);
+ ++ssk->tx_head;
+ --ssk->bufs;
+ ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
+}
+
+struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
+{
+ struct device *hwdev;
+ struct sdp_buf *tx_req;
+ struct sk_buff *skb;
+ int i, frags;
+
+ if (unlikely(mseq != ssk->tx_tail)) {
+ printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
+ mseq, ssk->tx_tail);
+ return NULL;
+ }
+
+ hwdev = ssk->dma_device;
+ tx_req = &ssk->tx_ring[mseq & (SDP_TX_SIZE - 1)];
+ skb = tx_req->skb;
+ dma_unmap_single(hwdev, tx_req->mapping[0], skb->len - skb->data_len,
+ DMA_TO_DEVICE);
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i) {
+ dma_unmap_page(hwdev, tx_req->mapping[i + 1],
+ skb_shinfo(skb)->frags[i].size,
+ DMA_TO_DEVICE);
+ }
+
+ ++ssk->tx_tail;
+ return skb;
+}
+
+
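+/*
+ * Allocate a receive skb with SDP_MAX_SEND_SKB_FRAGS full pages attached,
+ * DMA-map the header and each page, and post it as one receive work
+ * request.  The wr_id carries the ring index tagged with SDP_OP_RECV so
+ * the completion handler can tell receives from sends.
+ */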
+static void sdp_post_recv(struct sdp_sock *ssk)
+{
+ struct sdp_buf *rx_req;
+ int i, rc, frags;
+ dma_addr_t addr;
+ struct device *hwdev;
+ struct ib_sge *sge;
+ struct ib_recv_wr *bad_wr;
+ struct sk_buff *skb;
+ struct page *page;
+ skb_frag_t *frag;
+ struct sdp_bsdh *h;
+ int id = ssk->rx_head;
+
+ /* Now, allocate and repost recv */
+ /* TODO: allocate from cache */
+ skb = sk_stream_alloc_skb(&ssk->isk.sk, sizeof(struct sdp_bsdh),
+ GFP_KERNEL);
+ /* FIXME */
+ BUG_ON(!skb);
+ h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+ for (i = 0; i < SDP_MAX_SEND_SKB_FRAGS; ++i) {
+ page = alloc_pages(GFP_KERNEL, 0);
+ BUG_ON(!page);
+ frag = &skb_shinfo(skb)->frags[i];
+ frag->page = page;
+ frag->page_offset = 0;
+ frag->size = PAGE_SIZE;
+ ++skb_shinfo(skb)->nr_frags;
+ skb->len += PAGE_SIZE;
+ skb->data_len += PAGE_SIZE;
+ skb->truesize += PAGE_SIZE;
+ }
+
+ rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
+ rx_req->skb = skb;
+ hwdev = ssk->dma_device;
+ sge = ssk->ibsge;
+ addr = dma_map_single(hwdev, h, skb_headlen(skb),
+ DMA_FROM_DEVICE);
+ BUG_ON(dma_mapping_error(addr));
+
+ rx_req->mapping[0] = addr;
+
+ /* TODO: proper error handling */
+ sge->addr = (u64)addr;
+ sge->length = skb_headlen(skb);
+ sge->lkey = ssk->mr->lkey;
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i) {
+ ++sge;
+ addr = dma_map_page(hwdev, skb_shinfo(skb)->frags[i].page,
+ skb_shinfo(skb)->frags[i].page_offset,
+ skb_shinfo(skb)->frags[i].size,
+ DMA_FROM_DEVICE);
+ BUG_ON(dma_mapping_error(addr));
+ rx_req->mapping[i + 1] = addr;
+ sge->addr = addr;
+ sge->length = skb_shinfo(skb)->frags[i].size;
+ sge->lkey = ssk->mr->lkey;
+ }
+
+ ssk->rx_wr.next = NULL;
+ ssk->rx_wr.wr_id = id | SDP_OP_RECV;
+ ssk->rx_wr.sg_list = ssk->ibsge;
+ ssk->rx_wr.num_sge = frags + 1;
+ rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
+ /* TODO */
+ BUG_ON(rc);
+ ++ssk->rx_head;
+}
+
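+/*
+ * Keep the receive ring populated: always keep at least SDP_MIN_BUFS
+ * receives posted, never exceed SDP_RX_SIZE, and otherwise stop once the
+ * memory committed to posted buffers plus sk_rmem_alloc grows well past
+ * sk_rcvbuf (the 0x10 factor below).
+ */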
+void sdp_post_recvs(struct sdp_sock *ssk)
+{
+ int rmem = atomic_read(&ssk->isk.sk.sk_rmem_alloc);
+
+ if (unlikely(!ssk->id))
+ return;
+
+ while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
+ (ssk->rx_head - ssk->rx_tail - SDP_MIN_BUFS) *
+ SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE + rmem <
+ ssk->isk.sk.sk_rcvbuf * 0x10) ||
+ unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_BUFS))
+ sdp_post_recv(ssk);
+}
+
+struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
+{
+ struct sdp_buf *rx_req;
+ struct device *hwdev;
+ struct sk_buff *skb;
+ int i, frags;
+
+ if (unlikely(id != ssk->rx_tail)) {
+ printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
+ id, ssk->rx_tail);
+ return NULL;
+ }
+
+ hwdev = ssk->dma_device;
+ rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
+ skb = rx_req->skb;
+ dma_unmap_single(hwdev, rx_req->mapping[0], skb_headlen(skb),
+ DMA_FROM_DEVICE);
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i)
+ dma_unmap_page(hwdev, rx_req->mapping[i + 1],
+ skb_shinfo(skb)->frags[i].size,
+ DMA_FROM_DEVICE);
+ ++ssk->rx_tail;
+ --ssk->remote_credits;
+ return skb;
+}
+
+/* Like sock_queue_rcv_skb, but open-coded here because the queueing must not fail. */
+static inline int sdp_sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
+{
+ int skb_len;
+
+ skb_set_owner_r(skb, sk);
+
+ skb_len = skb->len;
+
+ skb_queue_tail(&sk->sk_receive_queue, skb);
+
+ if (!sock_flag(sk, SOCK_DEAD))
+ sk->sk_data_ready(sk, skb_len);
+ return 0;
+}
+
+static inline void update_send_head(struct sock *sk, struct sk_buff *skb)
+{
+ sk->sk_send_head = skb->next;
+ if (sk->sk_send_head == (struct sk_buff *)&sk->sk_write_queue)
+ sk->sk_send_head = NULL;
+}
+
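+/*
+ * Transmit path: drain sk_write_queue while we hold more than SDP_MIN_BUFS
+ * send credits and the TX ring has room.  If the peer is running low on
+ * credit updates, post an empty data message just to refresh its credits,
+ * and post a DISCONN message once the socket is closing with nothing left
+ * to transmit.
+ */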
+void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
+{
+ /* TODO: nonagle */
+ struct sk_buff *skb;
+ int c;
+
+ if (unlikely(!ssk->id))
+ return;
+
+ while (ssk->bufs > SDP_MIN_BUFS &&
+ ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE &&
+ (skb = ssk->isk.sk.sk_send_head)) {
+ update_send_head(&ssk->isk.sk, skb);
+ __skb_dequeue(&ssk->isk.sk.sk_write_queue);
+ sdp_post_send(ssk, skb, SDP_MID_DATA);
+ }
+ c = ssk->remote_credits;
+ if (likely(c > SDP_MIN_BUFS))
+ c *= 2;
+
+ if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
+ likely(ssk->bufs > 1) &&
+ likely(ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE)) {
+ skb = sk_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh),
+ GFP_KERNEL);
+ /* FIXME */
+ BUG_ON(!skb);
+ sdp_post_send(ssk, skb, SDP_MID_DATA);
+ }
+
+ if (unlikely((1 << ssk->isk.sk.sk_state) &
+ (TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) &&
+ !ssk->isk.sk.sk_send_head &&
+ ssk->bufs) {
+ skb = sk_stream_alloc_skb(&ssk->isk.sk,
+ sizeof(struct sdp_bsdh),
+ GFP_KERNEL);
+ /* FIXME */
+ BUG_ON(!skb);
+ sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+ if (ssk->isk.sk.sk_state == TCP_FIN_WAIT1)
+ ssk->isk.sk.sk_state = TCP_FIN_WAIT2;
+ else
+ ssk->isk.sk.sk_state = TCP_CLOSING;
+ }
+}
+
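+/*
+ * Work completion dispatch: the SDP_OP_RECV bit in wr_id distinguishes
+ * receive completions from send completions.  On receive we pick up the
+ * peer's credit advertisement from the BSDH (mseq_ack/bufs), queue data
+ * or handle DISCONN, and repost receive buffers; on send we free the skb
+ * and wake up writers.
+ */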
+static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+ struct sk_buff *skb;
+ struct sdp_bsdh *h;
+
+ if (wc->wr_id & SDP_OP_RECV) {
+ skb = sdp_recv_completion(ssk, wc->wr_id);
+ if (unlikely(!skb))
+ return;
+
+ if (unlikely(wc->status)) {
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ sdp_dbg(&ssk->isk.sk,
+ "Recv completion with error. "
+ "Status %d\n", wc->status);
+ __kfree_skb(skb);
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+ } else {
+ /* TODO: handle msg < bsdh */
+ sdp_dbg(&ssk->isk.sk,
+ "Recv completion. ID %d Length %d\n",
+ (int)wc->wr_id, wc->byte_len);
+ skb->len = wc->byte_len;
+ skb->data_len = wc->byte_len - sizeof(struct sdp_bsdh);
+ if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+ printk(KERN_WARNING "SDP: FIXME len %d\n", wc->byte_len);
+ }
+ h = (struct sdp_bsdh *)skb->data;
+ skb->h.raw = skb->data;
+ ssk->mseq_ack = ntohl(h->mseq);
+ if (ssk->mseq_ack != (int)wc->wr_id)
+ printk("SDP BUG! mseq %d != wrid %d\n",
+ ssk->mseq_ack, (int)wc->wr_id);
+ ssk->bufs = ntohl(h->mseq_ack) - ssk->tx_head + 1 +
+ ntohs(h->bufs);
+
+ if (likely(h->mid == SDP_MID_DATA) &&
+ likely(skb->data_len > 0)) {
+ skb_pull(skb, sizeof(struct sdp_bsdh));
+ /* TODO: queue can fail? */
+ /* TODO: free unused fragments */
+ sdp_sock_queue_rcv_skb(&ssk->isk.sk, skb);
+ } else if (likely(h->mid == SDP_MID_DATA)) {
+ __kfree_skb(skb);
+ } else if (h->mid == SDP_MID_DISCONN) {
+ skb_pull(skb, sizeof(struct sdp_bsdh));
+ /* TODO: free unused fragments */
+ /* this will wake recvmsg */
+ sdp_sock_queue_rcv_skb(&ssk->isk.sk, skb);
+ sdp_fin(&ssk->isk.sk);
+ } else {
+ /* TODO: Handle other messages */
+ printk("SDP: FIXME MID %d\n", h->mid);
+ __kfree_skb(skb);
+ }
+ sdp_post_recvs(ssk);
+ }
+ } else {
+ skb = sdp_send_completion(ssk, wc->wr_id);
+ if (unlikely(!skb))
+ return;
+ sk_stream_free_skb(&ssk->isk.sk, skb);
+ if (unlikely(wc->status)) {
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ sdp_dbg(&ssk->isk.sk,
+ "Send completion with error. "
+ "Status %d\n", wc->status);
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+ }
+
+ sk_stream_write_space(&ssk->isk.sk);
+ }
+
+ if (likely(!wc->status)) {
+ sdp_post_recvs(ssk);
+ sdp_post_sends(ssk, 0);
+ }
+
+ if (ssk->time_wait && !ssk->isk.sk.sk_send_head &&
+ ssk->tx_head == ssk->tx_tail) {
+ ssk->time_wait = 0;
+ ssk->isk.sk.sk_state = TCP_CLOSE;
+ sdp_dbg(&ssk->isk.sk, "%s: destroy in time wait state\n",
+ __func__);
+ queue_work(sdp_workqueue, &ssk->destroy_work);
+ }
+}
+
+void sdp_completion_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct sock *sk = cq_context;
+ struct sdp_sock *ssk = sdp_sk(sk);
+ schedule_work(&ssk->work);
+}
+
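+/*
+ * CQ processing runs in workqueue context under the socket lock.  The CQ
+ * is polled dry, re-armed with ib_req_notify_cq(), and then polled once
+ * more to catch completions that arrived between the last poll and the
+ * notify request.
+ */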
+void sdp_work(void *data)
+{
+ struct sock *sk = (struct sock *)data;
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct ib_cq *cq;
+ int n, i;
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ cq = ssk->cq;
+ if (unlikely(!cq))
+ return;
+
+ do {
+ lock_sock(sk);
+ n = ib_poll_cq(cq, SDP_NUM_WC, ssk->ibwc);
+ for (i = 0; i < n; ++i) {
+ sdp_handle_wc(ssk, ssk->ibwc + i);
+ }
+ release_sock(sk);
+ } while (n == SDP_NUM_WC);
+ sk_stream_mem_reclaim(sk);
+ ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+ do {
+ lock_sock(sk);
+ n = ib_poll_cq(cq, SDP_NUM_WC, ssk->ibwc);
+ for (i = 0; i < n; ++i) {
+ sdp_handle_wc(ssk, ssk->ibwc + i);
+ }
+ release_sock(sk);
+ } while (n == SDP_NUM_WC);
+}
--- /dev/null
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * $Id$
+ */
+#include <asm/semaphore.h>
+#include <linux/device.h>
+#include <linux/in.h>
+#include <linux/err.h>
+#include <linux/module.h>
+#include <linux/moduleparam.h>
+#include <linux/pci.h>
+#include <linux/time.h>
+#include <linux/workqueue.h>
+
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <net/tcp_states.h>
+#include "sdp_socket.h"
+#include "sdp.h"
+
+union cma_ip_addr {
+ struct in6_addr ip6;
+ struct {
+ __u32 pad[3];
+ __u32 addr;
+ } ip4;
+};
+
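+/*
+ * SDP Hello and HelloAck headers, exchanged as CM private data during
+ * connection establishment to negotiate credits and receive buffer sizes.
+ */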
+#define SDP_MAJV_MINV 0x22
+
+/* TODO: too much? Can I avoid having the src/dst and port here? */
+struct sdp_hh {
+ struct sdp_bsdh bsdh;
+ u8 majv_minv;
+ u8 ipv_cap;
+ u8 rsvd1;
+ u8 max_adverts;
+ __u32 desremrcvsz;
+ __u32 localrcvsz;
+ __u16 port;
+ __u16 rsvd2;
+ union cma_ip_addr src_addr;
+ union cma_ip_addr dst_addr;
+};
+
+struct sdp_hah {
+ struct sdp_bsdh bsdh;
+ u8 majv_minv;
+ u8 ipv_cap;
+ u8 rsvd1;
+ u8 ext_max_adverts;
+ __u32 actrcvsz;
+};
+
+static void sdp_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+static void sdp_qp_event_handler(struct ib_event *event, void *data)
+{
+}
+
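+/*
+ * Allocate the TX/RX descriptor rings and the IB resources backing a
+ * connection: a PD, a local DMA MR, a single CQ shared by send and
+ * receive, and an RC QP created through the rdma_cm id.  Receive buffers
+ * are posted before the QP is connected.
+ */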
+int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
+{
+ struct ib_qp_init_attr qp_init_attr = {
+ .event_handler = sdp_qp_event_handler,
+ .cap.max_send_wr = SDP_TX_SIZE,
+ .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+ .cap.max_recv_wr = SDP_RX_SIZE,
+ .cap.max_recv_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+ .sq_sig_type = IB_SIGNAL_REQ_WR,
+ .qp_type = IB_QPT_RC,
+ };
+ struct ib_device *device = id->device;
+ struct ib_cq *cq;
+ struct ib_pd *pd;
+ int rc;
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ sdp_sk(sk)->tx_ring = kmalloc(sizeof *sdp_sk(sk)->tx_ring * SDP_TX_SIZE,
+ GFP_KERNEL);
+ if (!sdp_sk(sk)->tx_ring) {
+ rc = -ENOMEM;
+ sdp_warn(sk, "Unable to allocate TX Ring size %zd.\n",
+ sizeof *sdp_sk(sk)->tx_ring * SDP_TX_SIZE);
+ goto err_tx;
+ }
+
+ sdp_sk(sk)->rx_ring = kmalloc(sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE,
+ GFP_KERNEL);
+ if (!sdp_sk(sk)->rx_ring) {
+ rc = -ENOMEM;
+ sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n",
+ sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE);
+ goto err_rx;
+ }
+
+ pd = ib_alloc_pd(device);
+ if (IS_ERR(pd)) {
+ rc = PTR_ERR(pd);
+ sdp_warn(sk, "Unable to allocate PD: %d.\n", rc);
+ goto err_pd;
+ }
+
+ sdp_sk(sk)->mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE);
+ if (IS_ERR(sdp_sk(sk)->mr)) {
+ rc = PTR_ERR(sdp_sk(sk)->mr);
+ sdp_warn(sk, "Unable to get dma MR: %d.\n", rc);
+ goto err_mr;
+ }
+
+ INIT_WORK(&sdp_sk(sk)->work, sdp_work, sdp_sk(sk));
+
+ cq = ib_create_cq(device, sdp_completion_handler, sdp_cq_event_handler,
+ sk, SDP_TX_SIZE + SDP_RX_SIZE);
+
+ if (IS_ERR(cq)) {
+ rc = PTR_ERR(cq);
+ sdp_warn(sk, "Unable to allocate CQ: %d.\n", rc);
+ goto err_cq;
+ }
+
+ qp_init_attr.send_cq = qp_init_attr.recv_cq = cq;
+
+ rc = rdma_create_qp(id, pd, &qp_init_attr);
+ if (rc) {
+ sdp_warn(sk, "Unable to create QP: %d.\n", rc);
+ goto err_qp;
+ }
+ sdp_sk(sk)->cq = cq;
+ sdp_sk(sk)->qp = id->qp;
+ sdp_sk(sk)->dma_device = device->dma_device;
+
+ init_waitqueue_head(&sdp_sk(sk)->wq);
+
+ sdp_post_recvs(sdp_sk(sk));
+
+ sdp_dbg(sk, "%s done\n", __func__);
+ return 0;
+
+err_qp:
+ ib_destroy_cq(cq);
+err_cq:
+ ib_dereg_mr(sdp_sk(sk)->mr);
+err_mr:
+ ib_dealloc_pd(pd);
+err_pd:
+ kfree(sdp_sk(sk)->rx_ring);
+err_rx:
+ kfree(sdp_sk(sk)->tx_ring);
+err_tx:
+ return rc;
+}
+
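+/*
+ * Passive-side connect request: clone the listening socket, set up a QP
+ * for the new child, take the peer's credits and buffer size from the
+ * Hello header in the event's private data, and park the child on the
+ * listener's backlog queue until the connection is established.
+ */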
+int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
+ struct rdma_cm_event *event)
+{
+ struct sockaddr_in *dst_addr;
+ struct sock *child;
+ struct sdp_hh *h;
+ int rc;
+
+ sdp_dbg(sk, "%s %p -> %p\n", __func__, sdp_sk(sk)->id, id);
+
+ child = sk_clone(sk, GFP_KERNEL);
+ if (!child) {
+ return -ENOMEM;
+ }
+
+ INIT_LIST_HEAD(&sdp_sk(child)->accept_queue);
+ INIT_LIST_HEAD(&sdp_sk(child)->backlog_queue);
+ INIT_WORK(&sdp_sk(child)->time_wait_work, sdp_time_wait_work, child);
+ INIT_WORK(&sdp_sk(child)->destroy_work, sdp_destroy_work, child);
+
+ dst_addr = (struct sockaddr_in *)&id->route.addr.dst_addr;
+ inet_sk(child)->dport = dst_addr->sin_port;
+ inet_sk(child)->daddr = dst_addr->sin_addr.s_addr;
+
+ bh_unlock_sock(child);
+ __sock_put(child);
+
+ rc = sdp_init_qp(child, id);
+ if (rc) {
+ sk_common_release(child);
+ return rc;
+ }
+
+ h = event->private_data;
+ sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
+ sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
+ sizeof(struct sdp_bsdh);
+
+ sdp_dbg(child, "%s bufs %d xmit_size_goal %d\n", __func__,
+ sdp_sk(child)->bufs,
+ sdp_sk(child)->xmit_size_goal);
+
+ id->context = child;
+ sdp_sk(child)->id = id;
+
+ list_add_tail(&sdp_sk(child)->backlog_queue, &sdp_sk(sk)->backlog_queue);
+ sdp_sk(child)->parent = sk;
+
+ /* child->sk_write_space(child); */
+ /* child->sk_data_ready(child, 0); */
+ sk->sk_data_ready(sk, 0);
+
+ return 0;
+}
+
+static int sdp_response_handler(struct sock *sk, struct rdma_cm_event *event)
+{
+ struct sdp_hah *h;
+ sdp_dbg(sk, "%s\n", __func__);
+
+ sk->sk_state = TCP_ESTABLISHED;
+
+ /* TODO: If SOCK_KEEPOPEN set, need to reset and start
+ keepalive timer here */
+
+ if (sock_flag(sk, SOCK_DEAD))
+ return 0;
+
+ h = event->private_data;
+ sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
+ sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
+ sizeof(struct sdp_bsdh);
+
+ sdp_dbg(sk, "%s bufs %d xmit_size_goal %d\n", __func__,
+ sdp_sk(sk)->bufs,
+ sdp_sk(sk)->xmit_size_goal);
+
+ ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
+
+ sk->sk_state_change(sk);
+ sk_wake_async(sk, 0, POLL_OUT);
+ return 0;
+}
+
+int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
+{
+ struct sock *parent;
+ sdp_dbg(sk, "%s\n", __func__);
+
+ parent = sdp_sk(sk)->parent;
+ BUG_ON(!parent);
+
+ sk->sk_state = TCP_ESTABLISHED;
+
+ /* TODO: If SOCK_KEEPOPEN set, need to reset and start
+ keepalive timer here */
+
+ if (sock_flag(sk, SOCK_DEAD))
+ return 0;
+
+ lock_sock(parent);
+ if (!sdp_sk(parent)->id) { /* TODO: look at SOCK_DEAD? */
+ sdp_dbg(sk, "parent is going away.\n");
+ goto done;
+ }
+#if 0
+ /* TODO: backlog */
+ if (sk_acceptq_is_full(parent)) {
+ sdp_dbg(parent, "%s ECONNREFUSED: parent accept queue full: %d > %d\n", __func__, parent->sk_ack_backlog, parent->sk_max_ack_backlog);
+ release_sock(parent);
+ return -ECONNREFUSED;
+ }
+#endif
+ sk_acceptq_added(parent);
+ sdp_dbg(parent, "%s child connection established\n", __func__);
+ list_del_init(&sdp_sk(sk)->backlog_queue);
+ list_add_tail(&sdp_sk(sk)->accept_queue, &sdp_sk(parent)->accept_queue);
+
+ parent->sk_state_change(parent);
+ sk_wake_async(parent, 0, POLL_OUT);
+done:
+ release_sock(parent);
+
+ return 0;
+}
+
+void sdp_disconnected_handler(struct sock *sk)
+{
+ sdp_dbg(sk, "%s\n", __func__);
+}
+
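+/*
+ * Single rdma_cm event handler for both active and passive sockets.  The
+ * active side walks ADDR_RESOLVED -> ROUTE_RESOLVED (sending a Hello in
+ * the connect private data) -> CONNECT_RESPONSE/ESTABLISHED, while the
+ * passive side answers CONNECT_REQUEST with a HelloAck.  Any error tears
+ * down the cm_id association and, for children, removes them from the
+ * parent's queues.
+ */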
+int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
+{
+ struct rdma_conn_param conn_param;
+ struct sock *parent = NULL;
+ struct sock *child = NULL;
+ struct sock *sk;
+ struct sdp_hah hah;
+ struct sdp_hh hh;
+
+ int rc = 0;
+
+ sk = id->context;
+ if (!sk) {
+ sdp_dbg(NULL, "cm_id is being torn down, event %d\n",
+ event->event);
+ return event->event == RDMA_CM_EVENT_CONNECT_REQUEST ?
+ -EINVAL : 0;
+ }
+
+ lock_sock(sk);
+ sdp_dbg(sk, "%s event %d id %p\n", __func__, event->event, id);
+ if (!sdp_sk(sk)->id) {
+ sdp_dbg(sk, "socket is being torn down\n");
+ rc = event->event == RDMA_CM_EVENT_CONNECT_REQUEST ?
+ -EINVAL : 0;
+ release_sock(sk);
+ return rc;
+ }
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_RESOLVED\n");
+ rc = rdma_resolve_route(id, SDP_ROUTE_TIMEOUT);
+ break;
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
+ rc = -ENETUNREACH;
+ break;
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ sdp_dbg(sk, "RDMA_CM_EVENT_ROUTE_RESOLVED : %p\n", id);
+ rc = sdp_init_qp(sk, id);
+ if (rc)
+ break;
+ sdp_sk(sk)->remote_credits = sdp_sk(sk)->rx_head -
+ sdp_sk(sk)->rx_tail;
+ memset(&hh, 0, sizeof hh);
+ hh.bsdh.mid = SDP_MID_HELLO;
+ hh.bsdh.bufs = htons(sdp_sk(sk)->remote_credits);
+ hh.majv_minv = SDP_MAJV_MINV;
+ hh.localrcvsz = hh.desremrcvsz = htonl(SDP_MAX_SEND_SKB_FRAGS *
+ PAGE_SIZE + sizeof(struct sdp_bsdh));
+ hh.max_adverts = 0x1;
+
+ memset(&conn_param, 0, sizeof conn_param);
+ conn_param.private_data_len = sizeof hh;
+ conn_param.private_data = &hh;
+ conn_param.responder_resources = 4 /* TODO */;
+ conn_param.initiator_depth = 4 /* TODO */;
+ conn_param.retry_count = SDP_RETRY_COUNT;
+ rc = rdma_connect(id, &conn_param);
+ break;
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ sdp_dbg(sk, "RDMA_CM_EVENT_ROUTE_ERROR : %p\n", id);
+ rc = -ETIMEDOUT;
+ break;
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_REQUEST\n");
+ rc = sdp_connect_handler(sk, id, event);
+ if (rc) {
+ rdma_reject(id, NULL, 0);
+ break;
+ }
+ child = id->context;
+ sdp_sk(child)->remote_credits = sdp_sk(child)->rx_head -
+ sdp_sk(child)->rx_tail;
+ memset(&hah, 0, sizeof hah);
+ hah.bsdh.mid = SDP_MID_HELLO_ACK;
+ hah.bsdh.bufs = htons(sdp_sk(child)->remote_credits);
+ hah.majv_minv = SDP_MAJV_MINV;
+ hah.actrcvsz = htonl(SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE +
+ sizeof(struct sdp_bsdh));
+ memset(&conn_param, 0, sizeof conn_param);
+ conn_param.private_data_len = sizeof hah;
+ conn_param.private_data = &hah;
+ conn_param.responder_resources = 4 /* TODO */;
+ conn_param.initiator_depth = 4 /* TODO */;
+ conn_param.retry_count = SDP_RETRY_COUNT;
+ rc = rdma_accept(id, &conn_param);
+ if (rc) {
+ sdp_sk(child)->id = NULL;
+ id->qp = NULL;
+ id->context = NULL;
+ parent = sdp_sk(child)->parent; /* TODO: hold ? */
+ }
+ break;
+ case RDMA_CM_EVENT_CONNECT_RESPONSE:
+ sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_RESPONSE\n");
+ rc = sdp_response_handler(sk, event);
+ if (rc)
+ rdma_reject(id, NULL, 0);
+ else
+ rc = rdma_accept(id, NULL);
+ break;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n");
+ rc = -ETIMEDOUT;
+ break;
+ case RDMA_CM_EVENT_UNREACHABLE:
+ sdp_dbg(sk, "RDMA_CM_EVENT_UNREACHABLE\n");
+ rc = -ENETUNREACH;
+ break;
+ case RDMA_CM_EVENT_REJECTED:
+ sdp_dbg(sk, "RDMA_CM_EVENT_REJECTED\n");
+ rc = -ECONNREFUSED;
+ break;
+ case RDMA_CM_EVENT_ESTABLISHED:
+ sdp_dbg(sk, "RDMA_CM_EVENT_ESTABLISHED\n");
+ rc = sdp_connected_handler(sk, event);
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ sdp_dbg(sk, "RDMA_CM_EVENT_DISCONNECTED\n");
+ rdma_disconnect(id);
+ sdp_disconnected_handler(sk);
+ rc = -ECONNRESET;
+ break;
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ sdp_warn(sk, "RDMA_CM_EVENT_DEVICE_REMOVAL\n");
+ sdp_disconnected_handler(sk);
+ rc = -ENETRESET;
+ break;
+ default:
+ printk(KERN_ERR "SDP: Unexpected CMA event: %d\n",
+ event->event);
+ rc = -ECONNABORTED;
+ break;
+ }
+
+ sdp_dbg(sk, "%s event %d handled\n", __func__, event->event);
+
+ if (rc && sdp_sk(sk)->id == id) {
+ child = sk;
+ sdp_sk(sk)->id = NULL;
+ id->qp = NULL;
+ id->context = NULL;
+ sdp_set_error(sk, rc);
+ parent = sdp_sk(sk)->parent;
+ }
+
+ release_sock(sk);
+
+ sdp_dbg(sk, "event %d done. status %d\n", event->event, rc);
+
+ if (parent) {
+ sdp_dbg(parent, "deleting child %d done. status %d\n", event->event, rc);
+ lock_sock(parent);
+ if (!sdp_sk(parent)->id) { /* TODO: look at SOCK_DEAD? */
+ sdp_dbg(sk, "parent is going away.\n");
+ goto done;
+ }
+ list_del_init(&sdp_sk(child)->backlog_queue);
+ if (!list_empty(&sdp_sk(child)->accept_queue)) {
+ list_del_init(&sdp_sk(child)->accept_queue);
+ sk_acceptq_removed(parent);
+ }
+done:
+ release_sock(parent);
+ sk_common_release(child);
+ }
+ return rc;
+}
--- /dev/null
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#if defined(__ia64__)
+/* csum_partial_copy_from_user is not exported on ia64.
+ We do not really need it for SDP: skb_copy_to_page happens to call it,
+ but SDP always sets HW checksum, so the checksumming copy is never used. */
+
+#include <linux/errno.h>
+#include <asm/checksum.h>
+static inline
+unsigned int csum_partial_copy_from_user_new (const char *src, char *dst,
+ int len, unsigned int sum,
+ int *errp)
+{
+ *errp = -EINVAL;
+ return 0;
+}
+
+#define csum_partial_copy_from_user csum_partial_copy_from_user_new
+#endif
+
+#include <linux/tcp.h>
+#include <asm/ioctls.h>
+#include <linux/workqueue.h>
+#include <linux/net.h>
+#include <linux/socket.h>
+#include <net/protocol.h>
+#include <net/inet_common.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib_verbs.h>
+/* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */
+#include "sdp_socket.h"
+#include "sdp.h"
+#include <linux/delay.h>
+
+MODULE_AUTHOR("Michael S. Tsirkin");
+MODULE_DESCRIPTION("InfiniBand SDP module");
+MODULE_LICENSE("Dual BSD/GPL");
+
+#ifdef CONFIG_INFINIBAND_SDP_DEBUG
+int sdp_debug_level;
+
+module_param_named(debug_level, sdp_debug_level, int, 0644);
+MODULE_PARM_DESC(debug_level, "Enable debug tracing if > 0.");
+#endif
+
+struct workqueue_struct *sdp_workqueue;
+
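+/*
+ * Bind the socket's rdma_cm id (port space RDMA_PS_SDP) to the requested
+ * local address/port and record the port that was actually assigned.
+ */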
+static int sdp_get_port(struct sock *sk, unsigned short snum)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct sockaddr_in *src_addr;
+ int rc;
+
+ struct sockaddr_in addr = {
+ .sin_family = AF_INET,
+ .sin_port = htons(snum),
+ .sin_addr.s_addr = inet_sk(sk)->rcv_saddr,
+ };
+
+ sdp_dbg(sk, "%s: %u.%u.%u.%u:%hu\n", __func__,
+ NIPQUAD(addr.sin_addr.s_addr), ntohs(addr.sin_port));
+
+ if (!ssk->id)
+ ssk->id = rdma_create_id(sdp_cma_handler, sk, RDMA_PS_SDP);
+
+ if (!ssk->id)
+ return -ENOMEM;
+
+ /* IP core seems to bind many times to the same address */
+ /* TODO: I don't really understand why. Find out. */
+ if (!memcmp(&addr, &ssk->id->route.addr.src_addr, sizeof addr))
+ return 0;
+
+ rc = rdma_bind_addr(ssk->id, (struct sockaddr *)&addr);
+ if (rc) {
+ rdma_destroy_id(ssk->id);
+ ssk->id = NULL;
+ return rc;
+ }
+
+ src_addr = (struct sockaddr_in *)&(ssk->id->route.addr.src_addr);
+ inet_sk(sk)->num = ntohs(src_addr->sin_port);
+ return 0;
+}
+
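+/*
+ * Release all connection resources: destroy the cm_id first so no new
+ * events arrive, then the QP, drain whatever is still on the TX/RX rings,
+ * and finally destroy the CQ, MR, PD and the rings themselves.
+ */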
+/* TODO: linger? */
+void sdp_close_sk(struct sock *sk)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct rdma_cm_id *id = NULL;
+ struct ib_pd *pd = NULL;
+ struct ib_cq *cq = NULL;
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ lock_sock(sk);
+
+ sk->sk_send_head = NULL;
+ skb_queue_purge(&sk->sk_write_queue);
+
+ id = ssk->id;
+ if (ssk->id) {
+ id->qp = NULL;
+ ssk->id = NULL;
+ release_sock(sk);
+ rdma_destroy_id(id);
+ } else
+ release_sock(sk);
+
+ if (ssk->qp) {
+ pd = ssk->qp->pd;
+ cq = ssk->cq;
+ sdp_sk(sk)->cq = NULL;
+ flush_scheduled_work();
+ ib_destroy_qp(ssk->qp);
+
+ while (ssk->rx_head != ssk->rx_tail) {
+ struct sk_buff *skb;
+ skb = sdp_recv_completion(ssk, ssk->rx_tail);
+ if (!skb)
+ break;
+ __kfree_skb(skb);
+ }
+ while (ssk->tx_head != ssk->tx_tail) {
+ struct sk_buff *skb;
+ skb = sdp_send_completion(ssk, ssk->tx_tail);
+ if (!skb)
+ break;
+ __kfree_skb(skb);
+ }
+ }
+
+ if (cq) {
+ ib_destroy_cq(cq);
+ flush_scheduled_work();
+ }
+
+ if (ssk->mr)
+ ib_dereg_mr(ssk->mr);
+
+ if (pd)
+ ib_dealloc_pd(pd);
+
+ skb_queue_purge(&sk->sk_receive_queue);
+
+ kfree(sdp_sk(sk)->rx_ring);
+ kfree(sdp_sk(sk)->tx_ring);
+
+ sdp_dbg(sk, "%s done; releasing sock\n", __func__);
+}
+
+static void sdp_destruct(struct sock *sk)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct sdp_sock *s, *t;
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ sdp_close_sk(sk);
+
+ if (ssk->parent)
+ goto done;
+
+ list_for_each_entry_safe(s, t, &ssk->backlog_queue, backlog_queue) {
+ sk_common_release(&s->isk.sk);
+ }
+ list_for_each_entry_safe(s, t, &ssk->accept_queue, accept_queue) {
+ sk_common_release(&s->isk.sk);
+ }
+done:
+ sdp_dbg(sk, "%s done\n", __func__);
+}
+
+static void sdp_send_active_reset(struct sock *sk, gfp_t priority)
+{
+ sk->sk_prot->disconnect(sk, 0);
+}
+
+/*
+ * State processing on a close.
+ * TCP_ESTABLISHED -> TCP_FIN_WAIT1 -> TCP_FIN_WAIT2 -> TCP_CLOSE
+ */
+
+static int sdp_close_state(struct sock *sk)
+{
+ if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+ return 0;
+
+ if (sk->sk_state == TCP_ESTABLISHED)
+ sk->sk_state = TCP_FIN_WAIT1;
+ else if (sk->sk_state == TCP_CLOSE_WAIT)
+ sk->sk_state = TCP_LAST_ACK;
+ else
+ return 0;
+ return 1;
+}
+
+/* Like tcp_close */
+static void sdp_close(struct sock *sk, long timeout)
+{
+ struct sk_buff *skb;
+ int data_was_unread = 0;
+
+ lock_sock(sk);
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ sk->sk_shutdown = SHUTDOWN_MASK;
+ if (sk->sk_state == TCP_LISTEN || sk->sk_state == TCP_SYN_SENT) {
+ sdp_set_state(sk, TCP_CLOSE);
+
+ /* Special case: stop listening.
+ This is done by sdp_destruct. */
+ goto adjudge_to_death;
+ }
+
+ /* We need to flush the recv. buffs. We do this only on the
+ * descriptor close, not protocol-sourced closes, because the
+ * reader process may not have drained the data yet!
+ */
+ while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+ data_was_unread = 1;
+ __kfree_skb(skb);
+ }
+
+ sk_stream_mem_reclaim(sk);
+
+ /* As outlined in draft-ietf-tcpimpl-prob-03.txt, section
+ * 3.10, we send a RST here because data was lost. To
+ * witness the awful effects of the old behavior of always
+ * doing a FIN, run an older 2.1.x kernel or 2.0.x, start
+ * a bulk GET in an FTP client, suspend the process, wait
+ * for the client to advertise a zero window, then kill -9
+ * the FTP client, wheee... Note: timeout is always zero
+ * in such a case.
+ */
+ if (data_was_unread) {
+ /* Unread data was tossed, zap the connection. */
+ NET_INC_STATS_USER(LINUX_MIB_TCPABORTONCLOSE);
+ sdp_set_state(sk, TCP_CLOSE);
+ sdp_send_active_reset(sk, GFP_KERNEL);
+ } else if (sock_flag(sk, SOCK_LINGER) && !sk->sk_lingertime) {
+ /* Check zero linger _after_ checking for unread data. */
+ sk->sk_prot->disconnect(sk, 0);
+ NET_INC_STATS_USER(LINUX_MIB_TCPABORTONDATA);
+ } else if (sdp_close_state(sk)) {
+ /* We FIN if the application ate all the data before
+ * zapping the connection.
+ */
+
+ sdp_post_sends(sdp_sk(sk), 0);
+ }
+
+ sk_stream_wait_close(sk, timeout);
+
+adjudge_to_death:
+ /* It is the last release_sock in its life. It will remove backlog. */
+ release_sock(sk);
+ /* Now socket is owned by kernel and we acquire lock
+ to finish close. No need to check for user refs.
+ */
+ lock_sock(sk);
+
+ sock_hold(sk);
+ sock_orphan(sk);
+
+ /* This is a (useful) BSD violation of the RFC. There is a
+ * problem with TCP as specified in that the other end could
+ * keep a socket open forever with no application left at this end.
+ * We use a 3 minute timeout (about the same as BSD) then kill
+ * our end. If they send after that then tough - BUT: long enough
+ * that we won't make the old 4*rto = almost no time - whoops
+ * reset mistake.
+ *
+ * Nope, it was not a mistake. It is really desired behaviour
+ * f.e. on http servers, when such sockets are useless, but
+ * consume significant resources. Let's do it with special
+ * linger2 option. --ANK
+ */
+
+ if (sk->sk_state == TCP_FIN_WAIT2 &&
+ !sk->sk_send_head &&
+ sdp_sk(sk)->tx_head == sdp_sk(sk)->tx_tail) {
+ sk->sk_state = TCP_CLOSE;
+ }
+
+ if ((1 << sk->sk_state) & (TCPF_FIN_WAIT1 | TCPF_FIN_WAIT2)) {
+ sdp_sk(sk)->time_wait = 1;
+ /* TODO: linger2 unimplemented.
+ We should wait 3.5 * rto. How do I know rto? */
+ /* TODO: tcp_fin_time to get timeout */
+ sdp_dbg(sk, "%s: entering time wait refcnt %d\n", __func__,
+ atomic_read(&sk->sk_refcnt));
+ atomic_inc(sk->sk_prot->orphan_count);
+ queue_delayed_work(sdp_workqueue, &sdp_sk(sk)->time_wait_work,
+ TCP_FIN_TIMEOUT);
+ goto out;
+ }
+
+ /* TODO: limit number of orphaned sockets.
+ TCP has sysctl_tcp_mem and sysctl_tcp_max_orphans */
+ sock_put(sk);
+
+ /* Otherwise, socket is reprieved until protocol close. */
+out:
+ sdp_dbg(sk, "%s: last socket put %d\n", __func__,
+ atomic_read(&sk->sk_refcnt));
+ release_sock(sk);
+ sk_common_release(sk);
+}
+
+static int sdp_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct sockaddr_in src_addr = {
+ .sin_family = AF_INET,
+ .sin_port = htons(inet_sk(sk)->sport),
+ .sin_addr.s_addr = inet_sk(sk)->saddr,
+ };
+ int rc;
+
+ if (addr_len < sizeof(struct sockaddr_in))
+ return -EINVAL;
+
+ if (uaddr->sa_family != AF_INET)
+ return -EAFNOSUPPORT;
+
+ if (!ssk->id) {
+ rc = sdp_get_port(sk, 0);
+ if (rc)
+ return rc;
+ }
+
+ sdp_dbg(sk, "%s %u.%u.%u.%u:%hu -> %u.%u.%u.%u:%hu\n", __func__,
+ NIPQUAD(src_addr.sin_addr.s_addr),
+ ntohs(src_addr.sin_port),
+ NIPQUAD(((struct sockaddr_in *)uaddr)->sin_addr.s_addr),
+ ntohs(((struct sockaddr_in *)uaddr)->sin_port));
+
+ if (!ssk->id) {
+ printk("??? ssk->id == NULL. Ohh\n");
+ return -EINVAL;
+ }
+
+ rc = rdma_resolve_addr(ssk->id, (struct sockaddr *)&src_addr,
+ uaddr, SDP_RESOLVE_TIMEOUT);
+ if (rc) {
+ sdp_warn(sk, "rdma_resolve_addr failed: %d\n", rc);
+ return rc;
+ }
+
+ sk->sk_state = TCP_SYN_SENT;
+ return 0;
+}
+
+static int sdp_disconnect(struct sock *sk, int flags)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ int rc = 0;
+ sdp_dbg(sk, "%s\n", __func__);
+ if (ssk->id)
+ rc = rdma_disconnect(ssk->id);
+ return rc;
+}
+
+/* Like inet_csk_wait_for_connect */
+static int sdp_wait_for_connect(struct sock *sk, long timeo)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ DEFINE_WAIT(wait);
+ int err;
+
+ sdp_dbg(sk, "%s\n", __func__);
+ /*
+ * True wake-one mechanism for incoming connections: only
+ * one process gets woken up, not the 'whole herd'.
+ * Since we do not 'race & poll' for established sockets
+ * anymore, the common case will execute the loop only once.
+ *
+ * Subtle issue: "add_wait_queue_exclusive()" will be added
+ * after any current non-exclusive waiters, and we know that
+ * it will always _stay_ after any new non-exclusive waiters
+ * because all non-exclusive waiters are added at the
+ * beginning of the wait-queue. As such, it's ok to "drop"
+ * our exclusiveness temporarily when we get woken up without
+ * having to remove and re-insert us on the wait queue.
+ */
+ for (;;) {
+ prepare_to_wait_exclusive(sk->sk_sleep, &wait,
+ TASK_INTERRUPTIBLE);
+ release_sock(sk);
+ if (list_empty(&ssk->accept_queue)) {
+ sdp_dbg(sk, "%s schedule_timeout\n", __func__);
+ timeo = schedule_timeout(timeo);
+ sdp_dbg(sk, "%s schedule_timeout done\n", __func__);
+ }
+ sdp_dbg(sk, "%s lock_sock\n", __func__);
+ lock_sock(sk);
+ sdp_dbg(sk, "%s lock_sock done\n", __func__);
+ err = 0;
+ if (!list_empty(&ssk->accept_queue))
+ break;
+ err = -EINVAL;
+ if (sk->sk_state != TCP_LISTEN)
+ break;
+ err = sock_intr_errno(timeo);
+ if (signal_pending(current))
+ break;
+ err = -EAGAIN;
+ if (!timeo)
+ break;
+ }
+ finish_wait(sk->sk_sleep, &wait);
+ sdp_dbg(sk, "%s returns %d\n", __func__, err);
+ return err;
+}
+
+/* Consider using request_sock_queue instead of duplicating all this */
+/* Like inet_csk_accept */
+static struct sock *sdp_accept(struct sock *sk, int flags, int *err)
+{
+ struct sdp_sock *newssk, *ssk;
+ struct sock *newsk;
+ int error;
+
+ sdp_dbg(sk, "%s state %d expected %d *err %d\n", __func__,
+ sk->sk_state, TCP_LISTEN, *err);
+
+ ssk = sdp_sk(sk);
+ lock_sock(sk);
+
+ /* We need to make sure that this socket is listening,
+ * and that it has something pending.
+ */
+ error = -EINVAL;
+ if (sk->sk_state != TCP_LISTEN)
+ goto out_err;
+
+ /* Find already established connection */
+ if (list_empty(&ssk->accept_queue)) {
+ long timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
+
+ /* If this is a non blocking socket don't sleep */
+ error = -EAGAIN;
+ if (!timeo)
+ goto out_err;
+
+ error = sdp_wait_for_connect(sk, timeo);
+ if (error)
+ goto out_err;
+ }
+
+ newssk = list_entry(ssk->accept_queue.next, struct sdp_sock, accept_queue);
+ list_del_init(&newssk->accept_queue);
+ newssk->parent = NULL;
+ sk_acceptq_removed(sk);
+ newsk = &newssk->isk.sk;
+ sdp_dbg(sk, "%s: ib_req_notify_cq\n", __func__);
+ ib_req_notify_cq(newssk->cq, IB_CQ_NEXT_COMP);
+ /* TODO: poll cq here */
+out:
+ release_sock(sk);
+ sdp_dbg(sk, "%s: status %d sk %p newsk %p\n", __func__,
+ *err, sk, newsk);
+ return newsk;
+out_err:
+ sdp_dbg(sk, "%s: error %d\n", __func__, error);
+ newsk = NULL;
+ *err = error;
+ goto out;
+}
+
+static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
+{
+ sdp_dbg(sk, "%s\n", __func__);
+ /* TODO: Need to handle:
+ case SIOCINQ:
+ case SIOCOUTQ:
+ case SIOCATMARK:
+ */
+ return -ENOIOCTLCMD;
+}
+
+void sdp_destroy_work(void *data)
+{
+ struct sock *sk = data;
+ sdp_dbg(sk, "%s: refcnt %d\n", __func__, atomic_read(&sk->sk_refcnt));
+
+ cancel_delayed_work(&sdp_sk(sk)->time_wait_work);
+ atomic_dec(sk->sk_prot->orphan_count);
+
+ sock_put(sk);
+}
+
+void sdp_time_wait_work(void *data)
+{
+ struct sock *sk = data;
+ lock_sock(sk);
+ sdp_dbg(sk, "%s\n", __func__);
+
+ if (!sdp_sk(sk)->time_wait) {
+ release_sock(sk);
+ return;
+ }
+
+ sdp_dbg(sk, "%s: refcnt %d\n", __func__, atomic_read(&sk->sk_refcnt));
+
+ sk->sk_state = TCP_CLOSE;
+ sdp_sk(sk)->time_wait = 0;
+ release_sock(sk);
+
+ atomic_dec(sk->sk_prot->orphan_count);
+ sock_put(data);
+}
+
+static int sdp_init_sock(struct sock *sk)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ INIT_LIST_HEAD(&ssk->accept_queue);
+ INIT_LIST_HEAD(&ssk->backlog_queue);
+ INIT_WORK(&ssk->time_wait_work, sdp_time_wait_work, sk);
+ INIT_WORK(&ssk->destroy_work, sdp_destroy_work, sk);
+
+ ssk->tx_head = 1;
+ ssk->tx_tail = 1;
+ ssk->rx_head = 1;
+ ssk->rx_tail = 1;
+ sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
+ return 0;
+}
+
+static void sdp_shutdown(struct sock *sk, int how)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+
+ sdp_dbg(sk, "%s\n", __func__);
+ if (!(how & SEND_SHUTDOWN))
+ return;
+
+ if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+ return;
+
+ if (sk->sk_state == TCP_ESTABLISHED)
+ sk->sk_state = TCP_FIN_WAIT1;
+ else if (sk->sk_state == TCP_CLOSE_WAIT)
+ sk->sk_state = TCP_LAST_ACK;
+ else
+ return;
+
+ sdp_post_sends(ssk, 0);
+}
+
+static inline void sdp_push_pending_frames(struct sock *sk)
+{
+}
+
+/* SOL_SOCKET level options are handled by sock_setsockopt */
+static int sdp_setsockopt(struct sock *sk, int level, int optname,
+ char __user *optval, int optlen)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ int val;
+ int err = 0;
+
+ sdp_dbg(sk, "%s\n", __func__);
+ if (level != SOL_TCP)
+ return -ENOPROTOOPT;
+
+ if (optlen < sizeof(int))
+ return -EINVAL;
+
+ if (get_user(val, (int __user *)optval))
+ return -EFAULT;
+
+ lock_sock(sk);
+
+ switch (optname) {
+ case TCP_NODELAY:
+ if (val) {
+ /* TCP_NODELAY is weaker than TCP_CORK, so that
+ * this option on corked socket is remembered, but
+ * it is not activated until cork is cleared.
+ *
+ * However, when TCP_NODELAY is set we make
+ * an explicit push, which overrides even TCP_CORK
+ * for currently queued segments.
+ */
+ ssk->nonagle |= TCP_NAGLE_OFF|TCP_NAGLE_PUSH;
+ sdp_push_pending_frames(sk);
+ } else {
+ ssk->nonagle &= ~TCP_NAGLE_OFF;
+ }
+ break;
+ case TCP_CORK:
+ /* When set indicates to always queue non-full frames.
+ * Later the user clears this option and we transmit
+ * any pending partial frames in the queue. This is
+ * meant to be used alongside sendfile() to get properly
+ * filled frames when the user (for example) must write
+ * out headers with a write() call first and then use
+ * sendfile to send out the data parts.
+ *
+ * TCP_CORK can be set together with TCP_NODELAY and it is
+ * stronger than TCP_NODELAY.
+ */
+ if (val) {
+ ssk->nonagle |= TCP_NAGLE_CORK;
+ } else {
+ ssk->nonagle &= ~TCP_NAGLE_CORK;
+ if (ssk->nonagle&TCP_NAGLE_OFF)
+ ssk->nonagle |= TCP_NAGLE_PUSH;
+ sdp_push_pending_frames(sk);
+ }
+ break;
+ default:
+ err = -ENOPROTOOPT;
+ break;
+ }
+
+ release_sock(sk);
+ return err;
+}
+
+/* SOL_SOCKET level options are handled by sock_getsockopt */
+static int sdp_getsockopt(struct sock *sk, int level, int optname,
+ char __user *optval, int __user *option)
+{
+ /* TODO */
+ struct sdp_sock *ssk = sdp_sk(sk);
+ int val, len;
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ if (level != SOL_TCP)
+ return -EOPNOTSUPP;
+
+ if (get_user(len, option))
+ return -EFAULT;
+
+ len = min_t(unsigned int, len, sizeof(int));
+
+ if (len < 0)
+ return -EINVAL;
+
+ switch (optname) {
+ case TCP_NODELAY:
+ val = !!(ssk->nonagle&TCP_NAGLE_OFF);
+ break;
+ case TCP_CORK:
+ val = !!(ssk->nonagle&TCP_NAGLE_CORK);
+ break;
+ default:
+ return -ENOPROTOOPT;
+ }
+
+ if (put_user(len, option))
+ return -EFAULT;
+ if (copy_to_user(optval, &val, len))
+ return -EFAULT;
+ return 0;
+}
+
+/* Like tcp_recv_urg */
+/*
+ * Handle reading urgent data. BSD has very simple semantics for
+ * this, no blocking and very strange errors 8)
+ */
+
+static int sdp_recv_urg(struct sock *sk, long timeo,
+ struct msghdr *msg, int len, int flags,
+ int *addr_len)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+
+ /* No URG data to read. */
+ if (sock_flag(sk, SOCK_URGINLINE) || !ssk->urg_data ||
+ ssk->urg_data == TCP_URG_READ)
+ return -EINVAL; /* Yes this is right ! */
+
+ if (sk->sk_state == TCP_CLOSE && !sock_flag(sk, SOCK_DONE))
+ return -ENOTCONN;
+
+ if (ssk->urg_data & TCP_URG_VALID) {
+ int err = 0;
+ char c = ssk->urg_data;
+
+ if (!(flags & MSG_PEEK))
+ ssk->urg_data = TCP_URG_READ;
+
+ /* Read urgent data. */
+ msg->msg_flags |= MSG_OOB;
+
+ if (len > 0) {
+ if (!(flags & MSG_TRUNC))
+ err = memcpy_toiovec(msg->msg_iov, &c, 1);
+ len = 1;
+ } else
+ msg->msg_flags |= MSG_TRUNC;
+
+ return err ? -EFAULT : len;
+ }
+
+ if (sk->sk_state == TCP_CLOSE || (sk->sk_shutdown & RCV_SHUTDOWN))
+ return 0;
+
+ /* Fixed the recv(..., MSG_OOB) behaviour. BSD docs and
+ * the available implementations agree in this case:
+ * this call should never block, independent of the
+ * blocking state of the socket.
+ * Mike <pall@rz.uni-karlsruhe.de>
+ */
+ return -EAGAIN;
+}
+
+static inline int sdp_has_urgent_data(struct sk_buff *skb)
+{
+ /* TODO: handle inline urgent data */
+ return 0;
+}
+
+static void sdp_rcv_space_adjust(struct sock *sk)
+{
+ sdp_post_recvs(sdp_sk(sk));
+ sdp_post_sends(sdp_sk(sk), 0);
+}
+
+static unsigned int sdp_current_mss(struct sock *sk, int large_allowed)
+{
+ /* TODO */
+ return PAGE_SIZE;
+}
+
+static int forced_push(struct sdp_sock *sk)
+{
+ /* TODO */
+ return 0;
+}
+
+static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb)
+{
+ TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
+ ssk->pushed_seq = ssk->write_seq;
+}
+
+static inline int select_size(struct sock *sk, struct sdp_sock *ssk)
+{
+ return 0;
+}
+
+static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags,
+ int mss_now, int nonagle)
+{
+ sdp_post_sends(ssk, nonagle);
+}
+
+static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
+ struct sk_buff *skb)
+{
+ skb_header_release(skb);
+ __skb_queue_tail(&sk->sk_write_queue, skb);
+ sk_charge_skb(sk, skb);
+ if (!sk->sk_send_head)
+ sk->sk_send_head = skb;
+ if (ssk->nonagle & TCP_NAGLE_PUSH)
+ ssk->nonagle &= ~TCP_NAGLE_PUSH;
+}
+
+void sdp_push_one(struct sock *sk, unsigned int mss_now)
+{
+}
+
+/* Like tcp_sendmsg */
+/* TODO: check locking */
+#define TCP_PAGE(sk) (sk->sk_sndmsg_page)
+#define TCP_OFF(sk) (sk->sk_sndmsg_off)
+int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+ size_t size)
+{
+ struct iovec *iov;
+ struct sdp_sock *ssk = sdp_sk(sk);
+ struct sk_buff *skb;
+ int iovlen, flags;
+ int mss_now, size_goal;
+ int err, copied;
+ long timeo;
+
+ lock_sock(sk);
+ sdp_dbg(sk, "%s\n", __func__);
+
+ flags = msg->msg_flags;
+ timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
+
+ /* Wait for a connection to finish. */
+ if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
+ if ((err = sk_stream_wait_connect(sk, &timeo)) != 0)
+ goto out_err;
+
+ /* This should be in poll */
+ clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+ mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
+ size_goal = ssk->xmit_size_goal;
+
+ /* Ok commence sending. */
+ iovlen = msg->msg_iovlen;
+ iov = msg->msg_iov;
+ copied = 0;
+
+ err = -EPIPE;
+ if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
+ goto do_error;
+
+ while (--iovlen >= 0) {
+ int seglen = iov->iov_len;
+ unsigned char __user *from = iov->iov_base;
+
+ iov++;
+
+ while (seglen > 0) {
+ int copy;
+
+ skb = sk->sk_write_queue.prev;
+
+ if (!sk->sk_send_head ||
+ (copy = size_goal - skb->len) <= 0) {
+
+new_segment:
+ /* Allocate new segment. If the interface is SG,
+ * allocate skb fitting to single page.
+ */
+ if (!sk_stream_memory_free(sk))
+ goto wait_for_sndbuf;
+
+ skb = sk_stream_alloc_pskb(sk, select_size(sk, ssk),
+ 0, sk->sk_allocation);
+ if (!skb)
+ goto wait_for_memory;
+
+ /*
+ * Check whether we can use HW checksum.
+ */
+ if (sk->sk_route_caps &
+ (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM |
+ NETIF_F_HW_CSUM))
+ skb->ip_summed = CHECKSUM_HW;
+
+ skb_entail(sk, ssk, skb);
+ copy = size_goal;
+ }
+
+ /* Try to append data to the end of skb. */
+ if (copy > seglen)
+ copy = seglen;
+
+ /* Where to copy to? */
+ if (skb_tailroom(skb) > 0) {
+ /* We have some space in skb head. Superb! */
+ if (copy > skb_tailroom(skb))
+ copy = skb_tailroom(skb);
+ if ((err = skb_add_data(skb, from, copy)) != 0)
+ goto do_fault;
+ } else {
+ int merge = 0;
+ int i = skb_shinfo(skb)->nr_frags;
+ struct page *page = TCP_PAGE(sk);
+ int off = TCP_OFF(sk);
+
+ if (skb_can_coalesce(skb, i, page, off) &&
+ off != PAGE_SIZE) {
+ /* We can extend the last page
+ * fragment. */
+ merge = 1;
+ } else if (i == SDP_MAX_SEND_SKB_FRAGS ||
+ (!i &&
+ !(sk->sk_route_caps & NETIF_F_SG))) {
+ /* Need to add new fragment and cannot
+ * do this because interface is non-SG,
+ * or because all the page slots are
+ * busy. */
+ sdp_mark_push(ssk, skb);
+ goto new_segment;
+ } else if (page) {
+ if (off == PAGE_SIZE) {
+ put_page(page);
+ TCP_PAGE(sk) = page = NULL;
+ off = 0;
+ }
+ } else
+ off = 0;
+
+ if (copy > PAGE_SIZE - off)
+ copy = PAGE_SIZE - off;
+
+ if (!sk_stream_wmem_schedule(sk, copy))
+ goto wait_for_memory;
+
+ if (!page) {
+ /* Allocate new cache page. */
+ if (!(page = sk_stream_alloc_page(sk)))
+ goto wait_for_memory;
+ }
+
+ /* Time to copy data. We are close to
+ * the end! */
+ err = skb_copy_to_page(sk, from, skb, page,
+ off, copy);
+ if (err) {
+ /* If this page was new, give it to the
+ * socket so it does not get leaked.
+ */
+ if (!TCP_PAGE(sk)) {
+ TCP_PAGE(sk) = page;
+ TCP_OFF(sk) = 0;
+ }
+ goto do_error;
+ }
+
+ /* Update the skb. */
+ if (merge) {
+ skb_shinfo(skb)->frags[i - 1].size +=
+ copy;
+ } else {
+ skb_fill_page_desc(skb, i, page, off, copy);
+ if (TCP_PAGE(sk)) {
+ get_page(page);
+ } else if (off + copy < PAGE_SIZE) {
+ get_page(page);
+ TCP_PAGE(sk) = page;
+ }
+ }
+
+ TCP_OFF(sk) = off + copy;
+ }
+
+ if (!copied)
+ TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;
+
+ ssk->write_seq += copy;
+ TCP_SKB_CB(skb)->end_seq += copy;
+ skb_shinfo(skb)->tso_segs = 0;
+
+ from += copy;
+ copied += copy;
+ if ((seglen -= copy) == 0 && iovlen == 0)
+ goto out;
+
+ if (skb->len < mss_now || (flags & MSG_OOB))
+ continue;
+
+ if (forced_push(ssk)) {
+ sdp_mark_push(ssk, skb);
+ /* TODO: and push pending frames mss_now */
+ /* sdp_push_pending(sk, ssk, mss_now, TCP_NAGLE_PUSH); */
+ } else if (skb == sk->sk_send_head)
+ sdp_push_one(sk, mss_now);
+ continue;
+
+wait_for_sndbuf:
+ set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+wait_for_memory:
+ if (copied)
+ sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
+
+ if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
+ goto do_error;
+
+ mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
+ size_goal = ssk->xmit_size_goal;
+ }
+ }
+
+out:
+ if (copied)
+ sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
+ release_sock(sk);
+ return copied;
+
+do_fault:
+ if (!skb->len) {
+ if (sk->sk_send_head == skb)
+ sk->sk_send_head = NULL;
+ __skb_unlink(skb, &sk->sk_write_queue);
+ sk_stream_free_skb(sk, skb);
+ }
+
+do_error:
+ if (copied)
+ goto out;
+out_err:
+ err = sk_stream_error(sk, flags, err);
+ release_sock(sk);
+ return err;
+}
+
+/* Like tcp_recvmsg */
+/* Maybe use skb_recv_datagram here? */
+/* Note this does not seem to handle vectored messages. Relevant? */
+static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
+ size_t len, int noblock, int flags,
+ int *addr_len)
+{
+ struct sk_buff *skb = NULL;
+ long timeo;
+ int target;
+ unsigned long used;
+ int err;
+ int offset = sdp_sk(sk)->offset;
+ int copied = 0;
+ int urg_data = 0;
+
+ lock_sock(sk);
+ sdp_dbg(sk, "%s\n", __func__);
+
+ err = -ENOTCONN;
+ if (sk->sk_state == TCP_LISTEN)
+ goto out;
+
+ timeo = sock_rcvtimeo(sk, noblock);
+ /* Urgent data needs to be handled specially. */
+ if (flags & MSG_OOB)
+ goto recv_urg;
+
+ target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
+
+ do {
+
+ /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
+ if (urg_data) {
+ if (copied)
+ break;
+ if (signal_pending(current)) {
+ copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
+ break;
+ }
+ }
+
+ skb = skb_peek(&sk->sk_receive_queue);
+ if (skb) {
+ if (skb->h.raw[0] == SDP_MID_DISCONN)
+ goto found_fin_ok;
+ goto found_ok_skb;
+ }
+
+ if (copied >= target)
+ break;
+
+ if (copied) {
+ if (sk->sk_err ||
+ sk->sk_state == TCP_CLOSE ||
+ (sk->sk_shutdown & RCV_SHUTDOWN) ||
+ !timeo ||
+ signal_pending(current) ||
+ (flags & MSG_PEEK))
+ break;
+ } else {
+ if (sock_flag(sk, SOCK_DONE))
+ break;
+
+ if (sk->sk_err) {
+ copied = sock_error(sk);
+ break;
+ }
+
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ break;
+
+ if (sk->sk_state == TCP_CLOSE) {
+ if (!sock_flag(sk, SOCK_DONE)) {
+ /* This occurs when the user tries to read
+ * from a never-connected socket.
+ */
+ copied = -ENOTCONN;
+ break;
+ }
+ break;
+ }
+
+ if (!timeo) {
+ copied = -EAGAIN;
+ break;
+ }
+
+ if (signal_pending(current)) {
+ copied = sock_intr_errno(timeo);
+ break;
+ }
+ }
+
+ if (copied >= target) {
+ /* Do not sleep, just process backlog. */
+ release_sock(sk);
+ lock_sock(sk);
+ } else {
+ sdp_dbg(sk, "%s: sk_wait_data %ld\n", __func__, timeo);
+ sk_wait_data(sk, &timeo);
+ }
+ continue;
+
+ found_ok_skb:
+ sdp_dbg(sk, "%s: found_ok_skb len %d\n", __func__, skb->len);
+ sdp_dbg(sk, "%s: len %Zd offset %d\n", __func__, len, offset);
+ sdp_dbg(sk, "%s: copied %d target %d\n", __func__, copied, target);
+ urg_data = sdp_has_urgent_data(skb);
+ used = skb->len - offset;
+ if (len < used)
+ used = len;
+
+ sdp_dbg(sk, "%s: used %ld\n", __func__, used);
+
+ if (!(flags & MSG_TRUNC)) {
+ int err;
+ err = skb_copy_datagram_iovec(skb, offset,
+ /* TODO: skip header? */
+ msg->msg_iov, used);
+ if (err) {
+ sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
+ "offset %d size %ld status %d\n",
+ __func__, offset, used, err);
+ /* Exception. Bailout! */
+ if (!copied)
+ copied = -EFAULT;
+ break;
+ }
+ }
+
+ copied += used;
+ len -= used;
+ offset += used;
+ sdp_dbg(sk, "%s: done copied %d target %d\n", __func__, copied, target);
+
+ sdp_rcv_space_adjust(sk);
+
+ if (offset < skb->len)
+ continue; /* TODO: break? */
+
+ if (!(flags & MSG_PEEK))
+ sk_eat_skb(sk, skb, 0);
+
+ offset = 0;
+ skb = NULL;
+
+ continue;
+found_fin_ok:
+ if (!(flags & MSG_PEEK))
+ sk_eat_skb(sk, skb, 0);
+
+ offset = 0;
+ skb = NULL;
+ break;
+ } while (len > 0);
+
+ sdp_sk(sk)->offset = skb && !(flags & MSG_PEEK) ? offset : 0;
+
+ release_sock(sk);
+ return copied;
+
+out:
+ release_sock(sk);
+ return err;
+
+recv_urg:
+ err = sdp_recv_urg(sk, timeo, msg, len, flags, addr_len);
+ goto out;
+}
+
+static int sdp_listen(struct sock *sk, int backlog)
+{
+ struct sdp_sock *ssk = sdp_sk(sk);
+ int rc;
+
+ sdp_dbg(sk, "%s\n", __func__);
+
+ if (!ssk->id) {
+ rc = sdp_get_port(sk, 0);
+ if (rc)
+ return rc;
+ }
+
+ rc = rdma_listen(ssk->id, backlog);
+ if (rc) {
+ sdp_warn(sk, "rdma_listen failed: %d\n", rc);
+ sdp_set_error(sk, rc);
+ } else
+ sk->sk_state = TCP_LISTEN;
+ return rc;
+}
+
+/* We could almost use inet_listen, but that calls
+ inet_csk_listen_start. Longer term we'll want to add
+ a listen callback to struct proto, similar to bind. */
+int sdp_inet_listen(struct socket *sock, int backlog)
+{
+ struct sock *sk = sock->sk;
+ unsigned char old_state;
+ int err;
+
+ lock_sock(sk);
+
+ err = -EINVAL;
+ if (sock->state != SS_UNCONNECTED)
+ goto out;
+
+ old_state = sk->sk_state;
+ if (!((1 << old_state) & (TCPF_CLOSE | TCPF_LISTEN)))
+ goto out;
+
+ /* Really, if the socket is already in listen state
+ * we can only allow the backlog to be adjusted.
+ */
+ if (old_state != TCP_LISTEN) {
+ err = sdp_listen(sk, backlog);
+ if (err)
+ goto out;
+ }
+ sk->sk_max_ack_backlog = backlog;
+ err = 0;
+
+out:
+ release_sock(sk);
+ return err;
+}
+
+static void sdp_unhash(struct sock *sk)
+{
+ sdp_dbg(sk, "%s\n", __func__);
+}
+
+static inline unsigned int sdp_listen_poll(const struct sock *sk)
+{
+ return !list_empty(&sdp_sk(sk)->accept_queue) ?
+ (POLLIN | POLLRDNORM) : 0;
+}
+
+static unsigned int sdp_poll(struct file *file, struct socket *socket,
+ struct poll_table_struct *wait)
+{
+ int mask;
+ sdp_dbg(socket->sk, "%s\n", __func__);
+
+ mask = datagram_poll(file, socket, wait);
+ /* TODO: Slightly ugly: it would be nicer if there were a function
+ * like datagram_poll that didn't include poll_wait,
+ * then we could reverse the order. */
+ if (socket->sk->sk_state == TCP_LISTEN)
+ return sdp_listen_poll(socket->sk);
+
+ if (sdp_sk(socket->sk)->urg_data & TCP_URG_VALID)
+ mask |= POLLPRI;
+ return mask;
+}
+
+static void sdp_enter_memory_pressure(void)
+{
+ sdp_dbg(NULL, "%s\n", __func__);
+}
+
+static atomic_t sockets_allocated;
+static atomic_t memory_allocated;
+static atomic_t orphan_count;
+static int memory_pressure;
+struct proto sdp_proto = {
+ .close = sdp_close,
+ .connect = sdp_connect,
+ .disconnect = sdp_disconnect,
+ .accept = sdp_accept,
+ .ioctl = sdp_ioctl,
+ .init = sdp_init_sock,
+ .shutdown = sdp_shutdown,
+ .setsockopt = sdp_setsockopt,
+ .getsockopt = sdp_getsockopt,
+ .sendmsg = sdp_sendmsg,
+ .recvmsg = sdp_recvmsg,
+ .unhash = sdp_unhash,
+ .get_port = sdp_get_port,
+ /* Wish we had this: .listen = sdp_listen */
+ .enter_memory_pressure = sdp_enter_memory_pressure,
+ .sockets_allocated = &sockets_allocated,
+ .memory_allocated = &memory_allocated,
+ .memory_pressure = &memory_pressure,
+ .orphan_count = &orphan_count,
+ .sysctl_mem = sysctl_tcp_mem,
+ .sysctl_wmem = sysctl_tcp_wmem,
+ .sysctl_rmem = sysctl_tcp_rmem,
+ .max_header = sizeof(struct sdp_bsdh),
+ .obj_size = sizeof(struct sdp_sock),
+ .owner = THIS_MODULE,
+ .name = "SDP",
+};
+
+static struct proto_ops sdp_proto_ops = {
+ .family = PF_INET,
+ .owner = THIS_MODULE,
+ .release = inet_release,
+ .bind = inet_bind,
+ .connect = inet_stream_connect, /* TODO: inet_datagram connect would
+ autobind, but need to fix get_port
+ with port 0 first. */
+ .socketpair = sock_no_socketpair,
+ .accept = inet_accept,
+ .getname = inet_getname,
+ .poll = sdp_poll,
+ .ioctl = inet_ioctl,
+ .listen = sdp_inet_listen,
+ .shutdown = inet_shutdown,
+ .setsockopt = sock_common_setsockopt,
+ .getsockopt = sock_common_getsockopt,
+ .sendmsg = inet_sendmsg,
+ .recvmsg = sock_common_recvmsg,
+ .mmap = sock_no_mmap,
+ .sendpage = sock_no_sendpage,
+};
+
+static int sdp_create_socket(struct socket *sock, int protocol)
+{
+ struct sock *sk;
+ int rc;
+
+ sdp_dbg(NULL, "%s: type %d protocol %d\n", __func__, sock->type, protocol);
+
+ if (sock->type != SOCK_STREAM) {
+ sdp_warn(NULL, "SDP: unsupported type %d.\n", sock->type);
+ return -ESOCKTNOSUPPORT;
+ }
+
+ /* IPPROTO_IP is a wildcard match */
+ if (protocol != IPPROTO_TCP && protocol != IPPROTO_IP) {
+ sdp_warn(NULL, "SDP: unsupported protocol %d.\n", protocol);
+ return -EPROTONOSUPPORT;
+ }
+
+ sk = sk_alloc(PF_INET_SDP, GFP_KERNEL, &sdp_proto, 1);
+ if (!sk) {
+ sdp_warn(NULL, "SDP: failed to allocate socket.\n");
+ return -ENOMEM;
+ }
+ sock_init_data(sock, sk);
+ sk->sk_protocol = 0x0 /* TODO: inherit tcp socket to use IPPROTO_TCP */;
+
+ rc = sdp_init_sock(sk);
+ if (rc) {
+ sdp_warn(sk, "SDP: failed to init sock.\n");
+ sk_common_release(sk);
+ return -ENOMEM;
+ }
+
+ sk->sk_destruct = sdp_destruct;
+
+ sock->ops = &sdp_proto_ops;
+ sock->state = SS_UNCONNECTED;
+ return 0;
+}
+
+static struct net_proto_family sdp_net_proto = {
+ .family = AF_INET_SDP,
+ .create = sdp_create_socket,
+ .owner = THIS_MODULE,
+};
+
+static int __init sdp_init(void)
+{
+ int rc;
+
+ sdp_workqueue = create_singlethread_workqueue("sdp");
+ if (!sdp_workqueue) {
+ return -ENOMEM;
+ }
+
+ rc = proto_register(&sdp_proto, 1);
+ if (rc) {
+ printk(KERN_WARNING "%s: proto_register failed: %d\n", __func__, rc);
+ destroy_workqueue(sdp_workqueue);
+ return rc;
+ }
+
+ rc = sock_register(&sdp_net_proto);
+ if (rc) {
+ printk(KERN_WARNING "%s: sock_register failed: %d\n", __func__, rc);
+ proto_unregister(&sdp_proto);
+ destroy_workqueue(sdp_workqueue);
+ return rc;
+ }
+
+ return 0;
+}
+
+static void __exit sdp_exit(void)
+{
+ sock_unregister(PF_INET_SDP);
+ proto_unregister(&sdp_proto);
+
+ if (atomic_read(&orphan_count))
+ printk(KERN_WARNING "%s: orphan_count %d\n", __func__,
+ atomic_read(&orphan_count));
+ destroy_workqueue(sdp_workqueue);
+ flush_scheduled_work();
+}
+
+module_init(sdp_init);
+module_exit(sdp_exit);