Add a trace event for debugging rxrpc_call struct usage.

The rxrpc_get_call() and rxrpc_put_call() macros are converted into
functions that emit the trace event, rxrpc_see_call() is added to note the
re-emergence of a call object without changing its refcount, and
rxrpc_get_call_for_skb()/rxrpc_put_call_for_skb() are added to track refs
held on behalf of socket buffers.  Each trace record carries the call
pointer, the operation, the usage and skb counts, the caller's address and
an auxiliary pointer (the skb or the user call ID).
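
The event should show up under the usual tracefs interface; assuming the
rxrpc trace system name and a tracefs mount at /sys/kernel/tracing, it can
be watched with something like the following (illustrative only, paths may
differ by configuration):

	echo 1 >/sys/kernel/tracing/events/rxrpc/rxrpc_call/enable
	cat /sys/kernel/tracing/trace_pipe
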
Signed-off-by: David Howells <dhowells@redhat.com>
 
 #include <linux/tracepoint.h>
 
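+/*
+ * Trace a change in the usage of an rxrpc_call.  The op argument selects
+ * the label printed: 0 "NWc" new client call, 1 "NWs" new service call,
+ * 2 "SEE" call seen, 3 "GET" ref got, 4 "Gsb" ref got for an skb,
+ * 5 "PUT" ref put, 6 "Psb" ref put for an skb.
+ */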
+TRACE_EVENT(rxrpc_call,
+           TP_PROTO(struct rxrpc_call *call, int op, int usage, int nskb,
+                    const void *where, const void *aux),
+
+           TP_ARGS(call, op, usage, nskb, where, aux),
+
+           TP_STRUCT__entry(
+                   __field(struct rxrpc_call *,        call            )
+                   __field(int,                        op              )
+                   __field(int,                        usage           )
+                   __field(int,                        nskb            )
+                   __field(const void *,               where           )
+                   __field(const void *,               aux             )
+                            ),
+
+           TP_fast_assign(
+                   __entry->call = call;
+                   __entry->op = op;
+                   __entry->usage = usage;
+                   __entry->nskb = nskb;
+                   __entry->where = where;
+                   __entry->aux = aux;
+                          ),
+
+           TP_printk("c=%p %s u=%d s=%d p=%pSR a=%p",
+                     __entry->call,
+                     (__entry->op == 0 ? "NWc" :
+                      __entry->op == 1 ? "NWs" :
+                      __entry->op == 2 ? "SEE" :
+                      __entry->op == 3 ? "GET" :
+                      __entry->op == 4 ? "Gsb" :
+                      __entry->op == 5 ? "PUT" :
+                      "Psb"),
+                     __entry->usage,
+                     __entry->nskb,
+                     __entry->where,
+                     __entry->aux)
+           );
+
 TRACE_EVENT(rxrpc_skb,
            TP_PROTO(struct sk_buff *skb, int op, int usage, int mod_count,
                     const void *where),
 
                                       struct sk_buff *);
 void rxrpc_release_call(struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
-void __rxrpc_put_call(struct rxrpc_call *);
+void rxrpc_see_call(struct rxrpc_call *);
+void rxrpc_get_call(struct rxrpc_call *);
+void rxrpc_put_call(struct rxrpc_call *);
+void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *);
 void __exit rxrpc_destroy_all_calls(void);
 
 static inline bool rxrpc_is_service_call(const struct rxrpc_call *call)
 } while (0)
 
 #endif /* __KDEBUGALL */
-
-
-#define rxrpc_get_call(CALL)                           \
-do {                                                   \
-       CHECK_SLAB_OKAY(&(CALL)->usage);                \
-       if (atomic_inc_return(&(CALL)->usage) == 1)     \
-               BUG();                                  \
-} while (0)
-
-#define rxrpc_put_call(CALL)                           \
-do {                                                   \
-       __rxrpc_put_call(CALL);                         \
-} while (0)
 
                        _debug("conn ready");
                        call->state = RXRPC_CALL_SERVER_ACCEPTING;
                        list_add_tail(&call->accept_link, &rx->acceptq);
-                       rxrpc_get_call(call);
-                       atomic_inc(&call->skb_count);
+                       rxrpc_get_call_for_skb(call, notification);
                        nsp = rxrpc_skb(notification);
                        nsp->call = call;
 
        call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
        list_del_init(&call->accept_link);
        sk_acceptq_removed(&rx->sk);
+       rxrpc_see_call(call);
 
        write_lock_bh(&call->state_lock);
        switch (call->state) {
        call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
        list_del_init(&call->accept_link);
        sk_acceptq_removed(&rx->sk);
+       rxrpc_see_call(call);
 
        write_lock_bh(&call->state_lock);
        switch (call->state) {
 
        skb->destructor = rxrpc_packet_destructor;
        ASSERTCMP(sp->call, ==, NULL);
        sp->call = call;
-       rxrpc_get_call(call);
-       atomic_inc(&call->skb_count);
+       rxrpc_get_call_for_skb(call, skb);
 
        /* insert into the buffer in sequence order */
        spin_lock_bh(&call->lock);
                _debug("post ACK");
                skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
                sp->call = call;
-               rxrpc_get_call(call);
-               atomic_inc(&call->skb_count);
+               rxrpc_get_call_for_skb(call, skb);
                spin_lock_bh(&call->lock);
                if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
                        BUG();
                memset(sp, 0, sizeof(*sp));
                sp->error = error;
                sp->call = call;
-               rxrpc_get_call(call);
-               atomic_inc(&call->skb_count);
+               rxrpc_get_call_for_skb(call, skb);
 
                spin_lock_bh(&call->lock);
                ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
        u32 serial, abort_code = RX_PROTOCOL_ERROR;
        u8 *acks = NULL;
 
+       rxrpc_see_call(call);
+
        //printk("\n--------------------\n");
        _enter("{%d,%s,%lx} [%lu]",
               call->debug_id, rxrpc_call_states[call->state], call->events,
 
 {
        struct rxrpc_call *call, *xcall;
        struct rb_node *parent, **pp;
+       const void *here = __builtin_return_address(0);
        int ret;
 
        _enter("%p,%lx", rx, user_call_ID);
                return call;
        }
 
+       trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
+                        (const void *)user_call_ID);
+
        /* Publish the call, even though it is incompletely set up as yet */
        call->user_call_ID = user_call_ID;
        __set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        struct rxrpc_call *call, *candidate;
+       const void *here = __builtin_return_address(0);
        u32 call_id, chan;
 
        _enter(",%d", conn->debug_id);
        if (!candidate)
                return ERR_PTR(-EBUSY);
 
+       trace_rxrpc_call(candidate, 1, atomic_read(&candidate->usage),
+                        0, here, NULL);
+
        chan = sp->hdr.cid & RXRPC_CHANNELMASK;
        candidate->socket       = rx;
        candidate->conn         = conn;
        return ERR_PTR(-ECONNRESET);
 }
 
+/*
+ * Note the re-emergence of a call.
+ */
+void rxrpc_see_call(struct rxrpc_call *call)
+{
+       const void *here = __builtin_return_address(0);
+
+       if (call) {
+               int n = atomic_read(&call->usage);
+               int m = atomic_read(&call->skb_count);
+
+               trace_rxrpc_call(call, 2, n, m, here, NULL);
+       }
+}
+
+/*
+ * Note the addition of a ref on a call.
+ */
+void rxrpc_get_call(struct rxrpc_call *call)
+{
+       const void *here = __builtin_return_address(0);
+       int n = atomic_inc_return(&call->usage);
+       int m = atomic_read(&call->skb_count);
+
+       trace_rxrpc_call(call, 3, n, m, here, NULL);
+}
+
+/*
+ * Note the addition of a ref on a call for a socket buffer.
+ */
+void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
+{
+       const void *here = __builtin_return_address(0);
+       int n = atomic_inc_return(&call->usage);
+       int m = atomic_inc_return(&call->skb_count);
+
+       trace_rxrpc_call(call, 4, n, m, here, skb);
+}
+
 /*
  * detach a call from a socket and set up for release
  */
               atomic_read(&call->ackr_not_idle),
               call->rx_first_oos);
 
+       rxrpc_see_call(call);
+
        spin_lock_bh(&call->lock);
        if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
                BUG();
 
        _enter("{%d}", call->debug_id);
 
+       rxrpc_see_call(call);
        write_lock_bh(&call->state_lock);
        call->state = RXRPC_CALL_DEAD;
        write_unlock_bh(&call->state_lock);
 {
        bool sched;
 
+       rxrpc_see_call(call);
        write_lock(&call->state_lock);
        if (call->state < RXRPC_CALL_DEAD) {
                sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
 /*
  * release a call
  */
-void __rxrpc_put_call(struct rxrpc_call *call)
+void rxrpc_put_call(struct rxrpc_call *call)
 {
-       ASSERT(call != NULL);
+       const void *here = __builtin_return_address(0);
+       int n, m;
 
-       _enter("%p{u=%d}", call, atomic_read(&call->usage));
+       ASSERT(call != NULL);
 
-       ASSERTCMP(atomic_read(&call->usage), >, 0);
+       n = atomic_dec_return(&call->usage);
+       m = atomic_read(&call->skb_count);
+       trace_rxrpc_call(call, 5, n, m, here, NULL);
+       ASSERTCMP(n, >=, 0);
+       if (n == 0) {
+               _debug("call %d dead", call->debug_id);
+               WARN_ON(m != 0);
+               ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
+               rxrpc_queue_work(&call->destroyer);
+       }
+}
 
-       if (atomic_dec_and_test(&call->usage)) {
+/*
+ * Release a call ref held by a socket buffer.
+ */
+void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
+{
+       const void *here = __builtin_return_address(0);
+       int n, m;
+
+       n = atomic_dec_return(&call->usage);
+       m = atomic_dec_return(&call->skb_count);
+       trace_rxrpc_call(call, 6, n, m, here, skb);
+       ASSERTCMP(n, >=, 0);
+       if (n == 0) {
                _debug("call %d dead", call->debug_id);
-               WARN_ON(atomic_read(&call->skb_count) != 0);
+               WARN_ON(m != 0);
                ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
                rxrpc_queue_work(&call->destroyer);
        }
-       _leave("");
 }
 
 /*
                call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
                _debug("Zapping call %p", call);
 
+               rxrpc_see_call(call);
                list_del_init(&call->link);
 
                switch (atomic_read(&call->usage)) {
 
        _enter("{%d}", call->debug_id);
 
+       rxrpc_see_call(call);
        if (call->state >= RXRPC_CALL_COMPLETE)
                return;
 
 
        _enter("{%d}", call->debug_id);
 
+       rxrpc_see_call(call);
        if (call->state >= RXRPC_CALL_COMPLETE)
                return;
 
 
        _enter("{%d}", call->debug_id);
 
+       rxrpc_see_call(call);
        if (call->state >= RXRPC_CALL_COMPLETE)
                return;
 
 
                                             struct rxrpc_call, chan_wait_link);
        u32 call_id = chan->call_counter + 1;
 
+       rxrpc_see_call(call);
        list_del_init(&call->chan_wait_link);
        conn->active_chans |= 1 << channel;
        call->peer      = rxrpc_get_peer(conn->params.peer);
 
                        conn->channels[i].call,
                        lockdep_is_held(&conn->channel_lock));
                if (call) {
+                       rxrpc_see_call(call);
                        write_lock_bh(&call->state_lock);
                        if (rxrpc_set_call_completion(call, compl, abort_code,
                                                      error)) {
 
                goto enqueue_packet;
 
        sp->call = call;
-       rxrpc_get_call(call);
-       atomic_inc(&call->skb_count);
+       rxrpc_get_call_for_skb(call, skb);
        terminal = ((flags & RXRPC_LAST_PACKET) &&
                    !(flags & RXRPC_CLIENT_INITIATED));
        ret = rxrpc_queue_rcv_skb(call, skb, false, terminal);
                if (!call || atomic_read(&call->usage) == 0)
                        goto cant_route_call;
 
+               rxrpc_see_call(call);
                rxrpc_post_packet_to_call(call, skb);
                goto out_unlock;
        }
 
                        return PTR_ERR(call);
        }
 
+       rxrpc_see_call(call);
        _debug("CALL %d USR %lx ST %d on CONN %p",
               call->debug_id, call->user_call_ID, call->state, call->conn);
 
 
                call = hlist_entry(peer->error_targets.first,
                                   struct rxrpc_call, error_link);
                hlist_del_init(&call->error_link);
+               rxrpc_see_call(call);
 
                queue = false;
                write_lock(&call->state_lock);
 
                sp = rxrpc_skb(skb);
                call = sp->call;
                ASSERT(call != NULL);
+               rxrpc_see_call(call);
 
                _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
 
 
        _enter("%p{%p}", skb, call);
 
        if (call) {
-               if (atomic_dec_return(&call->skb_count) < 0)
-                       BUG();
-               rxrpc_put_call(call);
+               rxrpc_put_call_for_skb(call, skb);
                sp->call = NULL;
        }