complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
-                                  struct rm_data_op *op,
-                                  int wc_status)
-{
-       if (op->op_nents)
-               ib_dma_unmap_sg(ic->i_cm_id->device,
-                               op->op_sg, op->op_nents,
-                               DMA_TO_DEVICE);
-}
-
 static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
                                   struct rm_rdma_op *op,
                                   int wc_status)
                rds_ib_stats_inc(s_ib_atomic_fadd);
 }
 
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+                                  struct rm_data_op *op,
+                                  int wc_status)
+{
+       struct rds_message *rm = container_of(op, struct rds_message, data);
+
+       if (op->op_nents)
+               ib_dma_unmap_sg(ic->i_cm_id->device,
+                               op->op_sg, op->op_nents,
+                               DMA_TO_DEVICE);
+
+       if (rm->rdma.op_active && rm->data.op_notify)
+               rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status);
+}
+
 /*
  * Unmap the resources associated with a struct send_work.
  *
 
                }
                op->op_notifier->n_user_token = args->user_token;
                op->op_notifier->n_status = RDS_RDMA_SUCCESS;
+
+               /* Enable rmda notification on data operation for composite
+                * rds messages and make sure notification is enabled only
+                * for the data operation which follows it so that application
+                * gets notified only after full message gets delivered.
+                */
+               if (rm->data.op_sg) {
+                       rm->rdma.op_notify = 0;
+                       rm->data.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+               }
        }
 
        /* The cookie contains the R_Key of the remote memory region, and
 
                } rdma;
                struct rm_data_op {
                        unsigned int            op_active:1;
+                       unsigned int            op_notify:1;
                        unsigned int            op_nents;
                        unsigned int            op_count;
                        unsigned int            op_dmasg;
 
        struct rm_rdma_op *ro;
        struct rds_notifier *notifier;
        unsigned long flags;
+       unsigned int notify = 0;
 
        spin_lock_irqsave(&rm->m_rs_lock, flags);
 
+       notify =  rm->rdma.op_notify | rm->data.op_notify;
        ro = &rm->rdma;
        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
-           ro->op_active && ro->op_notify && ro->op_notifier) {
+           ro->op_active && notify && ro->op_notifier) {
                notifier = ro->op_notifier;
                rs = rm->m_rs;
                sock_hold(rds_rs_to_sk(rs));