struct rpc_xprt                *xprt_create_transport(struct xprt_create *args);
 void                   xprt_connect(struct rpc_task *task);
 void                   xprt_reserve(struct rpc_task *task);
+void                   xprt_retry_reserve(struct rpc_task *task);
 int                    xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
 int                    xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
 void                   xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
 #define XPRT_CLOSING           (6)
 #define XPRT_CONNECTION_ABORT  (7)
 #define XPRT_CONNECTION_CLOSE  (8)
+#define XPRT_CONGESTED         (9)
 
 static inline void xprt_set_connected(struct rpc_xprt *xprt)
 {
 
        xprt_reserve(task);
 }
 
+static void call_retry_reserve(struct rpc_task *task);
+
 /*
  * 1b. Grok the result of xprt_reserve()
  */
        case -ENOMEM:
                rpc_delay(task, HZ >> 2);
        case -EAGAIN:   /* woken up; retry */
-               task->tk_action = call_reserve;
+               task->tk_action = call_retry_reserve;
                return;
        case -EIO:      /* probably a shutdown */
                break;
        rpc_exit(task, status);
 }
 
+/*
+ * 1c. Retry reserving an RPC call slot
+ */
+static void
+call_retry_reserve(struct rpc_task *task)
+{
+       dprint_status(task);
+
+       task->tk_status  = 0;
+       task->tk_action  = call_reserveresult;
+       xprt_retry_reserve(task);
+}
+
 /*
  * 2.  Bind and/or refresh the credentials
  */
 
        spin_unlock_bh(&xprt->transport_lock);
 }
 
+static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+       set_bit(XPRT_CONGESTED, &xprt->state);
+       rpc_sleep_on(&xprt->backlog, task, NULL);
+}
+
+static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
+{
+       if (rpc_wake_up_next(&xprt->backlog) == NULL)
+               clear_bit(XPRT_CONGESTED, &xprt->state);
+}
+
+static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+       bool ret = false;
+
+       if (!test_bit(XPRT_CONGESTED, &xprt->state))
+               goto out;
+       spin_lock(&xprt->reserve_lock);
+       if (test_bit(XPRT_CONGESTED, &xprt->state)) {
+               rpc_sleep_on(&xprt->backlog, task, NULL);
+               ret = true;
+       }
+       spin_unlock(&xprt->reserve_lock);
+out:
+       return ret;
+}
+
 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
 {
        struct rpc_rqst *req = ERR_PTR(-EAGAIN);
                task->tk_status = -ENOMEM;
                break;
        case -EAGAIN:
-               rpc_sleep_on(&xprt->backlog, task, NULL);
+               xprt_add_backlog(xprt, task);
                dprintk("RPC:       waiting for request slot\n");
        default:
                task->tk_status = -EAGAIN;
                memset(req, 0, sizeof(*req));   /* mark unused */
                list_add(&req->rq_list, &xprt->free);
        }
-       rpc_wake_up_next(&xprt->backlog);
+       xprt_wake_up_backlog(xprt);
        spin_unlock(&xprt->reserve_lock);
 }
 
  * xprt_reserve - allocate an RPC request slot
  * @task: RPC task requesting a slot allocation
  *
- * If no more slots are available, place the task on the transport's
+ * If the transport is marked as being congested, or if no more
+ * slots are available, place the task on the transport's
  * backlog queue.
  */
 void xprt_reserve(struct rpc_task *task)
 {
        struct rpc_xprt *xprt;
 
+       task->tk_status = 0;
+       if (task->tk_rqstp != NULL)
+               return;
+
+       task->tk_timeout = 0;
+       task->tk_status = -EAGAIN;
+       rcu_read_lock();
+       xprt = rcu_dereference(task->tk_client->cl_xprt);
+       if (!xprt_throttle_congested(xprt, task))
+               xprt->ops->alloc_slot(xprt, task);
+       rcu_read_unlock();
+}
+
+/**
+ * xprt_retry_reserve - allocate an RPC request slot
+ * @task: RPC task requesting a slot allocation
+ *
+ * If no more slots are available, place the task on the transport's
+ * backlog queue.
+ * Note that the only difference with xprt_reserve is that we now
+ * ignore the value of the XPRT_CONGESTED flag.
+ */
+void xprt_retry_reserve(struct rpc_task *task)
+{
+       struct rpc_xprt *xprt;
+
        task->tk_status = 0;
        if (task->tk_rqstp != NULL)
                return;