*     - allocation and destruction of environment is amortized by caching no
  *     longer used environments instead of destroying them;
  *
- *     - there is a notion of "current" environment, attached to the kernel
- *     data structure representing current thread Top-level lustre code
- *     allocates an environment and makes it current, then calls into
- *     non-lustre code, that in turn calls lustre back. Low-level lustre
- *     code thus called can fetch environment created by the top-level code
- *     and reuse it, avoiding additional environment allocation.
- *       Right now, three interfaces can attach the cl_env to running thread:
- *       - cl_env_get
- *       - cl_env_implant
- *       - cl_env_reexit(cl_env_reenter had to be called priorly)
- *
  * \see lu_env, lu_context, lu_context_key
  * @{
  */
 
-struct cl_env_nest {
-       int   cen_refcheck;
-       void *cen_cookie;
-};
-
 struct lu_env *cl_env_get(int *refcheck);
 struct lu_env *cl_env_alloc(int *refcheck, __u32 tags);
-struct lu_env *cl_env_nested_get(struct cl_env_nest *nest);
 void cl_env_put(struct lu_env *env, int *refcheck);
-void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env);
-void *cl_env_reenter(void);
-void cl_env_reexit(void *cookie);
-void cl_env_implant(struct lu_env *env, int *refcheck);
-void cl_env_unplant(struct lu_env *env, int *refcheck);
 unsigned int cl_env_cache_purge(unsigned int nr);
 struct lu_env *cl_env_percpu_get(void);
 void cl_env_percpu_put(struct lu_env *env);
 
        int nr_ns;
        struct ldlm_namespace *ns;
        struct ldlm_namespace *ns_old = NULL; /* loop detection */
-       void *cookie;
 
        if (client == LDLM_NAMESPACE_CLIENT && !(gfp_mask & __GFP_FS))
                return 0;
        CDEBUG(D_DLMTRACE, "Request to count %s locks from all pools\n",
               client == LDLM_NAMESPACE_CLIENT ? "client" : "server");
 
-       cookie = cl_env_reenter();
-
        /*
         * Find out how many resources we may release.
         */
                mutex_lock(ldlm_namespace_lock(client));
                if (list_empty(ldlm_namespace_list(client))) {
                        mutex_unlock(ldlm_namespace_lock(client));
-                       cl_env_reexit(cookie);
                        return 0;
                }
                ns = ldlm_namespace_first_locked(client);
                ldlm_namespace_put(ns);
        }
 
-       cl_env_reexit(cookie);
        return total;
 }
 
        unsigned long freed = 0;
        int tmp, nr_ns;
        struct ldlm_namespace *ns;
-       void *cookie;
 
        if (client == LDLM_NAMESPACE_CLIENT && !(gfp_mask & __GFP_FS))
                return -1;
 
-       cookie = cl_env_reenter();
-
        /*
         * Shrink at least ldlm_namespace_nr_read(client) namespaces.
         */
                freed += ldlm_pool_shrink(&ns->ns_pool, cancel, gfp_mask);
                ldlm_namespace_put(ns);
        }
