Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/cache matt #68

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion snapshotter/snapshotter_id_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def main():
anchor_rpc = RpcHelper(settings.anchor_chain_rpc)
protocol_abi = read_json_file(settings.protocol_state.abi)
protocol_state_contract = anchor_rpc.get_current_node()['web3_client'].eth.contract(
address=Web3.toChecksumAddress(
address=Web3.to_checksum_address(
settings.protocol_state.address,
),
abi=protocol_abi,
Expand Down
30 changes: 13 additions & 17 deletions snapshotter/system_event_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import EpochReleasedEvent
from snapshotter.utils.models.data_models import EventBase
from snapshotter.utils.models.data_models import ProjectsUpdatedEvent
from snapshotter.utils.models.data_models import SnapshotFinalizedEvent
from snapshotter.utils.models.data_models import SnapshottersUpdatedEvent
from snapshotter.utils.rabbitmq_helpers import RabbitmqThreadedSelectLoopInteractor
from snapshotter.utils.redis.redis_conn import RedisPoolCache
from snapshotter.utils.redis.redis_keys import event_detector_last_processed_block
from snapshotter.utils.redis.redis_keys import last_epoch_detected_epoch_id_key
from snapshotter.utils.redis.redis_keys import last_epoch_detected_timestamp_key
from snapshotter.utils.rpc import get_event_sig_and_abi
from snapshotter.utils.rpc import RpcHelper



def rabbitmq_and_redis_cleanup(fn):
"""
A decorator function that wraps the given function and handles cleanup of RabbitMQ and Redis connections in case of
Expand Down Expand Up @@ -110,14 +111,14 @@ def __init__(self, name, **kwargs):
self._rabbitmq_queue = queue.Queue()
self._shutdown_initiated = False
self._logger = logger.bind(
module=f'{name}|{settings.namespace}-{settings.instance_id[:5]}',
module=name,
)

self._exchange = (
f'{settings.rabbitmq.setup.event_detector.exchange}:{settings.namespace}'
)
self._routing_key_prefix = (
f'powerloom-event-detector:{settings.namespace}:{settings.instance_id}.'
f'event-detector:{settings.namespace}:{settings.instance_id}.'
)
self._aioredis_pool = None
self._redis_conn = None
Expand All @@ -131,7 +132,7 @@ def __init__(self, name, **kwargs):
)
self.contract_address = settings.protocol_state.address
self.contract = self.rpc_helper.get_current_node()['web3_client'].eth.contract(
address=Web3.toChecksumAddress(
address=Web3.to_checksum_address(
self.contract_address,
),
abi=self.contract_abi,
Expand All @@ -140,19 +141,16 @@ def __init__(self, name, **kwargs):
# event EpochReleased(uint256 indexed epochId, uint256 begin, uint256 end, uint256 timestamp);
# event SnapshotFinalized(uint256 indexed epochId, uint256 epochEnd, string projectId,
# string snapshotCid, uint256 timestamp);
# event ProjectsUpdated(string projectId, bool allowed);

EVENTS_ABI = {
'EpochReleased': self.contract.events.EpochReleased._get_event_abi(),
'SnapshotFinalized': self.contract.events.SnapshotFinalized._get_event_abi(),
'ProjectsUpdated': self.contract.events.ProjectsUpdated._get_event_abi(),
'allSnapshottersUpdated': self.contract.events.allSnapshottersUpdated._get_event_abi(),
}

EVENT_SIGS = {
'EpochReleased': 'EpochReleased(uint256,uint256,uint256,uint256)',
'SnapshotFinalized': 'SnapshotFinalized(uint256,uint256,string,string,uint256)',
'ProjectsUpdated': 'ProjectsUpdated(string,bool,uint256)',
'allSnapshottersUpdated': 'allSnapshottersUpdated(address,bool)',

}
Expand Down Expand Up @@ -197,6 +195,7 @@ async def get_events(self, from_block: int, to_block: int):

events = []
new_epoch_detected = False
latest_epoch_id = - 1
for log in events_log:
if log.event == 'EpochReleased':
event = EpochReleasedEvent(
Expand All @@ -206,6 +205,7 @@ async def get_events(self, from_block: int, to_block: int):
timestamp=log.args.timestamp,
)
new_epoch_detected = True
latest_epoch_id = max(latest_epoch_id, log.args.epochId)
events.append((log.event, event))

elif log.event == 'SnapshotFinalized':
Expand All @@ -217,14 +217,6 @@ async def get_events(self, from_block: int, to_block: int):
timestamp=log.args.timestamp,
)
events.append((log.event, event))
elif log.event == 'ProjectsUpdated':
event = ProjectsUpdatedEvent(
projectId=log.args.projectId,
allowed=log.args.allowed,
enableEpochId=log.args.enableEpochId,
timestamp=int(time.time()),
)
events.append((log.event, event))
elif log.event == 'allSnapshottersUpdated':
event = SnapshottersUpdatedEvent(
snapshotterAddress=log.args.snapshotterAddress,
Expand All @@ -238,6 +230,10 @@ async def get_events(self, from_block: int, to_block: int):
last_epoch_detected_timestamp_key(),
int(time.time()),
)
await self._redis_conn.set(
last_epoch_detected_epoch_id_key(),
latest_epoch_id,
)

self._logger.info('Events: {}', events)
return events
Expand Down Expand Up @@ -298,7 +294,7 @@ async def _detect_events(self):
"""
while True:
try:
current_block = await self.rpc_helper.get_current_block(redis_conn=self._redis_conn)
current_block = await self.rpc_helper.get_current_block_number()
self._logger.info('Current block: {}', current_block)

except Exception as e:
Expand Down Expand Up @@ -421,4 +417,4 @@ def run(self):

self.ev_loop.run_until_complete(
self._detect_events(),
)
)
79 changes: 57 additions & 22 deletions snapshotter/utils/data_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import json
import os
from typing import List

import tenacity
Expand All @@ -11,10 +10,7 @@
from tenacity import stop_after_attempt
from tenacity import wait_random_exponential

from snapshotter.settings.config import settings
from snapshotter.utils.default_logger import logger
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.file_utils import write_json_file
from snapshotter.utils.models.data_models import ProjectStatus
from snapshotter.utils.models.data_models import SnapshotterIncorrectSnapshotSubmission
from snapshotter.utils.models.data_models import SnapshotterMissedSnapshotSubmission
Expand Down Expand Up @@ -135,6 +131,54 @@ async def w3_get_and_cache_finalized_cid(
return f'null_{epoch_id}', epoch_id


async def get_project_last_finalized_cid_and_epoch(redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, project_id):
"""
Get the last epoch for a given project ID.

Args:
redis_conn (aioredis.Redis): Redis connection object.
state_contract_obj: Contract object for the state contract.
rpc_helper: RPC helper object.
project_id (str): ID of the project.

Returns:
int: The last epoch for the given project ID.
"""
project_last_finalized = await redis_conn.zrevrangebyscore(
project_finalized_data_zset(project_id),
max='+inf',
min='-inf',
withscores=True,
start=0,
num=1,
)

if project_last_finalized:
project_last_finalized_cid, project_last_finalized_epoch = project_last_finalized[0]
project_last_finalized_epoch = int(project_last_finalized_epoch)
project_last_finalized_cid = project_last_finalized_cid.decode('utf-8')

if project_last_finalized_cid and 'null' not in project_last_finalized_cid:
return project_last_finalized_cid, project_last_finalized_epoch

tasks = [
state_contract_obj.functions.lastFinalizedSnapshot(project_id),
]

[last_finalized_epoch] = await rpc_helper.web3_call(tasks, redis_conn=redis_conn)
logger.info(f'last finalized epoch for project {project_id} is {last_finalized_epoch}')

# getting finalized cid for last finalized epoch
last_finalized_cid = await get_project_finalized_cid(
redis_conn, state_contract_obj, rpc_helper, last_finalized_epoch, project_id,
)

if last_finalized_cid and 'null' not in last_finalized_cid:
return last_finalized_cid, int(last_finalized_epoch)
else:
return '', 0


# TODO: warmup cache to reduce RPC calls overhead
async def get_project_first_epoch(redis_conn: aioredis.Redis, state_contract_obj, rpc_helper, project_id):
"""
Expand Down Expand Up @@ -216,23 +260,14 @@ async def get_submission_data(redis_conn: aioredis.Redis, cid, ipfs_reader, proj
if 'null' in cid:
return dict()

cached_data_path = os.path.join(settings.ipfs.local_cache_path, project_id, 'snapshots')
filename = f'{cid}.json'
logger.info('Project {} CID {}, fetching data from IPFS', project_id, cid)
try:
submission_data = read_json_file(os.path.join(cached_data_path, filename))
except Exception as e:
# Fetch from IPFS
logger.trace('Error while reading from cache', error=e)
logger.info('Project {} CID {}, fetching data from IPFS', project_id, cid)
try:
submission_data = await fetch_file_from_ipfs(ipfs_reader, cid)
except:
logger.error('Error while fetching data from IPFS | Project {} | CID {}', project_id, cid)
submission_data = dict()
else:
# Cache it
write_json_file(cached_data_path, filename, submission_data)
submission_data = json.loads(submission_data)
submission_data = await fetch_file_from_ipfs(ipfs_reader, cid)
except:
logger.error('Error while fetching data from IPFS | Project {} | CID {}', project_id, cid)
submission_data = dict()
else:
submission_data = json.loads(submission_data)
return submission_data


Expand Down Expand Up @@ -352,7 +387,7 @@ async def build_projects_list_from_events(redis_conn: aioredis.Redis, state_cont
redis_conn=redis_conn,
)

current_block = await rpc_helper.get_current_block_number(redis_conn)
current_block = await rpc_helper.get_current_block_number()
event_sig, event_abi = get_event_sig_and_abi(EVENT_SIGS, EVENT_ABI)

# from start_block to current block, get all events in batches of 1000, 10 requests parallelly
Expand Down Expand Up @@ -726,4 +761,4 @@ async def get_snapshotter_project_status(redis_conn: aioredis.Redis, project_id:
),
)

return project_status
return project_status
Loading
Loading