/**
  * struct tipc_bc_base - link used for broadcast messages
- * @link: (non-standard) broadcast link structure
+ * @link: broadcast send link structure
  * @node: (non-standard) node structure representing b'cast link's peer node
  * @bcast_nodes: map of broadcast-capable nodes
  * @retransmit_to: node that most recently requested a retransmit
+ * @dest_nnt: array indicating number of reachable destinations per bearer
+ * @bearers: array of bearers, sorted by number of reachable destinations
  *
  * Handles sequence numbering, fragmentation, bundling, etc.
  */
        struct sk_buff_head arrvq;
        struct sk_buff_head inputq;
        struct sk_buff_head namedq;
+       int dests[MAX_BEARERS];
+       int primary_bearer;
        struct tipc_node_map bcast_nodes;
        struct tipc_node *retransmit_to;
 };
        bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
 }
 
+/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
+ *                               if any, and make it primary bearer
+ */
+static void tipc_bcbase_select_primary(struct net *net)
+{
+       struct tipc_bc_base *bb = tipc_bc_base(net);
+       int all_dests =  tipc_link_bc_peers(bb->link);
+       int i;
+
+       bb->primary_bearer = INVALID_BEARER_ID;
+
+       if (!all_dests)
+               return;
+
+       for (i = 0; i < MAX_BEARERS; i++) {
+               if (bb->dests[i] < all_dests)
+                       continue;
+
+               bb->primary_bearer = i;
+
+               /* Reduce risk that all nodes select same primary */
+               if ((i ^ tipc_own_addr(net)) & 1)
+                       break;
+       }
+}
+
+void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
+{
+       struct tipc_bc_base *bb = tipc_bc_base(net);
+
+       tipc_bcast_lock(net);
+       bb->dests[bearer_id]++;
+       tipc_bcbase_select_primary(net);
+       tipc_bcast_unlock(net);
+}
+
+void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
+{
+       struct tipc_bc_base *bb = tipc_bc_base(net);
+
+       tipc_bcast_lock(net);
+       bb->dests[bearer_id]--;
+       tipc_bcbase_select_primary(net);
+       tipc_bcast_unlock(net);
+}
+
 static void bclink_set_last_sent(struct net *net)
 {
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        tipc_node_put(n_ptr);
 }
 
+/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
+ *
+ * Note that number of reachable destinations, as indicated in the dests[]
+ * array, may transitionally differ from the number of destinations indicated
+ * in each sent buffer. We can sustain this. Excess destination nodes will
+ * drop and never acknowledge the unexpected packets, and missing destinations
+ * will either require retransmission (if they are just about to be added to
+ * the bearer), or be removed from the buffer's 'ackers' counter (if they
+ * just went down)
+ */
+static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
+{
+       int bearer_id;
+       struct tipc_bc_base *bb = tipc_bc_base(net);
+       struct sk_buff *skb, *_skb;
+       struct sk_buff_head _xmitq;
+
+       if (skb_queue_empty(xmitq))
+               return;
+
+       /* The typical case: at least one bearer has links to all nodes */
+       bearer_id = bb->primary_bearer;
+       if (bearer_id >= 0) {
+               tipc_bearer_bc_xmit(net, bearer_id, xmitq);
+               return;
+       }
+
+       /* We have to transmit across all bearers */
+       skb_queue_head_init(&_xmitq);
+       for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
+               if (!bb->dests[bearer_id])
+                       continue;
+
+               skb_queue_walk(xmitq, skb) {
+                       _skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
+                       if (!_skb)
+                               break;
+                       __skb_queue_tail(&_xmitq, _skb);
+               }
+               tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
+       }
+       __skb_queue_purge(xmitq);
+       __skb_queue_purge(&_xmitq);
+}
+
 /* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
  *                    and to identified node local sockets
  * @net: the applicable net namespace
        tipc_bcast_lock(net);
        if (tipc_link_bc_peers(l))
                rc = tipc_link_xmit(l, list, &xmitq);
-       bclink_set_last_sent(net);
        tipc_bcast_unlock(net);
 
        /* Don't send to local node if adding to link failed */
        }
 
        /* Broadcast to all nodes, inluding local node */
