#define _LINUX_RDS_H
#include <linux/types.h>
+#include <net/sock.h>
/* These sparse annotated types shouldn't be in any user
* visible header file. We should clean this up rather
*/
#define SIOCRDSSETTOS (SIOCPROTOPRIVATE)
#define SIOCRDSGETTOS (SIOCPROTOPRIVATE + 1)
+#define SIOCRDSENABLENETFILTER (SIOCPROTOPRIVATE + 2)
-typedef u_int8_t rds_tos_t;
+#define IPPROTO_OKA (142)
+typedef u_int8_t rds_tos_t;
/*
* Control message types for SOL_RDS.
#define RDS_RDMA_REMOTE_COMPLETE 0x0080 /* Notify when data is available */
#define RDS_SEND_NOTIFY_ME 0x0100 /* Notify when operation completes */
+/* netfilter related components */
+struct rds_nf_hdr {
+ __be32 saddr; /* source address of request */
+ __be32 daddr; /* destination address */
+ __be16 sport; /* source port number */
+ __be16 dport; /* destination port number */
+ __be16 protocol; /* rds socket protocol family to use */
+
+#define RDS_NF_HDR_FLAG_BOTH (0x1) /* request needs to go locally and remote */
+#define RDS_NF_HDR_FLAG_DONE (0x2) /* the request is consumed and done */
+ __be16 flags; /* any configuration flags */
+ struct sock *sk;
+};
+
+/* pull out the two rds_nf_hdr structures stashed at the tail of the skbs passed around */
+#define rds_nf_hdr_dst(skb) (&(((struct rds_nf_hdr *)skb_tail_pointer((skb)))[0]))
+#define rds_nf_hdr_org(skb) (&(((struct rds_nf_hdr *)skb_tail_pointer((skb)))[1]))
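+/* Note: both accessors assume the skb was allocated with at least
+ * 2 * sizeof(struct rds_nf_hdr) of tail room, as rds_recv_incoming()
+ * allocates it; nothing here verifies that. */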
+
+/* temporary hack: claim an unused protocol family slot in the netfilter hook table */
+#define PF_RDS_HOOK 11
+
+enum rds_inet_hooks {
+ NF_RDS_PRE_ROUTING,
+ NF_RDS_FORWARD_ERROR,
+ NF_RDS_NUMHOOKS
+};
+
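+/* hook priorities follow the usual netfilter convention: hooks registered
+ * with numerically lower priority values run earlier at a given hook point */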
+enum rds_hook_priorities {
+ NF_RDS_PRI_FIRST = INT_MIN,
+ NF_RDS_PRI_OKA = 0,
+ NF_RDS_PRI_LAST = INT_MAX
+};
+
#endif /* IB_RDS_H */
if (put_user(tos, (rds_tos_t __user *)arg))
return -EFAULT;
break;
+ case SIOCRDSENABLENETFILTER:
+ spin_lock_bh(&rds_sock_lock);
+ rs->rs_netfilter_enabled = 1;
+ spin_unlock_bh(&rds_sock_lock);
+ break;
default:
return -ENOIOCTLCMD;
}
rs->poison = 0xABABABAB;
rs->rs_tos = 0;
rs->rs_conn = 0;
+ rs->rs_netfilter_enabled = 0;
if (rs->rs_bound_addr)
printk(KERN_CRIT "bound addr %x at create\n", rs->rs_bound_addr);
{
struct sock *sk;
- if (sock->type != SOCK_SEQPACKET || protocol)
+ if (sock->type != SOCK_SEQPACKET ||
+ (protocol && IPPROTO_OKA != protocol))
return -ESOCKTNOSUPPORT;
sk = sk_alloc(net, AF_RDS, GFP_KERNEL, &rds_proto);
WARN_ON(1);
}
rs->poison = 0xDEADBEEF;
+ rs->rs_netfilter_enabled = 0;
sk_free(sk);
}
}
.conn_shutdown = rds_ib_conn_shutdown,
.inc_copy_to_user = rds_ib_inc_copy_to_user,
.inc_free = rds_ib_inc_free,
+ .inc_to_skb = rds_ib_inc_to_skb,
+ .skb_local = rds_skb_local,
.cm_initiate_connect = rds_ib_cm_initiate_connect,
.cm_handle_connect = rds_ib_cm_handle_connect,
.cm_connect_complete = rds_ib_cm_connect_complete,
.t_type = RDS_TRANS_IB
};
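+/* Flatten the IB receive fragments into skb frags so the netfilter hooks can
+ * inspect the payload without copying. The pages remain owned by the
+ * rds_page_frags (note the get_page() left commented out below), which is why
+ * rds_inc_put() clears nr_frags before freeing the skb. */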
+int rds_ib_inc_to_skb(struct rds_incoming *inc, struct sk_buff *skb)
+{
+ skb_frag_t *frag;
+ int ret = 0, slen;
+ u32 len;
+ int i;
+ struct rds_ib_incoming *ibinc;
+ struct rds_page_frag *ibfrag;
+
+ /* pull out initial pointers */
+ ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+ ibfrag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
+ len = be32_to_cpu(inc->i_hdr.h_len);
+ slen = len;
+ i = 0;
+
+ /* run through the entire ib fragment list and save off the buffers */
+ while (NULL != ibfrag && slen > 0) {
+ /* one to one mapping of frags to sg structures */
+ frag = &skb_shinfo(skb)->frags[i];
+
+ /* save off all the sg pieces to the skb frags we are creating */
+ frag->size = ibfrag->f_sg.length;
+ frag->page_offset = ibfrag->f_sg.offset;
+ frag->page.p = sg_page(&ibfrag->f_sg);
+
+ /* AA: do we need to bump up the page reference */
+ /* get_page(frag->page); */
+
+ /* dec the amount of data we are consuming */
+ slen -= frag->size;
+
+ /* bump to the next entry */
+ ibfrag = list_entry(ibfrag->f_item.next, struct rds_page_frag, f_item);
+ i++;
+
+ /* for now we will only have a single chain of fragments in the skb */
+ if (i >= MAX_SKB_FRAGS) {
+ rdsdebug("too many fragments in op %u > max %u, skb %p",
+ i, (int)MAX_SKB_FRAGS, skb);
+ goto done;
+ }
+ }
+
+ /* track the full message length too */
+ skb->len = len;
+
+ /* all good */
+ ret = 1;
+
+done:
+ /* track all the fragments we saved */
+ skb_shinfo(skb)->nr_frags = i;
+
+ return ret;
+}
+
MODULE_LICENSE("GPL");
.conn_shutdown = rds_iw_conn_shutdown,
.inc_copy_to_user = rds_iw_inc_copy_to_user,
.inc_free = rds_iw_inc_free,
+ .skb_local = rds_skb_local,
.cm_initiate_connect = rds_iw_cm_initiate_connect,
.cm_handle_connect = rds_iw_cm_handle_connect,
.cm_connect_complete = rds_iw_cm_connect_complete,
{
}
+static int rds_message_skb_local(struct sk_buff *skb)
+{
+ struct rds_nf_hdr *dst, *org;
+
+ /* pull out the headers */
+ dst = rds_nf_hdr_dst(skb);
+ org = rds_nf_hdr_org(skb);
+
+ /* if the original and destination endpoints are exactly the same, it's our own node */
+ if (dst->daddr == org->daddr && dst->saddr == org->saddr &&
+ dst->sport == org->sport && dst->dport == org->dport) {
+ return 1;
+ }
+ /* otherwise, the sport/dport have likely swapped so consider
+ * it a different node */
+ else {
+ return 0;
+ }
+}
+
void rds_loop_exit(void)
{
struct rds_loop_connection *lc, *_lc;
.conn_connect = rds_loop_conn_connect,
.conn_shutdown = rds_loop_conn_shutdown,
.inc_copy_to_user = rds_message_inc_copy_to_user,
+ .inc_to_skb = rds_message_inc_to_skb,
+ .skb_local = rds_message_skb_local,
.inc_free = rds_loop_inc_free,
.t_name = "loopback",
};
return rm;
}
-int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
+int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
+ gfp_t gfp)
{
unsigned long to_copy, nbytes;
unsigned long sg_off;
while (iov_iter_count(from)) {
if (!sg_page(sg)) {
ret = rds_page_remainder_alloc(sg, iov_iter_count(from),
- GFP_HIGHUSER);
+ GFP_ATOMIC == gfp ?
+ gfp : GFP_HIGHUSER);
+
if (ret)
return ret;
rm->data.op_nents++;
}
EXPORT_SYMBOL_GPL(rds_message_unmapped);
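+/* Same idea as rds_ib_inc_to_skb(): expose the message's scatterlist pages as
+ * skb frags for the netfilter hooks without taking page references, so the
+ * frags must be zeroed again before the skb is freed (see rds_inc_put()). */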
+int rds_message_inc_to_skb(struct rds_incoming *inc, struct sk_buff *skb)
+{
+ struct rds_message *rm;
+ struct scatterlist *sg;
+ skb_frag_t *frag;
+ int ret = 0;
+ u32 len;
+ int i;
+
+ rm = container_of(inc, struct rds_message, m_inc);
+ len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
+ i = 0;
+
+ /* for now we will only have a single chain of fragments in the skb */
+ if (rm->data.op_nents >= MAX_SKB_FRAGS) {
+ rdsdebug("too many fragments in op %u > max %u, rm %p",
+ rm->data.op_nents, (int)MAX_SKB_FRAGS, rm);
+ goto done;
+ }
+
+ /* run through the entire scatter gather list and save off the buffers */
+ for (i = 0; i < rm->data.op_nents; i++) {
+ /* one to one mapping of frags to sg structures */
+ frag = &skb_shinfo(skb)->frags[i];
+ sg = &rm->data.op_sg[i];
+
+ /* save off all the sg pieces to the skb frags we are creating */
+ frag->size = sg->length;
+ frag->page_offset = sg->offset;
+ frag->page.p = sg_page(sg);
+
+ /* AA: do we need to bump up the page reference too */
+ /* get_page(frag->page); */
+ }
+
+ /* track the full message length too */
+ skb->len = len;
+
+ /* all good */
+ ret = 1;
+
+done:
+ /* track all the fragments we saved */
+ skb_shinfo(skb)->nr_frags = i;
+
+ return ret;
+}
unsigned long ret;
void *addr;
+ /* AA: can this be removed, as it sometimes gives a false negative? This
+ * check doesn't exist in the OFA 1.5.3 code line */
+#if 0
if (to_user)
ret = access_ok(VERIFY_WRITE, ptr, bytes);
else
if (!ret)
return -EFAULT;
+#endif
if (to_user)
rds_stats_add(s_copy_to_user, bytes);
unsigned long i_rx_jiffies;
__be32 i_saddr;
+ /* extension fields for dealing with netfilter */
+ struct rds_connection *i_oconn;
+ struct sk_buff *i_skb;
+
rds_rdma_cookie_t i_rdma_cookie;
};
int (*inc_copy_to_user)(struct rds_incoming *inc, struct iov_iter *to);
void (*inc_free)(struct rds_incoming *inc);
+ int (*inc_to_skb)(struct rds_incoming *inc, struct sk_buff *skb);
+ int (*skb_local)(struct sk_buff *skb);
+
int (*cm_handle_connect)(struct rdma_cm_id *cm_id,
struct rdma_cm_event *event);
int (*cm_initiate_connect)(struct rdma_cm_id *cm_id);
/* Socket options - in case there will be more */
unsigned char rs_recverr,
rs_cong_monitor;
- int poison;
+ int poison;
+ int rs_netfilter_enabled;
u8 rs_tos;
};
/* message.c */
struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
-int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from);
+int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
+ gfp_t gfp);
struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
__be16 dport, u64 seq);
int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 offset);
int rds_message_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
void rds_message_inc_free(struct rds_incoming *inc);
+int rds_message_inc_to_skb(struct rds_incoming *inc, struct sk_buff *skb);
void rds_message_addref(struct rds_message *rm);
void rds_message_put(struct rds_message *rm);
void rds_message_wait(struct rds_message *rm);
void rds_inc_info_copy(struct rds_incoming *inc,
struct rds_info_iterator *iter,
__be32 saddr, __be32 daddr, int flip);
+int rds_skb_local(struct sk_buff *skb);
/* send.c */
int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
int rds_send_hb(struct rds_connection *conn, int response);
struct rds_message *rds_send_get_message(struct rds_connection *,
struct rm_rdma_op *);
+int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
+ struct sk_buff *skb, gfp_t gfp);
extern unsigned int rds_async_send_enabled;
int rds_trans_init(void);
void rds_trans_exit(void);
+/* ib.c */
+int rds_ib_inc_to_skb(struct rds_incoming *inc, struct sk_buff *skb);
+
#endif
#include <linux/kernel.h>
#include <net/sock.h>
#include <linux/in.h>
+#include <linux/ip.h>
+#include <linux/netfilter.h>
#include "rds.h"
+#include "tcp.h"
+
+/* forward prototypes */
+static void
+rds_recv_drop(struct rds_connection *conn, __be32 saddr, __be32 daddr,
+ struct rds_incoming *inc, gfp_t gfp);
+
+static void
+rds_recv_route(struct rds_connection *conn, struct rds_incoming *inc,
+ gfp_t gfp);
+
+static void
+rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc,
+ gfp_t gfp);
+
+static void
+rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
+ struct rds_incoming *inc, gfp_t gfp);
+
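+/* okfn continuation handed to NF_HOOK() below; netfilter only invokes it once
+ * the PF_RDS_HOOK chain has accepted the skb. We do nothing here and let
+ * rds_recv_incoming() act on the rds_nf_hdr flags afterwards. */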
+static int
+rds_recv_ok(struct sock *sk, struct sk_buff *skb)
+{
+ /* don't do anything here, just continue along */
+ return NF_ACCEPT;
+}
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
__be32 saddr)
inc->i_conn = conn;
inc->i_saddr = saddr;
inc->i_rdma_cookie = 0;
+ inc->i_oconn = NULL;
+ inc->i_skb = NULL;
}
EXPORT_SYMBOL_GPL(rds_inc_init);
if (atomic_dec_and_test(&inc->i_refcount)) {
BUG_ON(!list_empty(&inc->i_item));
+ /* free up the skb if one was created */
+ if (NULL != inc->i_skb) {
+ /* wipe out the frags so kfree_skb() doesn't release pages we never took references on */
+ skb_shinfo(inc->i_skb)->nr_frags = 0;
+
+ /* and free the whole skb */
+ kfree_skb(inc->i_skb);
+ inc->i_skb = NULL;
+ }
+
inc->i_conn->c_trans->inc_free(inc);
}
}
*/
void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
struct rds_incoming *inc, gfp_t gfp)
+{
+ struct sk_buff *skb;
+ struct rds_sock *rs;
+ struct sock *sk;
+ struct rds_nf_hdr *dst, *org;
+ int ret;
+
+ rdsdebug("incoming: conn %p, inc %p, %u.%u.%u.%u : %d -> %u.%u.%u.%u : %d\n",
+ conn, inc, NIPQUAD(saddr), inc->i_hdr.h_sport, NIPQUAD(daddr), inc->i_hdr.h_dport);
+
+ /* initialize some locals */
+ rs = NULL;
+ sk = NULL;
+
+ /* save off the original connection against which the request arrived */
+ inc->i_oconn = conn;
+ inc->i_skb = NULL;
+
+ /* let's find the socket to which this request belongs */
+ rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
+
+ /* pass it on locally if there is no socket bound, or if netfilter is
+ * disabled for this socket */
+ if (NULL == rs || !rs->rs_netfilter_enabled) {
+ /* drop the reference if we had taken one */
+ if (NULL != rs)
+ rds_sock_put(rs);
+
+ rds_recv_local(conn, saddr, daddr, inc, gfp);
+ return;
+ }
+
+ /* otherwise pull out the socket */
+ sk = rds_rs_to_sk(rs);
+
+ /* create an skb with some additional space to store our rds_nf_hdr info */
+ skb = alloc_skb(sizeof(struct rds_nf_hdr) * 2, gfp);
+ if (NULL == skb) {
+ /* if we have allocation problems, then we just need to depart */
+ rdsdebug("failure to allocate space for inc %p, %u.%u.%u.%u -> %u.%u.%u.%u\n",
+ inc, NIPQUAD(saddr), NIPQUAD(daddr));
+ rds_recv_local(conn, saddr, daddr, inc, gfp);
+ return;
+ }
+
+ /* once we've allocated an skb, also store it in our structures */
+ inc->i_skb = skb;
+
+ /* now pull out the rds headers */
+ dst = rds_nf_hdr_dst(skb);
+ org = rds_nf_hdr_org(skb);
+
+ /* now update our rds_nf_hdr for tracking locations of the request */
+ dst->saddr = saddr;
+ dst->daddr = daddr;
+ dst->sport = inc->i_hdr.h_sport;
+ dst->dport = inc->i_hdr.h_dport;
+ dst->flags = 0;
+
+ /* assign the appropriate protocol if any */
+ if (NULL != sk) {
+ dst->protocol = sk->sk_protocol;
+ dst->sk = sk;
+ } else {
+ dst->protocol = 0;
+ dst->sk = NULL;
+ }
+
+ /* cleanup any references taken */
+ if (NULL != rs)
+ rds_sock_put(rs);
+
+ /* the "original" header starts out as a copy of the destination header */
+ memcpy(org, dst, sizeof(struct rds_nf_hdr));
+
+ /* convert our local data structures in the message to a generalized skb form */
+ if (conn->c_trans->inc_to_skb(inc, skb)) {
+ rdsdebug("handing off to PRE_ROUTING hook\n");
+ /* call down through the hook layers */
+ ret = NF_HOOK(PF_RDS_HOOK, NF_RDS_PRE_ROUTING, sk, skb, NULL, NULL, rds_recv_ok);
+ }
+ /* if the conversion failed, just continue processing the message as local */
+ else {
+ rdsdebug("failed to create skb form, conn %p, inc %p, %u.%u.%u.%u -> %u.%u.%u.%u\n",
+ conn, inc, NIPQUAD(saddr), NIPQUAD(daddr));
+ ret = 1;
+ }
+
+ /* pull back out the rds headers */
+ dst = rds_nf_hdr_dst(skb);
+ org = rds_nf_hdr_org(skb);
+
+ /* now, depending upon what we got back, perform the appropriate action */
+ if (dst->flags & RDS_NF_HDR_FLAG_DONE) {
+ rds_recv_drop(conn, saddr, daddr, inc, gfp);
+ }
+ /* this is the normal good processed state */
+ else if (ret >= 0) {
+ /* if the hooks left the addresses unchanged and the destination is this node, receive locally */
+ if (dst->saddr == org->saddr && dst->daddr == org->daddr &&
+ conn->c_trans->skb_local(skb)) {
+ rds_recv_local(conn, saddr, daddr, inc, gfp);
+ }
+ /* the send both case does both a local recv and a reroute */
+ else if (dst->flags & RDS_NF_HDR_FLAG_BOTH) {
+ /* take an extra reference on the inc so it can't be freed
+ * out from under us while we do both deliveries */
+ rds_inc_addref(inc);
+
+ /* send it up the stream locally */
+ rds_recv_local(conn, saddr, daddr, inc, gfp);
+
+ /* and also reroute the request */
+ rds_recv_route(conn, inc, gfp);
+
+ /* since we are done with processing we can drop this additional reference */
+ rds_inc_put(inc);
+
+ }
+ /* anything else indicates a possible change of destination, so reroute */
+ else
+ rds_recv_route(conn, inc, gfp);
+ }
+ /* we don't really expect an error state from this call other than the DONE case handled above */
+ else {
+ /* we don't really know how to handle this yet - just ignore for now */
+ printk(KERN_ERR "unacceptable state for skb ret %d, conn %p, inc %p, "
+ "%u.%u.%u.%u -> %u.%u.%u.%u\n",
+ ret, conn, inc, NIPQUAD(saddr), NIPQUAD(daddr));
+ }
+}
+EXPORT_SYMBOL_GPL(rds_recv_incoming);
+
+static void
+rds_recv_drop(struct rds_connection *conn, __be32 saddr, __be32 daddr,
+ struct rds_incoming *inc, gfp_t gfp)
+{
+ /* drop the existing incoming message */
+ rdsdebug("dropping request on conn %p, inc %p, %u.%u.%u.%u -> %u.%u.%u.%u",
+ conn, inc, NIPQUAD(saddr), NIPQUAD(daddr));
+}
+
+static void
+rds_recv_route(struct rds_connection *conn, struct rds_incoming *inc,
+ gfp_t gfp)
+{
+ struct rds_connection *nconn;
+ struct rds_nf_hdr *dst, *org;
+
+ /* pull out the rds header */
+ dst = rds_nf_hdr_dst(inc->i_skb);
+ org = rds_nf_hdr_org(inc->i_skb);
+
+ /* special case where we are swapping the message back on the same connection */
+ if (dst->saddr == org->daddr && dst->daddr == org->saddr) {
+ nconn = conn;
+ } else {
+ /* reroute to a new conn structure, possibly the same one */
+ nconn = rds_conn_find(dst->saddr, dst->daddr, conn->c_trans,
+ conn->c_tos);
+ }
+
+ /* cannot find a matching connection so drop the request */
+ if (NULL == nconn) {
+ printk(KERN_ALERT "cannot find matching conn for inc %p, %u.%u.%u.%u -> %u.%u.%u.%u\n",
+ inc, NIPQUAD(dst->saddr), NIPQUAD(dst->daddr));
+
+ rdsdebug("cannot find matching conn for inc %p, %u.%u.%u.%u -> %u.%u.%u.%u",
+ inc, NIPQUAD(dst->saddr), NIPQUAD(dst->daddr));
+ rds_recv_drop(conn, dst->saddr, dst->daddr, inc, gfp);
+ }
+ /* this is a request for our local node, though potentially from a different
+ * source; either way we process it locally */
+ else if (conn->c_trans->skb_local(inc->i_skb)) {
+ rds_recv_local(nconn, dst->saddr, dst->daddr, inc, gfp);
+ }
+ /* looks like this request is going out to another node */
+ else {
+ rds_recv_forward(nconn, inc, gfp);
+ }
+}
+
+static void
+rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc,
+ gfp_t gfp)
+{
+ int len, ret;
+ struct rds_nf_hdr *dst, *org;
+ struct rds_sock *rs;
+ struct sock *sk = NULL;
+
+ /* initialize some bits */
+ rs = NULL;
+
+ /* pull out the destination and original rds headers */
+ dst = rds_nf_hdr_dst(inc->i_skb);
+ org = rds_nf_hdr_org(inc->i_skb);
+
+ /* find the proper output socket - it should be the local one on which we originated */
+ rs = rds_find_bound(dst->saddr, dst->sport);
+ if (!rs) {
+ rdsdebug("failed to find output rds_socket src %u.%u.%u.%u : %u, inc %p, conn %p\n",
+ NIPQUAD(dst->saddr), dst->sport, inc, conn);
+ rds_stats_inc(s_recv_drop_no_sock);
+ goto out;
+ }
+
+ /* pull out the actual message len */
+ len = be32_to_cpu(inc->i_hdr.h_len);
+
+ /* now let's see if we can send it all */
+ ret = rds_send_internal(conn, rs, inc->i_skb, gfp);
+ if (len != ret) {
+ rdsdebug("failed to send rds_data dst %u.%u.%u.%u : %u, inc %p, conn %p, len %d != ret %d\n",
+ NIPQUAD(dst->daddr), dst->dport, inc, conn, len, ret);
+ goto out;
+ }
+
+ if (NULL != rs)
+ rds_sock_put(rs);
+
+ /* all good so we are done */
+ return;
+
+out:
+ /* cleanup any handles */
+ if (NULL != rs) {
+ sk = rds_rs_to_sk(rs);
+ rds_sock_put(rs);
+ }
+
+ /* on error, let's take a shot at hook cleanup */
+ NF_HOOK(PF_RDS_HOOK, NF_RDS_FORWARD_ERROR, sk, inc->i_skb, NULL, NULL, rds_recv_ok);
+
+ /* then hand the request off to normal local processing on the old connection */
+ rds_recv_local(inc->i_oconn, org->saddr, org->daddr, inc, gfp);
+}
+
+static void
+rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr,
+ struct rds_incoming *inc, gfp_t gfp)
{
struct rds_sock *rs = NULL;
struct sock *sk;
if (rs)
rds_sock_put(rs);
}
-EXPORT_SYMBOL_GPL(rds_recv_incoming);
/*
* be very careful here. This is being called as the condition in
rds_info_copy(iter, &minfo, sizeof(minfo));
}
+
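+/* Generic .skb_local helper shared by the IB, iWARP and TCP transports;
+ * unlike the loopback-only rds_message_skb_local(), it only compares the
+ * destination address and port. */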
+int rds_skb_local(struct sk_buff *skb)
+{
+ struct rds_nf_hdr *dst, *org;
+
+ /* pull out the headers */
+ dst = rds_nf_hdr_dst(skb);
+ org = rds_nf_hdr_org(skb);
+
+ /* just check to see that the destination is still the same */
+ if (dst->daddr == org->daddr && dst->dport == org->dport) {
+ return 1;
+ }
+ /* otherwise, the sport/dport have likely swapped so consider
+ * it a different node */
+ else {
+ return 0;
+ }
+}
+EXPORT_SYMBOL(rds_skb_local);
#include <linux/list.h>
#include "rds.h"
+#include "tcp.h"
/* When transmitting messages in rds_send_xmit, we need to emerge from
* time to time and briefly release the CPU. Otherwise the softlock watchdog
/* Attach data to the rm */
if (payload_len) {
rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
- ret = rds_message_copy_from_user(rm, &msg->msg_iter);
+ ret = rds_message_copy_from_user(rm, &msg->msg_iter, GFP_KERNEL);
if (ret)
goto out;
}
return ret;
}
+/* This function and rds_sendmsg() can likely be folded together into a single
+ * function that knows how to package up a message for transmission either from
+ * user space or from an internal source.
+ *
+ * There is also potentially a need to allow either a retry of the send attempt,
+ * if we are high enough in the stack, or only a single-shot attempt, if we are
+ * low enough in the stack that we cannot afford to sleep or block forever.
+ *
+ * At present this form of the code only ever takes a single shot at the send
+ * and assumes that the source is internal.
+ */
+
+int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
+ struct sk_buff *skb, gfp_t gfp)
+{
+ struct rds_nf_hdr *dst;
+ struct rds_message *rm = NULL;
+ struct scatterlist *sg;
+ skb_frag_t *frags;
+ int ret = 0;
+ int queued = 0;
+ int i;
+
+ /* pull out the destination info */
+ dst = rds_nf_hdr_dst(skb);
+
+ /* size of rm including all sgs */
+ ret = ceil(skb->len, PAGE_SIZE) * sizeof(struct scatterlist);
+ if (ret < 0)
+ goto out;
+
+ /* create ourselves a new message to send out the data */
+ rm = rds_message_alloc(ret, gfp);
+ if (!rm) {
+ rdsdebug("failed to allocate response message rs %p", rs);
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* Attach data to the rm */
+ if (skb->len) {
+ /* initialize the segments we need to use */
+ rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(skb->len, PAGE_SIZE));
+
+ /* copy out all the pages from the skb */
+ for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
+ /* one to one mapping from skb info to rm info */
+ frags = &skb_shinfo(skb)->frags[i];
+ sg = &rm->data.op_sg[i];
+
+ /* just save the pieces directly */
+ sg_set_page(sg, frags->page.p, frags->size, frags->page_offset);
+
+ /* and take an extra reference on the page */
+ get_page(frags->page.p);
+ }
+
+ /* finalization of the pieces of the message */
+ rm->m_inc.i_hdr.h_len = cpu_to_be32(skb->len);
+ rm->data.op_nents = skb_shinfo(skb)->nr_frags;
+ }
+
+ rdsdebug("Created send rm %p, nents %d, len %d, skbLen %d\n",
+ rm, rm->data.op_nents, be32_to_cpu(rm->m_inc.i_hdr.h_len), skb->len);
+
+ /* initializes all the subpieces of the message */
+ rm->data.op_active = 1;
+ rm->m_daddr = dst->daddr;
+
+ if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
+ if (printk_ratelimit())
+ printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
+ &rm->rdma, conn->c_trans->xmit_rdma);
+ ret = -EOPNOTSUPP;
+ goto out;
+ }
+
+ if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
+ if (printk_ratelimit())
+ printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
+ &rm->atomic, conn->c_trans->xmit_atomic);
+ ret = -EOPNOTSUPP;
+ goto out;
+ }
+
+ /* retry the connection if it hasn't actually been made */
+ rds_conn_connect_if_down(conn);
+
+ /* simple congestion check */
+ ret = rds_cong_wait(conn->c_fcong, dst->dport, 1, rs);
+ if (ret) {
+ rs->rs_seen_congestion = 1;
+ goto out;
+ }
+
+ /* only take a single pass */
+ if (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
+ dst->dport, &queued)) {
+ rdsdebug("cannot block on internal send rs %p", rs);
+ rds_stats_inc(s_send_queue_full);
+
+ /* force a requeue of the work for later */
+ queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+
+ ret = -EAGAIN;
+ goto out;
+ }
+
+ /*
+ * By now we've committed to the send. We reuse rds_send_worker()
+ * to retry sends in the rds thread if the transport asks us to.
+ */
+ rds_stats_inc(s_send_queued);
+
+/* Set this to 1 for normal testing but 0 when building the TCP version
+ * of the code. The TCP version has hang issues otherwise. */
+#if 1
+ /* call the send path directly; with the TCP transport this can lead to a
+ * deadlock/hang (see the comment above), hence the #else alternative */
+ ret = rds_send_xmit(conn);
+ if (ret == -ENOMEM || ret == -EAGAIN)
+ queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+#else
+ /* always hand the send off to the worker thread */
+ queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+#endif
+
+ rdsdebug("message sent for rs %p, conn %p, len %d, %u.%u.%u.%u : %u -> %u.%u.%u.%u : %u\n",
+ rs, conn, skb->len, NIPQUAD(dst->saddr), dst->sport, NIPQUAD(dst->daddr), dst->dport);
+ ret = skb->len;
+
+out:
+ /* on error, drop the page references we took but don't let the pages themselves be freed */
+ if (ret < 0 && rm) {
+ for (i = 0; i < rm->data.op_nents; i++) {
+ sg = &rm->data.op_sg[i];
+ put_page(sg_page(sg));
+ sg_set_page(sg, NULL, 0, 0);
+ }
+ rm->data.op_nents = 0;
+ }
+
+ if (rm)
+ rds_message_put(rm);
+ return ret;
+}
+
/*
* Reply to a ping packet.
*/
.conn_shutdown = rds_tcp_conn_shutdown,
.inc_copy_to_user = rds_tcp_inc_copy_to_user,
.inc_free = rds_tcp_inc_free,
+ .skb_local = rds_skb_local,
.stats_info_copy = rds_tcp_stats_info_copy,
.exit = rds_tcp_exit,
.t_owner = THIS_MODULE,