void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
 {
        _enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
-       rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call);
-       rxrpc_purge_queue(&call->knlrecv_queue);
+       rxrpc_release_call(rxrpc_sk(sock->sk), call);
        rxrpc_put_call(call, rxrpc_call_put);
 }
 EXPORT_SYMBOL(rxrpc_kernel_end_call);
                return -ENOMEM;
 
        sock_init_data(sock, sk);
+       sock_set_flag(sk, SOCK_RCU_FREE);
        sk->sk_state            = RXRPC_UNBOUND;
        sk->sk_write_space      = rxrpc_write_space;
        sk->sk_max_ack_backlog  = 0;
 
 #define rxrpc_queue_delayed_work(WS,D) \
        queue_delayed_work(rxrpc_workqueue, (WS), (D))
 
-#define rxrpc_queue_call(CALL) rxrpc_queue_work(&(CALL)->processor)
-
 struct rxrpc_connection;
 
 /*
        RXRPC_CALL_EV_ACCEPTED,         /* incoming call accepted by userspace app */
        RXRPC_CALL_EV_SECURED,          /* incoming call's connection is now secure */
        RXRPC_CALL_EV_POST_ACCEPT,      /* need to post an "accept?" message to the app */
-       RXRPC_CALL_EV_RELEASE,          /* need to release the call's resources */
 };
 
 /*
        RXRPC_CALL_SERVER_SEND_REPLY,   /* - server sending reply */
        RXRPC_CALL_SERVER_AWAIT_ACK,    /* - server awaiting final ACK */
        RXRPC_CALL_COMPLETE,            /* - call complete */
-       RXRPC_CALL_DEAD,                /* - call is dead */
        NR__RXRPC_CALL_STATES
 };
 
        struct rcu_head         rcu;
        struct rxrpc_connection *conn;          /* connection carrying call */
        struct rxrpc_peer       *peer;          /* Peer record for remote address */
-       struct rxrpc_sock       *socket;        /* socket responsible */
+       struct rxrpc_sock __rcu *socket;        /* socket responsible */
        struct timer_list       lifetimer;      /* lifetime remaining on call */
-       struct timer_list       deadspan;       /* reap timer for re-ACK'ing, etc  */
        struct timer_list       ack_timer;      /* ACK generation timer */
        struct timer_list       resend_timer;   /* Tx resend timer */
-       struct work_struct      destroyer;      /* call destroyer */
        struct work_struct      processor;      /* packet processor and ACK generator */
        rxrpc_notify_rx_t       notify_rx;      /* kernel service Rx notification function */
        struct list_head        link;           /* link in master call list */
 extern const char *const rxrpc_call_states[];
 extern const char *const rxrpc_call_completions[];
 extern unsigned int rxrpc_max_call_lifetime;
-extern unsigned int rxrpc_dead_call_expiry;
 extern struct kmem_cache *rxrpc_call_jar;
 extern struct list_head rxrpc_calls;
 extern rwlock_t rxrpc_call_lock;
 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *,
                                       struct rxrpc_connection *,
                                       struct sk_buff *);
-void rxrpc_release_call(struct rxrpc_call *);
+void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
+bool __rxrpc_queue_call(struct rxrpc_call *);
+bool rxrpc_queue_call(struct rxrpc_call *);
 void rxrpc_see_call(struct rxrpc_call *);
 void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
 void rxrpc_put_call(struct rxrpc_call *, enum rxrpc_call_trace);
 /*
  * output.c
  */
+int rxrpc_send_call_packet(struct rxrpc_call *, u8);
 int rxrpc_send_data_packet(struct rxrpc_connection *, struct sk_buff *);
 
 /*
 /*
  * recvmsg.c
  */
-void rxrpc_remove_user_ID(struct rxrpc_sock *, struct rxrpc_call *);
 int rxrpc_recvmsg(struct socket *, struct msghdr *, size_t, int);
 
 /*
 
        _debug("invalid");
        read_unlock_bh(&local->services_lock);
 
-       read_lock_bh(&call->state_lock);
-       if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-           !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-               rxrpc_get_call(call, rxrpc_call_got);
-               rxrpc_queue_call(call);
-       }
-       read_unlock_bh(&call->state_lock);
+       rxrpc_release_call(rx, call);
        rxrpc_put_call(call, rxrpc_call_put);
        ret = -ECONNREFUSED;
 error:
        if (sk_acceptq_is_full(&rx->sk))
                goto backlog_full;
        sk_acceptq_added(&rx->sk);
-       sock_hold(&rx->sk);
        read_unlock_bh(&local->services_lock);
 
        ret = rxrpc_accept_incoming_call(local, rx, skb, &srx);
        if (ret < 0)
                sk_acceptq_removed(&rx->sk);
-       sock_put(&rx->sk);
        switch (ret) {
        case -ECONNRESET: /* old calls are ignored */
        case -ECONNABORTED: /* aborted calls are reaborted or ignored */
        case RXRPC_CALL_COMPLETE:
                ret = call->error;
                goto out_release;
