return NULL;
}
-/* Allocate a work queue and threads. */
+/* Allocate a work queue and threads. Returns zero or negative error code. */
int
workqueue_create(
struct workqueue *wq,
int err = 0;
memset(wq, 0, sizeof(*wq));
- err = pthread_cond_init(&wq->wakeup, NULL);
+ err = -pthread_cond_init(&wq->wakeup, NULL);
if (err)
return err;
- err = pthread_mutex_init(&wq->lock, NULL);
+ err = -pthread_mutex_init(&wq->lock, NULL);
if (err)
goto out_cond;
wq->thread_count = nr_workers;
wq->threads = malloc(nr_workers * sizeof(pthread_t));
if (!wq->threads) {
- err = errno;
+ err = -errno;
goto out_mutex;
}
wq->terminate = false;
wq->terminated = false;
for (i = 0; i < nr_workers; i++) {
- err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
+ err = -pthread_create(&wq->threads[i], NULL, workqueue_thread,
wq);
if (err)
break;
}
/*
- * Create a work item consisting of a function and some arguments and
- * schedule the work item to be run via the thread pool.
+ * Create a work item consisting of a function and some arguments and schedule
+ * the work item to be run via the thread pool. Returns zero or a negative
+ * error code.
*/
int
workqueue_add(
wi = malloc(sizeof(struct workqueue_item));
if (!wi)
- return errno;
+ return -errno;
wi->function = func;
wi->index = index;
pthread_mutex_lock(&wq->lock);
if (wq->next_item == NULL) {
assert(wq->item_count == 0);
- ret = pthread_cond_signal(&wq->wakeup);
+ ret = -pthread_cond_signal(&wq->wakeup);
if (ret) {
pthread_mutex_unlock(&wq->lock);
free(wi);
/*
* Wait for all pending work items to be processed and tear down the
- * workqueue thread pool.
+ * workqueue thread pool. Returns zero or a negative error code.
*/
int
workqueue_terminate(
wq->terminate = true;
pthread_mutex_unlock(&wq->lock);
- ret = pthread_cond_broadcast(&wq->wakeup);
+ ret = -pthread_cond_broadcast(&wq->wakeup);
if (ret)
return ret;
for (i = 0; i < wq->thread_count; i++) {
- ret = pthread_join(wq->threads[i], NULL);
+ ret = -pthread_join(wq->threads[i], NULL);
if (ret)
return ret;
}
{
int err;
- err = workqueue_create(wq, mp, nworkers);
+ err = -workqueue_create(wq, mp, nworkers);
if (err)
do_error(_("cannot create worker threads, error = [%d] %s\n"),
err, strerror(err));
{
int err;
- err = workqueue_add(wq, func, agno, arg);
+ err = -workqueue_add(wq, func, agno, arg);
if (err)
do_error(_("cannot allocate worker item, error = [%d] %s\n"),
err, strerror(err));
{
int err;
- err = workqueue_terminate(wq);
+ err = -workqueue_terminate(wq);
if (err)
do_error(_("cannot terminate worker item, error = [%d] %s\n"),
err, strerror(err));
if (!ci)
return errno;
- ret = workqueue_create(&wq, (struct xfs_mount *)ctx,
+ ret = -workqueue_create(&wq, (struct xfs_mount *)ctx,
scrub_nproc_workqueue(ctx));
if (ret)
goto out_free;
for (agno = 0; agno < ctx->mnt.fsgeom.agcount && !ci->error; agno++) {
- ret = workqueue_add(&wq, count_ag_inodes, agno, ci);
+ ret = -workqueue_add(&wq, count_ag_inodes, agno, ci);
if (ret)
break;
}
- ret2 = workqueue_terminate(&wq);
+ ret2 = -workqueue_terminate(&wq);
if (!ret && ret2)
ret = ret2;
workqueue_destroy(&wq);
struct workqueue wq;
int ret;
- ret = workqueue_create(&wq, (struct xfs_mount *)ctx,
+ ret = -workqueue_create(&wq, (struct xfs_mount *)ctx,
scrub_nproc_workqueue(ctx));
if (ret) {
str_liberror(ctx, ret, _("creating bulkstat workqueue"));
}
for (agno = 0; agno < ctx->mnt.fsgeom.agcount; agno++) {
- ret = workqueue_add(&wq, scan_ag_inodes, agno, &si);
+ ret = -workqueue_add(&wq, scan_ag_inodes, agno, &si);
if (ret) {
si.aborted = true;
str_liberror(ctx, ret, _("queueing bulkstat work"));
}
}
- ret = workqueue_terminate(&wq);
+ ret = -workqueue_terminate(&wq);
if (ret) {
si.aborted = true;
str_liberror(ctx, ret, _("finishing bulkstat work"));
bool aborted = false;
int ret, ret2;
- ret = workqueue_create(&wq, (struct xfs_mount *)ctx,
+ ret = -workqueue_create(&wq, (struct xfs_mount *)ctx,
scrub_nproc_workqueue(ctx));
if (ret) {
str_liberror(ctx, ret, _("creating scrub workqueue"));
goto out;
for (agno = 0; !aborted && agno < ctx->mnt.fsgeom.agcount; agno++) {
- ret = workqueue_add(&wq, scan_ag_metadata, agno, &aborted);
+ ret = -workqueue_add(&wq, scan_ag_metadata, agno, &aborted);
if (ret) {
str_liberror(ctx, ret, _("queueing per-AG scrub work"));
goto out;
if (aborted)
goto out;
- ret = workqueue_add(&wq, scan_fs_metadata, 0, &aborted);
+ ret = -workqueue_add(&wq, scan_fs_metadata, 0, &aborted);
if (ret) {
str_liberror(ctx, ret, _("queueing per-FS scrub work"));
goto out;
}
out:
- ret2 = workqueue_terminate(&wq);
+ ret2 = -workqueue_terminate(&wq);
if (ret2) {
str_liberror(ctx, ret2, _("finishing scrub work"));
if (!ret && ret2)
bool aborted = false;
int ret;
- ret = workqueue_create(&wq, (struct xfs_mount *)ctx,
+ ret = -workqueue_create(&wq, (struct xfs_mount *)ctx,
scrub_nproc_workqueue(ctx));
if (ret) {
str_liberror(ctx, ret, _("creating repair workqueue"));
if (action_list_length(&ctx->action_lists[agno]) == 0)
continue;
- ret = workqueue_add(&wq, repair_ag, agno, &aborted);
+ ret = -workqueue_add(&wq, repair_ag, agno, &aborted);
if (ret) {
str_liberror(ctx, ret, _("queueing repair work"));
break;
}
}
- ret = workqueue_terminate(&wq);
+ ret = -workqueue_terminate(&wq);
if (ret)
str_liberror(ctx, ret, _("finishing repair work"));
workqueue_destroy(&wq);
&rvp->rvstate);
if (ret)
goto out_counter;
- ret = workqueue_create(&rvp->wq, (struct xfs_mount *)rvp,
+ ret = -workqueue_create(&rvp->wq, (struct xfs_mount *)rvp,
verifier_threads == 1 ? 0 : verifier_threads);
if (ret)
goto out_rvstate;
read_verify_pool_flush(
struct read_verify_pool *rvp)
{
- return workqueue_terminate(&rvp->wq);
+ return -workqueue_terminate(&rvp->wq);
}
/* Finish up any read verification work and tear it down. */
memcpy(tmp, rv, sizeof(*tmp));
- ret = workqueue_add(&rvp->wq, read_verify, 0, tmp);
+ ret = -workqueue_add(&rvp->wq, read_verify, 0, tmp);
if (ret) {
free(tmp);
rvp->runtime_error = ret;
xfs_agnumber_t agno;
int ret;
- ret = workqueue_create(&wq, (struct xfs_mount *)ctx,
+ ret = -workqueue_create(&wq, (struct xfs_mount *)ctx,
scrub_nproc_workqueue(ctx));
if (ret) {
str_liberror(ctx, ret, _("creating fsmap workqueue"));
return ret;
}
if (ctx->fsinfo.fs_rt) {
- ret = workqueue_add(&wq, scan_rt_rmaps,
+ ret = -workqueue_add(&wq, scan_rt_rmaps,
ctx->mnt.fsgeom.agcount + 1, &sbx);
if (ret) {
sbx.aborted = true;
}
}
if (ctx->fsinfo.fs_log) {
- ret = workqueue_add(&wq, scan_log_rmaps,
+ ret = -workqueue_add(&wq, scan_log_rmaps,
ctx->mnt.fsgeom.agcount + 2, &sbx);
if (ret) {
sbx.aborted = true;
}
}
for (agno = 0; agno < ctx->mnt.fsgeom.agcount; agno++) {
- ret = workqueue_add(&wq, scan_ag_rmaps, agno, &sbx);
+ ret = -workqueue_add(&wq, scan_ag_rmaps, agno, &sbx);
if (ret) {
sbx.aborted = true;
str_liberror(ctx, ret, _("queueing per-AG fsmap work"));
}
}
out:
- ret = workqueue_terminate(&wq);
+ ret = -workqueue_terminate(&wq);
if (ret) {
sbx.aborted = true;
str_liberror(ctx, ret, _("finishing fsmap work"));
new_sftd->rootdir = is_rootdir;
inc_nr_dirs(sft);
- error = workqueue_add(wq, scan_fs_dir, 0, new_sftd);
+ error = -workqueue_add(wq, scan_fs_dir, 0, new_sftd);
if (error) {
dec_nr_dirs(sft);
str_liberror(ctx, error, _("queueing directory scan work"));
goto out_mutex;
}
- ret = workqueue_create(&wq, (struct xfs_mount *)ctx,
+ ret = -workqueue_create(&wq, (struct xfs_mount *)ctx,
scrub_nproc_workqueue(ctx));
if (ret) {
str_liberror(ctx, ret, _("creating directory scan workqueue"));
assert(sft.nr_dirs == 0);
pthread_mutex_unlock(&sft.lock);
- ret = workqueue_terminate(&wq);
+ ret = -workqueue_terminate(&wq);
if (ret) {
str_liberror(ctx, ret, _("finishing directory scan work"));
goto out_wq;