-       tipc_bcbearer_xmit(net, &xmitq);
+       tipc_bcbase_xmit(net, &xmitq);
        tipc_sk_mcast_rcv(net, &rcvq, &inputq);
        __skb_queue_purge(list);
        return 0;
                rc = tipc_link_rcv(l, skb, NULL);
        tipc_bcast_unlock(net);
 
-       if (!skb_queue_empty(&xmitq))
-               tipc_bcbearer_xmit(net, &xmitq);
+       tipc_bcbase_xmit(net, &xmitq);
 
        /* Any socket wakeup messages ? */
        if (!skb_queue_empty(inputq))
        tipc_link_bc_ack_rcv(l, acked, &xmitq);
        tipc_bcast_unlock(net);
 
-       tipc_bcbearer_xmit(net, &xmitq);
+       tipc_bcbase_xmit(net, &xmitq);
 
        /* Any socket wakeup messages ? */
        if (!skb_queue_empty(inputq))
        }
        tipc_bcast_unlock(net);
 
-       tipc_bcbearer_xmit(net, &xmitq);
+       tipc_bcbase_xmit(net, &xmitq);
 
        /* Any socket wakeup messages ? */
        if (!skb_queue_empty(inputq))
  *
  * RCU is locked, node lock is set
  */
-void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
+void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
                         struct sk_buff_head *xmitq)
 {
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *snd_l = tipc_bc_sndlink(net);
 
-       tipc_bclink_lock(net);
-       tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
+       tipc_bcast_lock(net);
        tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
-       tipc_bclink_unlock(net);
+       tipc_bcbase_select_primary(net);
+       tipc_bcast_unlock(net);
 }
 
 /* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
  *
  * RCU is locked, node lock is set
  */
-void tipc_bcast_remove_peer(struct net *net, u32 addr,
-                           struct tipc_link *rcv_l)
+void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
 {
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
        struct tipc_link *snd_l = tipc_bc_sndlink(net);
+       struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
        struct sk_buff_head xmitq;
 
        __skb_queue_head_init(&xmitq);
 
-       tipc_bclink_lock(net);
-       tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
+       tipc_bcast_lock(net);
        tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
-       tipc_bclink_unlock(net);
+       tipc_bcbase_select_primary(net);
+       tipc_bcast_unlock(net);
 
-       tipc_bcbearer_xmit(net, &xmitq);
+       tipc_bcbase_xmit(net, &xmitq);
 
        /* Any socket wakeup messages ? */
        if (!skb_queue_empty(inputq))
        return 0;
 }
 
-static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
-{
-       struct sk_buff *skb, *tmp;
-
-       skb_queue_walk_safe(xmitq, skb, tmp) {
-               __skb_dequeue(xmitq);
-               tipc_bcbearer_send(net, skb, NULL, NULL);
-
-               /* Until we remove cloning in tipc_l2_send_msg(): */
-               kfree_skb(skb);
-       }
-}
-
 /**
  * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
  */
 
        n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
 
        tipc_bearer_add_dest(n->net, bearer_id, n->addr);
+       tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
 
        pr_debug("Established link <%s> on network plane %c\n",
                 nl->name, nl->net_plane);
                *slot1 = bearer_id;
                tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
                n->action_flags |= TIPC_NOTIFY_NODE_UP;
-               tipc_bcast_add_peer(n->net, n->addr, nl, xmitq);
+               tipc_bcast_add_peer(n->net, nl, xmitq);
                return;
        }
 
                tipc_link_build_reset_msg(l, xmitq);
                *maddr = &n->links[*bearer_id].maddr;
                node_lost_contact(n, &le->inputq);
+               tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
                return;
        }
+       tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
 
        /* There is still a working link => initiate failover */
        tnl = node_active_link(n, 0);
                 tipc_addr_string_fill(addr_string, n->addr));
 
        /* Clean up broadcast state */
-       tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link);
+       tipc_bcast_remove_peer(n->net, n->bc_entry.link);
 
        /* Abort any ongoing link failover */
        for (i = 0; i < MAX_BEARERS; i++) {