*/
 enum rxrpc_conn_flag {
        RXRPC_CONN_HAS_IDR,             /* Has a client conn ID assigned */
+       RXRPC_CONN_IN_SERVICE_CONNS,    /* Conn is in peer->service_conns */
+       RXRPC_CONN_IN_CLIENT_CONNS,     /* Conn is in local->client_conns */
 };
 
 /*
  */
 extern struct idr rxrpc_client_conn_ids;
 
-void rxrpc_put_client_connection_id(struct rxrpc_connection *);
 void rxrpc_destroy_client_conn_ids(void);
 int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
                       struct sockaddr_rxrpc *, gfp_t);
+void rxrpc_unpublish_client_conn(struct rxrpc_connection *);
 
 /*
  * conn_event.c
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
                                                   struct sockaddr_rxrpc *,
                                                   struct sk_buff *);
+void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 
 /*
  * input.c
 
 /*
  * Release a connection ID for a client connection from the global pool.
  */
-void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
+static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
 {
        if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
                spin_lock(&rxrpc_conn_id_lock);
         * lock before dropping the client conn lock.
         */
        _debug("new conn");
+       set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
+       rb_link_node(&candidate->client_node, parent, pp);
+       rb_insert_color(&candidate->client_node, &local->client_conns);
+attached:
        conn = candidate;
        candidate = NULL;
 
-       rb_link_node(&conn->client_node, parent, pp);
-       rb_insert_color(&conn->client_node, &local->client_conns);
-
        atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
        spin_lock(&conn->channel_lock);
        spin_unlock(&local->client_conns_lock);
        _leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
        return 0;
 
-       /* We found a suitable connection already in existence.  Discard any
-        * candidate we may have allocated, and try to get a channel on this
-        * one.
+       /* We found a potentially suitable connection already in existence.  If
+        * we can reuse it (ie. its usage count hasn't been reduced to 0 by the
+        * reaper), discard any candidate we may have allocated, and try to get
+        * a channel on this one, otherwise we have to replace it.
         */
 found_extant_conn:
        _debug("found conn");
-       rxrpc_get_connection(conn);
+       if (!rxrpc_get_connection_maybe(conn)) {
+               set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
+               rb_replace_node(&conn->client_node,
+                               &candidate->client_node,
+                               &local->client_conns);
+               clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
+               goto attached;
+       }
+
        spin_unlock(&local->client_conns_lock);
 
        rxrpc_put_connection(candidate);
        _leave(" = -ERESTARTSYS");
        return -ERESTARTSYS;
 }
+
+/*
+ * Remove a client connection from the local endpoint's tree, thereby removing
+ * it as a target for reuse for new client calls.
+ */
+void rxrpc_unpublish_client_conn(struct rxrpc_connection *conn)
+{
+       struct rxrpc_local *local = conn->params.local;
+
+       spin_lock(&local->client_conns_lock);
+       if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags))
+               rb_erase(&conn->client_node, &local->client_conns);
+       spin_unlock(&local->client_conns_lock);
+
+       rxrpc_put_client_connection_id(conn);
+}
 
                skb_queue_head_init(&conn->rx_queue);
                conn->security = &rxrpc_no_security;
                spin_lock_init(&conn->state_lock);
-               atomic_set(&conn->usage, 1);
+               /* We maintain an extra ref on the connection whilst it is
+                * on the rxrpc_connections list.
+                */
+               atomic_set(&conn->usage, 2);
                conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
                atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
                conn->size_align = 4;
        return NULL;
 
 found:
-       rxrpc_get_connection(conn);
+       conn = rxrpc_get_connection_maybe(conn);
        read_unlock_bh(&peer->conn_lock);
        _leave(" = %p", conn);
        return conn;
        _enter("%p{u=%d,d=%d}",
               conn, atomic_read(&conn->usage), conn->debug_id);
 
-       ASSERTCMP(atomic_read(&conn->usage), >, 0);
+       ASSERTCMP(atomic_read(&conn->usage), >, 1);
 
        conn->put_time = ktime_get_seconds();
