 #include <linux/file.h>
 #include <linux/falloc.h>
 #include <linux/slab.h>
+#include <linux/kthread.h>
 
 #include "idmap.h"
 #include "cache.h"
        return status;
 }
 
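+/* Release a reference to a copy, freeing it when the last one is dropped. */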
+void nfs4_put_copy(struct nfsd4_copy *copy)
+{
+       if (!refcount_dec_and_test(&copy->refcount))
+               return;
+       kfree(copy);
+}
+
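+/*
+ * Atomically test-and-set copy->stopped under the client's async_lock;
+ * returns the old value, so only one caller sees "false" and goes on
+ * to stop the copy thread.
+ */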
+static bool
+check_and_set_stop_copy(struct nfsd4_copy *copy)
+{
+       bool value;
+
+       spin_lock(&copy->cp_clp->async_lock);
+       value = copy->stopped;
+       if (!copy->stopped)
+               copy->stopped = true;
+       spin_unlock(&copy->cp_clp->async_lock);
+       return value;
+}
+
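+/* Stop the copy's kthread (at most once) and drop the caller's reference. */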
+static void nfsd4_stop_copy(struct nfsd4_copy *copy)
+{
+       /* only 1 thread should stop the copy */
+       if (!check_and_set_stop_copy(copy))
+               kthread_stop(copy->copy_task);
+       nfs4_put_copy(copy);
+}
+
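+/* Take a reference to the first async copy on the client's list, if any. */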
+static struct nfsd4_copy *nfsd4_get_copy(struct nfs4_client *clp)
+{
+       struct nfsd4_copy *copy = NULL;
+
+       spin_lock(&clp->async_lock);
+       if (!list_empty(&clp->async_copies)) {
+               copy = list_first_entry(&clp->async_copies, struct nfsd4_copy,
+                                       copies);
+               refcount_inc(&copy->refcount);
+       }
+       spin_unlock(&clp->async_lock);
+       return copy;
+}
+
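+/* Stop and release every in-flight async copy when a client goes away. */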
+void nfsd4_shutdown_copy(struct nfs4_client *clp)
+{
+       struct nfsd4_copy *copy;
+
+       while ((copy = nfsd4_get_copy(clp)) != NULL)
+               nfsd4_stop_copy(copy);
+}
+
+static void nfsd4_cb_offload_release(struct nfsd4_callback *cb)
+{
+       struct nfsd4_copy *copy = container_of(cb, struct nfsd4_copy, cp_cb);
+
+       nfs4_put_copy(copy);
+}
+
+static int nfsd4_cb_offload_done(struct nfsd4_callback *cb,
+                                struct rpc_task *task)
+{
+       return 1;
+}
+
+static const struct nfsd4_callback_ops nfsd4_cb_offload_ops = {
+       .release = nfsd4_cb_offload_release,
+       .done = nfsd4_cb_offload_done
+};
+
+static void nfsd4_init_copy_res(struct nfsd4_copy *copy, bool sync)
+{
+       copy->cp_res.wr_stable_how = NFS_UNSTABLE;
+       copy->cp_synchronous = sync;
+       gen_boot_verifier(&copy->cp_res.wr_verifier, copy->cp_clp->net);
+}
+
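+/*
+ * An async copy loops until the whole range has been copied or the
+ * kthread is asked to stop; a synchronous copy makes a single pass
+ * and may return a short count.
+ */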
+static ssize_t _nfsd_copy_file_range(struct nfsd4_copy *copy)
+{
+       ssize_t bytes_copied = 0;
+       size_t bytes_total = copy->cp_count;
+       u64 src_pos = copy->cp_src_pos;
+       u64 dst_pos = copy->cp_dst_pos;
+
+       do {
+               if (kthread_should_stop())
+                       break;
+               bytes_copied = nfsd_copy_file_range(copy->file_src, src_pos,
+                               copy->file_dst, dst_pos, bytes_total);
+               if (bytes_copied <= 0)
+                       break;
+               bytes_total -= bytes_copied;
+               copy->cp_res.wr_bytes_written += bytes_copied;
+               src_pos += bytes_copied;
+               dst_pos += bytes_copied;
+       } while (bytes_total > 0 && !copy->cp_synchronous);
+       return bytes_copied;
+}
+
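+/*
+ * Run the copy, fill in the result on success, and release the
+ * source and destination file references.
+ */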
+static __be32 nfsd4_do_copy(struct nfsd4_copy *copy, bool sync)
+{
+       __be32 status;
+       ssize_t bytes;
+
+       bytes = _nfsd_copy_file_range(copy);
+       /* if some bytes were already written, ignore the error and
+        * report the partial count; the client can always retry to
+        * get the error
+        */
+       if (bytes < 0 && !copy->cp_res.wr_bytes_written)
+               status = nfserrno(bytes);
+       else {
+               nfsd4_init_copy_res(copy, sync);
+               status = nfs_ok;
+       }
+
+       fput(copy->file_src);
+       fput(copy->file_dst);
+       return status;
+}
+
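+/*
+ * Duplicate the COPY arguments into the async copy, taking extra
+ * references on the source and destination files for the kthread.
+ */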
+static void dup_copy_fields(struct nfsd4_copy *src, struct nfsd4_copy *dst)
+{
+       dst->cp_src_pos = src->cp_src_pos;
+       dst->cp_dst_pos = src->cp_dst_pos;
+       dst->cp_count = src->cp_count;
+       dst->cp_synchronous = src->cp_synchronous;
+       memcpy(&dst->cp_res, &src->cp_res, sizeof(src->cp_res));
+       memcpy(&dst->fh, &src->fh, sizeof(src->fh));
+       dst->cp_clp = src->cp_clp;
+       dst->file_dst = get_file(src->file_dst);
+       dst->file_src = get_file(src->file_src);
+       memcpy(&dst->cp_stateid, &src->cp_stateid, sizeof(src->cp_stateid));
+}
+
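+/*
+ * Release the copy stateid and file references, unhash the copy from
+ * the client's list, and drop its initial reference.
+ */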
+static void cleanup_async_copy(struct nfsd4_copy *copy)
+{
+       nfs4_free_cp_state(copy);
+       fput(copy->file_dst);
+       fput(copy->file_src);
+       spin_lock(&copy->cp_clp->async_lock);
+       list_del(&copy->copies);
+       spin_unlock(&copy->cp_clp->async_lock);
+       nfs4_put_copy(copy);
+}
+
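+/*
+ * Async copy kthread: perform the copy, then notify the client of
+ * completion via a CB_OFFLOAD callback.  The callback gets its own
+ * nfsd4_copy since this one is torn down when the thread exits.
+ */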
+static int nfsd4_do_async_copy(void *data)
+{
+       struct nfsd4_copy *copy = (struct nfsd4_copy *)data;
+       struct nfsd4_copy *cb_copy;
+
+       copy->nfserr = nfsd4_do_copy(copy, false);
+       cb_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
+       if (!cb_copy)
+               goto out;
+       memcpy(&cb_copy->cp_res, &copy->cp_res, sizeof(copy->cp_res));
+       cb_copy->cp_clp = copy->cp_clp;
+       cb_copy->nfserr = copy->nfserr;
+       memcpy(&cb_copy->fh, &copy->fh, sizeof(copy->fh));
+       nfsd4_init_cb(&cb_copy->cp_cb, cb_copy->cp_clp,
+                       &nfsd4_cb_offload_ops, NFSPROC4_CLNT_CB_OFFLOAD);
+       nfsd4_run_cb(&cb_copy->cp_cb);
+out:
+       cleanup_async_copy(copy);
+       return 0;
+}
+
 static __be32
 nfsd4_copy(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
                union nfsd4_op_u *u)
 {
        struct nfsd4_copy *copy = &u->copy;
-       struct file *src, *dst;
        __be32 status;
-       ssize_t bytes;
+       struct nfsd4_copy *async_copy = NULL;
 
-       status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid, &src,
-                                  &copy->cp_dst_stateid, &dst);
+       status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid,
+                                  &copy->file_src, &copy->cp_dst_stateid,
+                                  &copy->file_dst);
        if (status)
                goto out;
 
-       bytes = nfsd_copy_file_range(src, copy->cp_src_pos,
-                       dst, copy->cp_dst_pos, copy->cp_count);
+       copy->cp_clp = cstate->clp;
+       memcpy(&copy->fh, &cstate->current_fh.fh_handle,
+               sizeof(struct knfsd_fh));
+       if (!copy->cp_synchronous) {
+               struct nfsd_net *nn = net_generic(SVC_NET(rqstp), nfsd_net_id);
 
-       if (bytes < 0)
-               status = nfserrno(bytes);
-       else {
-               copy->cp_res.wr_bytes_written = bytes;
-               copy->cp_res.wr_stable_how = NFS_UNSTABLE;
-               copy->cp_synchronous = 1;
-               gen_boot_verifier(&copy->cp_res.wr_verifier, SVC_NET(rqstp));
+               status = nfserrno(-ENOMEM);
+               async_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
+               if (!async_copy)
+                       goto out;
+               if (!nfs4_init_cp_state(nn, copy)) {
+                       kfree(async_copy);
+                       goto out;
+               }
+               /* make the list_del() in cleanup_async_copy() safe even
+                * if the copy is never added to the client's list below
+                */
+               INIT_LIST_HEAD(&async_copy->copies);
+               refcount_set(&async_copy->refcount, 1);
+               memcpy(&copy->cp_res.cb_stateid, &copy->cp_stateid,
+                       sizeof(copy->cp_stateid));
+               dup_copy_fields(copy, async_copy);
+               async_copy->copy_task = kthread_create(nfsd4_do_async_copy,
+                               async_copy, "%s", "copy thread");
+               if (IS_ERR(async_copy->copy_task))
+                       goto out_err;
+               spin_lock(&async_copy->cp_clp->async_lock);
+               list_add(&async_copy->copies,
+                               &async_copy->cp_clp->async_copies);
+               spin_unlock(&async_copy->cp_clp->async_lock);
+               wake_up_process(async_copy->copy_task);
                status = nfs_ok;
-       }
-
-       fput(src);
-       fput(dst);
+       } else
+               status = nfsd4_do_copy(copy, true);
 out:
        return status;
+out_err:
+       cleanup_async_copy(async_copy);
+       goto out;
+}
+
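+/*
+ * Find an in-flight async copy by its copy stateid and take a
+ * reference; the caller must release it with nfs4_put_copy().
+ */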
+struct nfsd4_copy *
+find_async_copy(struct nfs4_client *clp, stateid_t *stateid)
+{
+       struct nfsd4_copy *copy;
+
+       spin_lock(&clp->async_lock);
+       list_for_each_entry(copy, &clp->async_copies, copies) {
+               if (memcmp(&copy->cp_stateid, stateid, NFS4_STATEID_SIZE))
+                       continue;
+               refcount_inc(&copy->refcount);
+               spin_unlock(&clp->async_lock);
+               return copy;
+       }
+       spin_unlock(&clp->async_lock);
+       return NULL;
 }
 
 static __be32
                     struct nfsd4_compound_state *cstate,
                     union nfsd4_op_u *u)
 {
-       return 0;
+       struct nfsd4_offload_status *os = &u->offload_status;
+       __be32 status = 0;
+       struct nfsd4_copy *copy;
+       struct nfs4_client *clp = cstate->clp;
+
+       copy = find_async_copy(clp, &os->stateid);
+       if (copy)
+               nfsd4_stop_copy(copy);
+       else
+               status = nfserr_bad_stateid;
+
+       return status;
 }
 
 static __be32
                     struct nfsd4_compound_state *cstate,
                     union nfsd4_op_u *u)
 {
-       return nfserr_notsupp;
+       struct nfsd4_offload_status *os = &u->offload_status;
+       __be32 status = 0;
+       struct nfsd4_copy *copy;
+       struct nfs4_client *clp = cstate->clp;
+
+       copy = find_async_copy(clp, &os->stateid);
+       if (copy) {
+               os->count = copy->cp_res.wr_bytes_written;
+               nfs4_put_copy(copy);
+       } else
+               status = nfserr_bad_stateid;
+
+       return status;
 }
 
 static __be32
 
        return NULL;
 }
 
+/*
+ * Create a unique stateid_t to represent each COPY.
+ */
+int nfs4_init_cp_state(struct nfsd_net *nn, struct nfsd4_copy *copy)
+{
+       int new_id;
+
+       idr_preload(GFP_KERNEL);
+       spin_lock(&nn->s2s_cp_lock);
+       new_id = idr_alloc_cyclic(&nn->s2s_cp_stateids, copy, 0, 0, GFP_NOWAIT);
+       spin_unlock(&nn->s2s_cp_lock);
+       idr_preload_end();
+       if (new_id < 0)
+               return 0;
+       copy->cp_stateid.si_opaque.so_id = new_id;
+       copy->cp_stateid.si_opaque.so_clid.cl_boot = nn->boot_time;
+       copy->cp_stateid.si_opaque.so_clid.cl_id = nn->s2s_cp_cl_id;
+       return 1;
+}
+
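+/* Remove the copy stateid allocated by nfs4_init_cp_state(). */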
+void nfs4_free_cp_state(struct nfsd4_copy *copy)
+{
+       struct nfsd_net *nn;
+
+       nn = net_generic(copy->cp_clp->net, nfsd_net_id);
+       spin_lock(&nn->s2s_cp_lock);
+       idr_remove(&nn->s2s_cp_stateids, copy->cp_stateid.si_opaque.so_id);
+       spin_unlock(&nn->s2s_cp_lock);
+}
+
 static struct nfs4_ol_stateid * nfs4_alloc_open_stateid(struct nfs4_client *clp)
 {
        struct nfs4_stid *stid;
 #ifdef CONFIG_NFSD_PNFS
        INIT_LIST_HEAD(&clp->cl_lo_states);
 #endif
+       INIT_LIST_HEAD(&clp->async_copies);
+       spin_lock_init(&clp->async_lock);
        spin_lock_init(&clp->cl_lock);
        rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
        return clp;
                }
        }
        nfsd4_return_all_client_layouts(clp);
+       nfsd4_shutdown_copy(clp);
        nfsd4_shutdown_callback(clp);
        if (clp->cl_cb_conn.cb_xprt)
                svc_xprt_put(clp->cl_cb_conn.cb_xprt);
                || !list_empty(&clp->cl_lo_states)
 #endif
                || !list_empty(&clp->cl_delegations)
-               || !list_empty(&clp->cl_sessions);
+               || !list_empty(&clp->cl_sessions)
+               || !list_empty(&clp->async_copies);
 }
 
 __be32
        INIT_LIST_HEAD(&nn->close_lru);
        INIT_LIST_HEAD(&nn->del_recall_lru);
        spin_lock_init(&nn->client_lock);
+       spin_lock_init(&nn->s2s_cp_lock);
+       idr_init(&nn->s2s_cp_stateids);
 
        spin_lock_init(&nn->blocked_locks_lock);
        INIT_LIST_HEAD(&nn->blocked_locks_lru);
 
        struct rpc_wait_queue   cl_cb_waitq;    /* backchannel callers may */
                                                /* wait here for slots */
        struct net              *net;
+       struct list_head        async_copies;   /* list of async copies */
+       spinlock_t              async_lock;     /* lock for async copies */
 };
 
 /* struct nfs4_client_reset
 
 struct nfsd4_compound_state;
 struct nfsd_net;
+struct nfsd4_copy;
 
 extern __be32 nfs4_preprocess_stateid_op(struct svc_rqst *rqstp,
                struct nfsd4_compound_state *cstate, struct svc_fh *fhp,
                     struct nfs4_stid **s, struct nfsd_net *nn);
 struct nfs4_stid *nfs4_alloc_stid(struct nfs4_client *cl, struct kmem_cache *slab,
                                  void (*sc_free)(struct nfs4_stid *));
+int nfs4_init_cp_state(struct nfsd_net *nn, struct nfsd4_copy *copy);
+void nfs4_free_cp_state(struct nfsd4_copy *copy);
 void nfs4_unhash_stid(struct nfs4_stid *s);
 void nfs4_put_stid(struct nfs4_stid *s);
 void nfs4_inc_and_copy_stateid(stateid_t *dst, struct nfs4_stid *stid);
 extern int nfsd4_create_callback_queue(void);
 extern void nfsd4_destroy_callback_queue(void);
 extern void nfsd4_shutdown_callback(struct nfs4_client *);
+extern void nfsd4_shutdown_copy(struct nfs4_client *clp);
 extern void nfsd4_prepare_cb_recall(struct nfs4_delegation *dp);
 extern struct nfs4_client_reclaim *nfs4_client_to_reclaim(const char *name,
                                                        struct nfsd_net *nn);
 
 struct nfs4_file *find_file(struct knfsd_fh *fh);
 void put_nfs4_file(struct nfs4_file *fi);
+extern void nfs4_put_copy(struct nfsd4_copy *copy);
+extern struct nfsd4_copy *
+find_async_copy(struct nfs4_client *clp, stateid_t *stateid);
 static inline void get_nfs4_file(struct nfs4_file *fi)
 {
        refcount_inc(&fi->fi_ref);