#define SEMMSL_FAST 256 /* 512 bytes on stack */
#define SEMOPM_FAST 64 /* ~ 372 bytes on stack */
+/*
+ * We use an atomic as a sequence counter for the semaphore
+ * waiters, which makes sure they are processed in FIFO order.
+ */
+#define ATOMIC_SEQ_MAX (LONG_MAX - 20000)
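+
+/*
+ * note: the headroom below LONG_MAX is presumably sized so that
+ * waiters racing in (each can bump the counter once more before
+ * noticing it is past the max) cannot wrap it while one task is
+ * busy resequencing the queues.
+ */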
+
/*
* linked list protection:
* sem_undo.id_next,
INIT_LIST_HEAD(&sma->list_id);
sma->sem_nsems = nsems;
sma->sem_ctime = get_seconds();
+ atomic_long_set(&sma->sequence, 1);
sem_unlock(sma);
return sma->sem_perm.id;
struct sembuf *sop;
struct sem * curr;
int last = 0;
+ int i;
for (sop = sops; sop < sops + nsops; sop++) {
curr = sma->sem_base + sop->sem_num;
* required so that we can try to wakeup people waiting on the
* sems we've changed.
*/
- for (sop = sops; sop < sops + nsops; sop++) {
- /* if there are duplicate sem_nums in the list
- * we only want to process the first one
- */
- if (sop != sops && last == sop->sem_num)
- continue;
-
+ for (i = 0; i < nsops; i++) {
+ sop = sops + i;
curr = sma->sem_base + sop->sem_num;
+
+ /*
+ * if there are duplicates (very unlikely) it is safe
+ * to run copy_sem_queue more than once
+ */
if (sop->sem_op)
copy_sem_queue(curr->semval, sop->sem_num,
&curr->sem_pending, pending);
- spin_unlock(&curr->lock);
- last = sop->sem_num;
+
+ /*
+ * the sops are sorted by sem_num, so duplicates are adjacent;
+ * don't unlock until the last sop for this sem_num
+ */
+ if (i + 1 == nsops || sops[i + 1].sem_num != sop->sem_num)
+ spin_unlock(&curr->lock);
}
return 0;
return result;
}
+/*
+ * sorting helper: order struct sem_queue entries by sequence number
+ */
+int list_reseq_comp(void *priv, struct list_head *a, struct list_head *b)
+{
+ struct sem_queue *qa;
+ struct sem_queue *qb;
+
+ qa = list_entry(a, struct sem_queue, list);
+ qb = list_entry(b, struct sem_queue, list);
+
+ if (qa->sequence < qb->sequence)
+ return -1;
+ if (qa->sequence > qb->sequence)
+ return 1;
+ return 0;
+}
+
/** wake_up_sem_queue_prepare(q, error): Prepare wake-up
* @q: queue entry that must be signaled
* @error: Error value for the signal
}
+/*
+ * When our sequence number wraps, we have to take all the operations
+ * waiting in this array and rebase their sequence numbers.  This isn't
+ * too difficult: we pick a new base (1), sort the waiters by their
+ * current sequence numbers, and then walk the list reindexing them
+ * starting at 1.
+ *
+ * Since we remove them from their lists during the reindex, we have to
+ * use update_queue to retry all the operations and put them back into
+ * their proper place.
+ *
+ * We take each per-semaphore lock in turn, so the caller must not hold
+ * any of them (semtimedop drops the blocker's lock before calling us).
+ */
+unsigned long resequence_sops(struct sem_array *sma, struct sembuf *sops,
+ int nsops, struct list_head *pt)
+{
+ struct sembuf *sop;
+ struct sem * curr;
+ LIST_HEAD(pending);
+ struct sem_queue *q;
+ unsigned long seq = 1;
+
+ /* collect all the pending operations into one private list */
+ for (sop = sops; sop < sops + nsops; sop++) {
+ curr = sma->sem_base + sop->sem_num;
+ spin_lock(&curr->lock);
+ list_splice_tail_init(&curr->sem_pending, &pending);
+ spin_unlock(&curr->lock);
+ }
+
+ /* sort our private list */
+ list_sort(NULL, &pending, list_reseq_comp);
+
+ /* rebase all the seqs to something small */
+ list_for_each_entry(q, &pending, list)
+ q->sequence = seq++;
+
+ /* scatter them all back to the appropriate per-semaphore list */
+ update_queue(sma, pt, &pending);
+
+ /* return the seq so we can update the semarray sequence number */
+ return seq;
+}
+
/* The following counts are associated to each semaphore:
* semncnt number of tasks waiting on semval being nonzero
* semzcnt number of tasks waiting on semval being zero
return -1;
if (abuf->sem_num > bbuf->sem_num)
return 1;
+
+ return 0;
+}
+
+struct sembuf_indexed {
+ struct sembuf buf;
+ int index;
+};
+
+/*
+ * Since we take spinlocks on the semaphores based on values from
+ * userland, we have to sort them to make sure we lock them in order.
+ * This sorting func takes an index field into account so that we
+ * preserve the order of operations on the same sem_num.
+ */
+static int sembuf_dup_compare(const void *a, const void *b)
+{
+ const struct sembuf_indexed *abuf = a;
+ const struct sembuf_indexed *bbuf = b;
+
+ if (abuf->buf.sem_num < bbuf->buf.sem_num)
+ return -1;
+ if (abuf->buf.sem_num > bbuf->buf.sem_num)
+ return 1;
+
+ /*
+ * at this point we have two sembufs changing
+ * the same semaphore number. We want to make
+ * sure their order in the sembuf array stays constant
+ * relative to each other
+ */
+ if (abuf->index < bbuf->index)
+ return -1;
+ if (abuf->index > bbuf->index)
+ return 1;
return 0;
}
+/*
+ * this is only used when the sembuf array from userland has
+ * two operations on the same sem_num.  Our normal sorting
+ * routine will change the order of those operations,
+ * which isn't allowed.
+ *
+ * We handle this in the lamest possible way, which is to just
+ * duplicate the sops array into something we can easily sort
+ * and then copy the results back in the proper order.
+ *
+ * There are many, many better ways to do this, but hopefully this
+ * one is simple.
+ */
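+/*
+ * Example (made-up values): userland sends
+ *	{ sem_num = 2, sem_op = +1 }
+ *	{ sem_num = 0, sem_op = -1 }
+ *	{ sem_num = 2, sem_op = -1 }
+ * and sort_duplicate_sops() must produce
+ *	{ 0, -1 }, { 2, +1 }, { 2, -1 }
+ * with both sem_num 2 ops still in submission order.
+ */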
+int sort_duplicate_sops(struct sembuf *sops, int nsops)
+{
+ struct sembuf_indexed *indexed;
+ int i;
+
+ indexed = kmalloc(sizeof(*indexed) * nsops, GFP_KERNEL);
+ if (!indexed)
+ return -ENOMEM;
+
+ for (i = 0; i < nsops; i++) {
+ indexed[i].buf = sops[i];
+ indexed[i].index = i;
+ }
+
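+ /*
+ * the kernel's sort() is a heapsort, which is not stable, so
+ * sembuf_dup_compare falls back to the saved index to keep
+ * duplicate sem_nums in submission order
+ */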
+ sort(indexed, nsops, sizeof(*indexed), sembuf_dup_compare, NULL);
+ for (i = 0; i < nsops; i++)
+ sops[i] = indexed[i].buf;
+
+ kfree(indexed);
+ return 0;
+}
+
/*
* if a process wakes up on its own while on a semaphore list
* we have to take it off the list before that process can exit.
struct sembuf* sops = fast_sops, *sop;
struct sem_undo *un;
int undos = 0, alter = 0, max;
+ int duplicate_semnums = 0;
struct sem_queue queue;
unsigned long jiffies_left = 0;
struct ipc_namespace *ns;
}
jiffies_left = timespec_to_jiffies(&_timeout);
}
+
+ /*
+ * try_atomic_semop takes the locks of all the semaphores in the
+ * sops array.  We have to make sure we don't deadlock if userland
+ * happens to send them out of order, so we sort them by sem_num.
+ */
+ if (nsops > 1)
+ sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
+
max = 0;
for (sop = sops; sop < sops + nsops; sop++) {
if (sop->sem_num >= max)
undos = 1;
if (sop->sem_op != 0)
alter = 1;
+
+ /* duplicates mean we have to do extra work to make sure the
+ * operations are sane
+ */
+ if (sop != sops && (sop-1)->sem_num == sop->sem_num)
+ duplicate_semnums = 1;
}
- /*
- * try_atomic_semop takes all the locks of all the semaphores in
- * the sops array. We have to make sure we don't deadlock if userland
- * happens to send them out of order, so we sort them by semnum.
+ /* the application can request just about anything, and the expectation
+ * is that the ops will be done in the order they were sent from
+ * userland.  Sorting the array changes this order, which is fine
+ * unless the array changes a single semaphore more than once.
+ *
+ * This seems like a pretty crazy thing to do, so we make no attempt to
+ * do it quickly.  But we do make sure to do it correctly by preserving
+ * the order of duplicate semaphore numbers with a special sorting
+ * function.
*/
- if (nsops > 1)
- sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
+ if (duplicate_semnums) {
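+ /*
+ * the sort above destroyed the submission order, so re-fetch
+ * the original array from userland and redo the sort with the
+ * order-preserving helper
+ */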
+ if (copy_from_user(sops, tsops, nsops * sizeof(*tsops))) {
+ error = -EFAULT;
+ goto out_free;
+ }
+ if (sort_duplicate_sops(sops, nsops)) {
+ error = -ENOMEM;
+ goto out_free;
+ }
+ }
if (undos) {
un = find_alloc_undo(ns, semid);
*/
if (!un)
sem_getref_and_read_unlock(sma);
-
+reseq_again:
error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current),
&pending, &blocker);
if (error <= 0) {
goto out_putref;
}
- /* We need to sleep on this operation, so we put the current
+ /*
+ * We need to sleep on this operation, so we put the current
* task into the pending queue and go to sleep.
+ *
+ * The pending queue has a sequence number which we use to make
+ * sure that operations don't get starved as they hop from
+ * queue to queue.
*/
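+ /*
+ * once the counter has hit ATOMIC_SEQ_MAX, park at
+ * ATOMIC_SEQ_MAX + 1 instead of incrementing further; the code
+ * below will wait for (or perform) the resequence and retry
+ */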
+ if (atomic_long_read(&sma->sequence) >= ATOMIC_SEQ_MAX)
+ queue.sequence = ATOMIC_SEQ_MAX + 1;
+ else
+ queue.sequence = atomic_long_inc_return(&sma->sequence);
+
+ if (queue.sequence >= ATOMIC_SEQ_MAX) {
+ spin_unlock(&blocker->lock);
+
+ /*
+ * if we were the one that bumped the counter to
+ * ATOMIC_SEQ_MAX, then we get to resequence all the current
+ * waiters.  Otherwise, just spin until the reseq is complete.
+ */
+ if (queue.sequence == ATOMIC_SEQ_MAX) {
+ long new_seq = resequence_sops(sma, sops, nsops,
+ &tasks);
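+ /*
+ * new_seq is the first unused sequence after the
+ * reindex; parked waiters will retry and pick up
+ * fresh small numbers
+ */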
+ atomic_long_set(&sma->sequence, new_seq + 1);
+ } else {
+ while (atomic_long_read(&sma->sequence) >
+ ATOMIC_SEQ_MAX) {
+ schedule_timeout_interruptible(1);
+ }
+ }
+
+ /*
+ * we had to drop our lock on the blocker, so we have to
+ * go back and try our op again.  We won't come back here
+ * unless LONG_MAX - 20000 procs manage to race their way
+ * in while we goto.
+ */
+ goto reseq_again;
+ }
queue.sops = sops;
queue.nsops = nsops;
* blocker's list and unlock the spinlock. So, all queue setup
* must be done before this point
*/
- if (alter)
- list_add_tail(&queue.list, &blocker->sem_pending);
- else
+ if (alter) {
+ struct sem_queue *pos;
+
+ /* the sequence numbers allow us to make sure that our queue
+ * entry doesn't get starved by new entries being added by
+ * later sops. 99% of the time, we'll just add ourselves
+ * to the tail of the list with this loop.
+ */
+ queue.list.next = NULL;
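+ /*
+ * walk from the tail: the first entry with a smaller sequence
+ * is the one we belong behind.  list.next stays NULL if no
+ * such entry exists, and we add at the head below.
+ */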
+ list_for_each_entry_reverse(pos, &blocker->sem_pending, list) {
+ if (pos->sequence < queue.sequence) {
+ list_add(&queue.list, &pos->list);
+ break;
+ }
+ }
+ if (!queue.list.next)
+ list_add(&queue.list, &blocker->sem_pending);
+ } else
list_add(&queue.list, &blocker->sem_pending);
spin_unlock(&blocker->lock);
* locks held. Someone could change us from -EINTR to IN_WAKEUP at
* any time.
*/
- if (error != -EINTR && error != IN_WAKEUP) {
+ if (error != -EINTR) {
/* fast path: update_queue already obtained all requested
* resources.
* Perform a smp_mb(): User space could assume that semop()