diff --git a/README.md b/README.md
index e61702a..fd6c90d 100644
--- a/README.md
+++ b/README.md
@@ -19,10 +19,8 @@ Automatic search and download of snapshots for Solana
 5. Checks the download speed from RPC with the most recent snapshot. If `download_speed < min_download_speed` --> checks the next one
 
 ```
 optional arguments:
   -h, --help            show this help message and exit
   -t THREADS_COUNT, --threads-count THREADS_COUNT
                         the number of concurrently running threads that check snapshots for rpc nodes
   -r RPC_ADDRESS, --rpc_address RPC_ADDRESS
                         RPC address of the node from which the current slot number will be taken
   --max_snapshot_age MAX_SNAPSHOT_AGE
                         How many slots ago the snapshot was created (in slots)
   --min_download_speed MIN_DOWNLOAD_SPEED
                         Minimum average snapshot download speed in megabytes
   --max_latency MAX_LATENCY
                         The maximum value of latency (milliseconds). If latency > max_latency --> skip
+  --with_private_rpc    Enable adding and checking RPCs with the --private-rpc option. This slows down checking and searching but potentially increases the number of RPCs from which snapshots can be downloaded.
   --measurement_time MEASUREMENT_TIME
                         Time in seconds during which the script will measure the download speed
   --snapshot_path SNAPSHOT_PATH
-                        The location where the snapshot will be downloaded (absolute path). Example:
-                        /home/ubuntu/solana/validator-ledger
+                        The location where the snapshot will be downloaded (absolute path). Example: /home/ubuntu/solana/validator-ledger
   --num_of_retries NUM_OF_RETRIES
                         The number of retries if a suitable server for downloading the snapshot was not found
   --sleep SLEEP         Sleep before next retry (seconds)
   --sort_order SORT_ORDER
                         Priority way to sort the found servers. latency or slots_diff
   -b BLACKLIST, --blacklist BLACKLIST
-                        If the same corrupted archive is constantly downloaded, you can exclude it. Specify either the
-                        number of the slot you want to exclude, or the hash of the archive name. You can specify several,
-                        separated by commas. Example: -b 135501350,135501360 or --blacklist 135501350,some_hash
+                        If the same corrupted archive is constantly downloaded, you can exclude it. Specify either the number of the slot you want to exclude, or the hash of the archive name. You can specify several, separated by commas. Example:
+                        -b 135501350,135501360 or --blacklist 135501350,some_hash
+  -v, --verbose         increase output verbosity to DEBUG
 ```
 
 ![alt text](https://raw.githubusercontent.com/c29r3/solana-snapshot-finder/aec9a59a7517a5049fa702675bdc8c770acbef99/2021-07-23_22-38.png?raw=true)
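The selection logic the README describes reduces to a filter-then-sort over the discovered nodes. A minimal standalone sketch, using hypothetical sample nodes shaped like the entries the script writes to `snapshot.json` (`snapshot_address`, `slots_diff`, `latency`); the values themselves are fabricated:

```python
# Sketch of the README's selection rule: drop nodes whose snapshot is too old
# or whose latency is too high, then sort the survivors by --sort_order.
nodes = [
    {"snapshot_address": "203.0.113.7:8899",  "slots_diff": 412, "latency": 95.3},
    {"snapshot_address": "198.51.100.2:8899", "slots_diff": 120, "latency": 31.0},
    {"snapshot_address": "192.0.2.14:8899",   "slots_diff": 880, "latency": 18.5},
]

MAX_SNAPSHOT_AGE_IN_SLOTS = 1300  # --max_snapshot_age default
MAX_LATENCY = 40                  # --max_latency default, milliseconds
SORT_ORDER = "latency"            # or "slots_diff", per --sort_order

suitable = [n for n in nodes
            if n["slots_diff"] <= MAX_SNAPSHOT_AGE_IN_SLOTS
            and n["latency"] <= MAX_LATENCY]
suitable.sort(key=lambda n: n[SORT_ORDER])
print(suitable)  # two nodes survive the filter, lowest latency first
```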
diff --git a/snapshot-finder.py b/snapshot-finder.py
index 6b6c580..4ce61d9 100644
--- a/snapshot-finder.py
+++ b/snapshot-finder.py
@@ -6,19 +6,16 @@
 import json
 import sys
 import argparse
+import logging
+import datetime
 from requests import ReadTimeout, ConnectTimeout, HTTPError, Timeout, ConnectionError
 from tqdm import tqdm
 from multiprocessing.dummy import Pool as ThreadPool
 import statistics
-
-print("Version: 0.2.7")
-print("https://github.com/c29r3/solana-snapshot-finder\n\n")
-
 parser = argparse.ArgumentParser(description='Solana snapshot finder')
 parser.add_argument('-t', '--threads-count', default=1000, type=int,
                     help='the number of concurrently running threads that check snapshots for rpc nodes')
-
 parser.add_argument('-r', '--rpc_address',
                     default='https://api.mainnet-beta.solana.com', type=str,
                     help='RPC address of the node from which the current slot number will be taken\n'
@@ -27,6 +24,8 @@
 parser.add_argument('--max_snapshot_age', default=1300, type=int, help='How many slots ago the snapshot was created (in slots)')
 parser.add_argument('--min_download_speed', default=60, type=int, help='Minimum average snapshot download speed in megabytes')
 parser.add_argument('--max_latency', default=40, type=int, help='The maximum value of latency (milliseconds). If latency > max_latency --> skip')
+parser.add_argument('--with_private_rpc', action="store_true", help='Enable adding and checking RPCs with the --private-rpc option. This slows down checking and searching but potentially increases'
+                    ' the number of RPCs from which snapshots can be downloaded.')
 parser.add_argument('--measurement_time', default=7, type=int, help='Time in seconds during which the script will measure the download speed')
 parser.add_argument('--snapshot_path', type=str, default=".", help='The location where the snapshot will be downloaded (absolute path).'
                     ' Example: /home/ubuntu/solana/validator-ledger')
@@ -36,16 +35,18 @@
 parser.add_argument('-b', '--blacklist', default='', type=str,
                     help='If the same corrupted archive is constantly downloaded, you can exclude it.'
                     ' Specify either the number of the slot you want to exclude, or the hash of the archive name. '
                     'You can specify several, separated by commas. Example: -b 135501350,135501360 or --blacklist 135501350,some_hash')
+parser.add_argument("-v", "--verbose", help="increase output verbosity to DEBUG", action="store_true")
 args = parser.parse_args()
 
 DEFAULT_HEADERS = {"Content-Type": "application/json"}
 RPC = args.rpc_address
+WITH_PRIVATE_RPC = args.with_private_rpc
 MAX_SNAPSHOT_AGE_IN_SLOTS = args.max_snapshot_age
 THREADS_COUNT = args.threads_count
 MIN_DOWNLOAD_SPEED_MB = args.min_download_speed
 SPEED_MEASURE_TIME_SEC = args.measurement_time
 MAX_LATENCY = args.max_latency
-SNAPSHOT_PATH = args.snapshot_path if args.snapshot_path[-1] != '\\' else args.snapshot_path[:-1]
+SNAPSHOT_PATH = args.snapshot_path if args.snapshot_path[-1] != '/' else args.snapshot_path[:-1]
 NUM_OF_MAX_ATTEMPTS = args.num_of_retries
 SLEEP_BEFORE_RETRY = args.sleep
 NUM_OF_ATTEMPTS = 1
@@ -60,35 +61,34 @@
 FULL_LOCAL_SNAPSHOTS = []
 # skip servers that do not fit the filters so as not to check them again
 unsuitable_servers = set()
-
-print(f'{RPC=}\n'
-      f'{MAX_SNAPSHOT_AGE_IN_SLOTS=}\n'
-      f'{MIN_DOWNLOAD_SPEED_MB=}\n'
-      f'{SNAPSHOT_PATH=}\n'
-      f'{THREADS_COUNT=}\n'
-      f'{NUM_OF_MAX_ATTEMPTS=}\n'
-      f'{SORT_ORDER=}\n'
-      f'{BLACKLIST=}')
-
-try:
-    f_ = open(f'{SNAPSHOT_PATH}/write_perm_test', 'w')
-    f_.close()
-    os.remove(f'{SNAPSHOT_PATH}/write_perm_test')
-except IOError:
-    print(f'\nCheck {SNAPSHOT_PATH=} and permissions')
-    sys.exit(f'{os.system("ls -l")}')
-
-json_data = ({"last_update_at": 0.0,
-              "last_update_slot": 0,
-              "total_rpc_nodes": 0,
-              "rpc_nodes_with_actual_snapshot": 0,
-              "rpc_nodes": []
-              })
+# Configure Logging
+logging.getLogger('urllib3').setLevel(logging.WARNING)
+if args.verbose:
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format="%(asctime)s [%(levelname)s] %(message)s",
+        handlers=[
+            logging.FileHandler(f'{SNAPSHOT_PATH}/snapshot-finder.log'),
+            logging.StreamHandler(sys.stdout),
+        ]
+    )
+
+else:
+    # logging.basicConfig(stream=sys.stdout, encoding='utf-8', level=logging.INFO, format='|%(asctime)s| %(message)s')
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s [%(levelname)s] %(message)s",
+        handlers=[
+            logging.FileHandler(f'{SNAPSHOT_PATH}/snapshot-finder.log'),
+            logging.StreamHandler(sys.stdout),
+        ]
+    )
+logger = logging.getLogger(__name__)
 
 
 def convert_size(size_bytes):
     if size_bytes == 0:
-       return "0B"
+        return "0B"
     size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
     i = int(math.floor(math.log(size_bytes, 1024)))
     p = math.pow(1024, i)
@@ -97,6 +97,7 @@ def convert_size(size_bytes):
 
 
 def measure_speed(url: str, measure_time: int) -> float:
+    logging.debug('measure_speed()')
    url = f'http://{url}/snapshot.tar.bz2'
    r = requests.get(url, stream=True, timeout=measure_time+2)
    r.raise_for_status()
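Beyond the added debug line, the measurement logic of `measure_speed()` is untouched by this hunk. For readers following along, the idea is to stream `snapshot.tar.bz2` for a fixed window and divide bytes received by elapsed time. A simplified stand-in, not the upstream implementation; the chunk size and early-exit details are illustrative:

```python
import time
import requests

def probe_speed(url: str, measure_time: int) -> float:
    """Estimate download speed in bytes/sec by streaming for ~measure_time seconds."""
    r = requests.get(url, stream=True, timeout=measure_time + 2)
    r.raise_for_status()
    start = time.monotonic()
    received, elapsed = 0, 0.0
    for chunk in r.iter_content(chunk_size=81920):
        received += len(chunk)
        elapsed = time.monotonic() - start
        if elapsed >= measure_time:
            break
    return received / max(elapsed, 1e-9)  # guard against an empty/instant response
```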
@@ -146,30 +147,43 @@ def do_request(url_: str, method_: str = 'GET', data_: str = '', timeout_: int =
 
 
 def get_current_slot():
-    print("get_current_slot()")
+    logger.debug("get_current_slot()")
     d = '{"jsonrpc":"2.0","id":1, "method":"getSlot"}'
     try:
         r = do_request(url_=RPC, method_='post', data_=d, timeout_=25)
         if 'result' in str(r.text):
             return r.json()["result"]
         else:
-            print(f'Can\'t get current slot')
+            logger.error(f'Can\'t get current slot')
             return None
     except:
-        print(f'Can\'t get current slot')
+        logger.error(f'Can\'t get current slot')
         return None
 
 
 def get_all_rpc_ips():
+    logger.debug("get_all_rpc_ips()")
     d = '{"jsonrpc":"2.0", "id":1, "method":"getClusterNodes"}'
     r = do_request(url_=RPC, method_='post', data_=d, timeout_=25)
     if 'result' in str(r.text):
-        rpc_ips = [rpc["rpc"] for rpc in r.json()["result"] if rpc["rpc"] is not None]
+        if WITH_PRIVATE_RPC is True:
+            rpc_ips = []
+            for node in r.json()["result"]:
+                if node["rpc"] is not None:
+                    rpc_ips.append(node["rpc"])
+                else:
+                    gossip_ip = node["gossip"].split(":")[0]
+                    rpc_ips.append(f'{gossip_ip}:8899')
+
+        else:
+            rpc_ips = [rpc["rpc"] for rpc in r.json()["result"] if rpc["rpc"] is not None]
+
+        rpc_ips = list(set(rpc_ips))
         return rpc_ips
 
     else:
-        sys.exit(f'Can\'t get RPC ip addresses {r.text}')
+        logger.error(f'Can\'t get RPC ip addresses {r.text}')
+        sys.exit()
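The `--with_private_rpc` branch above derives candidate endpoints for nodes that advertise no public `rpc` address: it takes the IP from the node's `gossip` entry and assumes the default Solana RPC port 8899. Worked through on fabricated `getClusterNodes` entries:

```python
# Fabricated getClusterNodes results: one node with a public RPC address,
# one gossip-only node that gets the ip:8899 guess applied.
cluster_nodes = [
    {"rpc": "203.0.113.7:8899", "gossip": "203.0.113.7:8001"},
    {"rpc": None,               "gossip": "198.51.100.2:8001"},
]

rpc_ips = []
for node in cluster_nodes:
    if node["rpc"] is not None:
        rpc_ips.append(node["rpc"])
    else:
        gossip_ip = node["gossip"].split(":")[0]  # strip the gossip port
        rpc_ips.append(f'{gossip_ip}:8899')

print(sorted(set(rpc_ips)))
# ['198.51.100.2:8899', '203.0.113.7:8899'] -- deduplicated, like list(set(...)) upstream
```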
@@ -240,13 +254,12 @@ def get_snapshot_slot(rpc_address: str):
         return None
 
     except Exception as getSnapErr:
-        # print(f'error in get_snapshot_slot(): {getSnapErr}')
         return None
 
 
 def download(url: str, fname_: str):
     #actual_name = url[url.rfind('/'):]
-    temp_fname = f'{SNAPSHOT_PATH}/tmpblet-{fname_}'
+    temp_fname = f'{SNAPSHOT_PATH}/tmp-{fname_}'
     fname = f'{SNAPSHOT_PATH}/{fname_}'
     try:
         resp = requests.get(url, stream=True)
@@ -262,14 +275,14 @@ def download(url: str, fname_: str):
                 size = file.write(data)
                 bar.update(size)
 
-        print(f'Rename the downloaded file {temp_fname} --> {fname}')
+        logger.info(f'Rename the downloaded file {temp_fname} --> {fname}')
         os.rename(temp_fname, fname)
 
     except (ReadTimeout, ConnectTimeout, HTTPError, Timeout, ConnectionError) as downlErr:
-        print(f'Exception in download() func\n {downlErr}')
+        logger.error(f'Exception in download() func\n {downlErr}')
 
     except Exception as unknwErr:
-        print(f'Exception in download() func\n{unknwErr}')
+        logger.error(f'Exception in download() func\n{unknwErr}')
@@ -278,7 +291,7 @@ def main_worker():
     rpc_nodes = list(set(get_all_rpc_ips()))
     global pbar
     pbar = tqdm(total=len(rpc_nodes))
-    print(f'RPC servers in total: {len(rpc_nodes)} \nCurrent slot number: {current_slot}\n')
+    logger.info(f'RPC servers in total: {len(rpc_nodes)} | Current slot number: {current_slot}\n')
 
     # Search for full local snapshots.
     # If such a snapshot is found and it is not too old, then the script will try to find and download an incremental snapshot
@@ -286,19 +299,19 @@ def main_worker():
     if len(FULL_LOCAL_SNAPSHOTS) > 0:
         FULL_LOCAL_SNAPSHOTS.sort(reverse=True)
         FULL_LOCAL_SNAP_SLOT = FULL_LOCAL_SNAPSHOTS[0].replace(SNAPSHOT_PATH, "").split("-")[1]
-        print(f'Found full local snapshot {FULL_LOCAL_SNAPSHOTS[0]} | {FULL_LOCAL_SNAP_SLOT=}')
+        logger.info(f'Found full local snapshot {FULL_LOCAL_SNAPSHOTS[0]} | {FULL_LOCAL_SNAP_SLOT=}')
     else:
-        print(f'Can\'t find any full local snapshots in this path {SNAPSHOT_PATH} --> the search will be carried out on full snapshots')
+        logger.info(f'Can\'t find any full local snapshots in this path {SNAPSHOT_PATH} --> the search will be carried out on full snapshots')
 
     print(f'Searching information about snapshots on all found RPCs')
     pool = ThreadPool()
     pool.map(get_snapshot_slot, rpc_nodes)
-    print(f'Found suitable RPCs: {len(json_data["rpc_nodes"])}')
+    logger.info(f'Found suitable RPCs: {len(json_data["rpc_nodes"])}')
 
     if len(json_data["rpc_nodes"]) == 0:
-        sys.exit(f'No snapshot nodes were found matching the given parameters:\n'
-                 f'- {args.max_snapshot_age=}')
+        logger.info(f'No snapshot nodes were found matching the given parameters: {args.max_snapshot_age=}')
+        sys.exit()
 
     # sort list of rpc node by SORT_ORDER (latency)
     rpc_nodes_sorted = sorted(json_data["rpc_nodes"], key=lambda k: k[SORT_ORDER])
@@ -313,34 +326,34 @@ def main_worker():
     with open(f'{SNAPSHOT_PATH}/snapshot.json', "w") as result_f:
         json.dump(json_data, result_f, indent=2)
-    print(f'All data is saved to json file - {SNAPSHOT_PATH}/snapshot.json')
+    logger.info(f'All data is saved to json file - {SNAPSHOT_PATH}/snapshot.json')
 
     best_snapshot_node = {}
     num_of_rpc_to_check = 15
     rpc_nodes_inc_sorted = []
 
-    print("TRYING TO DOWNLOADING FILES")
+    logger.info("TRYING TO DOWNLOAD FILES")
     for i, rpc_node in enumerate(json_data["rpc_nodes"], start=1):
         # filter blacklisted snapshots
         if BLACKLIST != ['']:
             if any(i in str(rpc_node["files_to_download"]) for i in BLACKLIST):
-                print(f'{i}\\{len(json_data["rpc_nodes"])} excluded via --blacklist')
+                logger.info(f'{i}\\{len(json_data["rpc_nodes"])} BLACKLISTED --> {rpc_node}')
                 continue
 
-        print(f'{i}\\{len(json_data["rpc_nodes"])} checking the speed {rpc_node}')
+        logger.info(f'{i}\\{len(json_data["rpc_nodes"])} checking the speed {rpc_node}')
         if rpc_node["snapshot_address"] in unsuitable_servers:
-            print(f'Rpc node already in unsuitable list --> skip {rpc_node["snapshot_address"]}')
+            logger.info(f'Rpc node already in unsuitable list --> skip {rpc_node["snapshot_address"]}')
             continue
 
         down_speed_bytes = measure_speed(url=rpc_node["snapshot_address"], measure_time=SPEED_MEASURE_TIME_SEC)
         down_speed_mb = convert_size(down_speed_bytes)
         if down_speed_bytes < MIN_DOWNLOAD_SPEED_MB * 1e6:
-            print(f'Too slow: {rpc_node=} {down_speed_mb=}')
+            logger.info(f'Too slow: {rpc_node=} {down_speed_mb=}')
             unsuitable_servers.add(rpc_node["snapshot_address"])
             continue
 
         elif down_speed_bytes >= MIN_DOWNLOAD_SPEED_MB * 1e6:
-            print(f'Suitable snapshot server found: {rpc_node=} {down_speed_mb=}')
+            logger.info(f'Suitable snapshot server found: {rpc_node=} {down_speed_mb=}')
             for path in reversed(rpc_node["files_to_download"]):
                 # do not download full snapshot if it already exists locally
                 if str(path).startswith("/snapshot-"):
@@ -353,20 +366,20 @@ def main_worker():
                 else:
                     best_snapshot_node = f'http://{rpc_node["snapshot_address"]}/snapshot.tar.bz2'
 
-                print(f'Downloading {best_snapshot_node} snapshot to {SNAPSHOT_PATH}')
+                logger.info(f'Downloading {best_snapshot_node} snapshot to {SNAPSHOT_PATH}')
                 download(url=best_snapshot_node, fname_=path.replace("/", ""))
             return 0
 
         elif i > num_of_rpc_to_check:
-            print(f'The limit on the number of RPC nodes from'
+            logger.info(f'The limit on the number of RPC nodes from'
                   ' which we measure the speed has been reached {num_of_rpc_to_check=}\n')
             break
 
         else:
-            print(f'{down_speed_mb=} < {MIN_DOWNLOAD_SPEED_MB=}')
+            logger.info(f'{down_speed_mb=} < {MIN_DOWNLOAD_SPEED_MB=}')
 
     if best_snapshot_node is {}:
-        print(f'No snapshot nodes were found matching the given parameters:{args.min_download_speed=}\n'
+        logger.error(f'No snapshot nodes were found matching the given parameters:{args.min_download_speed=}\n'
                      f'RETRY #{NUM_OF_ATTEMPTS}\\{NUM_OF_MAX_ATTEMPTS}')
         return 1
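A note on the blacklist check in the loop above: an entry matches as a plain substring of the stringified `files_to_download` list, so either a slot number or an archive-name hash works. A tiny self-contained example with made-up values:

```python
# --blacklist entries exclude a server when any of them appears as a
# substring of its file list; both values here are invented.
BLACKLIST = ["135501350", "some_hash"]
files_to_download = ["/snapshot-135501350-abc123.tar.zst"]

blacklisted = any(entry in str(files_to_download) for entry in BLACKLIST)
print(blacklisted)  # True: the slot number 135501350 appears in the file name
```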
@@ -379,9 +392,36 @@ def main_worker():
     return 1
 
 
+logger.info("Version: 0.2.8")
+logger.info("https://github.com/c29r3/solana-snapshot-finder\n\n")
+logger.info(f'{RPC=}\n'
+            f'{MAX_SNAPSHOT_AGE_IN_SLOTS=}\n'
+            f'{MIN_DOWNLOAD_SPEED_MB=}\n'
+            f'{SNAPSHOT_PATH=}\n'
+            f'{THREADS_COUNT=}\n'
+            f'{NUM_OF_MAX_ATTEMPTS=}\n'
+            f'{WITH_PRIVATE_RPC=}\n'
+            f'{SORT_ORDER=}')
+
+try:
+    f_ = open(f'{SNAPSHOT_PATH}/write_perm_test', 'w')
+    f_.close()
+    os.remove(f'{SNAPSHOT_PATH}/write_perm_test')
+except IOError:
+    logger.error(f'\nCheck {SNAPSHOT_PATH=} and permissions')
+    sys.exit(f'{os.system("ls -l")}')
+
+json_data = ({"last_update_at": 0.0,
+              "last_update_slot": 0,
+              "total_rpc_nodes": 0,
+              "rpc_nodes_with_actual_snapshot": 0,
+              "rpc_nodes": []
+              })
+
+
 while NUM_OF_ATTEMPTS <= NUM_OF_MAX_ATTEMPTS:
     current_slot = get_current_slot()
-    print(f'Attempt number: {NUM_OF_ATTEMPTS}. Total attempts: {NUM_OF_MAX_ATTEMPTS}')
+    logger.info(f'Attempt number: {NUM_OF_ATTEMPTS}. Total attempts: {NUM_OF_MAX_ATTEMPTS}')
     NUM_OF_ATTEMPTS += 1
 
     if current_slot is None:
@@ -390,11 +430,12 @@ def main_worker():
     worker_result = main_worker()
     if worker_result == 0:
-        print("Done")
+        logger.info("Done")
         exit(0)
 
     if NUM_OF_ATTEMPTS >= NUM_OF_MAX_ATTEMPTS:
-        sys.exit(f'Could not find a suitable snapshot')
+        logger.error(f'Could not find a suitable snapshot --> exit')
+        sys.exit()
 
-    print(f"Sleeping {SLEEP_BEFORE_RETRY} seconds before next try")
-    time.sleep(SLEEP_BEFORE_RETRY)
+    logger.info(f"Sleeping {SLEEP_BEFORE_RETRY} seconds before next try")
+    time.sleep(SLEEP_BEFORE_RETRY)
\ No newline at end of file
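The module-level loop the diff ends with follows a plain bounded-retry shape: run the worker, exit on success, and on failure sleep and retry until the attempt budget is spent. A generic sketch of that shape; `find_snapshot` is a hypothetical stand-in for `main_worker()`, and the constants are illustrative values for `--num_of_retries` and `--sleep`:

```python
import sys
import time

NUM_OF_MAX_ATTEMPTS = 5   # illustrative --num_of_retries value
SLEEP_BEFORE_RETRY = 30   # illustrative --sleep value, seconds

def find_snapshot() -> int:
    """Hypothetical stand-in for main_worker(): 0 on success, 1 to retry."""
    return 1

attempt = 1
while attempt <= NUM_OF_MAX_ATTEMPTS:
    if find_snapshot() == 0:
        sys.exit(0)  # "Done"
    if attempt >= NUM_OF_MAX_ATTEMPTS:
        sys.exit("Could not find a suitable snapshot")
    time.sleep(SLEEP_BEFORE_RETRY)
    attempt += 1
```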