diff --git a/src/linktools/_config.py b/src/linktools/_config.py index 9974250c..73a1c3fe 100644 --- a/src/linktools/_config.py +++ b/src/linktools/_config.py @@ -39,6 +39,7 @@ TYPE_CHECKING, TypeVar, Type, Optional, Generator, \ Any, Tuple, IO, Mapping, Union, List, Dict, Callable +from . import utils from .decorator import cached_property from .metadata import __missing__ from .rich import prompt, confirm, choose @@ -158,8 +159,8 @@ def update_from_pyfile(self, filename: str, silent: bool = False) -> bool: d.sample = Config.Sample d.confirm = Config.Confirm try: - with open(filename, "rb") as config_file: - exec(compile(config_file.read(), filename, "exec"), d.__dict__) + data = utils.read_file(filename, text=False) + exec(compile(data, filename, "exec"), d.__dict__) except OSError as e: if silent and e.errno in (errno.ENOENT, errno.EISDIR, errno.ENOTDIR): return False diff --git a/src/linktools/assets/containers/100-nginx/container.py b/src/linktools/assets/containers/100-nginx/container.py index 6ebffd64..a419b3fa 100644 --- a/src/linktools/assets/containers/100-nginx/container.py +++ b/src/linktools/assets/containers/100-nginx/container.py @@ -40,9 +40,10 @@ class Container(BaseContainer): @cached_property def keys(self): # dnsapi.txt 内容从 https://github.com/acmesh-official/acme.sh/wiki/dnsapi 拷贝 - with open(os.path.join(os.path.dirname(__file__), "dnsapi.txt"), "rt") as fd: - pattern = re.compile(r'export +(\w+)="?') - return sorted(list(set(pattern.findall(fd.read())))) + path = os.path.join(os.path.dirname(__file__), "dnsapi.txt") + data = utils.read_file(path, text=True) + pattern = re.compile(r'export +(\w+)="?') + return sorted(list(set(pattern.findall(data)))) @cached_property def configs(self): diff --git a/src/linktools/assets/containers/120-flare/container.py b/src/linktools/assets/containers/120-flare/container.py index 8483d698..0360062a 100644 --- a/src/linktools/assets/containers/120-flare/container.py +++ b/src/linktools/assets/containers/120-flare/container.py @@ -29,7 +29,7 @@ import yaml -from linktools import Config +from linktools import Config, utils from linktools.container import BaseContainer from linktools.container.container import ExposeMixin, ExposeLink, ExposeCategory from linktools.decorator import cached_property @@ -58,7 +58,8 @@ def exposes(self) -> [ExposeLink]: return [ self.expose_other("在线工具集合", "tools", "", "https://tool.lu/"), self.expose_other("在线正则表达式", "regex", "", "https://regex101.com/"), - self.expose_other("正则表达式手册", "regex", "", "https://tool.oschina.net/uploads/apidocs/jquery/regexp.html"), + self.expose_other("正则表达式手册", "regex", "", + "https://tool.oschina.net/uploads/apidocs/jquery/regexp.html"), self.expose_other("在线json解析", "codeJson", "", "https://www.json.cn/"), self.expose_other("DNS查询", "dns", "", "https://tool.chinaz.com/dns/"), self.expose_other("图标下载", "progressDownload", "", "https://materialdesignicons.com/"), @@ -82,34 +83,38 @@ def on_starting(self): apps.append(expose) bookmarks.append(expose) - with open(self.get_app_path("app", "apps.yml", create_parent=True), "wt") as fd: - data = {"links": []} - for app in apps: + data = {"links": []} + for app in apps: + data["links"].append({ + "name": app.name, + "desc": app.desc, + "icon": app.icon, + "link": app.url, + }) + utils.write_file( + self.get_app_path("app", "apps.yml", create_parent=True), + yaml.dump(data), + ) + + data = {"categories": [], "links": []} + for category, links in categories.items(): + if not links: + continue + data["categories"].append({ + "id": category.name, + "title": category.desc, + }) + for link in links: data["links"].append({ - "name": app.name, - "desc": app.desc, - "icon": app.icon, - "link": app.url, + "category": category.name, + "name": link.name, + "icon": link.icon, + "link": link.url, }) - yaml.dump(data, fd) - - with open(self.get_app_path("app", "bookmarks.yml", create_parent=True), "wt") as fd: - data = {"categories": [], "links": []} - for category, links in categories.items(): - if not links: - continue - data["categories"].append({ - "id": category.name, - "title": category.desc, - }) - for link in links: - data["links"].append({ - "category": category.name, - "name": link.name, - "icon": link.icon, - "link": link.url, - }) - yaml.dump(data, fd) + utils.write_file( + self.get_app_path("app", "bookmarks.yml", create_parent=True), + yaml.dump(data), + ) self.manager.change_owner( self.get_app_path("app"), @@ -119,5 +124,4 @@ def on_starting(self): self.write_nginx_conf( self.manager.config.get("FLARE_DOAMIN"), self.get_path("nginx.conf"), - name="flare", ) diff --git a/src/linktools/cli/commands/common/cert.py b/src/linktools/cli/commands/common/cert.py index 72a2a0db..f1f195bf 100644 --- a/src/linktools/cli/commands/common/cert.py +++ b/src/linktools/cli/commands/common/cert.py @@ -87,7 +87,7 @@ def subject_name_hash_old(cert: OpenSSL.SSL.X509): cert = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, - utils.read_file(os.path.expanduser(args.path), binary=True) + utils.read_file(os.path.expanduser(args.path), text=False) ) issuer = cert.get_issuer() diff --git a/src/linktools/cli/commands/common/grep.py b/src/linktools/cli/commands/common/grep.py index 78fac478..8648d897 100755 --- a/src/linktools/cli/commands/common/grep.py +++ b/src/linktools/cli/commands/common/grep.py @@ -111,7 +111,7 @@ def match(self, path: str): def on_file(self, filename: str): if os.path.exists(filename): try: - with open(filename, 'rb') as fd: + with open(filename, "rb") as fd: buffer = fd.read(1024) mimetype = magic.from_buffer(buffer, mime=True) if not GrepHandler.handle(self, filename, mimetype): diff --git a/src/linktools/cli/device.py b/src/linktools/cli/device.py index eeff3f77..a865d9e4 100644 --- a/src/linktools/cli/device.py +++ b/src/linktools/cli/device.py @@ -34,6 +34,7 @@ from typing import Optional, Callable, List, Type, Generic from . import BaseCommand +from .. import utils from ..android import Adb, AdbError, Device as AdbDevice from ..device import Bridge, BridgeError, BaseDevice, BridgeType, DeviceType from ..ios import Sib, SibError, Device as SibDevice @@ -48,13 +49,11 @@ def __init__(self, path: str): def read(self) -> Optional[str]: if os.path.exists(self.path): - with open(self.path, "rt") as fd: - return fd.read().strip() + return utils.read_file(self.path, text=True).strip() return None def write(self, cache: str) -> None: - with open(self.path, "wt") as fd: - fd.write(cache) + utils.write_file(self.path, cache) def __call__(self, fn: Callable[..., BaseDevice]): @functools.wraps(fn) diff --git a/src/linktools/container/container.py b/src/linktools/container/container.py index c1fd24b4..a1f66812 100644 --- a/src/linktools/container/container.py +++ b/src/linktools/container/container.py @@ -236,8 +236,10 @@ def get_docker_compose_file(self) -> Optional[str]: f"{self.name}.yml", create_parent=True, ) - with open(destination, "wt") as fd: - yaml.dump(self.docker_compose, fd) + utils.write_file( + destination, + yaml.dump(self.docker_compose) + ) return destination def get_docker_file_path(self) -> Optional[str]: @@ -249,8 +251,10 @@ def get_docker_file_path(self) -> Optional[str]: f"{self.name}.Dockerfile", create_parent=True, ) - with open(destination, "wt") as fd: - fd.write(self.docker_file) + utils.write_file( + destination, + self.docker_file + ) return destination @subcommand("shell", help="exec into container using command sh", prefix_chars=chr(0)) @@ -403,12 +407,10 @@ def render_template(self, source: str, destination: str = None, **kwargs: Any): context.setdefault("int", lambda obj, default=0: self.manager.config.cast(obj, type=int, default=default)) context.setdefault("float", lambda obj, default=0.0: self.manager.config.cast(obj, type=float, default=default)) - with open(source, "rt") as fd: - template = Template(fd.read()) + template = Template(utils.read_file(source, text=True)) result = template.render(context) - if destination is not None: - with open(destination, "wt") as fd: - fd.write(result) + if destination: + utils.write_file(destination, result) return result diff --git a/src/linktools/container/manager.py b/src/linktools/container/manager.py index f9cbfd27..f37714cb 100644 --- a/src/linktools/container/manager.py +++ b/src/linktools/container/manager.py @@ -529,15 +529,18 @@ def _repo_config_path(self): def _load_config(self, path: str) -> Union[Dict, List, Tuple]: if os.path.exists(path): try: - with open(path, "rt") as fd: - return json.load(fd) + return json.loads( + utils.read_file(path, text=True) + ) except Exception as e: self.logger.warning(f"Failed to load config file {path}: {e}") return {} def _dump_config(self, path: str, config: Union[Dict, List, Tuple]): try: - with open(path, "wt") as fd: - json.dump(config, fd, indent=2, ensure_ascii=False) + utils.write_file( + path, + json.dumps(config, indent=2, ensure_ascii=False) + ) except Exception as e: self.logger.warning(f"Failed to dump config file {path}: {e}") diff --git a/src/linktools/frida/script.py b/src/linktools/frida/script.py index 7cf03bb0..d4d5ce57 100644 --- a/src/linktools/frida/script.py +++ b/src/linktools/frida/script.py @@ -96,9 +96,8 @@ def filename(self): return self._path def _load(self) -> Optional[str]: - with open(self._path, "rb") as f: - _logger.info(f"Load {self}") - return f.read().decode("utf-8") + _logger.info(f"Load {self}") + return utils.read_file(self._path, text=True) class FridaEvalCode(FridaUserScript): @@ -137,9 +136,7 @@ def _load(self): _logger.info(f"Download {self}") target_path = file.save() - with open(target_path, "rb") as f: - source = f.read().decode("utf-8") - + source = utils.read_file(target_path, text=True) if self._trusted: _logger.info(f"Load trusted {self}") return source @@ -147,8 +144,7 @@ def _load(self): cached_md5 = "" cached_md5_path = target_path + ".md5" if os.path.exists(cached_md5_path): - with open(cached_md5_path, "rt") as fd: - cached_md5 = fd.read() + cached_md5 = utils.read_file(cached_md5_path, text=True) source_md5 = utils.get_md5(source) if cached_md5 == source_md5: @@ -169,8 +165,7 @@ def _load(self): f"Source: {os.linesep}{source_summary}{os.linesep}" \ f"Are you sure you'd like to trust it?" if confirm(prompt): - with open(cached_md5_path, "wt") as fd: - fd.write(source_md5) + utils.write_file(cached_md5_path, source_md5) _logger.info(f"Load trusted {self}") return source else: diff --git a/src/linktools/utils/_utils.py b/src/linktools/utils/_utils.py index e4a0843e..527b62c3 100755 --- a/src/linktools/utils/_utils.py +++ b/src/linktools/utils/_utils.py @@ -55,6 +55,8 @@ T = TypeVar("T") P = ParamSpec("P") +DEFAULT_ENCODING = "utf-8" + class Timeout: @@ -425,28 +427,42 @@ def read_file(path: str) -> bytes: ... @overload - def read_file(path: str, binary: Literal[True]) -> bytes: ... + def read_file(path: str, text: Literal[False]) -> bytes: ... @overload - def read_file(path: str, binary: Literal[False]) -> str: ... + def read_file(path: str, text: Literal[True], encoding=DEFAULT_ENCODING) -> str: ... @overload - def read_file(path: str, binary: bool) -> Union[str, bytes]: ... + def read_file(path: str, text: bool, encoding=DEFAULT_ENCODING) -> Union[str, bytes]: ... -def read_file(path: str, binary: bool = True) -> Union[str, bytes]: - with open(path, "rb" if binary else "rt") as f: - return f.read() +def read_file(path: str, text: bool = False, encoding=DEFAULT_ENCODING) -> Union[str, bytes]: + """ + 读取文件数据 + """ + with open(path, "rb") as fd: + data = fd.read() + if text: + data = data.decode(encoding) + return data -def write_file(path: str, data: [str, bytes]) -> None: - with open(path, "wb" if isinstance(data, bytes) else "wt") as f: - f.write(data) +def write_file(path: str, data: [str, bytes], encoding=DEFAULT_ENCODING) -> None: + """ + 写入文件数据 + """ + if isinstance(data, str): + data = bytes(data, DEFAULT_ENCODING) + with open(path, "wb") as fd: + fd.write(data) def get_lan_ip() -> Optional[str]: + """ + 获取本地IP地址 + """ s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -460,6 +476,9 @@ def get_lan_ip() -> Optional[str]: def get_wan_ip() -> Optional[str]: + """ + 获取外网IP地址 + """ try: with urlopen("http://ifconfig.me/ip") as response: return response.read().decode().strip() @@ -468,6 +487,9 @@ def get_wan_ip() -> Optional[str]: def parse_version(version: str) -> Tuple[int, ...]: + """ + 将字符串版本号解析成元组 + """ result = [] for x in version.split("."): if x.isdigit(): @@ -493,6 +515,9 @@ def parse_version(version: str) -> Tuple[int, ...]: def get_char_width(char): + """ + 获取字符宽度 + """ global _widths o = ord(char) if o == 0xe or o == 0xf: @@ -514,6 +539,9 @@ def __init__(self): def user_agent(style=None) -> str: + """ + 随机获取一个User-Agent + """ ua = _UserAgent() try: @@ -534,6 +562,9 @@ def user_agent(style=None) -> str: def make_url(url: str, *paths: str, **kwargs: "QueryType") -> str: + """ + 拼接URL + """ result = url for path in paths: @@ -553,6 +584,9 @@ def make_url(url: str, *paths: str, **kwargs: "QueryType") -> str: def guess_file_name(url: str) -> str: + """ + 根据url推测文件名 + """ if not url: return "" try: @@ -596,6 +630,9 @@ def parse_header(line): def parser_cookie(cookie: str) -> Dict[str, str]: + """ + 解析cookie成字典 + """ cookies = {} for item in cookie.split(";"): key_value = item.split("=", 1) @@ -608,18 +645,30 @@ def parser_cookie(cookie: str) -> Dict[str, str]: def get_system(): + """ + 获取系统类型 + """ return _SYSTEM def get_machine(): + """ + 获取机器类型 + """ return _MACHINE def get_user(): + """ + 获取当前用户 + """ return getpass.getuser() def get_uid(user: str = None): + """ + 获取用户ID,如果没有指定用户则返回当前用户ID + """ if get_system() in ("darwin", "linux"): if user: import pwd @@ -631,6 +680,9 @@ def get_uid(user: str = None): def get_gid(user: str = None): + """ + 获取用户组ID,如果没有指定用户则返回当前用户组ID + """ if get_system() in ("darwin", "linux"): if user: import pwd