-       case RXRPC_CALL_DEAD:
-               ret = -ETIME;
-               goto out_discard;
        default:
                BUG();
        }
                BUG();
        if (test_and_set_bit(RXRPC_CALL_EV_ACCEPTED, &call->events))
                BUG();
-       rxrpc_queue_call(call);
 
        write_unlock_bh(&call->state_lock);
        write_unlock(&rx->call_lock);
+       rxrpc_queue_call(call);
        _leave(" = %p{%d}", call, call->debug_id);
        return call;
 
-       /* if the call is already dying or dead, then we leave the socket's ref
-        * on it to be released by rxrpc_dead_call_expired() as induced by
-        * rxrpc_release_call() */
 out_release:
-       _debug("release %p", call);
-       if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-           !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-               rxrpc_queue_call(call);
-out_discard:
        write_unlock_bh(&call->state_lock);
-       _debug("discard %p", call);
+       write_unlock(&rx->call_lock);
+       _debug("release %p", call);
+       rxrpc_release_call(rx, call);
+       _leave(" = %d", ret);
+       return ERR_PTR(ret);
 out:
        write_unlock(&rx->call_lock);
        _leave(" = %d", ret);
        write_lock(&rx->call_lock);
 
        ret = -ENODATA;
-       if (list_empty(&rx->acceptq))
-               goto out;
+       if (list_empty(&rx->acceptq)) {
+               write_unlock(&rx->call_lock);
+               _leave(" = -ENODATA");
+               return -ENODATA;
+       }
 
        /* dequeue the first call and check it's still valid */
        call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
                if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events))
                        rxrpc_queue_call(call);
                ret = 0;
-               goto out_release;
+               break;
        case RXRPC_CALL_COMPLETE:
                ret = call->error;
-               goto out_release;
-       case RXRPC_CALL_DEAD:
-               ret = -ETIME;
-               goto out_discard;
+               break;
        default:
                BUG();
        }
 
-       /* if the call is already dying or dead, then we leave the socket's ref
-        * on it to be released by rxrpc_dead_call_expired() as induced by
-        * rxrpc_release_call() */
-out_release:
-       _debug("release %p", call);
-       if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-           !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-               rxrpc_queue_call(call);
-out_discard:
        write_unlock_bh(&call->state_lock);
-       _debug("discard %p", call);
-out:
        write_unlock(&rx->call_lock);
+       rxrpc_release_call(rx, call);
        _leave(" = %d", ret);
        return ret;
 }
 
 }
 
 /*
- * handle background processing of incoming call packets and ACK / abort
- * generation
+ * Handle background processing of incoming call packets and ACK / abort
+ * generation.  A ref on the call is donated to us by whoever queued the work
+ * item.
  */
 void rxrpc_process_call(struct work_struct *work)
 {
        unsigned long bits;
        __be32 data, pad;
        size_t len;
+       bool requeue = false;
        int loop, nbit, ioc, ret, mtu;
        u32 serial, abort_code = RX_PROTOCOL_ERROR;
        u8 *acks = NULL;
               call->debug_id, rxrpc_call_states[call->state], call->events,
               (jiffies - call->creation_jif) / (HZ / 10));
 
+       if (call->state >= RXRPC_CALL_COMPLETE) {
+               rxrpc_put_call(call, rxrpc_call_put);
+               return;
+       }
+
        if (!call->conn)
                goto skip_msg_init;
 
                spin_lock_bh(&call->lock);
 
                if (call->state == RXRPC_CALL_SERVER_SECURING) {
+                       struct rxrpc_sock *rx;
                        _debug("securing");
-                       write_lock(&call->socket->call_lock);
-                       if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-                           !test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-                               _debug("not released");
-                               call->state = RXRPC_CALL_SERVER_ACCEPTING;
-                               list_move_tail(&call->accept_link,
-                                              &call->socket->acceptq);
+                       rcu_read_lock();
+                       rx = rcu_dereference(call->socket);
+                       if (rx) {
+                               write_lock(&rx->call_lock);
+                               if (!test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
+                                       _debug("not released");
+                                       call->state = RXRPC_CALL_SERVER_ACCEPTING;
+                                       list_move_tail(&call->accept_link,
+                                                      &rx->acceptq);
+                               }
+                               write_unlock(&rx->call_lock);
                        }
-                       write_unlock(&call->socket->call_lock);
+                       rcu_read_unlock();
                        read_lock(&call->state_lock);
                        if (call->state < RXRPC_CALL_COMPLETE)
                                set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events);
                goto maybe_reschedule;
        }
 
