#define OSD_OPREPLY_FRONT_LEN  512
 
 const static struct ceph_connection_operations osd_con_ops;
+static int __kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd);
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
        osd->o_con.ops = &osd_con_ops;
        osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 
+       INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
 }
 
        return NULL;
 }
 
+static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
+{
+       schedule_delayed_work(&osdc->timeout_work,
+                       osdc->client->mount_args->osd_keepalive_timeout * HZ);
+}
+
+static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
+{
+       cancel_delayed_work(&osdc->timeout_work);
+}
 
 /*
  * Register request, assign tid.  If this is the first request, set up
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+       INIT_LIST_HEAD(&req->r_req_lru_item);
 
        dout("register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
 
-       req->r_timeout_stamp =
-               jiffies + osdc->client->mount_args->osd_timeout*HZ;
-
        if (osdc->num_requests == 1) {
-               osdc->timeout_tid = req->r_tid;
-               dout("  timeout on tid %llu at %lu\n", req->r_tid,
-                    req->r_timeout_stamp);
-               schedule_delayed_work(&osdc->timeout_work,
-                     round_jiffies_relative(req->r_timeout_stamp - jiffies));
+               dout(" first request, scheduling timeout\n");
+               __schedule_osd_timeout(osdc);
        }
        mutex_unlock(&osdc->request_mutex);
 }
 
        ceph_osdc_put_request(req);
 
-       if (req->r_tid == osdc->timeout_tid) {
-               if (osdc->num_requests == 0) {
-                       dout("no requests, canceling timeout\n");
-                       osdc->timeout_tid = 0;
-                       cancel_delayed_work(&osdc->timeout_work);
-               } else {
-                       req = rb_entry(rb_first(&osdc->requests),
-                                      struct ceph_osd_request, r_node);
-                       osdc->timeout_tid = req->r_tid;
-                       dout("rescheduled timeout on tid %llu at %lu\n",
-                            req->r_tid, req->r_timeout_stamp);
-                       schedule_delayed_work(&osdc->timeout_work,
-                             round_jiffies_relative(req->r_timeout_stamp -
-                                                    jiffies));
-               }
+       list_del_init(&req->r_req_lru_item);
+       if (osdc->num_requests == 0) {
+               dout(" no requests, canceling timeout\n");
+               __cancel_osd_timeout(osdc);
        }
 }
 
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
                req->r_sent = 0;
        }
+       list_del_init(&req->r_req_lru_item);
 }
 
 /*
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
+       req->r_sent_stamp = jiffies;
+       list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *last_req = NULL;
        struct ceph_osd *osd;
        unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
-       unsigned long next_timeout = timeout + jiffies;
+       unsigned long keepalive =
+               osdc->client->mount_args->osd_keepalive_timeout * HZ;
+       unsigned long last_sent = 0;
        struct rb_node *p;
+       struct list_head slow_osds;
 
        dout("timeout\n");
        down_read(&osdc->map_sem);
                        continue;
                }
        }
-       for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
-               osd = rb_entry(p, struct ceph_osd, o_node);
-               if (list_empty(&osd->o_requests))
-                       continue;
-               req = list_first_entry(&osd->o_requests,
-                                      struct ceph_osd_request, r_osd_item);
-               if (time_before(jiffies, req->r_timeout_stamp))
-                       continue;
 
-               dout(" tid %llu (at least) timed out on osd%d\n",
+       /*
+        * reset osds that appear to be _really_ unresponsive.  this
+        * is a failsafe measure.. we really shouldn't be getting to
+        * this point if the system is working properly.  the monitors
+        * should mark the osd as failed and we should find out about
+        * it from an updated osd map.
+        */
+       while (!list_empty(&osdc->req_lru)) {
+               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
+                                r_req_lru_item);
+
+               if (time_before(jiffies, req->r_sent_stamp + timeout))
+                       break;
+
+               BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
+               last_req = req;
+               last_sent = req->r_sent_stamp;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
+                          req->r_tid, osd->o_osd);
+               __kick_requests(osdc, osd);
+       }
+
+       /*
+        * ping osds that are a bit slow.  this ensures that if there
+        * is a break in the TCP connection we will notice, and reopen
+        * a connection with that osd (from the fault callback).
+        */
+       INIT_LIST_HEAD(&slow_osds);
+       list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
+               if (time_before(jiffies, req->r_sent_stamp + keepalive))
+                       break;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               dout(" tid %llu is slow, will send keepalive on osd%d\n",
                     req->r_tid, osd->o_osd);
-               req->r_timeout_stamp = next_timeout;
+               list_move_tail(&osd->o_keepalive_item, &slow_osds);
+       }
+       while (!list_empty(&slow_osds)) {
+               osd = list_entry(slow_osds.next, struct ceph_osd,
+                                o_keepalive_item);
+               list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }
 
-       if (osdc->timeout_tid)
-               schedule_delayed_work(&osdc->timeout_work,
-                                     round_jiffies_relative(timeout));
-
+       __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
 
        up_read(&osdc->map_sem);
 }
 
 
-/*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
+static int __kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *kickosd)
 {
        struct ceph_osd_request *req;
        int err;
 
        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-       mutex_lock(&osdc->request_mutex);
        if (kickosd) {
                __reset_osd(osdc, kickosd);
        } else {
                        req->r_resend = true;
                }
        }
+
+       return needmap;
+}
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * Close connections to down osds.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd)
+{
+       int needmap;
+
+       mutex_lock(&osdc->request_mutex);
+       needmap = __kick_requests(osdc, kickosd);
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-}
 
+}
 /*
  * Process updated osd map.
  *
        init_completion(&osdc->map_waiters);
        osdc->last_requested_map = 0;
        mutex_init(&osdc->request_mutex);
-       osdc->timeout_tid = 0;
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
        INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
+       INIT_LIST_HEAD(&osdc->req_lru);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);