]> www.infradead.org Git - nvme.git/commitdiff
libceph: multiple workspaces for CRUSH computations
authorIlya Dryomov <idryomov@gmail.com>
Mon, 17 Aug 2020 11:45:04 +0000 (13:45 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 12 Oct 2020 13:29:26 +0000 (15:29 +0200)
Replace a global map->crush_workspace (protected by a global mutex)
with a list of workspaces, up to the number of CPUs + 1.

This is based on a patch from Robin Geuze <robing@nl.team.blue>.
Robin and his team have observed a 10-20% increase in IOPS on all
queue depths and lower CPU usage as well on a high-end all-NVMe
100GbE cluster.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/osdmap.h
include/linux/crush/crush.h
net/ceph/osdmap.c

index 3f4498fef6ad8b2b1e06a7cb69b089dfdc0a2dda..cad9acfbc3205bfe557df64c3d43766790ccf65b 100644 (file)
@@ -137,6 +137,17 @@ int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
                     const char *fmt, ...);
 void ceph_oid_destroy(struct ceph_object_id *oid);
 
+struct workspace_manager {
+       struct list_head idle_ws;
+       spinlock_t ws_lock;
+       /* Number of free workspaces */
+       int free_ws;
+       /* Total number of allocated workspaces */
+       atomic_t total_ws;
+       /* Waiters for a free workspace */
+       wait_queue_head_t ws_wait;
+};
+
 struct ceph_pg_mapping {
        struct rb_node node;
        struct ceph_pg pgid;
@@ -184,8 +195,7 @@ struct ceph_osdmap {
         * the list of osds that store+replicate them. */
        struct crush_map *crush;
 
-       struct mutex crush_workspace_mutex;
-       void *crush_workspace;
+       struct workspace_manager crush_wsm;
 };
 
 static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd)
index 2f811baf78d2453ada57f257e41305125040d317..30dba392b7302de3e14861f8b769242cd8cbac9e 100644 (file)
@@ -346,6 +346,9 @@ struct crush_work_bucket {
 
 struct crush_work {
        struct crush_work_bucket **work; /* Per-bucket working store */
+#ifdef __KERNEL__
+       struct list_head item;
+#endif
 };
 
 #ifdef __KERNEL__
index 96c25f5e064aaf698ae3dd832cb08f768f56020b..fa08c15be0c0c0d2b9ced03392fdf3184bf1547b 100644 (file)
@@ -964,6 +964,143 @@ bad:
        return -EINVAL;
 }
 