-       cl_env_reexit(cookie);
        /*
         * we only decrease the SLV in server pools shrinker, return
         * SHRINK_STOP to kernel to avoid needless loop. LU-1128
 
  */
 int ll_hsm_release(struct inode *inode)
 {
-       struct cl_env_nest nest;
        struct lu_env *env;
        struct obd_client_handle *och = NULL;
        __u64 data_version = 0;
        int rc;
+       int refcheck;
 
        CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
               ll_get_fsname(inode->i_sb, NULL, 0),
        if (rc != 0)
                goto out;
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        if (IS_ERR(env)) {
                rc = PTR_ERR(env);
                goto out;
        }
 
        ll_merge_attr(env, inode);
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
 
        /* Release the file.
         * NB: lease lock handle is released in mdc_hsm_release_pack() because
 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
                       enum cl_fsync_mode mode, int ignore_layout)
 {
-       struct cl_env_nest nest;
        struct lu_env *env;
        struct cl_io *io;
        struct cl_fsync_io *fio;
        int result;
+       int refcheck;
 
        if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
            mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
                return -EINVAL;
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                return PTR_ERR(env);
 
        if (result == 0)
                result = fio->fi_nr_written;
        cl_io_fini(env, io);
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
 
        return result;
 }
 {
        struct ll_inode_info *lli = ll_i2info(inode);
        struct cl_object *obj = lli->lli_clob;
-       struct cl_env_nest nest;
        struct lu_env *env;
        int rc;
+       int refcheck;
 
        if (!obj)
                return 0;
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                return PTR_ERR(env);
 
                ll_layout_version_set(lli, cl.cl_layout_gen);
        }
 out:
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        return rc;
 }
 
 
        int emergency;
 
        if (clob) {
-               void                *cookie;
-
-               cookie = cl_env_reenter();
                env = cl_env_get(&refcheck);
                emergency = IS_ERR(env);
                if (emergency) {
                        mutex_lock(&cl_inode_fini_guard);
                        LASSERT(cl_inode_fini_env);
-                       cl_env_implant(cl_inode_fini_env, &refcheck);
                        env = cl_inode_fini_env;
                }
                /*
                lu_object_ref_del(&clob->co_lu, "inode", inode);
                cl_object_put_last(env, clob);
                lli->lli_clob = NULL;
-               if (emergency) {
-                       cl_env_unplant(cl_inode_fini_env, &refcheck);
+               if (emergency)
                        mutex_unlock(&cl_inode_fini_guard);
-               } else {
+               else
                        cl_env_put(env, &refcheck);
-               }
-               cl_env_reexit(cookie);
        }
 }
 
 
                return rc;
        }
 
-       cg->lg_env  = cl_env_get(&refcheck);
+       cg->lg_env  = env;
        cg->lg_io   = io;
        cg->lg_lock = lock;
        cg->lg_gid  = gid;
-       LASSERT(cg->lg_env == env);
 
-       cl_env_unplant(env, &refcheck);
        return 0;
 }
 
        struct lu_env  *env  = cg->lg_env;
        struct cl_io   *io   = cg->lg_io;
        struct cl_lock *lock = cg->lg_lock;
-       int          refcheck;
 
        LASSERT(cg->lg_env);
        LASSERT(cg->lg_gid);
 
-       cl_env_implant(env, &refcheck);
-       cl_env_put(env, &refcheck);
-
        cl_lock_release(env, lock);
        cl_io_fini(env, io);
        cl_env_put(env, NULL);
 
  * API independent part for page fault initialization.
  * \param vma - virtual memory area addressed to page fault
  * \param env - corespondent lu_env to processing
- * \param nest - nested level
  * \param index - page index corespondent to fault.
  * \parm ra_flags - vma readahead flags.
  *
- * \return allocated and initialized env for fault operation.
- * \retval EINVAL if env can't allocated
- * \return other error codes from cl_io_init.
+ * \return error codes from cl_io_init.
  */
 static struct cl_io *
-ll_fault_io_init(struct vm_area_struct *vma, struct lu_env **env_ret,
-                struct cl_env_nest *nest, pgoff_t index,
-                unsigned long *ra_flags)
+ll_fault_io_init(struct lu_env *env, struct vm_area_struct *vma,
+                pgoff_t index, unsigned long *ra_flags)
 {
        struct file            *file = vma->vm_file;
        struct inode           *inode = file_inode(file);
        struct cl_io           *io;
        struct cl_fault_io     *fio;
-       struct lu_env          *env;
        int                     rc;
 
-       *env_ret = NULL;
        if (ll_file_nolock(file))
                return ERR_PTR(-EOPNOTSUPP);
 
-       /*
-        * page fault can be called when lustre IO is
-        * already active for the current thread, e.g., when doing read/write
-        * against user level buffer mapped from Lustre buffer. To avoid
-        * stomping on existing context, optionally force an allocation of a new
-        * one.
-        */
-       env = cl_env_nested_get(nest);
-       if (IS_ERR(env))
-               return ERR_PTR(-EINVAL);
-
-       *env_ret = env;
-
 restart:
        io = vvp_env_thread_io(env);
        io->ci_obj = ll_i2info(inode)->lli_clob;
                if (io->ci_need_restart)
                        goto restart;
 
-               cl_env_nested_put(nest, env);
                io = ERR_PTR(rc);
        }
 
        struct lu_env      *env;
        struct cl_io        *io;
        struct vvp_io      *vio;
-       struct cl_env_nest       nest;
        int                   result;
+       int refcheck;
        sigset_t             set;
        struct inode         *inode;
        struct ll_inode_info     *lli;
 
-       io = ll_fault_io_init(vma, &env,  &nest, vmpage->index, NULL);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               return PTR_ERR(env);
+
+       io = ll_fault_io_init(env, vma, vmpage->index, NULL);
        if (IS_ERR(io)) {
                result = PTR_ERR(io);
                goto out;
 
 out_io:
        cl_io_fini(env, io);
-       cl_env_nested_put(&nest, env);
 out:
+       cl_env_put(env, &refcheck);
        CDEBUG(D_MMAP, "%s mkwrite with %d\n", current->comm, result);
        LASSERT(ergo(result == 0, PageLocked(vmpage)));
 
        struct vvp_io      *vio = NULL;
        struct page          *vmpage;
        unsigned long       ra_flags;
-       struct cl_env_nest       nest;
-       int                   result;
+       int                   result = 0;
        int                   fault_ret = 0;
+       int refcheck;
 
-       io = ll_fault_io_init(vma, &env,  &nest, vmf->pgoff, &ra_flags);
-       if (IS_ERR(io))
-               return to_fault_error(PTR_ERR(io));
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               return PTR_ERR(env);
+
+       io = ll_fault_io_init(env, vma, vmf->pgoff, &ra_flags);
+       if (IS_ERR(io)) {
+               result = to_fault_error(PTR_ERR(io));
+               goto out;
+       }
 
        result = io->ci_result;
        if (result == 0) {
                }
        }
        cl_io_fini(env, io);
-       cl_env_nested_put(&nest, env);
 
        vma->vm_flags |= ra_flags;
+
+out:
+       cl_env_put(env, &refcheck);
        if (result != 0 && !(fault_ret & VM_FAULT_RETRY))
                fault_ret |= to_fault_error(result);
 
-       CDEBUG(D_MMAP, "%s fault %d/%d\n",
-              current->comm, fault_ret, result);
+       CDEBUG(D_MMAP, "%s fault %d/%d\n", current->comm, fault_ret, result);
        return fault_ret;
 }
 
 
        struct cl_io       *io;
        struct cl_page   *page;
        struct cl_object       *clob;
