From: Chris Mason
Date: Tue, 5 Jul 2011 19:07:26 +0000 (-0500)
Subject: IPC lock reduction corners
X-Git-Tag: v2.6.39-400.9.0~943
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=8385de45ab8e4b40eaf8341f599bf0c19b08bb64;p=users%2Fjedix%2Flinux-maple.git

IPC lock reduction corners

Deal properly with out of order semtimedop args and other corners.

Signed-off-by: Chris Mason
---

diff --git a/include/linux/sem.h b/include/linux/sem.h
index 15da841d1161..e6865a238a76 100644
--- a/include/linux/sem.h
+++ b/include/linux/sem.h
@@ -87,7 +87,7 @@ struct task_struct;
 struct sem {
 	int	semval;		/* current value */
 	int	sempid;		/* pid of last operation */
-	spinlock_t lock;
+	spinlock_t lock;	/* protects semval and sem_pending */
 	struct list_head sem_pending; /* pending single-sop operations */
 };
 
@@ -100,6 +100,7 @@ struct sem_array {
 	struct sem	*sem_base;	/* ptr to first semaphore in array */
 	struct list_head list_id;	/* undo requests on this array */
 	int		sem_nsems;	/* no. of semaphores in array */
+	atomic_long_t	sequence;	/* keeps FIFO track for waiters */
 };
 
 /* One queue for each sleeping process in the system. */
@@ -108,6 +109,7 @@ struct sem_queue {
 	struct task_struct	*sleeper; /* this process */
 	unsigned long		sleep_cpu;
 	struct sem_undo		*undo;	  /* undo structure */
+	long			sequence; /* preserve FIFO as we hop lists */
 	int			pid;	  /* process id of requesting process */
 	int			status;	  /* completion status of operation */
 	struct sembuf		*sops;	  /* array of pending operations */
diff --git a/ipc/sem.c b/ipc/sem.c
index 2df779307fb5..3cf16b408f6d 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -107,6 +107,13 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it);
 #define SEMMSL_FAST	256 /* 512 bytes on stack */
 #define SEMOPM_FAST	64  /* ~ 372 bytes on stack */
 
+/*
+ * we use an atomic as a sequence counter for the
+ * semaphore waiters, which helps make 100% sure they
+ * are processed in fifo order.
+ */
+#define ATOMIC_SEQ_MAX (LONG_MAX - 20000)
+
 /*
  * linked list protection:
  *	sem_undo.id_next,
@@ -305,6 +312,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	INIT_LIST_HEAD(&sma->list_id);
 	sma->sem_nsems = nsems;
 	sma->sem_ctime = get_seconds();
+	atomic_long_set(&sma->sequence, 1);
 	sem_unlock(sma);
 
 	return sma->sem_perm.id;
@@ -437,6 +445,7 @@ static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * so
 	struct sembuf *sop;
 	struct sem * curr;
 	int last = 0;
+	int i;
 
 	for (sop = sops; sop < sops + nsops; sop++) {
 		curr = sma->sem_base + sop->sem_num;
@@ -489,19 +498,24 @@ static noinline int try_atomic_semop (struct sem_array * sma, struct sembuf * so
 	 * required so that we can try to wakeup people waiting on the
 	 * sems we've changed.
 	 */
-	for (sop = sops; sop < sops + nsops; sop++) {
-		/* if there are duplicate sem_nums in the list
-		 * we only want to process the first one
-		 */
-		if (sop != sops && last == sop->sem_num)
-			continue;
-
+	for (i = 0; i < nsops; i++) {
+		sop = sops + i;
 		curr = sma->sem_base + sop->sem_num;
+
+		/*
+		 * if there are duplicates (very unlikely) it is safe
+		 * to run copy_sem_queue more than once
+		 */
 		if (sop->sem_op)
 			copy_sem_queue(curr->semval, sop->sem_num,
 				       &curr->sem_pending, pending);
-		spin_unlock(&curr->lock);
-		last = sop->sem_num;
+
+		/*
+		 * make sure we don't unlock until the last sop for
+		 * this sem_num
+		 */
+		if (i + 1 == nsops || sops[i + 1].sem_num != sop->sem_num)
+			spin_unlock(&curr->lock);
 	}
 
 	return 0;
@@ -543,6 +557,24 @@ undo:
 	return result;
 }
 
+/*
+ * sorting helper for struct sem_queues by sequence number
+ */
+int list_reseq_comp(void *priv, struct list_head *a, struct list_head *b)
+{
+	struct sem_queue *qa;
+	struct sem_queue *qb;
+
+	qa = list_entry(a, struct sem_queue, list);
+	qb = list_entry(b, struct sem_queue, list);
+
+	if (qa->sequence < qb->sequence)
+		return -1;
+	if (qa->sequence > qb->sequence)
+		return 1;
+	return 0;
+}
+
 /** wake_up_sem_queue_prepare(q, error): Prepare wake-up
  * @q: queue entry that must be signaled
  * @error: Error value for the signal
@@ -722,6 +754,48 @@ static void do_smart_update(struct sem_array *sma, struct sembuf *sops,
 }
 
+/*
+ * when our sequence number wraps, we have to take all the operations waiting
+ * in this array and rebase their sequence numbers.  This isn't too difficult:
+ * we pick a new base (1), sort them all based on their current sequence
+ * numbers, and then go through the list and reindex them starting at 1.
+ *
+ * Since we remove them from their lists during the reindex, we have to use
+ * update_queue to retry all the operations and put them back into their proper
+ * place.
+ */
+unsigned long resequence_sops(struct sem_array *sma, struct sembuf *sops,
+			      int nsops, struct list_head *pt)
+{
+	struct sembuf *sop;
+	struct sem * curr;
+	LIST_HEAD(pending);
+	struct sem_queue *q;
+	unsigned long seq = 1;
+
+	/* collect all the pending sops into one list */
+	for (sop = sops; sop < sops + nsops; sop++) {
+		curr = sma->sem_base + sop->sem_num;
+		spin_lock(&curr->lock);
+		list_splice_tail_init(&curr->sem_pending, &pending);
+		spin_unlock(&curr->lock);
+	}
+
+	/* sort our private list */
+	list_sort(NULL, &pending, list_reseq_comp);
+
+	/* rebase all the seqs to something small */
+	list_for_each_entry(q, &pending, list)
+		q->sequence = seq++;
+
+	/* scatter them all back to the appropriate per-semaphore list */
+	update_queue(sma, pt, &pending);
+
+	/* return the seq so we can update the semarray sequence number */
+	return seq;
+}
+
+
 /* The following counts are associated to each semaphore:
  *   semncnt        number of tasks waiting on semval being nonzero
  *   semzcnt        number of tasks waiting on semval being zero
@@ -1405,9 +1479,82 @@ static int sembuf_compare(const void *a, const void *b)
 		return -1;
 	if (abuf->sem_num > bbuf->sem_num)
 		return 1;
+
+	return 0;
+}
+
+struct sembuf_indexed {
+	struct sembuf buf;
+	int index;
+};
+
+/*
+ * since we take spinlocks on the semaphores based on the
+ * values from userland, we have to sort them to make sure
+ * we lock them in order.  This sorting func takes
+ * into account an index field to make sure we preserve order
+ * of operations on the same sem_num.
+ */
+static int sembuf_dup_compare(const void *a, const void *b)
+{
+	const struct sembuf_indexed *abuf = a;
+	const struct sembuf_indexed *bbuf = b;
+
+	if (abuf->buf.sem_num < bbuf->buf.sem_num)
+		return -1;
+	if (abuf->buf.sem_num > bbuf->buf.sem_num)
+		return 1;
+
+	/*
+	 * at this point we have two sembufs changing
+	 * the same semaphore number.  We want to make
+	 * sure their order in the sembuf array stays constant
+	 * relative to each other
+	 */
+	if (abuf->index < bbuf->index)
+		return -1;
+	if (abuf->index > bbuf->index)
+		return 1;
 	return 0;
 }
+
+/*
+ * this is only used when the sembuf array from userland has
+ * two operations on the same semnum.  Our normal sorting
+ * routine will change the order of those operations,
+ * which isn't allowed.
+ *
+ * We handle this in the lamest possible way, which is to just
+ * duplicate the sops array into something we can easily sort
+ * and then copy the results back in the proper order.
+ *
+ * There are many many better ways to do this, but hopefully this
+ * one is simple.
+ */
+int sort_duplicate_sops(struct sembuf *sops, int nsops)
+{
+	struct sembuf_indexed *indexed;
+	int i;
+
+	indexed = kmalloc(sizeof(*indexed) * nsops, GFP_KERNEL);
+	if (!indexed)
+		return -ENOMEM;
+
+	for (i = 0; i < nsops; i++) {
+		indexed[i].buf = sops[i];
+		indexed[i].index = i;
+	}
+
+	sort(indexed, nsops, sizeof(*indexed), sembuf_dup_compare, NULL);
+	for (i = 0; i < nsops; i++)
+		sops[i] = indexed[i].buf;
+
+	kfree(indexed);
+	return 0;
+}
+
+
 /*
  * if a process wakes up on its own while on a semaphore list
  * we have to take it off the list before that process can exit.
 */
@@ -1461,6 +1608,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sembuf* sops = fast_sops, *sop;
 	struct sem_undo *un;
 	int undos = 0, alter = 0, max;
+	int duplicate_semnums = 0;
 	struct sem_queue queue;
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
@@ -1496,6 +1644,15 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		}
 		jiffies_left = timespec_to_jiffies(&_timeout);
 	}
+
+	/*
+	 * try_atomic_semop takes all the locks of all the semaphores in
+	 * the sops array.  We have to make sure we don't deadlock if userland
+	 * happens to send them out of order, so we sort them by semnum.
+	 */
+	if (nsops > 1)
+		sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
+
 	max = 0;
 	for (sop = sops; sop < sops + nsops; sop++) {
 		if (sop->sem_num >= max)
@@ -1504,15 +1661,34 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 			undos = 1;
 		if (sop->sem_op != 0)
 			alter = 1;
+
+		/* duplicates mean we have to do extra work to make sure the
+		 * operations are sane
+		 */
+		if (sop != sops && (sop-1)->sem_num == sop->sem_num)
+			duplicate_semnums = 1;
 	}
 
-	/*
-	 * try_atomic_semop takes all the locks of all the semaphores in
-	 * the sops array.  We have to make sure we don't deadlock if userland
-	 * happens to send them out of order, so we sort them by semnum.
+	/* the application can request just about anything, and the expectation
+	 * is that the ops will be done in the order they were sent from
+	 * userland.  Sorting the array changes this order, which is fine
+	 * unless the array is changing a single semaphore more than once.
+	 *
+	 * This seems like a pretty crazy thing to do, so we make no attempt to
+	 * do it quickly.  But we do make sure to do it correctly by preserving
+	 * the order of duplicate semaphore numbers with a special sorting
+	 * function
 	 */
-	if (nsops > 1)
-		sort(sops, nsops, sizeof(*sops), sembuf_compare, NULL);
+	if (duplicate_semnums) {
+		if (copy_from_user (sops, tsops, nsops * sizeof(*tsops))) {
+			error = -EFAULT;
+			goto out_free;
+		}
+		if (sort_duplicate_sops(sops, nsops)) {
+			error = -ENOMEM;
+			goto out_free;
+		}
+	}
 
 	if (undos) {
 		un = find_alloc_undo(ns, semid);
@@ -1577,7 +1753,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	 */
 	if (!un)
 		sem_getref_and_read_unlock(sma);
-
+reseq_again:
 	error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current),
 				  &pending, &blocker);
 	if (error <= 0) {
@@ -1589,9 +1765,47 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		goto out_putref;
 	}
 
-	/* We need to sleep on this operation, so we put the current
+	/*
+	 * We need to sleep on this operation, so we put the current
 	 * task into the pending queue and go to sleep.
+	 *
+	 * The pending queue has a sequence number which we use to make
+	 * sure that operations don't get starved as they hop from
+	 * queue to queue.
 	 */
+	if (atomic_long_read(&sma->sequence) >= ATOMIC_SEQ_MAX)
+		queue.sequence = ATOMIC_SEQ_MAX + 1;
+	else
+		queue.sequence = atomic_long_inc_return(&sma->sequence);
+
+	if (queue.sequence >= ATOMIC_SEQ_MAX) {
+		spin_unlock(&blocker->lock);
+
+		/*
+		 * if we were the one that bumped to atomic_seq_max,
+		 * then we get to resequence all the current waiters.
+		 * Otherwise, just spin for a while until
+		 * the reseq is complete
+		 */
+		if (queue.sequence == ATOMIC_SEQ_MAX) {
+			long new_seq = resequence_sops(sma, sops, nsops,
						       &tasks);
+			atomic_long_set(&sma->sequence, new_seq + 1);
+		} else {
+			while (atomic_long_read(&sma->sequence) >
+			       ATOMIC_SEQ_MAX) {
+				schedule_timeout_interruptible(1);
+			}
+		}
+
+		/*
+		 * we had to drop our lock on the blocker, so we have to
+		 * go back and try our op again.  We won't come back here
+		 * unless LONG_MAX - 20000 procs manage to race their way
+		 * in while we goto
+		 */
+		goto reseq_again;
+	}
 
 	queue.sops = sops;
 	queue.nsops = nsops;
@@ -1613,9 +1827,24 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	 * blocker's list and unlock the spinlock.  So, all queue setup
 	 * must be done before this point
 	 */
-	if (alter)
-		list_add_tail(&queue.list, &blocker->sem_pending);
-	else
+	if (alter) {
+		struct sem_queue *pos;
+
+		/* the sequence numbers allow us to make sure that our queue
+		 * entry doesn't get starved by new entries being added by
+		 * later sops.  99% of the time, we'll just add ourselves
+		 * to the tail of the list with this loop.
+		 */
+		queue.list.next = NULL;
+		list_for_each_entry_reverse(pos, &blocker->sem_pending, list) {
+			if (pos->sequence < queue.sequence) {
+				list_add(&queue.list, &pos->list);
+				break;
+			}
+		}
+		if (!queue.list.next)
+			list_add(&queue.list, &blocker->sem_pending);
+	} else
 		list_add(&queue.list, &blocker->sem_pending);
 
 	spin_unlock(&blocker->lock);
@@ -1654,7 +1883,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	 * locks held.  Someone could change us from -EINTR to IN_WAKEUP at
 	 * any time.
 	 */
-	if (error != -EINTR && error != IN_WAKEUP) {
+	if (error != -EINTR) {
 		/* fast path: update_queue already obtained all requested
 		 * resources.
 		 * Perform a smp_mb(): User space could assume that semop()
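
A userspace sketch of the duplicate-preserving sort above: sembuf_dup_compare keeps two
operations on the same sem_num in their submitted order by carrying the original array
index through the comparison.  The standalone program below shows the same idea with
qsort(); it is not part of the patch, and the helper names sort_preserving_dups and
dup_compare are made up for illustration (struct sembuf_indexed mirrors the patch).

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

/* the original position in the array rides along with each sembuf */
struct sembuf_indexed {
	struct sembuf buf;
	int index;
};

/* order by sem_num first; for equal sem_nums fall back to the original
 * index so duplicates keep their relative order */
static int dup_compare(const void *a, const void *b)
{
	const struct sembuf_indexed *abuf = a;
	const struct sembuf_indexed *bbuf = b;

	if (abuf->buf.sem_num != bbuf->buf.sem_num)
		return abuf->buf.sem_num < bbuf->buf.sem_num ? -1 : 1;
	return abuf->index - bbuf->index;
}

/* hypothetical userspace analog of sort_duplicate_sops() */
static int sort_preserving_dups(struct sembuf *sops, int nsops)
{
	struct sembuf_indexed *indexed;
	int i;

	indexed = malloc(sizeof(*indexed) * nsops);
	if (!indexed)
		return -1;
	for (i = 0; i < nsops; i++) {
		indexed[i].buf = sops[i];
		indexed[i].index = i;
	}
	qsort(indexed, nsops, sizeof(*indexed), dup_compare);
	for (i = 0; i < nsops; i++)
		sops[i] = indexed[i].buf;
	free(indexed);
	return 0;
}

int main(void)
{
	/* two ops on sem 1 (+1 then -1) must keep their relative order */
	struct sembuf sops[] = {
		{ .sem_num = 2, .sem_op = -1, .sem_flg = 0 },
		{ .sem_num = 1, .sem_op =  1, .sem_flg = 0 },
		{ .sem_num = 1, .sem_op = -1, .sem_flg = 0 },
	};
	int i;

	sort_preserving_dups(sops, 3);
	for (i = 0; i < 3; i++)
		printf("sem_num=%d sem_op=%d\n",
		       (int)sops[i].sem_num, (int)sops[i].sem_op);
	return 0;
}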
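
Similarly, the sequence-ordered insert at the end of semtimedop() walks the blocker's
pending list from the tail and places the new waiter behind the first entry with a
smaller sequence, so a waiter that hopped over from another semaphore's list cannot be
starved by newer arrivals.  A minimal userspace sketch of that walk follows, using a
hand-rolled circular doubly linked list instead of the kernel's list.h; struct waiter
and seq_insert are invented for illustration.

#include <stdio.h>

/* stand-in for struct sem_queue: just the FIFO sequence number */
struct waiter {
	long sequence;
	struct waiter *prev, *next;
};

/* circular list head, like the kernel's list_head */
static struct waiter pending = { 0, &pending, &pending };

/* walk backwards from the tail and insert q after the first entry whose
 * sequence is smaller; if every queued entry is newer, q goes to the head.
 * This mirrors the list_for_each_entry_reverse() loop in the patch. */
static void seq_insert(struct waiter *q)
{
	struct waiter *pos;

	for (pos = pending.prev; pos != &pending; pos = pos->prev) {
		if (pos->sequence < q->sequence)
			break;
	}
	/* insert q right after pos (pos may be the list head) */
	q->prev = pos;
	q->next = pos->next;
	pos->next->prev = q;
	pos->next = q;
}

int main(void)
{
	struct waiter a = { .sequence = 5 };
	struct waiter b = { .sequence = 9 };
	struct waiter c = { .sequence = 7 };	/* older than b: must land before it */
	struct waiter *pos;

	seq_insert(&a);
	seq_insert(&b);
	seq_insert(&c);

	for (pos = pending.next; pos != &pending; pos = pos->next)
		printf("%ld\n", pos->sequence);	/* prints 5 7 9 */
	return 0;
}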