From: Guru Anbalagane
Date: Tue, 13 Sep 2011 19:10:38 +0000 (-0700)
Subject: Revert "ipc semaphores: reduce ipc_lock contention in semtimedop"
X-Git-Tag: v2.6.39-400.9.0~912
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=5055d2f8619df80488a7419e4bc48d5eeb77b634;p=users%2Fjedix%2Flinux-maple.git

Revert "ipc semaphores: reduce ipc_lock contention in semtimedop"

This reverts commit c7fa322dd72b08450a440ef800124705a1fa148c.
---

diff --git a/include/linux/sem.h b/include/linux/sem.h
index 5a97a370c71d..f2961afa2f66 100644
--- a/include/linux/sem.h
+++ b/include/linux/sem.h
@@ -87,7 +87,6 @@ struct task_struct;
 struct sem {
 	int	semval;		/* current value */
 	int	sempid;		/* pid of last operation */
-	spinlock_t	lock;
 	struct list_head sem_pending; /* pending single-sop operations */
 };
 
@@ -98,12 +97,15 @@ struct sem_array {
 	time_t			sem_otime;	/* last semop time */
 	time_t			sem_ctime;	/* last change time */
 	struct sem		*sem_base;	/* ptr to first semaphore in array */
+	struct list_head	sem_pending;	/* pending operations to be processed */
 	struct list_head	list_id;	/* undo requests on this array */
 	int			sem_nsems;	/* no. of semaphores in array */
+	int			complex_count;	/* pending complex operations */
 };
 
 /* One queue for each sleeping process in the system. */
 struct sem_queue {
+	struct list_head	simple_list; /* queue of pending operations */
 	struct list_head	list;	 /* queue of pending operations */
 	struct task_struct	*sleeper; /* this process */
 	struct sem_undo		*undo;	 /* undo structure */
diff --git a/ipc/sem.c b/ipc/sem.c
index 4643652be3f5..e68a8f57682c 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -86,8 +86,6 @@
 #include
 #include
 #include
-#include
-#include
 
 #include
 #include "util.h"
@@ -200,23 +198,24 @@ static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
  * Without the check/retry algorithm a lockless wakeup is possible:
  * - queue.status is initialized to -EINTR before blocking.
  * - wakeup is performed by
+ *	* unlinking the queue entry from sma->sem_pending
  *	* setting queue.status to IN_WAKEUP
  *	  This is the notification for the blocked thread that a
  *	  result value is imminent.
  *	* call wake_up_process
  *	* set queue.status to the final value.
  * - the previously blocked thread checks queue.status:
- *	* if it's IN_WAKEUP, then it must wait until the value changes
- *	* if it's not -EINTR, then the operation was completed by
- *	  update_queue. semtimedop can return queue.status without
- *	  performing any operation on the sem array.
- *	* otherwise it must find itself on the list of pending operations.
+ *	* if it's IN_WAKEUP, then it must wait until the value changes
+ *	* if it's not -EINTR, then the operation was completed by
+ *	  update_queue. semtimedop can return queue.status without
+ *	  performing any operation on the sem array.
+ *	* otherwise it must acquire the spinlock and check what's up.
  *
  * The two-stage algorithm is necessary to protect against the following
  * races:
  * - if queue.status is set after wake_up_process, then the woken up idle
- *   thread could race forward and not realize its semaphore operation had
- *   happened.
+ *   thread could race forward and try (and fail) to acquire sma->lock
+ *   before update_queue had a chance to set queue.status
  * - if queue.status is written before wake_up_process and if the
  *   blocked process is woken up by a signal between writing
  *   queue.status and the wake_up_process, then the woken up
@@ -279,11 +278,11 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 
 	sma->sem_base = (struct sem *) &sma[1];
 
-	for (i = 0; i < nsems; i++) {
+	for (i = 0; i < nsems; i++)
 		INIT_LIST_HEAD(&sma->sem_base[i].sem_pending);
-		spin_lock_init(&sma->sem_base[i].lock);
-	}
 
+	sma->complex_count = 0;
+	INIT_LIST_HEAD(&sma->sem_pending);
 	INIT_LIST_HEAD(&sma->list_id);
 	sma->sem_nsems = nsems;
 	sma->sem_ctime = get_seconds();
@@ -341,116 +340,35 @@ SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
 	return ipcget(ns, &sem_ids(ns), &sem_ops, &sem_params);
 }
 
