]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
IPC lock reduction corners
authorChris Mason <chris.mason@oracle.com>
Tue, 5 Jul 2011 19:07:26 +0000 (14:07 -0500)
committerGuru Anbalagane <guru.anbalagane@oracle.com>
Wed, 24 Aug 2011 00:37:59 +0000 (17:37 -0700)
Deal properly with out of order semtimedop args and other corners.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
include/linux/sem.h
ipc/sem.c

index 15da841d1161496c9069d9f5c0bca16d158de3a9..e6865a238a76023a99e3107414de87193a8c76ad 100644 (file)
@@ -87,7 +87,7 @@ struct task_struct;
 struct sem {
        int     semval;         /* current value */
        int     sempid;         /* pid of last operation */
-       spinlock_t              lock;
+       spinlock_t              lock; /* protects semval and sem_pending */
        struct list_head sem_pending; /* pending single-sop operations */
 };
 
@@ -100,6 +100,7 @@ struct sem_array {
        struct sem              *sem_base;      /* ptr to first semaphore in array */
        struct list_head        list_id;        /* undo requests on this array */
        int                     sem_nsems;      /* no. of semaphores in array */
+       atomic_long_t           sequence;       /* keeps FIFO track for waiters */
 };
 
 /* One queue for each sleeping process in the system. */
@@ -108,6 +109,7 @@ struct sem_queue {
        struct task_struct      *sleeper; /* this process */
        unsigned long           sleep_cpu;
        struct sem_undo         *undo;   /* undo structure */
+       long                    sequence; /* preserve FIFO as we hop lists */
        int                     pid;     /* process id of requesting process */
        int                     status;  /* completion status of operation */
        struct sembuf           *sops;   /* array of pending operations */
index 2df779307fb581decd07e68592788bd37473fc92..3cf16b408f6d85ff8374681fefffa447203067f5 100644 (file)
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -107,6 +107,13 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it);
 #define SEMMSL_FAST    256 /* 512 bytes on stack */
 #define SEMOPM_FAST    64  /* ~ 372 bytes on stack */
 
+/*
+ * we use an atomic as a sequence counter for the
+ * semaphore waiters, which helps make 100% sure they
+ * are processed in fifo order.
+ */
+#define ATOMIC_SEQ_MAX (LONG_MAX - 20000)
+
 /*
  * linked list protection:
  *     sem_undo.id_next,
@@ -305,6 +312,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
        INIT_LIST_HEAD(&sma->list_id);
        sma->sem_nsems = nsems;
        sma->sem_ctime = get_seconds();
+       atomic_long_set(&sma->sequence, 1);
        sem_unlock(sma);
 
        return sma->sem_perm.id;
@@ -437,6 +445,7 @@ static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * so
        struct sembuf *sop;
        struct sem * curr;
        int last = 0;
+       int i;
 
        for (sop = sops; sop < sops + nsops; sop++) {
                curr = sma->sem_base + sop->sem_num;
@@ -489,19 +498,24 @@ static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * so
         * required so that we can try to wakeup people waiting on the
         * sems we've changed.
         */
-       for (sop = sops; sop < sops + nsops; sop++) {
-               /* if there are duplicate sem_nums in the list
-                * we only want to process the first one
-                */
-               if (sop != sops && last == sop->sem_num)
-                       continue;
-
+       for (i = 0; i < nsops; i++) {
+               sop = sops + i;
                curr = sma->sem_base + sop->sem_num;
+
+               /*
+                * if there are duplicates (very unlikely) it is safe
+                * to run copy_sem_queue more than once
+                */
                if (sop->sem_op)
                        copy_sem_queue(curr->semval, sop->sem_num,
                                       &curr->sem_pending, pending);
-               spin_unlock(&curr->lock);
-               last = sop->sem_num;
+
+               /*
+                * make sure we don't unlock until the last sop for
+                * this sem_num
+                */
+               if (i + 1 == nsops || sops[i + 1].sem_num != sop->sem_num)
+                       spin_unlock(&curr->lock);
        }
 
        return 0;
@@ -543,6 +557,24 @@ undo:
        return result;
 }
 
