#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
 #define ADV_IDLE ADV_UNIT
+#define ADV_ACTIVE (ADV_UNIT * 12)
 
 enum mbr_state {
        MBR_QUARANTINED,
 struct tipc_member {
        struct rb_node tree_node;
        struct list_head list;
+       struct list_head congested;
        struct sk_buff *event_msg;
+       struct tipc_group *group;
        u32 node;
        u32 port;
        u32 instance;
        enum mbr_state state;
+       u16 advertised;
+       u16 window;
        u16 bc_rcv_nxt;
+       bool usr_pending;
 };
 
 struct tipc_group {
        struct rb_root members;
+       struct list_head congested;
        struct tipc_nlist dests;
        struct net *net;
        int subid;
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
                                  int mtyp, struct sk_buff_head *xmitq);
 
+static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
+{
+       int mcnt = grp->member_cnt + 1;
+
+       /* Scale to bytes, considering worst-case truesize/msgsize ratio */
+       return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
+}
+
 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
 {
        return grp->bc_snd_nxt;
 }
 
+static bool tipc_group_is_enabled(struct tipc_member *m)
+{
+       return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
+}
+
 static bool tipc_group_is_receiver(struct tipc_member *m)
 {
        return m && m->state >= MBR_JOINED;
        if (!grp)
                return NULL;
        tipc_nlist_init(&grp->dests, tipc_own_addr(net));
+       INIT_LIST_HEAD(&grp->congested);
        grp->members = RB_ROOT;
        grp->net = net;
        grp->portid = portid;
        if (!m)
                return NULL;
        INIT_LIST_HEAD(&m->list);
+       INIT_LIST_HEAD(&m->congested);
+       m->group = grp;
        m->node = node;
        m->port = port;
        grp->member_cnt++;
        rb_erase(&m->tree_node, &grp->members);
        grp->member_cnt--;
        list_del_init(&m->list);
+       list_del_init(&m->congested);
 
        /* If last member on a node, remove node from dest list */
        if (!tipc_group_find_node(grp, m->node))
        *scope = grp->scope;
 }
 
-void tipc_group_update_bc_members(struct tipc_group *grp)
+void tipc_group_update_member(struct tipc_member *m, int len)
+{
+       struct tipc_group *grp = m->group;
+       struct tipc_member *_m, *tmp;
+
+       if (!tipc_group_is_enabled(m))
+               return;
+
+       m->window -= len;
+
+       if (m->window >= ADV_IDLE)
+               return;
+
+       if (!list_empty(&m->congested))
+               return;
+
+       /* Sort member into congested members' list */
+       list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
+               if (m->window > _m->window)
+                       continue;
+               list_add_tail(&m->congested, &_m->congested);
+               return;
+       }
+       list_add_tail(&m->congested, &grp->congested);
+}
+
+void tipc_group_update_bc_members(struct tipc_group *grp, int len)
 {
+       struct tipc_member *m;
+       struct rb_node *n;
+
+       for (n = rb_first(&grp->members); n; n = rb_next(n)) {
+               m = container_of(n, struct tipc_member, tree_node);
+               if (tipc_group_is_enabled(m))
+                       tipc_group_update_member(m, len);
+       }
        grp->bc_snd_nxt++;
 }
 
+bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+{
+       struct tipc_member *m;
+
+       if (list_empty(&grp->congested))
+               return false;
+
+       m = list_first_entry(&grp->congested, struct tipc_member, congested);
+       if (m->window >= len)
+               return false;
+
+       return true;
+}
+
 /* tipc_group_filter_msg() - determine if we should accept arriving message
  */
 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
        kfree_skb(skb);
 }
 
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
+                              u32 port, struct sk_buff_head *xmitq)
+{
+       struct tipc_member *m;
+
+       m = tipc_group_find_member(grp, node, port);
+       if (!m)
+               return;
+
+       m->advertised -= blks;
+
+       switch (m->state) {
+       case MBR_JOINED:
+               if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
+                       tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+               break;
+       case MBR_DISCOVERED:
+       case MBR_JOINING:
+       case MBR_LEAVING:
+       default:
+               break;
+       }
+}
+
 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
                                  int mtyp, struct sk_buff_head *xmitq)
 {
        struct tipc_msg *hdr;
        struct sk_buff *skb;
+       int adv = 0;
 
        skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
                              m->node, tipc_own_addr(grp->net),
        if (!skb)
                return;
 
+       if (m->state == MBR_JOINED)
+               adv = ADV_ACTIVE - m->advertised;
+
        hdr = buf_msg(skb);
-       if (mtyp == GRP_JOIN_MSG)
+
+       if (mtyp == GRP_JOIN_MSG) {
                msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
+               msg_set_adv_win(hdr, adv);
+               m->advertised += adv;
+       } else if (mtyp == GRP_ADV_MSG) {
+               msg_set_adv_win(hdr, adv);
+               m->advertised += adv;
+       }
        __skb_queue_tail(xmitq, skb);
 }
 
-void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
-                         struct sk_buff_head *inputq,
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
+                         struct tipc_msg *hdr, struct sk_buff_head *inputq,
                          struct sk_buff_head *xmitq)
 {
        u32 node = msg_orignode(hdr);
                if (!m)
                        return;
                m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+               m->window += msg_adv_win(hdr);
 
                /* Wait until PUBLISH event is received */
                if (m->state == MBR_DISCOVERED) {
                        m->state = MBR_JOINING;
                } else if (m->state == MBR_PUBLISHED) {
                        m->state = MBR_JOINED;
+                       *usr_wakeup = true;
+                       m->usr_pending = false;
+                       tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
                        __skb_queue_tail(inputq, m->event_msg);
                }
+               if (m->window < ADV_IDLE)
+                       tipc_group_update_member(m, 0);
+               else
+                       list_del_init(&m->congested);
                return;
        case GRP_LEAVE_MSG:
                if (!m)
                }
                /* Otherwise deliver already received WITHDRAW event */
                __skb_queue_tail(inputq, m->event_msg);
+               *usr_wakeup = m->usr_pending;
                tipc_group_delete_member(grp, m);
+               list_del_init(&m->congested);
+               return;
+       case GRP_ADV_MSG:
+               if (!m)
+                       return;
+               m->window += msg_adv_win(hdr);
+               *usr_wakeup = m->usr_pending;
+               m->usr_pending = false;
+               list_del_init(&m->congested);
                return;
        default:
                pr_warn("Received unknown GROUP_PROTO message\n");
        }
 }
 
+/* tipc_group_member_evt() - receive and handle a member up/down event
+ */
 void tipc_group_member_evt(struct tipc_group *grp,
+                          bool *usr_wakeup,
+                          int *sk_rcvbuf,
                           struct sk_buff *skb,
                           struct sk_buff_head *inputq,
                           struct sk_buff_head *xmitq)
                } else {
                        __skb_queue_tail(inputq, skb);
                        m->state = MBR_JOINED;
+                       *usr_wakeup = true;
+                       m->usr_pending = false;
                }
                m->instance = instance;
                TIPC_SKB_CB(skb)->orig_member = m->instance;
                tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
+               if (m->window < ADV_IDLE)
+                       tipc_group_update_member(m, 0);
+               else
+                       list_del_init(&m->congested);
        } else if (event == TIPC_WITHDRAWN) {
                if (!m)
                        goto drop;
 
                TIPC_SKB_CB(skb)->orig_member = m->instance;
 
+               *usr_wakeup = m->usr_pending;
+               m->usr_pending = false;
+
                /* Hold back event if more messages might be expected */
                if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
                        m->event_msg = skb;
                        __skb_queue_tail(inputq, skb);
                        tipc_group_delete_member(grp, m);
                }
+               list_del_init(&m->congested);
        }
+       *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
        return;
 drop:
        kfree_skb(skb);
 
        return tsk->snt_unacked > tsk->snd_win;
 }
 