-       struct cl_env_nest      nest;
        bool redirtied = false;
        bool unlocked = false;
        int result;
+       int refcheck;
 
        LASSERT(PageLocked(vmpage));
        LASSERT(!PageWriteback(vmpage));
 
        LASSERT(ll_i2dtexp(inode));
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        if (IS_ERR(env)) {
                result = PTR_ERR(env);
                goto out;
                }
        }
 
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        goto out;
 
 out:
 
 static int ll_releasepage(struct page *vmpage, gfp_t gfp_mask)
 {
        struct lu_env     *env;
-       void                    *cookie;
        struct cl_object  *obj;
        struct cl_page    *page;
        struct address_space *mapping;
        if (!page)
                return 1;
 
-       cookie = cl_env_reenter();
        env = cl_env_percpu_get();
        LASSERT(!IS_ERR(env));
 
        cl_page_put(env, page);
 
        cl_env_percpu_put(env);
-       cl_env_reexit(cookie);
        return result;
 }
 
                       PAGE_SIZE) & ~(DT_MAX_BRW_SIZE - 1))
 static ssize_t ll_direct_IO_26(struct kiocb *iocb, struct iov_iter *iter)
 {
-       struct lu_env *env;
+       struct ll_cl_context *lcc;
+       const struct lu_env *env;
        struct cl_io *io;
        struct file *file = iocb->ki_filp;
        struct inode *inode = file->f_mapping->host;
        ssize_t count = iov_iter_count(iter);
        ssize_t tot_bytes = 0, result = 0;
        long size = MAX_DIO_SIZE;
-       int refcheck;
 
        /* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
        if ((file_offset & ~PAGE_MASK) || (count & ~PAGE_MASK))
        if (iov_iter_alignment(iter) & ~PAGE_MASK)
                return -EINVAL;
 
-       env = cl_env_get(&refcheck);
+       lcc = ll_cl_find(file);
+       if (!lcc)
+               return -EIO;
+
+       env = lcc->lcc_env;
        LASSERT(!IS_ERR(env));
-       io = vvp_env_io(env)->vui_cl.cis_io;
+       io = lcc->lcc_io;
        LASSERT(io);
 
        while (iov_iter_count(iter)) {
                vio->u.write.vui_written += tot_bytes;
        }
 
-       cl_env_put(env, &refcheck);
        return tot_bytes ? tot_bytes : result;
 }
 
 
                sub->sub_env = ld->ld_emrg[stripe]->emrg_env;
                sub->sub_borrowed = 1;
        } else {
-               void *cookie;
-
-               /* obtain new environment */
-               cookie = cl_env_reenter();
                sub->sub_env = cl_env_get(&sub->sub_refcheck);
-               cl_env_reexit(cookie);
                if (IS_ERR(sub->sub_env))
                        result = PTR_ERR(sub->sub_env);
 
 
        union lov_layout_state *state = &lov->u;
        const struct lov_layout_operations *old_ops;
        const struct lov_layout_operations *new_ops;
-       void *cookie;
        struct lu_env *env;
        int refcheck;
        int rc;
 
        LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
 
-       cookie = cl_env_reenter();
        env = cl_env_get(&refcheck);
-       if (IS_ERR(env)) {
-               cl_env_reexit(cookie);
+       if (IS_ERR(env))
                return PTR_ERR(env);
-       }
 
        LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
 
        lov->lo_type = llt;
 out:
        cl_env_put(env, &refcheck);
-       cl_env_reexit(cookie);
        return rc;
 }
 
 
        CNL_NR
 };
 
-/**
- * Counters used to check correctness of cl_lock interface usage.
- */
-struct cl_thread_counters {
-       /**
-        * Number of outstanding calls to cl_lock_mutex_get() made by the
-        * current thread. For debugging.
-        */
-       int        ctc_nr_locks_locked;
-       /** List of locked locks. */
-       struct lu_ref ctc_locks_locked;
-       /** Number of outstanding holds on locks. */
-       int        ctc_nr_held;
-       /** Number of outstanding uses on locks. */
-       int        ctc_nr_used;
-       /** Number of held extent locks. */
-       int        ctc_nr_locks_acquired;
-};
-
 /**
  * Thread local state internal for generic cl-code.
  */
         */
        struct cl_lock_descr clt_descr;
        struct cl_page_list  clt_list;
-       /**
-        * Counters for every level of lock nesting.
-        */
-       struct cl_thread_counters clt_counters[CNL_NR];
        /** @} debugging */
 
        /*
 
                        scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
        }
        io->ci_state = CIS_UNLOCKED;
-       LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
 }
 EXPORT_SYMBOL(cl_io_unlock);
 
 
  * bz20044, bz22683.
  */
 