+/*
+ * sorting helper for struct sem_queues by sequence number
+ */
+int list_reseq_comp(void *priv, struct list_head *a, struct list_head *b)
+{
+       struct sem_queue *qa;
+       struct sem_queue *qb;
+
+       qa = list_entry(a, struct sem_queue, list);
+       qb = list_entry(b, struct sem_queue, list);
+
+       if (qa->sequence < qb->sequence)
+               return -1;
+       if (qa->sequence > qb->sequence)
+               return 1;
+       return 0;
+}
+
 /** wake_up_sem_queue_prepare(q, error): Prepare wake-up
  * @q: queue entry that must be signaled
  * @error: Error value for the signal
@@ -722,6 +754,48 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops,
 }
 
 
+/*
+ * when our sequence number wraps, we have to take all the operations waiting
+ * in this array and rebase their sequence numbers.  This isn't too difficult,
+ * we pick a new base (1), sort them all based on their current sequence numbers
+ * and then go through the list and reindex them starting at 1
+ *
+ * Since we remove them from their lists during the reindex, we have to use
+ * update_queue to retry all the operations and put them back into their proper
+ * place.
+ */
+unsigned long resequence_sops(struct sem_array *sma, struct sembuf *sops,
+                             int nsops, struct list_head *pt)
+{
+       struct sembuf *sop;
+       struct sem * curr;
+       LIST_HEAD(pending);
+       struct sem_queue *q;
+       unsigned long seq = 1;
+
+       /* collect all the pending sops into one list */
+       for (sop = sops; sop < sops + nsops; sop++) {
+               curr = sma->sem_base + sop->sem_num;
+               spin_lock(&curr->lock);
+               list_splice_tail_init(&curr->sem_pending, &pending);
+               spin_unlock(&curr->lock);
+       }
+
+       /* sort our private list */
+       list_sort(NULL, &pending, list_reseq_comp);
+
+       /* adjust all the seqs based to something small */
+       list_for_each_entry(q, &pending, list)
+               q->sequence = seq++;
+
+       /* scatter them all back to the appropriate per-semaphore list */
+       update_queue(sma, pt, &pending);
+
+       /* return the seq so we can update the semarray sequence number */
+       return seq;
+}
+
+
 /* The following counts are associated to each semaphore:
  *   semncnt        number of tasks waiting on semval being nonzero
  *   semzcnt        number of tasks waiting on semval being zero
@@ -1405,9 +1479,82 @@ static int sembuf_compare(const void *a, const void *b)
                return -1;
        if (abuf->sem_num > bbuf->sem_num)
                return 1;
+
+       return 0;
+}
+
+struct sembuf_indexed {
+       struct sembuf buf;
+       int index;
+};
+
+/*
+ * since we take spinlocks on the semaphores based on the
+ * values from userland, we have to sort them to make sure
+ * we lock them in order.  This sorting func takes
+ * into account an index field to make sure we preserve order
+ * of operations on the same semmnum.
+ */
+static int sembuf_dup_compare(const void *a, const void *b)
+{
+       const struct sembuf_indexed *abuf = a;
+       const struct sembuf_indexed *bbuf = b;
+
+       if (abuf->buf.sem_num < bbuf->buf.sem_num)
+               return -1;
+       if (abuf->buf.sem_num > bbuf->buf.sem_num)
+               return 1;
+
+       /*
+        * at this point we have two sembufs changing
+        * the same semaphore number.  We want to make
+        * sure their order in the sembuf array stays constant
+        * relative to each other
+        */
+       if (abuf->index < bbuf->index)
+               return -1;
+       if (abuf->index > bbuf->index)
+               return 1;
        return 0;
 }
 
+/*
+ * this is only used when the sembuf array from userland has
+ * two operations on the same semnum.  Our normal sorting
+ * routing will change the order of those operations,
+ * which isn't allowed.
+ *
+ * We handle this in the lamest possible way, which is to just
+ * duplicate the sops array into something we can easily sort
+ * and then copy the results back in the proper order.
+ *
+ * There are many many better ways to do this, but hopefully this
+ * one is simple.
+ */
+int sort_duplicate_sops(struct sembuf *sops, int nsops)
+{
+       struct sembuf_indexed *indexed;
+       int i;
+
+       indexed = kmalloc(sizeof(*indexed) * nsops, GFP_KERNEL);
+
+       if (!indexed)
+               return -ENOMEM;
+
+       for (i = 0; i < nsops ; i++) {
+               indexed[i].buf = sops[i];
+               indexed[i].index = i;
+       }
+
+       sort(indexed, nsops, sizeof(*indexed), sembuf_dup_compare, NULL);
+       for (i = 0; i < nsops ; i++)
+               sops[i] = indexed[i].buf;
+
+       kfree(indexed);
+       return 0;
+}
+
+
 /*
  * if a process wakes up on its own while on a semaphore list
  * we have to take it off the list before that process can exit.
@@ -1461,6 +1608,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        struct sembuf* sops = fast_sops, *sop;
        struct sem_undo *un;
        int undos = 0, alter = 0, max;
+       int duplicate_semnums = 0;
        struct sem_queue queue;
        unsigned long jiffies_left = 0;
        struct ipc_namespace *ns;
@@ -1496,6 +1644,15 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
                }
                jiffies_left = timespec_to_jiffies(&_timeout);
        }
+
+       /*
+        * try_atomic_semop takes all the locks of all the semaphores in
+        * the sops array.  We have to make sure we don't deadlock if userland
+        * happens to send them out of order, so we sort them by semnum.
+        */
+       if (nsops > 1)
+               sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
+
        max = 0;
        for (sop = sops; sop < sops + nsops; sop++) {
                if (sop->sem_num >= max)
@@ -1504,15 +1661,34 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
                        undos = 1;
                if (sop->sem_op != 0)
                        alter = 1;
+
+               /* duplicates mean we have to do extra work to make sure the
+                * operations are sane
+                */
+               if (sop != sops && (sop-1)->sem_num == sop->sem_num)
+                       duplicate_semnums = 1;
        }
 
