__u32 autoclose;
        __u8 nodelay;
        __u8 disable_fragments;
-       __u8 pd_mode;
        __u8 v4mapped;
+       __u8 frag_interleave;
        __u32 adaptation_ind;
 
+       atomic_t pd_mode;
        /* Receive to here while partial delivery is in effect. */
        struct sk_buff_head pd_lobby;
 };
 
 void sctp_ulpq_abort_pd(struct sctp_ulpq *, gfp_t);
 
 /* Clear the partial data delivery condition on this socket. */
-int sctp_clear_pd(struct sock *sk);
+int sctp_clear_pd(struct sock *sk, struct sctp_association *asoc);
 
 /* Skip over an SSN. */
 void sctp_ulpq_skip(struct sctp_ulpq *ulpq, __u16 sid, __u16 ssn);
 
 #define SCTP_DELAYED_ACK_TIME SCTP_DELAYED_ACK_TIME
        SCTP_CONTEXT,   /* Receive Context */
 #define SCTP_CONTEXT SCTP_CONTEXT
+       SCTP_FRAGMENT_INTERLEAVE,
+#define SCTP_FRAGMENT_INTERLEAVE SCTP_FRAGMENT_INTERLEAVE
 
        /* Internal Socket Options. Some of the sctp library functions are 
         * implemented using these socket options.
        __u32                   spp_flags;
 } __attribute__((packed, aligned(4)));
 
-/* 7.1.24. Delayed Ack Timer (SCTP_DELAYED_ACK_TIME)
+/* 7.1.23. Delayed Ack Timer (SCTP_DELAYED_ACK_TIME)
  *
  *   This options will get or set the delayed ack timer.  The time is set
  *   in milliseconds.  If the assoc_id is 0, then this sets or gets the
 
        return 0;
 }
 
-/* 7.1.24. Delayed Ack Timer (SCTP_DELAYED_ACK_TIME)
+/* 7.1.23. Delayed Ack Timer (SCTP_DELAYED_ACK_TIME)
  *
  *   This options will get or set the delayed ack timer.  The time is set
  *   in milliseconds.  If the assoc_id is 0, then this sets or gets the
        return 0;
 }
 
+/*
+ * 7.1.24.  Get or set fragmented interleave (SCTP_FRAGMENT_INTERLEAVE)
+ *
+ * This options will at a minimum specify if the implementation is doing
+ * fragmented interleave.  Fragmented interleave, for a one to many
+ * socket, is when subsequent calls to receive a message may return
+ * parts of messages from different associations.  Some implementations
+ * may allow you to turn this value on or off.  If so, when turned off,
+ * no fragment interleave will occur (which will cause a head of line
+ * blocking amongst multiple associations sharing the same one to many
+ * socket).  When this option is turned on, then each receive call may
+ * come from a different association (thus the user must receive data
+ * with the extended calls (e.g. sctp_recvmsg) to keep track of which
+ * association each receive belongs to.
+ *
+ * This option takes a boolean value.  A non-zero value indicates that
+ * fragmented interleave is on.  A value of zero indicates that
+ * fragmented interleave is off.
+ *
+ * Note that it is important that an implementation that allows this
+ * option to be turned on, have it off by default.  Otherwise an unaware
+ * application using the one to many model may become confused and act
+ * incorrectly.
+ */
+static int sctp_setsockopt_fragment_interleave(struct sock *sk,
+                                              char __user *optval,
+                                              int optlen)
+{
+       int val;
+
+       if (optlen != sizeof(int))
+               return -EINVAL;
+       if (get_user(val, (int __user *)optval))
+               return -EFAULT;
+
+       sctp_sk(sk)->frag_interleave = (val == 0) ? 0 : 1;
+
+       return 0;
+}
+
 /* API 6.2 setsockopt(), getsockopt()
  *
  * Applications use setsockopt() and getsockopt() to set or retrieve
        case SCTP_CONTEXT:
                retval = sctp_setsockopt_context(sk, optval, optlen);
                break;
-
+       case SCTP_FRAGMENT_INTERLEAVE:
+               retval = sctp_setsockopt_fragment_interleave(sk, optval, optlen);
+               break;
        default:
                retval = -ENOPROTOOPT;
                break;
        sp->pf = sctp_get_pf_specific(sk->sk_family);
 
        /* Control variables for partial data delivery. */