-       if (atomic_dec_and_test(&conn->usage)) {
+       if (atomic_dec_return(&conn->usage) == 1) {
                _debug("zombie");
                rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
        }
 static void rxrpc_connection_reaper(struct work_struct *work)
 {
        struct rxrpc_connection *conn, *_p;
-       struct rxrpc_peer *peer;
-       unsigned long now, earliest, reap_time;
+       unsigned long reap_older_than, earliest, put_time, now;
 
        LIST_HEAD(graveyard);
 
        _enter("");
 
        now = ktime_get_seconds();
+       reap_older_than =  now - rxrpc_connection_expiry;
        earliest = ULONG_MAX;
 
        write_lock(&rxrpc_connection_lock);
        list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
-               _debug("reap CONN %d { u=%d,t=%ld }",
-                      conn->debug_id, atomic_read(&conn->usage),
-                      (long) now - (long) conn->put_time);
-
-               if (likely(atomic_read(&conn->usage) > 0))
+               ASSERTCMP(atomic_read(&conn->usage), >, 0);
+               if (likely(atomic_read(&conn->usage) > 1))
                        continue;
 
-               if (rxrpc_conn_is_client(conn)) {
-                       struct rxrpc_local *local = conn->params.local;
-                       spin_lock(&local->client_conns_lock);
-                       reap_time = conn->put_time + rxrpc_connection_expiry;
-
-                       if (atomic_read(&conn->usage) > 0) {
-                               ;
-                       } else if (reap_time <= now) {
-                               list_move_tail(&conn->link, &graveyard);
-                               rxrpc_put_client_connection_id(conn);
-                               rb_erase(&conn->client_node,
-                                        &local->client_conns);
-                       } else if (reap_time < earliest) {
-                               earliest = reap_time;
-                       }
-
-                       spin_unlock(&local->client_conns_lock);
-               } else {
-                       peer = conn->params.peer;
-                       write_lock_bh(&peer->conn_lock);
-                       reap_time = conn->put_time + rxrpc_connection_expiry;
-
-                       if (atomic_read(&conn->usage) > 0) {
-                               ;
-                       } else if (reap_time <= now) {
-                               list_move_tail(&conn->link, &graveyard);
-                               rb_erase(&conn->service_node,
-                                        &peer->service_conns);
-                       } else if (reap_time < earliest) {
-                               earliest = reap_time;
-                       }
-
-                       write_unlock_bh(&peer->conn_lock);
+               put_time = READ_ONCE(conn->put_time);
+               if (time_after(put_time, reap_older_than)) {
+                       if (time_before(put_time, earliest))
+                               earliest = put_time;
+                       continue;
                }
+
+               /* The usage count sits at 1 whilst the object is unused on the
+                * list; we reduce that to 0 to make the object unavailable.
+                */
+               if (atomic_cmpxchg(&conn->usage, 1, 0) != 1)
+                       continue;
+
+               if (rxrpc_conn_is_client(conn))
+                       rxrpc_unpublish_client_conn(conn);
+               else
+                       rxrpc_unpublish_service_conn(conn);
+
+               list_move_tail(&conn->link, &graveyard);
        }
        write_unlock(&rxrpc_connection_lock);
 
                                         (earliest - now) * HZ);
        }
 
-       /* then destroy all those pulled out */
        while (!list_empty(&graveyard)) {
                conn = list_entry(graveyard.next, struct rxrpc_connection,
                                  link);
 
        }
 
        /* we can now add the new candidate to the list */
+       set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
+       rb_link_node(&candidate->service_node, p, pp);
+       rb_insert_color(&candidate->service_node, &peer->service_conns);
+attached:
        conn = candidate;
        candidate = NULL;
-       rb_link_node(&conn->service_node, p, pp);
-       rb_insert_color(&conn->service_node, &peer->service_conns);
        rxrpc_get_peer(peer);
        rxrpc_get_local(local);
 
 
        /* we found the connection in the list immediately */
 found_extant_connection:
+       if (!rxrpc_get_connection_maybe(conn)) {
+               set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
+               rb_replace_node(&conn->service_node,
+                               &candidate->service_node,
+                               &peer->service_conns);
+               clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
+               goto attached;
+       }
+
        if (sp->hdr.securityIndex != conn->security_ix) {
                read_unlock_bh(&peer->conn_lock);
-               goto security_mismatch;
+               goto security_mismatch_put;
        }
-       rxrpc_get_connection(conn);
        read_unlock_bh(&peer->conn_lock);
        goto success;
 
        kfree(candidate);
        goto success;
 
+security_mismatch_put:
+       rxrpc_put_connection(conn);
 security_mismatch:
        kfree(candidate);
        _leave(" = -EKEYREJECTED");
        return ERR_PTR(-EKEYREJECTED);
 }
+
+/*
+ * Remove the service connection from the peer's tree, thereby removing it as a
+ * target for incoming packets.
+ */
+void rxrpc_unpublish_service_conn(struct rxrpc_connection *conn)
+{
+       struct rxrpc_peer *peer = conn->params.peer;
+
+       write_lock_bh(&peer->conn_lock);
+       if (test_and_clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags))
+               rb_erase(&conn->service_node, &peer->service_conns);
+       write_unlock_bh(&peer->conn_lock);
+}