return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
 }
 
+/* If we have pending CDC messages, do not send:
+ * Because CQE of this CDC message will happen shortly, it gives
+ * a chance to coalesce future sendmsg() payload in to one RDMA Write,
+ * without need for a timer, and with no latency trade off.
+ * Algorithm here:
+ *  1. First message should never cork
+ *  2. If we have pending Tx CDC messages, wait for the first CDC
+ *     message's completion
+ *  3. Don't cork to much data in a single RDMA Write to prevent burst
+ *     traffic, total corked message should not exceed sendbuf/2
+ */
+static bool smc_should_autocork(struct smc_sock *smc)
+{
+       struct smc_connection *conn = &smc->conn;
+       int corking_size;
+
+       corking_size = min(SMC_AUTOCORKING_DEFAULT_SIZE,
+                          conn->sndbuf_desc->len >> 1);
+
+       if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
+           smc_tx_prepared_sends(conn) > corking_size)
+               return false;
+       return true;
+}
+
+static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
+{
+       struct smc_connection *conn = &smc->conn;
+
+       if (smc_should_autocork(smc))
+               return true;
+
+       /* for a corked socket defer the RDMA writes if
+        * sndbuf_space is still available. The applications
+        * should known how/when to uncork it.
+        */
+       if ((msg->msg_flags & MSG_MORE ||
+            smc_tx_is_corked(smc) ||
+            msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
+           atomic_read(&conn->sndbuf_space))
+               return true;
+
+       return false;
+}
+
 /* sndbuf producer: main API called by socket layer.
  * called under sock lock.
  */
                 */
                if ((msg->msg_flags & MSG_OOB) && !send_remaining)
                        conn->urg_tx_pend = true;
-               /* for a corked socket defer the RDMA writes if
-                * sndbuf_space is still available. The applications
-                * should known how/when to uncork it.
+               /* If we need to cork, do nothing and wait for the next
+                * sendmsg() call or push on tx completion
                 */
-               if (!((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) ||
-                      msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
-                     atomic_read(&conn->sndbuf_space)))
+               if (!smc_tx_should_cork(smc, msg))
                        smc_tx_sndbuf_nonempty(conn);
 
                trace_smc_tx_sendmsg(smc, copylen);
        return rc;
 }
 
-int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
 {
-       int rc;
+       struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+       int rc = 0;
+
+       /* No data in the send queue */
+       if (unlikely(smc_tx_prepared_sends(conn) <= 0))
+               goto out;
+
+       /* Peer don't have RMBE space */
+       if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) {
+               SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
+               goto out;
+       }
 
        if (conn->killed ||
-           conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
-               return -EPIPE;  /* connection being aborted */
+           conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
+               rc = -EPIPE;    /* connection being aborted */
+               goto out;
+       }
        if (conn->lgr->is_smcd)
                rc = smcd_tx_sndbuf_nonempty(conn);
        else
 
        if (!rc) {
                /* trigger socket release if connection is closing */
-               struct smc_sock *smc = container_of(conn, struct smc_sock,
-                                                   conn);
                smc_close_wake_tx_prepared(smc);
        }
+
+out:
+       return rc;
+}
+
+int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+{
+       int rc;
+
+       /* This make sure only one can send simultaneously to prevent wasting
+        * of CPU and CDC slot.
+        * Record whether someone has tried to push while we are pushing.
+        */
+       if (atomic_inc_return(&conn->tx_pushing) > 1)
+               return 0;
+
+again:
+       atomic_set(&conn->tx_pushing, 1);
+       smp_wmb(); /* Make sure tx_pushing is 1 before real send */
+       rc = __smc_tx_sndbuf_nonempty(conn);
+
+       /* We need to check whether someone else have added some data into
+        * the send queue and tried to push but failed after the atomic_set()
+        * when we are pushing.
+        * If so, we need to push again to prevent those data hang in the send
+        * queue.
+        */
+       if (unlikely(!atomic_dec_and_test(&conn->tx_pushing)))
+               goto again;
+
        return rc;
 }