-static LIST_HEAD(cl_envs);
-static unsigned int cl_envs_cached_nr;
-static unsigned int cl_envs_cached_max = 128; /* XXX: prototype: arbitrary limit
-                                              * for now.
-                                              */
-static DEFINE_SPINLOCK(cl_envs_guard);
+static unsigned int cl_envs_cached_max = 32; /* XXX: prototype: arbitrary limit
+                                             * for now.
+                                             */
+static struct cl_env_cache {
+       rwlock_t                cec_guard;
+       unsigned int            cec_count;
+       struct list_head        cec_envs;
+} *cl_envs = NULL;
 
 struct cl_env {
        void         *ce_magic;
        struct lu_env     ce_lu;
        struct lu_context ce_ses;
 
-       /**
-        * This allows cl_env to be entered into cl_env_hash which implements
-        * the current thread -> client environment lookup.
-        */
-       struct hlist_node  ce_node;
-       /**
-        * Owner for the current cl_env.
-        *
-        * If LL_TASK_CL_ENV is defined, this point to the owning current,
-        * only for debugging purpose ;
-        * Otherwise hash is used, and this is the key for cfs_hash.
-        * Now current thread pid is stored. Note using thread pointer would
-        * lead to unbalanced hash because of its specific allocation locality
-        * and could be varied for different platforms and OSes, even different
-        * OS versions.
-        */
-       void         *ce_owner;
-
        /*
         * Linkage into global list of all client environments. Used for
         * garbage collection.
 {
        LASSERT(cle->ce_ref == 0);
        LASSERT(cle->ce_magic == &cl_env_init0);
-       LASSERT(!cle->ce_debug && !cle->ce_owner);
+       LASSERT(!cle->ce_debug);
 
        cle->ce_ref = 1;
        cle->ce_debug = debug;
        CL_ENV_INC(busy);
 }
 
-/*
- * The implementation of using hash table to connect cl_env and thread
- */
-
-static struct cfs_hash *cl_env_hash;
-
-static unsigned cl_env_hops_hash(struct cfs_hash *lh,
-                                const void *key, unsigned mask)
-{
-#if BITS_PER_LONG == 64
-       return cfs_hash_u64_hash((__u64)key, mask);
-#else
-       return cfs_hash_u32_hash((__u32)key, mask);
-#endif
-}
-
-static void *cl_env_hops_obj(struct hlist_node *hn)
-{
-       struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
-
-       LASSERT(cle->ce_magic == &cl_env_init0);
-       return (void *)cle;
-}
-
-static int cl_env_hops_keycmp(const void *key, struct hlist_node *hn)
-{
-       struct cl_env *cle = cl_env_hops_obj(hn);
-
-       LASSERT(cle->ce_owner);
-       return (key == cle->ce_owner);
-}
-
-static void cl_env_hops_noop(struct cfs_hash *hs, struct hlist_node *hn)
-{
-       struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
-
-       LASSERT(cle->ce_magic == &cl_env_init0);
-}
-
-static struct cfs_hash_ops cl_env_hops = {
-       .hs_hash        = cl_env_hops_hash,
-       .hs_key         = cl_env_hops_obj,
-       .hs_keycmp      = cl_env_hops_keycmp,
-       .hs_object      = cl_env_hops_obj,
-       .hs_get         = cl_env_hops_noop,
-       .hs_put_locked  = cl_env_hops_noop,
-};
-
-static inline struct cl_env *cl_env_fetch(void)
-{
-       struct cl_env *cle;
-
-       cle = cfs_hash_lookup(cl_env_hash, (void *)(long)current->pid);
-       LASSERT(ergo(cle, cle->ce_magic == &cl_env_init0));
-       return cle;
-}
-
-static inline void cl_env_attach(struct cl_env *cle)
-{
-       if (cle) {
-               int rc;
-
-               LASSERT(!cle->ce_owner);
-               cle->ce_owner = (void *)(long)current->pid;
-               rc = cfs_hash_add_unique(cl_env_hash, cle->ce_owner,
-                                        &cle->ce_node);
-               LASSERT(rc == 0);
-       }
-}
-
-static inline void cl_env_do_detach(struct cl_env *cle)
-{
-       void *cookie;
-
-       LASSERT(cle->ce_owner == (void *)(long)current->pid);
-       cookie = cfs_hash_del(cl_env_hash, cle->ce_owner,
-                             &cle->ce_node);
-       LASSERT(cookie == cle);
-       cle->ce_owner = NULL;
-}
-
-static int cl_env_store_init(void)
-{
-       cl_env_hash = cfs_hash_create("cl_env",
-                                     HASH_CL_ENV_BITS, HASH_CL_ENV_BITS,
-                                     HASH_CL_ENV_BKT_BITS, 0,
-                                     CFS_HASH_MIN_THETA,
-                                     CFS_HASH_MAX_THETA,
-                                     &cl_env_hops,
-                                     CFS_HASH_RW_BKTLOCK);
-       return cl_env_hash ? 0 : -ENOMEM;
-}
-
-static void cl_env_store_fini(void)
-{
-       cfs_hash_putref(cl_env_hash);
-}
-
-static inline struct cl_env *cl_env_detach(struct cl_env *cle)
-{
-       if (!cle)
-               cle = cl_env_fetch();
-
-       if (cle && cle->ce_owner)
-               cl_env_do_detach(cle);
-
-       return cle;
-}
-
 static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
 {
        struct lu_env *env;
 {
        struct cl_env *cle;
        struct lu_env *env;
+       int cpu = get_cpu();
 
-       spin_lock(&cl_envs_guard);
-       LASSERT(equi(cl_envs_cached_nr == 0, list_empty(&cl_envs)));
-       if (cl_envs_cached_nr > 0) {
+       read_lock(&cl_envs[cpu].cec_guard);
+       LASSERT(equi(cl_envs[cpu].cec_count == 0,
+                    list_empty(&cl_envs[cpu].cec_envs)));
+       if (cl_envs[cpu].cec_count > 0) {
                int rc;
 
-               cle = container_of(cl_envs.next, struct cl_env, ce_linkage);
+               cle = container_of(cl_envs[cpu].cec_envs.next, struct cl_env,
+                                  ce_linkage);
                list_del_init(&cle->ce_linkage);
-               cl_envs_cached_nr--;
-               spin_unlock(&cl_envs_guard);
+               cl_envs[cpu].cec_count--;
+               read_unlock(&cl_envs[cpu].cec_guard);
+               put_cpu();
 
                env = &cle->ce_lu;
                rc = lu_env_refill(env);
                        env = ERR_PTR(rc);
                }
        } else {
-               spin_unlock(&cl_envs_guard);
+               read_unlock(&cl_envs[cpu].cec_guard);
+               put_cpu();
                env = cl_env_new(lu_context_tags_default,
                                 lu_session_tags_default, debug);
        }
        return container_of(env, struct cl_env, ce_lu);
 }
 
-static struct lu_env *cl_env_peek(int *refcheck)
-{
-       struct lu_env *env;
-       struct cl_env *cle;
-
-       CL_ENV_INC(lookup);
-
-       /* check that we don't go far from untrusted pointer */
-       CLASSERT(offsetof(struct cl_env, ce_magic) == 0);
-
-       env = NULL;
-       cle = cl_env_fetch();
-       if (cle) {
-               CL_ENV_INC(hit);
-               env = &cle->ce_lu;
-               *refcheck = ++cle->ce_ref;
-       }
-       CDEBUG(D_OTHER, "%d@%p\n", cle ? cle->ce_ref : 0, cle);
-       return env;
-}
-
 /**
  * Returns lu_env: if there already is an environment associated with the
  * current thread, it is returned, otherwise, new environment is allocated.
 {
        struct lu_env *env;
 
-       env = cl_env_peek(refcheck);
-       if (!env) {
-               env = cl_env_obtain(__builtin_return_address(0));
-               if (!IS_ERR(env)) {
-                       struct cl_env *cle;
+       env = cl_env_obtain(__builtin_return_address(0));
+       if (!IS_ERR(env)) {
+               struct cl_env *cle;
 
-                       cle = cl_env_container(env);
-                       cl_env_attach(cle);
-                       *refcheck = cle->ce_ref;
-                       CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
-               }
+               cle = cl_env_container(env);
+               *refcheck = cle->ce_ref;
+               CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
        }
        return env;
 }
 {
        struct lu_env *env;
 
-       LASSERT(!cl_env_peek(refcheck));
        env = cl_env_new(tags, tags, __builtin_return_address(0));
        if (!IS_ERR(env)) {
                struct cl_env *cle;
 
 static void cl_env_exit(struct cl_env *cle)
 {
-       LASSERT(!cle->ce_owner);
        lu_context_exit(&cle->ce_lu.le_ctx);
        lu_context_exit(&cle->ce_ses);
 }
 unsigned int cl_env_cache_purge(unsigned int nr)
 {
        struct cl_env *cle;
+       unsigned int i;
 
-       spin_lock(&cl_envs_guard);
-       for (; !list_empty(&cl_envs) && nr > 0; --nr) {
-               cle = container_of(cl_envs.next, struct cl_env, ce_linkage);
-               list_del_init(&cle->ce_linkage);
-               LASSERT(cl_envs_cached_nr > 0);
-               cl_envs_cached_nr--;
-               spin_unlock(&cl_envs_guard);
+       for_each_possible_cpu(i) {
+               write_lock(&cl_envs[i].cec_guard);
+               for (; !list_empty(&cl_envs[i].cec_envs) && nr > 0; --nr) {
+                       cle = container_of(cl_envs[i].cec_envs.next,
+                                          struct cl_env, ce_linkage);
+                       list_del_init(&cle->ce_linkage);
+                       LASSERT(cl_envs[i].cec_count > 0);
+                       cl_envs[i].cec_count--;
+                       write_unlock(&cl_envs[i].cec_guard);
 
-               cl_env_fini(cle);
-               spin_lock(&cl_envs_guard);
+                       cl_env_fini(cle);
+                       write_lock(&cl_envs[i].cec_guard);
+               }
+               LASSERT(equi(cl_envs[i].cec_count == 0,
+                            list_empty(&cl_envs[i].cec_envs)));
+               write_unlock(&cl_envs[i].cec_guard);
        }
-       LASSERT(equi(cl_envs_cached_nr == 0, list_empty(&cl_envs)));
-       spin_unlock(&cl_envs_guard);
        return nr;
 }
 EXPORT_SYMBOL(cl_env_cache_purge);
 
        CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
        if (--cle->ce_ref == 0) {
+               int cpu = get_cpu();
+
                CL_ENV_DEC(busy);
-               cl_env_detach(cle);
                cle->ce_debug = NULL;
                cl_env_exit(cle);
                /*
                 * Return environment to the cache only when it was allocated
                 * with the standard tags.
                 */
-               if (cl_envs_cached_nr < cl_envs_cached_max &&
+               if (cl_envs[cpu].cec_count < cl_envs_cached_max &&
                    (env->le_ctx.lc_tags & ~LCT_HAS_EXIT) == LCT_CL_THREAD &&
                    (env->le_ses->lc_tags & ~LCT_HAS_EXIT) == LCT_SESSION) {
-                       spin_lock(&cl_envs_guard);
-                       list_add(&cle->ce_linkage, &cl_envs);
-                       cl_envs_cached_nr++;
-                       spin_unlock(&cl_envs_guard);
+                       read_lock(&cl_envs[cpu].cec_guard);
+                       list_add(&cle->ce_linkage, &cl_envs[cpu].cec_envs);
+                       cl_envs[cpu].cec_count++;
+                       read_unlock(&cl_envs[cpu].cec_guard);
                } else {
                        cl_env_fini(cle);
                }
+               put_cpu();
        }
 }
 EXPORT_SYMBOL(cl_env_put);
 
-/**
- * Declares a point of re-entrancy.
- *
- * \see cl_env_reexit()
- */
-void *cl_env_reenter(void)
-{
-       return cl_env_detach(NULL);
-}
-EXPORT_SYMBOL(cl_env_reenter);
-
-/**
- * Exits re-entrancy.
- */
-void cl_env_reexit(void *cookie)
-{
-       cl_env_detach(NULL);
-       cl_env_attach(cookie);
-}
-EXPORT_SYMBOL(cl_env_reexit);
-
-/**
- * Setup user-supplied \a env as a current environment. This is to be used to
- * guaranteed that environment exists even when cl_env_get() fails. It is up
- * to user to ensure proper concurrency control.
- *
- * \see cl_env_unplant()
- */
-void cl_env_implant(struct lu_env *env, int *refcheck)
-{
-       struct cl_env *cle = cl_env_container(env);
-
-       LASSERT(cle->ce_ref > 0);
-
-       cl_env_attach(cle);
-       cl_env_get(refcheck);
-       CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
-}
-EXPORT_SYMBOL(cl_env_implant);
-
-/**
- * Detach environment installed earlier by cl_env_implant().
- */
-void cl_env_unplant(struct lu_env *env, int *refcheck)
-{
-       struct cl_env *cle = cl_env_container(env);
-
-       LASSERT(cle->ce_ref > 1);
-
-       CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
-
-       cl_env_detach(cle);
-       cl_env_put(env, refcheck);
-}
-EXPORT_SYMBOL(cl_env_unplant);
-
-struct lu_env *cl_env_nested_get(struct cl_env_nest *nest)
-{
-       struct lu_env *env;
-
-       nest->cen_cookie = NULL;
-       env = cl_env_peek(&nest->cen_refcheck);
-       if (env) {
-               if (!cl_io_is_going(env))
-                       return env;
-               cl_env_put(env, &nest->cen_refcheck);
-               nest->cen_cookie = cl_env_reenter();
-       }
-       env = cl_env_get(&nest->cen_refcheck);
-       if (IS_ERR(env)) {
-               cl_env_reexit(nest->cen_cookie);
-               return env;
-       }
-
-       LASSERT(!cl_io_is_going(env));
-       return env;
-}
-EXPORT_SYMBOL(cl_env_nested_get);
-
-void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env)
-{
-       cl_env_put(env, &nest->cen_refcheck);
-       cl_env_reexit(nest->cen_cookie);
-}
-EXPORT_SYMBOL(cl_env_nested_put);
-
 /**
  * Converts struct ost_lvb to struct cl_attr.
  *
        for_each_possible_cpu(i) {
                struct lu_env *env;
 
+               rwlock_init(&cl_envs[i].cec_guard);
+               INIT_LIST_HEAD(&cl_envs[i].cec_envs);
+               cl_envs[i].cec_count = 0;
+
                cle = &cl_env_percpu[i];
                env = &cle->ce_lu;
 
        LASSERT(cle->ce_ref == 0);
 
        CL_ENV_DEC(busy);
-       cl_env_detach(cle);
        cle->ce_debug = NULL;
 
        put_cpu();
        cle = &cl_env_percpu[get_cpu()];
        cl_env_init0(cle, __builtin_return_address(0));
 
-       cl_env_attach(cle);
        return &cle->ce_lu;
 }
 EXPORT_SYMBOL(cl_env_percpu_get);
 static void *cl_key_init(const struct lu_context *ctx,
                         struct lu_context_key *key)
 {
-       struct cl_thread_info *info;
-
-       info = cl0_key_init(ctx, key);
-       if (!IS_ERR(info)) {
-               size_t i;
-
-               for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
-                       lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
-       }
-       return info;
+       return cl0_key_init(ctx, key);
 }
 
 static void cl_key_fini(const struct lu_context *ctx,
                        struct lu_context_key *key, void *data)
 {
-       struct cl_thread_info *info;
-       size_t i;
-
-       info = data;
-       for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
-               lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
        cl0_key_fini(ctx, key, data);
 }
 
-static void cl_key_exit(const struct lu_context *ctx,
-                       struct lu_context_key *key, void *data)
-{
-       struct cl_thread_info *info = data;
-       size_t i;
-
-       for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
-               LASSERT(info->clt_counters[i].ctc_nr_held == 0);
-               LASSERT(info->clt_counters[i].ctc_nr_used == 0);
-               LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
-               LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
-               lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
-               lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
-       }
-}
-
 static struct lu_context_key cl_key = {
        .lct_tags = LCT_CL_THREAD,
        .lct_init = cl_key_init,
        .lct_fini = cl_key_fini,
-       .lct_exit = cl_key_exit
 };
 
 static struct lu_kmem_descr cl_object_caches[] = {
 {
        int result;
 
-       result = cl_env_store_init();
-       if (result)
-               return result;
+       cl_envs = kzalloc(sizeof(*cl_envs) * num_possible_cpus(), GFP_KERNEL);
+       if (!cl_envs) {
+               result = -ENOMEM;
+               goto out;
+       }
 
        result = lu_kmem_init(cl_object_caches);
        if (result)
-               goto out_store;
+               goto out_envs;
 
        LU_CONTEXT_KEY_INIT(&cl_key);
        result = lu_context_key_register(&cl_key);
        result = cl_env_percpu_init();
        if (result)
                /* no cl_env_percpu_fini on error */
