wake_up(&node->shutdown_wait);
 }
 
-static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
+static struct midcomms_node *nodeid2node(int nodeid)
 {
-       struct midcomms_node *node, *tmp;
-       int r = nodeid_hash(nodeid);
+       return __find_node(nodeid, nodeid_hash(nodeid));
+}
+
+int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
+{
+       int ret, r = nodeid_hash(nodeid);
+       struct midcomms_node *node;
 
-       node = __find_node(nodeid, r);
-       if (node || !alloc)
-               return node;
+       ret = dlm_lowcomms_addr(nodeid, addr, len);
+       if (ret)
+               return ret;
 
-       node = kmalloc(sizeof(*node), alloc);
+       node = kmalloc(sizeof(*node), GFP_NOFS);
        if (!node)
-               return NULL;
+               return -ENOMEM;
 
        node->nodeid = nodeid;
        spin_lock_init(&node->state_lock);
        midcomms_node_reset(node);
 
        spin_lock(&nodes_lock);
-       /* check again if there was somebody else
-        * earlier here to add the node
-        */
-       tmp = __find_node(nodeid, r);
-       if (tmp) {
-               spin_unlock(&nodes_lock);
-               kfree(node);
-               return tmp;
-       }
-
        hlist_add_head_rcu(&node->hlist, &node_hash[r]);
        spin_unlock(&nodes_lock);
 
        node->debugfs = dlm_create_debug_comms_file(nodeid, node);
-       return node;
+       return 0;
 }
 
 static int dlm_send_ack(int nodeid, uint32_t seq)
        }
 }
 
-static struct midcomms_node *
-dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
-                             uint16_t msglen, int (*cb)(struct midcomms_node *node))
-{
-       struct midcomms_node *node = NULL;
-       gfp_t allocation = 0;
-       int ret;
-
-       switch (p->header.h_cmd) {
-       case DLM_RCOM:
-               if (msglen < sizeof(struct dlm_rcom)) {
-                       log_print("rcom msg too small: %u, will skip this message from node %d",
-                                 msglen, nodeid);
-                       return NULL;
-               }
-
-               switch (p->rcom.rc_type) {
-               case cpu_to_le32(DLM_RCOM_NAMES):
-                       fallthrough;
-               case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
-                       fallthrough;
-               case cpu_to_le32(DLM_RCOM_STATUS):
-                       fallthrough;
-               case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
-                       node = nodeid2node(nodeid, 0);
-                       if (node) {
-                               spin_lock(&node->state_lock);
-                               if (node->state != DLM_ESTABLISHED)
-                                       pr_debug("receive begin RCOM msg from node %d with state %s\n",
-                                                node->nodeid, dlm_state_str(node->state));
-
-                               switch (node->state) {
-                               case DLM_CLOSED:
-                                       node->state = DLM_ESTABLISHED;
-                                       pr_debug("switch node %d to state %s\n",
-                                                node->nodeid, dlm_state_str(node->state));
-                                       break;
-                               case DLM_ESTABLISHED:
-                                       break;
-                               default:
-                                       spin_unlock(&node->state_lock);
-                                       return NULL;
-                               }
-                               spin_unlock(&node->state_lock);
-                       }
-
-                       allocation = GFP_NOFS;
-                       break;
-               default:
-                       break;
-               }
-
-               break;
-       default:
-               break;
-       }
-
-       node = nodeid2node(nodeid, allocation);
-       if (!node) {
-               switch (p->header.h_cmd) {
-               case DLM_OPTS:
-                       if (msglen < sizeof(struct dlm_opts)) {
-                               log_print("opts msg too small: %u, will skip this message from node %d",
-                                         msglen, nodeid);
-                               return NULL;
-                       }
-
-                       log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
-                                             p->opts.o_nextcmd, nodeid);
-                       break;
-               default:
-                       log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
-                                             p->header.h_cmd, nodeid);
-                       break;
-               }
-
-               return NULL;
-       }
-
-       ret = cb(node);
-       if (ret < 0)
-               return NULL;
-
-       return node;
-}
-
-static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
-{
-       switch (node->version) {
-       case DLM_VERSION_NOT_SET:
-               node->version = DLM_VERSION_3_2;
-               wake_up(&node->shutdown_wait);
-               log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
-                         node->nodeid);
-               break;
-       case DLM_VERSION_3_2:
-               break;
-       default:
-               log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
-                                     DLM_VERSION_3_2, node->nodeid, node->version);
-               return -1;
-       }
-
-       return 0;
-}
-
 static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen,
                                 int nodeid)
 {
        int ret, idx;
 
        idx = srcu_read_lock(&nodes_srcu);
-       node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
-                                            dlm_midcomms_version_check_3_2);
-       if (!node)
+       node = nodeid2node(nodeid);
+       if (WARN_ON_ONCE(!node))
+               goto out;
+
+       switch (node->version) {
+       case DLM_VERSION_NOT_SET:
+               node->version = DLM_VERSION_3_2;
+               wake_up(&node->shutdown_wait);
+               log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
+                         node->nodeid);
+
+               spin_lock(&node->state_lock);
+               switch (node->state) {
+               case DLM_CLOSED:
+                       node->state = DLM_ESTABLISHED;
+                       pr_debug("switch node %d to state %s\n",
+                                node->nodeid, dlm_state_str(node->state));
+                       break;
+               default:
+                       break;
+               }
+               spin_unlock(&node->state_lock);
+
+               break;
+       case DLM_VERSION_3_2:
+               break;
+       default:
+               log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
+                                     DLM_VERSION_3_2, node->nodeid, node->version);
                goto out;
+       }
 
        switch (p->header.h_cmd) {
        case DLM_RCOM:
        srcu_read_unlock(&nodes_srcu, idx);
 }
 
