self.assertEqual(proc.wait(), 0, "ERROR : pci rescan failed")
def get_ctrl_id(self):
- """ Wrapper for extracting the controller id.
+ """ Wrapper for extracting the first controller id.
- Args:
- None
- Returns:
- controller id.
"""
- get_ctrl_id = "nvme list-ctrl " + self.ctrl
+ get_ctrl_id = f"nvme list-ctrl {self.ctrl} --output-format=json"
proc = subprocess.Popen(get_ctrl_id,
shell=True,
stdout=subprocess.PIPE,
encoding='utf-8')
err = proc.wait()
self.assertEqual(err, 0, "ERROR : nvme list-ctrl failed")
- line = proc.stdout.readline()
- ctrl_id = line.split(":")[1].strip()
- return ctrl_id
+ json_output = json.loads(proc.stdout.read())
+ self.assertTrue(len(json_output['ctrl_list']) > 0,
+ "ERROR : nvme list-ctrl could not find ctrl")
+ return str(json_output['ctrl_list'][0]['ctrl_id'])
def get_ns_list(self):
""" Wrapper for extracting the namespace list.
print(max_ns)
return int(max_ns)
+ def get_lba_format_size(self):
+ """ Wrapper for extracting lba format size of the given flbas
+ - Args:
+ - None
+ - Returns:
+ - lba format size as a tuple of (data_size, metadata_size) in bytes.
+ """
+ nvme_id_ns_cmd = f"nvme id-ns {self.ns1} --output-format=json"
+ proc = subprocess.Popen(nvme_id_ns_cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ encoding='utf-8')
+ err = proc.wait()
+ self.assertEqual(err, 0, "ERROR : reading id-ns")
+ json_output = json.loads(proc.stdout.read())
+ self.assertTrue(len(json_output['lbafs']) > self.flbas,
+ "Error : could not match the given flbas to an existing lbaf")
+ lbaf_json = json_output['lbafs'][int(self.flbas)]
+ ms_expo = int(lbaf_json['ms'])
+ ds_expo = int(lbaf_json['ds'])
+ ds = 0
+ ms = 0
+ if ds_expo > 0:
+ ds = (1 << ds_expo)
+ if ms_expo > 0:
+ ms = (1 << ms_expo)
+ return (ds, ms)
+
def get_ncap(self):
""" Wrapper for extracting capacity.
- Args: