If an attempt to wake up users of broadcast link is made when there is
no enough place in send queue than it may hang up inside the
tipc_sk_rcv() function since the loop breaks only after the wake up
queue becomes empty. This can lead to complete CPU stall with the
following message generated by RCU:
INFO: rcu_sched self-detected stall on CPU { 0}  (t=2101 jiffies
					g=54225 c=54224 q=11465)
Task dump for CPU 0:
tpch            R  running task        0 39949  39948 0x0000000a
 
ffffffff818536c0 ffff88181fa037a0 ffffffff8106a4be 0000000000000000
 ffffffff818536c0 ffff88181fa037c0 ffffffff8106d8a8 ffff88181fa03800
 0000000000000001 ffff88181fa037f0 ffffffff81094a50 ffff88181fa15680
Call Trace:
 <IRQ>  [<
ffffffff8106a4be>] sched_show_task+0xae/0x120
 [<
ffffffff8106d8a8>] dump_cpu_task+0x38/0x40
 [<
ffffffff81094a50>] rcu_dump_cpu_stacks+0x90/0xd0
 [<
ffffffff81097c3b>] rcu_check_callbacks+0x3eb/0x6e0
 [<
ffffffff8106e53f>] ? account_system_time+0x7f/0x170
 [<
ffffffff81099e64>] update_process_times+0x34/0x60
 [<
ffffffff810a84d1>] tick_sched_handle.isra.18+0x31/0x40
 [<
ffffffff810a851c>] tick_sched_timer+0x3c/0x70
 [<
ffffffff8109a43d>] __run_hrtimer.isra.34+0x3d/0xc0
 [<
ffffffff8109aa95>] hrtimer_interrupt+0xc5/0x1e0
 [<
ffffffff81030d52>] ? native_smp_send_reschedule+0x42/0x60
 [<
ffffffff81032f04>] local_apic_timer_interrupt+0x34/0x60
 [<
ffffffff810335bc>] smp_apic_timer_interrupt+0x3c/0x60
 [<
ffffffff8165a3fb>] apic_timer_interrupt+0x6b/0x70
 [<
ffffffff81659129>] ? _raw_spin_unlock_irqrestore+0x9/0x10
 [<
ffffffff8107eb9f>] __wake_up_sync_key+0x4f/0x60
 [<
ffffffffa313ddd1>] tipc_write_space+0x31/0x40 [tipc]
 [<
ffffffffa313dadf>] filter_rcv+0x31f/0x520 [tipc]
 [<
ffffffffa313d699>] ? tipc_sk_lookup+0xc9/0x110 [tipc]
 [<
ffffffff81659259>] ? _raw_spin_lock_bh+0x19/0x30
 [<
ffffffffa314122c>] tipc_sk_rcv+0x2dc/0x3e0 [tipc]
 [<
ffffffffa312e7ff>] tipc_bclink_wakeup_users+0x2f/0x40 [tipc]
 [<
ffffffffa313ce26>] tipc_node_unlock+0x186/0x190 [tipc]
 [<
ffffffff81597c1c>] ? kfree_skb+0x2c/0x40
 [<
ffffffffa313475c>] tipc_rcv+0x2ac/0x8c0 [tipc]
 [<
ffffffffa312ff58>] tipc_l2_rcv_msg+0x38/0x50 [tipc]
 [<
ffffffff815a76d3>] __netif_receive_skb_core+0x5a3/0x950
 [<
ffffffff815a98d3>] __netif_receive_skb+0x13/0x60
 [<
ffffffff815a993e>] netif_receive_skb_internal+0x1e/0x90
 [<
ffffffff815aa138>] napi_gro_receive+0x78/0xa0
 [<
ffffffffa07f93f4>] tg3_poll_work+0xc54/0xf40 [tg3]
 [<
ffffffff81597c8c>] ? consume_skb+0x2c/0x40
 [<
ffffffffa07f9721>] tg3_poll_msix+0x41/0x160 [tg3]
 [<
ffffffff815ab0f2>] net_rx_action+0xe2/0x290
 [<
ffffffff8104b92a>] __do_softirq+0xda/0x1f0
 [<
ffffffff8104bc26>] irq_exit+0x76/0xa0
 [<
ffffffff81004355>] do_IRQ+0x55/0xf0
 [<
ffffffff8165a12b>] common_interrupt+0x6b/0x6b
 <EOI>
The issue occurs only when tipc_sk_rcv() is used to wake up postponed
senders:
	tipc_bclink_wakeup_users()
		// wakeupq - is a queue which consists of special
		// 		 messages with SOCK_WAKEUP type.
		tipc_sk_rcv(wakeupq)
			...
			while (skb_queue_len(inputq)) {
				filter_rcv(skb)
					// Here the type of message is checked
					// and if it is SOCK_WAKEUP then
					// it tries to wake up a sender.
					tipc_write_space(sk)
						wake_up_interruptible_sync_poll()
			}
After the sender thread is woke up it can gather control and perform
an attempt to send a message. But if there is no enough place in send
queue it will call link_schedule_user() function which puts a message
of type SOCK_WAKEUP to the wakeup queue and put the sender to sleep.
Thus the size of the queue actually is not changed and the while()
loop never exits.
The approach I proposed is to wake up only senders for which there is
enough place in send queue so the described issue can't occur.
Moreover the same approach is already used to wake up senders on
unicast links.
I have got into the issue on our product code but to reproduce the
issue I changed a benchmark test application (from
tipcutils/demos/benchmark) to perform the following scenario:
	1. Run 64 instances of test application (nodes). It can be done
	   on the one physical machine.
	2. Each application connects to all other using TIPC sockets in
	   RDM mode.
	3. When setup is done all nodes start simultaneously send
	   broadcast messages.
	4. Everything hangs up.
The issue is reproducible only when a congestion on broadcast link
occurs. For example, when there are only 8 nodes it works fine since
congestion doesn't occur. Send queue limit is 40 in my case (I use a
critical importance level) and when 64 nodes send a message at the
same moment a congestion occurs every time.
Signed-off-by: Dmitry S Kolmakov <kolmakov.dmitriy@huawei.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
        }
 }
 
+/**
+ * bclink_prepare_wakeup - prepare users for wakeup after congestion
+ * @bcl: broadcast link
+ * @resultq: queue for users which can be woken up
+ * Move a number of waiting users, as permitted by available space in
+ * the send queue, from link wait queue to specified queue for wakeup
+ */
+static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
+{
+       int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
+       int imp, lim;
+       struct sk_buff *skb, *tmp;
+
+       skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
+               imp = TIPC_SKB_CB(skb)->chain_imp;
+               lim = bcl->window + bcl->backlog[imp].limit;
+               pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
+               if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
+                       continue;
+               skb_unlink(skb, &bcl->wakeupq);
+               skb_queue_tail(resultq, skb);
+       }
+}
+
 /**
  * tipc_bclink_wakeup_users - wake up pending users
  *
 void tipc_bclink_wakeup_users(struct net *net)
 {
        struct tipc_net *tn = net_generic(net, tipc_net_id);
+       struct tipc_link *bcl = tn->bcl;
+       struct sk_buff_head resultq;
 
-       tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
+       skb_queue_head_init(&resultq);
+       bclink_prepare_wakeup(bcl, &resultq);
+       tipc_sk_rcv(net, &resultq);
 }
 
 /**