-       /*
-        * try_atomic_semop takes all the locks of all the semaphores in
-        * the sops array.  We have to make sure we don't deadlock if userland
-        * happens to send them out of order, so we sort them by semnum.
+       /* the application can request just about anything, and the expectation
+        * is that the ops will be done in the order they were sent from
+        * userland.  Sorting the array changes this order, which is fine
+        * unless the array is changing a single semaphore more than once.
+        *
+        * This seems like a pretty crazy thing to do, so we make no attempt to
+        * do it quickly. But we do make sure to do it correctly by preserving
+        * the order of duplicate semaphore numbers with a special sorting
+        * function
         */
-       if (nsops > 1)
-               sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
+       if (duplicate_semnums) {
+               if (copy_from_user (sops, tsops, nsops * sizeof(*tsops))) {
+                       error=-EFAULT;
+                       goto out_free;
+               }
+               if (sort_duplicate_sops(sops, nsops)) {
+                       error = -ENOMEM;
+                       goto out_free;
+               }
+       }
 
        if (undos) {
                un = find_alloc_undo(ns, semid);
@@ -1577,7 +1753,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
         */
        if (!un)
                sem_getref_and_read_unlock(sma);
-
+reseq_again:
        error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current),
                                  &pending, &blocker);
        if (error <= 0) {
@@ -1589,9 +1765,47 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
                        goto out_putref;
        }
 
-       /* We need to sleep on this operation, so we put the current
+       /*
+        * We need to sleep on this operation, so we put the current
         * task into the pending queue and go to sleep.
+        *
+        * The pending queue has a sequence number which we use to make
+        * sure that operations don't get starved as they hop from
+        * queue to queue.
         */
+       if (atomic_long_read(&sma->sequence) >= ATOMIC_SEQ_MAX)
+               queue.sequence = ATOMIC_SEQ_MAX + 1;
+       else
+               queue.sequence = atomic_long_inc_return(&sma->sequence);
+
+       if (queue.sequence >= ATOMIC_SEQ_MAX) {
+               spin_unlock(&blocker->lock);
+
+               /*
+                * if we were the one that bumped to atomic_seq_max,
+                * then we get to resequence all the current waiters.
+                * Otherwise, just spin for a while until
+                * the reseq is complete
+                */
+               if (queue.sequence == ATOMIC_SEQ_MAX) {
+                       long new_seq = resequence_sops(sma, sops, nsops,
+                                                      &tasks);
+                       atomic_long_set(&sma->sequence, new_seq + 1);
+               } else {
+                       while(atomic_long_read(&sma->sequence) >
+                             ATOMIC_SEQ_MAX) {
+                               schedule_timeout_interruptible(1);
+                       }
+               }
+
+               /*
+                * we had to drop our lock on the blocker, so we have to
+                * go back and try our op again.  We won't come back here
+                * unless MAX_LONG - 200000 procs manage to race their way
+                * in while we goto
+                */
+               goto reseq_again;
+       }
                
        queue.sops = sops;
        queue.nsops = nsops;
@@ -1613,9 +1827,24 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
         * blocker's list and unlock the spinlock.  So, all queue setup
         * must be done before this point
         */
-       if (alter)
-               list_add_tail(&queue.list, &blocker->sem_pending);
-       else
+       if (alter) {
+               struct sem_queue *pos;
+
+               /* the sequence numbers allow us to make sure that our queue
+                * entry doesn't get starved by new entries being added by
+                * later sops.  99% of the time, we'll just add ourselves
+                * to the tail of the list with this loop.
+                */
+               queue.list.next = NULL;
+               list_for_each_entry_reverse(pos, &blocker->sem_pending, list) {
+                       if (pos->sequence < queue.sequence) {
+                               list_add(&queue.list, &pos->list);
+                               break;
+                       }
+               }
+               if (!queue.list.next)
+                       list_add(&queue.list, &blocker->sem_pending);
+       } else
                list_add(&queue.list, &blocker->sem_pending);
        spin_unlock(&blocker->lock);
 
@@ -1654,7 +1883,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
         * locks held.  Someone could change us from -EINTR to IN_WAKEUP at
         * any time.
         */
-       if (error != -EINTR && error != IN_WAKEUP) {
+       if (error != -EINTR) {
                /* fast path: update_queue already obtained all requested
                 * resources.
                 * Perform a smp_mb(): User space could assume that semop()