-       if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-               rxrpc_release_call(call);
-               clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
-       }
-
        /* other events may have been raised since we started checking */
        goto maybe_reschedule;
 
                             &msg, iov, ioc, len);
        if (ret < 0) {
                _debug("sendmsg failed: %d", ret);
-               read_lock_bh(&call->state_lock);
-               if (call->state < RXRPC_CALL_DEAD)
-                       rxrpc_queue_call(call);
-               read_unlock_bh(&call->state_lock);
+               if (call->state < RXRPC_CALL_COMPLETE)
+                       requeue = true;
                goto error;
        }
 
 
 kill_ACKs:
        del_timer_sync(&call->ack_timer);
-       if (test_and_clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events))
-               rxrpc_put_call(call, rxrpc_call_put);
        clear_bit(RXRPC_CALL_EV_ACK, &call->events);
 
 maybe_reschedule:
        if (call->events || !skb_queue_empty(&call->rx_queue)) {
-               read_lock_bh(&call->state_lock);
-               if (call->state < RXRPC_CALL_DEAD)
-                       rxrpc_queue_call(call);
-               read_unlock_bh(&call->state_lock);
-       }
-
-       /* don't leave aborted connections on the accept queue */
-       if (call->state >= RXRPC_CALL_COMPLETE &&
-           !list_empty(&call->accept_link)) {
-               _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
-                      call, call->events, call->flags, call->conn->proto.cid);
-
-               read_lock_bh(&call->state_lock);
-               if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-                   !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-                       rxrpc_queue_call(call);
-               read_unlock_bh(&call->state_lock);
+               if (call->state < RXRPC_CALL_COMPLETE)
+                       requeue = true;
        }
 
 error:
        kfree(acks);
 
-       /* because we don't want two CPUs both processing the work item for one
-        * call at the same time, we use a flag to note when it's busy; however
-        * this means there's a race between clearing the flag and setting the
-        * work pending bit and the work item being processed again */
-       if (call->events && !work_pending(&call->processor)) {
+       if ((requeue || call->events) && !work_pending(&call->processor)) {
                _debug("jumpstart %x", call->conn->proto.cid);
-               rxrpc_queue_call(call);
+               __rxrpc_queue_call(call);
+       } else {
+               rxrpc_put_call(call, rxrpc_call_put);
        }
 
        _leave("");
 
  */
 unsigned int rxrpc_max_call_lifetime = 60 * HZ;
 
-/*
- * Time till dead call expires after last use (in jiffies).
- */
-unsigned int rxrpc_dead_call_expiry = 2 * HZ;
-
 const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
        [RXRPC_CALL_UNINITIALISED]              = "Uninit  ",
        [RXRPC_CALL_CLIENT_AWAIT_CONN]          = "ClWtConn",
        [RXRPC_CALL_SERVER_SEND_REPLY]          = "SvSndRpl",
        [RXRPC_CALL_SERVER_AWAIT_ACK]           = "SvAwtACK",
        [RXRPC_CALL_COMPLETE]                   = "Complete",
-       [RXRPC_CALL_DEAD]                       = "Dead    ",
 };
 
 const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
 LIST_HEAD(rxrpc_calls);
 DEFINE_RWLOCK(rxrpc_call_lock);
 
-static void rxrpc_destroy_call(struct work_struct *work);
 static void rxrpc_call_life_expired(unsigned long _call);
-static void rxrpc_dead_call_expired(unsigned long _call);
 static void rxrpc_ack_time_expired(unsigned long _call);
 static void rxrpc_resend_time_expired(unsigned long _call);
