RDS uses a single threaded work queue for connection management. This
involves creation and deletion of QPs and CQs. On certain HCAs, such
as CX-3, these operations are para-virtualized and some part of the
work has to be conducted by the Physical Function (PF) driver.
In fail-over and fail-back situations, there might be 1000s of
connections to tear down and re-establish. Hence, expand the number
work queues.
The local_wq is removed for simplicity and symmetry reasons.
Orabug:
29391909
Signed-off-by: Håkon Bugge <haakon.bugge@oracle.com>
Tested-by: Rosa Lopez <rosa.lopez@oracle.com>
Reviewed-by: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
---
Signed-off-by: Brian Maly <brian.maly@oracle.com>
unsigned long flags;
int ret, i;
int npaths;
+ int cp_wqs_inx = jhash_3words(laddr->s6_addr32[3],
+ faddr->s6_addr32[3],
+ tos,
+ 0) % RDS_NMBR_CP_WQS;
rcu_read_lock();
conn = rds_conn_lookup(net, head, laddr, faddr, trans, tos, dev_if);
__rds_conn_path_init(conn, cp, is_outgoing);
cp->cp_index = i;
- if (conn->c_loopback)
- cp->cp_wq = rds_local_wq;
- else
- cp->cp_wq = rds_wq;
+ rds_rtd(RDS_RTD_CM_EXT, "using rds_cp_wqs index %d\n", cp_wqs_inx);
+ cp->cp_wq = rds_cp_wqs[cp_wqs_inx];
}
ret = trans->conn_alloc(conn, gfp);
if (ret) {
if (--garps->garps_left >= 0)
queue_delayed_work(send_garps_wq, &garps->work, garps->delay);
else
- kfree(garps);
+ kfree(_work);
}
static void rds_ib_send_gratuitous_arp(struct net_device *out_dev,
static void rds_ib_unregister_client(void)
{
+ int i;
+
ib_unregister_client(&rds_ib_client);
/* wait for rds_ib_dev_free() to complete */
flush_workqueue(rds_ip_wq);
- flush_workqueue(rds_local_wq);
+
+ for (i = 0; i < RDS_NMBR_CP_WQS; ++i)
+ flush_workqueue(rds_cp_wqs[i]);
}
static void rds_ib_update_ip_config(void)
#endif
#define RDS_RECONNECT_RETRY_MS 15000
+#define RDS_NMBR_CP_WQS 16
#ifdef RDS_DEBUG
#define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
int rds_threads_init(void);
void rds_threads_exit(void);
extern struct workqueue_struct *rds_wq;
-extern struct workqueue_struct *rds_local_wq;
+extern struct workqueue_struct *rds_cp_wqs[RDS_NMBR_CP_WQS];
void rds_queue_reconnect(struct rds_conn_path *cp);
void rds_connect_worker(struct work_struct *);
void rds_shutdown_worker(struct work_struct *);
*/
#include <linux/kernel.h>
#include <linux/random.h>
+#include <linux/workqueue.h>
#include "rds.h"
*/
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);
-struct workqueue_struct *rds_local_wq;
-EXPORT_SYMBOL_GPL(rds_local_wq);
+struct workqueue_struct *rds_cp_wqs[RDS_NMBR_CP_WQS];
+EXPORT_SYMBOL_GPL(rds_cp_wqs);
static inline void rds_update_avg_connect_time(struct rds_conn_path *cp)
{
void rds_threads_exit(void)
{
+ int i;
+
destroy_workqueue(rds_wq);
- destroy_workqueue(rds_local_wq);
+ for (i = 0; i < RDS_NMBR_CP_WQS; ++i)
+ destroy_workqueue(rds_cp_wqs[i]);
}
int rds_threads_init(void)
{
+ int i, j;
+
rds_wq = create_singlethread_workqueue("krdsd");
if (!rds_wq)
return -ENOMEM;
- rds_local_wq = create_singlethread_workqueue("krdsd_local");
- if (!rds_local_wq)
- return -ENOMEM;
+ for (i = 0; i < RDS_NMBR_CP_WQS; ++i) {
+ rds_cp_wqs[i] = alloc_ordered_workqueue("krds_cp_wq_%d",
+ WQ_MEM_RECLAIM, i);
+ if (!rds_cp_wqs[i])
+ goto err;
+ }
return 0;
+
+err:
+ destroy_workqueue(rds_wq);
+ for (j = 0; j < i; ++j)
+ destroy_workqueue(rds_cp_wqs[j]);
+
+ return -ENOMEM;
}
/* Compare two IPv6 addresses. Return 0 if the two addresses are equal.