Skip to content

Commit

Permalink
feat: Refactor dataclass
Browse files Browse the repository at this point in the history
  • Loading branch information
ittuann committed Sep 8, 2024
1 parent 5e32b54 commit c434106
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 53 deletions.
12 changes: 7 additions & 5 deletions scripts/get_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import logging
import time
from dataclasses import dataclass

import requests # type: ignore
from pydantic import BaseModel, Field, HttpUrl, ValidationError
Expand All @@ -24,6 +25,7 @@ class DNSRecord(BaseModel):
data: str


@dataclass
class URLIPMapping:
"""Class for storing URL and IP address mappings."""

Expand All @@ -36,7 +38,7 @@ def __init__(self, url: str, ip: str):

def to_dict(self) -> dict[str, str]:
"""Convert the URLIPMapping object to a dictionary."""
return {"url": self.url, "ip": self.ip}
return {self.url: self.ip}

def __repr__(self):
"""Return a string representation of the URLIPMapping object. __str__ is also set to this method."""
Expand Down Expand Up @@ -134,7 +136,7 @@ def fetch_dns_records(dns_api: str, url: str, retries_num: int = 3) -> list[DNSR
return []


def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[dict[str, str]]:
def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[URLIPMapping]:
"""获取指定URL列表的IP地址
Parameters:
Expand All @@ -148,7 +150,7 @@ def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[dic
Returns:
List[Dict[str, str]]: 包含URL和IP地址的字典列表。
"""
results: list[dict[str, str]] = []
results: list[URLIPMapping] = []

# 解析每个URL的IP地址
for url in urls:
Expand All @@ -164,10 +166,10 @@ def get_all_ips(urls: list[str], dns_api: str, retries_num: int = 3) -> list[dic
for idx, record in enumerate(records, start=1):
logging.info(f"{url} - {idx}: {record.data}, TTL: {record.ttl}")

results.extend({"url": url, "ip": record.data} for record in records)
results.extend(URLIPMapping(url, record.data) for record in records)

except RuntimeError as e:
logging.error(f"{url} 无法解析IP地址: {e}")
results.append({"url": url, "ip": "# "})
results.append(URLIPMapping(url, "# "))

return results
99 changes: 51 additions & 48 deletions scripts/process_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

from ping3 import ping # type: ignore

from scripts.get_ip import URLIPMapping

def select_best_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]:

def select_best_ip(ip_list: list[URLIPMapping]) -> list[URLIPMapping]:
"""选择每个URL的最佳IP地址。
Parameters:
Expand All @@ -22,19 +24,19 @@ def select_best_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]:
Returns:
List[Dict[str, str]]: 包含URL和最佳IP地址的字典列表。
"""
best_ips = []
url_to_ips: dict[str, list[str]] = {}
best_ips: list[URLIPMapping] = []
url_respond_ips: dict[str, list[str]] = {}

# 存储每个URL对应的所有IP地址
for record in ip_list:
url = record["url"]
ip = record["ip"]
if url not in url_to_ips:
url_to_ips[url] = []
url_to_ips[url].append(ip)

for url, ips in url_to_ips.items():
best_ip = None
lowest_latency = float("inf")
if record.url not in url_respond_ips:
url_respond_ips[record.url] = []
url_respond_ips[record.url].append(record.ip)

# 为每个URL选择最佳IP地址
for url, ips in url_respond_ips.items():
best_ip: str | None = None
lowest_latency: float = float("inf")
for ip in ips:
latency = ping(ip, timeout=1)
logging.info(f"{url} - {ip}: 延迟: {latency}")
Expand All @@ -43,15 +45,15 @@ def select_best_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]:
best_ip = ip

if best_ip is not None:
best_ips.append({"url": url, "ip": best_ip})
best_ips.append(URLIPMapping(url, best_ip))
logging.info(f"{url} 最佳IP地址: {best_ip}, 延迟: {lowest_latency}")
else:
logging.warning(f"{url} 无法找到最佳IP地址")

return best_ips


def select_first_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]:
def select_first_ip(ip_list: list[URLIPMapping]) -> list[URLIPMapping]:
"""选择每个URL的第一个IP地址。
GitHub runners are placed in Azure. Ping doesn't work in azure by design.
Expand All @@ -63,17 +65,16 @@ def select_first_ip(ip_list: list[dict[str, str]]) -> list[dict[str, str]]:
Returns:
List[Dict[str, str]]: 包含URL和第一个IP地址的字典列表。
"""
first_ips = {}
first_ips: dict[str, str] = {}
for record in ip_list:
url, ip = record["url"], record["ip"]
if url not in first_ips:
first_ips[url] = ip
logging.info(f"{url} 第一个IP地址: {ip}")
if record.url not in first_ips:
first_ips[record.url] = record.ip
logging.info(f"{record.url} 第一个IP地址: {record.ip}")

return [{"url": url, "ip": ip} for url, ip in first_ips.items()]
return [URLIPMapping(url, ip) for url, ip in first_ips.items()]