+static u16 tsk_blocks(int len)
+{
+       return ((len / FLOWCTL_BLK_SZ) + 1);
+}
+
 /* tsk_blocks(): translate a buffer size in bytes to number of
  * advertisable blocks, taking into account the ratio truesize(len)/len
  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
        struct tipc_group *grp = tsk->group;
        struct tipc_nlist *dsts = tipc_group_dests(grp);
        struct tipc_mc_method *method = &tsk->mc_method;
+       int blks = tsk_blocks(MCAST_H_SIZE + dlen);
        struct tipc_msg *hdr = &tsk->phdr;
        int mtu = tipc_bcast_get_mtu(net);
        struct sk_buff_head pkts;
        if (!dsts->local && !dsts->remote)
                return -EHOSTUNREACH;
 
-       /* Block or return if any destination link is congested */
-       rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+       /* Block or return if any destination link or member is congested */
+       rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
+                               !tipc_group_bc_cong(grp, blks));
        if (unlikely(rc))
                return rc;
 
        /* Complete message header */
        msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
-       msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+       msg_set_hdr_sz(hdr, GROUP_H_SIZE);
        msg_set_destport(hdr, 0);
        msg_set_destnode(hdr, 0);
        msg_set_nameinst(hdr, 0);
        if (unlikely(rc))
                return rc;
 
-       /* Update broadcast sequence number */
-       tipc_group_update_bc_members(tsk->group);
-
+       /* Update broadcast sequence number and send windows */
+       tipc_group_update_bc_members(tsk->group, blks);
        return dlen;
 }
 
        if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
                return -EMSGSIZE;
 
-       if (unlikely(grp))
+       if (unlikely(grp && !dest))
                return tipc_send_group_bcast(sock, m, dlen, timeout);
 
        if (unlikely(!dest)) {
        bool connected = !tipc_sk_type_connectionless(sk);
        struct tipc_sock *tsk = tipc_sk(sk);
        int rc, err, hlen, dlen, copy;
+       struct sk_buff_head xmitq;
        struct tipc_msg *hdr;
        struct sk_buff *skb;
        bool grp_evt;
        }
        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 
+       /* Step rcv queue to first msg with data or error; wait if necessary */
        do {
-               /* Look at first msg in receive queue; wait if necessary */
                rc = tipc_wait_for_rcvmsg(sock, &timeout);
                if (unlikely(rc))
                        goto exit;
        if (unlikely(flags & MSG_PEEK))
                goto exit;
 
+       /* Send group flow control advertisement when applicable */
+       if (tsk->group && msg_in_group(hdr) && !grp_evt) {
+               skb_queue_head_init(&xmitq);
+               tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
+                                         msg_orignode(hdr), msg_origport(hdr),
+                                         &xmitq);
+               tipc_node_distr_xmit(sock_net(sk), &xmitq);
+       }
+
        tsk_advance_rx_queue(sk);
 
        if (likely(!connected))
                goto exit;
 
-       /* Send connection flow control ack when applicable */
+       /* Send connection flow control advertisement when applicable */
        tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
        if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
                tipc_sk_send_ack(tsk);
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *hdr = buf_msg(skb);
        struct tipc_group *grp = tsk->group;
+       bool wakeup = false;
 
        switch (msg_user(hdr)) {
        case CONN_MANAGER:
        case SOCK_WAKEUP:
                tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
                tsk->cong_link_cnt--;
-               sk->sk_write_space(sk);
+               wakeup = true;
                break;
        case GROUP_PROTOCOL:
-               tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
+               tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
                break;
        case TOP_SRV:
-               tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
+               tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
+                                     skb, inputq, xmitq);
                skb = NULL;
                break;
        default:
                break;
        }
 
+       if (wakeup)
+               sk->sk_write_space(sk);
+
        kfree_skb(skb);
 }
 
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *hdr = buf_msg(skb);
 
+       if (unlikely(msg_in_group(hdr)))
+               return sk->sk_rcvbuf;
+
        if (unlikely(!msg_connected(hdr)))
                return sk->sk_rcvbuf << msg_importance(hdr);