spin_lock_bh(&s->idr_lock);
        con = idr_find(&s->conn_idr, conid);
-       if (con && test_bit(CF_CONNECTED, &con->flags))
-               conn_get(con);
-       else
-               con = NULL;
+       if (con) {
+               if (!test_bit(CF_CONNECTED, &con->flags) ||
+                   !kref_get_unless_zero(&con->kref))
+                       con = NULL;
+       }
        spin_unlock_bh(&s->idr_lock);
        return con;
 }
        write_unlock_bh(&sk->sk_callback_lock);
 }
 
-static void tipc_unregister_callbacks(struct tipc_conn *con)
-{
-       struct sock *sk = con->sock->sk;
-
-       write_lock_bh(&sk->sk_callback_lock);
-       sk->sk_user_data = NULL;
-       write_unlock_bh(&sk->sk_callback_lock);
-}
-
 static void tipc_close_conn(struct tipc_conn *con)
 {
        struct tipc_server *s = con->server;
+       struct sock *sk = con->sock->sk;
+       bool disconnect = false;
 
-       if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
-               if (con->sock)
-                       tipc_unregister_callbacks(con);
-
+       write_lock_bh(&sk->sk_callback_lock);
+       disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
+       if (disconnect) {
+               sk->sk_user_data = NULL;
                if (con->conid)
                        s->tipc_conn_release(con->conid, con->usr_data);
-
-               /* We shouldn't flush pending works as we may be in the
-                * thread. In fact the races with pending rx/tx work structs
-                * are harmless for us here as we have already deleted this
-                * connection from server connection list.
-                */
-               if (con->sock)
-                       kernel_sock_shutdown(con->sock, SHUT_RDWR);
-               conn_put(con);
        }
+       write_unlock_bh(&sk->sk_callback_lock);
+
+       /* Handle concurrent calls from sending and receiving threads */
+       if (!disconnect)
+               return;
+
+       /* Don't flush pending works, -just let them expire */
+       kernel_sock_shutdown(con->sock, SHUT_RDWR);
+       conn_put(con);
 }
 
 static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
 
 static int tipc_receive_from_sock(struct tipc_conn *con)
 {
-       struct msghdr msg = {};
        struct tipc_server *s = con->server;
+       struct sock *sk = con->sock->sk;
        struct sockaddr_tipc addr;
+       struct msghdr msg = {};
        struct kvec iov;
        void *buf;
        int ret;
                goto out_close;
        }
 
-       s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid, &addr,
-                            con->usr_data, buf, ret);
-
+       read_lock_bh(&sk->sk_callback_lock);
+       if (test_bit(CF_CONNECTED, &con->flags))
+               ret = s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid,
+                                          &addr, con->usr_data, buf, ret);
+       read_unlock_bh(&sk->sk_callback_lock);
        kmem_cache_free(s->rcvbuf_cache, buf);
-
-       return 0;
+       if (ret < 0)
+               tipc_conn_terminate(s, con->conid);
+       return ret;
 
 out_close:
        if (ret != -EWOULDBLOCK)
 void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
 {
        struct tipc_conn *con;
+       struct tipc_server *srv;
 
        con = tipc_conn_lookup(tipc_topsrv(net), conid);
        if (!con)
                return;
-       tipc_close_conn(con);
+
+       test_and_clear_bit(CF_CONNECTED, &con->flags);
+       srv = con->server;
+       if (con->conid)
+               srv->tipc_conn_release(con->conid, con->usr_data);
+       conn_put(con);
        conn_put(con);
 }
 
 
        int max_rcvbuf_size;
        void *(*tipc_conn_new)(int conid);
        void (*tipc_conn_release)(int conid, void *usr_data);
-       void (*tipc_conn_recvmsg)(struct net *net, int conid,
-                                 struct sockaddr_tipc *addr, void *usr_data,
-                                 void *buf, size_t len);
+       int (*tipc_conn_recvmsg)(struct net *net, int conid,
+                                struct sockaddr_tipc *addr, void *usr_data,
+                                void *buf, size_t len);
        struct sockaddr_tipc *saddr;
        char name[TIPC_SERVER_NAME_LEN];
        int imp;
 
        return sub;
 }
 
-static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
-                                  struct tipc_subscriber *subscriber, int swap,
-                                  bool status)
+static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
+                                 struct tipc_subscriber *subscriber, int swap,
+                                 bool status)
 {
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_subscription *sub = NULL;
        u32 timeout;
 
        sub = tipc_subscrp_create(net, s, swap);
        if (!sub)
-               return tipc_conn_terminate(tn->topsrv, subscriber->conid);
+               return -1;
 
        spin_lock_bh(&subscriber->lock);
        list_add(&sub->subscrp_list, &subscriber->subscrp_list);
 
        if (timeout != TIPC_WAIT_FOREVER)
                mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
+       return 0;
 }
 
 /* Handle one termination request for the subscriber */
 }
 
 /* Handle one request to create a new subscription for the subscriber */
-static void tipc_subscrb_rcv_cb(struct net *net, int conid,
-                               struct sockaddr_tipc *addr, void *usr_data,
-                               void *buf, size_t len)
+static int tipc_subscrb_rcv_cb(struct net *net, int conid,
+                              struct sockaddr_tipc *addr, void *usr_data,
+                              void *buf, size_t len)
 {
        struct tipc_subscriber *subscriber = usr_data;
        struct tipc_subscr *s = (struct tipc_subscr *)buf;
        /* Detect & process a subscription cancellation request */
        if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
                s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
-               return tipc_subscrp_cancel(s, subscriber);
+               tipc_subscrp_cancel(s, subscriber);
+               return 0;
        }
        status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
-       tipc_subscrp_subscribe(net, s, subscriber, swap, status);
+       return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
 }
 
 /* Handle one request to establish a new subscriber */