def select_limited_ips(ip_list: list[dict[str, str]], max_ips_per_url: int = 4) -> list[dict[str, str]]:
def select_limited_ips(ip_list: list[URLIPMapping], max_ips_per_url: int = 4) -> list[URLIPMapping]:
"""选择每个URL的限定数量的IP地址
Parameters:
Expand All @@ -83,23 +84,22 @@ def select_limited_ips(ip_list: list[dict[str, str]], max_ips_per_url: int = 4)
Returns:
List[Dict[str, str]]: 根据限定数量选择的URL和IP地址的字典列表。
"""
ips_per_url = {}
ips_per_url: dict[str, list[str]] = {}
for record in ip_list:
url, ip = record["url"], record["ip"]
if url not in ips_per_url:
ips_per_url[url] = [ip]
elif len(ips_per_url[url]) < max_ips_per_url:
ips_per_url[url].append(ip)
if record.url not in ips_per_url:
ips_per_url[record.url] = [record.ip]
elif len(ips_per_url[record.url]) < max_ips_per_url:
ips_per_url[record.url].append(record.ip)

result = []
result: list[URLIPMapping] = []
for url, ips in ips_per_url.items():
for ip in ips:
result.append({"url": url, "ip": ip})
result.append(URLIPMapping(url, ip))

return result


def merge_deduplicate_ips(ips1: list[dict[str, str]], ips2: list[dict[str, str]]) -> list[dict[str, str]]:
def merge_deduplicate_ips(ips1: list[URLIPMapping], ips2: list[URLIPMapping]) -> list[URLIPMapping]:
"""合并并去重从两个API得到的IP地址。
Parameters:
Expand All @@ -109,15 +109,14 @@ def merge_deduplicate_ips(ips1: list[dict[str, str]], ips2: list[dict[str, str]]
Returns:
List[Dict[str, str]]: 合并并去重后按URL排序的IP地址列表。
"""
combined_ips = {(record["url"], record["ip"]) for record in ips1}
combined_ips.update({(record["url"], record["ip"]) for record in ips2})
sorted_ips = sorted(combined_ips, key=lambda x: (x[0], x[1]))
return [{"url": url, "ip": ip} for url, ip in sorted_ips]
combined_ips: set[tuple[str, str]] = {(record.url, record.ip) for record in ips1}
combined_ips.update({(record.url, record.ip) for record in ips2})

sorted_ips: list[tuple[str, str]] = sorted(combined_ips, key=lambda x: (x[0], x[1]))
return [URLIPMapping(url, ip) for url, ip in sorted_ips]


def reorder_ips_with_best_first(
ips_merged: list[dict[str, str]], ips_best: list[dict[str, str]]
) -> list[dict[str, str]]:
def reorder_ips_with_best_first(ips_merged: list[URLIPMapping], ips_best: list[URLIPMapping]) -> list[URLIPMapping]:
"""重新排序IP地址列表 将每个URL的最佳IP地址放置在前面。
Parameters:
Expand All @@ -127,21 +126,25 @@ def reorder_ips_with_best_first(
Returns:
List[Dict[str, str]]: 重新排序后的IP地址列表 每个URL的最佳IP地址位于前面。
"""
url_to_best_ip = {record["url"]: record["ip"] for record in ips_best}
ips_res = []
url_to_best_ip: dict[str, str] = {}
for record in ips_best:
url_to_best_ip.update(record.to_dict())
ips_res: list[URLIPMapping] = []

# 先添加最佳IP
for record in ips_merged:
url = record["url"]
if url in url_to_best_ip:
ips_res.append({"url": url, "ip": url_to_best_ip[url]})
del url_to_best_ip[url]
if record.url in url_to_best_ip:
ips_res.append(URLIPMapping(record.url, url_to_best_ip[record.url]))
del url_to_best_ip[record.url]
# 再添加其他IP
for record in ips_merged:
if record not in ips_res:
ips_res.append(record)
ips_res = sorted(ips_res, key=lambda x: x["url"])
ips_res = sorted(ips_res, key=lambda x: x.url)
return ips_res


def format_host_strings(ip_list: list[dict[str, str]]) -> str:
def format_host_strings(ip_list: list[URLIPMapping]) -> str:
"""将列表中的字典中的IP地址和URL转换为host格式的字符串 并确保每个URL的开头对齐。
Parameters:
Expand All @@ -150,10 +153,10 @@ def format_host_strings(ip_list: list[dict[str, str]]) -> str:
Returns:
str: 包含host格式的字符串。
"""
host_strings = []
max_ip_length = max(len(record["ip"]) for record in ip_list)
host_strings: list[str] = []
max_ip_length = max(len(record.ip) for record in ip_list)
for record in ip_list:
host_format = f"{record['ip'].ljust(max_ip_length)} {record['url']}"
host_format = f"{record.ip.ljust(max_ip_length)} {record.url}"
host_strings.append(host_format)

return "\n".join(host_strings)

0 comments on commit c434106

Please sign in to comment.