-               goto out_context;
+               goto out_keys;
 
        return 0;
 
-out_context:
+out_keys:
        lu_context_key_degister(&cl_key);
 out_kmem:
        lu_kmem_fini(cl_object_caches);
-out_store:
-       cl_env_store_fini();
+out_envs:
+       kfree(cl_envs);
+out:
        return result;
 }
 
        cl_env_percpu_fini();
        lu_context_key_degister(&cl_key);
        lu_kmem_fini(cl_object_caches);
-       cl_env_store_fini();
+       kfree(cl_envs);
 }
 
        struct lu_context *ctx;
 
        if (!(key->lct_tags & LCT_QUIESCENT)) {
-               /*
-                * XXX layering violation.
-                */
-               cl_env_cache_purge(~0);
                /*
                 * XXX memory barrier has to go here.
                 */
 
        kmem_cache_free(echo_thread_kmem, info);
 }
 
-static void echo_thread_key_exit(const struct lu_context *ctx,
-                                struct lu_context_key *key, void *data)
-{
-}
-
 static struct lu_context_key echo_thread_key = {
        .lct_tags = LCT_CL_THREAD,
        .lct_init = echo_thread_key_init,
        .lct_fini = echo_thread_key_fini,
-       .lct_exit = echo_thread_key_exit
 };
 
 static void *echo_session_key_init(const struct lu_context *ctx,
        kmem_cache_free(echo_session_kmem, session);
 }
 
