* Consumes the buffer chain.
  * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
  */
-static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
-                          u16 *cong_link_cnt)
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+                   u16 *cong_link_cnt)
 {
        struct tipc_link *l = tipc_bc_sndlink(net);
        struct sk_buff_head xmitq;
        nl->local = false;
 }
 
-u32 tipc_bcast_get_broadcast_mode(struct net *net)
+u32 tipc_bcast_get_mode(struct net *net)
 {
        struct tipc_bc_base *bb = tipc_bc_base(net);
 
 
 int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
                    struct tipc_mc_method *method, struct tipc_nlist *dests,
                    u16 *cong_link_cnt);
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+                   u16 *cong_link_cnt);
 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
                        struct tipc_msg *hdr);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);
 
-u32 tipc_bcast_get_broadcast_mode(struct net *net);
+u32 tipc_bcast_get_mode(struct net *net);
 u32 tipc_bcast_get_broadcast_ratio(struct net *net);
 
 void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
 
        void *hdr;
        struct nlattr *attrs;
        struct nlattr *prop;
-       u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
+       u32 bc_mode = tipc_bcast_get_mode(net);
        u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
 
        if (!bcl)
 
        msg_set_bits(m, 1, 25, 0xf, err);
 }
 
+static inline void msg_set_bulk(struct tipc_msg *m)
+{
+       msg_set_bits(m, 1, 28, 0x1, 1);
+}
+
+static inline u32 msg_is_bulk(struct tipc_msg *m)
+{
+       return msg_bits(m, 1, 28, 0x1);
+}
+
+static inline void msg_set_last_bulk(struct tipc_msg *m)
+{
+       msg_set_bits(m, 1, 27, 0x1, 1);
+}
+
+static inline u32 msg_is_last_bulk(struct tipc_msg *m)
+{
+       return msg_bits(m, 1, 27, 0x1);
+}
+
+static inline void msg_set_non_legacy(struct tipc_msg *m)
+{
+       msg_set_bits(m, 1, 26, 0x1, 1);
+}
+
+static inline u32 msg_is_legacy(struct tipc_msg *m)
+{
+       return !msg_bits(m, 1, 26, 0x1);
+}
+
 static inline u32 msg_reroute_cnt(struct tipc_msg *m)
 {
        return msg_bits(m, 1, 21, 0xf);
        msg_set_word(m, 4, p);
 }
 
+static inline u16 msg_named_seqno(struct tipc_msg *m)
+{
+       return msg_bits(m, 4, 0, 0xffff);
+}
+
+static inline void msg_set_named_seqno(struct tipc_msg *m, u16 n)
+{
+       msg_set_bits(m, 4, 0, 0xffff, n);
+}
+
 static inline u32 msg_destport(struct tipc_msg *m)
 {
        return msg_word(m, 5);
 
                pr_warn("Publication distribution failure\n");
                return NULL;
        }
-
+       msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
+       msg_set_non_legacy(buf_msg(skb));
        item = (struct distr_item *)msg_data(buf_msg(skb));
        publ_to_item(item, publ);
        return skb;
 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
 {
        struct name_table *nt = tipc_name_table(net);
-       struct sk_buff *buf;
        struct distr_item *item;
+       struct sk_buff *skb;
 
        write_lock_bh(&nt->cluster_scope_lock);
        list_del(&publ->binding_node);
        if (publ->scope == TIPC_NODE_SCOPE)
                return NULL;
 
-       buf = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
-       if (!buf) {
+       skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
+       if (!skb) {
                pr_warn("Withdrawal distribution failure\n");
                return NULL;
        }
-
-       item = (struct distr_item *)msg_data(buf_msg(buf));
+       msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
+       msg_set_non_legacy(buf_msg(skb));
+       item = (struct distr_item *)msg_data(buf_msg(skb));
        publ_to_item(item, publ);
-       return buf;
+       return skb;
 }
 
 /**
  * @pls: linked list of publication items to be packed into buffer chain
  */
 static void named_distribute(struct net *net, struct sk_buff_head *list,
-                            u32 dnode, struct list_head *pls)
+                            u32 dnode, struct list_head *pls, u16 seqno)
 {
        struct publication *publ;
        struct sk_buff *skb = NULL;
        u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
                        ITEM_SIZE) * ITEM_SIZE;
        u32 msg_rem = msg_dsz;
+       struct tipc_msg *hdr;
 
        list_for_each_entry(publ, pls, binding_node) {
                /* Prepare next buffer: */
                                pr_warn("Bulk publication failure\n");
                                return;
                        }
