Remove unused prints and add some informative prints.
Signed-off-by: Dennis Maisenbacher <dennis.maisenbacher@wdc.com>
def setUp(self):
""" Pre Section for TestNVMeCopy """
super().setUp()
- print("\nSetting up test...")
self.ocfs = self.get_ocfs()
self.host_behavior_data = None
cross_namespace_copy = self.ocfs & 0xc
def tearDown(self):
""" Post Section for TestNVMeCopy """
- print("Tearing down test...")
if self.host_behavior_data:
# restore saved host behavior support data
set_features_cmd = f"{self.nvme_bin} set-feature {self.ctrl} " + \
if "sopts" in kwargs:
copy_cmd += f" --sopts={kwargs['sopts']}"
# run and assert success
- print("Running command:", copy_cmd)
self.assertEqual(self.exec_cmd(copy_cmd), 0)
def test_copy(self):
""" Testcase main """
- print("Running test...")
self.copy(0, 1, 2, descriptor_format=0)
self.copy(0, 1, 2, descriptor_format=1)
self.copy(0, 1, 2, descriptor_format=2, snsids=self.ns1_nsid)
""" Testcase main """
print(f"##### Testing max_ns: {self.max_ns}")
for nsid in range(1, self.max_ns + 1):
- print("##### Creating " + str(nsid))
+ print(f"##### Creating {nsid}")
err = self.create_and_validate_ns(nsid,
self.nsze,
self.ncap,
self.flbas,
self.dps)
self.assertEqual(err, 0)
- print("##### Attaching " + str(nsid))
+ print(f"##### Attaching {nsid}")
self.assertEqual(self.attach_ns(self.ctrl_id, nsid), 0)
- print("##### Running IOs in " + str(nsid))
+ print(f"##### Running IOs in {nsid}")
self.run_ns_io(nsid, 9, 1)
for nsid in range(1, self.max_ns + 1):
- print("##### Detaching " + str(nsid))
+ print(f"##### Detaching {nsid}")
self.assertEqual(self.detach_ns(self.ctrl_id, nsid), 0)
- print("#### Deleting " + str(nsid))
+ print(f"#### Deleting {nsid}")
self.assertEqual(self.delete_and_validate_ns(nsid), 0)
self.nvme_reset_ctrl()
"""
flush_cmd = f"{self.nvme_bin} flush {self.ctrl} " + \
f"--namespace-id={str(self.default_nsid)}"
- print(flush_cmd)
return self.exec_cmd(flush_cmd)
def test_nvme_flush(self):
# extract the supported format information.
self.attach_detach_primary_ns()
+ print("##### Testing lba formats:")
# iterate through all supported format
for flbas, lba_format in enumerate(self.lba_format_list):
ds = lba_format['ds']
- Returns:
- 0 on success, error code on failure.
"""
- err = 0
fw_log_cmd = f"{self.nvme_bin} fw-log {self.ctrl}"
proc = subprocess.Popen(fw_log_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
- fw_log_output = proc.communicate()[0]
- print("\n" + fw_log_output + "\n")
- err = proc.wait()
- return err
+ return proc.wait()
def test_fw_log(self):
""" Testcase main """
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
- feature_output = proc.communicate()[0]
- print(feature_output)
self.assertEqual(proc.wait(), 0)
else:
get_feat_cmd = f"{self.nvme_bin} get-feature {self.ctrl} " + \
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
- feature_output = proc.communicate()[0]
- print(feature_output)
self.assertEqual(proc.wait(), 0)
def test_get_mandatory_features(self):
- Returns:
- 0 on success, error code on failure.
"""
- err = 0
get_lba_status_cmd = f"{self.nvme_bin} get-lba-status {self.ctrl} " + \
f"--namespace-id={str(self.ns1)} " + \
f"--start-lba={str(self.start_lba)} " + \
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
- get_lba_status_output = proc.communicate()[0]
- print("\n" + get_lba_status_output + "\n")
- err = proc.wait()
- return err
+ return proc.wait()
def test_get_lba_status(self):
""" Testcase main """
- Returns:
- 0 on success, error code on failure.
"""
- err = 0
id_ns_cmd = f"{self.nvme_bin} id-ns {self.ctrl} " + \
f"--namespace-id={str(nsid)}"
proc = subprocess.Popen(id_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
- id_ns_output = proc.communicate()[0]
- print(id_ns_output + "\n")
- err = proc.wait()
- return err
+ return proc.wait()
def get_id_ns_all(self):
"""
- Returns:
- 0 on success, error code on failure.
"""
- err = 0
lba_stat_log_cmd = f"{self.nvme_bin} lba-status-log {self.ctrl}"
proc = subprocess.Popen(lba_stat_log_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
- lba_stat_log_output = proc.communicate()[0]
- print("\n" + lba_stat_log_output + "\n")
- err = proc.wait()
- return err
+ return proc.wait()
def test_lba_stat_log(self):
""" Testcase main """
self.load_config()
if self.do_validate_pci_device:
self.validate_pci_device()
+ print(f"\nsetup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n")
def tearDown(self):
""" Post Section for TestNVMe. """
if self.clear_log_dir is True:
shutil.rmtree(self.log_dir, ignore_errors=True)
self.create_and_attach_default_ns()
+ print(f"\nteardown: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n")
+
+ @classmethod
+ def tearDownClass(cls):
+ print("\n")
def create_and_attach_default_ns(self):
""" Creates a default namespace with the full capacity of the ctrls NVM
- None
"""
x1, x2, dev = self.ctrl.split('/')
- cmd = cmd = "find /sys/devices -name \\*" + dev + " | grep -i pci"
- err = subprocess.call(cmd, shell=True)
+ cmd = "find /sys/devices -name \\*" + dev + " | grep -i pci"
+ err = subprocess.call(cmd, shell=True, stdout=subprocess.DEVNULL)
self.assertEqual(err, 0, "ERROR : Only NVMe PCI subsystem is supported")
def load_config(self):
"""
smart_log_cmd = f"{self.nvme_bin} smart-log {self.ctrl} " + \
f"--namespace-id={str(nsid)}"
- print(smart_log_cmd)
proc = subprocess.Popen(smart_log_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : nvme smart log failed")
- smart_log_output = proc.communicate()[0]
- print(f"{smart_log_output}")
return err
def get_id_ctrl(self, vendor=False):
else:
id_ctrl_cmd = f"{self.nvme_bin} id-ctrl " +\
f"--vendor-specific {self.ctrl}"
- print(id_ctrl_cmd)
proc = subprocess.Popen(id_ctrl_cmd,
shell=True,
stdout=subprocess.PIPE,
ns_path = self.ctrl + "n" + str(nsid)
io_cmd = "dd if=" + ns_path + " of=/dev/null" + " bs=" + \
str(block_size) + " count=" + str(count) + " > /dev/null 2>&1"
- print(io_cmd)
+ print(f"Running io: {io_cmd}")
run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE,
encoding='utf-8')
run_io_result = run_io.communicate()[1]
self.assertEqual(run_io_result, None)
io_cmd = "dd if=/dev/zero of=" + ns_path + " bs=" + \
str(block_size) + " count=" + str(count) + " > /dev/null 2>&1"
- print(io_cmd)
+ print(f"Running io: {io_cmd}")
run_io = subprocess.Popen(io_cmd, shell=True, stdout=subprocess.PIPE,
encoding='utf-8')
run_io_result = run_io.communicate()[1]
f"--start-block={str(self.start_block)} " + \
f"--block-count={str(self.block_count)} " + \
f"--data-size={str(self.data_size)} --data={self.read_file}"
- print(read_cmd)
return self.exec_cmd(read_cmd)