-static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
+static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
 {
+       uint16_t msglen = le16_to_cpu(p->header.h_length);
+       struct midcomms_node *node;
+       int idx;
+
+       idx = srcu_read_lock(&nodes_srcu);
+       node = nodeid2node(nodeid);
+       if (WARN_ON_ONCE(!node)) {
+               srcu_read_unlock(&nodes_srcu, idx);
+               return;
+       }
+
        switch (node->version) {
        case DLM_VERSION_NOT_SET:
                node->version = DLM_VERSION_3_1;
        default:
                log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
                                      DLM_VERSION_3_1, node->nodeid, node->version);
-               return -1;
-       }
-
-       return 0;
-}
-
-static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid)
-{
-       uint16_t msglen = le16_to_cpu(p->header.h_length);
-       struct midcomms_node *node;
-       int idx;
-
-       idx = srcu_read_lock(&nodes_srcu);
-       node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
-                                            dlm_midcomms_version_check_3_1);
-       if (!node) {
                srcu_read_unlock(&nodes_srcu, idx);
                return;
        }
        int idx, ret;
 
        idx = srcu_read_lock(&nodes_srcu);
-       node = nodeid2node(nodeid, 0);
-       if (!node) {
+       node = nodeid2node(nodeid);
+       if (WARN_ON_ONCE(!node)) {
                srcu_read_unlock(&nodes_srcu, idx);
                return;
        }
        int idx;
 
        idx = srcu_read_lock(&nodes_srcu);
-       node = nodeid2node(nodeid, 0);
-       if (!node) {
-               WARN_ON_ONCE(1);
+       node = nodeid2node(nodeid);
+       if (WARN_ON_ONCE(!node))
                goto err;
-       }
 
        /* this is a bug, however we going on and hope it will be resolved */
        WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
        dlm_lowcomms_init();
 }
 
+static void midcomms_node_release(struct rcu_head *rcu)
+{
+       struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
+
+       WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
+       dlm_send_queue_flush(node);
+       kfree(node);
+}
+
 void dlm_midcomms_exit(void)
 {
+       struct midcomms_node *node;
+       int i, idx;
+
+       idx = srcu_read_lock(&nodes_srcu);
+       for (i = 0; i < CONN_HASH_SIZE; i++) {
+               hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
+                       dlm_delete_debug_comms_file(node->debugfs);
+
+                       spin_lock(&nodes_lock);
+                       hlist_del_rcu(&node->hlist);
+                       spin_unlock(&nodes_lock);
+
+                       call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
+               }
+       }
+       srcu_read_unlock(&nodes_srcu, idx);
+
        dlm_lowcomms_exit();
 }
 
        int idx;
 
        idx = srcu_read_lock(&nodes_srcu);
-       node = nodeid2node(nodeid, GFP_NOFS);
-       if (!node) {
+       node = nodeid2node(nodeid);
+       if (WARN_ON_ONCE(!node)) {
                srcu_read_unlock(&nodes_srcu, idx);
                return;
        }
        int idx;
 
        idx = srcu_read_lock(&nodes_srcu);
-       node = nodeid2node(nodeid, 0);
-       if (!node) {
+       node = nodeid2node(nodeid);
+       if (WARN_ON_ONCE(!node)) {
                srcu_read_unlock(&nodes_srcu, idx);
                return;
        }
        srcu_read_unlock(&nodes_srcu, idx);
 }
 
-static void midcomms_node_release(struct rcu_head *rcu)
-{
-       struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
-
-       WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
-       dlm_send_queue_flush(node);
-       kfree(node);
-}
-
 void dlm_midcomms_version_wait(void)
 {
        struct midcomms_node *node;
                                 node->state == DLM_CLOSED ||
                                 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
                                 DLM_SHUTDOWN_TIMEOUT);
-       if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
+       if (!ret)
                pr_debug("active shutdown timed out for node %d with state %s\n",
                         node->nodeid, dlm_state_str(node->state));
        else
        for (i = 0; i < CONN_HASH_SIZE; i++) {
                hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
                        midcomms_shutdown(node);
-
-                       dlm_delete_debug_comms_file(node->debugfs);
-
-                       spin_lock(&nodes_lock);
-                       hlist_del_rcu(&node->hlist);
-                       spin_unlock(&nodes_lock);
-
-                       call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
                }
        }
        srcu_read_unlock(&nodes_srcu, idx);
 
        idx = srcu_read_lock(&nodes_srcu);
        /* Abort pending close/remove operation */
-       node = nodeid2node(nodeid, 0);
+       node = nodeid2node(nodeid);
        if (node) {
                /* let shutdown waiters leave */
                set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
 
        mutex_lock(&close_lock);
        idx = srcu_read_lock(&nodes_srcu);
-       node = nodeid2node(nodeid, 0);
+       node = nodeid2node(nodeid);
        if (!node) {
                srcu_read_unlock(&nodes_srcu, idx);
                mutex_unlock(&close_lock);
        }
 
        ret = dlm_lowcomms_close(nodeid);
-       spin_lock(&node->state_lock);
-       midcomms_node_reset(node);
-       spin_unlock(&node->state_lock);
+       dlm_delete_debug_comms_file(node->debugfs);
+
+       spin_lock(&nodes_lock);
+       hlist_del_rcu(&node->hlist);
+       spin_unlock(&nodes_lock);
        srcu_read_unlock(&nodes_srcu, idx);
+
+       /* wait that all readers left until flush send queue */
+       synchronize_srcu(&nodes_srcu);
+
+       /* drop all pending dlm messages, this is fine as
+        * this function get called when the node is fenced
+        */
+       dlm_send_queue_flush(node);
+
+       call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
        mutex_unlock(&close_lock);
 
        return ret;