www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
Revert "ipc semaphores: reduce ipc_lock contention in semtimedop"
author Guru Anbalagane <guru.anbalagane@oracle.com>
Tue, 13 Sep 2011 19:10:38 +0000 (12:10 -0700)
committer Guru Anbalagane <guru.anbalagane@oracle.com>
Tue, 13 Sep 2011 19:10:38 +0000 (12:10 -0700)
This reverts commit c7fa322dd72b08450a440ef800124705a1fa148c.

include/linux/sem.h
ipc/sem.c

index 5a97a370c71deb3fc533bd922ae00d6f83807dc4..f2961afa2f6657e9bbf9c79cd25b0d2c2648d86d 100644
--- a/include/linux/sem.h
+++ b/include/linux/sem.h
@@ -87,7 +87,6 @@ struct task_struct;
 struct sem {
        int     semval;         /* current value */
        int     sempid;         /* pid of last operation */
-       spinlock_t              lock;
        struct list_head sem_pending; /* pending single-sop operations */
 };
 
@@ -98,12 +97,15 @@ struct sem_array {
        time_t                  sem_otime;      /* last semop time */
        time_t                  sem_ctime;      /* last change time */
        struct sem              *sem_base;      /* ptr to first semaphore in array */
+       struct list_head        sem_pending;    /* pending operations to be processed */
        struct list_head        list_id;        /* undo requests on this array */
        int                     sem_nsems;      /* no. of semaphores in array */
+       int                     complex_count;  /* pending complex operations */
 };
 
 /* One queue for each sleeping process in the system. */
 struct sem_queue {
+       struct list_head        simple_list; /* queue of pending operations */
        struct list_head        list;    /* queue of pending operations */
        struct task_struct      *sleeper; /* this process */
        struct sem_undo         *undo;   /* undo structure */
index 4643652be3f530571df84a6d02e7c546c1f3804c..e68a8f57682c1113f1bf036ab25e2493f54e2b27 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -86,8 +86,6 @@
 #include <linux/rwsem.h>
 #include <linux/nsproxy.h>
 #include <linux/ipc_namespace.h>
-#include <linux/sort.h>
-#include <linux/list_sort.h>
 
 #include <asm/uaccess.h>
 #include "util.h"
@@ -200,23 +198,24 @@ static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
  * Without the check/retry algorithm a lockless wakeup is possible:
  * - queue.status is initialized to -EINTR before blocking.
  * - wakeup is performed by
+ *     * unlinking the queue entry from sma->sem_pending
  *     * setting queue.status to IN_WAKEUP
  *       This is the notification for the blocked thread that a
  *       result value is imminent.
  *     * call wake_up_process
  *     * set queue.status to the final value.
  * - the previously blocked thread checks queue.status:
- *     * if it's IN_WAKEUP, then it must wait until the value changes
- *     * if it's not -EINTR, then the operation was completed by
- *       update_queue. semtimedop can return queue.status without
- *       performing any operation on the sem array.
- *     * otherwise it must find itself on the list of pending operations.
+ *     * if it's IN_WAKEUP, then it must wait until the value changes
+ *     * if it's not -EINTR, then the operation was completed by
+ *       update_queue. semtimedop can return queue.status without
+ *       performing any operation on the sem array.
+ *     * otherwise it must acquire the spinlock and check what's up.
  *
  * The two-stage algorithm is necessary to protect against the following
  * races:
  * - if queue.status is set after wake_up_process, then the woken up idle
- *   thread could race forward and not realize its semaphore operation had
- *   happened.
+ *   thread could race forward and try (and fail) to acquire sma->lock
+ *   before update_queue had a chance to set queue.status
  * - if queue.status is written before wake_up_process and if the
  *   blocked process is woken up by a signal between writing
  *   queue.status and the wake_up_process, then the woken up
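For readers following the comment above, here is a minimal, illustrative C sketch of the two-stage handshake it describes. This is not the kernel code: the wake_up_sem_queue_prepare()/wake_up_sem_queue_do() split, preempt_disable() and all list handling are elided, and the two helper names below are made up for the sketch; only IN_WAKEUP, -EINTR, wake_up_process(), smp_wmb() and cpu_relax() are taken from the diff itself.

/* Waker side: publish the result in two stages so the sleeper can never
 * observe a half-finished wakeup (illustrative sketch only). */
static void waker_side(struct sem_queue *q, int error)
{
        q->status = IN_WAKEUP;        /* stage 1: a result is imminent */
        wake_up_process(q->sleeper);  /* sleeper may start running now */
        smp_wmb();                    /* order the final store after the wakeup */
        q->status = error;            /* stage 2: the final return value */
}

/* Sleeper side: runs with no locks held after schedule() returns. */
static int sleeper_side(struct sem_queue *q)
{
        int error = q->status;

        while (error == IN_WAKEUP) {  /* waker is between stage 1 and 2 */
                cpu_relax();
                error = q->status;
        }
        if (error != -EINTR)          /* completed by update_queue() */
                return error;
        return -EINTR;                /* woke up on our own (signal/timeout);
                                       * caller must take the lock and clean up */
}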
@@ -279,11 +278,11 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 
        sma->sem_base = (struct sem *) &sma[1];
 
-       for (i = 0; i < nsems; i++) {
+       for (i = 0; i < nsems; i++)
                INIT_LIST_HEAD(&sma->sem_base[i].sem_pending);
-               spin_lock_init(&sma->sem_base[i].lock);
-       }
 
+       sma->complex_count = 0;
+       INIT_LIST_HEAD(&sma->sem_pending);
        INIT_LIST_HEAD(&sma->list_id);
        sma->sem_nsems = nsems;
        sma->sem_ctime = get_seconds();
@@ -341,116 +340,35 @@ SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
        return ipcget(ns, &sem_ids(ns), &sem_ops, &sem_params);
 }
 
-/*
- * when a semaphore is modified, we want to retry the series of operations
- * for anyone that was blocking on that semaphore.  This breaks down into
- * a few different common operations:
- *
- * 1) One modification releases one or more waiters for zero.
- * 2) Many waiters are trying to get a single lock, only one will get it.
- * 3) Many modifications to the count will succeed.
- *
- * For case one, we copy over anyone waiting for zero when the semval is
- * zero.  We don't bother copying them over if the semval isn't zero yet.
- *
- * For case two, we copy over the first queue trying to modify the semaphore,
- * assuming it is trying to get a lock.
- *
- * For case three, after the first queue trying to change this semaphore is
- * run, it will call this function again.  It'll find the next queue
- * that wants to change things at that time.
- *
- * The goal behind all of this is to avoid retrying atomic ops that have
- * no hope of actually completing.  It is optimized for the case where a
- * call modifies a single semaphore at a time.
- */
-static void copy_sem_queue(unsigned long semval,
-                          unsigned short sem_num, struct list_head *queue,
-                          struct list_head *dest)
-{
-       struct sem_queue *q;
-       struct sem_queue *safe;
-
-       list_for_each_entry_safe(q, safe, queue, list) {
-               /*
-                * if this is a complex operation, we don't really know what is
-                * going on.  Splice the whole list over to preserve the queue
-                * order.
-                */
-               if (q->sops[0].sem_num != sem_num) {
-                       list_splice_tail_init(queue, dest);
-                       break;
-               }
-
-               /*
-                * they are waiting for zero, leave it on the list if
-                * we're not at zero yet, otherwise copy it over
-                */
-               if (q->sops[0].sem_op == 0) {
-                       if (semval == 0) {
-                               list_del(&q->list);
-                               list_add_tail(&q->list, dest);
-                       }
-                       continue;
-               }
-
-               /*
-                * at this point we know the first sop in the queue is
-                * changing this semaphore.  Copy this one queue over
-                * and leave the rest.  If more than one alter is going
-                * to succeed, the others will bubble in after each
-                * one is able to modify the queue.
-                */
-               list_del(&q->list);
-               list_add_tail(&q->list, dest);
-               break;
-       }
-}
-
 /*
  * Determine whether a sequence of semaphore operations would succeed
  * all at once. Return 0 if yes, 1 if need to sleep, else return error code.
  */
-static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
-                            int nsops, struct sem_undo *un, int pid,
-                            struct list_head *pending, struct sem **blocker)
+
+static int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
+                            int nsops, struct sem_undo *un, int pid)
 {
        int result, sem_op;
        struct sembuf *sop;
        struct sem * curr;
-       int last = 0;
 
        for (sop = sops; sop < sops + nsops; sop++) {
                curr = sma->sem_base + sop->sem_num;
-
-               /*
-                * deal with userland sending the same
-                * sem_num twice.  Thanks to sort they will
-                * be adjacent.  We unlock in the loops below.
-                */
-               if (sop == sops || last != sop->sem_num)
-                       spin_lock(&curr->lock);
-
-               last = sop->sem_num;
                sem_op = sop->sem_op;
                result = curr->semval;
-
-               if (!sem_op && result) {
-                       *blocker = curr;
+  
+               if (!sem_op && result)
                        goto would_block;
-               }
 
                result += sem_op;
-               if (result < 0) {
-                       *blocker = curr;
+               if (result < 0)
                        goto would_block;
-               }
                if (result > SEMVMX)
                        goto out_of_range;
                if (sop->sem_flg & SEM_UNDO) {
                        int undo = un->semadj[sop->sem_num] - sem_op;
                        /*
-                        *      Exceeding the undo range is an error.
+                        *      Exceeding the undo range is an error.
                         */
                        if (undo < (-SEMAEM - 1) || undo > SEMAEM)
                                goto out_of_range;
@@ -465,27 +383,7 @@ static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * so
                        un->semadj[sop->sem_num] -= sop->sem_op;
                sop--;
        }
-
-       /*
-        * our operation is going to succeed, do any list splicing
-        * required so that we can try to wakeup people waiting on the
-        * sems we've changed.
-        */
-       for (sop = sops; sop < sops + nsops; sop++) {
-               /* if there are duplicate sem_nums in the list
-                * we only want to process the first one
-                */
-               if (sop != sops && last == sop->sem_num)
-                       continue;
-
-               curr = sma->sem_base + sop->sem_num;
-               if (sop->sem_op)
-                       copy_sem_queue(curr->semval, sop->sem_num,
-                                      &curr->sem_pending, pending);
-               spin_unlock(&curr->lock);
-               last = sop->sem_num;
-       }
-
+       
        return 0;
 
 out_of_range:
@@ -493,32 +391,15 @@ out_of_range:
        goto undo;
 
 would_block:
-       if (sop->sem_flg & IPC_NOWAIT) {
+       if (sop->sem_flg & IPC_NOWAIT)
                result = -EAGAIN;
-               if (*blocker) {
-                       /*
-                        * the blocker doesn't put itself on any
-                        * list for -EAGAIN, unlock it here
-                        */
-                       spin_unlock(&(*blocker)->lock);
-                       *blocker = NULL;
-               }
-       } else
+       else
                result = 1;
 
 undo:
        sop--;
        while (sop >= sops) {
-               curr = sma->sem_base + sop->sem_num;
-
-               curr->semval -= sop->sem_op;
-               /* we leave the blocker locked, and we make sure not
-                * to unlock duplicates in the list twice
-                */
-               if (curr != *blocker &&
-                   (sop == sops || (sop - 1)->sem_num != sop->sem_num)) {
-                       spin_unlock(&curr->lock);
-               }
+               sma->sem_base[sop->sem_num].semval -= sop->sem_op;
                sop--;
        }
 
@@ -544,7 +425,7 @@ static void wake_up_sem_queue_prepare(struct list_head *pt,
        q->status = IN_WAKEUP;
        q->pid = error;
 
-       list_add_tail(&q->list, pt);
+       list_add_tail(&q->simple_list, pt);
 }
 
 /**
@@ -562,7 +443,7 @@ static void wake_up_sem_queue_do(struct list_head *pt)
        int did_something;
 
        did_something = !list_empty(pt);
-       list_for_each_entry_safe(q, t, pt, list) {
+       list_for_each_entry_safe(q, t, pt, simple_list) {
                wake_up_process(q->sleeper);
                /* q can disappear immediately after writing q->status. */
                smp_wmb();
@@ -572,70 +453,153 @@ static void wake_up_sem_queue_do(struct list_head *pt)
                preempt_enable();
 }
 
+static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
+{
+       list_del(&q->list);
+       if (q->nsops == 1)
+               list_del(&q->simple_list);
+       else
+               sma->complex_count--;
+}
+
+/** check_restart(sma, q)
+ * @sma: semaphore array
+ * @q: the operation that just completed
+ *
+ * update_queue is O(N^2) when it restarts scanning the whole queue of
+ * waiting operations. Therefore this function checks if the restart is
+ * really necessary. It is called after a previously waiting operation
+ * was completed.
+ */
+static int check_restart(struct sem_array *sma, struct sem_queue *q)
+{
+       struct sem *curr;
+       struct sem_queue *h;
+
+       /* if the operation didn't modify the array, then no restart */
+       if (q->alter == 0)
+               return 0;
+
+       /* pending complex operations are too difficult to analyse */
+       if (sma->complex_count)
+               return 1;
+
+       /* we were a sleeping complex operation. Too difficult */
+       if (q->nsops > 1)
+               return 1;
+
+       curr = sma->sem_base + q->sops[0].sem_num;
+
+       /* No-one waits on this queue */
+       if (list_empty(&curr->sem_pending))
+               return 0;
+
+       /* the new semaphore value */
+       if (curr->semval) {
+               /* It is impossible that someone waits for the new value:
+                * - q is a previously sleeping simple operation that
+                *   altered the array. It must be a decrement, because
+                *   simple increments never sleep.
+                * - The value is not 0, thus wait-for-zero won't proceed.
+                * - If there are older (higher priority) decrements
+                *   in the queue, then they have observed the original
+                *   semval value and couldn't proceed. The operation
+                *   decremented to value - thus they won't proceed either.
+                */
+               BUG_ON(q->sops[0].sem_op >= 0);
+               return 0;
+       }
+       /*
+        * semval is 0. Check if there are wait-for-zero semops.
+        * They must be the first entries in the per-semaphore simple queue
+        */
+       h = list_first_entry(&curr->sem_pending, struct sem_queue, simple_list);
+       BUG_ON(h->nsops != 1);
+       BUG_ON(h->sops[0].sem_num != q->sops[0].sem_num);
+
+       /* Yes, there is a wait-for-zero semop. Restart */
+       if (h->sops[0].sem_op == 0)
+               return 1;
+
+       /* Again - no-one is waiting for the new value. */
+       return 0;
+}
+
+
 /**
  * update_queue(sma, semnum): Look for tasks that can be completed.
  * @sma: semaphore array.
+ * @semnum: semaphore that was modified.
  * @pt: list head for the tasks that must be woken up.
- * @pending_list: list of struct sem_queues to try
  *
  * update_queue must be called after a semaphore in a semaphore array
- * was modified.
- *
+ * was modified. If multiple semaphore were modified, then @semnum
+ * must be set to -1.
  * The tasks that must be woken up are added to @pt. The return code
  * is stored in q->pid.
  * The function return 1 if at least one semop was completed successfully.
  */
-static int update_queue(struct sem_array *sma, struct list_head *pt,
-                       struct list_head *pending_list)
+static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
 {
        struct sem_queue *q;
-       LIST_HEAD(new_pending);
-       LIST_HEAD(work_list);
+       struct list_head *walk;
+       struct list_head *pending_list;
+       int offset;
        int semop_completed = 0;
 
-       /*
-        * this seems strange, but what we want to do is process everything
-        * on the pending list, and then process any queues that have a chance
-        * to finish because of processing the pending list.
-        *
-        * So, we send new_pending to try_atomic_semop each time, and it
-        * splices any additional queues we have to try into new_pending.
-        * When the work list is empty, we splice new_pending into the
-        * work list and loop again.
-        *
-        * At the end of the whole thing, after we've built the largest
-        * possible list of tasks to wake up, we wake them in bulk.
+       /* if there are complex operations around, then knowing the semaphore
+        * that was modified doesn't help us. Assume that multiple semaphores
+        * were modified.
         */
-       list_splice_init(pending_list, &work_list);
-again:
-       while (!list_empty(&work_list)) {
-               struct sem *blocker;
-               int error;
+       if (sma->complex_count)
+               semnum = -1;
+
+       if (semnum == -1) {
+               pending_list = &sma->sem_pending;
+               offset = offsetof(struct sem_queue, list);
+       } else {
+               pending_list = &sma->sem_base[semnum].sem_pending;
+               offset = offsetof(struct sem_queue, simple_list);
+       }
 
-               q = list_entry(work_list.next, struct sem_queue, list);
-               list_del_init(&q->list);
+again:
+       walk = pending_list->next;
+       while (walk != pending_list) {
+               int error, restart;
+
+               q = (struct sem_queue *)((char *)walk - offset);
+               walk = walk->next;
+
+               /* If we are scanning the single sop, per-semaphore list of
+                * one semaphore and that semaphore is 0, then it is not
+                * necessary to scan the "alter" entries: simple increments
+                * that affect only one entry succeed immediately and cannot
+                * be in the  per semaphore pending queue, and decrements
+                * cannot be successful if the value is already 0.
+                */
+               if (semnum != -1 && sma->sem_base[semnum].semval == 0 &&
+                               q->alter)
+                       break;
 
-               blocker = NULL;
                error = try_atomic_semop(sma, q->sops, q->nsops,
-                                        q->undo, q->pid, &new_pending,
-                                        &blocker);
+                                        q->undo, q->pid);
 
                /* Does q->sleeper still need to sleep? */
-               if (error > 0) {
-                       list_add_tail(&q->list, &blocker->sem_pending);
-                       spin_unlock(&blocker->lock);
+               if (error > 0)
                        continue;
-               }
 
-               if (!error)
+               unlink_queue(sma, q);
+
+               if (error) {
+                       restart = 0;
+               } else {
                        semop_completed = 1;
+                       restart = check_restart(sma, q);
+               }
 
                wake_up_sem_queue_prepare(pt, q, error);
-
-               if (!list_empty(&new_pending)) {
-                       list_splice_init(&new_pending, &work_list);
+               if (restart)
                        goto again;
-               }
        }
        return semop_completed;
 }
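One detail of update_queue() above that is easy to miss: the manual pointer arithmetic on walk is just container_of() with the member offset chosen at run time. A short comment-style sketch of the equivalence (the macro shown is the textbook form, slightly simplified from the kernel's):

/*
 * The cast in update_queue(),
 *
 *         q = (struct sem_queue *)((char *)walk - offset);
 *
 * is the open-coded form of container_of() with a runtime offset.
 * With a compile-time member it would read:
 *
 *         q = container_of(walk, struct sem_queue, list);
 *
 * where container_of() is essentially:
 *
 *         #define container_of(ptr, type, member) \
 *                 ((type *)((char *)(ptr) - offsetof(type, member)))
 *
 * The runtime offset is needed because the same loop walks either the
 * global sma->sem_pending list (linked through "list") or one
 * per-semaphore list (linked through "simple_list"), selected by the
 * offsetof() computation at the top of the function.
 */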
@@ -654,20 +618,25 @@ again:
  * responsible for calling wake_up_sem_queue_do(@pt).
  * It is safe to perform this call after dropping all locks.
  */
-static void do_smart_update(struct sem_array *sma, struct sembuf *sops,
-                           int nsops, int otime, struct list_head *pt,
-                           struct list_head *pending_list)
+static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
+                       int otime, struct list_head *pt)
 {
        int i;
 
+       if (sma->complex_count || sops == NULL) {
+               if (update_queue(sma, -1, pt))
+                       otime = 1;
+               goto done;
+       }
+
        for (i = 0; i < nsops; i++) {
                if (sops[i].sem_op > 0 ||
                        (sops[i].sem_op < 0 &&
                                sma->sem_base[sops[i].sem_num].semval == 0))
-                       if (update_queue(sma, pt, pending_list))
+                       if (update_queue(sma, sops[i].sem_num, pt))
                                otime = 1;
        }
-
+done:
        if (otime)
                sma->sem_otime = get_seconds();
 }
@@ -686,11 +655,9 @@ static int count_semncnt (struct sem_array * sma, ushort semnum)
 {
        int semncnt;
        struct sem_queue * q;
-       struct sem *curr;
 
-       curr = &sma->sem_base[semnum];
        semncnt = 0;
-       list_for_each_entry(q, &curr->sem_pending, list) {
+       list_for_each_entry(q, &sma->sem_pending, list) {
                struct sembuf * sops = q->sops;
                int nsops = q->nsops;
                int i;
@@ -707,12 +674,9 @@ static int count_semzcnt (struct sem_array * sma, ushort semnum)
 {
        int semzcnt;
        struct sem_queue * q;
-       struct sem *curr;
-
-       curr = &sma->sem_base[semnum];
 
        semzcnt = 0;
-       list_for_each_entry(q, &curr->sem_pending, list) {
+       list_for_each_entry(q, &sma->sem_pending, list) {
                struct sembuf * sops = q->sops;
                int nsops = q->nsops;
                int i;
@@ -741,7 +705,6 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
        struct sem_queue *q, *tq;
        struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
        struct list_head tasks;
-       int i;
 
        /* Free the existing undo structures for this semaphore set.  */
        assert_spin_locked(&sma->sem_perm.lock);
@@ -754,15 +717,11 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
                call_rcu(&un->rcu, free_un);
        }
 
+       /* Wake up all pending processes and let them fail with EIDRM. */
        INIT_LIST_HEAD(&tasks);
-       for (i = 0; i < sma->sem_nsems; i++) {
-               struct sem *curr = sma->sem_base + i;
-               spin_lock(&curr->lock);
-               list_for_each_entry_safe(q, tq, &curr->sem_pending, list) {
-                       list_del_init(&q->list);
-                       wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
-               }
-               spin_unlock(&curr->lock);
+       list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
+               unlink_queue(sma, q);
+               wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
        }
 
        /* Remove the semaphore set from the IDR */
@@ -947,7 +906,6 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
        {
                int i;
                struct sem_undo *un;
-               LIST_HEAD(pending);
 
                sem_getref_and_unlock(sma);
 
@@ -979,15 +937,8 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
                        goto out_free;
                }
 
-               for (i = 0; i < nsems; i++) {
-                       curr = &sma->sem_base[i];
-
-                       spin_lock(&curr->lock);
-                       curr->semval = sem_io[i];
-                       copy_sem_queue(curr->semval, i,
-                                      &curr->sem_pending, &pending);
-                       spin_unlock(&curr->lock);
-               }
+               for (i = 0; i < nsems; i++)
+                       sma->sem_base[i].semval = sem_io[i];
 
                assert_spin_locked(&sma->sem_perm.lock);
                list_for_each_entry(un, &sma->list_id, list_id) {
@@ -996,7 +947,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
                }
                sma->sem_ctime = get_seconds();
                /* maybe some queued-up processes were waiting for this */
-               do_smart_update(sma, NULL, 0, 0, &tasks, &pending);
+               do_smart_update(sma, NULL, 0, 0, &tasks);
                err = 0;
                goto out_unlock;
        }
@@ -1025,7 +976,6 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
        {
                int val = arg.val;
                struct sem_undo *un;
-               LIST_HEAD(pending);
 
                err = -ERANGE;
                if (val > SEMVMX || val < 0)
@@ -1035,16 +985,11 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
                list_for_each_entry(un, &sma->list_id, list_id)
                        un->semadj[semnum] = 0;
 
-               spin_lock(&curr->lock);
                curr->semval = val;
-               copy_sem_queue(curr->semval, semnum,
-                              &curr->sem_pending, &pending);
                curr->sempid = task_tgid_vnr(current);
-               spin_unlock(&curr->lock);
-
                sma->sem_ctime = get_seconds();
                /* maybe some queued-up processes were waiting for this */
-               do_smart_update(sma, NULL, 0, 0, &tasks, &pending);
+               do_smart_update(sma, NULL, 0, 0, &tasks);
                err = 0;
                goto out_unlock;
        }
@@ -1342,67 +1287,6 @@ static int get_queue_result(struct sem_queue *q)
 }
 
 
-/*
- * since we take spinlocks on the semaphores based on the
- * values from userland, we have to sort them to make sure
- * we lock them in order
- */
-static int sembuf_compare(const void *a, const void *b)
-{
-       const struct sembuf *abuf = a;
-       const struct sembuf *bbuf = b;
-
-       if (abuf->sem_num < bbuf->sem_num)
-               return -1;
-       if (abuf->sem_num > bbuf->sem_num)
-               return 1;
-       return 0;
-}
-
-/*
- * if a process wakes up on its own while on a semaphore list
- * we have to take it off the list before that process can exit.
- *
- * We check all the semaphore's the sem_queue was trying to modify
- * and if we find the sem_queue, we remove it and return.
- *
- * If we don't find the sem_queue its because someone is about to
- * wake us up, and they have removed us from the list.
- * We schedule and try again in hopes that they do it real soon now.
- *
- * We check queue->status to detect if someone did actually manage to
- * wake us up.
- */
-static int remove_queue_from_lists(struct sem_array *sma,
-                                  struct sem_queue *queue)
-{
-       struct sembuf *sops = queue->sops;
-       struct sembuf *sop;
-       struct sem * curr;
-       struct sem_queue *test;
-
-again:
-       for (sop = sops; sop < sops + queue->nsops; sop++) {
-               curr = sma->sem_base + sop->sem_num;
-               spin_lock(&curr->lock);
-               list_for_each_entry(test, &curr->sem_pending, list) {
-                       if (test == queue) {
-                               list_del(&test->list);
-                               spin_unlock(&curr->lock);
-                               goto found;
-                       }
-               }
-               spin_unlock(&curr->lock);
-       }
-       if (queue->status == -EINTR) {
-               set_current_state(TASK_RUNNING);
-               schedule();
-               goto again;
-       }
-found:
-       return 0;
-}
-
 SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
                unsigned, nsops, const struct timespec __user *, timeout)
 {
@@ -1416,8 +1300,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        unsigned long jiffies_left = 0;
        struct ipc_namespace *ns;
        struct list_head tasks;
-       struct sem *blocker = NULL;
-       LIST_HEAD(pending);
 
        ns = current->nsproxy->ipc_ns;
 
@@ -1457,14 +1339,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
                        alter = 1;
        }
 
-       /*
-        * try_atomic_semop takes all the locks of all the semaphores in
-        * the sops array.  We have to make sure we don't deadlock if userland
-        * happens to send them out of order, so we sort them by semnum.
-        */
-       if (nsops > 1)
-               sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
-
        if (undos) {
                un = find_alloc_undo(ns, semid);
                if (IS_ERR(un)) {
@@ -1521,23 +1395,12 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        if (error)
                goto out_unlock_free;
 
-       /*
-        * undos are scary, keep the lock if we have to deal with undos.
-        * Otherwise, drop the big fat ipc lock and use the fine grained
-        * per-semaphore locks instead.
-        */
-       if (!un)
-               sem_getref_and_unlock(sma);
-
-       error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current),
-                                 &pending, &blocker);
+       error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current));
        if (error <= 0) {
                if (alter && error == 0)
-                       do_smart_update(sma, sops, nsops, 1, &tasks, &pending);
-               if (un)
-                       goto out_unlock_free;
-               else
-                       goto out_putref;
+                       do_smart_update(sma, sops, nsops, 1, &tasks);
+
+               goto out_unlock_free;
        }
 
        /* We need to sleep on this operation, so we put the current
@@ -1549,23 +1412,28 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        queue.undo = un;
        queue.pid = task_tgid_vnr(current);
        queue.alter = alter;
-       queue.status = -EINTR;
-       queue.sleeper = current;
-       current->state = TASK_INTERRUPTIBLE;
-
-       /*
-        * we could be woken up at any time after we add ourselves to the
-        * blocker's list and unlock the spinlock.  So, all queue setup
-        * must be done before this point
-        */
        if (alter)
-               list_add_tail(&queue.list, &blocker->sem_pending);
+               list_add_tail(&queue.list, &sma->sem_pending);
        else
-               list_add(&queue.list, &blocker->sem_pending);
-       spin_unlock(&blocker->lock);
+               list_add(&queue.list, &sma->sem_pending);
 
-       if (un)
-               sem_getref_and_unlock(sma);
+       if (nsops == 1) {
+               struct sem *curr;
+               curr = &sma->sem_base[sops->sem_num];
+
+               if (alter)
+                       list_add_tail(&queue.simple_list, &curr->sem_pending);
+               else
+                       list_add(&queue.simple_list, &curr->sem_pending);
+       } else {
+               INIT_LIST_HEAD(&queue.simple_list);
+               sma->complex_count++;
+       }
+
+       queue.status = -EINTR;
+       queue.sleeper = current;
+       current->state = TASK_INTERRUPTIBLE;
+       sem_unlock(sma);
 
        if (timeout)
                jiffies_left = schedule_timeout(jiffies_left);
@@ -1574,32 +1442,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 
        error = get_queue_result(&queue);
 
-       /*
-        * we are lock free right here, and we could have timed out or
-        * gotten a signal, so we need to be really careful with how we
-        * play with queue.status.  It has three possible states:
-        *
-        * -EINTR, which means nobody has changed it since we slept.  This
-        * means we woke up on our own.
-        *
-        * IN_WAKEUP, someone is currently waking us up.  We need to loop
-        * here until they change it to the operation error value.  If
-        * we don't loop, our process could exit before they are done waking us
-        *
-        * operation error value: we've been properly woken up and can exit
-        * at any time.
-        *
-        * If queue.status is currently -EINTR, we are still being processed
-        * by the semtimedop core.  Someone either has us on a list head
-        * or is currently poking our queue struct.  We need to find that
-        * reference and remove it, which is what remove_queue_from_lists
-        * does.
-        *
-        * We always check for both -EINTR and IN_WAKEUP because we have no
-        * locks held.  Someone could change us from -EINTR to IN_WAKEUP at
-        * any time.
-        */
-       if (error != -EINTR && error != IN_WAKEUP) {
+       if (error != -EINTR) {
                /* fast path: update_queue already obtained all requested
                 * resources.
                 * Perform a smp_mb(): User space could assume that semop()
@@ -1609,41 +1452,44 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
                 */
                smp_mb();
 
-               goto out_putref;
+               goto out_free;
        }
 
+       sma = sem_lock(ns, semid);
+
        /*
-        * Someone has a reference on us, lets find it.
+        * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
         */
-       remove_queue_from_lists(sma, &queue);
-
-       /* check the status again in case we were woken up */
        error = get_queue_result(&queue);
-       while(unlikely(error == IN_WAKEUP)) {
-               cpu_relax();
-               error = get_queue_result(&queue);
+
+       /*
+        * Array removed? If yes, leave without sem_unlock().
+        */
+       if (IS_ERR(sma)) {
+               error = -EIDRM;
+               goto out_free;
        }
+
+
        /*
-        * at this point we know nobody can possibly wake us up, if error
-        * isn't -EINTR, the wakeup did happen and our semaphore operation is
-        * complete.  Otherwise, we return -EAGAIN.
+        * If queue.status != -EINTR we are woken up by another process.
+        * Leave without unlink_queue(), but with sem_unlock().
         */
-       if (error != -EINTR)
-               goto out_putref;
+
+       if (error != -EINTR) {
+               goto out_unlock_free;
+       }
 
        /*
         * If an interrupt occurred we have to clean up the queue
         */
        if (timeout && jiffies_left == 0)
                error = -EAGAIN;
-
-out_putref:
-       sem_putref(sma);
-       goto out_wakeup;
+       unlink_queue(sma, &queue);
 
 out_unlock_free:
        sem_unlock(sma);
-out_wakeup:
+
        wake_up_sem_queue_do(&tasks);
 out_free:
        if(sops != fast_sops)
@@ -1703,15 +1549,12 @@ void exit_sem(struct task_struct *tsk)
                return;
 
        for (;;) {
-               struct list_head pending;
                struct sem_array *sma;
                struct sem_undo *un;
                struct list_head tasks;
                int semid;
                int i;
 
-               INIT_LIST_HEAD(&pending);
-
                rcu_read_lock();
                un = list_entry_rcu(ulp->list_proc.next,
                                    struct sem_undo, list_proc);
@@ -1751,7 +1594,6 @@ void exit_sem(struct task_struct *tsk)
                for (i = 0; i < sma->sem_nsems; i++) {
                        struct sem * semaphore = &sma->sem_base[i];
                        if (un->semadj[i]) {
-                               spin_lock(&semaphore->lock);
                                semaphore->semval += un->semadj[i];
                                /*
                                 * Range checks of the new semaphore value,
@@ -1771,15 +1613,11 @@ void exit_sem(struct task_struct *tsk)
                                if (semaphore->semval > SEMVMX)
                                        semaphore->semval = SEMVMX;
                                semaphore->sempid = task_tgid_vnr(current);
-                               copy_sem_queue(semaphore->semval, i,
-                                              &semaphore->sem_pending,
-                                              &pending);
-                               spin_unlock(&semaphore->lock);
                        }
                }
                /* maybe some queued-up processes were waiting for this */
                INIT_LIST_HEAD(&tasks);
-               do_smart_update(sma, NULL, 0, 1, &tasks, &pending);
+               do_smart_update(sma, NULL, 0, 1, &tasks);
                sem_unlock(sma);
                wake_up_sem_queue_do(&tasks);