-       sp->pd_mode           = 0;
+       atomic_set(&sp->pd_mode, 0);
        skb_queue_head_init(&sp->pd_lobby);
+       sp->frag_interleave = 0;
 
        /* Create a per socket endpoint structure.  Even if we
         * change the data structure relationships, this may still
        return 0;
 }
 
-/* 7.1.24. Delayed Ack Timer (SCTP_DELAYED_ACK_TIME)
+/* 7.1.23. Delayed Ack Timer (SCTP_DELAYED_ACK_TIME)
  *
  *   This options will get or set the delayed ack timer.  The time is set
  *   in milliseconds.  If the assoc_id is 0, then this sets or gets the
        return 0;
 }
 
+/*
+ * 7.1.24.  Get or set fragmented interleave (SCTP_FRAGMENT_INTERLEAVE)
+ * (chapter and verse is quoted at sctp_setsockopt_fragment_interleave())
+ */
+static int sctp_getsockopt_fragment_interleave(struct sock *sk, int len,
+                                              char __user *optval, int __user *optlen)
+{
+       int val;
+
+       if (len < sizeof(int))
+               return -EINVAL;
+
+       len = sizeof(int);
+
+       val = sctp_sk(sk)->frag_interleave;
+       if (put_user(len, optlen))
+               return -EFAULT;
+       if (copy_to_user(optval, &val, len))
+               return -EFAULT;
+
+       return 0;
+}
+
 SCTP_STATIC int sctp_getsockopt(struct sock *sk, int level, int optname,
                                char __user *optval, int __user *optlen)
 {
        case SCTP_CONTEXT:
                retval = sctp_getsockopt_context(sk, len, optval, optlen);
                break;
+       case SCTP_FRAGMENT_INTERLEAVE:
+               retval = sctp_getsockopt_fragment_interleave(sk, len, optval,
+                                                            optlen);
+               break;
        default:
                retval = -ENOPROTOOPT;
                break;
         * 3) Peeling off non-partial delivery; move pd_lobby to receive_queue.
         */
        skb_queue_head_init(&newsp->pd_lobby);
-       sctp_sk(newsk)->pd_mode = assoc->ulpq.pd_mode;
+       atomic_set(&sctp_sk(newsk)->pd_mode, assoc->ulpq.pd_mode);
 
