rxrpc_recvmsg_full,
        rxrpc_recvmsg_hole,
        rxrpc_recvmsg_next,
+       rxrpc_recvmsg_requeue,
        rxrpc_recvmsg_return,
        rxrpc_recvmsg_terminal,
        rxrpc_recvmsg_to_be_accepted,
        EM(rxrpc_recvmsg_full,                  "FULL") \
        EM(rxrpc_recvmsg_hole,                  "HOLE") \
        EM(rxrpc_recvmsg_next,                  "NEXT") \
+       EM(rxrpc_recvmsg_requeue,               "REQU") \
        EM(rxrpc_recvmsg_return,                "RETN") \
        EM(rxrpc_recvmsg_terminal,              "TERM") \
        EM(rxrpc_recvmsg_to_be_accepted,        "TBAC") \
 
        cp.exclusive            = false;
        cp.service_id           = srx->srx_service;
        call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp);
+       /* The socket has been unlocked. */
-       if (!IS_ERR(call))
+       if (!IS_ERR(call)) {
                call->notify_rx = notify_rx;
+               mutex_unlock(&call->user_mutex);
+       }
 
-       release_sock(&rx->sk);
        _leave(" = %p", call);
        return call;
 }
 void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
 {
        _enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
+
+       mutex_lock(&call->user_mutex);
        rxrpc_release_call(rxrpc_sk(sock->sk), call);
+       mutex_unlock(&call->user_mutex);
        rxrpc_put_call(call, rxrpc_call_put_kernel);
 }
 EXPORT_SYMBOL(rxrpc_kernel_end_call);
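
With this change an in-kernel user of the AF_RXRPC API needs no locking of its own: rxrpc_kernel_begin_call() comes back with both the socket lock and the call's user_mutex already released, and the other interface functions take the mutex themselves. A minimal lifecycle sketch against the interface as it stands in this series; sock, srx, key, msg, len, buf, offset, abort_code and my_notify_rx are assumed to be supplied by the caller and are not part of this patch:

        struct rxrpc_call *call;
        int ret;

        call = rxrpc_kernel_begin_call(sock, &srx, key, 0, GFP_NOFS,
                                       my_notify_rx);
        if (IS_ERR(call))
                return PTR_ERR(call);

        /* Each of these takes and drops call->user_mutex internally. */
        ret = rxrpc_kernel_send_data(sock, call, &msg, len);
        if (ret == 0)
                ret = rxrpc_kernel_recv_data(sock, call, buf, sizeof(buf),
                                             &offset, false, &abort_code);

        rxrpc_kernel_end_call(sock, call);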
        case RXRPC_SERVER_BOUND:
        case RXRPC_SERVER_LISTENING:
                ret = rxrpc_do_sendmsg(rx, m, len);
-               break;
+               /* The socket has been unlocked */
+               goto out;
        default:
                ret = -EINVAL;
-               break;
+               goto error_unlock;
        }
 
 error_unlock:
        release_sock(&rx->sk);
+out:
        _leave(" = %d", ret);
        return ret;
 }
 
        struct rxrpc_connection *conn;          /* connection carrying call */
        struct rxrpc_peer       *peer;          /* Peer record for remote address */
        struct rxrpc_sock __rcu *socket;        /* socket responsible */
+       struct mutex            user_mutex;     /* User access mutex */
        ktime_t                 ack_at;         /* When deferred ACK needs to happen */
        ktime_t                 resend_at;      /* When next resend needs to happen */
        ktime_t                 ping_at;        /* When next to send a ping */
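
The new field gives every call a sleepable lock of its own. The ordering used throughout the rest of this patch is: take the socket lock first at the syscall boundary, take call->user_mutex inside it, then drop the socket lock so that the slow parts (connection setup, copying to and from userspace, waiting for Tx window space) run under the per-call mutex only; the reverse nesting never occurs. A condensed sketch of that handoff, not a literal excerpt:

        lock_sock(&rx->sk);             /* serialise entry against other syscalls */
        mutex_lock(&call->user_mutex);  /* pin this particular call */
        release_sock(&rx->sk);          /* other calls on the socket may now progress */

        /* ... sleepable work affecting only this call ... */

        mutex_unlock(&call->user_mutex);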
 
  *
  * If we want to report an error, we mark the skb with the packet type and
  * abort code and return NULL.
+ *
+ * The call is returned with the user access mutex held.
  */
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
                                           struct rxrpc_connection *conn,
        trace_rxrpc_receive(call, rxrpc_receive_incoming,
                            sp->hdr.serial, sp->hdr.seq);
 
+       /* Lock the call to prevent rxrpc_kernel_send/recv_data() and
+        * sendmsg()/recvmsg() inconveniently stealing the mutex once the
+        * notification is generated.
+        *
+        * The BUG should never happen because the kernel should be well
+        * behaved enough not to access the call before the first notification
+        * event and userspace is prevented from doing so until the state is
+        * appropriate.
+        */
+       if (!mutex_trylock(&call->user_mutex))
+               BUG();
+
        /* Make the call live. */
        rxrpc_incoming_call(rx, call, skb);
        conn = call->conn;
 /*
  * handle acceptance of a call by userspace
  * - assign the user call ID to the call at the front of the queue
+ * - Called with the socket lock held, which it must release.
  */
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
                                     unsigned long user_call_ID,
                                     rxrpc_notify_rx_t notify_rx)
+       __releases(&rx->sk.sk_lock.slock)
 {
-       struct rxrpc_call *call;
+       struct rxrpc_call *call, *xcall;
        struct rb_node *parent, **pp;
 
        if (list_empty(&rx->to_be_accepted)) {
                write_unlock(&rx->call_lock);
+               release_sock(&rx->sk);
                kleave(" = -ENODATA [empty]");
                return ERR_PTR(-ENODATA);
        }
         */
        call = list_entry(rx->to_be_accepted.next,
                          struct rxrpc_call, accept_link);
+       write_unlock(&rx->call_lock);
+
+       /* We need to gain the mutex from the interrupt handler without
+        * upsetting lockdep, so we have to release it there and take it here.
+        * We are, however, still holding the socket lock, so other accepts
+        * must wait for us and no one can add the user ID behind our backs.
+        */
+       if (mutex_lock_interruptible(&call->user_mutex) < 0) {
+               release_sock(&rx->sk);
+               kleave(" = -ERESTARTSYS");
+               return ERR_PTR(-ERESTARTSYS);
+       }
+
+       write_lock(&rx->call_lock);
        list_del_init(&call->accept_link);
        sk_acceptq_removed(&rx->sk);
        rxrpc_see_call(call);
 
+       /* Find the user ID insertion point.  The tree may have changed while
+        * the call_lock was dropped to take the mutex, so the walk has to be
+        * redone (though no one can have added this user ID behind our backs).
+        */
+       pp = &rx->calls.rb_node;
+       parent = NULL;
+       while (*pp) {
+               parent = *pp;
+               xcall = rb_entry(parent, struct rxrpc_call, sock_node);
+
+               if (user_call_ID < xcall->user_call_ID)
+                       pp = &(*pp)->rb_left;
+               else if (user_call_ID > xcall->user_call_ID)
+                       pp = &(*pp)->rb_right;
+               else
+                       BUG();
+       }
+
        write_lock_bh(&call->state_lock);
        switch (call->state) {
        case RXRPC_CALL_SERVER_ACCEPTING:
        write_unlock(&rx->call_lock);
        rxrpc_notify_socket(call);
        rxrpc_service_prealloc(rx, GFP_KERNEL);
+       release_sock(&rx->sk);
        _leave(" = %p{%d}", call, call->debug_id);
        return call;
 
        write_unlock(&rx->call_lock);
 out:
        rxrpc_service_prealloc(rx, GFP_KERNEL);
+       release_sock(&rx->sk);
        _leave(" = %d", ret);
        return ERR_PTR(ret);
 }
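
rxrpc_accept_call() has to interleave a third lock: call_lock is a plain rwlock, so the sleepable mutex_lock_interruptible() cannot be nested under it and call_lock is dropped and retaken around the mutex acquisition. The socket lock, still held by the caller, is what keeps other accepts and user-ID insertions out of that window. A condensed, illustrative ordering with error handling omitted:

        /* caller already did lock_sock(&rx->sk) */
        write_lock(&rx->call_lock);
        call = list_entry(rx->to_be_accepted.next, struct rxrpc_call, accept_link);
        write_unlock(&rx->call_lock);           /* can't sleep under call_lock */

        mutex_lock_interruptible(&call->user_mutex);

        write_lock(&rx->call_lock);             /* re-walk rx->calls; it may have changed */
        /* ... dequeue the call and link user_call_ID into the tree ... */
        write_unlock(&rx->call_lock);
        release_sock(&rx->sk);                  /* the socket is unlocked on every exit */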
 
        if (!call->rxtx_annotations)
                goto nomem_2;
 
+       mutex_init(&call->user_mutex);
        setup_timer(&call->timer, rxrpc_call_timer_expired,
                    (unsigned long)call);
        INIT_WORK(&call->processor, &rxrpc_process_call);
 }
 
 /*
- * set up a call for the given data
- * - called in process context with IRQs enabled
+ * Set up a call for the given parameters.
+ * - Called with the socket lock held, which it must release.
+ * - If it returns a call, the call's user access mutex will need releasing
+ *   by the caller.
  */
 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
                                         struct rxrpc_conn_parameters *cp,
                                         struct sockaddr_rxrpc *srx,
                                         unsigned long user_call_ID,
                                         gfp_t gfp)
+       __releases(&rx->sk.sk_lock.slock)
 {
        struct rxrpc_call *call, *xcall;
        struct rb_node *parent, **pp;
 
        call = rxrpc_alloc_client_call(srx, gfp);
        if (IS_ERR(call)) {
+               release_sock(&rx->sk);
                _leave(" = %ld", PTR_ERR(call));
                return call;
        }
        trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
                         here, (const void *)user_call_ID);
 
+       /* We need to protect a partially set up call against the user as we
+        * will be acting outside the socket lock.
+        */
+       mutex_lock(&call->user_mutex);
+
        /* Publish the call, even though it is incompletely set up as yet */
        write_lock(&rx->call_lock);
 
        list_add_tail(&call->link, &rxrpc_calls);
        write_unlock(&rxrpc_call_lock);
 
+       /* From this point on, the call is protected by its own lock. */
+       release_sock(&rx->sk);
+
        /* Set up or get a connection record and set the protocol parameters,
         * including channel number and call ID.
         */
         */
 error_dup_user_ID:
        write_unlock(&rx->call_lock);
+       release_sock(&rx->sk);
        ret = -EEXIST;
 
 error:
        trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
                         here, ERR_PTR(ret));
        rxrpc_release_call(rx, call);
+       mutex_unlock(&call->user_mutex);
        rxrpc_put_call(call, rxrpc_call_put);
        _leave(" = %d", ret);
        return ERR_PTR(ret);
 
                        goto reject_packet;
                }
                rxrpc_send_ping(call, skb, skew);
+               mutex_unlock(&call->user_mutex);
        }
 
        rxrpc_input_call_packet(call, skb, skew);
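
On the receive side the mutex is first taken in softirq context: rxrpc_new_incoming_call() uses mutex_trylock() because it cannot sleep there, and the mutex is then held across rxrpc_incoming_call() and the initial ping so that neither sendmsg() nor recvmsg() can get at the call before its first notification has been generated. The pairing, condensed from the surrounding input path purely for illustration:

        call = rxrpc_new_incoming_call(local, conn, skb);
        if (call) {
                /* call->user_mutex is held here */
                rxrpc_send_ping(call, skb, skew);
                mutex_unlock(&call->user_mutex);
        }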
 
 
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
 
+       /* We're going to drop the socket lock, so we need to lock the call
+        * against interference by sendmsg.
+        */
+       if (!mutex_trylock(&call->user_mutex)) {
+               ret = -EWOULDBLOCK;
+               if (flags & MSG_DONTWAIT)
+                       goto error_requeue_call;
+               ret = -ERESTARTSYS;
+               if (mutex_lock_interruptible(&call->user_mutex) < 0)
+                       goto error_requeue_call;
+       }
+
+       release_sock(&rx->sk);
+
        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
                BUG();
 
                                       &call->user_call_ID);
                }
                if (ret < 0)
-                       goto error;
+                       goto error_unlock_call;
        }
 
        if (msg->msg_name) {
        }
 
        if (ret < 0)
-               goto error;
+               goto error_unlock_call;
 
        if (call->state == RXRPC_CALL_COMPLETE) {
                ret = rxrpc_recvmsg_term(call, msg);
                if (ret < 0)
-                       goto error;
+                       goto error_unlock_call;
                if (!(flags & MSG_PEEK))
                        rxrpc_release_call(rx, call);
                msg->msg_flags |= MSG_EOR;
                msg->msg_flags &= ~MSG_MORE;
        ret = copied;
 
-error:
+error_unlock_call:
+       mutex_unlock(&call->user_mutex);
        rxrpc_put_call(call, rxrpc_call_put);
+       trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
+       return ret;
+
+error_requeue_call:
+       if (!(flags & MSG_PEEK)) {
+               write_lock_bh(&rx->recvmsg_lock);
+               list_add(&call->recvmsg_link, &rx->recvmsg_q);
+               write_unlock_bh(&rx->recvmsg_lock);
+               trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
+       } else {
+               rxrpc_put_call(call, rxrpc_call_put);
+       }
 error_no_call:
        release_sock(&rx->sk);
        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
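
recvmsg() now holds the socket lock only long enough to dequeue a call from rx->recvmsg_q; if the call's mutex is contended under MSG_DONTWAIT, or waiting for it is interrupted, the call is pushed back onto the queue rather than lost and the error is passed up. A non-blocking userspace reader can therefore simply retry; a minimal sketch, assuming an AF_RXRPC socket fd, a prepared struct msghdr msg and a hypothetical wait_readable() helper:

        for (;;) {
                ssize_t n = recvmsg(fd, &msg, MSG_DONTWAIT);
                if (n >= 0)
                        break;                  /* data or a terminal message */
                if (errno != EWOULDBLOCK && errno != EAGAIN)
                        return -1;              /* real error */
                wait_readable(fd);              /* e.g. poll(); the call stays queued */
        }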
        iov.iov_len = size - *_offset;
        iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
 
-       lock_sock(sock->sk);
+       mutex_lock(&call->user_mutex);
 
        switch (call->state) {
        case RXRPC_CALL_CLIENT_RECV_REPLY:
 read_phase_complete:
        ret = 1;
 out:
-       release_sock(sock->sk);
+       mutex_unlock(&call->user_mutex);
        _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
        return ret;
 
 
                }
 
                trace_rxrpc_transmit(call, rxrpc_transmit_wait);
-               release_sock(&rx->sk);
+               mutex_unlock(&call->user_mutex);
                *timeo = schedule_timeout(*timeo);
-               lock_sock(&rx->sk);
+               if (mutex_lock_interruptible(&call->user_mutex) < 0) {
+                       ret = sock_intr_errno(*timeo);
+                       break;
+               }
        }
 
        remove_wait_queue(&call->waitq, &myself);
 /*
  * send data through a socket
  * - must be called in process context
- * - caller holds the socket locked
+ * - The caller holds the call user access mutex, but not the socket lock.
  */
 static int rxrpc_send_data(struct rxrpc_sock *rx,
                           struct rxrpc_call *call,
 
 /*
  * Create a new client call for sendmsg().
+ * - Called with the socket lock held, which it must release.
+ * - If it returns a call, the call's user access mutex will need releasing
+ *   by the caller.
  */
 static struct rxrpc_call *
 rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
                                  unsigned long user_call_ID, bool exclusive)
+       __releases(&rx->sk.sk_lock.slock)
 {
        struct rxrpc_conn_parameters cp;
        struct rxrpc_call *call;
 
        _enter("");
 
-       if (!msg->msg_name)
+       if (!msg->msg_name) {
+               release_sock(&rx->sk);
                return ERR_PTR(-EDESTADDRREQ);
+       }
 
        key = rx->key;
        if (key && !rx->key->payload.data[0])
        cp.exclusive            = rx->exclusive | exclusive;
        cp.service_id           = srx->srx_service;
        call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, GFP_KERNEL);
+       /* The socket is now unlocked */
 
        _leave(" = %p\n", call);
        return call;
  * - the socket may be either a client socket or a server socket
  */
 int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
+       __releases(&rx->sk.sk_lock.slock)
 {
        enum rxrpc_command cmd;
        struct rxrpc_call *call;
        ret = rxrpc_sendmsg_cmsg(msg, &user_call_ID, &cmd, &abort_code,
                                 &exclusive);
        if (ret < 0)
-               return ret;
+               goto error_release_sock;
 
        if (cmd == RXRPC_CMD_ACCEPT) {
+               ret = -EINVAL;
                if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
-                       return -EINVAL;
+                       goto error_release_sock;
                call = rxrpc_accept_call(rx, user_call_ID, NULL);
+               /* The socket is now unlocked. */
                if (IS_ERR(call))
                        return PTR_ERR(call);
                rxrpc_put_call(call, rxrpc_call_put);
 
        call = rxrpc_find_call_by_user_ID(rx, user_call_ID);
        if (!call) {
+               ret = -EBADSLT;
                if (cmd != RXRPC_CMD_SEND_DATA)
-                       return -EBADSLT;
+                       goto error_release_sock;
                call = rxrpc_new_client_call_for_sendmsg(rx, msg, user_call_ID,
                                                         exclusive);
+               /* The socket is now unlocked... */
                if (IS_ERR(call))
                        return PTR_ERR(call);
+               /* ... and we have the call lock. */
+       } else {
+               /* Sending isn't possible on a call that hasn't finished being
+                * set up or accepted yet; the caller must wait until it is
+                * ready.
+                */
+               ret = -EBUSY;
+               if (call->state == RXRPC_CALL_UNINITIALISED ||
+                   call->state == RXRPC_CALL_CLIENT_AWAIT_CONN ||
+                   call->state == RXRPC_CALL_SERVER_PREALLOC ||
+                   call->state == RXRPC_CALL_SERVER_SECURING ||
+                   call->state == RXRPC_CALL_SERVER_ACCEPTING) {
+                       release_sock(&rx->sk);
+                       goto error_put;
+               }
+
+               ret = mutex_lock_interruptible(&call->user_mutex);
+               release_sock(&rx->sk);
+               if (ret < 0) {
+                       ret = -ERESTARTSYS;
+                       goto error_put;
+               }
        }
 
        _debug("CALL %d USR %lx ST %d on CONN %p",
                ret = rxrpc_send_data(rx, call, msg, len);
        }
 
+       mutex_unlock(&call->user_mutex);
+error_put:
        rxrpc_put_call(call, rxrpc_call_put);
        _leave(" = %d", ret);
        return ret;
+
+error_release_sock:
+       release_sock(&rx->sk);
+       return ret;
 }
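
At sendmsg() level it is the RXRPC_USER_CALL_ID control message that selects (or creates) the call which the user_mutex then protects. A hypothetical userspace sender for an already-established call; fd, payload and payload_len are assumed, and the constants come from the AF_RXRPC user-space header:

        unsigned long user_id = 0x1234;
        char ctrl[CMSG_SPACE(sizeof(user_id))];
        struct iovec iov = { .iov_base = payload, .iov_len = payload_len };
        struct msghdr msg = {
                .msg_iov        = &iov,
                .msg_iovlen     = 1,
                .msg_control    = ctrl,
                .msg_controllen = sizeof(ctrl),
        };
        struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);

        cmsg->cmsg_level = SOL_RXRPC;
        cmsg->cmsg_type  = RXRPC_USER_CALL_ID;
        cmsg->cmsg_len   = CMSG_LEN(sizeof(user_id));
        memcpy(CMSG_DATA(cmsg), &user_id, sizeof(user_id));

        if (sendmsg(fd, &msg, 0) < 0)
                perror("sendmsg");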
 
 /**
        ASSERTCMP(msg->msg_name, ==, NULL);
        ASSERTCMP(msg->msg_control, ==, NULL);
 
-       lock_sock(sock->sk);
+       mutex_lock(&call->user_mutex);
 
        _debug("CALL %d USR %lx ST %d on CONN %p",
               call->debug_id, call->user_call_ID, call->state, call->conn);
                ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len);
        }
 
-       release_sock(sock->sk);
+       mutex_unlock(&call->user_mutex);
        _leave(" = %d", ret);
        return ret;
 }
 {
        _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
 
-       lock_sock(sock->sk);
+       mutex_lock(&call->user_mutex);
 
        if (rxrpc_abort_call(why, call, 0, abort_code, error))
                rxrpc_send_abort_packet(call);
 
-       release_sock(sock->sk);
+       mutex_unlock(&call->user_mutex);
        _leave("");
 }