+/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+       struct crush_work *work;
+       size_t work_size;
+
+       WARN_ON(!c->working_size);
+       work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+       dout("%s work_size %zu bytes\n", __func__, work_size);
+
+       work = ceph_kvmalloc(work_size, GFP_NOIO);
+       if (!work)
+               return NULL;
+
+       INIT_LIST_HEAD(&work->item);
+       crush_init_workspace(c, work);
+       return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+       WARN_ON(!list_empty(&work->item));
+       kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+       INIT_LIST_HEAD(&wsm->idle_ws);
+       spin_lock_init(&wsm->ws_lock);
+       atomic_set(&wsm->total_ws, 0);
+       wsm->free_ws = 0;
+       init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+                                 struct crush_work *work)
+{
+       WARN_ON(!list_empty(&wsm->idle_ws));
+
+       list_add(&work->item, &wsm->idle_ws);
+       atomic_set(&wsm->total_ws, 1);
+       wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+       struct crush_work *work;
+
+       while (!list_empty(&wsm->idle_ws)) {
+               work = list_first_entry(&wsm->idle_ws, struct crush_work,
+                                       item);
+               list_del_init(&work->item);
+               free_workspace(work);
+       }
+       atomic_set(&wsm->total_ws, 0);
+       wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+                                       const struct crush_map *c)
+{
+       struct crush_work *work;
+       int cpus = num_online_cpus();
+
+again:
+       spin_lock(&wsm->ws_lock);
+       if (!list_empty(&wsm->idle_ws)) {
+               work = list_first_entry(&wsm->idle_ws, struct crush_work,
+                                       item);
+               list_del_init(&work->item);
+               wsm->free_ws--;
+               spin_unlock(&wsm->ws_lock);
+               return work;
+
+       }
+       if (atomic_read(&wsm->total_ws) > cpus) {
+               DEFINE_WAIT(wait);
+
+               spin_unlock(&wsm->ws_lock);
+               prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+               if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+                       schedule();
+               finish_wait(&wsm->ws_wait, &wait);
+               goto again;
+       }
+       atomic_inc(&wsm->total_ws);
+       spin_unlock(&wsm->ws_lock);
+
+       work = alloc_workspace(c);
+       if (!work) {
+               atomic_dec(&wsm->total_ws);
+               wake_up(&wsm->ws_wait);
+
+               /*
+                * Do not return the error but go back to waiting.  We
+                * have the inital workspace and the CRUSH computation
+                * time is bounded so we will get it eventually.
+                */
+               WARN_ON(atomic_read(&wsm->total_ws) < 1);
+               goto again;
+       }
+       return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+                         struct crush_work *work)
+{
+       spin_lock(&wsm->ws_lock);
+       if (wsm->free_ws <= num_online_cpus()) {
+               list_add(&work->item, &wsm->idle_ws);
+               wsm->free_ws++;
+               spin_unlock(&wsm->ws_lock);
+               goto wake;
+       }
+       spin_unlock(&wsm->ws_lock);
+
+       free_workspace(work);
+       atomic_dec(&wsm->total_ws);
+wake:
+       if (wq_has_sleeper(&wsm->ws_wait))
+               wake_up(&wsm->ws_wait);
+}
+
 /*
  * osd map
  */
@@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
        map->primary_temp = RB_ROOT;
        map->pg_upmap = RB_ROOT;
        map->pg_upmap_items = RB_ROOT;
-       mutex_init(&map->crush_workspace_mutex);
+
+       init_workspace_manager(&map->crush_wsm);
 
        return map;
 }
@@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
        dout("osdmap_destroy %p\n", map);
+
        if (map->crush)
                crush_destroy(map->crush);
+       cleanup_workspace_manager(&map->crush_wsm);
+
        while (!RB_EMPTY_ROOT(&map->pg_temp)) {
                struct ceph_pg_mapping *pg =
                        rb_entry(rb_first(&map->pg_temp),
@@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
        kvfree(map->osd_weight);
        kvfree(map->osd_addr);
        kvfree(map->osd_primary_affinity);
-       kvfree(map->crush_workspace);
        kfree(map);
 }
 
@@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-       void *workspace;
-       size_t work_size;
+       struct crush_work *work;
 
        if (IS_ERR(crush))
                return PTR_ERR(crush);
 
-       work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-       dout("%s work_size %zu bytes\n", __func__, work_size);
-       workspace = ceph_kvmalloc(work_size, GFP_NOIO);
-       if (!workspace) {
+       work = alloc_workspace(crush);
+       if (!work) {
                crush_destroy(crush);
                return -ENOMEM;
        }
-       crush_init_workspace(crush, workspace);
 
        if (map->crush)
                crush_destroy(map->crush);
-       kvfree(map->crush_workspace);
+       cleanup_workspace_manager(&map->crush_wsm);
        map->crush = crush;
-       map->crush_workspace = workspace;
+       add_initial_workspace(&map->crush_wsm, work);
        return 0;
 }
 
@@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
                    s64 choose_args_index)
 {
        struct crush_choose_arg_map *arg_map;
+       struct crush_work *work;
        int r;
 
        BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
                arg_map = lookup_choose_arg_map(&map->crush->choose_args,
                                                CEPH_DEFAULT_CHOOSE_ARGS);
 
-       mutex_lock(&map->crush_workspace_mutex);
+       work = get_workspace(&map->crush_wsm, map->crush);
        r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-                         weight, weight_max, map->crush_workspace,
+                         weight, weight_max, work,
                          arg_map ? arg_map->args : NULL);
-       mutex_unlock(&map->crush_workspace_mutex);
-
+       put_workspace(&map->crush_wsm, work);
        return r;
 }