+static void rxrpc_cleanup_call(struct rxrpc_call *call);
 
 /*
  * find an extant server call
 
        setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
                    (unsigned long) call);
-       setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
-                   (unsigned long) call);
        setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
                    (unsigned long) call);
        setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
                    (unsigned long) call);
-       INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
        INIT_WORK(&call->processor, &rxrpc_process_call);
        INIT_LIST_HEAD(&call->link);
        INIT_LIST_HEAD(&call->chan_wait_link);
        if (!call)
                return ERR_PTR(-ENOMEM);
        call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
-
-       sock_hold(&rx->sk);
-       call->socket = rx;
        call->rx_data_post = 1;
        call->service_id = srx->srx_service;
+       rcu_assign_pointer(call->socket, rx);
 
        _leave(" = %p", call);
        return call;
                return call;
        }
 
-       trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
-                        (const void *)user_call_ID);
+       trace_rxrpc_call(call, rxrpc_call_new_client,
+                        atomic_read(&call->usage), 0,
+                        here, (const void *)user_call_ID);
 
        /* Publish the call, even though it is incompletely set up as yet */
        call->user_call_ID = user_call_ID;
        list_del_init(&call->link);
        write_unlock_bh(&rxrpc_call_lock);
 
+error_out:
+       __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+                                   RX_CALL_DEAD, ret);
        set_bit(RXRPC_CALL_RELEASED, &call->flags);
-       call->state = RXRPC_CALL_DEAD;
        rxrpc_put_call(call, rxrpc_call_put);
        _leave(" = %d", ret);
        return ERR_PTR(ret);
         */
 found_user_ID_now_present:
        write_unlock(&rx->call_lock);
-       set_bit(RXRPC_CALL_RELEASED, &call->flags);
-       call->state = RXRPC_CALL_DEAD;
-       rxrpc_put_call(call, rxrpc_call_put);
-       _leave(" = -EEXIST [%p]", call);
-       return ERR_PTR(-EEXIST);
+       ret = -EEXIST;
+       goto error_out;
 }
 
 /*
                         atomic_read(&candidate->usage), 0, here, NULL);
 
        chan = sp->hdr.cid & RXRPC_CHANNELMASK;
-       candidate->socket       = rx;
        candidate->conn         = conn;
        candidate->peer         = conn->params.peer;
        candidate->cid          = sp->hdr.cid;
        candidate->flags        |= (1 << RXRPC_CALL_IS_SERVICE);
        if (conn->security_ix > 0)
                candidate->state = RXRPC_CALL_SERVER_SECURING;
+       rcu_assign_pointer(candidate->socket, rx);
 
        spin_lock(&conn->channel_lock);
 
        candidate = NULL;
        conn->channels[chan].call_counter = call_id;
        rcu_assign_pointer(conn->channels[chan].call, call);
-       sock_hold(&rx->sk);
        rxrpc_get_connection(conn);
        rxrpc_get_peer(call->peer);
        spin_unlock(&conn->channel_lock);
        return ERR_PTR(-ECONNRESET);
 }
 
+/*
+ * Queue a call's work processor, getting a ref to pass to the work queue.
+ */
+bool rxrpc_queue_call(struct rxrpc_call *call)
+{
+       const void *here = __builtin_return_address(0);
+       int n = __atomic_add_unless(&call->usage, 1, 0);
+       int m = atomic_read(&call->skb_count);
+       if (n == 0)
+               return false;
+       if (rxrpc_queue_work(&call->processor))
+               trace_rxrpc_call(call, rxrpc_call_queued, n + 1, m, here, NULL);
+       else
+               rxrpc_put_call(call, rxrpc_call_put_noqueue);
+       return true;
+}
+
+/*
+ * Queue a call's work processor, passing the callers ref to the work queue.
+ */
+bool __rxrpc_queue_call(struct rxrpc_call *call)
+{
+       const void *here = __builtin_return_address(0);
+       int n = atomic_read(&call->usage);
+       int m = atomic_read(&call->skb_count);
+       ASSERTCMP(n, >=, 1);
+       if (rxrpc_queue_work(&call->processor))
+               trace_rxrpc_call(call, rxrpc_call_queued_ref, n, m, here, NULL);
+       else
+               rxrpc_put_call(call, rxrpc_call_put_noqueue);
+       return true;
+}
+
 /*
  * Note the re-emergence of a call.
  */
 /*
  * detach a call from a socket and set up for release
  */