-static void echo_session_key_exit(const struct lu_context *ctx,
-                                 struct lu_context_key *key, void *data)
-{
-}
-
 static struct lu_context_key echo_session_key = {
        .lct_tags = LCT_SESSION,
        .lct_init = echo_session_key_init,
        .lct_fini = echo_session_key_fini,
-       .lct_exit = echo_session_key_exit
 };
 
 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
        cl_device_fini(&ed->ed_cl);
        kfree(ed);
 
+       cl_env_cache_purge(~0);
+
        return NULL;
 }
 
 
 static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
                               bool partial)
 {
-       struct cl_env_nest nest;
        struct lu_env *env;
        struct cl_io *io;
        struct osc_object *obj = ext->oe_obj;
        int grants = 0;
        int nr_pages = 0;
        int rc = 0;
+       int refcheck;
 
        LASSERT(sanity_check(ext) == 0);
        EASSERT(ext->oe_state == OES_TRUNC, ext);
         * We can't use that env from osc_cache_truncate_start() because
         * it's from lov_io_sub and not fully initialized.
         */
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        io  = &osc_env_info(env)->oti_io;
        io->ci_obj = cl_object_top(osc2cl(obj));
        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
 
 out:
        cl_io_fini(env, io);
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        return rc;
 }
 
 
        struct osc_lock *oscl = cookie;
        struct cl_lock_slice *slice = &oscl->ols_cl;
        struct lu_env *env;