-       if (sctp_sk(oldsk)->pd_mode) {
+       if (atomic_read(&sctp_sk(oldsk)->pd_mode)) {
                struct sk_buff_head *queue;
 
                /* Decide which queue to move pd_lobby skbs to. */
                 * delivery to finish.
                 */
                if (assoc->ulpq.pd_mode)
-                       sctp_clear_pd(oldsk);
+                       sctp_clear_pd(oldsk, NULL);
 
        }
 
 
 /* Clear the partial delivery mode for this socket.   Note: This
  * assumes that no association is currently in partial delivery mode.
  */
-int sctp_clear_pd(struct sock *sk)
+int sctp_clear_pd(struct sock *sk, struct sctp_association *asoc)
 {
        struct sctp_sock *sp = sctp_sk(sk);
 
-       sp->pd_mode = 0;
-       if (!skb_queue_empty(&sp->pd_lobby)) {
-               struct list_head *list;
-               sctp_skb_list_tail(&sp->pd_lobby, &sk->sk_receive_queue);
-               list = (struct list_head *)&sctp_sk(sk)->pd_lobby;
-               INIT_LIST_HEAD(list);
-               return 1;
+       if (atomic_dec_and_test(&sp->pd_mode)) {
+               /* This means there are no other associations in PD, so
+                * we can go ahead and clear out the lobby in one shot
+                */
+               if (!skb_queue_empty(&sp->pd_lobby)) {
+                       struct list_head *list;
+                       sctp_skb_list_tail(&sp->pd_lobby, &sk->sk_receive_queue);
+                       list = (struct list_head *)&sctp_sk(sk)->pd_lobby;
+                       INIT_LIST_HEAD(list);
+                       return 1;
+               }
+       } else {
+               /* There are other associations in PD, so we only need to
+                * pull stuff out of the lobby that belongs to the
+                * associations that is exiting PD (all of its notifications
+                * are posted here).
+                */
+               if (!skb_queue_empty(&sp->pd_lobby) && asoc) {
+                       struct sk_buff *skb, *tmp;
+                       struct sctp_ulpevent *event;
+
+                       sctp_skb_for_each(skb, &sp->pd_lobby, tmp) {
+                               event = sctp_skb2event(skb);
+                               if (event->asoc == asoc) {
+                                       __skb_unlink(skb, &sp->pd_lobby);
+                                       __skb_queue_tail(&sk->sk_receive_queue,
+                                                        skb);
+                               }
+                       }
+               }
        }
+
        return 0;
 }
 
 static int sctp_ulpq_clear_pd(struct sctp_ulpq *ulpq)
 {
        ulpq->pd_mode = 0;
-       return sctp_clear_pd(ulpq->asoc->base.sk);
+       return sctp_clear_pd(ulpq->asoc->base.sk, ulpq->asoc);
 }
 
 /* If the SKB of 'event' is on a list, it is the first such member
         * the association the cause of the partial delivery.
         */
 
-       if (!sctp_sk(sk)->pd_mode) {
+       if (atomic_read(&sctp_sk(sk)->pd_mode) == 0) {
                queue = &sk->sk_receive_queue;
-       } else if (ulpq->pd_mode) {
-               /* If the association is in partial delivery, we
-                * need to finish delivering the partially processed
-                * packet before passing any other data.  This is
-                * because we don't truly support stream interleaving.
-                */
-               if ((event->msg_flags & MSG_NOTIFICATION) ||
-                   (SCTP_DATA_NOT_FRAG ==
-                           (event->msg_flags & SCTP_DATA_FRAG_MASK)))
-                       queue = &sctp_sk(sk)->pd_lobby;
-               else {
-                       clear_pd = event->msg_flags & MSG_EOR;
-                       queue = &sk->sk_receive_queue;
+       } else {
+               if (ulpq->pd_mode) {
+                       /* If the association is in partial delivery, we
+                        * need to finish delivering the partially processed
+                        * packet before passing any other data.  This is
+                        * because we don't truly support stream interleaving.
+                        */
+                       if ((event->msg_flags & MSG_NOTIFICATION) ||
+                           (SCTP_DATA_NOT_FRAG ==
+                                   (event->msg_flags & SCTP_DATA_FRAG_MASK)))
+                               queue = &sctp_sk(sk)->pd_lobby;
+                       else {
+                               clear_pd = event->msg_flags & MSG_EOR;
+                               queue = &sk->sk_receive_queue;
+                       }
+               } else {
+                       /*
+                        * If fragment interleave is enabled, we
+                        * can queue this to the recieve queue instead
+                        * of the lobby.
+                        */
+                       if (sctp_sk(sk)->frag_interleave)
+                               queue = &sk->sk_receive_queue;
+                       else
+                               queue = &sctp_sk(sk)->pd_lobby;
                }
-       } else
-               queue = &sctp_sk(sk)->pd_lobby;
-
+       }
 
        /* If we are harvesting multiple skbs they will be
         * collected on a list.
 {
        struct sctp_ulpevent *event;
        struct sctp_association *asoc;
+       struct sctp_sock *sp;
 
        asoc = ulpq->asoc;
+       sp = sctp_sk(asoc->base.sk);
 
-       /* Are we already in partial delivery mode?  */
-       if (!sctp_sk(asoc->base.sk)->pd_mode) {
+       /* If the association is already in Partial Delivery mode
+        * we have noting to do.
+        */
+       if (ulpq->pd_mode)
+               return;
 
+       /* If the user enabled fragment interleave socket option,
+        * multiple associations can enter partial delivery.
+        * Otherwise, we can only enter partial delivery if the
+        * socket is not in partial deliver mode.
+        */
+       if (sp->frag_interleave || atomic_read(&sp->pd_mode) == 0) {
                /* Is partial delivery possible?  */
                event = sctp_ulpq_retrieve_first(ulpq);
                /* Send event to the ULP.   */
                if (event) {
                        sctp_ulpq_tail_event(ulpq, event);
-                       sctp_sk(asoc->base.sk)->pd_mode = 1;
+                       atomic_inc(&sp->pd_mode);
                        ulpq->pd_mode = 1;
                        return;
                }