-void rxrpc_release_call(struct rxrpc_call *call)
+void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
-       struct rxrpc_connection *conn = call->conn;
-       struct rxrpc_sock *rx = call->socket;
-
        _enter("{%d,%d,%d,%d}",
               call->debug_id, atomic_read(&call->usage),
               atomic_read(&call->ackr_not_idle),
        /* dissociate from the socket
         * - the socket's ref on the call is passed to the death timer
         */
-       _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
+       _debug("RELEASE CALL %p (%d)", call, call->debug_id);
 
        if (call->peer) {
                spin_lock(&call->peer->lock);
                rb_erase(&call->sock_node, &rx->calls);
                memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
                clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
+               rxrpc_put_call(call, rxrpc_call_put_userid);
        }
        write_unlock_bh(&rx->call_lock);
 
        /* free up the channel for reuse */
-       write_lock_bh(&call->state_lock);
+       if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) {
+               clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
+               rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
+               rxrpc_call_completed(call);
+       } else {
+               write_lock_bh(&call->state_lock);
+
+               if (call->state < RXRPC_CALL_COMPLETE) {
+                       _debug("+++ ABORTING STATE %d +++\n", call->state);
+                       __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
+                       clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
+                       rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
+               }
 
-       if (call->state < RXRPC_CALL_COMPLETE &&
-           call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
-               _debug("+++ ABORTING STATE %d +++\n", call->state);
-               __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
+               write_unlock_bh(&call->state_lock);
        }
-       write_unlock_bh(&call->state_lock);
 
-       rxrpc_disconnect_call(call);
+       if (call->conn)
+               rxrpc_disconnect_call(call);
 
        /* clean up the Rx queue */
        if (!skb_queue_empty(&call->rx_queue) ||
                }
                spin_unlock_bh(&call->lock);
        }
+       rxrpc_purge_queue(&call->knlrecv_queue);
 
        del_timer_sync(&call->resend_timer);
        del_timer_sync(&call->ack_timer);
        del_timer_sync(&call->lifetimer);
-       call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
-       add_timer(&call->deadspan);
 
        _leave("");
 }
 
-/*
- * handle a dead call being ready for reaping
- */
-static void rxrpc_dead_call_expired(unsigned long _call)
-{
-       struct rxrpc_call *call = (struct rxrpc_call *) _call;
-
-       _enter("{%d}", call->debug_id);
-
-       rxrpc_see_call(call);
-       write_lock_bh(&call->state_lock);
-       call->state = RXRPC_CALL_DEAD;
-       write_unlock_bh(&call->state_lock);
-       rxrpc_put_call(call, rxrpc_call_put);
-}
-
-/*
- * mark a call as to be released, aborting it if it's still in progress
- * - called with softirqs disabled
- */
-static void rxrpc_mark_call_released(struct rxrpc_call *call)
-{
-       bool sched = false;
-
-       rxrpc_see_call(call);
-       write_lock(&call->state_lock);
-       if (call->state < RXRPC_CALL_DEAD) {
-               sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
-               if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-                       sched = true;
-       }
-       write_unlock(&call->state_lock);
-       if (sched)
-               rxrpc_queue_call(call);
-}
-
 /*
  * release all the calls associated with a socket
  */
 
        /* kill the not-yet-accepted incoming calls */
        list_for_each_entry(call, &rx->secureq, accept_link) {
-               rxrpc_mark_call_released(call);
+               rxrpc_release_call(rx, call);
        }
 
        list_for_each_entry(call, &rx->acceptq, accept_link) {
-               rxrpc_mark_call_released(call);
+               rxrpc_release_call(rx, call);
        }
 
        /* mark all the calls as no longer wanting incoming packets */
        for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
                call = rb_entry(p, struct rxrpc_call, sock_node);
-               rxrpc_mark_call_released(call);
+               rxrpc_release_call(rx, call);
        }
 
        read_unlock_bh(&rx->call_lock);
        if (n == 0) {
                _debug("call %d dead", call->debug_id);
                WARN_ON(m != 0);
-               ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-               rxrpc_queue_work(&call->destroyer);
+               rxrpc_cleanup_call(call);
        }
 }
 
        if (n == 0) {
                _debug("call %d dead", call->debug_id);
                WARN_ON(m != 0);
-               ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-               rxrpc_queue_work(&call->destroyer);
+               rxrpc_cleanup_call(call);
        }
 }
 
 {
        _net("DESTROY CALL %d", call->debug_id);
 
-       ASSERT(call->socket);
+       write_lock_bh(&rxrpc_call_lock);
+       list_del_init(&call->link);
+       write_unlock_bh(&rxrpc_call_lock);
 
        memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 
        del_timer_sync(&call->lifetimer);
-       del_timer_sync(&call->deadspan);
        del_timer_sync(&call->ack_timer);
        del_timer_sync(&call->resend_timer);
 
+       ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
        ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
-       ASSERTCMP(call->events, ==, 0);
-       if (work_pending(&call->processor)) {
-               _debug("defer destroy");
-               rxrpc_queue_work(&call->destroyer);
-               return;
-       }
-
+       ASSERT(!work_pending(&call->processor));
        ASSERTCMP(call->conn, ==, NULL);
 
        if (call->acks_window) {
        rxrpc_purge_queue(&call->rx_queue);
        ASSERT(skb_queue_empty(&call->rx_oos_queue));
        rxrpc_purge_queue(&call->knlrecv_queue);
-       sock_put(&call->socket->sk);
        call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
 }
 
 /*
- * destroy a call
- */
-static void rxrpc_destroy_call(struct work_struct *work)
-{
-       struct rxrpc_call *call =
-               container_of(work, struct rxrpc_call, destroyer);
-
-       _enter("%p{%d,%x,%p}",
-              call, atomic_read(&call->usage), call->cid, call->conn);
-
-       ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-
-       write_lock_bh(&rxrpc_call_lock);
-       list_del_init(&call->link);
-       write_unlock_bh(&rxrpc_call_lock);
-
-       rxrpc_cleanup_call(call);
-       _leave("");
-}
-
-/*
- * preemptively destroy all the call records from a transport endpoint rather
- * than waiting for them to time out
+ * Make sure that all calls are gone.
  */
 void __exit rxrpc_destroy_all_calls(void)
 {
        struct rxrpc_call *call;
 
        _enter("");
+
+       if (list_empty(&rxrpc_calls))
+               return;
+       
        write_lock_bh(&rxrpc_call_lock);
 
        while (!list_empty(&rxrpc_calls)) {
                rxrpc_see_call(call);
                list_del_init(&call->link);
 
-               switch (atomic_read(&call->usage)) {
-               case 0:
-                       ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
-                       break;
-               case 1:
-                       if (del_timer_sync(&call->deadspan) != 0 &&
-                           call->state != RXRPC_CALL_DEAD)
-                               rxrpc_dead_call_expired((unsigned long) call);
-                       if (call->state != RXRPC_CALL_DEAD)
-                               break;
-               default:
-                       pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
-                              call, atomic_read(&call->usage),
-                              atomic_read(&call->ackr_not_idle),
-                              rxrpc_call_states[call->state],
-                              call->flags, call->events);
-                       if (!skb_queue_empty(&call->rx_queue))
-                               pr_err("Rx queue occupied\n");
-                       if (!skb_queue_empty(&call->rx_oos_queue))
-                               pr_err("OOS queue occupied\n");
-                       break;
-               }
+               pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
+                      call, atomic_read(&call->usage),
+                      atomic_read(&call->ackr_not_idle),
+                      rxrpc_call_states[call->state],
+                      call->flags, call->events);
+               if (!skb_queue_empty(&call->rx_queue))
+                       pr_err("Rx queue occupied\n");
+               if (!skb_queue_empty(&call->rx_oos_queue))
+                       pr_err("OOS queue occupied\n");
 
                write_unlock_bh(&rxrpc_call_lock);
                cond_resched();
 
                        bool force, bool terminal)
 {
        struct rxrpc_skb_priv *sp;
-       struct rxrpc_sock *rx = call->socket;
+       struct rxrpc_sock *rx;
        struct sock *sk;
        int ret;
 
                return 0;
        }
 
+       /* The socket may go away under us */
+       ret = 0;
+       rcu_read_lock();
+       rx = rcu_dereference(call->socket);
+       if (!rx)
+               goto out;
        sk = &rx->sk;