-       struct cl_env_nest nest;
        int rc;
+       int refcheck;
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        /* should never happen, similar to osc_ldlm_blocking_ast(). */
        LASSERT(!IS_ERR(env));
 
 
        if (oscl->ols_owner)
                cl_sync_io_note(env, oscl->ols_owner, rc);
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
 
        return rc;
 }
        struct osc_object *osc = cookie;
        struct ldlm_lock *dlmlock;
        struct lu_env *env;
-       struct cl_env_nest nest;
+       int refcheck;
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        LASSERT(!IS_ERR(env));
 
        if (errcode == ELDLM_LOCK_MATCHED) {
 
 out:
        cl_object_put(env, osc2cl(osc));
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        return ldlm_error2errno(errcode);
 }
 
                          enum cl_lock_mode mode, int discard)
 {
        struct lu_env *env;
-       struct cl_env_nest nest;
+       int refcheck;
        int rc = 0;
        int rc2 = 0;
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                return PTR_ERR(env);
 
        if (rc == 0 && rc2 < 0)
                rc = rc2;
 
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        return rc;
 }
 
        }
        case LDLM_CB_CANCELING: {
                struct lu_env *env;
-               struct cl_env_nest nest;
+               int refcheck;
 
                /*
                 * This can be called in the context of outer IO, e.g.,
                 * new environment has to be created to not corrupt outer
                 * context.
                 */
-               env = cl_env_nested_get(&nest);
+               env = cl_env_get(&refcheck);
                if (IS_ERR(env)) {
                        result = PTR_ERR(env);
                        break;
                }
 
                result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
-               cl_env_nested_put(&nest, env);
+               cl_env_put(env, &refcheck);
                break;
                }
        default:
 static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
 {
        struct ptlrpc_request *req = data;
-       struct cl_env_nest nest;
        struct lu_env *env;
        struct ost_lvb *lvb;
        struct req_capsule *cap;
+       struct cl_object *obj = NULL;
        int result;
+       int refcheck;
 
        LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
 
-       env = cl_env_nested_get(&nest);
-       if (!IS_ERR(env)) {
-               struct cl_object *obj = NULL;
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env)) {
+               result = PTR_ERR(env);
+               goto out;
+       }
 
