--- /dev/null
+/*
+ * net/tipc/server.c: TIPC server infrastructure
+ *
+ * Copyright (c) 2012-2013, Wind River Systems
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include "core.h"
+#include <net/sock.h>
+
+/* Number of messages to send before rescheduling */
+#define MAX_SEND_MSG_COUNT     25
+#define MAX_RECV_MSG_COUNT     25
+#define CF_CONNECTED           1
+
+#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
+
+/**
+ * struct tipc_conn - TIPC connection structure
+ * @kref: reference counter to connection object
+ * @conid: connection identifier
+ * @sock: socket handler associated with connection
+ * @flags: indicates connection state
+ * @server: pointer to connected server
+ * @rwork: receive work item
+ * @usr_data: user-specified field
+ * @rx_action: what to do when connection socket is active
+ * @outqueue: pointer to first outbound message in queue
+ * @outqueue_lock: controll access to the outqueue
+ * @outqueue: list of connection objects for its server
+ * @swork: send work item
+ */
+struct tipc_conn {
+       struct kref kref;
+       int conid;
+       struct socket *sock;
+       unsigned long flags;
+       struct tipc_server *server;
+       struct work_struct rwork;
+       int (*rx_action) (struct tipc_conn *con);
+       void *usr_data;
+       struct list_head outqueue;
+       spinlock_t outqueue_lock;
+       struct work_struct swork;
+};
+
+/* An entry waiting to be sent */
+struct outqueue_entry {
+       struct list_head list;
+       struct kvec iov;
+       struct sockaddr_tipc dest;
+};
+
+static void tipc_recv_work(struct work_struct *work);
+static void tipc_send_work(struct work_struct *work);
+static void tipc_clean_outqueues(struct tipc_conn *con);
+
+static void tipc_conn_kref_release(struct kref *kref)
+{
+       struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
+       struct tipc_server *s = con->server;
+
+       if (con->sock) {
+               tipc_sock_release_local(con->sock);
+               con->sock = NULL;
+       }
+
+       tipc_clean_outqueues(con);
+
+       if (con->conid)
+               s->tipc_conn_shutdown(con->conid, con->usr_data);
+
+       kfree(con);
+}
+
+static void conn_put(struct tipc_conn *con)
+{
+       kref_put(&con->kref, tipc_conn_kref_release);
+}
+
+static void conn_get(struct tipc_conn *con)
+{
+       kref_get(&con->kref);
+}
+
+static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
+{
+       struct tipc_conn *con;
+
+       spin_lock_bh(&s->idr_lock);
+       con = idr_find(&s->conn_idr, conid);
+       if (con)
+               conn_get(con);
+       spin_unlock_bh(&s->idr_lock);
+       return con;
+}
+
+static void sock_data_ready(struct sock *sk, int unused)
+{
+       struct tipc_conn *con;
+
+       read_lock(&sk->sk_callback_lock);
+       con = sock2con(sk);
+       if (con && test_bit(CF_CONNECTED, &con->flags)) {
+               conn_get(con);
+               if (!queue_work(con->server->rcv_wq, &con->rwork))
+                       conn_put(con);
+       }
+       read_unlock(&sk->sk_callback_lock);
+}
+
+static void sock_write_space(struct sock *sk)
+{
+       struct tipc_conn *con;
+
+       read_lock(&sk->sk_callback_lock);
+       con = sock2con(sk);
+       if (con && test_bit(CF_CONNECTED, &con->flags)) {
+               conn_get(con);
+               if (!queue_work(con->server->send_wq, &con->swork))
+                       conn_put(con);
+       }
+       read_unlock(&sk->sk_callback_lock);
+}
+
+static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
+{
+       struct sock *sk = sock->sk;
+
+       write_lock_bh(&sk->sk_callback_lock);
+
+       sk->sk_data_ready = sock_data_ready;
+       sk->sk_write_space = sock_write_space;
+       sk->sk_user_data = con;
+
+       con->sock = sock;
+
+       write_unlock_bh(&sk->sk_callback_lock);
+}
+
+static void tipc_unregister_callbacks(struct tipc_conn *con)
+{
+       struct sock *sk = con->sock->sk;
+
+       write_lock_bh(&sk->sk_callback_lock);
+       sk->sk_user_data = NULL;
+       write_unlock_bh(&sk->sk_callback_lock);
+}
+
+static void tipc_close_conn(struct tipc_conn *con)
+{
+       struct tipc_server *s = con->server;
+
+       if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
+               spin_lock_bh(&s->idr_lock);
+               idr_remove(&s->conn_idr, con->conid);
+               s->idr_in_use--;
+               spin_unlock_bh(&s->idr_lock);
+
+               tipc_unregister_callbacks(con);
+
+               /* We shouldn't flush pending works as we may be in the
+                * thread. In fact the races with pending rx/tx work structs
+                * are harmless for us here as we have already deleted this
+                * connection from server connection list and set
+                * sk->sk_user_data to 0 before releasing connection object.
+                */
+               kernel_sock_shutdown(con->sock, SHUT_RDWR);
+
+               conn_put(con);
+       }
+}
+
+static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
+{
+       struct tipc_conn *con;
+       int ret;
+
+       con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
+       if (!con)
+               return ERR_PTR(-ENOMEM);
+
+       kref_init(&con->kref);
+       INIT_LIST_HEAD(&con->outqueue);
+       spin_lock_init(&con->outqueue_lock);
+       INIT_WORK(&con->swork, tipc_send_work);
+       INIT_WORK(&con->rwork, tipc_recv_work);
+
+       spin_lock_bh(&s->idr_lock);
+       ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
+       if (ret < 0) {
+               kfree(con);
+               spin_unlock_bh(&s->idr_lock);
+               return ERR_PTR(-ENOMEM);
+       }
+       con->conid = ret;
+       s->idr_in_use++;
+       spin_unlock_bh(&s->idr_lock);
+
+       set_bit(CF_CONNECTED, &con->flags);
+       con->server = s;
+
+       return con;
+}
+
+static int tipc_receive_from_sock(struct tipc_conn *con)
+{
+       struct msghdr msg = {};
+       struct tipc_server *s = con->server;
+       struct sockaddr_tipc addr;
+       struct kvec iov;
+       void *buf;
+       int ret;
+
+       buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
+       if (!buf) {
+               ret = -ENOMEM;
+               goto out_close;
+       }
+
+       iov.iov_base = buf;
+       iov.iov_len = s->max_rcvbuf_size;
+       msg.msg_name = &addr;
+       ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+                            MSG_DONTWAIT);
+       if (ret <= 0) {
+               kmem_cache_free(s->rcvbuf_cache, buf);
+               goto out_close;
+       }
+
+       s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret);
+
+       kmem_cache_free(s->rcvbuf_cache, buf);
+
+       return 0;
+
+out_close:
+       if (ret != -EWOULDBLOCK)
+               tipc_close_conn(con);
+       else if (ret == 0)
+               /* Don't return success if we really got EOF */
+               ret = -EAGAIN;
+
+       return ret;
+}
+
+static int tipc_accept_from_sock(struct tipc_conn *con)
+{
+       struct tipc_server *s = con->server;
+       struct socket *sock = con->sock;
+       struct socket *newsock;
+       struct tipc_conn *newcon;
+       int ret;
+
+       ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK);
+       if (ret < 0)
+               return ret;
+
+       newcon = tipc_alloc_conn(con->server);
+       if (IS_ERR(newcon)) {
+               ret = PTR_ERR(newcon);
+               sock_release(newsock);
+               return ret;
+       }
+
+       newcon->rx_action = tipc_receive_from_sock;
+       tipc_register_callbacks(newsock, newcon);
+
+       /* Notify that new connection is incoming */
+       newcon->usr_data = s->tipc_conn_new(newcon->conid);
+
+       /* Wake up receive process in case of 'SYN+' message */
+       newsock->sk->sk_data_ready(newsock->sk, 0);
+       return ret;
+}
+
+static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
+{
+       struct tipc_server *s = con->server;
+       struct socket *sock = NULL;
+       int ret;
+
+       ret = tipc_sock_create_local(s->type, &sock);
+       if (ret < 0)
+               return NULL;
+       ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
+                               (char *)&s->imp, sizeof(s->imp));
+       if (ret < 0)
+               goto create_err;
+       ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
+       if (ret < 0)
+               goto create_err;
+
+       switch (s->type) {
+       case SOCK_STREAM:
+       case SOCK_SEQPACKET:
+               con->rx_action = tipc_accept_from_sock;
+
+               ret = kernel_listen(sock, 0);
+               if (ret < 0)
+                       goto create_err;
+               break;
+       case SOCK_DGRAM:
+       case SOCK_RDM:
+               con->rx_action = tipc_receive_from_sock;
+               break;
+       default:
+               pr_err("Unknown socket type %d\n", s->type);
+               goto create_err;
+       }
+       return sock;
+
+create_err:
+       sock_release(sock);
+       con->sock = NULL;
+       return NULL;
+}
+
+static int tipc_open_listening_sock(struct tipc_server *s)
+{
+       struct socket *sock;
+       struct tipc_conn *con;
+
+       con = tipc_alloc_conn(s);
+       if (IS_ERR(con))
+               return PTR_ERR(con);
+
+       sock = tipc_create_listen_sock(con);
+       if (!sock)
+               return -EINVAL;
+
+       tipc_register_callbacks(sock, con);
+       return 0;
+}
+
+static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
+{
+       struct outqueue_entry *entry;
+       void *buf;
+
+       entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
+       if (!entry)
+               return NULL;
+
+       buf = kmalloc(len, GFP_ATOMIC);
+       if (!buf) {
+               kfree(entry);
+               return NULL;
+       }
+
+       memcpy(buf, data, len);
+       entry->iov.iov_base = buf;
+       entry->iov.iov_len = len;
+
+       return entry;
+}
+
+static void tipc_free_entry(struct outqueue_entry *e)
+{
+       kfree(e->iov.iov_base);
+       kfree(e);
+}
+
+static void tipc_clean_outqueues(struct tipc_conn *con)
+{
+       struct outqueue_entry *e, *safe;
+
+       spin_lock_bh(&con->outqueue_lock);
+       list_for_each_entry_safe(e, safe, &con->outqueue, list) {
+               list_del(&e->list);
+               tipc_free_entry(e);
+       }
+       spin_unlock_bh(&con->outqueue_lock);
+}
+
+int tipc_conn_sendmsg(struct tipc_server *s, int conid,
+                     struct sockaddr_tipc *addr, void *data, size_t len)
+{
+       struct outqueue_entry *e;
+       struct tipc_conn *con;
+
+       con = tipc_conn_lookup(s, conid);
+       if (!con)
+               return -EINVAL;
+
+       e = tipc_alloc_entry(data, len);
+       if (!e) {
+               conn_put(con);
+               return -ENOMEM;
+       }
+
+       if (addr)
+               memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc));
+
+       spin_lock_bh(&con->outqueue_lock);
+       list_add_tail(&e->list, &con->outqueue);
+       spin_unlock_bh(&con->outqueue_lock);
+
+       if (test_bit(CF_CONNECTED, &con->flags))
+               if (!queue_work(s->send_wq, &con->swork))
+                       conn_put(con);
+
+       return 0;
+}
+
+void tipc_conn_terminate(struct tipc_server *s, int conid)
+{
+       struct tipc_conn *con;
+
+       con = tipc_conn_lookup(s, conid);
+       if (con) {
+               tipc_close_conn(con);
+               conn_put(con);
+       }
+}
+
+static void tipc_send_to_sock(struct tipc_conn *con)
+{
+       int count = 0;
+       struct tipc_server *s = con->server;
+       struct outqueue_entry *e;
+       struct msghdr msg;
+       int ret;
+
+       spin_lock_bh(&con->outqueue_lock);
+       while (1) {
+               e = list_entry(con->outqueue.next, struct outqueue_entry,
+                              list);
+               if ((struct list_head *) e == &con->outqueue)
+                       break;
+               spin_unlock_bh(&con->outqueue_lock);
+
+               memset(&msg, 0, sizeof(msg));
+               msg.msg_flags = MSG_DONTWAIT;
+
+               if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
+                       msg.msg_name = &e->dest;
+                       msg.msg_namelen = sizeof(struct sockaddr_tipc);
+               }
+               ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
+                                    e->iov.iov_len);
+               if (ret == -EWOULDBLOCK || ret == 0) {
+                       cond_resched();
+                       goto out;
+               } else if (ret < 0) {
+                       goto send_err;
+               }
+
+               /* Don't starve users filling buffers */
+               if (++count >= MAX_SEND_MSG_COUNT) {
+                       cond_resched();
+                       count = 0;
+               }
+
+               spin_lock_bh(&con->outqueue_lock);
+               list_del(&e->list);
+               tipc_free_entry(e);
+       }
+       spin_unlock_bh(&con->outqueue_lock);
+out:
+       return;
+
+send_err:
+       tipc_close_conn(con);
+}
+
+static void tipc_recv_work(struct work_struct *work)
+{
+       struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
+       int count = 0;
+
+       while (test_bit(CF_CONNECTED, &con->flags)) {
+               if (con->rx_action(con))
+                       break;
+
+               /* Don't flood Rx machine */
+               if (++count >= MAX_RECV_MSG_COUNT) {
+                       cond_resched();
+                       count = 0;
+               }
+       }
+       conn_put(con);
+}
+
+static void tipc_send_work(struct work_struct *work)
+{
+       struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
+
+       if (test_bit(CF_CONNECTED, &con->flags))
+               tipc_send_to_sock(con);
+
+       conn_put(con);
+}
+
+static void tipc_work_stop(struct tipc_server *s)
+{
+       destroy_workqueue(s->rcv_wq);
+       destroy_workqueue(s->send_wq);
+}
+
+static int tipc_work_start(struct tipc_server *s)
+{
+       s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1);
+       if (!s->rcv_wq) {
+               pr_err("can't start tipc receive workqueue\n");
+               return -ENOMEM;
+       }
+
+       s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1);
+       if (!s->send_wq) {
+               pr_err("can't start tipc send workqueue\n");
+               destroy_workqueue(s->rcv_wq);
+               return -ENOMEM;
+       }
+
+       return 0;
+}
+
+int tipc_server_start(struct tipc_server *s)
+{
+       int ret;
+
+       spin_lock_init(&s->idr_lock);
+       idr_init(&s->conn_idr);
+       s->idr_in_use = 0;
+
+       s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
+                                           0, SLAB_HWCACHE_ALIGN, NULL);
+       if (!s->rcvbuf_cache)
+               return -ENOMEM;
+
+       ret = tipc_work_start(s);
+       if (ret < 0) {
+               kmem_cache_destroy(s->rcvbuf_cache);
+               return ret;
+       }
+       s->enabled = 1;
+
+       return tipc_open_listening_sock(s);
+}
+
+void tipc_server_stop(struct tipc_server *s)
+{
+       struct tipc_conn *con;
+       int total = 0;
+       int id;
+
+       if (!s->enabled)
+               return;
+
+       s->enabled = 0;
+       spin_lock_bh(&s->idr_lock);
+       for (id = 0; total < s->idr_in_use; id++) {
+               con = idr_find(&s->conn_idr, id);
+               if (con) {
+                       total++;
+                       spin_unlock_bh(&s->idr_lock);
+                       tipc_close_conn(con);
+                       spin_lock_bh(&s->idr_lock);
+               }
+       }
+       spin_unlock_bh(&s->idr_lock);
+
+       tipc_work_stop(s);
+       kmem_cache_destroy(s->rcvbuf_cache);
+       idr_destroy(&s->conn_idr);
+}
 
