static DECLARE_WORK(process_work, process_dlm_messages);
 static DEFINE_SPINLOCK(processqueue_lock);
 static bool process_dlm_messages_pending;
+static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
 static atomic_t processqueue_count;
 static LIST_HEAD(processqueue);
 
        }
 
        list_del(&pentry->list);
-       atomic_dec(&processqueue_count);
+       if (atomic_dec_and_test(&processqueue_count))
+               wake_up(&processqueue_wq);
        spin_unlock_bh(&processqueue_lock);
 
        for (;;) {
                }
 
                list_del(&pentry->list);
-               atomic_dec(&processqueue_count);
+               if (atomic_dec_and_test(&processqueue_count))
+                       wake_up(&processqueue_wq);
                spin_unlock_bh(&processqueue_lock);
        }
 }
                /* CF_RECV_PENDING cleared */
                break;
        case DLM_IO_FLUSH:
-               flush_workqueue(process_workqueue);
+               /* we can't flush the process_workqueue here because a
+                * WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non
+                * WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead
+                * we have a waitqueue to wait until all messages are
+                * processed.
+                *
+                * This handling is only necessary to backoff the sender and
+                * not queue all messages from the socket layer into DLM
+                * processqueue. When DLM is capable to parse multiple messages
+                * on an e.g. per socket basis this handling can might be
+                * removed. Especially in a message burst we are too slow to
+                * process messages and the queue will fill up memory.
+                */
+               wait_event(processqueue_wq, !atomic_read(&processqueue_count));
                fallthrough;
        case DLM_IO_RESCHED:
                cond_resched();
                return -ENOMEM;
        }
 
-       /* ordered dlm message process queue,
-        * should be converted to a tasklet
-        */
-       process_workqueue = alloc_ordered_workqueue("dlm_process",
-                                                   WQ_HIGHPRI | WQ_MEM_RECLAIM);
+       process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
        if (!process_workqueue) {
                log_print("can't start dlm_process");
                destroy_workqueue(io_workqueue);