-               lock_res_and_lock(dlmlock);
-               if (dlmlock->l_ast_data) {
-                       obj = osc2cl(dlmlock->l_ast_data);
-                       cl_object_get(obj);
-               }
-               unlock_res_and_lock(dlmlock);
+       lock_res_and_lock(dlmlock);
+       if (dlmlock->l_ast_data) {
+               obj = osc2cl(dlmlock->l_ast_data);
+               cl_object_get(obj);
+       }
+       unlock_res_and_lock(dlmlock);
 
-               if (obj) {
-                       /* Do not grab the mutex of cl_lock for glimpse.
-                        * See LU-1274 for details.
-                        * BTW, it's okay for cl_lock to be cancelled during
-                        * this period because server can handle this race.
-                        * See ldlm_server_glimpse_ast() for details.
-                        * cl_lock_mutex_get(env, lock);
-                        */
-                       cap = &req->rq_pill;
-                       req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
-                       req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
-                                            sizeof(*lvb));
-                       result = req_capsule_server_pack(cap);
-                       if (result == 0) {
-                               lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
-                               result = cl_object_glimpse(env, obj, lvb);
-                       }
-                       if (!exp_connect_lvb_type(req->rq_export))
-                               req_capsule_shrink(&req->rq_pill,
-                                                  &RMF_DLM_LVB,
-                                                  sizeof(struct ost_lvb_v1),
-                                                  RCL_SERVER);
-                       cl_object_put(env, obj);
-               } else {
-                       /*
-                        * These errors are normal races, so we don't want to
-                        * fill the console with messages by calling
-                        * ptlrpc_error()
-                        */
-                       lustre_pack_reply(req, 1, NULL, NULL);
-                       result = -ELDLM_NO_LOCK_DATA;
+       if (obj) {
+               /* Do not grab the mutex of cl_lock for glimpse.
+                * See LU-1274 for details.
+                * BTW, it's okay for cl_lock to be cancelled during
+                * this period because server can handle this race.
+                * See ldlm_server_glimpse_ast() for details.
+                * cl_lock_mutex_get(env, lock);
+                */
+               cap = &req->rq_pill;
+               req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
+               req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
+                                    sizeof(*lvb));
+               result = req_capsule_server_pack(cap);
+               if (result == 0) {
+                       lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
+                       result = cl_object_glimpse(env, obj, lvb);
                }
-               cl_env_nested_put(&nest, env);
+               if (!exp_connect_lvb_type(req->rq_export))
+               req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB,
+                                  sizeof(struct ost_lvb_v1), RCL_SERVER);
+               cl_object_put(env, obj);
        } else {
-               result = PTR_ERR(env);
+               /*
+                * These errors are normal races, so we don't want to
+                * fill the console with messages by calling
+                * ptlrpc_error()
+                */
+               lustre_pack_reply(req, 1, NULL, NULL);
+               result = -ELDLM_NO_LOCK_DATA;
        }
+       cl_env_put(env, &refcheck);
+
+out:
        req->rq_status = result;
        return result;
 }
  */
 unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 {
-       struct cl_env_nest       nest;
        struct lu_env           *env;
        struct osc_object       *obj;
        struct osc_lock         *oscl;
        unsigned long            weight;
        bool                     found = false;
+       int refcheck;
 
        might_sleep();
        /*
         * the upper context because cl_lock_put don't modify environment
         * variables. But just in case ..
         */
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                /* Mostly because lack of memory, do not eliminate this lock */
                return 1;
        weight = osc_lock_weight(env, obj, &dlmlock->l_policy_data.l_extent);
 
 out:
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        return weight;
 }
 
 
 
 long osc_lru_reclaim(struct client_obd *cli)
 {
-       struct cl_env_nest nest;
        struct lu_env *env;
        struct cl_client_cache *cache = cli->cl_cache;
        int max_scans;
+       int refcheck;
        long rc = 0;
 
        LASSERT(cache);
 
-       env = cl_env_nested_get(&nest);
+       env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                return 0;
 
        spin_unlock(&cache->ccc_lru_lock);
 
 out:
-       cl_env_nested_put(&nest, env);
+       cl_env_put(env, &refcheck);
        CDEBUG(D_CACHE, "%s: cli %p freed %ld pages.\n",
               cli->cl_import->imp_obd->obd_name, cli, rc);
        return rc;