-/*
- * when a semaphore is modified, we want to retry the series of operations
- * for anyone that was blocking on that semaphore.  This breaks down into
- * a few different common operations:
- *
- * 1) One modification releases one or more waiters for zero.
- * 2) Many waiters are trying to get a single lock, only one will get it.
- * 3) Many modifications to the count will succeed.
- *
- * For case one, we copy over anyone waiting for zero when the semval is
- * zero.  We don't bother copying them over if the semval isn't zero yet.
- *
- * For case two, we copy over the first queue trying to modify the semaphore,
- * assuming it is trying to get a lock.
- *
- * For case three, after the first queue trying to change this semaphore is
- * run, it will call this function again.  It'll find the next queue
- * that wants to change things at that time.
- *
- * The goal behind all of this is to avoid retrying atomic ops that have
- * no hope of actually completing.  It is optimized for the case where a
- * call modifies a single semaphore at a time.
- */
-static void copy_sem_queue(unsigned long semval,
-		    unsigned short sem_num, struct list_head *queue,
-		    struct list_head *dest)
-{
-	struct sem_queue *q;
-	struct sem_queue *safe;
-
-	list_for_each_entry_safe(q, safe, queue, list) {
-		/*
-		 * if this is a complex operation, we don't really know what is
-		 * going on.  Splice the whole list over to preserve the queue
-		 * order.
-		 */
-		if (q->sops[0].sem_num != sem_num) {
-			list_splice_tail_init(queue, dest);
-			break;
-		}
-
-		/*
-		 * they are waiting for zero, leave it on the list if
-		 * we're not at zero yet, otherwise copy it over
-		 */
-		if (q->sops[0].sem_op == 0) {
-			if (semval == 0) {
-				list_del(&q->list);
-				list_add_tail(&q->list, dest);
-			}
-			continue;
-		}
-
-		/*
-		 * at this point we know the first sop in the queue is
-		 * changing this semaphore.  Copy this one queue over
-		 * and leave the rest.  If more than one alter is going
-		 * to succeed, the others will bubble in after each
-		 * one is able to modify the queue.
-		 */
-		list_del(&q->list);
-		list_add_tail(&q->list, dest);
-		break;
-	}
-}
-
 /*
  * Determine whether a sequence of semaphore operations would succeed
  * all at once. Return 0 if yes, 1 if need to sleep, else return error code.
  */
