struct list_head list;
 };
 
-static void add_processed_node(int nodeid, struct list_head *processed_nodes)
-{
-       struct dlm_processed_nodes *n;
-
-       list_for_each_entry(n, processed_nodes, list) {
-               /* we already remembered this node */
-               if (n->nodeid == nodeid)
-                       return;
-       }
-
-       /* if it's fails in worst case we simple don't send an ack back.
-        * We try it next time.
-        */
-       n = kmalloc(sizeof(*n), GFP_NOFS);
-       if (!n)
-               return;
-
-       n->nodeid = nodeid;
-       list_add(&n->list, processed_nodes);
-}
-
 static void process_dlm_messages(struct work_struct *work)
 {
-       struct dlm_processed_nodes *n, *n_tmp;
        struct processqueue_entry *pentry;
        LIST_HEAD(processed_nodes);
 
        for (;;) {
                dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
                                            pentry->buflen);
-               add_processed_node(pentry->nodeid, &processed_nodes);
                free_processqueue_entry(pentry);
 
                spin_lock(&processqueue_lock);
                list_del(&pentry->list);
                spin_unlock(&processqueue_lock);
        }
-
-       /* send ack back after we processed couple of messages */
-       list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
-               list_del(&n->list);
-               dlm_midcomms_receive_done(n->nodeid);
-               kfree(n);
-       }
 }
 
 /* Data received from remote end */
 
 /* 5 seconds wait to sync ending of dlm */
 #define DLM_SHUTDOWN_TIMEOUT   msecs_to_jiffies(5000)
 #define DLM_VERSION_NOT_SET    0
+#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
+#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
 
 struct midcomms_node {
        int nodeid;
 #define DLM_NODE_FLAG_CLOSE    1
 #define DLM_NODE_FLAG_STOP_TX  2
 #define DLM_NODE_FLAG_STOP_RX  3
-#define DLM_NODE_ULP_DELIVERED 4
+       atomic_t ulp_delivered;
        unsigned long flags;
        wait_queue_head_t shutdown_wait;
 
 
        atomic_set(&node->seq_next, DLM_SEQ_INIT);
        atomic_set(&node->seq_send, DLM_SEQ_INIT);
+       atomic_set(&node->ulp_delivered, 0);
        node->version = DLM_VERSION_NOT_SET;
        node->flags = 0;
 
        return 0;
 }
 
+static void dlm_send_ack_threshold(struct midcomms_node *node,
+                                  uint32_t threshold)
+{
+       uint32_t oval, nval;
+       bool send_ack;
+
+       /* let only send one user trigger threshold to send ack back */
+       do {
+               oval = atomic_read(&node->ulp_delivered);
+               send_ack = (oval > threshold);
+               /* abort if threshold is not reached */
+               if (!send_ack)
+                       break;
+
+               nval = 0;
+               /* try to reset ulp_delivered counter */
+       } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
+
+       if (send_ack)
+               dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
+}
+
 static int dlm_send_fin(struct midcomms_node *node,
                        void (*ack_rcv)(struct midcomms_node *node))
 {
                        WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
                        dlm_receive_buffer_3_2_trace(seq, p);
                        dlm_receive_buffer(p, node->nodeid);
-                       set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
+                       atomic_inc(&node->ulp_delivered);
+                       /* unlikely case to send ack back when we don't transmit */
+                       dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
                        break;
                }
        } else {
        return ret;
 }
 
-void dlm_midcomms_receive_done(int nodeid)
-{
-       struct midcomms_node *node;
-       int idx;
-
-       idx = srcu_read_lock(&nodes_srcu);
-       node = nodeid2node(nodeid, 0);
-       if (!node) {
-               srcu_read_unlock(&nodes_srcu, idx);
-               return;
-       }
-
-       /* old protocol, we do nothing */
-       switch (node->version) {
-       case DLM_VERSION_3_2:
-               break;
-       default:
-               srcu_read_unlock(&nodes_srcu, idx);
-               return;
-       }
-
-       /* do nothing if we didn't delivered stateful to ulp */
-       if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
-                               &node->flags)) {
-               srcu_read_unlock(&nodes_srcu, idx);
-               return;
-       }
-
-       spin_lock(&node->state_lock);
-       /* we only ack if state is ESTABLISHED */
-       switch (node->state) {
-       case DLM_ESTABLISHED:
-               spin_unlock(&node->state_lock);
-               dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
-               break;
-       default:
-               spin_unlock(&node->state_lock);
-               /* do nothing FIN has it's own ack send */
-               break;
-       }
-       srcu_read_unlock(&nodes_srcu, idx);
-}
-
 void dlm_midcomms_unack_msg_resend(int nodeid)
 {
        struct midcomms_node *node;
                        goto err;
                }
 
+               /* send ack back if necessary */
+               dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
                break;
        default:
                dlm_free_mhandle(mh);