--- /dev/null
+/*
+ * net/tipc/server.h: Include file for TIPC server code
+ *
+ * Copyright (c) 2012-2013, Wind River Systems
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the names of the copyright holders nor the names of its
+ *    contributors may be used to endorse or promote products derived from
+ *    this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _TIPC_SERVER_H
+#define _TIPC_SERVER_H
+
+#include "core.h"
+
+#define TIPC_SERVER_NAME_LEN   32
+
+/**
+ * struct tipc_server - TIPC server structure
+ * @conn_idr: identifier set of connection
+ * @idr_lock: protect the connection identifier set
+ * @idr_in_use: amount of allocated identifier entry
+ * @rcvbuf_cache: memory cache of server receive buffer
+ * @rcv_wq: receive workqueue
+ * @send_wq: send workqueue
+ * @max_rcvbuf_size: maximum permitted receive message length
+ * @tipc_conn_new: callback will be called when new connection is incoming
+ * @tipc_conn_shutdown: callback will be called when connection is shut down
+ * @tipc_conn_recvmsg: callback will be called when message arrives
+ * @saddr: TIPC server address
+ * @name: server name
+ * @imp: message importance
+ * @type: socket type
+ * @enabled: identify whether server is launched or not
+ */
+struct tipc_server {
+       struct idr conn_idr;
+       spinlock_t idr_lock;
+       int idr_in_use;
+       struct kmem_cache *rcvbuf_cache;
+       struct workqueue_struct *rcv_wq;
+       struct workqueue_struct *send_wq;
+       int max_rcvbuf_size;
+       void *(*tipc_conn_new) (int conid);
+       void (*tipc_conn_shutdown) (int conid, void *usr_data);
+       void (*tipc_conn_recvmsg) (int conid, struct sockaddr_tipc *addr,
+                                  void *usr_data, void *buf, size_t len);
+       struct sockaddr_tipc *saddr;
+       const char name[TIPC_SERVER_NAME_LEN];
+       int imp;
+       int type;
+       int enabled;
+};
+
+int tipc_conn_sendmsg(struct tipc_server *s, int conid,
+                     struct sockaddr_tipc *addr, void *data, size_t len);
+
+/**
+ * tipc_conn_terminate - terminate connection with server
+ *
+ * Note: Must call it in process context since it might sleep
+ */
+void tipc_conn_terminate(struct tipc_server *s, int conid);
+
+int tipc_server_start(struct tipc_server *s);
+
+void tipc_server_stop(struct tipc_server *s);
+
+#endif
 
  * net/tipc/socket.c: TIPC socket API
  *
  * Copyright (c) 2001-2007, 2012 Ericsson AB
- * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
+ * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
 static void wakeupdispatch(struct tipc_port *tport);
 static void tipc_data_ready(struct sock *sk, int len);
 static void tipc_write_space(struct sock *sk);
+static int release(struct socket *sock);
+static int accept(struct socket *sock, struct socket *new_sock, int flags);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
 static const struct proto_ops msg_ops;
 
 static struct proto tipc_proto;
+static struct proto tipc_proto_kern;
 
 static int sockets_enabled;
 
 }
 
 /**
- * tipc_create - create a TIPC socket
+ * tipc_sk_create - create a TIPC socket
  * @net: network namespace (must be default network)
  * @sock: pre-allocated socket structure
  * @protocol: protocol indicator (must be 0)
  *
  * Returns 0 on success, errno otherwise
  */
-static int tipc_create(struct net *net, struct socket *sock, int protocol,
-                      int kern)
+static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
+                         int kern)
 {
        const struct proto_ops *ops;
        socket_state state;
        }
 
        /* Allocate socket's protocol area */
-       sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
+       if (!kern)
+               sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
+       else
+               sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
+
        if (sk == NULL)
                return -ENOMEM;
 
        return 0;
 }
 
+/**
+ * tipc_sock_create_local - create TIPC socket from inside TIPC module
+ * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
+ *
+ * We cannot use sock_creat_kern here because it bumps module user count.
+ * Since socket owner and creator is the same module we must make sure
+ * that module count remains zero for module local sockets, otherwise
+ * we cannot do rmmod.
+ *
+ * Returns 0 on success, errno otherwise
+ */
+int tipc_sock_create_local(int type, struct socket **res)
+{
+       int rc;
+       struct sock *sk;
+
+       rc = sock_create_lite(AF_TIPC, type, 0, res);
+       if (rc < 0) {
+               pr_err("Failed to create kernel socket\n");
+               return rc;
+       }
+       tipc_sk_create(&init_net, *res, 0, 1);
+
+       sk = (*res)->sk;
+
+       return 0;
+}
+
+/**
+ * tipc_sock_release_local - release socket created by tipc_sock_create_local
+ * @sock: the socket to be released.
+ *
+ * Module reference count is not incremented when such sockets are created,
+ * so we must keep it from being decremented when they are released.
+ */
+void tipc_sock_release_local(struct socket *sock)
+{
+       release(sock);
+       sock->ops = NULL;
+       sock_release(sock);
+}
+
+/**
+ * tipc_sock_accept_local - accept a connection on a socket created
+ * with tipc_sock_create_local. Use this function to avoid that
+ * module reference count is inadvertently incremented.
+ *
+ * @sock:    the accepting socket
+ * @newsock: reference to the new socket to be created
+ * @flags:   socket flags
+ */
+
+int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
+                           int flags)
+{
+       struct sock *sk = sock->sk;
+       int ret;
+
+       ret = sock_create_lite(sk->sk_family, sk->sk_type,
+                              sk->sk_protocol, newsock);
+       if (ret < 0)
+               return ret;
+
+       ret = accept(sock, *newsock, flags);
+       if (ret < 0) {
+               sock_release(*newsock);
+               return ret;
+       }
+       (*newsock)->ops = sock->ops;
+       return ret;
+}
+
 /**
  * release - destroy a TIPC socket
  * @sock: socket to destroy
 
        buf = skb_peek(&sk->sk_receive_queue);
 
-       res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
+       res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
        if (res)
                goto exit;
 
 static const struct net_proto_family tipc_family_ops = {
        .owner          = THIS_MODULE,
        .family         = AF_TIPC,
-       .create         = tipc_create
+       .create         = tipc_sk_create
 };
 
 static struct proto tipc_proto = {
        .sysctl_rmem    = sysctl_tipc_rmem
 };
 
+static struct proto tipc_proto_kern = {
+       .name           = "TIPC",
+       .obj_size       = sizeof(struct tipc_sock),
+       .sysctl_rmem    = sysctl_tipc_rmem
+};
+
 /**
  * tipc_socket_init - initialize TIPC socket interface
  *