-static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
-			     int nsops, struct sem_undo *un, int pid,
-			     struct list_head *pending, struct sem **blocker)
+
+static int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
+			     int nsops, struct sem_undo *un, int pid)
 {
 	int result, sem_op;
 	struct sembuf *sop;
 	struct sem * curr;
-	int last = 0;
 
 	for (sop = sops; sop < sops + nsops; sop++) {
 		curr = sma->sem_base + sop->sem_num;
-
-		/*
-		 * deal with userland sending the same
-		 * sem_num twice.  Thanks to sort they will
-		 * be adjacent.  We unlock in the loops below.
-		 */
-		if (sop == sops || last != sop->sem_num)
-			spin_lock(&curr->lock);
-
-		last = sop->sem_num;
 		sem_op = sop->sem_op;
 		result = curr->semval;
-
-		if (!sem_op && result) {
-			*blocker = curr;
+
+		if (!sem_op && result)
 			goto would_block;
-		}
 
 		result += sem_op;
-		if (result < 0) {
-			*blocker = curr;
+		if (result < 0)
 			goto would_block;
-		}
 		if (result > SEMVMX)
 			goto out_of_range;
 		if (sop->sem_flg & SEM_UNDO) {
 			int undo = un->semadj[sop->sem_num] - sem_op;
 			/*
-			 *	Exceeding the undo range is an error.
+			 *	Exceeding the undo range is an error.
 			 */
 			if (undo < (-SEMAEM - 1) || undo > SEMAEM)
 				goto out_of_range;
@@ -465,27 +383,7 @@ static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * so
 		un->semadj[sop->sem_num] -= sop->sem_op;
 		sop--;
 	}
-
-	/*
-	 * our operation is going to succeed, do any list splicing
-	 * required so that we can try to wakeup people waiting on the
-	 * sems we've changed.
-	 */
-	for (sop = sops; sop < sops + nsops; sop++) {
-		/* if there are duplicate sem_nums in the list
-		 * we only want to process the first one
-		 */
-		if (sop != sops && last == sop->sem_num)
-			continue;
-
-		curr = sma->sem_base + sop->sem_num;
-		if (sop->sem_op)
-			copy_sem_queue(curr->semval, sop->sem_num,
-				       &curr->sem_pending, pending);
-		spin_unlock(&curr->lock);
-		last = sop->sem_num;
-	}
-
+
 	return 0;
 
 out_of_range:
@@ -493,32 +391,15 @@ out_of_range:
 	goto undo;
 
 would_block:
-	if (sop->sem_flg & IPC_NOWAIT) {
+	if (sop->sem_flg & IPC_NOWAIT)
 		result = -EAGAIN;
-		if (*blocker) {
-			/*
-			 * the blocker doesn't put itself on any
-			 * list for -EAGAIN, unlock it here
-			 */
-			spin_unlock(&(*blocker)->lock);
-			*blocker = NULL;
-		}
-	} else
+	else
 		result = 1;
 
 undo:
 	sop--;
 	while (sop >= sops) {
-		curr = sma->sem_base + sop->sem_num;
-
-		curr->semval -= sop->sem_op;
-		/* we leave the blocker locked, and we make sure not
-		 * to unlock duplicates in the list twice
-		 */
-		if (curr != *blocker &&
-		    (sop == sops || (sop - 1)->sem_num != sop->sem_num)) {
-			spin_unlock(&curr->lock);
-		}
+		sma->sem_base[sop->sem_num].semval -= sop->sem_op;
 		sop--;
 	}
 
@@ -544,7 +425,7 @@ static void wake_up_sem_queue_prepare(struct list_head *pt,
 	q->status = IN_WAKEUP;
 	q->pid = error;
 
-	list_add_tail(&q->list, pt);
+	list_add_tail(&q->simple_list, pt);
 }
 
 /**
@@ -562,7 +443,7 @@ static void wake_up_sem_queue_do(struct list_head *pt)
 	int did_something;
 
 	did_something = !list_empty(pt);
-	list_for_each_entry_safe(q, t, pt, list) {
+	list_for_each_entry_safe(q, t, pt, simple_list) {
 		wake_up_process(q->sleeper);
 		/* q can disappear immediately after writing q->status. */
 		smp_wmb();
@@ -572,70 +453,153 @@ static void wake_up_sem_queue_do(struct list_head *pt)
 		preempt_enable();
 }
 
+static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
+{
+	list_del(&q->list);
+	if (q->nsops == 1)
+		list_del(&q->simple_list);
+	else
+		sma->complex_count--;
+}
+
+/** check_restart(sma, q)
+ * @sma: semaphore array
+ * @q: the operation that just completed
+ *
+ * update_queue is O(N^2) when it restarts scanning the whole queue of
+ * waiting operations. Therefore this function checks if the restart is
+ * really necessary. It is called after a previously waiting operation
+ * was completed.
+ */
+static int check_restart(struct sem_array *sma, struct sem_queue *q)
+{
+	struct sem *curr;
+	struct sem_queue *h;
+
+	/* if the operation didn't modify the array, then no restart */
+	if (q->alter == 0)
+		return 0;
+
+	/* pending complex operations are too difficult to analyse */
+	if (sma->complex_count)
+		return 1;
+
+	/* we were a sleeping complex operation. Too difficult */
+	if (q->nsops > 1)
+		return 1;
+
+	curr = sma->sem_base + q->sops[0].sem_num;
+
+	/* No-one waits on this queue */
+	if (list_empty(&curr->sem_pending))
+		return 0;
+
+	/* the new semaphore value */
+	if (curr->semval) {
+		/* It is impossible that someone waits for the new value:
+		 * - q is a previously sleeping simple operation that
+		 *   altered the array. It must be a decrement, because
+		 *   simple increments never sleep.
+		 * - The value is not 0, thus wait-for-zero won't proceed.
+		 * - If there are older (higher priority) decrements
+		 *   in the queue, then they have observed the original
+		 *   semval value and couldn't proceed. The operation
+		 *   decremented to value - thus they won't proceed either.
+		 */
+		BUG_ON(q->sops[0].sem_op >= 0);
+		return 0;
+	}
+	/*
+	 * semval is 0. Check if there are wait-for-zero semops.
+	 * They must be the first entries in the per-semaphore simple queue
+	 */
+	h = list_first_entry(&curr->sem_pending, struct sem_queue, simple_list);
+	BUG_ON(h->nsops != 1);
+	BUG_ON(h->sops[0].sem_num != q->sops[0].sem_num);
+
+	/* Yes, there is a wait-for-zero semop. Restart */
+	if (h->sops[0].sem_op == 0)
+		return 1;
+
+	/* Again - no-one is waiting for the new value. */
+	return 0;
+}
+
+
 /**
  * update_queue(sma, semnum): Look for tasks that can be completed.
  * @sma: semaphore array.
+ * @semnum: semaphore that was modified.
  * @pt: list head for the tasks that must be woken up.
- * @pending_list: list of struct sem_queues to try
  *
  * update_queue must be called after a semaphore in a semaphore array
- * was modified.
- *
+ * was modified. If multiple semaphore were modified, then @semnum
+ * must be set to -1.
  * The tasks that must be woken up are added to @pt. The return code
 * is stored in q->pid.
 * The function return 1 if at least one semop was completed successfully.
 */
-static int update_queue(struct sem_array *sma, struct list_head *pt,
-			struct list_head *pending_list)
+static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
 {
 	struct sem_queue *q;
-	LIST_HEAD(new_pending);
-	LIST_HEAD(work_list);
+	struct list_head *walk;
+	struct list_head *pending_list;
+	int offset;
 	int semop_completed = 0;
 
-	/*
-	 * this seems strange, but what we want to do is process everything
-	 * on the pending list, and then process any queues that have a chance
-	 * to finish because of processing the pending list.
-	 *
-	 * So, we send new_pending to try_atomic_semop each time, and it
-	 * splices any additional queues we have to try into new_pending.
-	 * When the work list is empty, we splice new_pending into the
-	 * work list and loop again.
-	 *
-	 * At the end of the whole thing, after we've built the largest
-	 * possible list of tasks to wake up, we wake them in bulk.
+	/* if there are complex operations around, then knowing the semaphore
+	 * that was modified doesn't help us. Assume that multiple semaphores
+	 * were modified.
 	 */
-	list_splice_init(pending_list, &work_list);
-again:
-	while (!list_empty(&work_list)) {
-		struct sem *blocker;
-		int error;
+	if (sma->complex_count)
+		semnum = -1;
+
+	if (semnum == -1) {
+		pending_list = &sma->sem_pending;
+		offset = offsetof(struct sem_queue, list);
+	} else {
+		pending_list = &sma->sem_base[semnum].sem_pending;
+		offset = offsetof(struct sem_queue, simple_list);
+	}
 
-		q = list_entry(work_list.next, struct sem_queue, list);
-		list_del_init(&q->list);
+again:
+	walk = pending_list->next;
+	while (walk != pending_list) {
+		int error, restart;
+
+		q = (struct sem_queue *)((char *)walk - offset);
+		walk = walk->next;
+
+		/* If we are scanning the single sop, per-semaphore list of
+		 * one semaphore and that semaphore is 0, then it is not
+		 * necessary to scan the "alter" entries: simple increments
+		 * that affect only one entry succeed immediately and cannot
+		 * be in the per semaphore pending queue, and decrements
+		 * cannot be successful if the value is already 0.
+		 */
+		if (semnum != -1 && sma->sem_base[semnum].semval == 0 &&
+				q->alter)
+			break;
 
-		blocker = NULL;
 		error = try_atomic_semop(sma, q->sops, q->nsops,
-					 q->undo, q->pid, &new_pending,
-					 &blocker);
+					 q->undo, q->pid);
 
 		/* Does q->sleeper still need to sleep? */
-		if (error > 0) {
-			list_add_tail(&q->list, &blocker->sem_pending);
-			spin_unlock(&blocker->lock);
+		if (error > 0)
 			continue;
-		}
 
-		if (!error)
+		unlink_queue(sma, q);
+
+		if (error) {
+			restart = 0;
+		} else {
 			semop_completed = 1;
+			restart = check_restart(sma, q);
+		}
 
 		wake_up_sem_queue_prepare(pt, q, error);
-
-		if (!list_empty(&new_pending)) {
-			list_splice_init(&new_pending, &work_list);
+		if (restart)
 			goto again;
-		}
 	}
 	return semop_completed;
 }
@@ -654,20 +618,25 @@ again:
 * responsible for calling wake_up_sem_queue_do(@pt).
 * It is safe to perform this call after dropping all locks.
 */
-static void do_smart_update(struct sem_array *sma, struct sembuf *sops,
-			    int nsops, int otime, struct list_head *pt,
-			    struct list_head *pending_list)
+static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
+			    int otime, struct list_head *pt)
 {
 	int i;
 
+	if (sma->complex_count || sops == NULL) {
+		if (update_queue(sma, -1, pt))
+			otime = 1;
+		goto done;
+	}
+
 	for (i = 0; i < nsops; i++) {
 		if (sops[i].sem_op > 0 ||
 			(sops[i].sem_op < 0 &&
 				sma->sem_base[sops[i].sem_num].semval == 0))
-			if (update_queue(sma, pt, pending_list))
+			if (update_queue(sma, sops[i].sem_num, pt))
 				otime = 1;
 	}
-
+done:
 	if (otime)
 		sma->sem_otime = get_seconds();
 }
@@ -686,11 +655,9 @@ static int count_semncnt (struct sem_array * sma, ushort semnum)
 {
 	int semncnt;
 	struct sem_queue * q;
-	struct sem *curr;
 
-	curr = &sma->sem_base[semnum];
 	semncnt = 0;
-	list_for_each_entry(q, &curr->sem_pending, list) {
+	list_for_each_entry(q, &sma->sem_pending, list) {
 		struct sembuf * sops = q->sops;
 		int nsops = q->nsops;
 		int i;
@@ -707,12 +674,9 @@ static int count_semzcnt (struct sem_array * sma, ushort semnum)
 {
 	int semzcnt;
 	struct sem_queue * q;
-	struct sem *curr;
-
-	curr = &sma->sem_base[semnum];
 	semzcnt = 0;
-	list_for_each_entry(q, &curr->sem_pending, list) {
+	list_for_each_entry(q, &sma->sem_pending, list) {
 		struct sembuf * sops = q->sops;
 		int nsops = q->nsops;
 		int i;
@@ -741,7 +705,6 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 	struct sem_queue *q, *tq;
 	struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
 	struct list_head tasks;
-	int i;
 
 	/* Free the existing undo structures for this semaphore set. */
 	assert_spin_locked(&sma->sem_perm.lock);
@@ -754,15 +717,11 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 		call_rcu(&un->rcu, free_un);
 	}
 
+	/* Wake up all pending processes and let them fail with EIDRM. */
 	INIT_LIST_HEAD(&tasks);
-	for (i = 0; i < sma->sem_nsems; i++) {
-		struct sem *curr = sma->sem_base + i;
-		spin_lock(&curr->lock);
-		list_for_each_entry_safe(q, tq, &curr->sem_pending, list) {
-			list_del_init(&q->list);
-			wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
-		}
-		spin_unlock(&curr->lock);
+	list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
+		unlink_queue(sma, q);
+		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
 	}
 
 	/* Remove the semaphore set from the IDR */
@@ -947,7 +906,6 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	{
 		int i;
 		struct sem_undo *un;
-		LIST_HEAD(pending);
 
 		sem_getref_and_unlock(sma);
 
@@ -979,15 +937,8 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 			goto out_free;
 		}
 
-		for (i = 0; i < nsems; i++) {
-			curr = &sma->sem_base[i];
-
-			spin_lock(&curr->lock);
-			curr->semval = sem_io[i];
-			copy_sem_queue(curr->semval, i,
-				       &curr->sem_pending, &pending);
-			spin_unlock(&curr->lock);
-		}
+		for (i = 0; i < nsems; i++)
+			sma->sem_base[i].semval = sem_io[i];
 
 		assert_spin_locked(&sma->sem_perm.lock);
 		list_for_each_entry(un, &sma->list_id, list_id) {
@@ -996,7 +947,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		}
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks, &pending);
+		do_smart_update(sma, NULL, 0, 0, &tasks);
 		err = 0;
 		goto out_unlock;
 	}
@@ -1025,7 +976,6 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 	{
 		int val = arg.val;
 		struct sem_undo *un;
-		LIST_HEAD(pending);
 
 		err = -ERANGE;
 		if (val > SEMVMX || val < 0)
@@ -1035,16 +985,11 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		list_for_each_entry(un, &sma->list_id, list_id)
 			un->semadj[semnum] = 0;
 
-		spin_lock(&curr->lock);
 		curr->semval = val;
-		copy_sem_queue(curr->semval, semnum,
-			       &curr->sem_pending, &pending);
 		curr->sempid = task_tgid_vnr(current);
-		spin_unlock(&curr->lock);
-
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks, &pending);
+		do_smart_update(sma, NULL, 0, 0, &tasks);
 		err = 0;
 		goto out_unlock;
 	}
@@ -1342,67 +1287,6 @@ static int get_queue_result(struct sem_queue *q)
 }
 
 
-/*
- * since we take spinlocks on the semaphores based on the
- * values from userland, we have to sort them to make sure
- * we lock them in order
- */
-static int sembuf_compare(const void *a, const void *b)
-{
-	const struct sembuf *abuf = a;
-	const struct sembuf *bbuf = b;
-
-	if (abuf->sem_num < bbuf->sem_num)
-		return -1;
-	if (abuf->sem_num > bbuf->sem_num)
-		return 1;
-	return 0;
-}
-
-/*
- * if a process wakes up on its own while on a semaphore list
- * we have to take it off the list before that process can exit.
- *
- * We check all the semaphore's the sem_queue was trying to modify
- * and if we find the sem_queue, we remove it and return.
- *
- * If we don't find the sem_queue its because someone is about to
- * wake us up, and they have removed us from the list.
- * We schedule and try again in hopes that they do it real soon now.
- *
- * We check queue->status to detect if someone did actually manage to
- * wake us up.
- */
-static int remove_queue_from_lists(struct sem_array *sma,
-				   struct sem_queue *queue)
-{
-	struct sembuf *sops = queue->sops;
-	struct sembuf *sop;
-	struct sem * curr;
-	struct sem_queue *test;
-
-again:
-	for (sop = sops; sop < sops + queue->nsops; sop++) {
-		curr = sma->sem_base + sop->sem_num;
-		spin_lock(&curr->lock);
-		list_for_each_entry(test, &curr->sem_pending, list) {
-			if (test == queue) {
-				list_del(&test->list);
-				spin_unlock(&curr->lock);
-				goto found;
-			}
-		}
-		spin_unlock(&curr->lock);
-	}
-	if (queue->status == -EINTR) {
-		set_current_state(TASK_RUNNING);
-		schedule();
-		goto again;
-	}
-found:
-	return 0;
-}
-
 SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		unsigned, nsops, const struct timespec __user *, timeout)
 {
@@ -1416,8 +1300,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
 	struct list_head tasks;
-	struct sem *blocker = NULL;
-	LIST_HEAD(pending);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -1457,14 +1339,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 			alter = 1;
 	}
 
