From 40759b0d3e7f842933c66b08ba294cca92dfc2bc Mon Sep 17 00:00:00 2001 From: vsivanandharao_expedia Date: Wed, 1 Jan 2025 14:34:50 -0600 Subject: [PATCH] Restructure memory module --- pyarchitecture/memory/__init__.py | 22 +++++-- pyarchitecture/memory/linux.py | 32 +++++++++++ pyarchitecture/memory/macOS.py | 75 ++++++++++++++++++++++++ pyarchitecture/memory/main.py | 96 ------------------------------- pyarchitecture/memory/windows.py | 62 ++++++++++++++++++++ pyarchitecture/models.py | 12 ++-- pyarchitecture/squire.py | 45 +++++++++++++++ 7 files changed, 235 insertions(+), 109 deletions(-) create mode 100644 pyarchitecture/memory/linux.py create mode 100644 pyarchitecture/memory/macOS.py delete mode 100644 pyarchitecture/memory/main.py create mode 100644 pyarchitecture/memory/windows.py diff --git a/pyarchitecture/memory/__init__.py b/pyarchitecture/memory/__init__.py index e8180e3..f0b3c81 100644 --- a/pyarchitecture/memory/__init__.py +++ b/pyarchitecture/memory/__init__.py @@ -1,9 +1,9 @@ import logging import os -from typing import Dict, List +from typing import Dict from pyarchitecture import models -from pyarchitecture.memory import main +from pyarchitecture.memory import linux, macOS, windows LOGGER = logging.getLogger(__name__) @@ -20,7 +20,9 @@ def _get_mem_lib(user_input: str | os.PathLike) -> str: or os.environ.get("MEM_LIB") or models.default_mem_lib()[models.OPERATING_SYSTEM] ) - assert os.path.isfile(mem_lib), f"Memory library {mem_lib!r} doesn't exist" if mem_lib else None + assert os.path.isfile(mem_lib), ( + f"Memory library {mem_lib!r} doesn't exist" if mem_lib else None + ) return mem_lib @@ -31,7 +33,15 @@ def get_memory_info(mem_lib: str | os.PathLike = None) -> Dict[str, int]: mem_lib: Custom memory library path. Returns: - List[Dict[str, str]]: - Returns the memory model and vendor information as a list of key-value pairs. + Dict[str, int]: + Returns the memory information as key-value pairs. """ - return main.get_mem_info(_get_mem_lib(mem_lib)) + os_map = { + models.OperatingSystem.darwin: macOS.get_memory_info, + models.OperatingSystem.linux: linux.get_memory_info, + models.OperatingSystem.windows: windows.get_memory_info, + } + try: + return os_map[models.OPERATING_SYSTEM](mem_lib) + except Exception as error: + LOGGER.error(error) diff --git a/pyarchitecture/memory/linux.py b/pyarchitecture/memory/linux.py new file mode 100644 index 0000000..1ba3537 --- /dev/null +++ b/pyarchitecture/memory/linux.py @@ -0,0 +1,32 @@ +import os +from typing import Dict + + +def get_memory_info(mem_lib: str | os.PathLike) -> Dict[str, int | str]: + """Get memory information on Linux systems. + + Args: + mem_lib: Memory library path. + + Returns: + Dict[str, int]: + Returns the memory information as key-value pairs. + """ + memory_info = {} + with open(mem_lib) as f: + for line in f: + if line.startswith( + ("MemTotal", "MemFree", "MemAvailable", "Buffers", "Cached") + ): + parts = line.split() + memory_info[parts[0][:-1]] = int( + parts[1] + ) # Convert the memory value to int (in kB) + + # Convert values to bytes (kB to bytes) + total = memory_info.get("MemTotal", 0) * 1024 + free = memory_info.get("MemFree", 0) * 1024 + available = memory_info.get("MemAvailable", 0) * 1024 + used = total - free - available + + return {"total": total, "free": free, "used": used} diff --git a/pyarchitecture/memory/macOS.py b/pyarchitecture/memory/macOS.py new file mode 100644 index 0000000..dc807bb --- /dev/null +++ b/pyarchitecture/memory/macOS.py @@ -0,0 +1,75 @@ +import os +import subprocess +from typing import Dict + +from pyarchitecture import squire + + +def byte_value(text: str, key: str) -> int: + """Converts the string value to bytes. + + Args: + text: Text containing the value. + key: Key to extract the value. + + Returns: + int: + Returns the value in bytes as an integer. + """ + return squire.convert_to_bytes(text.split(f"{key} = ")[1].split()[0]) + + +def get_sysctl_value(mem_lib: str | os.PathLike, key: str): + """Get the value of the key from sysctl. + + Args: + mem_lib: Memory library path. + key: Key to extract the value. + + Returns: + int: + Returns the value of the key as an integer. + """ + result = subprocess.run([mem_lib, key], capture_output=True, text=True) + if text := result.stdout.strip(): + if key == "vm.swapusage": + swap_usage = {} + if "total" in text: + swap_usage["swap_total"] = byte_value(text, "total") + if "used" in text: + swap_usage["swap_used"] = byte_value(text, "used") + if "free" in text: + swap_usage["swap_free"] = byte_value(text, "free") + return swap_usage + return int(text.split(":")[1].strip()) + return 0 + + +def get_memory_info(mem_lib: str | os.PathLike) -> Dict[str, int | str]: + """Get memory information on macOS systems. + + Args: + mem_lib: Memory library path. + + Returns: + Dict[str, int]: + Returns the memory information as key-value pairs. + """ + # Physical memory information + total = get_sysctl_value(mem_lib, "hw.memsize") + free = get_sysctl_value(mem_lib, "vm.page_free_count") * get_sysctl_value( + mem_lib, "hw.pagesize" + ) + used = total - free + + # Virtual memory information + swap_info = get_sysctl_value(mem_lib, "vm.swapusage") + + return { + **{ + "total": total, + "free": free, + "used": used, + }, + **swap_info, + } diff --git a/pyarchitecture/memory/main.py b/pyarchitecture/memory/main.py deleted file mode 100644 index 577bfd5..0000000 --- a/pyarchitecture/memory/main.py +++ /dev/null @@ -1,96 +0,0 @@ -import logging -import os -import subprocess -from typing import Dict - -from pyarchitecture import models - -LOGGER = logging.getLogger(__name__) - - -def get_memory_info_linux(mem_lib: str | os.PathLike): - memory_info = {} - with open(mem_lib) as f: - for line in f: - if line.startswith(('MemTotal', 'MemFree', 'MemAvailable', 'Buffers', 'Cached')): - parts = line.split() - memory_info[parts[0][:-1]] = int(parts[1]) # Convert the memory value to int (in kB) - - # Convert values to bytes (kB to bytes) - total = memory_info.get('MemTotal', 0) * 1024 - free = memory_info.get('MemFree', 0) * 1024 - available = memory_info.get('MemAvailable', 0) * 1024 - used = total - free - available - - return {'total': total, 'free': free, 'used': used} - - -def get_memory_info_macos(mem_lib: str | os.PathLike): - def get_sysctl_value(key): - result = subprocess.run([mem_lib, key], capture_output=True, text=True) - if result.stdout.strip(): - return int(result.stdout.split(":")[1].strip()) - return 0 - - total = get_sysctl_value('hw.memsize') - free = get_sysctl_value('vm.page_free_count') * get_sysctl_value('hw.pagesize') - used = total - free - - return {'total': total, 'free': free, 'used': used} - - -def get_memory_info_windows(*args): - import ctypes - class MEMORYSTATUSEX(ctypes.Structure): - _fields_ = [ - ("dwLength", ctypes.c_uint), - ("dwMemoryLoad", ctypes.c_uint), - ("ullTotalPhys", ctypes.c_ulonglong), - ("ullAvailPhys", ctypes.c_ulonglong), - ("ullTotalPageFile", ctypes.c_ulonglong), - ("ullAvailPageFile", ctypes.c_ulonglong), - ("ullTotalVirtual", ctypes.c_ulonglong), - ("ullAvailVirtual", ctypes.c_ulonglong), - ("sullAvailExtendedVirtual", ctypes.c_ulonglong) - ] - - # Initialize the MEMORYSTATUSEX structure - memory_status = MEMORYSTATUSEX() - memory_status.dwLength = ctypes.sizeof(MEMORYSTATUSEX) - - # Load the kernel32 DLL and call GlobalMemoryStatusEx - memory_info = ctypes.windll.kernel32.GlobalMemoryStatusEx - - # Call GlobalMemoryStatusEx to fill in the memory_status structure - if memory_info(ctypes.byref(memory_status)) == 0: - LOGGER.error("Failed to retrieve memory status") - return {} - - # Extract the values from the structure - total = memory_status.ullTotalPhys # Total physical memory (in bytes) - available = memory_status.ullAvailPhys # Available physical memory (in bytes) - used = total - available # Used memory (in bytes) - - # Optionally, you can also include virtual memory information - virtual_total = memory_status.ullTotalVirtual - virtual_available = memory_status.ullAvailVirtual - - return { - 'total': total, - 'available': available, - 'used': used, - 'virtual_total': virtual_total, - 'virtual_available': virtual_available, - } - - -def get_mem_info(mem_lib: str | os.PathLike) -> Dict[str, int]: - os_map = { - models.OperatingSystem.darwin: get_memory_info_macos, - models.OperatingSystem.linux: get_memory_info_linux, - models.OperatingSystem.windows: get_memory_info_windows - } - try: - return os_map[models.OPERATING_SYSTEM](mem_lib) - except Exception as error: - LOGGER.error(error) diff --git a/pyarchitecture/memory/windows.py b/pyarchitecture/memory/windows.py new file mode 100644 index 0000000..b7e90ef --- /dev/null +++ b/pyarchitecture/memory/windows.py @@ -0,0 +1,62 @@ +import ctypes +import logging +from typing import Dict + +LOGGER = logging.getLogger(__name__) + + +class MEMORYSTATUSEX(ctypes.Structure): + """Structure for the GlobalMemoryStatusEx function overriden by ctypes.Structure. + + >>> MEMORYSTATUSEX + + """ + + _fields_ = [ + ("dwLength", ctypes.c_uint), + ("dwMemoryLoad", ctypes.c_uint), + ("ullTotalPhys", ctypes.c_ulonglong), + ("ullAvailPhys", ctypes.c_ulonglong), + ("ullTotalPageFile", ctypes.c_ulonglong), + ("ullAvailPageFile", ctypes.c_ulonglong), + ("ullTotalVirtual", ctypes.c_ulonglong), + ("ullAvailVirtual", ctypes.c_ulonglong), + ("sullAvailExtendedVirtual", ctypes.c_ulonglong), + ] + + +def get_memory_info(*args) -> Dict[str, int]: + """Get memory information for Windows OS. + + Returns: + Dict[str, int]: + Returns the memory information as key-value pairs. + """ + # Initialize the MEMORYSTATUSEX structure + memory_status = MEMORYSTATUSEX() + memory_status.dwLength = ctypes.sizeof(MEMORYSTATUSEX) + + # Load the kernel32 DLL and call GlobalMemoryStatusEx + memory_info = ctypes.windll.kernel32.GlobalMemoryStatusEx + + # Call GlobalMemoryStatusEx to fill in the memory_status structure + if memory_info(ctypes.byref(memory_status)) == 0: + LOGGER.error("Failed to retrieve memory status") + return {} + + # Extract the values from the structure + total = memory_status.ullTotalPhys # Total physical memory (in bytes) + available = memory_status.ullAvailPhys # Available physical memory (in bytes) + used = total - available # Used memory (in bytes) + + # Optionally, you can also include virtual memory information + virtual_total = memory_status.ullTotalVirtual + virtual_available = memory_status.ullAvailVirtual + + return { + "total": total, + "available": available, + "used": used, + "virtual_total": virtual_total, + "virtual_available": virtual_available, + } diff --git a/pyarchitecture/models.py b/pyarchitecture/models.py index 9021967..c2529bd 100644 --- a/pyarchitecture/models.py +++ b/pyarchitecture/models.py @@ -5,10 +5,10 @@ except ImportError: from enum import Enum - class StrEnum(str, Enum): """Custom StrEnum object for python3.10.""" + OPERATING_SYSTEM = platform.system().lower() @@ -25,9 +25,9 @@ class OperatingSystem(StrEnum): if OPERATING_SYSTEM not in ( - OperatingSystem.linux, - OperatingSystem.darwin, - OperatingSystem.windows, + OperatingSystem.linux, + OperatingSystem.darwin, + OperatingSystem.windows, ): raise RuntimeError( f"{OPERATING_SYSTEM!r} is unsupported.\n\t" @@ -38,9 +38,7 @@ class OperatingSystem(StrEnum): def default_mem_lib(): """Returns the default memory library dedicated to linux and macOS.""" return dict( - linux="/proc/meminfo", - darwin="/usr/sbin/sysctl", - windows="" # placeholder + linux="/proc/meminfo", darwin="/usr/sbin/sysctl", windows="" # placeholder ) diff --git a/pyarchitecture/squire.py b/pyarchitecture/squire.py index 7679208..055dde0 100644 --- a/pyarchitecture/squire.py +++ b/pyarchitecture/squire.py @@ -31,3 +31,48 @@ def size_converter(byte_size: int | float) -> str: f"{format_nos(round(byte_size / pow(1024, index), 2))} {size_name[index]}" ) return "0 B" + + +def convert_to_bytes(size_str: str) -> int: + """Convert a size string to bytes. + + Args: + size_str (str): Size string to convert to bytes. + + Returns: + int: + Size in bytes. + """ + # Dictionary to map size units to their respective byte multipliers + units = { + "B": 1, # Bytes + "K": 1024, # Kilobytes + "M": 1024 * 1024, # Megabytes + "G": 1024 * 1024 * 1024, # Gigabytes + "T": 1024 * 1024 * 1024 * 1024, # Terabytes + "P": 1024 * 1024 * 1024 * 1024 * 1024, # Petabytes + "E": 1024 * 1024 * 1024 * 1024 * 1024 * 1024, # Exabytes + } + + # Strip extra spaces and make the string uppercase + size_str = size_str.strip().upper() + + # Find the last character, which should indicate the unit (B, K, M, G, T, P, E) + if size_str[-1] in units: + # Extract the numeric value and unit + numeric_part = size_str[ + :-1 + ].strip() # everything except the last character (unit) + unit_part = size_str[-1] # the last character (unit) + + # Ensure the numeric part is a valid number + try: + numeric_value = float(numeric_part) + except ValueError: + raise ValueError("Invalid numeric value.") + + # Convert the size to bytes using the multiplier from the dictionary + return int(numeric_value * units[unit_part]) + + else: + raise ValueError("Invalid size unit. Supported units are B, K, M, G, T, P, E.")