- Delete Namespace.
"""
+import json
import math
import subprocess
import time
- nsze : namespace size.
- ncap : namespace capacity.
- ctrl_id : controller id.
- - lba_format_list : lis of supported format.
- - ms_list : list of metadat size per format.
- - lbads_list : list of LBA data size per format.
+ - lba_format_list : json list of supported format.
- test_log_dir : directory for logs, temp files.
"""
self.nsze = ncap
self.ctrl_id = self.get_ctrl_id()
self.lba_format_list = []
- self.ms_list = []
- self.lbads_list = []
self.test_log_dir = self.log_dir + "/" + self.__class__.__name__
self.setup_log_dir(self.__class__.__name__)
self.delete_all_ns()
self.dps), 0)
self.assertEqual(self.attach_ns(self.ctrl_id, self.default_nsid), 0)
# read lbaf information
- id_ns = f"{self.nvme_bin} id-ns {self.ctrl} " + \
- f"--namespace-id={self.default_nsid} " + \
- "| grep ^lbaf | awk '{print $2}' | tr -s \"\\n\" \" \""
- proc = subprocess.Popen(id_ns, shell=True, stdout=subprocess.PIPE,
+ id_ns_cmd = f"{self.nvme_bin} id-ns {self.ctrl} " + \
+ f"--namespace-id={self.default_nsid} --output-format=json"
+ proc = subprocess.Popen(id_ns_cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
encoding='utf-8')
- self.lba_format_list = proc.stdout.read().strip().split(" ")
- if proc.wait() == 0:
- # read lbads information
- id_ns = f"{self.nvme_bin} id-ns {self.ctrl} " + \
- f"--namespace-id={self.default_nsid} " + \
- "| grep ^lbaf | awk '{print $5}' | cut -f 2 -d ':' | tr -s \"\\n\" \" \""
- proc = subprocess.Popen(id_ns, shell=True, stdout=subprocess.PIPE,
- encoding='utf-8')
- self.lbads_list = proc.stdout.read().strip().split(" ")
- # read metadata information
- id_ns = f"{self.nvme_bin} id-ns {self.ctrl} " + \
- f"--namespace-id={self.default_nsid} " + \
- "| grep ^lbaf | awk '{print $4}' | cut -f 2 -d ':' | tr -s \"\\n\" \" \""
- proc = subprocess.Popen(id_ns, shell=True, stdout=subprocess.PIPE,
- encoding='utf-8')
- self.ms_list = proc.stdout.read().strip().split(" ")
- self.assertEqual(self.detach_ns(self.ctrl_id, self.default_nsid), 0)
- self.assertEqual(self.delete_and_validate_ns(self.default_nsid), 0)
- self.nvme_reset_ctrl()
+ err = proc.wait()
+ self.assertEqual(err, 0, "ERROR : nvme id-ns failed")
+ json_output = json.loads(proc.stdout.read())
+ self.lba_format_list = json_output['lbafs']
+ self.assertTrue(len(self.lba_format_list) > 0,
+ "ERROR : nvme id-ns could not find any lba formats")
+ self.assertEqual(self.detach_ns(self.ctrl_id, self.default_nsid), 0)
+ self.assertEqual(self.delete_and_validate_ns(self.default_nsid), 0)
+ self.nvme_reset_ctrl()
def test_format_ns(self):
""" Testcase main """
self.attach_detach_primary_ns()
# iterate through all supported format
- for i in range(0, len(self.lba_format_list)):
- print("\nlba format " + str(self.lba_format_list[i]) +
- " lbad " + str(self.lbads_list[i]) +
- " ms " + str(self.ms_list[i]))
- metadata_size = 1 if self.ms_list[i] == '8' else 0
+ for flbas, lba_format in enumerate(self.lba_format_list):
+ ds = lba_format['ds']
+ ms = lba_format['ms']
+ print(f"\nlba format {str(flbas)}"
+ f"\nds {str(ds)}"
+ f"\nms {str(ms)}")
+ dps = 1 if str(ms) == '8' else 0
err = self.create_and_validate_ns(self.default_nsid,
self.nsze,
self.ncap,
- self.lba_format_list[i],
- metadata_size)
+ flbas,
+ dps)
self.assertEqual(err, 0)
self.assertEqual(self.attach_ns(self.ctrl_id, self.default_nsid), 0)
- self.run_ns_io(self.default_nsid, self.lbads_list[i])
- time.sleep(5)
+ self.run_ns_io(self.default_nsid, int(ds))
self.assertEqual(self.detach_ns(self.ctrl_id, self.default_nsid), 0)
self.assertEqual(self.delete_and_validate_ns(self.default_nsid), 0)
self.nvme_reset_ctrl()
""" Pre Section for TestNVMeIdentifyNamespace. """
super().setUp()
self.setup_log_dir(self.__class__.__name__)
- self.ns_list = self.get_ns_list()
+ self.nsid_list = self.get_nsid_list()
def tearDown(self):
"""
- 0 on success, error code on failure.
"""
err = 0
- for namespace in self.ns_list:
+ for namespace in self.nsid_list:
err = self.get_id_ns(str(namespace))
return err
- Returns:
- 0 on success, error code on failure.
"""
- ns_list = self.get_ns_list()
- for nsid in range(0, len(ns_list)):
- self.get_smart_log_ns(ns_list[nsid])
+ nsid_list = self.get_nsid_list()
+ for nsid in nsid_list:
+ self.get_smart_log_ns(nsid)
return 0
def test_smart_log(self):
""" Testcase main """
self.assertEqual(self.get_smart_log_ctrl(), 0)
- smlp = self.supp_check_id_ctrl("lpa")
+ smlp = int(self.get_id_ctrl_field_value("lpa"), 16)
if smlp & 0x1:
self.assertEqual(self.get_smart_log_all_ns(), 0)
"ERROR : nvme list-ctrl could not find ctrl")
return str(json_output['ctrl_list'][0]['ctrl_id'])
- def get_ns_list(self):
+ def get_nsid_list(self):
""" Wrapper for extracting the namespace list.
- Args:
- None
- List of the namespaces.
"""
ns_list = []
- ns_list_cmd = f"{self.nvme_bin} list-ns {self.ctrl}"
+ ns_list_cmd = f"{self.nvme_bin} list-ns {self.ctrl} " + \
+ "--output-format=json"
proc = subprocess.Popen(ns_list_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
self.assertEqual(proc.wait(), 0, "ERROR : nvme list namespace failed")
- for line in proc.stdout:
- ns_list.append(line.split('x')[-1])
+ json_output = json.loads(proc.stdout.read())
+
+ for ns in json_output['nsid_list']:
+ ns_list.append(ns['nsid'])
return ns_list
- Returns:
- maximum number of namespaces supported.
"""
- pattern = re.compile("^nn[ ]+: [0-9]", re.IGNORECASE)
- max_ns = -1
- max_ns_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl}"
+ max_ns_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl} " + \
+ "--output-format=json"
proc = subprocess.Popen(max_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : reading maximum namespace count failed")
-
- for line in proc.stdout:
- if pattern.match(line):
- max_ns = line.split(":")[1].strip()
- break
- print(max_ns)
- return int(max_ns)
+ json_output = json.loads(proc.stdout.read())
+ return int(json_output['nn'])
def get_lba_status_supported(self):
""" Check if 'Get LBA Status' command is supported by the device
- Args:
- None
- Returns:
- - maximum number of namespaces supported.
+ - Total NVM capacity.
"""
- pattern = re.compile("^tnvmcap[ ]+: [0-9]", re.IGNORECASE)
- ncap = -1
- ncap_cmd = f"{self.nvme_bin} id-ctrl {self.ctrl}"
- proc = subprocess.Popen(ncap_cmd,
- shell=True,
- stdout=subprocess.PIPE,
- encoding='utf-8')
- err = proc.wait()
- self.assertEqual(err, 0, "ERROR : reading nvm capacity failed")
-
- for line in proc.stdout:
- if pattern.match(line):
- ncap = line.split(":")[1].strip()
- break
- print(ncap)
- return int(ncap)
+ return int(self.get_id_ctrl_field_value("tnvmcap"))
def get_id_ctrl_field_value(self, field):
""" Wrapper for extracting id-ctrl field values
err = proc.wait()
self.assertEqual(err, 0, "ERROR : reading nvm capacity failed")
+ # Not using json output here because parsing flbas makes this less
+ # readable as the format index is split into lower and upper bits
for line in proc.stdout:
if "in use" in line:
nvm_format = 2 ** int(line.split(":")[3].split()[0])
- print(nvm_format)
return int(nvm_format)
def delete_all_ns(self):
delete_ns_cmd = f"{self.nvme_bin} delete-ns {self.ctrl} " + \
"--namespace-id=0xFFFFFFFF"
self.assertEqual(self.exec_cmd(delete_ns_cmd), 0)
- list_ns_cmd = f"{self.nvme_bin} list-ns {self.ctrl} --all | wc -l"
+ list_ns_cmd = f"{self.nvme_bin} list-ns {self.ctrl} --all " + \
+ "--output-format=json"
proc = subprocess.Popen(list_ns_cmd,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
- output = proc.stdout.read().strip()
- self.assertEqual(output, '0', "ERROR : deleting all namespace failed")
+ self.assertEqual(proc.wait(), 0, "ERROR : nvme list-ns failed")
+ json_output = json.loads(proc.stdout.read())
+ self.assertEqual(len(json_output['nsid_list']), 0,
+ "ERROR : deleting all namespace failed")
def create_ns(self, nsze, ncap, flbas, dps):
""" Wrapper for creating a namespace.
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : nvme error log failed")
+ # This sanity checkes the 'normal' output
line = proc.stdout.readline()
err_log_entry_count = int(line.split(" ")[5].strip().split(":")[1])
entry_count = 0
encoding='utf-8')
run_io_result = run_io.communicate()[1]
self.assertEqual(run_io_result, None)
-
- def supp_check_id_ctrl(self, key):
- """ Wrapper for support check.
- - Args:
- - key : search key.
- - Returns:
- - value for key requested.
- """
- id_ctrl = f"{self.nvme_bin} id-ctrl {self.ctrl}"
- print("\n" + id_ctrl)
- proc = subprocess.Popen(id_ctrl,
- shell=True,
- stdout=subprocess.PIPE,
- encoding='utf-8')
- err = proc.wait()
- self.assertEqual(err, 0, "ERROR : nvme Identify controller Data \
- structure failed")
- for line in proc.stdout:
- if key in line:
- key = line.replace(",", "", 1)
- print(key)
- val = (key.split(':'))[1].strip()
- return int(val, 16)