-	/*
-	 * try_atomic_semop takes all the locks of all the semaphores in
-	 * the sops array.  We have to make sure we don't deadlock if userland
-	 * happens to send them out of order, so we sort them by semnum.
-	 */
-	if (nsops > 1)
-		sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
-
 	if (undos) {
 		un = find_alloc_undo(ns, semid);
 		if (IS_ERR(un)) {
@@ -1521,23 +1395,12 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	if (error)
 		goto out_unlock_free;
 
-	/*
-	 * undos are scary, keep the lock if we have to deal with undos.
-	 * Otherwise, drop the big fat ipc lock and use the fine grained
-	 * per-semaphore locks instead.
-	 */
-	if (!un)
-		sem_getref_and_unlock(sma);
-
-	error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current),
-				  &pending, &blocker);
+	error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current));
 	if (error <= 0) {
 		if (alter && error == 0)
-			do_smart_update(sma, sops, nsops, 1, &tasks, &pending);
-		if (un)
-			goto out_unlock_free;
-		else
-			goto out_putref;
+			do_smart_update(sma, sops, nsops, 1, &tasks);
+
+		goto out_unlock_free;
 	}
 
 	/* We need to sleep on this operation, so we put the current
@@ -1549,23 +1412,28 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	queue.undo = un;
 	queue.pid = task_tgid_vnr(current);
 	queue.alter = alter;
-	queue.status = -EINTR;
-	queue.sleeper = current;
-	current->state = TASK_INTERRUPTIBLE;
-
-	/*
-	 * we could be woken up at any time after we add ourselves to the
-	 * blocker's list and unlock the spinlock.  So, all queue setup
-	 * must be done before this point
-	 */
 	if (alter)
