diff --git a/snapshotter/snapshotter_id_ping.py b/snapshotter/snapshotter_id_ping.py index 079afdf6..4afcd6c3 100644 --- a/snapshotter/snapshotter_id_ping.py +++ b/snapshotter/snapshotter_id_ping.py @@ -23,7 +23,7 @@ async def main(): anchor_rpc = RpcHelper(settings.anchor_chain_rpc) protocol_abi = read_json_file(settings.protocol_state.abi) protocol_state_contract = anchor_rpc.get_current_node()['web3_client'].eth.contract( - address=Web3.toChecksumAddress( + address=Web3.to_checksum_address( settings.protocol_state.address, ), abi=protocol_abi, diff --git a/snapshotter/system_event_detector.py b/snapshotter/system_event_detector.py index e62b15a5..7bc84716 100644 --- a/snapshotter/system_event_detector.py +++ b/snapshotter/system_event_detector.py @@ -20,17 +20,18 @@ from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.models.data_models import EpochReleasedEvent from snapshotter.utils.models.data_models import EventBase -from snapshotter.utils.models.data_models import ProjectsUpdatedEvent from snapshotter.utils.models.data_models import SnapshotFinalizedEvent from snapshotter.utils.models.data_models import SnapshottersUpdatedEvent from snapshotter.utils.rabbitmq_helpers import RabbitmqThreadedSelectLoopInteractor from snapshotter.utils.redis.redis_conn import RedisPoolCache from snapshotter.utils.redis.redis_keys import event_detector_last_processed_block +from snapshotter.utils.redis.redis_keys import last_epoch_detected_epoch_id_key from snapshotter.utils.redis.redis_keys import last_epoch_detected_timestamp_key from snapshotter.utils.rpc import get_event_sig_and_abi from snapshotter.utils.rpc import RpcHelper + def rabbitmq_and_redis_cleanup(fn): """ A decorator function that wraps the given function and handles cleanup of RabbitMQ and Redis connections in case of @@ -110,14 +111,14 @@ def __init__(self, name, **kwargs): self._rabbitmq_queue = queue.Queue() self._shutdown_initiated = False self._logger = logger.bind( - module=f'{name}|{settings.namespace}-{settings.instance_id[:5]}', + module=name, ) self._exchange = ( f'{settings.rabbitmq.setup.event_detector.exchange}:{settings.namespace}' ) self._routing_key_prefix = ( - f'powerloom-event-detector:{settings.namespace}:{settings.instance_id}.' + f'event-detector:{settings.namespace}:{settings.instance_id}.' ) self._aioredis_pool = None self._redis_conn = None @@ -131,7 +132,7 @@ def __init__(self, name, **kwargs): ) self.contract_address = settings.protocol_state.address self.contract = self.rpc_helper.get_current_node()['web3_client'].eth.contract( - address=Web3.toChecksumAddress( + address=Web3.to_checksum_address( self.contract_address, ), abi=self.contract_abi, @@ -140,19 +141,16 @@ def __init__(self, name, **kwargs): # event EpochReleased(uint256 indexed epochId, uint256 begin, uint256 end, uint256 timestamp); # event SnapshotFinalized(uint256 indexed epochId, uint256 epochEnd, string projectId, # string snapshotCid, uint256 timestamp); - # event ProjectsUpdated(string projectId, bool allowed); EVENTS_ABI = { 'EpochReleased': self.contract.events.EpochReleased._get_event_abi(), 'SnapshotFinalized': self.contract.events.SnapshotFinalized._get_event_abi(), - 'ProjectsUpdated': self.contract.events.ProjectsUpdated._get_event_abi(), 'allSnapshottersUpdated': self.contract.events.allSnapshottersUpdated._get_event_abi(), } EVENT_SIGS = { 'EpochReleased': 'EpochReleased(uint256,uint256,uint256,uint256)', 'SnapshotFinalized': 'SnapshotFinalized(uint256,uint256,string,string,uint256)', - 'ProjectsUpdated': 'ProjectsUpdated(string,bool,uint256)', 'allSnapshottersUpdated': 'allSnapshottersUpdated(address,bool)', } @@ -197,6 +195,7 @@ async def get_events(self, from_block: int, to_block: int): events = [] new_epoch_detected = False + latest_epoch_id = - 1 for log in events_log: if log.event == 'EpochReleased': event = EpochReleasedEvent( @@ -206,6 +205,7 @@ async def get_events(self, from_block: int, to_block: int): timestamp=log.args.timestamp, ) new_epoch_detected = True + latest_epoch_id = max(latest_epoch_id, log.args.epochId) events.append((log.event, event)) elif log.event == 'SnapshotFinalized': @@ -217,14 +217,6 @@ async def get_events(self, from_block: int, to_block: int): timestamp=log.args.timestamp, ) events.append((log.event, event)) - elif log.event == 'ProjectsUpdated': - event = ProjectsUpdatedEvent( - projectId=log.args.projectId, - allowed=log.args.allowed, - enableEpochId=log.args.enableEpochId, - timestamp=int(time.time()), - ) - events.append((log.event, event)) elif log.event == 'allSnapshottersUpdated': event = SnapshottersUpdatedEvent( snapshotterAddress=log.args.snapshotterAddress, @@ -238,6 +230,10 @@ async def get_events(self, from_block: int, to_block: int): last_epoch_detected_timestamp_key(), int(time.time()), ) + await self._redis_conn.set( + last_epoch_detected_epoch_id_key(), + latest_epoch_id, + ) self._logger.info('Events: {}', events) return events @@ -298,7 +294,7 @@ async def _detect_events(self): """ while True: try: - current_block = await self.rpc_helper.get_current_block(redis_conn=self._redis_conn) + current_block = await self.rpc_helper.get_current_block_number() self._logger.info('Current block: {}', current_block) except Exception as e: @@ -421,4 +417,4 @@ def run(self): self.ev_loop.run_until_complete( self._detect_events(), - ) + ) \ No newline at end of file diff --git a/snapshotter/utils/data_utils.py b/snapshotter/utils/data_utils.py index 821c76a2..72616d7a 100644 --- a/snapshotter/utils/data_utils.py +++ b/snapshotter/utils/data_utils.py @@ -1,6 +1,5 @@ import asyncio import json -import os from typing import List import tenacity @@ -11,10 +10,7 @@ from tenacity import stop_after_attempt from tenacity import wait_random_exponential -from snapshotter.settings.config import settings from snapshotter.utils.default_logger import logger -from snapshotter.utils.file_utils import read_json_file -from snapshotter.utils.file_utils import write_json_file from snapshotter.utils.models.data_models import ProjectStatus from snapshotter.utils.models.data_models import SnapshotterIncorrectSnapshotSubmission from snapshotter.utils.models.data_models import SnapshotterMissedSnapshotSubmission @@ -135,6 +131,54 @@ async def w3_get_and_cache_finalized_cid( return f'null_{epoch_id}', epoch_id +async def get_project_last_finalized_cid_and_epoch(redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, project_id): + """ + Get the last epoch for a given project ID. + + Args: + redis_conn (aioredis.Redis): Redis connection object. + state_contract_obj: Contract object for the state contract. + rpc_helper: RPC helper object. + project_id (str): ID of the project. + + Returns: + int: The last epoch for the given project ID. + """ + project_last_finalized = await redis_conn.zrevrangebyscore( + project_finalized_data_zset(project_id), + max='+inf', + min='-inf', + withscores=True, + start=0, + num=1, + ) + + if project_last_finalized: + project_last_finalized_cid, project_last_finalized_epoch = project_last_finalized[0] + project_last_finalized_epoch = int(project_last_finalized_epoch) + project_last_finalized_cid = project_last_finalized_cid.decode('utf-8') + + if project_last_finalized_cid and 'null' not in project_last_finalized_cid: + return project_last_finalized_cid, project_last_finalized_epoch + + tasks = [ + state_contract_obj.functions.lastFinalizedSnapshot(project_id), + ] + + [last_finalized_epoch] = await rpc_helper.web3_call(tasks, redis_conn=redis_conn) + logger.info(f'last finalized epoch for project {project_id} is {last_finalized_epoch}') + + # getting finalized cid for last finalized epoch + last_finalized_cid = await get_project_finalized_cid( + redis_conn, state_contract_obj, rpc_helper, last_finalized_epoch, project_id, + ) + + if last_finalized_cid and 'null' not in last_finalized_cid: + return last_finalized_cid, int(last_finalized_epoch) + else: + return '', 0 + + # TODO: warmup cache to reduce RPC calls overhead async def get_project_first_epoch(redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, project_id): """ @@ -216,23 +260,14 @@ async def get_submission_data(redis_conn: aioredis.Redis, cid, ipfs_reader, proj if 'null' in cid: return dict() - cached_data_path = os.path.join(settings.ipfs.local_cache_path, project_id, 'snapshots') - filename = f'{cid}.json' + logger.info('Project {} CID {}, fetching data from IPFS', project_id, cid) try: - submission_data = read_json_file(os.path.join(cached_data_path, filename)) - except Exception as e: - # Fetch from IPFS - logger.trace('Error while reading from cache', error=e) - logger.info('Project {} CID {}, fetching data from IPFS', project_id, cid) - try: - submission_data = await fetch_file_from_ipfs(ipfs_reader, cid) - except: - logger.error('Error while fetching data from IPFS | Project {} | CID {}', project_id, cid) - submission_data = dict() - else: - # Cache it - write_json_file(cached_data_path, filename, submission_data) - submission_data = json.loads(submission_data) + submission_data = await fetch_file_from_ipfs(ipfs_reader, cid) + except: + logger.error('Error while fetching data from IPFS | Project {} | CID {}', project_id, cid) + submission_data = dict() + else: + submission_data = json.loads(submission_data) return submission_data @@ -352,7 +387,7 @@ async def build_projects_list_from_events(redis_conn: aioredis.Redis, state_cont redis_conn=redis_conn, ) - current_block = await rpc_helper.get_current_block_number(redis_conn) + current_block = await rpc_helper.get_current_block_number() event_sig, event_abi = get_event_sig_and_abi(EVENT_SIGS, EVENT_ABI) # from start_block to current block, get all events in batches of 1000, 10 requests parallelly @@ -726,4 +761,4 @@ async def get_snapshotter_project_status(redis_conn: aioredis.Redis, project_id: ), ) - return project_status + return project_status \ No newline at end of file diff --git a/snapshotter/utils/rpc.py b/snapshotter/utils/rpc.py index 801548f6..f187ba74 100644 --- a/snapshotter/utils/rpc.py +++ b/snapshotter/utils/rpc.py @@ -1,7 +1,9 @@ import asyncio import json import time +from typing import cast from typing import List +from typing import Set from typing import Union import eth_abi @@ -18,13 +20,19 @@ from tenacity import retry_if_exception_type from tenacity import stop_after_attempt from tenacity import wait_random_exponential +from web3 import AsyncHTTPProvider +from web3 import AsyncWeb3 from web3 import Web3 from web3._utils.abi import map_abi_data from web3._utils.events import get_event_data from web3._utils.normalizers import BASE_RETURN_NORMALIZERS from web3.eth import AsyncEth +from web3.middleware import async_construct_simple_cache_middleware +from web3.middleware import construct_simple_cache_middleware +from web3.types import RPCEndpoint from web3.types import TxParams from web3.types import Wei +from web3.utils.caching import SimpleCache from snapshotter.settings.config import settings from snapshotter.utils.default_logger import logger @@ -39,6 +47,15 @@ from snapshotter.utils.redis.redis_keys import rpc_json_rpc_calls from snapshotter.utils.redis.redis_keys import rpc_web3_calls +SIMPLE_CACHE_RPC_CHAIN_ID = cast( + Set[RPCEndpoint], + ( + 'web3_clientVersion', + 'net_version', + 'eth_chainId', + ), +) + def get_contract_abi_dict(abi): """ @@ -129,6 +146,8 @@ def __init__(self, rpc_settings: RPCConfigBase = settings.rpc, archive_mode=Fals self._client = None self._async_transport = None self._rate_limit_lua_script_shas = None + self._async_cache_chain_id_middleware = None + self._cache_chain_id_middleware = None async def _load_rate_limit_shas(self, redis_conn): """ @@ -175,52 +194,63 @@ async def _load_async_web3_providers(self): Loads async web3 providers for each node in the list of nodes. If a node already has a web3 client, it is skipped. """ + if not self._async_cache_chain_id_middleware: + self._async_cache_chain_id_middleware = await async_construct_simple_cache_middleware( + SimpleCache(), + SIMPLE_CACHE_RPC_CHAIN_ID, + ) for node in self._nodes: if node['web3_client_async'] is not None: continue - node['web3_client_async'] = Web3( - Web3.AsyncHTTPProvider(node['rpc_url']), - modules={'eth': (AsyncEth,)}, - middlewares=[], + + node['web3_client_async'] = AsyncWeb3(AsyncHTTPProvider(node['rpc_url'])) + node['web3_client_async'].middleware_onion.add( + self._async_cache_chain_id_middleware, + name='simple_cache_chain_id', ) - async def init(self, redis_conn): + async def init(self): """ - Initializes the RPC client by loading web3 providers and rate limits, - loading rate limit SHAs, initializing HTTP clients, and loading async + Initializes the RPC client by loading web3 providers, + initializing HTTP clients, and loading async web3 providers. - Args: - redis_conn: Redis connection object. - Returns: None """ if not self._sync_nodes_initialized: - self._load_web3_providers_and_rate_limits() + self._load_web3_providers() self._sync_nodes_initialized = True - await self._load_rate_limit_shas(redis_conn) await self._init_http_clients() await self._load_async_web3_providers() self._initialized = True - def _load_web3_providers_and_rate_limits(self): + def _load_web3_providers(self): """ - Load web3 providers and rate limits based on the archive mode. + Load web3 providers based on the archive mode. If archive mode is True, load archive nodes, otherwise load full nodes. """ + + if not self._cache_chain_id_middleware: + self._cache_chain_id_middleware = construct_simple_cache_middleware( + SimpleCache(), + SIMPLE_CACHE_RPC_CHAIN_ID, + ) if self._archive_mode: nodes = self._rpc_settings.archive_nodes else: nodes = self._rpc_settings.full_nodes for node in nodes: + self._logger.debug('Loading web3 provider for node {}', node.url) try: + _w3 = Web3(Web3.HTTPProvider(node.url)) + _w3.middleware_onion.add(self._cache_chain_id_middleware, name='simple_cache_chain_id') + self._nodes.append( { - 'web3_client': Web3(Web3.HTTPProvider(node.url)), + 'web3_client': _w3, 'web3_client_async': None, - 'rate_limit': limit_parse_many(node.rate_limit), 'rpc_url': node.url, }, ) @@ -246,7 +276,7 @@ def get_current_node(self): The current node to use for RPC calls. """ if not self._sync_nodes_initialized: - self._load_web3_providers_and_rate_limits() + self._load_web3_providers() self._sync_nodes_initialized = True if self._node_count == 0: @@ -274,13 +304,10 @@ def _on_node_exception(self, retry_state: tenacity.RetryCallState): next_node_idx, retry_state.outcome.exception(), ) - async def get_current_block_number(self, redis_conn): + async def get_current_block_number(self): """ Returns the current block number of the Ethereum blockchain. - Args: - redis_conn: Redis connection object. - Returns: The current block number of the Ethereum blockchain. @@ -296,40 +323,11 @@ async def get_current_block_number(self, redis_conn): ) async def f(node_idx): if not self._initialized: - await self.init(redis_conn=redis_conn) + await self.init() node = self._nodes[node_idx] - rpc_url = node.get('rpc_url') web3_provider = node['web3_client_async'] - await check_rpc_rate_limit( - parsed_limits=node.get('rate_limit', []), - app_id=rpc_url.split('/')[-1], - redis_conn=redis_conn, - request_payload='get_current_block_number', - error_msg={ - 'msg': 'exhausted_api_key_rate_limit inside get_current_blocknumber', - }, - logger=self._logger, - rate_limit_lua_script_shas=self._rate_limit_lua_script_shas, - limit_incr_by=1, - ) try: - cur_time = time.time() - await asyncio.gather( - redis_conn.zadd( - name=rpc_get_block_number_calls, - mapping={ - json.dumps( - 'get_current_block_number', - ): cur_time, - }, - ), - redis_conn.zremrangebyscore( - name=rpc_get_block_number_calls, - min=0, - max=cur_time - 3600, - ), - ) current_block = await web3_provider.eth.block_number except Exception as e: exc = RPCException( @@ -474,7 +472,7 @@ async def get_transaction_receipt(self, tx_hash, redis_conn): ) async def f(node_idx): if not self._initialized: - await self.init(redis_conn=redis_conn) + await self.init() node = self._nodes[node_idx] rpc_url = node.get('rpc_url') @@ -529,54 +527,6 @@ async def f(node_idx): return tx_receipt_details return await f(node_idx=0) - async def get_current_block(self, redis_conn, node_idx=0): - """ - Returns the current block number from the Ethereum node at the specified index. - - Args: - redis_conn (redis.Redis): Redis connection object. - node_idx (int): Index of the Ethereum node to use. Defaults to 0. - - Returns: - int: The current block number. - - Raises: - ExhaustedApiKeyRateLimitError: If the API key rate limit for the node is exhausted. - """ - node = self._nodes[node_idx] - rpc_url = node.get('rpc_url') - - await check_rpc_rate_limit( - parsed_limits=node.get('rate_limit', []), - app_id=rpc_url.split('/')[-1], - redis_conn=redis_conn, - request_payload='get_current_block', - error_msg={ - 'msg': 'exhausted_api_key_rate_limit inside web3_call', - }, - logger=self._logger, - rate_limit_lua_script_shas=self._rate_limit_lua_script_shas, - limit_incr_by=1, - ) - current_block = node['web3_client'].eth.block_number - - cur_time = time.time() - payload = {'time': cur_time, 'fn_name': 'get_current_block'} - await asyncio.gather( - redis_conn.zadd( - name=rpc_blocknumber_calls, - mapping={ - json.dumps(payload): cur_time, - }, - ), - redis_conn.zremrangebyscore( - name=rpc_blocknumber_calls, - min=0, - max=cur_time - 3600, - ), - ) - return current_block - async def web3_call(self, tasks, redis_conn, from_address=None): """ Calls the given tasks asynchronously using web3 and returns the response. @@ -590,7 +540,7 @@ async def web3_call(self, tasks, redis_conn, from_address=None): list: List of responses from the contract function calls. """ if not self._initialized: - await self.init(redis_conn) + await self.init() try: web3_tasks = [ @@ -720,7 +670,7 @@ async def batch_eth_get_balance_on_block_range( None is returned in its place. """ if not self._initialized: - await self.init(redis_conn) + await self.init() rpc_query = [] request_id = 1 @@ -761,7 +711,7 @@ async def batch_eth_call_on_block_range( from_block, to_block, params: Union[List, None] = None, - from_address=Web3.toChecksumAddress('0x0000000000000000000000000000000000000000'), + from_address=Web3.to_checksum_address('0x0000000000000000000000000000000000000000'), ): """ Batch executes an Ethereum contract function call on a range of blocks. @@ -780,7 +730,7 @@ async def batch_eth_call_on_block_range( list: A list of decoded results from the function call. """ if not self._initialized: - await self.init(redis_conn) + await self.init() if params is None: params = [] @@ -837,7 +787,7 @@ async def batch_eth_get_block(self, from_block, to_block, redis_conn): dict: A dictionary containing the response data from the JSON-RPC call. """ if not self._initialized: - await self.init(redis_conn) + await self.init() rpc_query = [] @@ -877,7 +827,7 @@ async def get_events_logs( List[Dict]: A list of dictionaries representing the decoded events logs. """ if not self._initialized: - await self.init(redis_conn) + await self.init() @retry( reraise=True,