*/
 static void tsk_advance_rx_queue(struct sock *sk)
 {
+       trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
        if (!tipc_msg_reverse(onode, &skb, err))
                return;
 
+       trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
        dnode = msg_destnode(buf_msg(skb));
        selector = msg_origport(buf_msg(skb));
        tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
                        tsk_set_unreliable(tsk, true);
        }
 
+       trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
        return 0;
 }
 
        tsk = tipc_sk(sk);
        lock_sock(sk);
 
+       trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
        __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
        sk->sk_shutdown = SHUTDOWN_MASK;
        tipc_sk_leave(tsk);
        __poll_t revents = 0;
 
        sock_poll_wait(file, sock, wait);
+       trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");
 
        if (sk->sk_shutdown & RCV_SHUTDOWN)
                revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
        rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
        /* Send message if build was successful */
-       if (unlikely(rc == dlen))
+       if (unlikely(rc == dlen)) {
+               trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
+                                       TIPC_DUMP_SK_SNDQ, " ");
                rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
                                     &tsk->cong_link_cnt);
+       }
 
        tipc_nlist_purge(&dsts);
 
        bool conn_cong;
 
        /* Ignore if connection cannot be validated: */
-       if (!tsk_peer_msg(tsk, hdr))
+       if (!tsk_peer_msg(tsk, hdr)) {
+               trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
                goto exit;
+       }
 
        if (unlikely(msg_errcode(hdr))) {
                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
        if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue)))
                return -ENOMEM;
 
+       trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
        if (unlikely(rc == -ELINKCONG)) {
                tipc_dest_push(clinks, dnode, 0);
                if (unlikely(rc != send))
                        break;
 
+               trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+                                        TIPC_DUMP_SK_SNDQ, " ");
                rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
                if (unlikely(rc == -ELINKCONG)) {
                        tsk->cong_link_cnt = 1;
        struct sk_buff_head inputq;
        int limit, err = TIPC_OK;
 
+       trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
        TIPC_SKB_CB(skb)->bytes_read = 0;
        __skb_queue_head_init(&inputq);
        __skb_queue_tail(&inputq, skb);
                    (!grp && msg_in_group(hdr)))
                        err = TIPC_ERR_NO_PORT;
                else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
+                       trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
+                                          "err_overload2!");
                        atomic_inc(&sk->sk_drops);
                        err = TIPC_ERR_OVERLOAD;
                }
 
                if (unlikely(err)) {
-                       tipc_skb_reject(net, err, skb, xmitq);
+                       if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
+                               trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
+                                                     "@filter_rcv!");
+                               __skb_queue_tail(xmitq, skb);
+                       }
                        err = TIPC_OK;
                        continue;
                }
                __skb_queue_tail(&sk->sk_receive_queue, skb);
                skb_set_owner_r(skb, sk);
+               trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
+                                        "rcvq >90% allocated!");
                sk->sk_data_ready(sk);
        }
 }
                if (!sk->sk_backlog.len)
                        atomic_set(dcnt, 0);
                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
-               if (likely(!sk_add_backlog(sk, skb, lim)))
+               if (likely(!sk_add_backlog(sk, skb, lim))) {
+                       trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
+                                                "bklg & rcvq >90% allocated!");
                        continue;
+               }
 
+               trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
                /* Overload => reject message back to sender */
                onode = tipc_own_addr(sock_net(sk));
                atomic_inc(&sk->sk_drops);
-               if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+               if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
+                       trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
+                                             "@sk_enqueue!");
                        __skb_queue_tail(xmitq, skb);
+               }
                break;
        }
 }
                /* Prepare for message rejection */
                if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
                        continue;
+
+               trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
 xmit:
                dnode = msg_destnode(buf_msg(skb));
                tipc_node_xmit_skb(net, skb, dnode, dport);
 
        lock_sock(sk);
 
+       trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
        __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
        sk->sk_shutdown = SEND_SHUTDOWN;
 
        return skb->len;
 }
 