-                       msg_set_bc_ack_invalid(buf_msg(skb), true);
-                       item = (struct distr_item *)msg_data(buf_msg(skb));
+                       hdr = buf_msg(skb);
+                       msg_set_bc_ack_invalid(hdr, true);
+                       msg_set_bulk(hdr);
+                       msg_set_non_legacy(hdr);
+                       item = (struct distr_item *)msg_data(hdr);
                }
 
                /* Pack publication into message: */
                }
        }
        if (skb) {
-               msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
+               hdr = buf_msg(skb);
+               msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
                skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
                __skb_queue_tail(list, skb);
        }
+       hdr = buf_msg(skb_peek_tail(list));
+       msg_set_last_bulk(hdr);
+       msg_set_named_seqno(hdr, seqno);
 }
 
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(struct net *net, u32 dnode)
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
 {
        struct name_table *nt = tipc_name_table(net);
+       struct tipc_net *tn = tipc_net(net);
        struct sk_buff_head head;
+       u16 seqno;
 
        __skb_queue_head_init(&head);
+       spin_lock_bh(&tn->nametbl_lock);
+       if (!(capabilities & TIPC_NAMED_BCAST))
+               nt->rc_dests++;
+       seqno = nt->snd_nxt;
+       spin_unlock_bh(&tn->nametbl_lock);
 
        read_lock_bh(&nt->cluster_scope_lock);
-       named_distribute(net, &head, dnode, &nt->cluster_scope);
+       named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
        tipc_node_xmit(net, &head, dnode, 0);
        read_unlock_bh(&nt->cluster_scope_lock);
 }
        spin_unlock_bh(&tn->nametbl_lock);
 }
 
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr)
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
+                     u32 addr, u16 capabilities)
 {
+       struct name_table *nt = tipc_name_table(net);
+       struct tipc_net *tn = tipc_net(net);
+
        struct publication *publ, *tmp;
 
        list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
                tipc_publ_purge(net, publ, addr);
        tipc_dist_queue_purge(net, addr);
+       spin_lock_bh(&tn->nametbl_lock);
+       if (!(capabilities & TIPC_NAMED_BCAST))
+               nt->rc_dests--;
+       spin_unlock_bh(&tn->nametbl_lock);
 }
 
 /**
        return false;
 }
 
+static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
+                                         u16 *rcv_nxt, bool *open)
+{
+       struct sk_buff *skb, *tmp;
+       struct tipc_msg *hdr;
+       u16 seqno;
+
+       skb_queue_walk_safe(namedq, skb, tmp) {
+               skb_linearize(skb);
+               hdr = buf_msg(skb);
+               seqno = msg_named_seqno(hdr);
+               if (msg_is_last_bulk(hdr)) {
+                       *rcv_nxt = seqno;
+                       *open = true;
+               }
+
+               if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
+                       __skb_unlink(skb, namedq);
+                       return skb;
+               }
+
+               if (*open && (*rcv_nxt == seqno)) {
+                       (*rcv_nxt)++;
+                       __skb_unlink(skb, namedq);
+                       return skb;
+               }
+
+               if (less(seqno, *rcv_nxt)) {
+                       __skb_unlink(skb, namedq);
+                       kfree_skb(skb);
+                       continue;
+               }
+       }
+       return NULL;
+}
+
 /**
  * tipc_named_rcv - process name table update messages sent by another node
  */
-void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+                   u16 *rcv_nxt, bool *open)
 {
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct tipc_msg *msg;
+       struct tipc_net *tn = tipc_net(net);
        struct distr_item *item;
-       uint count;
-       u32 node;
+       struct tipc_msg *hdr;
        struct sk_buff *skb;
-       int mtype;
+       u32 count, node;
 
        spin_lock_bh(&tn->nametbl_lock);
-       for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
-               skb_linearize(skb);
-               msg = buf_msg(skb);
-               mtype = msg_type(msg);
-               item = (struct distr_item *)msg_data(msg);
-               count = msg_data_sz(msg) / ITEM_SIZE;
-               node = msg_orignode(msg);
+       while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
+               hdr = buf_msg(skb);
+               node = msg_orignode(hdr);
+               item = (struct distr_item *)msg_data(hdr);
+               count = msg_data_sz(hdr) / ITEM_SIZE;
                while (count--) {
-                       tipc_update_nametbl(net, item, node, mtype);
+                       tipc_update_nametbl(net, item, node, msg_type(hdr));
                        item++;
                }
                kfree_skb(skb);
                publ->node = self;
        list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
                publ->node = self;
-
+       nt->rc_dests = 0;
        spin_unlock_bh(&tn->nametbl_lock);
 }
 
        __be32 key;
 };
 
+void tipc_named_bcast(struct net *net, struct sk_buff *skb);
 struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
-void tipc_named_node_up(struct net *net, u32 dnode);
-void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+                   u16 *rcv_nxt, bool *open);
 void tipc_named_reinit(struct net *net);
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
+                     u32 addr, u16 capabilities);
 
 #endif
 
        struct tipc_net *tn = tipc_net(net);
        struct publication *p = NULL;
        struct sk_buff *skb = NULL;
