From: Ahmed Abbas Date: Thu, 18 Jul 2013 23:59:59 +0000 (-0700) Subject: add NETFILTER support X-Git-Tag: v4.1.12-92~293^2^2~55 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=6d2a3ca2b529f73a5b7f541a51690b013e1bb1be;p=users%2Fjedix%2Flinux-maple.git add NETFILTER support Orabug: 17082619 Adds the ability for the RDS code to support the NETFILTER kernel interfaces. This allows for packet inspection, modification, and potential redirection as the packets flow through the lower layers of the RDS code. Signed-off-by: Bang Nguyen (Ported from UEK2 commit 1913973db561fd6db2e495d3b95e6f8c78b3ba23) Signed-off-by: Mukesh Kacker --- diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h index 1caffb379c3e..83370cdd898a 100644 --- a/include/uapi/linux/rds.h +++ b/include/uapi/linux/rds.h @@ -35,6 +35,7 @@ #define _LINUX_RDS_H #include +#include /* These sparse annotated types shouldn't be in any user * visible header file. We should clean this up rather @@ -64,9 +65,11 @@ */ #define SIOCRDSSETTOS (SIOCPROTOPRIVATE) #define SIOCRDSGETTOS (SIOCPROTOPRIVATE + 1) +#define SIOCRDSENABLENETFILTER (SIOCPROTOPRIVATE + 2) -typedef u_int8_t rds_tos_t; +#define IPPROTO_OKA (142) +typedef u_int8_t rds_tos_t; /* * Control message types for SOL_RDS. 
@@ -312,5 +315,38 @@ struct rds_rdma_send_notify { #define RDS_RDMA_REMOTE_COMPLETE 0x0080 /* Notify when data is available */ #define RDS_SEND_NOTIFY_ME 0x0100 /* Notify when operation completes */ +/* netfilter related components */ +struct rds_nf_hdr { + __be32 saddr; /* source address of request */ + __be32 daddr; /* destination address */ + __be16 sport; /* source port number */ + __be16 dport; /* destination port number */ + __be16 protocol; /* rds socket protocol family to use */ + +#define RDS_NF_HDR_FLAG_BOTH (0x1) /* request needs to go locally and remote */ +#define RDS_NF_HDR_FLAG_DONE (0x2) /* the request is consumed and done */ + __be16 flags; /* any configuration flags */ + struct sock *sk; +}; + +/* pull out the 2 rdshdr from the SKB structures passed around */ +#define rds_nf_hdr_dst(skb) (&(((struct rds_nf_hdr *)skb_tail_pointer((skb)))[0])) +#define rds_nf_hdr_org(skb) (&(((struct rds_nf_hdr *)skb_tail_pointer((skb)))[1])) + +/* temporary hack for a family that exists in the netfilter family */ +#define PF_RDS_HOOK 11 + +enum rds_inet_hooks { + NF_RDS_PRE_ROUTING, + NF_RDS_FORWARD_ERROR, + NF_RDS_NUMHOOKS +}; + +enum rds_hook_priorities { + NF_RDS_PRI_FIRST = INT_MIN, + NF_RDS_PRI_OKA = 0, + NF_RDS_PRI_LAST = INT_MAX +}; + #endif /* IB_RDS_H */ diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index 5869a0434fe1..ac41334f57c8 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -234,6 +234,11 @@ static int rds_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg) if (put_user(tos, (rds_tos_t __user *)arg)) return -EFAULT; break; + case SIOCRDSENABLENETFILTER: + spin_lock_bh(&rds_sock_lock); + rs->rs_netfilter_enabled = 1; + spin_unlock_bh(&rds_sock_lock); + break; default: return -ENOIOCTLCMD; } @@ -493,6 +498,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) rs->poison = 0xABABABAB; rs->rs_tos = 0; rs->rs_conn = 0; + rs->rs_netfilter_enabled = 0; if (rs->rs_bound_addr) printk(KERN_CRIT "bound addr %x at 
create\n", rs->rs_bound_addr); @@ -509,7 +515,8 @@ static int rds_create(struct net *net, struct socket *sock, int protocol, int ke { struct sock *sk; - if (sock->type != SOCK_SEQPACKET || protocol) + if (sock->type != SOCK_SEQPACKET || + (protocol && IPPROTO_OKA != protocol)) return -ESOCKTNOSUPPORT; sk = sk_alloc(net, AF_RDS, GFP_KERNEL, &rds_proto); @@ -552,6 +559,7 @@ void debug_sock_put(struct sock *sk) WARN_ON(1); } rs->poison = 0xDEADBEEF; + rs->rs_netfilter_enabled = 0; sk_free(sk); } } diff --git a/net/rds/ib.c b/net/rds/ib.c index 35bebe74c02c..47f7d91c3850 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -1599,6 +1599,8 @@ struct rds_transport rds_ib_transport = { .conn_shutdown = rds_ib_conn_shutdown, .inc_copy_to_user = rds_ib_inc_copy_to_user, .inc_free = rds_ib_inc_free, + .inc_to_skb = rds_ib_inc_to_skb, + .skb_local = rds_skb_local, .cm_initiate_connect = rds_ib_cm_initiate_connect, .cm_handle_connect = rds_ib_cm_handle_connect, .cm_connect_complete = rds_ib_cm_connect_complete, @@ -1616,5 +1618,62 @@ struct rds_transport rds_ib_transport = { .t_type = RDS_TRANS_IB }; +int rds_ib_inc_to_skb(struct rds_incoming *inc, struct sk_buff *skb) +{ + skb_frag_t *frag; + int ret = 0, slen; + u32 len; + int i; + struct rds_ib_incoming *ibinc; + struct rds_page_frag *ibfrag; + + /* pull out initial pointers */ + ibinc = container_of(inc, struct rds_ib_incoming, ii_inc); + ibfrag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item); + len = be32_to_cpu(inc->i_hdr.h_len); + slen = len; + i = 0; + + /* run through the entire ib fragment list and save off the buffers */ + while (NULL != ibfrag && slen > 0) { + /* one to one mapping of frags to sg structures */ + frag = &skb_shinfo(skb)->frags[i]; + + /* save off all the sg pieces to the skb frags we are creating */ + frag->size = ibfrag->f_sg.length; + frag->page_offset = ibfrag->f_sg.offset; + frag->page.p = sg_page(&ibfrag->f_sg); + + /* AA: do we need to bump up the page reference */ + /* 
get_page(frag->page); */ + + /* dec the amount of data we are consuming */ + slen -= frag->size; + + /* bump to the next entry */ + ibfrag = list_entry(ibfrag->f_item.next, struct rds_page_frag, f_item); + i++; + + /* for now we will only have a single chain of fragments in the skb */ + if (i >= MAX_SKB_FRAGS) { + rdsdebug("too many fragments in op %u > max %u, skb %p", + i, (int)MAX_SKB_FRAGS, skb); + goto done; + } + } + + /* track the full message length too */ + skb->len = len; + + /* all good */ + ret = 1; + +done: + /* track all the fragments we saved */ + skb_shinfo(skb)->nr_frags = i; + + return ret; +} + MODULE_LICENSE("GPL"); diff --git a/net/rds/iw.c b/net/rds/iw.c index bca67d0ff4af..d9451aa1d9e4 100644 --- a/net/rds/iw.c +++ b/net/rds/iw.c @@ -271,6 +271,7 @@ struct rds_transport rds_iw_transport = { .conn_shutdown = rds_iw_conn_shutdown, .inc_copy_to_user = rds_iw_inc_copy_to_user, .inc_free = rds_iw_inc_free, + .skb_local = rds_skb_local, .cm_initiate_connect = rds_iw_cm_initiate_connect, .cm_handle_connect = rds_iw_cm_handle_connect, .cm_connect_complete = rds_iw_cm_connect_complete, diff --git a/net/rds/loop.c b/net/rds/loop.c index 272acad97224..c4c03a02a5b9 100644 --- a/net/rds/loop.c +++ b/net/rds/loop.c @@ -148,6 +148,26 @@ static void rds_loop_conn_shutdown(struct rds_connection *conn) { } +static int rds_message_skb_local(struct sk_buff *skb) +{ + struct rds_nf_hdr *dst, *org; + + /* pull out the headers */ + dst = rds_nf_hdr_dst(skb); + org = rds_nf_hdr_org(skb); + + /* assuming original and dest are exactly the same then it's our own node */ + if (dst->daddr == org->daddr && dst->saddr == org->saddr && + dst->sport == org->sport && dst->dport == org->dport) { + return 1; + } + /* otherwise, the sport/dport have likely swapped so consider + * it a different node */ + else { + return 0; + } +} + void rds_loop_exit(void) { struct rds_loop_connection *lc, *_lc; @@ -179,6 +199,8 @@ struct rds_transport rds_loop_transport = { .conn_connect = 
rds_loop_conn_connect, .conn_shutdown = rds_loop_conn_shutdown, .inc_copy_to_user = rds_message_inc_copy_to_user, + .inc_to_skb = rds_message_inc_to_skb, + .skb_local = rds_message_skb_local, .inc_free = rds_loop_inc_free, .t_name = "loopback", }; diff --git a/net/rds/message.c b/net/rds/message.c index b4d32e94d5db..277f8e306722 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -279,7 +279,8 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in return rm; } -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from) +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from, + gfp_t gfp) { unsigned long to_copy, nbytes; unsigned long sg_off; @@ -297,7 +298,9 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from) while (iov_iter_count(from)) { if (!sg_page(sg)) { ret = rds_page_remainder_alloc(sg, iov_iter_count(from), - GFP_HIGHUSER); + GFP_ATOMIC == gfp ? + gfp : GFP_HIGHUSER); + if (ret) return ret; rm->data.op_nents++; @@ -379,3 +382,50 @@ void rds_message_unmapped(struct rds_message *rm) } EXPORT_SYMBOL_GPL(rds_message_unmapped); +int rds_message_inc_to_skb(struct rds_incoming *inc, struct sk_buff *skb) +{ + struct rds_message *rm; + struct scatterlist *sg; + skb_frag_t *frag; + int ret = 0; + u32 len; + int i; + + rm = container_of(inc, struct rds_message, m_inc); + len = be32_to_cpu(rm->m_inc.i_hdr.h_len); + i = 0; + + /* for now we will only have a single chain of fragments in the skb */ + if (rm->data.op_nents >= MAX_SKB_FRAGS) { + rdsdebug("too many fragments in op %u > max %u, rm %p", + rm->data.op_nents, (int)MAX_SKB_FRAGS, rm); + goto done; + } + + /* run through the entire scatter gather list and save off the buffers */ + for (i = 0; i < rm->data.op_nents; i++) { + /* one to one mapping of frags to sg structures */ + frag = &skb_shinfo(skb)->frags[i]; + sg = &rm->data.op_sg[i]; + + /* save off all the sg pieces to the skb frags we are creating */ + 
frag->size = sg->length; + frag->page_offset = sg->offset; + frag->page.p = sg_page(sg); + + /* AA: do we need to bump up the page reference too */ + /* get_page(frag->page); */ + } + + /* track the full message length too */ + skb->len = len; + + /* all good */ + ret = 1; + +done: + /* track all the fragments we saved */ + skb_shinfo(skb)->nr_frags = i; + + return ret; +} diff --git a/net/rds/page.c b/net/rds/page.c index fad4f8cf7a71..e35a92fde68f 100644 --- a/net/rds/page.c +++ b/net/rds/page.c @@ -57,6 +57,9 @@ int rds_page_copy_user(struct page *page, unsigned long offset, unsigned long ret; void *addr; + /* AA: can this be removed as sometimes it gives false negative - this doesn't + * exist in the OFA 1.5.3 code line */ +#if 0 if (to_user) ret = access_ok(VERIFY_WRITE, ptr, bytes); else @@ -64,6 +67,7 @@ int rds_page_copy_user(struct page *page, unsigned long offset, if (!ret) return -EFAULT; +#endif if (to_user) rds_stats_add(s_copy_to_user, bytes); diff --git a/net/rds/rds.h b/net/rds/rds.h index 5881b3d977ad..a5a362d6e872 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -239,6 +239,10 @@ struct rds_incoming { unsigned long i_rx_jiffies; __be32 i_saddr; + /* extension fields for dealing with netfilter */ + struct rds_connection *i_oconn; + struct sk_buff *i_skb; + rds_rdma_cookie_t i_rdma_cookie; }; @@ -466,6 +470,9 @@ struct rds_transport { int (*inc_copy_to_user)(struct rds_incoming *inc, struct iov_iter *to); void (*inc_free)(struct rds_incoming *inc); + int (*inc_to_skb)(struct rds_incoming *inc, struct sk_buff *skb); + int (*skb_local)(struct sk_buff *skb); + int (*cm_handle_connect)(struct rdma_cm_id *cm_id, struct rdma_cm_event *event); int (*cm_initiate_connect)(struct rdma_cm_id *cm_id); @@ -546,7 +553,8 @@ struct rds_sock { /* Socket options - in case there will be more */ unsigned char rs_recverr, rs_cong_monitor; - int poison; + int poison; + int rs_netfilter_enabled; u8 rs_tos; }; @@ -701,7 +709,8 @@ rds_conn_connecting(struct rds_connection 
*conn) /* message.c */ struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp); struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents); -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from); +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from, + gfp_t gfp); struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len); void rds_message_populate_header(struct rds_header *hdr, __be16 sport, __be16 dport, u64 seq); @@ -714,6 +723,7 @@ int rds_message_get_version_extension(struct rds_header *hdr, unsigned int *vers int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 offset); int rds_message_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to); void rds_message_inc_free(struct rds_incoming *inc); +int rds_message_inc_to_skb(struct rds_incoming *inc, struct sk_buff *skb); void rds_message_addref(struct rds_message *rm); void rds_message_put(struct rds_message *rm); void rds_message_wait(struct rds_message *rm); @@ -757,6 +767,7 @@ int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msg); void rds_inc_info_copy(struct rds_incoming *inc, struct rds_info_iterator *iter, __be32 saddr, __be32 daddr, int flip); +int rds_skb_local(struct sk_buff *skb); /* send.c */ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len); @@ -772,6 +783,8 @@ int rds_send_pong(struct rds_connection *conn, __be16 dport); int rds_send_hb(struct rds_connection *conn, int response); struct rds_message *rds_send_get_message(struct rds_connection *, struct rm_rdma_op *); +int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, + struct sk_buff *skb, gfp_t gfp); extern unsigned int rds_async_send_enabled; @@ -858,4 +871,7 @@ unsigned int rds_trans_stats_info_copy(struct rds_info_iterator *iter, int rds_trans_init(void); void rds_trans_exit(void); +/* ib.c */ +int rds_ib_inc_to_skb(struct rds_incoming *inc, 
struct sk_buff *skb); + #endif diff --git a/net/rds/recv.c b/net/rds/recv.c index 3924a2242033..e990f124f7e7 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -33,8 +33,35 @@ #include #include #include +#include +#include #include "rds.h" +#include "tcp.h" + +/* forward prototypes */ +static void +rds_recv_drop(struct rds_connection *conn, __be32 saddr, __be32 daddr, + struct rds_incoming *inc, gfp_t gfp); + +static void +rds_recv_route(struct rds_connection *conn, struct rds_incoming *inc, + gfp_t gfp); + +static void +rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc, + gfp_t gfp); + +static void +rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr, + struct rds_incoming *inc, gfp_t gfp); + +static int +rds_recv_ok(struct sock *sk, struct sk_buff *skb) +{ + /* don't do anything here, just continue along */ + return NF_ACCEPT; +} void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, __be32 saddr) @@ -44,6 +71,8 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn, inc->i_conn = conn; inc->i_saddr = saddr; inc->i_rdma_cookie = 0; + inc->i_oconn = NULL; + inc->i_skb = NULL; } EXPORT_SYMBOL_GPL(rds_inc_init); @@ -60,6 +89,16 @@ void rds_inc_put(struct rds_incoming *inc) if (atomic_dec_and_test(&inc->i_refcount)) { BUG_ON(!list_empty(&inc->i_item)); + /* free up the skb if any were created */ + if (NULL != inc->i_skb) { + /* wipe out any fragments so they don't get released */ + skb_shinfo(inc->i_skb)->nr_frags = 0; + + /* and free the whole skb */ + kfree_skb(inc->i_skb); + inc->i_skb = NULL; + } + inc->i_conn->c_trans->inc_free(inc); } } @@ -155,6 +194,247 @@ static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock */ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr, struct rds_incoming *inc, gfp_t gfp) +{ + struct sk_buff *skb; + struct rds_sock *rs; + struct sock *sk; + struct rds_nf_hdr *dst, *org; + int ret; + + 
rdsdebug(KERN_ALERT "incoming: conn %p, inc %p, %u.%u.%u.%u : %d -> %u.%u.%u.%u : %d\n", + conn, inc, NIPQUAD(saddr), inc->i_hdr.h_sport, NIPQUAD(daddr), inc->i_hdr.h_dport); + + /* initialize some globals */ + rs = NULL; + sk = NULL; + + /* save off the original connection against which the request arrived */ + inc->i_oconn = conn; + inc->i_skb = NULL; + + /* lets find a socket to which this request belongs */ + rs = rds_find_bound(daddr, inc->i_hdr.h_dport); + + /* pass it on locally if there is no socket bound, or if netfilter is + * disabled for this socket */ + if (NULL == rs || !rs->rs_netfilter_enabled) { + /* drop the reference if we had taken one */ + if (NULL != rs) + rds_sock_put(rs); + + rds_recv_local(conn, saddr, daddr, inc, gfp); + return; + } + + /* otherwise pull out the socket */ + sk = rds_rs_to_sk(rs); + + /* create an skb with some additional space to store our rds_nf_hdr info */ + skb = alloc_skb(sizeof(struct rds_nf_hdr) * 2, gfp); + if (NULL == skb) { + /* if we have allocation problems, then we just need to depart */ + rdsdebug("failure to allocate space for inc %p, %u.%u.%u.%u -> %u.%d.%u.%u\n", + inc, NIPQUAD(saddr), NIPQUAD(daddr)); + rds_recv_local(conn, saddr, daddr, inc, gfp); + return; + } + + /* once we've allocated an skb, also store it in our structures */ + inc->i_skb = skb; + + /* now pull out the rds headers */ + dst = rds_nf_hdr_dst(skb); + org = rds_nf_hdr_org(skb); + + /* now update our rds_nf_hdr for tracking locations of the request */ + dst->saddr = saddr; + dst->daddr = daddr; + dst->sport = inc->i_hdr.h_sport; + dst->dport = inc->i_hdr.h_dport; + dst->flags = 0; + + /* assign the appropriate protocol if any */ + if (NULL != sk) { + dst->protocol = sk->sk_protocol; + dst->sk = sk; + } else { + dst->protocol = 0; + dst->sk = NULL; + } + + /* cleanup any references taken */ + if (NULL != rs) + rds_sock_put(rs); + + /* the original info is just a copy */ + memcpy(org, dst, sizeof(struct rds_nf_hdr)); + + /* convert our 
local data structures in the message to a generalized skb form */ + if (conn->c_trans->inc_to_skb(inc, skb)) { + rdsdebug("handing off to PRE_ROUTING hook\n"); + /* call down through the hook layers */ + ret = NF_HOOK(PF_RDS_HOOK, NF_RDS_PRE_ROUTING, sk, skb, NULL, NULL, rds_recv_ok); + } + /* if we had a failure to convert, then just assuming to continue as local */ + else { + rdsdebug("failed to create skb form, conn %p, inc %p, %u.%u.%u.%u -> %u.%u.%u.%u\n", + conn, inc, NIPQUAD(saddr), NIPQUAD(daddr)); + ret = 1; + } + + /* pull back out the rds headers */ + dst = rds_nf_hdr_dst(skb); + org = rds_nf_hdr_org(skb); + + /* now depending upon we got back we can perform appropriate activities */ + if (dst->flags & RDS_NF_HDR_FLAG_DONE) { + rds_recv_drop(conn, saddr, daddr, inc, gfp); + } + /* this is the normal good processed state */ + else if (ret >= 0) { + /* check the original header and if changed do the needful */ + if (dst->saddr == org->saddr && dst->daddr == org->daddr && + conn->c_trans->skb_local(skb)) { + rds_recv_local(conn, saddr, daddr, inc, gfp); + } + /* the send both case does both a local recv and a reroute */ + else if (dst->flags & RDS_NF_HDR_FLAG_BOTH) { + /* we must be sure to take an extra reference on the inc + * to be sure it doesn't accidentally get freed in between */ + rds_inc_addref(inc); + + /* send it up the stream locally */ + rds_recv_local(conn, saddr, daddr, inc, gfp); + + /* and also reroute the request */ + rds_recv_route(conn, inc, gfp); + + /* since we are done with processing we can drop this additional reference */ + rds_inc_put(inc); + + } + /* anything else is a change in possible destination so pass to route */ + else + rds_recv_route(conn, inc, gfp); + } + /* we don't really expect an error state from this call that isn't the done above */ + else { + /* we don't really know how to handle this yet - just ignore for now */ + printk(KERN_ERR "unacceptible state for skb ret %d, conn %p, inc %p, " + "%u.%u.%u.%u -> 
%u.%u.%u.%u\n", + ret, conn, inc, NIPQUAD(saddr), NIPQUAD(daddr)); + } +} +EXPORT_SYMBOL_GPL(rds_recv_incoming); + +static void +rds_recv_drop(struct rds_connection *conn, __be32 saddr, __be32 daddr, + struct rds_incoming *inc, gfp_t gfp) +{ + /* drop the existing incoming message */ + rdsdebug("dropping request on conn %p, inc %p, %u.%u.%u.%u -> %u.%u.%u.%u", + conn, inc, NIPQUAD(saddr), NIPQUAD(daddr)); +} + +static void +rds_recv_route(struct rds_connection *conn, struct rds_incoming *inc, + gfp_t gfp) +{ + struct rds_connection *nconn; + struct rds_nf_hdr *dst, *org; + + /* pull out the rds header */ + dst = rds_nf_hdr_dst(inc->i_skb); + org = rds_nf_hdr_org(inc->i_skb); + + /* special case where we are swapping the message back on the same connection */ + if (dst->saddr == org->daddr && dst->daddr == org->saddr) { + nconn = conn; + } else { + /* reroute to a new conn structure, possibly the same one */ + nconn = rds_conn_find(dst->saddr, dst->daddr, conn->c_trans, + conn->c_tos); + } + + /* cannot find a matching connection so drop the request */ + if (NULL == nconn) { + printk(KERN_ALERT "cannot find matching conn for inc %p, %u.%u.%u.%u -> %u.%u.%u.%u\n", + inc, NIPQUAD(dst->saddr), NIPQUAD(dst->daddr)); + + rdsdebug("cannot find matching conn for inc %p, %u.%u.%u.%u -> %u.%u.%u.%u", + inc, NIPQUAD(dst->saddr), NIPQUAD(dst->daddr)); + rds_recv_drop(conn, dst->saddr, dst->daddr, inc, gfp); + } + /* this is a request for our local node, but potentially a different source + * either way we process it locally */ + else if (conn->c_trans->skb_local(inc->i_skb)) { + rds_recv_local(nconn, dst->saddr, dst->daddr, inc, gfp); + } + /* looks like this request is going out to another node */ + else { + rds_recv_forward(nconn, inc, gfp); + } +} + +static void +rds_recv_forward(struct rds_connection *conn, struct rds_incoming *inc, + gfp_t gfp) +{ + int len, ret; + struct rds_nf_hdr *dst, *org; + struct rds_sock *rs; + struct sock *sk = NULL; + + /* initialize some bits 
*/ + rs = NULL; + + /* pull out the destination and original rds headers */ + dst = rds_nf_hdr_dst(inc->i_skb); + org = rds_nf_hdr_org(inc->i_skb); + + /* find the proper output socket - it should be the local one on which we originated */ + rs = rds_find_bound(dst->saddr, dst->sport); + if (!rs) { + rdsdebug("failed to find output rds_socket dst %u.%u.%u.%u : %u, inc %p, conn %p\n", + NIPQUAD(dst->daddr), dst->dport, inc, conn); + rds_stats_inc(s_recv_drop_no_sock); + goto out; + } + + /* pull out the actual message len */ + len = be32_to_cpu(inc->i_hdr.h_len); + + /* now lets see if we can send it all */ + ret = rds_send_internal(conn, rs, inc->i_skb, gfp); + if (len != ret) { + rdsdebug("failed to send rds_data dst %u.%u.%u.%u : %u, inc %p, conn %p, len %d != ret %d\n", + NIPQUAD(dst->daddr), dst->dport, inc, conn, len, ret); + goto out; + } + + if (NULL != rs) + rds_sock_put(rs); + + /* all good so we are done */ + return; + +out: + /* cleanup any handles */ + if (NULL != rs) { + sk = rds_rs_to_sk(rs); + rds_sock_put(rs); + } + + /* on error lets take a shot at hook cleanup */ + NF_HOOK(PF_RDS_HOOK, NF_RDS_FORWARD_ERROR, sk, inc->i_skb, NULL, NULL, rds_recv_ok); + + /* then hand the request off to normal local processing on the old connection */ + rds_recv_local(inc->i_oconn, org->saddr, org->daddr, inc, gfp); +} + +static void +rds_recv_local(struct rds_connection *conn, __be32 saddr, __be32 daddr, + struct rds_incoming *inc, gfp_t gfp) { struct rds_sock *rs = NULL; struct sock *sk; @@ -246,7 +526,6 @@ out: if (rs) rds_sock_put(rs); } -EXPORT_SYMBOL_GPL(rds_recv_incoming); /* * be very careful here. 
This is being called as the condition in @@ -570,3 +849,23 @@ void rds_inc_info_copy(struct rds_incoming *inc, rds_info_copy(iter, &minfo, sizeof(minfo)); } + +int rds_skb_local(struct sk_buff *skb) +{ + struct rds_nf_hdr *dst, *org; + + /* pull out the headers */ + dst = rds_nf_hdr_dst(skb); + org = rds_nf_hdr_org(skb); + + /* just check to see that the destination is still the same */ + if (dst->daddr == org->daddr && dst->dport == org->dport) { + return 1; + } + /* otherwise, the sport/dport have likely swapped so consider + * it a different node */ + else { + return 0; + } +} +EXPORT_SYMBOL(rds_skb_local); diff --git a/net/rds/send.c b/net/rds/send.c index 523d7c238a51..65007113ffd9 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -36,6 +36,7 @@ #include #include "rds.h" +#include "tcp.h" /* When transmitting messages in rds_send_xmit, we need to emerge from * time to time and briefly release the CPU. Otherwise the softlock watchdog @@ -1212,7 +1213,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) /* Attach data to the rm */ if (payload_len) { rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); - ret = rds_message_copy_from_user(rm, &msg->msg_iter); + ret = rds_message_copy_from_user(rm, &msg->msg_iter, GFP_KERNEL); if (ret) goto out; } @@ -1360,6 +1361,151 @@ out: return ret; } +/* this function and rds_sendmsg can likely be folded together into a single function that understand to package + * up a message for transmission either from the user or from an internal source. + * + * Also there is potentially a need to allow either for a retry of the send attempt if we are at a high enough level + * in the stack, or for only a single shot attempt for the send if we are low enough in the stack that we cannot afford + * to sleep or block forever. 
+ * + * At present this form of the code will only ever do a single shot at the send and it assumes that the source is internal + */ + +int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, + struct sk_buff *skb, gfp_t gfp) +{ + struct rds_nf_hdr *dst; + struct rds_message *rm = NULL; + struct scatterlist *sg; + skb_frag_t *frags; + int ret = 0; + int queued = 0; + int i; + + /* pull out the destination info */ + dst = rds_nf_hdr_dst(skb); + + /* size of rm including all sgs */ + ret = ceil(skb->len, PAGE_SIZE) * sizeof(struct scatterlist); + if (ret < 0) + goto out; + + /* create ourselves a new message to send out the data */ + rm = rds_message_alloc(ret, gfp); + if (!rm) { + rdsdebug("failed to allocate response message rs %p", rs); + ret = -ENOMEM; + goto out; + } + + /* Attach data to the rm */ + if (skb->len) { + /* innitialize the segments we need to use */ + rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(skb->len, PAGE_SIZE)); + + /* copy out all the pages from the skb */ + for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) { + /* one to one mapping from skb info to rm info */ + frags = &skb_shinfo(skb)->frags[i]; + sg = &rm->data.op_sg[i]; + + /* just save the pieces directly */ + sg_set_page(sg, frags->page.p, frags->size, frags->page_offset); + + /* and take an extra reference on the page */ + get_page(frags->page.p); + } + + /* finalization of the pieces of the message */ + rm->m_inc.i_hdr.h_len = cpu_to_be32(skb->len); + rm->data.op_nents = skb_shinfo(skb)->nr_frags; + } + + rdsdebug("Created send rm %p, nents %d, len %d, skbLen %d\n", + rm, rm->data.op_nents, be32_to_cpu(rm->m_inc.i_hdr.h_len), skb->len); + + /* initializes all the subpieces of the message */ + rm->data.op_active = 1; + rm->m_daddr = dst->daddr; + + if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) { + if (printk_ratelimit()) + printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", + &rm->rdma, conn->c_trans->xmit_rdma); + ret = -EOPNOTSUPP; + goto out; + } + + if 
(rm->atomic.op_active && !conn->c_trans->xmit_atomic) { + if (printk_ratelimit()) + printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n", + &rm->atomic, conn->c_trans->xmit_atomic); + ret = -EOPNOTSUPP; + goto out; + } + + /* retry the connection if it hasn't actually been made */ + rds_conn_connect_if_down(conn); + + /* simple congestion check */ + ret = rds_cong_wait(conn->c_fcong, dst->dport, 1, rs); + if (ret) { + rs->rs_seen_congestion = 1; + goto out; + } + + /* only take a single pass */ + if (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, + dst->dport, &queued)) { + rdsdebug("cannot block on internal send rs %p", rs); + rds_stats_inc(s_send_queue_full); + + /* force a requeue of the work for later */ + queue_delayed_work(rds_wq, &conn->c_send_w, 1); + + ret = -EAGAIN; + goto out; + } + + /* + * By now we've committed to the send. We reuse rds_send_worker() + * to retry sends in the rds thread if the transport asks us to. + */ + rds_stats_inc(s_send_queued); + +/* Set this to 1 for normal testing but 0 when building the TCP version + * of the code. The TCP version has hang issues otherwise. 
*/ +#if 1 + /* for the time being it looks like the send_xmit code may lead to a + * deadlock/hang, so we are not going to use it yet */ + ret = rds_send_xmit(conn); + if (ret == -ENOMEM || ret == -EAGAIN) + queue_delayed_work(rds_wq, &conn->c_send_w, 1); +#else + /* always hand the send off to the worker thread */ + queue_delayed_work(rds_wq, &conn->c_send_w, 0); +#endif + + rdsdebug("message sent for rs %p, conn %p, len %d, %u.%u.%u.%u : %u -> %u.%u.%u.%u : %u\n", + rs, conn, skb->len, NIPQUAD(dst->saddr), dst->sport, NIPQUAD(dst->daddr), dst->dport); + ret = skb->len; + +out: + /* on error free up page references but don't allow the pages to be freed */ + if (ret < 0 && rm) { + for (i = 0; i < rm->data.op_nents; i++) { + sg = &rm->data.op_sg[i]; + put_page(sg_page(sg)); + sg_set_page(sg, NULL, 0, 0); + } + rm->data.op_nents = 0; + } + + if (rm) + rds_message_put(rm); + return ret; +} + /* * Reply to a ping packet. */ diff --git a/net/rds/tcp.c b/net/rds/tcp.c index 74bef621c1e7..a3dcc1e381a6 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -265,6 +265,7 @@ struct rds_transport rds_tcp_transport = { .conn_shutdown = rds_tcp_conn_shutdown, .inc_copy_to_user = rds_tcp_inc_copy_to_user, .inc_free = rds_tcp_inc_free, + .skb_local = rds_skb_local, .stats_info_copy = rds_tcp_stats_info_copy, .exit = rds_tcp_exit, .t_owner = THIS_MODULE,