From 85310370978eca796efa02a4614826182540c200 Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Wed, 1 Jan 2025 13:09:57 -0600 Subject: [PATCH] Restructure memory library --- pyarchitecture/memory.py | 101 ------------------------------ pyarchitecture/memory/__init__.py | 37 +++++++++++ pyarchitecture/memory/main.py | 96 ++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+), 101 deletions(-) delete mode 100644 pyarchitecture/memory.py create mode 100644 pyarchitecture/memory/__init__.py create mode 100644 pyarchitecture/memory/main.py diff --git a/pyarchitecture/memory.py b/pyarchitecture/memory.py deleted file mode 100644 index 8d61eae..0000000 --- a/pyarchitecture/memory.py +++ /dev/null @@ -1,101 +0,0 @@ -import logging -import os -import subprocess - -from pyarchitecture import models - -LOGGER = logging.getLogger(__name__) - - -def get_memory_info_linux(mem_lib: str | os.PathLike): - memory_info = {} - with open(mem_lib) as f: - for line in f: - if line.startswith(('MemTotal', 'MemFree', 'MemAvailable', 'Buffers', 'Cached')): - parts = line.split() - memory_info[parts[0][:-1]] = int(parts[1]) # Convert the memory value to int (in kB) - - # Convert values to bytes (kB to bytes) - total = memory_info.get('MemTotal', 0) * 1024 - free = memory_info.get('MemFree', 0) * 1024 - available = memory_info.get('MemAvailable', 0) * 1024 - used = total - free - available - - return {'total': total, 'free': free, 'used': used} - - -def get_memory_info_macos(mem_lib: str | os.PathLike): - def get_sysctl_value(key): - result = subprocess.run([mem_lib, key], capture_output=True, text=True) - if result.stdout.strip(): - return int(result.stdout.split(":")[1].strip()) - return 0 - - total = get_sysctl_value('hw.memsize') - free = get_sysctl_value('vm.page_free_count') * get_sysctl_value('hw.pagesize') - used = total - free - - return {'total': total, 'free': free, 'used': used} - - -def get_memory_info_windows(): - import ctypes - try: - memory_info = ctypes.windll.kernel32.GlobalMemoryStatusEx - memstatus = ctypes.create_string_buffer(64) - memory_info(ctypes.byref(memstatus)) - total = ctypes.cast(memstatus[0], ctypes.c_ulonglong).value - free = ctypes.cast(memstatus[1], ctypes.c_ulonglong).value - "TypeError: cast() argument 2 must be a pointer type, not c_ulonglong" - used = total - free - return {'total': total, 'free': free, 'used': used} - except Exception: - class MEMORYSTATUSEX(ctypes.Structure): - _fields_ = [ - ("dwLength", ctypes.c_uint), - ("dwMemoryLoad", ctypes.c_uint), - ("ullTotalPhys", ctypes.c_ulonglong), - ("ullAvailPhys", ctypes.c_ulonglong), - ("ullTotalPageFile", ctypes.c_ulonglong), - ("ullAvailPageFile", ctypes.c_ulonglong), - ("ullTotalVirtual", ctypes.c_ulonglong), - ("ullAvailVirtual", ctypes.c_ulonglong), - ("sullAvailExtendedVirtual", ctypes.c_ulonglong) - ] - - # Initialize the MEMORYSTATUSEX structure - memory_status = MEMORYSTATUSEX() - memory_status.dwLength = ctypes.sizeof(MEMORYSTATUSEX) - - # Call GlobalMemoryStatusEx to fill in the memory_status structure - if ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(memory_status)) == 0: - raise Exception("Failed to retrieve memory status") - - # Extract the values from the structure - total = memory_status.ullTotalPhys # Total physical memory (in bytes) - available = memory_status.ullAvailPhys # Available physical memory (in bytes) - used = total - available # Used memory (in bytes) - - # Optionally, you can also include virtual memory information - virtual_total = memory_status.ullTotalVirtual - virtual_available = memory_status.ullAvailVirtual - - return { - 'total': total, - 'available': available, - 'used': used, - 'virtual_total': virtual_total, - 'virtual_available': virtual_available, - } - - -def get_memory_info(mem_lib: str | os.PathLike): - os_map = { - models.OperatingSystem.darwin: get_memory_info_macos, - models.OperatingSystem.linux: get_memory_info_linux, - models.OperatingSystem.windows: get_memory_info_windows - } - try: - return os_map[models.OPERATING_SYSTEM](mem_lib) - except Exception as error: - LOGGER.error(error) diff --git a/pyarchitecture/memory/__init__.py b/pyarchitecture/memory/__init__.py new file mode 100644 index 0000000..e8180e3 --- /dev/null +++ b/pyarchitecture/memory/__init__.py @@ -0,0 +1,37 @@ +import logging +import os +from typing import Dict, List + +from pyarchitecture import models +from pyarchitecture.memory import main + +LOGGER = logging.getLogger(__name__) + + +def _get_mem_lib(user_input: str | os.PathLike) -> str: + """Get the memory library for the appropriate OS. + + Args: + user_input: Memory library input by user. + """ + mem_lib = ( + user_input + or os.environ.get("mem_lib") + or os.environ.get("MEM_LIB") + or models.default_mem_lib()[models.OPERATING_SYSTEM] + ) + assert os.path.isfile(mem_lib), f"Memory library {mem_lib!r} doesn't exist" if mem_lib else None + return mem_lib + + +def get_memory_info(mem_lib: str | os.PathLike = None) -> Dict[str, int]: + """OS-agnostic function to get memory information. + + Args: + mem_lib: Custom memory library path. + + Returns: + List[Dict[str, str]]: + Returns the memory model and vendor information as a list of key-value pairs. + """ + return main.get_mem_info(_get_mem_lib(mem_lib)) diff --git a/pyarchitecture/memory/main.py b/pyarchitecture/memory/main.py new file mode 100644 index 0000000..577bfd5 --- /dev/null +++ b/pyarchitecture/memory/main.py @@ -0,0 +1,96 @@ +import logging +import os +import subprocess +from typing import Dict + +from pyarchitecture import models + +LOGGER = logging.getLogger(__name__) + + +def get_memory_info_linux(mem_lib: str | os.PathLike): + memory_info = {} + with open(mem_lib) as f: + for line in f: + if line.startswith(('MemTotal', 'MemFree', 'MemAvailable', 'Buffers', 'Cached')): + parts = line.split() + memory_info[parts[0][:-1]] = int(parts[1]) # Convert the memory value to int (in kB) + + # Convert values to bytes (kB to bytes) + total = memory_info.get('MemTotal', 0) * 1024 + free = memory_info.get('MemFree', 0) * 1024 + available = memory_info.get('MemAvailable', 0) * 1024 + used = total - free - available + + return {'total': total, 'free': free, 'used': used} + + +def get_memory_info_macos(mem_lib: str | os.PathLike): + def get_sysctl_value(key): + result = subprocess.run([mem_lib, key], capture_output=True, text=True) + if result.stdout.strip(): + return int(result.stdout.split(":")[1].strip()) + return 0 + + total = get_sysctl_value('hw.memsize') + free = get_sysctl_value('vm.page_free_count') * get_sysctl_value('hw.pagesize') + used = total - free + + return {'total': total, 'free': free, 'used': used} + + +def get_memory_info_windows(*args): + import ctypes + class MEMORYSTATUSEX(ctypes.Structure): + _fields_ = [ + ("dwLength", ctypes.c_uint), + ("dwMemoryLoad", ctypes.c_uint), + ("ullTotalPhys", ctypes.c_ulonglong), + ("ullAvailPhys", ctypes.c_ulonglong), + ("ullTotalPageFile", ctypes.c_ulonglong), + ("ullAvailPageFile", ctypes.c_ulonglong), + ("ullTotalVirtual", ctypes.c_ulonglong), + ("ullAvailVirtual", ctypes.c_ulonglong), + ("sullAvailExtendedVirtual", ctypes.c_ulonglong) + ] + + # Initialize the MEMORYSTATUSEX structure + memory_status = MEMORYSTATUSEX() + memory_status.dwLength = ctypes.sizeof(MEMORYSTATUSEX) + + # Load the kernel32 DLL and call GlobalMemoryStatusEx + memory_info = ctypes.windll.kernel32.GlobalMemoryStatusEx + + # Call GlobalMemoryStatusEx to fill in the memory_status structure + if memory_info(ctypes.byref(memory_status)) == 0: + LOGGER.error("Failed to retrieve memory status") + return {} + + # Extract the values from the structure + total = memory_status.ullTotalPhys # Total physical memory (in bytes) + available = memory_status.ullAvailPhys # Available physical memory (in bytes) + used = total - available # Used memory (in bytes) + + # Optionally, you can also include virtual memory information + virtual_total = memory_status.ullTotalVirtual + virtual_available = memory_status.ullAvailVirtual + + return { + 'total': total, + 'available': available, + 'used': used, + 'virtual_total': virtual_total, + 'virtual_available': virtual_available, + } + + +def get_mem_info(mem_lib: str | os.PathLike) -> Dict[str, int]: + os_map = { + models.OperatingSystem.darwin: get_memory_info_macos, + models.OperatingSystem.linux: get_memory_info_linux, + models.OperatingSystem.windows: get_memory_info_windows + } + try: + return os_map[models.OPERATING_SYSTEM](mem_lib) + except Exception as error: + LOGGER.error(error)