From 235a99782c72ee7532f9043b3f6421eaa0ee34cc Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Tue, 31 Dec 2024 19:27:32 -0600 Subject: [PATCH] Finalize module and release alpha1 version --- .github/workflows/python-publish.yaml | 48 ++++++++ .gitignore | 2 + pydisk/__init__.py | 34 ++++++ pydisk/linux.py | 44 +++++++ pydisk/macOS.py | 113 ++++++++++++++++++ pydisk/models.py | 44 +++++++ pydisk/squire.py | 33 ++++++ pydisk/windows.py | 161 ++++++++++++++++++++++++++ pyproject.toml | 47 ++++++++ 9 files changed, 526 insertions(+) create mode 100644 .github/workflows/python-publish.yaml create mode 100644 .gitignore create mode 100644 pydisk/__init__.py create mode 100644 pydisk/linux.py create mode 100644 pydisk/macOS.py create mode 100644 pydisk/models.py create mode 100644 pydisk/squire.py create mode 100644 pydisk/windows.py create mode 100644 pyproject.toml diff --git a/.github/workflows/python-publish.yaml b/.github/workflows/python-publish.yaml new file mode 100644 index 0000000..a35b254 --- /dev/null +++ b/.github/workflows/python-publish.yaml @@ -0,0 +1,48 @@ +name: pypi-publish + +on: + release: + types: + - published + push: + branches: + - main + paths: + - '**/*.py' + - 'pyproject.toml' + workflow_dispatch: + inputs: + dry_run: + type: choice + description: Dry run mode + required: true + options: + - "true" + - "false" + +concurrency: + group: ${{ github.workflow }} + cancel-in-progress: true + +jobs: + pypi-publisher: + runs-on: thevickypedia-lite + steps: + - name: Set dry-run + run: | + if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then + echo "::notice title=DryRun::Setting dry run to ${{ inputs.dry_run }} for '${{ github.event_name }}' event" + echo "dry_run=${{ inputs.dry_run }}" >> $GITHUB_ENV + elif [[ "${{ github.event_name }}" == "push" ]]; then + echo "::notice title=DryRun::Setting dry run to true for '${{ github.event_name }}' event" + echo "dry_run=true" >> $GITHUB_ENV + else + echo "::notice title=DryRun::Setting dry run to false for '${{ github.event_name }}' event" + echo "dry_run=false" >> $GITHUB_ENV + fi + shell: bash + - uses: thevickypedia/pypi-publisher@v3 + env: + token: ${{ secrets.PYPI_TOKEN }} + with: + dry-run: ${{ env.dry_run }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e04276f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +venv diff --git a/pydisk/__init__.py b/pydisk/__init__.py new file mode 100644 index 0000000..f08fd0e --- /dev/null +++ b/pydisk/__init__.py @@ -0,0 +1,34 @@ +import logging +import os +from typing import Dict, List + +from . import linux, macOS, windows, models + +version = "0.0.0-a1" + +LOGGER = logging.getLogger(__name__) + + +def get_disk_lib(user_input: str | os.PathLike): + disk_lib = ( + user_input or + os.environ.get("disk_lib") or + os.environ.get("DISK_LIB") or + models.default_disk_lib()[models.OPERATING_SYSTEM] + ) + assert os.path.isfile(disk_lib), f"Disk library {disk_lib!r} doesn't exist" + return disk_lib + + +def get_all_disks(disk_lib: str | os.PathLike = None) -> List[Dict[str, str]]: + """OS-agnostic function to get all disks connected to the host system.""" + os_map = { + models.OperatingSystem.darwin: macOS.drive_info, + models.OperatingSystem.linux: linux.drive_info, + models.OperatingSystem.windows: windows.drive_info, + } + try: + disk_lib = get_disk_lib(disk_lib) + return os_map[models.OperatingSystem(models.OPERATING_SYSTEM)](disk_lib) + except Exception as error: + LOGGER.error(error) diff --git a/pydisk/linux.py b/pydisk/linux.py new file mode 100644 index 0000000..3ae6398 --- /dev/null +++ b/pydisk/linux.py @@ -0,0 +1,44 @@ +import json +import os +import subprocess +from typing import Dict, List + + +def drive_info(disk_lib: str | os.PathLike) -> List[Dict[str, str]]: + """Get disks attached to Linux devices. + + Returns: + List[Dict[str, str]]: + Returns disks information for Linux distros. + """ + # Using -d to list only physical disks, and filtering out loop devices + result = subprocess.run( + [disk_lib, "-o", "NAME,SIZE,TYPE,MODEL,MOUNTPOINT", "-J"], + capture_output=True, + text=True, + ) + data = json.loads(result.stdout) + disks = [] + for device in data.get("blockdevices", []): + if device["type"] == "disk": + disk_info = { + "DeviceID": device["name"], + "Size": device["size"], + "Name": device.get("model", "Unknown"), + "Mountpoints": [], + } + # Collect mount points from partitions + if "children" in device: + for partition in device["children"]: + if partition.get("mountpoint"): + disk_info["Mountpoints"].append(partition["mountpoint"]) + if not disk_info["Mountpoints"] and device.get("mountpoint"): + if isinstance(device["mountpoint"], list): + disk_info["Mountpoints"] = device["mountpoint"] + else: + disk_info["Mountpoints"] = [device["mountpoint"]] + elif not disk_info["Mountpoints"]: + disk_info["Mountpoints"] = ["Not Mounted"] + disk_info["Mountpoints"] = ", ".join(disk_info["Mountpoints"]) + disks.append(disk_info) + return disks diff --git a/pydisk/macOS.py b/pydisk/macOS.py new file mode 100644 index 0000000..a937e2f --- /dev/null +++ b/pydisk/macOS.py @@ -0,0 +1,113 @@ +import logging +import os +import re +import subprocess +from collections import defaultdict +from typing import Dict, List + +from pydisk import squire + +LOGGER = logging.getLogger(__name__) + + +def parse_size(input_string: str) -> int: + """Extracts size in bytes from a string. + + Args: + input_string: Input string from diskutil output. + + Returns: + int: + Returns the size in bytes as an integer. + """ + match = re.search(r"\((\d+) Bytes\)", input_string) + return int(match.group(1)) if match else 0 + + +def update_mountpoints( + disks: List[Dict[str, str]], device_ids: defaultdict +) -> defaultdict: + """Updates mount points for physical devices based on diskutil data. + + Args: + disks: All disk info data as list. + device_ids: Device IDs as default dict. + + Returns: + defaultdict: + Returns a defaultdict object with updated mountpoints as list. + """ + for disk in disks: + part_of_whole = disk.get("Part of Whole") + apfs_store = disk.get("APFS Physical Store", "") + mount_point = disk.get("Mount Point") + read_only = "Yes" in disk.get("Volume Read-Only") + if mount_point and not mount_point.startswith("/System/Volumes/"): + if part_of_whole in device_ids: + device_ids[part_of_whole].append(mount_point) + else: + for device_id in device_ids: + if apfs_store.startswith(device_id) and read_only: + device_ids[device_id].append(mount_point) + for device_id, mountpoints in device_ids.items(): + if not mountpoints: + device_ids[device_id] = ["Not Mounted"] + return device_ids + + +def parse_diskutil_output(stdout: str) -> List[Dict[str, str]]: + """Parses `diskutil info -all` output into structured data. + + Args: + stdout: Standard output from diskutil command. + + Returns: + List[Dict[str, str]]: + Returns a list of dictionaries with parsed drives' data. + """ + disks = [] + disk_info = {} + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + if line == "**********": + disks.append(disk_info) + disk_info = {} + else: + key, value = map(str.strip, line.split(":", 1)) + disk_info[key] = value + return disks + + +def drive_info(disk_lib: str | os.PathLike) -> List[Dict[str, str]]: + """Get disks attached to macOS devices. + + Returns: + List[Dict[str, str]]: + Returns disks information for macOS devices. + """ + result = subprocess.run( + [disk_lib, "info", "-all"], capture_output=True, text=True + ) + disks = parse_diskutil_output(result.stdout) + device_ids = defaultdict(list) + physical_disks = [] + for disk in disks: + if disk.get("Virtual") == "No": + physical_disks.append( + { + "Name": disk.get("Device / Media Name"), + "Size": squire.size_converter( + parse_size(disk.get("Disk Size", "")) + ), + "DeviceID": disk.get("Device Identifier"), + "Node": disk.get("Device Node"), + } + ) + # Instantiate default dict with keys as DeviceIDs and values as empty list + _ = device_ids[disk["Device Identifier"]] + mountpoints = update_mountpoints(disks, device_ids) + for disk in physical_disks: + disk["Mountpoints"] = ", ".join(mountpoints[disk["DeviceID"]]) + return physical_disks diff --git a/pydisk/models.py b/pydisk/models.py new file mode 100644 index 0000000..d681588 --- /dev/null +++ b/pydisk/models.py @@ -0,0 +1,44 @@ +import platform + +try: + from enum import StrEnum +except ImportError: + from enum import Enum + + + class StrEnum(str, Enum): + """Custom StrEnum object for python3.10""" + +OPERATING_SYSTEM = platform.system().lower() + + +class OperatingSystem(StrEnum): + """Operating system names. + + >>> OperatingSystem + + """ + + linux: str = "linux" + darwin: str = "darwin" + windows: str = "windows" + + +if OPERATING_SYSTEM not in ( + OperatingSystem.linux, + OperatingSystem.darwin, + OperatingSystem.windows, +): + raise RuntimeError( + f"{OPERATING_SYSTEM!r} is unsupported.\n\t" + "Host machine should either be macOS, Windows or any Linux distros" + ) + + +def default_disk_lib(): + """Returns the default disks' library dedicated to each supported operating system.""" + return dict( + linux="/usr/bin/lsblk", + darwin="/usr/sbin/diskutil", + windows="C:\\Program Files\\PowerShell\\7\\pwsh.exe" + ) diff --git a/pydisk/squire.py b/pydisk/squire.py new file mode 100644 index 0000000..7679208 --- /dev/null +++ b/pydisk/squire.py @@ -0,0 +1,33 @@ +import math + + +def format_nos(input_: float) -> int | float: + """Removes ``.0`` float values. + + Args: + input_: Strings or integers with ``.0`` at the end. + + Returns: + int | float: + Int if found, else returns the received float value. + """ + return int(input_) if isinstance(input_, float) and input_.is_integer() else input_ + + +def size_converter(byte_size: int | float) -> str: + """Gets the current memory consumed and converts it to human friendly format. + + Args: + byte_size: Receives byte size as argument. + + Returns: + str: + Converted human-readable size. + """ + if byte_size: + size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB") + index = int(math.floor(math.log(byte_size, 1024))) + return ( + f"{format_nos(round(byte_size / pow(1024, index), 2))} {size_name[index]}" + ) + return "0 B" diff --git a/pydisk/windows.py b/pydisk/windows.py new file mode 100644 index 0000000..50db5c6 --- /dev/null +++ b/pydisk/windows.py @@ -0,0 +1,161 @@ +import collections +import json +import logging +import os +import re +import subprocess +from typing import Dict, List, Tuple + +from pydisk import squire + +LOGGER = logging.getLogger(__name__) + + +def reformat_windows(data: Dict[str, str | int | float]) -> Dict[str, str]: + """Reformats each drive's information for Windows OS. + + Args: + data: Data as a dictionary. + + Returns: + Dict[str, str]: + Returns a dictionary of key-value pairs. + """ + data["ID"] = data["DeviceID"][-1] + data["Name"] = data["Model"] + data["DeviceID"] = data["DeviceID"].replace("\\", "").replace(".", "") + data["Size"] = squire.size_converter(data["Size"]) + del data["Caption"] + del data["Model"] + return data + + +def get_drives(disk_lib: str | os.PathLike) -> List[Dict[str, str]]: + """Get physical drives connected to a Windows machine. + + Returns: + List[Dict[str, str]]: + Returns the formatted data for all the drives as a list of key-value pairs. + """ + # noinspection LongLine + ps_command = "Get-CimInstance Win32_DiskDrive | Select-Object Caption, DeviceID, Model, Partitions, Size | ConvertTo-Json" # noqa: E501 + result = subprocess.run( + [disk_lib, "-Command", ps_command], capture_output=True, text=True + ) + disks_info = json.loads(result.stdout) + if isinstance(disks_info, list): + return [reformat_windows(info) for info in disks_info] + return [reformat_windows(disks_info)] + + +def clean_ansi_escape_sequences(text: str) -> str: + """Regular expression to remove ANSI escape sequences. + + Args: + text: Text with ansi escape characters. + + Returns: + str: + Cleaned text. + """ + ansi_escape = re.compile(r"\x1b\[[0-9;]*[mGKF]") + return ansi_escape.sub("", text) + + +def get_physical_disks_and_partitions(disk_lib: str | os.PathLike) -> List[Tuple[str, str, str]]: + """Powershell Core command to get physical disks and their partitions with drive letters (mount points). + + Returns: + List[Tuple[str, str, str]]: + List of tuples with disk_number, partition_number, mount_point. + """ + command_ps = [ + disk_lib, + "-Command", + """ + Get-PhysicalDisk | ForEach-Object { + $disk = $_ + $partitions = Get-Partition -DiskNumber $disk.DeviceID + $partitions | ForEach-Object { + [PSCustomObject]@{ + DiskNumber = $disk.DeviceID + Partition = $_.PartitionNumber + DriveLetter = (Get-Volume -Partition $_).DriveLetter + MountPoint = (Get-Volume -Partition $_).DriveLetter + } + } + } + """, + ] + + # Run the PowerShell command using subprocess.run + result = subprocess.run( + command_ps, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) + + if result.stderr: + LOGGER.error(result.stderr) + return [] + + # Clean the output to remove ANSI escape sequences + cleaned_output = clean_ansi_escape_sequences(result.stdout) + + # Parse the output to get disk and partition info + disks_and_partitions = [] + # Split the cleaned output into lines and skip header and separator lines + lines = cleaned_output.splitlines() + for line in lines: + # Skip empty lines and headers (first 2 lines are headers) + if line.startswith("DiskNumber") or line.startswith("-"): + continue + + # Split the line into parts and extract the required info + parts = line.split() + if len(parts) >= 4: + disk_number = parts[0] + partition_number = parts[1] + mount_point = parts[3] # Assuming this is the drive letter (e.g., C, D) + disks_and_partitions.append((disk_number, partition_number, mount_point)) + + return disks_and_partitions + + +def get_disk_usage(disk_lib: str | os.PathLike) -> Dict[str, List[str]]: + """Get all physical disks and their partitions with mount points. + + Returns: + Dict[str, List[str]]: + Returns a dictionary of DeviceID as key and mount paths as value. + """ + disks_and_partitions = get_physical_disks_and_partitions(disk_lib) + + if not disks_and_partitions: + LOGGER.error("No disks or partitions found.") + return {} + + output_data = collections.defaultdict(list) + # Loop through the list of disks and partitions, and fetch disk usage for each mount point + for disk_number, partition_number, mount_point in disks_and_partitions: + # Construct the mount point path (e.g., C:\, D:\, etc.) + mount_path = f"{mount_point}:\\" + output_data[disk_number].append(mount_path) + return output_data + + +def drive_info(disk_lib: str | os.PathLike) -> List[Dict[str, str]]: + """Get disks attached to Windows devices. + + Returns: + List[Dict[str, str]]: + Returns disks information for Windows machines. + """ + data = get_drives(disk_lib) + usage = get_disk_usage(disk_lib) + for item in data: + device_id = item["ID"] + item.pop("ID") + if device_id in usage: + item["Mountpoints"] = ", ".join(usage[device_id]) + else: + item["Mountpoints"] = "Not Mounted" + return data diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..096040c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,47 @@ +[project] +name = "PyDisk" +dynamic = ["version", "dependencies"] +description = "Python module to get physical drives connected to a host machine" +readme = "README.md" +authors = [{ name = "Vignesh Rao", email = "svignesh1793@gmail.com" }] +license = { file = "LICENSE" } +classifiers = [ + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Development Status :: 5 - Production/Stable", + "Operating System :: OS Independent", + "Topic :: System :: Hardware", +] +keywords = ["physical-drives", "PyDisk"] +requires-python = ">=3.10" + +[tool.setuptools] +packages = [ + "pydisk" +] +#[tool.setuptools.package-data] +#"pydisk.templates" = ["*.html"] + +[tool.setuptools.dynamic] +version = {attr = "pydisk.version"} +dependencies = { file = ["requirements.txt"] } + +[project.optional-dependencies] +dev = ["pre-commit"] +standard = [ + "Jinja2==3.1.*", + "requests==2.*", + "gmail-connector" +] + +[project.scripts] +# sends all the args to commandline function, where the arbitary commands as processed accordingly +pydisk = "pydisk:commandline" + +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project.urls] +Homepage = "https://github.com/thevickypedia/PyDisk" +Source = "https://github.com/thevickypedia/PyDisk"