+       u32 rc_dests;
 
        spin_lock_bh(&tn->nametbl_lock);
 
                nt->local_publ_count++;
                skb = tipc_named_publish(net, p);
        }
+       rc_dests = nt->rc_dests;
 exit:
        spin_unlock_bh(&tn->nametbl_lock);
 
        if (skb)
-               tipc_node_broadcast(net, skb);
+               tipc_node_broadcast(net, skb, rc_dests);
        return p;
+
 }
 
 /**
        u32 self = tipc_own_addr(net);
        struct sk_buff *skb = NULL;
        struct publication *p;
+       u32 rc_dests;
 
        spin_lock_bh(&tn->nametbl_lock);
 
                pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
                       type, lower, upper, key);
        }
+       rc_dests = nt->rc_dests;
        spin_unlock_bh(&tn->nametbl_lock);
 
        if (skb) {
-               tipc_node_broadcast(net, skb);
+               tipc_node_broadcast(net, skb, rc_dests);
                return 1;
        }
        return 0;
 
        struct list_head cluster_scope;
        rwlock_t cluster_scope_lock;
        u32 local_publ_count;
+       u32 rc_dests;
+       u32 snd_nxt;
 };
 
 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
 
        struct sk_buff_head arrvq;
        struct sk_buff_head inputq2;
        struct sk_buff_head namedq;
+       u16 named_rcv_nxt;
+       bool named_open;
 };
 
 /**
        write_unlock_bh(&n->lock);
 
        if (flags & TIPC_NOTIFY_NODE_DOWN)
-               tipc_publ_notify(net, publ_list, addr);
+               tipc_publ_notify(net, publ_list, addr, n->capabilities);
 
        if (flags & TIPC_NOTIFY_NODE_UP)
-               tipc_named_node_up(net, addr);
+               tipc_named_node_up(net, addr, n->capabilities);
 
        if (flags & TIPC_NOTIFY_LINK_UP) {
                tipc_mon_peer_up(net, addr, bearer_id);
 
        /* Clean up broadcast state */
        tipc_bcast_remove_peer(n->net, n->bc_entry.link);
+       __skb_queue_purge(&n->bc_entry.namedq);
 
        /* Abort any ongoing link failover */
        for (i = 0; i < MAX_BEARERS; i++) {
        return 0;
 }
 
-void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests)
 {
+       struct sk_buff_head xmitq;
        struct sk_buff *txskb;
        struct tipc_node *n;
+       u16 dummy;
        u32 dst;
 
+       /* Use broadcast if all nodes support it */
+       if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) {
+               __skb_queue_head_init(&xmitq);
+               __skb_queue_tail(&xmitq, skb);
+               tipc_bcast_xmit(net, &xmitq, &dummy);
+               return;
+       }
+
+       /* Otherwise use legacy replicast method */
        rcu_read_lock();
        list_for_each_entry_rcu(n, tipc_nodes(net), list) {
                dst = n->addr;
                tipc_node_xmit_skb(net, txskb, dst, 0);
        }
        rcu_read_unlock();
-
        kfree_skb(skb);
 }
 
 
        /* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */
        if (!skb_queue_empty(&n->bc_entry.namedq))
-               tipc_named_rcv(net, &n->bc_entry.namedq);
+               tipc_named_rcv(net, &n->bc_entry.namedq,
+                              &n->bc_entry.named_rcv_nxt,
+                              &n->bc_entry.named_open);
 
        /* If reassembly or retransmission failure => reset all links to peer */
        if (rc & TIPC_LINK_DOWN_EVT)
                tipc_node_link_down(n, bearer_id, false);
 
        if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
-               tipc_named_rcv(net, &n->bc_entry.namedq);
+               tipc_named_rcv(net, &n->bc_entry.namedq,
+                              &n->bc_entry.named_rcv_nxt,
+                              &n->bc_entry.named_open);
 
        if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
                tipc_node_mcast_rcv(n);
 
        TIPC_MCAST_RBCTL      = (1 << 7),
        TIPC_GAP_ACK_BLOCK    = (1 << 8),
        TIPC_TUNNEL_ENHANCED  = (1 << 9),
-       TIPC_NAGLE            = (1 << 10)
+       TIPC_NAGLE            = (1 << 10),
+       TIPC_NAMED_BCAST      = (1 << 11)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT           |  \
                                TIPC_MCAST_RBCTL       |   \
                                TIPC_GAP_ACK_BLOCK     |   \
                                TIPC_TUNNEL_ENHANCED   |   \
-                               TIPC_NAGLE)
+                               TIPC_NAGLE             |   \
+                               TIPC_NAMED_BCAST)
 
 #define INVALID_BEARER_ID -1
 
                       u32 selector);
 void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr);
 void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr);
-void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
 int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected);