rxrpc_peer_new,
        rxrpc_peer_processing,
        rxrpc_peer_put,
-       rxrpc_peer_queued_error,
 };
 
 enum rxrpc_conn_trace {
        EM(rxrpc_peer_got,                      "GOT") \
        EM(rxrpc_peer_new,                      "NEW") \
        EM(rxrpc_peer_processing,               "PRO") \
-       EM(rxrpc_peer_put,                      "PUT") \
-       E_(rxrpc_peer_queued_error,             "QER")
+       E_(rxrpc_peer_put,                      "PUT")
 
 #define rxrpc_conn_traces \
        EM(rxrpc_conn_got,                      "GOT") \
 
        struct hlist_node       hash_link;
        struct rxrpc_local      *local;
        struct hlist_head       error_targets;  /* targets for net error distribution */
-       struct work_struct      error_distributor;
        struct rb_root          service_conns;  /* Service connections */
        struct list_head        keepalive_link; /* Link in net->peer_keepalive[] */
        time64_t                last_tx_at;     /* Last time packet sent here */
        unsigned int            maxdata;        /* data size (MTU - hdrsize) */
        unsigned short          hdrsize;        /* header size (IP + UDP + RxRPC) */
        int                     debug_id;       /* debug ID for printks */
-       int                     error_report;   /* Net (+0) or local (+1000000) to distribute */
-#define RXRPC_LOCAL_ERROR_OFFSET 1000000
        struct sockaddr_rxrpc   srx;            /* remote address */
 
        /* calculated RTT cache */
  * peer_event.c
  */
 void rxrpc_error_report(struct sock *);
-void rxrpc_peer_error_distributor(struct work_struct *);
 void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
                        rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
 void rxrpc_peer_keepalive_worker(struct work_struct *);
 struct rxrpc_peer *rxrpc_get_peer(struct rxrpc_peer *);
 struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *);
 void rxrpc_put_peer(struct rxrpc_peer *);
-void __rxrpc_queue_peer_error(struct rxrpc_peer *);
 
 /*
  * proc.c
 
        rcu_assign_pointer(conn->channels[chan].call, call);
 
        spin_lock(&conn->params.peer->lock);
-       hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
+       hlist_add_head_rcu(&call->error_link, &conn->params.peer->error_targets);
        spin_unlock(&conn->params.peer->lock);
 
        _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
 
        }
 
        spin_lock_bh(&call->conn->params.peer->lock);
-       hlist_add_head(&call->error_link,
-                      &call->conn->params.peer->error_targets);
+       hlist_add_head_rcu(&call->error_link,
+                          &call->conn->params.peer->error_targets);
        spin_unlock_bh(&call->conn->params.peer->lock);
 
 out:
 
        call->peer->cong_cwnd = call->cong_cwnd;
 
        spin_lock_bh(&conn->params.peer->lock);
-       hlist_del_init(&call->error_link);
+       hlist_del_rcu(&call->error_link);
        spin_unlock_bh(&conn->params.peer->lock);
 
        if (rxrpc_is_client_call(call))
 
 #include "ar-internal.h"
 
 static void rxrpc_store_error(struct rxrpc_peer *, struct sock_exterr_skb *);
+static void rxrpc_distribute_error(struct rxrpc_peer *, int,
+                                  enum rxrpc_call_completion);
 
 /*
  * Find the peer associated with an ICMP packet.
        rcu_read_unlock();
        rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 
-       /* The ref we obtained is passed off to the work item */
-       __rxrpc_queue_peer_error(peer);
        _leave("");
 }
 
 static void rxrpc_store_error(struct rxrpc_peer *peer,
                              struct sock_exterr_skb *serr)
 {
+       enum rxrpc_call_completion compl = RXRPC_CALL_NETWORK_ERROR;
        struct sock_extended_err *ee;
        int err;
 
        case SO_EE_ORIGIN_NONE:
        case SO_EE_ORIGIN_LOCAL:
                _proto("Rx Received local error { error=%d }", err);
-               err += RXRPC_LOCAL_ERROR_OFFSET;
+               compl = RXRPC_CALL_LOCAL_ERROR;
                break;
 
        case SO_EE_ORIGIN_ICMP6:
                break;
        }
 
-       peer->error_report = err;
+       rxrpc_distribute_error(peer, err, compl);
 }
 
 /*
- * Distribute an error that occurred on a peer
+ * Distribute an error that occurred on a peer.
  */
-void rxrpc_peer_error_distributor(struct work_struct *work)
+static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error,
+                                  enum rxrpc_call_completion compl)
 {
-       struct rxrpc_peer *peer =
-               container_of(work, struct rxrpc_peer, error_distributor);
        struct rxrpc_call *call;
-       enum rxrpc_call_completion compl;
-       int error;
-
-       _enter("");
-
-       error = READ_ONCE(peer->error_report);
-       if (error < RXRPC_LOCAL_ERROR_OFFSET) {
-               compl = RXRPC_CALL_NETWORK_ERROR;
-       } else {
-               compl = RXRPC_CALL_LOCAL_ERROR;
-               error -= RXRPC_LOCAL_ERROR_OFFSET;
-       }
 
-       _debug("ISSUE ERROR %s %d", rxrpc_call_completions[compl], error);
-
-       spin_lock_bh(&peer->lock);
-
-       while (!hlist_empty(&peer->error_targets)) {
-               call = hlist_entry(peer->error_targets.first,
-                                  struct rxrpc_call, error_link);
-               hlist_del_init(&call->error_link);
+       hlist_for_each_entry_rcu(call, &peer->error_targets, error_link) {
                rxrpc_see_call(call);
-
-               if (rxrpc_set_call_completion(call, compl, 0, -error))
+               if (call->state < RXRPC_CALL_COMPLETE &&
+                   rxrpc_set_call_completion(call, compl, 0, -error))
                        rxrpc_notify_socket(call);
        }
-
-       spin_unlock_bh(&peer->lock);
-
-       rxrpc_put_peer(peer);
-       _leave("");
 }
 
 /*
 
                atomic_set(&peer->usage, 1);
                peer->local = local;
                INIT_HLIST_HEAD(&peer->error_targets);
-               INIT_WORK(&peer->error_distributor,
-                         &rxrpc_peer_error_distributor);
                peer->service_conns = RB_ROOT;
                seqlock_init(&peer->service_conn_lock);
                spin_lock_init(&peer->lock);
        return peer;
 }
 
-/*
- * Queue a peer record.  This passes the caller's ref to the workqueue.
- */
-void __rxrpc_queue_peer_error(struct rxrpc_peer *peer)
-{
-       const void *here = __builtin_return_address(0);
-       int n;
-
-       n = atomic_read(&peer->usage);
-       if (rxrpc_queue_work(&peer->error_distributor))
-               trace_rxrpc_peer(peer, rxrpc_peer_queued_error, n, here);
-       else
-               rxrpc_put_peer(peer);
-}
-
 /*
  * Discard a peer record.
  */