+       if (sock_flag(sk, SOCK_DEAD))
+               goto out;
 
        if (!force) {
                /* cast skb->rcvbuf to unsigned...  It's pointless, but
        spin_lock_bh(&sk->sk_receive_queue.lock);
        if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags) &&
            !test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-           call->socket->sk.sk_state != RXRPC_CLOSE) {
+           sk->sk_state != RXRPC_CLOSE) {
                skb->destructor = rxrpc_packet_destructor;
                skb->dev = NULL;
                skb->sk = sk;
                        __skb_queue_tail(&sk->sk_receive_queue, skb);
                        spin_unlock_bh(&sk->sk_receive_queue.lock);
 
-                       if (!sock_flag(sk, SOCK_DEAD))
-                               sk->sk_data_ready(sk);
+                       sk->sk_data_ready(sk);
                }
                skb = NULL;
        } else {
 
 out:
        rxrpc_free_skb(skb);
+       rcu_read_unlock();
 
        _leave(" = %d", ret);
        return ret;
        skb_queue_tail(&call->rx_queue, skb);
        atomic_inc(&call->ackr_not_idle);
        read_lock(&call->state_lock);
-       if (call->state < RXRPC_CALL_DEAD)
+       if (call->state < RXRPC_CALL_COMPLETE)
                rxrpc_queue_call(call);
        read_unlock(&call->state_lock);
        _leave(" = 0 [queued]");
        case RXRPC_PACKET_TYPE_ACK:
                /* ACK processing is done in process context */
                read_lock_bh(&call->state_lock);
-               if (call->state < RXRPC_CALL_DEAD) {
+               if (call->state < RXRPC_CALL_COMPLETE) {
                        skb_queue_tail(&call->rx_queue, skb);
                        rxrpc_queue_call(call);
                        skb = NULL;
 
        read_lock(&call->state_lock);
        switch (call->state) {
-       case RXRPC_CALL_DEAD:
-               goto dead_call;
-
        case RXRPC_CALL_COMPLETE:
                switch (call->completion) {
                case RXRPC_CALL_LOCALLY_ABORTED:
        }
 
        read_unlock(&call->state_lock);
-       rxrpc_get_call(call, rxrpc_call_got);
 
        if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
            sp->hdr.flags & RXRPC_JUMBO_PACKET)
        else
                rxrpc_fast_process_packet(call, skb);
 
-       rxrpc_put_call(call, rxrpc_call_put);
        goto done;
 
 resend_final_ack:
        _debug("final ack again");
-       rxrpc_get_call(call, rxrpc_call_got);
        set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
        rxrpc_queue_call(call);
        goto free_unlock;
 
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
+struct rxrpc_pkt_buffer {
+       struct rxrpc_wire_header whdr;
+       union {
+               struct {
+                       struct rxrpc_ackpacket ack;
+                       u8 acks[255];
+                       u8 pad[3];
+               };
+               __be32 abort_code;
+       };
+       struct rxrpc_ackinfo ackinfo;
+};
+
+/*
+ * Fill out an ACK packet.
+ */
+static size_t rxrpc_fill_out_ack(struct rxrpc_call *call,
+                                struct rxrpc_pkt_buffer *pkt)
+{
+       u32 mtu, jmax;
+       u8 *ackp = pkt->acks;
+
+       pkt->ack.bufferSpace    = htons(8);
+       pkt->ack.maxSkew        = htons(0);
+       pkt->ack.firstPacket    = htonl(call->rx_data_eaten + 1);
+       pkt->ack.previousPacket = htonl(call->ackr_prev_seq);
+       pkt->ack.serial         = htonl(call->ackr_serial);
+       pkt->ack.reason         = RXRPC_ACK_IDLE;
+       pkt->ack.nAcks          = 0;
+
+       mtu = call->peer->if_mtu;
+       mtu -= call->peer->hdrsize;
+       jmax = rxrpc_rx_jumbo_max;
+       pkt->ackinfo.rxMTU      = htonl(rxrpc_rx_mtu);
+       pkt->ackinfo.maxMTU     = htonl(mtu);
+       pkt->ackinfo.rwind      = htonl(rxrpc_rx_window_size);
+       pkt->ackinfo.jumbo_max  = htonl(jmax);
+
+       *ackp++ = 0;
+       *ackp++ = 0;
+       *ackp++ = 0;
+       return 3;
+}
+
+/*
+ * Send a final ACK or ABORT call packet.
+ */
+int rxrpc_send_call_packet(struct rxrpc_call *call, u8 type)
+{
+       struct rxrpc_connection *conn = NULL;
+       struct rxrpc_pkt_buffer *pkt;
+       struct msghdr msg;
+       struct kvec iov[2];
+       rxrpc_serial_t serial;
+       size_t len, n;
+       int ioc, ret;
+       u32 abort_code;
+
+       _enter("%u,%s", call->debug_id, rxrpc_pkts[type]);
+
+       spin_lock_bh(&call->lock);
+       if (call->conn)
+               conn = rxrpc_get_connection_maybe(call->conn);
+       spin_unlock_bh(&call->lock);
+       if (!conn)
+               return -ECONNRESET;
+
+       pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
+       if (!pkt) {
+               rxrpc_put_connection(conn);
+               return -ENOMEM;
+       }
+
+       serial = atomic_inc_return(&conn->serial);
+
+       msg.msg_name    = &call->peer->srx.transport;
+       msg.msg_namelen = call->peer->srx.transport_len;
+       msg.msg_control = NULL;
+       msg.msg_controllen = 0;
+       msg.msg_flags   = 0;
+
+       pkt->whdr.epoch         = htonl(conn->proto.epoch);
+       pkt->whdr.cid           = htonl(call->cid);
+       pkt->whdr.callNumber    = htonl(call->call_id);
+       pkt->whdr.seq           = 0;
+       pkt->whdr.serial        = htonl(serial);
+       pkt->whdr.type          = type;
+       pkt->whdr.flags         = conn->out_clientflag;
+       pkt->whdr.userStatus    = 0;
+       pkt->whdr.securityIndex = call->security_ix;
+       pkt->whdr._rsvd         = 0;
+       pkt->whdr.serviceId     = htons(call->service_id);
+
+       iov[0].iov_base = pkt;
+       iov[0].iov_len  = sizeof(pkt->whdr);
+       len = sizeof(pkt->whdr);
+
+       switch (type) {
+       case RXRPC_PACKET_TYPE_ACK:
+               spin_lock_bh(&call->lock);
+               n = rxrpc_fill_out_ack(call, pkt);
+               call->ackr_reason = 0;
+
+               spin_unlock_bh(&call->lock);
+
+               _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
+                      serial,
+                      ntohs(pkt->ack.maxSkew),
+                      ntohl(pkt->ack.firstPacket),
+                      ntohl(pkt->ack.previousPacket),
+                      ntohl(pkt->ack.serial),
+                      rxrpc_acks(pkt->ack.reason),
+                      pkt->ack.nAcks);
+
+               iov[0].iov_len += sizeof(pkt->ack) + n;
+               iov[1].iov_base = &pkt->ackinfo;
+               iov[1].iov_len  = sizeof(pkt->ackinfo);
+               len += sizeof(pkt->ack) + n + sizeof(pkt->ackinfo);
+               ioc = 2;
+               break;
+
+       case RXRPC_PACKET_TYPE_ABORT:
+               abort_code = call->abort_code;
+               pkt->abort_code = htonl(abort_code);
+               _proto("Tx ABORT %%%u { %d }", serial, abort_code);
+               iov[0].iov_len += sizeof(pkt->abort_code);
+               len += sizeof(pkt->abort_code);
+               ioc = 1;
+               break;
+
+       default:
+               BUG();
+               ret = -ENOANO;
+               goto out;
+       }
+
+       ret = kernel_sendmsg(conn->params.local->socket,
+                            &msg, iov, ioc, len);
+
+out:
+       rxrpc_put_connection(conn);
+       kfree(pkt);
+       return ret;
+}
+
 /*
  * send a packet through the transport endpoint
  */
 
  */
 static void *rxrpc_call_seq_start(struct seq_file *seq, loff_t *_pos)
 {
+       rcu_read_lock();
        read_lock(&rxrpc_call_lock);
        return seq_list_start_head(&rxrpc_calls, *_pos);
 }
 static void rxrpc_call_seq_stop(struct seq_file *seq, void *v)
 {
        read_unlock(&rxrpc_call_lock);
+       rcu_read_unlock();
 }
 
 static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
 
        call = list_entry(v, struct rxrpc_call, link);
 
-       rx = READ_ONCE(call->socket);
+       rx = rcu_dereference(call->socket);
        if (rx) {
                local = READ_ONCE(rx->local);
                if (local)
 
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
-/*
- * removal a call's user ID from the socket tree to make the user ID available
- * again and so that it won't be seen again in association with that call
- */
-void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
-{
-       _debug("RELEASE CALL %d", call->debug_id);
-
-       if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
-               write_lock_bh(&rx->call_lock);
-               rb_erase(&call->sock_node, &call->socket->calls);
-               clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
-               write_unlock_bh(&rx->call_lock);
-       }
-
-       read_lock_bh(&call->state_lock);
-       if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-           !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-               rxrpc_queue_call(call);
-       read_unlock_bh(&call->state_lock);
-}
-
 /*
  * receive a message from an RxRPC socket
  * - we need to be careful about two or more threads calling recvmsg
                if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
                        BUG();
                rxrpc_free_skb(skb);
-               rxrpc_remove_user_ID(rx, call);
+               rxrpc_release_call(rx, call);
        }
 
        release_sock(&rx->sk);
 
                call->state = RXRPC_CALL_CLIENT_FINAL_ACK;
                _debug("request final ACK");
 
-               /* get an extra ref on the call for the final-ACK generator to
-                * release */
-               rxrpc_get_call(call, rxrpc_call_got);
                set_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
                if (try_to_del_timer_sync(&call->ack_timer) >= 0)
                        rxrpc_queue_call(call);
 
                .proc_handler   = proc_dointvec_jiffies,
                .extra1         = (void *)&one,
        },
-       {
-               .procname       = "dead_call_expiry",
-               .data           = &rxrpc_dead_call_expiry,
-               .maxlen         = sizeof(unsigned int),
-               .mode           = 0644,
-               .proc_handler   = proc_dointvec_jiffies,
-               .extra1         = (void *)&one,
-       },
 
        /* Non-time values */
        {