diff --git a/src/tests/ftest/dfuse/pil4dfs_fio.py b/src/tests/ftest/dfuse/pil4dfs_fio.py
new file mode 100644
index 00000000000..49121e38237
--- /dev/null
+++ b/src/tests/ftest/dfuse/pil4dfs_fio.py
@@ -0,0 +1,221 @@
+"""
+  (C) Copyright 2019-2024 Intel Corporation.
+
+  SPDX-License-Identifier: BSD-2-Clause-Patent
+"""
+
+import json
+import os
+
+from ClusterShell.NodeSet import NodeSet
+from cpu_utils import CpuInfo
+from dfuse_test_base import DfuseTestBase
+from fio_utils import FioCommand
+from general_utils import bytes_to_human
+
+
+class Pil4dfsFio(DfuseTestBase):
+    """Test class Description: Runs Fio with a small set of configurations.
+
+    :avocado: recursive
+    """
+
+    _FIO_RW_NAMES = ["write", "read"]
+
+    def __init__(self, *args, **kwargs):
+        """Initialize a Pil4dfsFio object."""
+        super().__init__(*args, **kwargs)
+
+        self.fio_cmd = None
+        self.fio_params = {"thread": "", "blocksize": "", "size": ""}
+        self.fio_numjobs = 0
+        self.fio_cpus_allowed = ""
+
+    def setUp(self):
+        """Set up each test case."""
+        # obtain separate logs
+        self.update_log_file_names()
+
+        # Start the servers and agents
+        super().setUp()
+
+        for name in self.fio_params:
+            self.fio_params[name] = self.params.get(name, "/run/fio/job/*", "")
+
+        cpu_info = CpuInfo(self.log, self.hostlist_clients)
+        cpu_info.scan()
+        _, arch = cpu_info.get_architectures()[0]
+        if arch.numas != 2:
+            self.fail(f"Client with unsupported number of NUMA nodes: want=2, got={arch.numas}")
+        self.fio_numjobs = int(arch.quantity / arch.threads_core)
+        cpus = []
+        cores_quantity = int(self.fio_numjobs / 2)
+        for numa_idx in range(2):
+            cpus += arch.get_numa_cpus(numa_idx)[:cores_quantity]
+        self.fio_cpus_allowed = str(NodeSet(str(cpus)))[1:-1]
+
+    def _create_container(self):
+        """Create a DAOS POSIX container."""
+        self.log.info("Creating pool")
+        self.assertIsNone(self.pool, "Unexpected pool before starting test")
+        self.add_pool()
+
+        self.log.info("Creating container")
+        self.assertIsNone(self.container, "Unexpected container before starting test")
+        self.add_container(self.pool)
+
+    def _destroy_container(self):
+        """Destroy the DAOS POSIX container previously created."""
+        if self.container is not None:
+            self.log.debug("Destroying container %s", str(self.container))
+            self.destroy_containers(self.container)
+            self.container = None
+
+        if self.pool is not None:
+            self.log.debug("Destroying pool %s", str(self.pool))
+            self.destroy_pools(self.pool)
+            self.pool = None
+
+    def _get_bandwidth(self, fio_result, rw):
+        """Return the FIO bandwidth of a given I/O pattern.
+
+        Args:
+            fio_result (RemoteCommandResult): results of a FIO command.
+            rw (str): Type of I/O pattern.
+
+        Returns:
+            int: Bandwidth of the FIO command.
+
+        """
+        fio_stdout = next(iter(fio_result.all_stdout.values()))
+        # NOTE With dfuse and pil4dfs, some junk messages may occasionally be printed
+        if fio_stdout[0] != '{':
+            fio_stdout = '{' + fio_stdout.partition('{')[2]
+        fio_json = json.loads(fio_stdout)
+        return fio_json["jobs"][0][rw]['bw_bytes']
+
+    def _run_fio_dfuse(self):
+        """Run and return the result of running FIO over a DFuse mount point.
+
+        Returns:
+            dict: Read and Write bandwidths of the FIO command.
+
+        """
+        self._create_container()
+        self.log.info("Mounting DFuse mount point")
+        self.start_dfuse(self.hostlist_clients, self.pool, self.container)
+        self.log.debug("Mounted DFuse mount point %s", str(self.dfuse))
+
+        self.fio_cmd = FioCommand()
+        self.fio_cmd.get_params(self)
+        self.fio_cmd.update(
+            "global", "directory", self.dfuse.mount_dir.value,
+            f"fio --name=global --directory={self.dfuse.mount_dir.value}")
+        self.fio_cmd.update("global", "ioengine", "psync", "fio --name=global --ioengine='psync'")
+        self.fio_cmd.update(
+            "global", "numjobs", self.fio_numjobs,
+            f"fio --name=global --numjobs={self.fio_numjobs}")
+        self.fio_cmd.update(
+            "global", "cpus_allowed", self.fio_cpus_allowed,
+            f"fio --name=global --cpus_allowed={self.fio_cpus_allowed}")
+        self.fio_cmd.env['LD_PRELOAD'] = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so')
+        self.fio_cmd.hosts = self.hostlist_clients
+
+        bws = {}
+        for rw in Pil4dfsFio._FIO_RW_NAMES:
+            self.fio_cmd.update("job", "rw", rw, f"fio --name=job --rw={rw}")
+
+            params = ", ".join(f"{name}={value}" for name, value in self.fio_params.items())
+            self.log.info("Running FIO command: rw=%s, %s", rw, params)
+            self.log.debug(
+                "FIO command: LD_PRELOAD=%s %s", self.fio_cmd.env['LD_PRELOAD'], str(self.fio_cmd))
+            result = self.fio_cmd.run()
+            bws[rw] = self._get_bandwidth(result, rw)
+            self.log.debug("DFuse bandwidths for %s: %s", rw, bws[rw])
+
+        if self.dfuse is not None:
+            self.log.debug("Stopping DFuse mount point %s", str(self.dfuse))
+            self.stop_dfuse()
+        self._destroy_container()
+
+        return bws
+
+    def _run_fio_dfs(self):
+        """Run and return the result of running FIO with DFS ioengine.
+
+        Returns:
+            dict: Read and Write bandwidths of the FIO command.
+
+        """
+        self._create_container()
+
+        self.fio_cmd = FioCommand()
+        self.fio_cmd.get_params(self)
+        self.fio_cmd.update("global", "ioengine", "dfs", "fio --name=global --ioengine='dfs'")
+        self.fio_cmd.update(
+            "global", "numjobs", self.fio_numjobs,
+            f"fio --name=global --numjobs={self.fio_numjobs}")
+        self.fio_cmd.update(
+            "global", "cpus_allowed", self.fio_cpus_allowed,
+            f"fio --name=global --cpus_allowed={self.fio_cpus_allowed}")
+        # NOTE DFS ioengine options must come after the ioengine that defines them is selected.
+        self.fio_cmd.update(
+            "job", "pool", self.pool.uuid,
+            f"fio --name=job --pool={self.pool.uuid}")
+        self.fio_cmd.update(
+            "job", "cont", self.container.uuid,
+            f"fio --name=job --cont={self.container.uuid}")
+        self.fio_cmd.hosts = self.hostlist_clients
+
+        bws = {}
+        for rw in Pil4dfsFio._FIO_RW_NAMES:
+            self.fio_cmd.update("job", "rw", rw, f"fio --name=job --rw={rw}")
+
+            params = ", ".join(f"{name}={value}" for name, value in self.fio_params.items())
+            self.log.info("Running FIO command: rw=%s, %s", rw, params)
+            self.log.debug("FIO command: %s", str(self.fio_cmd))
+            result = self.fio_cmd.run()
+            bws[rw] = self._get_bandwidth(result, rw)
+            self.log.debug("DFS bandwidths for %s: %s", rw, bws[rw])
+
+        self._destroy_container()
+
+        return bws
+
+    def test_pil4dfs_vs_dfs(self):
+        """Jira ID: DAOS-14657.
+
+        Test Description:
+            Run FIO over DFUSE mount point with PIL4DFS interception library
+            Run FIO with DFS ioengine
+            Check bandwidth consistency
+
+        :avocado: tags=all,daily_regression
+        :avocado: tags=hw,medium
+        :avocado: tags=pil4dfs,dfuse,dfs,fio
+        :avocado: tags=Pil4dfsFio,test_pil4dfs_vs_dfs
+        """
+        bw_deltas = {}
+        for name in self._FIO_RW_NAMES:
+            bw_deltas[name] = self.params.get(
+                name.lower(), "/run/test_pil4dfs_vs_dfs/bw_deltas/*", 0)
+
+        self.log_step("Running FIO with DFuse")
+        dfuse_bws = self._run_fio_dfuse()
+
+        self.log_step("Running FIO with DFS")
+        dfs_bws = self._run_fio_dfs()
+
+        self.log_step("Comparing FIO bandwidths of DFuse and DFS")
+        for rw in Pil4dfsFio._FIO_RW_NAMES:
+            delta = abs(dfuse_bws[rw] - dfs_bws[rw]) * 100 / max(dfuse_bws[rw], dfs_bws[rw])
+            self.log.debug(
+                "Comparing %s bandwidths: delta=%.2f%%, DFuse=%s (%iB), DFS=%s (%iB)",
+                rw, delta, bytes_to_human(dfuse_bws[rw]), dfuse_bws[rw],
+                bytes_to_human(dfs_bws[rw]), dfs_bws[rw])
+            if bw_deltas[rw] <= delta:
+                self.fail(
+                    f"FIO {rw} bandwidth difference should be < {bw_deltas[rw]}%: "
+                    f"got={delta:.2f}%")
+
+        self.log_step("Test passed")
diff --git a/src/tests/ftest/dfuse/pil4dfs_fio.yaml b/src/tests/ftest/dfuse/pil4dfs_fio.yaml
new file mode 100644
index 00000000000..d6bd1ea43c4
--- /dev/null
+++ b/src/tests/ftest/dfuse/pil4dfs_fio.yaml
@@ -0,0 +1,70 @@
+hosts:
+  test_servers: 2
+  test_clients: 1
+
+timeout: 600
+
+server_config:
+  name: daos_server
+  engines_per_host: 2
+  engines:
+    0:
+      pinned_numa_node: 0
+      fabric_iface: ib0
+      fabric_iface_port: 31317
+      log_file: daos_server0.log
+      storage: auto
+    1:
+      pinned_numa_node: 1
+      fabric_iface: ib1
+      fabric_iface_port: 31417
+      log_file: daos_server1.log
+      storage: auto
+
+pool:
+  size: 90%
+
+container:
+  type: POSIX
+  control_method: daos
+
+test_pil4dfs_vs_dfs:
+  bw_deltas:
+    read: 10
+    write: 10
+
+fio:
+  names:
+    - global
+    - job
+  output_format: 'json'
+  global:
+    direct: 1
+    iodepth: 1
+    time_based: 1
+    runtime: '60s'
+    ramp_time: '5s'
+    group_reporting: 1
+    cpus_allowed_policy: split
+    numa_mem_policy: bind:all
+    percentile_list: '99.0:99.9:99.99:99.999:99.9999:100'
+    disable_slat: 1
+    disable_clat: 1
+  job:
+    params: !mux
+      256B_fork:
+        thread: 0
+        blocksize: '256B'
+        size: '32K'
+      256B_pthread:
+        thread: 1
+        blocksize: '256B'
+        size: '32K'
+      1M_fork:
+        thread: 0
+        blocksize: '1M'
+        size: '64M'
+      1M_pthread:
+        thread: 1
+        blocksize: '1M'
+        size: '64M'
diff --git a/src/tests/ftest/util/cpu_utils.py b/src/tests/ftest/util/cpu_utils.py
new file mode 100644
index 00000000000..a41bc51bedb
--- /dev/null
+++ b/src/tests/ftest/util/cpu_utils.py
@@ -0,0 +1,478 @@
+"""
+  (C) Copyright 2024 Intel Corporation.
+
+  SPDX-License-Identifier: BSD-2-Clause-Patent
+"""
+
+import json
+import re
+
+from ClusterShell.NodeSet import NodeSet
+from general_utils import get_host_data
+
+DATA_ERROR = "[ERROR]"
+
+
+class CpuException(Exception):
+    """Exception for the CpuInfo class."""
+
+
+class CpuArchitecture():
+    # pylint: disable=unnecessary-lambda,too-many-public-methods
+    """Information about a CPU architecture."""
+
+    _lscpu_converter = {
+        'Architecture:': ('_arch', lambda x: x),
+        'CPU op-mode(s):': ('_op_modes', lambda x: [s.strip() for s in x.split(',')]),
+        'Byte Order:': ('_byte_order', lambda x: 'little' if x == 'Little Endian' else 'big'),
+        'CPU(s):': ('_quantity', lambda x: int(x)),
+        'On-line CPU(s) list:': ('_online', lambda x: [int(i) for i in NodeSet('[' + x + ']')]),
+        'Thread(s) per core:': ('_threads_core', lambda x: int(x)),
+        'Core(s) per socket:': ('_cores_socket', lambda x: int(x)),
+        'Socket(s):': ('_sockets', lambda x: int(x)),
+        'NUMA node(s):': ('_numas', lambda x: int(x)),
+        'Vendor ID:': ('_vendor', lambda x: x),
+        'CPU family:': ('_family', lambda x: int(x)),
+        'Model:': ('_model', lambda x: int(x)),
+        'Model name:': ('_model_name', lambda x: x),
+        'Stepping:': ('_stepping', lambda x: int(x)),
+        'CPU MHz:': ('_freq', lambda x: float(x)),
+        'CPU min MHz:': ('_freq_min', lambda x: float(x)),
+        'CPU max MHz:': ('_freq_max', lambda x: float(x)),
+        'BogoMIPS:': ('_bogo', lambda x: float(x)),
+        'Virtualization:': ('_virt', lambda x: x),
+        'L1d cache:': ('_cache_l1d', lambda x: x),
+        'L1i cache:': ('_cache_l1i', lambda x: x),
+        'L2 cache:': ('_cache_l2', lambda x: x),
+        'L3 cache:': ('_cache_l3', lambda x: x),
+        'Flags:': ('_flags', lambda x: list(x.split(' ')))
+    }
+    _lscpu_numas_re = re.compile(r'NUMA node(\d+) CPU\(s\):', re.ASCII)
+
+    def __init__(self, logger, data):
+        """Initialize a CpuArchitecture object.
+
+        Args:
+            logger (logger): logger for the messages produced by this class
+            data (str): json output of the 'lscpu' command
+        """
+        self._log = logger
+        cpu_arch = json.loads(data)
+        self._numa_cpus = {}
+        for it in cpu_arch['lscpu']:
+            field = it['field']
+
+            if field in CpuArchitecture._lscpu_converter:
+                key, func = CpuArchitecture._lscpu_converter[field]
+                setattr(self, key, func(it['data']))
+                continue
+
+            numa_match = CpuArchitecture._lscpu_numas_re.fullmatch(field)
+            if numa_match:
+                numa_idx = int(numa_match.group(1))
+                self._numa_cpus[numa_idx] = [int(i) for i in NodeSet('[' + it['data'] + ']')]
+
+    def __str__(self):
+        """Convert this CpuArchitecture into a string.
+
+        Returns:
+            str: a human readable description of this CPU architecture
+
+        """
+        msg = "CPU Architecture:\n"
+        for key, it in CpuArchitecture._lscpu_converter.items():
+            msg += f"\t- {key} "
+            attr_name = it[0]
+            if hasattr(self, attr_name):
+                val = str(getattr(self, attr_name))
+                if attr_name == '_online':
+                    val = str(NodeSet(val))
+                msg += f"{val}\n"
+            else:
+                msg += "Undefined\n"
+        for numa_idx in sorted(self._numa_cpus):
+            msg += f"\t- NUMA node{numa_idx}: {NodeSet(str(self._numa_cpus[numa_idx]))}\n"
+
+        return msg
+
+    def __repr__(self):
+        """Convert this CpuArchitecture into a string representation.
+
+        Returns:
+            str: raw string representation of this CPU architecture
+
+        """
+        msg = r'{"lscpu":['
+
+        first_it = True
+        for key, it in CpuArchitecture._lscpu_converter.items():
+            attr_name = it[0]
+            if not hasattr(self, attr_name):
+                continue
+            if first_it:
+                first_it = False
+            else:
+                msg += ','
+            msg += '{' + f'"field":"{key}","data":"'
+            val = getattr(self, attr_name)
+            if attr_name == '_online':
+                val = str(NodeSet(str(val)))[1:-1]
+            elif attr_name == '_flags':
+                val = ' '.join(flag for flag in val)
+            elif attr_name == '_op_modes':
+                val = ', '.join(mode for mode in val)
+            elif attr_name == '_byte_order':
+                val = 'Little Endian' if val == 'little' else 'Big Endian'
+            else:
+                val = str(val)
+            msg += f'{val}"' + '}'
+
+        for numa_idx in sorted(self._numa_cpus):
+            if first_it:
+                first_it = False
+            else:
+                msg += ','
+            msg += '{' + f'"field":"NUMA node{numa_idx} CPU(s):","data":"'
+            msg += str(NodeSet(str(self._numa_cpus[numa_idx])))[1:-1] + '"}'
+
+        msg += ']}'
+
+        return msg
+
+    def _raise_error(self, message, error=None):
+        """Raise and log the error message.
+
+        Args:
+            message (str): error description
+            error (optional, Exception): exception from which to raise. Defaults to None.
+
+        Raises:
+            CpuException: with the provided error description
+
+        """
+        self._log.error(message)
+        if error:
+            raise CpuException(message) from error
+        raise CpuException(message)
+
+    def _getattr(self, name):
+        """Return the value of the named attribute of this object.
+
+        Args:
+            name (str): name of the attribute to retrieve
+
+        Returns:
+            object: value of the attribute
+
+        """
+        if not hasattr(self, name):
+            self._raise_error(f"ERROR: the CPU property '{name}' is undefined.")
+        return getattr(self, name)
+
+    @property
+    def arch(self):
+        """Get the CPU architecture name.
+
+        Returns:
+            str: the CPU architecture name (e.g. 'x86_64')
+
+        """
+        return self._getattr('_arch')
+
+    @property
+    def op_modes(self):
+        """Get the CPU operation mode(s).
+
+        Returns:
+            list: a list of supported CPU operation modes
+
+        """
+        return self._getattr('_op_modes')
+
+    @property
+    def byte_order(self):
+        """Get the endianness of the CPU architecture.
+
+        Returns:
+            str: the name of the endianness ('little' or 'big')
+
+        """
+        return self._getattr('_byte_order')
+
+    @property
+    def quantity(self):
+        """Get the quantity of CPUs.
+
+        Returns:
+            int: the quantity of CPUs
+
+        """
+        return self._getattr('_quantity')
+
+    @property
+    def online(self):
+        """Get the list of online CPUs.
+
+        Returns:
+            list: a list of online CPUs
+
+        """
+        return self._getattr('_online')
+
+    @property
+    def threads_core(self):
+        """Get the number of threads per core.
+
+        Returns:
+            int: the number of threads per core
+
+        """
+        return self._getattr('_threads_core')
+
+    @property
+    def cores_socket(self):
+        """Get the number of cores per socket.
+
+        Returns:
+            int: the number of cores per socket
+
+        """
+        return self._getattr('_cores_socket')
+
+    @property
+    def sockets(self):
+        """Get the number of sockets.
+
+        Returns:
+            int: the number of sockets
+
+        """
+        return self._getattr('_sockets')
+
+    @property
+    def numas(self):
+        """Get the number of NUMA nodes.
+
+        Returns:
+            int: the number of NUMA nodes
+
+        """
+        return self._getattr('_numas')
+
+    @property
+    def vendor(self):
+        """Get the vendor name identifier.
+
+        Returns:
+            str: the vendor name identifier
+
+        """
+        return self._getattr('_vendor')
+
+    @property
+    def family(self):
+        """Get the CPU family identifier.
+
+        Returns:
+            int: the CPU family identifier
+
+        """
+        return self._getattr('_family')
+
+    @property
+    def model(self):
+        """Get the CPU model identifier.
+
+        Returns:
+            int: the CPU model identifier
+
+        """
+        return self._getattr('_model')
+
+    @property
+    def model_name(self):
+        """Get the CPU model name.
+
+        Returns:
+            str: the CPU model name
+
+        """
+        return self._getattr('_model_name')
+
+    @property
+    def stepping(self):
+        """Get the CPU stepping revision.
+
+        Returns:
+            int: the CPU stepping revision
+
+        """
+        return self._getattr('_stepping')
+
+    @property
+    def freq(self):
+        """Get the CPU frequency in MHz.
+
+        Returns:
+            float: the CPU frequency in MHz
+
+        """
+        return self._getattr('_freq')
+
+    @property
+    def freq_min(self):
+        """Get the minimal CPU frequency in MHz.
+
+        Returns:
+            float: the minimal CPU frequency in MHz
+
+        """
+        return self._getattr('_freq_min')
+
+    @property
+    def freq_max(self):
+        """Get the maximal CPU frequency in MHz.
+
+        Returns:
+            float: the maximal CPU frequency in MHz
+
+        """
+        return self._getattr('_freq_max')
+
+    @property
+    def bogo(self):
+        """Get the BogoMIPS performance measure.
+
+        Returns:
+            float: the BogoMIPS performance measure
+
+        """
+        return self._getattr('_bogo')
+
+    @property
+    def virt(self):
+        """Get the name of the supported virtualization technology.
+
+        Returns:
+            str: the name of the supported virtualization technology
+
+        """
+        return self._getattr('_virt')
+
+    @property
+    def cache_l1d(self):
+        """Get the size of the L1 data cache.
+
+        Returns:
+            str: the size of the L1 data cache
+
+        """
+        return self._getattr('_cache_l1d')
+
+    @property
+    def cache_l1i(self):
+        """Get the size of the L1 instruction cache.
+
+        Returns:
+            str: the size of the L1 instruction cache
+
+        """
+        return self._getattr('_cache_l1i')
+
+    @property
+    def cache_l2(self):
+        """Get the size of the L2 cache.
+
+        Returns:
+            str: the size of the L2 cache
+
+        """
+        return self._getattr('_cache_l2')
+
+    @property
+    def cache_l3(self):
+        """Get the size of the L3 cache.
+
+        Returns:
+            str: the size of the L3 cache
+
+        """
+        return self._getattr('_cache_l3')
+
+    @property
+    def flags(self):
+        """Get the list of supported CPU features.
+
+        Returns:
+            list: a list of supported CPU features
+
+        """
+        return self._getattr('_flags')
+
+    def get_numa_cpus(self, numa_id):
+        """Get the list of CPUs of a given NUMA node.
+
+        Args:
+            numa_id (int): NUMA node identifier
+
+        Returns:
+            list: a list of CPU indexes
+
+        """
+        if numa_id not in self._numa_cpus:
+            self._raise_error("Error: Invalid NUMA identifier")
+        return self._numa_cpus[numa_id]
+
+
+class CpuInfo():
+    """Information about the CPU architecture of a set of hosts."""
+
+    def __init__(self, logger, hosts, timeout=60):
+        """Initialize a CpuInfo object.
+
+        Args:
+            logger (logger): logger for the messages produced by this class
+            hosts (NodeSet): set of hosts from which to obtain the CPU architecture information
+            timeout (int, optional): timeout used when obtaining host data.
+                Defaults to 60 seconds.
+        """
+        self._log = logger
+        self._hosts = hosts.copy()
+        self._timeout = timeout
+        self._architectures = {}
+
+    def scan(self):
+        """Detect the CPU architecture on every host."""
+        cmd = 'lscpu --json'
+        text = "CPU"
+        error = "No CPUs detected"
+        host_data = get_host_data(self._hosts, cmd, text, error, self._timeout)
+
+        for it in host_data:
+            data = it['data']
+            if data == DATA_ERROR:
+                self._log.error("Error issuing command '%s' on hosts %s", cmd, it["hosts"])
+                continue
+
+            key = str(it["hosts"])
+            self._architectures[key] = CpuArchitecture(self._log, data)
+
+    def get_architectures(self, hosts=None):
+        """Get the CPU architectures of a given set of hosts.
+
+        Args:
+            hosts (NodeSet, optional): target hosts. Defaults to None.
+
+        Returns:
+            list: a list of (NodeSet, CpuArchitecture) couples, one per set of hosts sharing
+                the same CPU architecture
+
+        """
+        hosts_arch = []
+        for nodes_str, arch in self._architectures.items():
+            nodes = NodeSet(nodes_str)
+            if hosts is not None:
+                nodes = hosts.intersection(nodes)
+            if len(nodes) == 0:
+                continue
+            hosts_arch.append((nodes, arch))
+        return hosts_arch
diff --git a/src/tests/ftest/util/fio_utils.py b/src/tests/ftest/util/fio_utils.py
index 7a0fa3a60ef..1a511257afd 100644
--- a/src/tests/ftest/util/fio_utils.py
+++ b/src/tests/ftest/util/fio_utils.py
@@ -1,5 +1,5 @@
 """
-  (C) Copyright 2019-2023 Intel Corporation.
+  (C) Copyright 2019-2024 Intel Corporation.
 
   SPDX-License-Identifier: BSD-2-Clause-Patent
 """
@@ -393,3 +393,9 @@ def __init__(self, namespace, name):
         self.steadystate = FormattedParameter("--steadystate={}")
         self.steadystate_duration = FormattedParameter("--steadystate_duration={}")
         self.steadystate_ramp_time = FormattedParameter("--steadystate_ramp_time={}")
+
+        # NOTE DFS ioengine options must come after the ioengine that defines them is selected.
+        self.pool = FormattedParameter("--pool={}")
+        self.cont = FormattedParameter("--cont={}")
+        self.chunk_size = FormattedParameter("--chunk_size={}")
+        self.object_class = FormattedParameter("--object_class={}")
diff --git a/utils/cq/words.dict b/utils/cq/words.dict
index c5ca4dd5d41..1cee09198db 100644
--- a/utils/cq/words.dict
+++ b/utils/cq/words.dict
@@ -165,6 +165,8 @@ dsync
 dtx
 eg
 enablement
+endian
+endianness
 endswith
 enospace
 enum
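Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new CpuInfo and CpuArchitecture helpers are intended to be combined to derive fio's numjobs and cpus_allowed values, mirroring what Pil4dfsFio.setUp() does above. The fio_cpu_layout wrapper and its logger/clients arguments are assumed names introduced for this example.

    from ClusterShell.NodeSet import NodeSet
    from cpu_utils import CpuInfo

    def fio_cpu_layout(logger, clients):
        """Return (numjobs, cpus_allowed) for fio, mirroring Pil4dfsFio.setUp()."""
        cpu_info = CpuInfo(logger, clients)
        cpu_info.scan()                             # runs 'lscpu --json' on every client host
        _, arch = cpu_info.get_architectures()[0]   # assumes a homogeneous set of clients
        numjobs = arch.quantity // arch.threads_core    # one fio job per physical core
        cores_per_numa = numjobs // arch.numas
        cpus = []
        for numa_idx in range(arch.numas):
            cpus += arch.get_numa_cpus(numa_idx)[:cores_per_numa]
        # NodeSet folds the CPU indexes into a compact range string, e.g. "0-15,32-47"
        return numjobs, str(NodeSet(str(cpus)))[1:-1]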