Place a spinlock around the invocation of call->notify_rx() for a kernel
service call and lock again when ending the call and replace the
notification pointer with a pointer to a dummy function.
This is required because it's possible for rxrpc_notify_socket() to be
called after the call has been ended by the kernel service if called from
the asynchronous work function rxrpc_process_call().
However, rxrpc_notify_socket() currently only holds the RCU read lock when
invoking ->notify_rx(), which means that the afs_call struct would need to
be disposed of by call_rcu() rather than by kfree().
But we shouldn't see any notifications from a call after calling
rxrpc_kernel_end_call(), so a lock is required in rxrpc code.
Without this, we may see the call wait queue as having a corrupt spinlock:
    BUG: spinlock bad magic on CPU#0, kworker/0:2/1612
    general protection fault: 0000 [#1] SMP
    ...
    Workqueue: krxrpcd rxrpc_process_call
    task: 
ffff88040b83c400 task.stack: 
ffff88040adfc000
    RIP: 0010:spin_bug+0x161/0x18f
    RSP: 0018:
ffff88040adffcc0 EFLAGS: 
00010002
    RAX: 
0000000000000032 RBX: 
6b6b6b6b6b6b6b6b RCX: 
ffffffff81ab16cf
    RDX: 
ffff88041fa14c01 RSI: 
ffff88041fa0ccb8 RDI: 
ffff88041fa0ccb8
    RBP: 
ffff88040adffcd8 R08: 
00000000ffffffff R09: 
00000000ffffffff
    R10: 
ffff88040adffc60 R11: 
000000000000022c R12: 
ffff88040aca2208
    R13: 
ffffffff81a58114 R14: 
0000000000000000 R15: 
0000000000000000
    ....
    Call Trace:
     do_raw_spin_lock+0x1d/0x89
     _raw_spin_lock_irqsave+0x3d/0x49
     ? __wake_up_common_lock+0x4c/0xa7
     __wake_up_common_lock+0x4c/0xa7
     ? __lock_is_held+0x47/0x7a
     __wake_up+0xe/0x10
     afs_wake_up_call_waiter+0x11b/0x122 [kafs]
     rxrpc_notify_socket+0x12b/0x258
     rxrpc_process_call+0x18e/0x7d0
     process_one_work+0x298/0x4de
     ? rescuer_thread+0x280/0x280
     worker_thread+0x1d1/0x2ae
     ? rescuer_thread+0x280/0x280
     kthread+0x12c/0x134
     ? kthread_create_on_node+0x3a/0x3a
     ret_from_fork+0x27/0x40
In this case, note the corrupt data in EBX.  The address of the offending
afs_call is in R12, plus the offset to the spinlock.
Signed-off-by: David Howells <dhowells@redhat.com>
 }
 EXPORT_SYMBOL(rxrpc_kernel_begin_call);
 
+/*
+ * Dummy function used to stop the notifier talking to recvmsg().
+ */
+static void rxrpc_dummy_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
+                                 unsigned long call_user_ID)
+{
+}
+
 /**
  * rxrpc_kernel_end_call - Allow a kernel service to end a call it was using
  * @sock: The socket the call is on
 
        mutex_lock(&call->user_mutex);
        rxrpc_release_call(rxrpc_sk(sock->sk), call);
+
+       /* Make sure we're not going to call back into a kernel service */
+       if (call->notify_rx) {
+               spin_lock_bh(&call->notify_lock);
+               call->notify_rx = rxrpc_dummy_notify_rx;
+               spin_unlock_bh(&call->notify_lock);
+       }
+
        mutex_unlock(&call->user_mutex);
        rxrpc_put_call(call, rxrpc_call_put_kernel);
 }
 
        unsigned long           flags;
        unsigned long           events;
        spinlock_t              lock;
+       spinlock_t              notify_lock;    /* Kernel notification lock */
        rwlock_t                state_lock;     /* lock for state transition */
        u32                     abort_code;     /* Local/remote abort code */
        int                     error;          /* Local error incurred */
 
        INIT_LIST_HEAD(&call->sock_link);
        init_waitqueue_head(&call->waitq);
        spin_lock_init(&call->lock);
+       spin_lock_init(&call->notify_lock);
        rwlock_init(&call->state_lock);
        atomic_set(&call->usage, 1);
        call->debug_id = atomic_inc_return(&rxrpc_debug_id);
 
        sk = &rx->sk;
        if (rx && sk->sk_state < RXRPC_CLOSE) {
                if (call->notify_rx) {
+                       spin_lock_bh(&call->notify_lock);
                        call->notify_rx(sk, call, call->user_call_ID);
+                       spin_unlock_bh(&call->notify_lock);
                } else {
                        write_lock_bh(&rx->recvmsg_lock);
                        if (list_empty(&call->recvmsg_link)) {