diff --git a/scripts/get_ip.py b/scripts/get_ip.py index 4cb18df..4451f09 100644 --- a/scripts/get_ip.py +++ b/scripts/get_ip.py @@ -10,6 +10,7 @@ import logging import time +from dataclasses import dataclass import requests # type: ignore from pydantic import BaseModel, Field, HttpUrl, ValidationError @@ -24,6 +25,7 @@ class DNSRecord(BaseModel): data: str +@dataclass class URLIPMapping: """Class for storing URL and IP address mappings.""" @@ -36,7 +38,7 @@ def __init__(self, url: str, ip: str): def to_dict(self) -> dict[str, str]: """Convert the URLIPMapping object to a dictionary.""" - return {"url": self.url, "ip": self.ip} + return {self.url: self.ip} def __repr__(self): """Return a string representation of the URLIPMapping object. __str__ is also set to this method.""" @@ -134,7 +136,7 @@ def fetch_dns_records(dns_api: str, url: str, retries_num: int = 3) -> list[DNSR return [] -def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[dict[str, str]]: +def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[URLIPMapping]: """获取指定URL列表的IP地址 Parameters: @@ -148,7 +150,7 @@ def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[dic Returns: List[Dict[str, str]]: 包含URL和IP地址的字典列表。 """ - results: list[dict[str, str]] = [] + results: list[URLIPMapping] = [] # 解析每个URL的IP地址 for url in urls: @@ -164,10 +166,10 @@ def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[dic for idx, record in enumerate(records, start=1): logging.info(f"{url} - {idx}: {record.data}, TTL: {record.ttl}") - results.extend({"url": url, "ip": record.data} for record in records) + results.extend(URLIPMapping(url, record.data) for record in records) except RuntimeError as e: logging.error(f"{url} 无法解析IP地址: {e}") - results.append({"url": url, "ip": "# "}) + results.append(URLIPMapping(url, "# ")) return results diff --git a/scripts/process_ip.py b/scripts/process_ip.py index e55e4d5..da36db0 100644 --- a/scripts/process_ip.py +++ b/scripts/process_ip.py @@ -12,8 +12,10 @@ from ping3 import ping # type: ignore +from scripts.get_ip import URLIPMapping -def select_best_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]: + +def select_best_ip(ip_list: list[URLIPMapping]) -> list[URLIPMapping]: """选择每个URL的最佳IP地址。 Parameters: @@ -22,19 +24,19 @@ def select_best_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]: Returns: List[Dict[str, str]]: 包含URL和最佳IP地址的字典列表。 """ - best_ips = [] - url_to_ips: dict[str, list[str]] = {} + best_ips: list[URLIPMapping] = [] + url_respond_ips: dict[str, list[str]] = {} + # 存储每个URL对应的所有IP地址 for record in ip_list: - url = record["url"] - ip = record["ip"] - if url not in url_to_ips: - url_to_ips[url] = [] - url_to_ips[url].append(ip) - - for url, ips in url_to_ips.items(): - best_ip = None - lowest_latency = float("inf") + if record.url not in url_respond_ips: + url_respond_ips[record.url] = [] + url_respond_ips[record.url].append(record.ip) + + # 为每个URL选择最佳IP地址 + for url, ips in url_respond_ips.items(): + best_ip: str | None = None + lowest_latency: float = float("inf") for ip in ips: latency = ping(ip, timeout=1) logging.info(f"{url} - {ip}: 延迟: {latency}") @@ -43,7 +45,7 @@ def select_best_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]: best_ip = ip if best_ip is not None: - best_ips.append({"url": url, "ip": best_ip}) + best_ips.append(URLIPMapping(url, best_ip)) logging.info(f"{url} 最佳IP地址: {best_ip}, 延迟: {lowest_latency}") else: logging.warning(f"{url} 无法找到最佳IP地址") @@ -51,7 +53,7 @@ def select_best_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]: return best_ips -def select_first_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]: +def select_first_ip(ip_list: list[URLIPMapping]) -> list[URLIPMapping]: """选择每个URL的第一个IP地址。 GitHub runners are placed in Azure. Ping doesn't work in azure by design. @@ -63,17 +65,16 @@ def select_first_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]: Returns: List[Dict[str, str]]: 包含URL和第一个IP地址的字典列表。 """ - first_ips = {} + first_ips: dict[str, str] = {} for record in ip_list: - url, ip = record["url"], record["ip"] - if url not in first_ips: - first_ips[url] = ip - logging.info(f"{url} 第一个IP地址: {ip}") + if record.url not in first_ips: + first_ips[record.url] = record.ip + logging.info(f"{record.url} 第一个IP地址: {record.ip}") - return [{"url": url, "ip": ip} for url, ip in first_ips.items()] + return [URLIPMapping(url, ip) for url, ip in first_ips.items()] -def select_limited_ips(ip_list: list[dict[str, str]], max_ips_per_url: int = 4) -> list[dict[str, str]]: +def select_limited_ips(ip_list: list[URLIPMapping], max_ips_per_url: int = 4) -> list[URLIPMapping]: """选择每个URL的限定数量的IP地址 Parameters: @@ -83,23 +84,22 @@ def select_limited_ips(ip_list: list[dict[str, str]], max_ips_per_url: int = 4) Returns: List[Dict[str, str]]: 根据限定数量选择的URL和IP地址的字典列表。 """ - ips_per_url = {} + ips_per_url: dict[str, list[str]] = {} for record in ip_list: - url, ip = record["url"], record["ip"] - if url not in ips_per_url: - ips_per_url[url] = [ip] - elif len(ips_per_url[url]) < max_ips_per_url: - ips_per_url[url].append(ip) + if record.url not in ips_per_url: + ips_per_url[record.url] = [record.ip] + elif len(ips_per_url[record.url]) < max_ips_per_url: + ips_per_url[record.url].append(record.ip) - result = [] + result: list[URLIPMapping] = [] for url, ips in ips_per_url.items(): for ip in ips: - result.append({"url": url, "ip": ip}) + result.append(URLIPMapping(url, ip)) return result -def merge_deduplicate_ips(ips1: list[dict[str, str]], ips2: list[dict[str, str]]) -> list[dict[str, str]]: +def merge_deduplicate_ips(ips1: list[URLIPMapping], ips2: list[URLIPMapping]) -> list[URLIPMapping]: """合并并去重从两个API得到的IP地址。 Parameters: @@ -109,15 +109,14 @@ def merge_deduplicate_ips(ips1: list[dict[str, str]], ips2: list[dict[str, str]] Returns: List[Dict[str, str]]: 合并并去重后按URL排序的IP地址列表。 """ - combined_ips = {(record["url"], record["ip"]) for record in ips1} - combined_ips.update({(record["url"], record["ip"]) for record in ips2}) - sorted_ips = sorted(combined_ips, key=lambda x: (x[0], x[1])) - return [{"url": url, "ip": ip} for url, ip in sorted_ips] + combined_ips: set[tuple[str, str]] = {(record.url, record.ip) for record in ips1} + combined_ips.update({(record.url, record.ip) for record in ips2}) + + sorted_ips: list[tuple[str, str]] = sorted(combined_ips, key=lambda x: (x[0], x[1])) + return [URLIPMapping(url, ip) for url, ip in sorted_ips] -def reorder_ips_with_best_first( - ips_merged: list[dict[str, str]], ips_best: list[dict[str, str]] -) -> list[dict[str, str]]: +def reorder_ips_with_best_first(ips_merged: list[URLIPMapping], ips_best: list[URLIPMapping]) -> list[URLIPMapping]: """重新排序IP地址列表 将每个URL的最佳IP地址放置在前面。 Parameters: @@ -127,21 +126,25 @@ def reorder_ips_with_best_first( Returns: List[Dict[str, str]]: 重新排序后的IP地址列表 每个URL的最佳IP地址位于前面。 """ - url_to_best_ip = {record["url"]: record["ip"] for record in ips_best} - ips_res = [] + url_to_best_ip: dict[str, str] = {} + for record in ips_best: + url_to_best_ip.update(record.to_dict()) + ips_res: list[URLIPMapping] = [] + + # 先添加最佳IP for record in ips_merged: - url = record["url"] - if url in url_to_best_ip: - ips_res.append({"url": url, "ip": url_to_best_ip[url]}) - del url_to_best_ip[url] + if record.url in url_to_best_ip: + ips_res.append(URLIPMapping(record.url, url_to_best_ip[record.url])) + del url_to_best_ip[record.url] + # 再添加其他IP for record in ips_merged: if record not in ips_res: ips_res.append(record) - ips_res = sorted(ips_res, key=lambda x: x["url"]) + ips_res = sorted(ips_res, key=lambda x: x.url) return ips_res -def format_host_strings(ip_list: list[dict[str, str]]) -> str: +def format_host_strings(ip_list: list[URLIPMapping]) -> str: """将列表中的字典中的IP地址和URL转换为host格式的字符串 并确保每个URL的开头对齐。 Parameters: @@ -150,10 +153,10 @@ def format_host_strings(ip_list: list[dict[str, str]]) -> str: Returns: str: 包含host格式的字符串。 """ - host_strings = [] - max_ip_length = max(len(record["ip"]) for record in ip_list) + host_strings: list[str] = [] + max_ip_length = max(len(record.ip) for record in ip_list) for record in ip_list: - host_format = f"{record['ip'].ljust(max_ip_length)} {record['url']}" + host_format = f"{record.ip.ljust(max_ip_length)} {record.url}" host_strings.append(host_format) return "\n".join(host_strings)