-		list_add_tail(&queue.list, &blocker->sem_pending);
+		list_add_tail(&queue.list, &sma->sem_pending);
 	else
-		list_add(&queue.list, &blocker->sem_pending);
-	spin_unlock(&blocker->lock);
+		list_add(&queue.list, &sma->sem_pending);
 
-	if (un)
-		sem_getref_and_unlock(sma);
+	if (nsops == 1) {
+		struct sem *curr;
+		curr = &sma->sem_base[sops->sem_num];
+
+		if (alter)
+			list_add_tail(&queue.simple_list, &curr->sem_pending);
+		else
+			list_add(&queue.simple_list, &curr->sem_pending);
+	} else {
+		INIT_LIST_HEAD(&queue.simple_list);
+		sma->complex_count++;
+	}
+
+	queue.status = -EINTR;
+	queue.sleeper = current;
+	current->state = TASK_INTERRUPTIBLE;
+	sem_unlock(sma);
 
 	if (timeout)
 		jiffies_left = schedule_timeout(jiffies_left);
@@ -1574,32 +1442,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 
 	error = get_queue_result(&queue);
 
-	/*
-	 * we are lock free right here, and we could have timed out or
-	 * gotten a signal, so we need to be really careful with how we
-	 * play with queue.status.  It has three possible states:
-	 *
-	 * -EINTR, which means nobody has changed it since we slept.  This
-	 * means we woke up on our own.
-	 *
-	 * IN_WAKEUP, someone is currently waking us up.  We need to loop
-	 * here until they change it to the operation error value.  If
-	 * we don't loop, our process could exit before they are done waking us
-	 *
-	 * operation error value: we've been properly woken up and can exit
-	 * at any time.
-	 *
-	 * If queue.status is currently -EINTR, we are still being processed
-	 * by the semtimedop core.  Someone either has us on a list head
-	 * or is currently poking our queue struct.  We need to find that
-	 * reference and remove it, which is what remove_queue_from_lists
-	 * does.
-	 *
-	 * We always check for both -EINTR and IN_WAKEUP because we have no
-	 * locks held.  Someone could change us from -EINTR to IN_WAKEUP at
-	 * any time.
-	 */
-	if (error != -EINTR && error != IN_WAKEUP) {
+	if (error != -EINTR) {
 		/* fast path: update_queue already obtained all requested
 		 * resources.
 		 * Perform a smp_mb(): User space could assume that semop()
@@ -1609,41 +1452,44 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		 */
 		smp_mb();
 
-		goto out_putref;
+		goto out_free;
 	}
 
+	sma = sem_lock(ns, semid);
+
 	/*
-	 * Someone has a reference on us, lets find it.
+	 * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
 	 */
-	remove_queue_from_lists(sma, &queue);
-
-	/* check the status again in case we were woken up */
 	error = get_queue_result(&queue);
-	while(unlikely(error == IN_WAKEUP)) {
-		cpu_relax();
-		error = get_queue_result(&queue);
+
+	/*
+	 * Array removed? If yes, leave without sem_unlock().
+	 */
+	if (IS_ERR(sma)) {
+		error = -EIDRM;
+		goto out_free;
 	}
+
+
 	/*
-	 * at this point we know nobody can possibly wake us up, if error
-	 * isn't -EINTR, the wakeup did happen and our semaphore operation is
-	 * complete.  Otherwise, we return -EAGAIN.
+	 * If queue.status != -EINTR we are woken up by another process.
+	 * Leave without unlink_queue(), but with sem_unlock().
 	 */
-	if (error != -EINTR)
-		goto out_putref;
+
+	if (error != -EINTR) {
+		goto out_unlock_free;
+	}
 
 	/*
 	 * If an interrupt occurred we have to clean up the queue
 	 */
 	if (timeout && jiffies_left == 0)
 		error = -EAGAIN;
-
-out_putref:
-	sem_putref(sma);
-	goto out_wakeup;
+	unlink_queue(sma, &queue);
 
 out_unlock_free:
 	sem_unlock(sma);
-out_wakeup:
+
 	wake_up_sem_queue_do(&tasks);
 out_free:
 	if(sops != fast_sops)
@@ -1703,15 +1549,12 @@ void exit_sem(struct task_struct *tsk)
 		return;
 
 	for (;;) {
-		struct list_head pending;
 		struct sem_array *sma;
 		struct sem_undo *un;
 		struct list_head tasks;
 		int semid;
 		int i;
 
-		INIT_LIST_HEAD(&pending);
-
 		rcu_read_lock();
 		un = list_entry_rcu(ulp->list_proc.next,
 				    struct sem_undo, list_proc);
@@ -1751,7 +1594,6 @@ void exit_sem(struct task_struct *tsk)
 		for (i = 0; i < sma->sem_nsems; i++) {
 			struct sem * semaphore = &sma->sem_base[i];
 			if (un->semadj[i]) {
-				spin_lock(&semaphore->lock);
 				semaphore->semval += un->semadj[i];
 				/*
 				 * Range checks of the new semaphore value,
@@ -1771,15 +1613,11 @@ void exit_sem(struct task_struct *tsk)
 				if (semaphore->semval > SEMVMX)
 					semaphore->semval = SEMVMX;
 				semaphore->sempid = task_tgid_vnr(current);
-				copy_sem_queue(semaphore->semval, i,
-					       &semaphore->sem_pending,
-					       &pending);
-				spin_unlock(&semaphore->lock);
 			}
 		}
 		/* maybe some queued-up processes were waiting for this */
 		INIT_LIST_HEAD(&tasks);
-		do_smart_update(sma, NULL, 0, 1, &tasks, &pending);
+		do_smart_update(sma, NULL, 0, 1, &tasks);
 		sem_unlock(sma);
 
 		wake_up_sem_queue_do(&tasks);