+/**
+ * tipc_sk_filtering - check if a socket should be traced
+ * @sk: the socket to be examined
+ * @sysctl_tipc_sk_filter[]: the socket tuple for filtering,
+ *  (portid, sock type, name type, name lower, name upper)
+ *
+ * Returns true if the socket meets the socket tuple data
+ * (value 0 = 'any') or when there is no tuple set (all = 0),
+ * otherwise false
+ */
+bool tipc_sk_filtering(struct sock *sk)
+{
+       struct tipc_sock *tsk;
+       struct publication *p;
+       u32 _port, _sktype, _type, _lower, _upper;
+       u32 type = 0, lower = 0, upper = 0;
+
+       if (!sk)
+               return true;
+
+       tsk = tipc_sk(sk);
+
+       _port = sysctl_tipc_sk_filter[0];
+       _sktype = sysctl_tipc_sk_filter[1];
+       _type = sysctl_tipc_sk_filter[2];
+       _lower = sysctl_tipc_sk_filter[3];
+       _upper = sysctl_tipc_sk_filter[4];
+
+       if (!_port && !_sktype && !_type && !_lower && !_upper)
+               return true;
+
+       if (_port)
+               return (_port == tsk->portid);
+
+       if (_sktype && _sktype != sk->sk_type)
+               return false;
+
+       if (tsk->published) {
+               p = list_first_entry_or_null(&tsk->publications,
+                                            struct publication, binding_sock);
+               if (p) {
+                       type = p->type;
+                       lower = p->lower;
+                       upper = p->upper;
+               }
+       }
+
+       if (!tipc_sk_type_connectionless(sk)) {
+               type = tsk->conn_type;
+               lower = tsk->conn_instance;
+               upper = tsk->conn_instance;
+       }
+
+       if ((_type && _type != type) || (_lower && _lower != lower) ||
+           (_upper && _upper != upper))
+               return false;
+
+       return true;
+}
+
 u32 tipc_sock_get_portid(struct sock *sk)
 {
        return (sk) ? (tipc_sk(sk))->portid : 0;
 }
 
+/**
+ * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
+ *                     both the rcv and backlog queues are considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
+{
+       atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+       unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+       unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
+ *                     only the rcv queue is considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
+{
+       unsigned int lim = rcvbuf_limit(sk, skb);
+       unsigned int qsize = sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
 /**
  * tipc_sk_dump - dump TIPC socket
  * @sk: tipc sk to be dumped
 
                        {(0xcbe),       "SYNCH_BEGIN_EVT"               },\
                        {(0xcee),       "SYNCH_END_EVT"                 })
 
+extern unsigned long sysctl_tipc_sk_filter[5] __read_mostly;
+
 int tipc_skb_dump(struct sk_buff *skb, bool more, char *buf);
 int tipc_list_dump(struct sk_buff_head *list, bool more, char *buf);
 int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf);
 int tipc_link_dump(struct tipc_link *l, u16 dqueues, char *buf);
 int tipc_node_dump(struct tipc_node *n, bool more, char *buf);
+bool tipc_sk_filtering(struct sock *sk);
 
 DECLARE_EVENT_CLASS(tipc_skb_class,
 
                  __get_str(skb_buf), __get_str(buf))
 );
 
-#define DEFINE_SK_EVENT(name) \
-DEFINE_EVENT(tipc_sk_class, name, \
+#define DEFINE_SK_EVENT_FILTER(name) \
+DEFINE_EVENT_CONDITION(tipc_sk_class, name, \
+       TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \
+                const char *header), \
+       TP_ARGS(sk, skb, dqueues, header), \
+       TP_CONDITION(tipc_sk_filtering(sk)))
+DEFINE_SK_EVENT_FILTER(tipc_sk_dump);
+DEFINE_SK_EVENT_FILTER(tipc_sk_create);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendmcast);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendmsg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendstream);
+DEFINE_SK_EVENT_FILTER(tipc_sk_poll);
+DEFINE_SK_EVENT_FILTER(tipc_sk_filter_rcv);
+DEFINE_SK_EVENT_FILTER(tipc_sk_advance_rx);
+DEFINE_SK_EVENT_FILTER(tipc_sk_rej_msg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_drop_msg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_release);
+DEFINE_SK_EVENT_FILTER(tipc_sk_shutdown);
+
+#define DEFINE_SK_EVENT_FILTER_COND(name, cond) \
+DEFINE_EVENT_CONDITION(tipc_sk_class, name, \
        TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \
                 const char *header), \
-       TP_ARGS(sk, skb, dqueues, header))
-DEFINE_SK_EVENT(tipc_sk_dump);
+       TP_ARGS(sk, skb, dqueues, header), \
+       TP_CONDITION(tipc_sk_filtering(sk) && (cond)))
+DEFINE_SK_EVENT_FILTER_COND(tipc_sk_overlimit1, tipc_sk_overlimit1(sk, skb));
+DEFINE_SK_EVENT_FILTER_COND(tipc_sk_overlimit2, tipc_sk_overlimit2(sk, skb));
 
 DECLARE_EVENT_CLASS(tipc_link_class,