diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index 66078661..1143d0dc 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -97,7 +97,7 @@ async def startup_boilerplate(): app.state.ipfs_singleton = AsyncIPFSClientSingleton(settings.ipfs) await app.state.ipfs_singleton.init_sessions() app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client - + app.state.epoch_size = 0 # Health check endpoint @app.get('/health') @@ -540,15 +540,30 @@ async def get_snapshotter_epoch_processing_status( 'message': f'Unable to get current epoch, error: {e}', } current_epoch_id = current_epoch['epochId'] + if request.app.state.epoch_size == 0: + [epoch_size, ] = await request.app.state.anchor_rpc_helper.web3_call( + [request.app.state.protocol_state_contract.functions.EPOCH_SIZE()], + redis_conn=request.app.state.redis_pool, + ) + rest_logger.info(f'Setting Epoch size: {epoch_size}') + request.app.state.epoch_size = epoch_size for epoch_id in range(current_epoch_id, current_epoch_id - 30 - 1, -1): epoch_specific_report = SnapshotterEpochProcessingReportItem.construct() - epoch_specific_report.epochId = epoch_id - epoch_release_status = await redis_conn.get( epoch_id_epoch_released_key(epoch_id=epoch_id), ) if not epoch_release_status: continue + epoch_specific_report.epochId = epoch_id + if epoch_id == current_epoch_id: + epoch_specific_report.epochEnd = current_epoch['end'] + else: + epoch_specific_report.epochEnd = current_epoch['end'] - ( + (current_epoch_id - epoch_id) * request.app.state.epoch_size + ) + rest_logger.debug( + f'Epoch End for epoch_id: {epoch_id} is {epoch_specific_report.epochEnd}', + ) epoch_specific_report.transitionStatus = dict() if epoch_release_status: epoch_specific_report.transitionStatus['EPOCH_RELEASED'] = SnapshotterStateUpdate( diff --git a/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_24h.py b/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_24h.py index 6630dd28..208b8893 100644 --- a/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_24h.py +++ b/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_24h.py @@ -65,7 +65,8 @@ async def _calculate_from_scratch( protocol_state_contract, project_id: str, ): - # skipping calculate from scratch for pretest + # skipping calculate from scratch for pretest and testnet participants if they are not 'full' nodes + # TODO: check against snapshotter identity on protocol state contract self._logger.info('skipping calculate from scratch for pretest') return diff --git a/snapshotter/process_hub_core.py b/snapshotter/process_hub_core.py index 923524d6..c560c445 100644 --- a/snapshotter/process_hub_core.py +++ b/snapshotter/process_hub_core.py @@ -1,10 +1,15 @@ from datetime import datetime import json import os +import resource import threading import time from urllib.parse import urljoin import uuid +import psutil +import pydantic +import redis +import httpx from multiprocessing import Process from signal import SIGCHLD from signal import SIGINT @@ -14,12 +19,7 @@ from threading import Thread from typing import Dict from typing import Optional -import httpx - -import psutil -import pydantic -import redis - +from eth_utils.address import to_checksum_address from snapshotter.processor_distributor import ProcessorDistributor from 
snapshotter.settings.config import settings from snapshotter.system_event_detector import EventDetectorProcess @@ -28,13 +30,15 @@ from snapshotter.utils.default_logger import logger from snapshotter.utils.delegate_worker import DelegateAsyncWorker from snapshotter.utils.exceptions import SelfExitException +from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.helper_functions import cleanup_proc_hub_children -from snapshotter.utils.models.data_models import ProcessorWorkerDetails, SnapshotterIssue, SnapshotterReportState -from snapshotter.utils.models.data_models import SnapshotWorkerDetails +from snapshotter.utils.models.data_models import ProcessorWorkerDetails, SnapshotterEpochProcessingReportItem, SnapshotterIssue, SnapshotterReportState, SnapshotterStateUpdate, SnapshotterStates from snapshotter.utils.models.data_models import SnapshotterPing from snapshotter.utils.models.message_models import ProcessHubCommand from snapshotter.utils.rabbitmq_helpers import RabbitmqSelectLoopInteractor -from snapshotter.utils.redis.redis_conn import provide_redis_conn +from snapshotter.utils.redis.redis_conn import REDIS_CONN_CONF, provide_redis_conn +from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key, epoch_id_project_to_state_mapping +from snapshotter.utils.rpc import RpcHelper from snapshotter.utils.snapshot_worker import SnapshotAsyncWorker PROC_STR_ID_TO_CLASS_MAP = { @@ -52,10 +56,14 @@ class ProcessHubCore(Process): + _anchor_rpc_helper: RpcHelper + _redis_connection_pool_sync: redis.BlockingConnectionPool + _redis_conn_sync: redis.Redis + def __init__(self, name, **kwargs): Process.__init__(self, name=name, **kwargs) self._spawned_processes_map: Dict[str, Optional[int]] = dict() # process name to pid map - self._spawned_cb_processes_map: Dict[str, Dict[str, Optional[SnapshotWorkerDetails]]] = ( + self._spawned_cb_processes_map: Dict[str, Dict[str, Optional[ProcessorWorkerDetails]]] = ( dict() ) # separate map for callback worker spawns. 
unique ID -> dict(unique_name, pid) self._httpx_client = httpx.Client( @@ -67,125 +75,15 @@ def __init__(self, name, **kwargs): ), ) self._last_reporting_service_ping = 0 + self._last_epoch_processing_health_check = 0 + self._start_time = 0 + self._source_chain_block_time = 0 + self._epoch_size = 0 self._thread_shutdown_event = threading.Event() self._shutdown_initiated = False def signal_handler(self, signum, frame): - if signum == SIGCHLD and not self._shutdown_initiated: - pid, status = os.waitpid( - -1, os.WNOHANG | os.WUNTRACED | os.WCONTINUED, - ) - if os.WIFCONTINUED(status) or os.WIFSTOPPED(status): - return - if os.WIFSIGNALED(status) or os.WIFEXITED(status): - self._logger.debug( - ( - 'Received process crash notification for child process' - ' PID: {}' - ), - pid, - ) - callback_worker_module_file = None - callback_worker_class = None - callback_worker_name = None - callback_worker_unique_id = None - for ( - cb_worker_type, - worker_unique_id_entries, - ) in self._spawned_cb_processes_map.items(): - for ( - unique_id, - worker_process_details, - ) in worker_unique_id_entries.items(): - if worker_process_details is not None and worker_process_details.pid == pid: - self._logger.debug( - ( - 'Found crashed child process PID in spawned' - ' callback workers | Callback worker class:' - ' {} | Unique worker identifier: {}' - ), - cb_worker_type, - worker_process_details.unique_name, - ) - callback_worker_name = worker_process_details.unique_name - callback_worker_unique_id = unique_id - callback_worker_class = cb_worker_type - - break - - if ( - callback_worker_name and - callback_worker_unique_id and callback_worker_class - ): - - if callback_worker_class == 'snapshot_workers': - worker_obj: Process = SnapshotAsyncWorker( - name=callback_worker_name, - ) - elif callback_worker_class == 'aggregation_workers': - worker_obj: Process = AggregationAsyncWorker( - name=callback_worker_name, - ) - - elif callback_worker_class == 'delegate_workers': - worker_obj: Process = DelegateAsyncWorker( - name=callback_worker_name, - ) - - worker_obj.start() - self._spawned_cb_processes_map[callback_worker_class][callback_worker_unique_id] = \ - SnapshotWorkerDetails(unique_name=callback_worker_unique_id, pid=worker_obj.pid) - self._logger.debug( - ( - 'Respawned callback worker class {} unique ID {}' - ' with PID {} after receiving crash signal against' - ' PID {}' - ), - callback_worker_class, - callback_worker_unique_id, - worker_obj.pid, - pid, - ) - if settings.reporting.service_url: - send_failure_notifications_sync( - client=self._httpx_client, - message=SnapshotterIssue( - instanceID=settings.instance_id, - issueType=SnapshotterReportState.CRASHED_CHILD_WORKER.value, - projectID='', - epochId='', - timeOfReporting=datetime.now().isoformat(), - extra=json.dumps( - { - 'worker_name': callback_worker_name, - 'pid': pid, - 'worker_class': callback_worker_class, - 'worker_unique_id': callback_worker_unique_id, - 'respawned_pid': worker_obj.pid, - } - ), - ) - ) - return - - for cb_worker_type, worker_pid in self._spawned_processes_map.items(): - if worker_pid is not None and worker_pid == pid: - self._logger.debug('RESPAWNING: process for {}', cb_worker_type) - proc_details: dict = PROC_STR_ID_TO_CLASS_MAP.get(cb_worker_type) - init_kwargs = dict(name=proc_details['name']) - if proc_details.get('class'): - proc_obj = proc_details['class'](**init_kwargs) - proc_obj.start() - else: - proc_obj = Process(target=proc_details['target']) - proc_obj.start() - self._logger.debug( - 'RESPAWNED: process for {} 
with PID: {}', - cb_worker_type, - proc_obj.pid, - ) - self._spawned_processes_map[cb_worker_type] = proc_obj.pid - elif signum in [SIGINT, SIGTERM, SIGQUIT]: + if signum in [SIGINT, SIGTERM, SIGQUIT]: self._shutdown_initiated = True if settings.reporting.service_url: self._logger.debug('Sending shutdown signal to reporting service') @@ -222,12 +120,14 @@ def kill_process(self, pid: int): for unique_worker_entry in v.values(): if unique_worker_entry is not None and unique_worker_entry.pid == pid: psutil.Process(pid).wait() + break for k, v in self._spawned_processes_map.items(): if v is not None and v == pid: self._logger.debug('Waiting for process ID {} to join...', pid) psutil.Process(pid).wait() self._logger.debug('Process ID {} joined...', pid) + break @provide_redis_conn def internal_state_reporter(self, redis_conn: redis.Redis = None): @@ -266,6 +166,7 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None): mapping=proc_id_map, ) if settings.reporting.service_url and int(time.time()) - self._last_reporting_service_ping >= 30: + self._last_reporting_service_ping = int(time.time()) try: self._httpx_client.post( url=urljoin(settings.reporting.service_url, '/ping'), @@ -278,7 +179,123 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None): self._logger.error( 'Error while pinging reporting service: {}', e, ) - self._last_reporting_service_ping = int(time.time()) + if int(time.time()) - self._last_epoch_processing_health_check >= 60: + if self._source_chain_block_time != 0 and self._epoch_size != 0: + if int(time.time()) - self._start_time <= 4 * self._source_chain_block_time * self._epoch_size: + # self._logger.info( + # 'Skipping epoch processing health check because ' + # 'not enough time has passed for 4 epochs to consider health check since process start | ' + # 'Start time: {} | Current time: {} | Source chain block time: {}', + # datetime.fromtimestamp(self._start_time).isoformat(), + # datetime.now().isoformat(), + # self._source_chain_block_time, + # ) + continue + else: + self._logger.info( + 'Skipping epoch processing health check because source chain block time or epoch size is not known | ' + 'Source chain block time: {} | Epoch size: {}', + self._source_chain_block_time, + self._epoch_size, + ) + continue + self._last_epoch_processing_health_check = int(time.time()) + self._logger.debug( + 'Continuing with epoch processing health check since 4 or more epochs have passed since process start' + ) + # check for epoch processing status + try: + current_epoch_data = self._protocol_state_contract.functions.currentEpoch().call() + current_epoch = { + 'begin': current_epoch_data[0], + 'end': current_epoch_data[1], + 'epochId': current_epoch_data[2], + } + + except Exception as e: + self._logger.exception( + 'Exception in get_current_epoch: {}', + e, + ) + continue + current_epoch_id = current_epoch['epochId'] + epoch_health = dict() + # check processing status for the previous epoch and the two before it + for epoch_id in range(current_epoch_id - 1, current_epoch_id - 3 - 1, -1): + epoch_specific_report = SnapshotterEpochProcessingReportItem.construct() + success_percentage = 0 + divisor = 1 + epoch_specific_report.epochId = epoch_id
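The per-state scan that follows folds the two tracked transitions (SNAPSHOT_BUILD and RELAYER_SEND) into a single running average of per-project success ratios. A minimal standalone sketch of that arithmetic, with hypothetical plain dicts standing in for the decoded Redis hash entries:

```python
# Hypothetical sketch: plain dicts stand in for the decoded hgetall() results,
# with each value being the project's reported status string.
def epoch_success_ratio(state_reports: dict) -> float:
    success_percentage = 0.0
    divisor = 1
    for entries in state_reports.values():
        if not entries:
            continue
        ok = sum(1 for status in entries.values() if status == 'success')
        success_percentage += ok / len(entries)
        success_percentage /= divisor  # running mean: ((a / 1) + b) / 2 == (a + b) / 2
        divisor += 1
    return success_percentage

# 4/4 snapshots built but only 1/2 relayed -> (1.0 + 0.5) / 2 = 0.75
print(epoch_success_ratio({
    'SNAPSHOT_BUILD': {'p1': 'success', 'p2': 'success', 'p3': 'success', 'p4': 'success'},
    'RELAYER_SEND': {'p1': 'success', 'p2': 'failed'},
}))
```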
+ for state in SnapshotterStates: + if state not in [SnapshotterStates.SNAPSHOT_BUILD, SnapshotterStates.RELAYER_SEND]: + continue + state_report_entries = self._redis_conn_sync.hgetall( + name=epoch_id_project_to_state_mapping(epoch_id=epoch_id, state_id=state.value), + ) + if state_report_entries: + project_state_report_entries = { + project_id.decode('utf-8'): SnapshotterStateUpdate.parse_raw(project_state_entry) + for project_id, project_state_entry in state_report_entries.items() + } + epoch_specific_report.transitionStatus[state.value] = project_state_report_entries + success_percentage += len( + [ + project_state_report_entry + for project_state_report_entry in project_state_report_entries.values() + if project_state_report_entry.status == 'success' + ], + ) / len(project_state_report_entries) + success_percentage /= divisor + divisor += 1 + else: + epoch_specific_report.transitionStatus[state.value] = None + if success_percentage != 0: + self._logger.debug( + 'Epoch {} processing success percentage within states {}: {}', + epoch_id, + list(epoch_specific_report.transitionStatus.keys()), + success_percentage * 100, + ) + + if any([x is None for x in epoch_specific_report.transitionStatus.values()]): + epoch_health[epoch_id] = False + self._logger.debug( + 'Marking epoch {} as unhealthy due to missing state reports against transitions {}', + epoch_id, + [x for x, y in epoch_specific_report.transitionStatus.items() if y is None], + ) + if success_percentage < 0.5: + epoch_health[epoch_id] = False + self._logger.debug( + 'Marking epoch {} as unhealthy due to low success percentage: {}', + epoch_id, + success_percentage, + ) + if len([epoch_id for epoch_id, healthy in epoch_health.items() if not healthy]) >= 2: + self._logger.debug( + 'Sending unhealthy epoch report to reporting service: {}', + epoch_health, + ) + send_failure_notifications_sync( + client=self._httpx_client, + message=SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value, + projectID='', + epochId='', + timeOfReporting=datetime.now().isoformat(), + extra=json.dumps( + { + 'epoch_health': epoch_health, + } + ), + ) + ) + self._logger.info('Proceeding to respawn all children because epochs were found unhealthy: {}', epoch_health) + self._respawn_all_children()
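The health check above only acts when failures look systemic: one bad epoch is tolerated, but two or more unhealthy epochs out of the three inspected trigger a report and a full child respawn. A toy rendering of that decision, assuming an epoch_health map shaped exactly as built above (epoch IDs and statuses invented):

```python
# Toy rendering of the respawn trigger; two or more unhealthy epochs out of
# the three inspected ones would cause a report plus full child respawn.
epoch_health = {1042: False, 1041: True, 1040: False}
unhealthy = [eid for eid, healthy in epoch_health.items() if not healthy]
if len(unhealthy) >= 2:
    print(f'would notify reporting service and respawn children: {unhealthy}')
```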
self._logger.error( ( 'Caught thread shutdown notification event. Deleting process' @@ -289,13 +306,75 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None): f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes', ) - @cleanup_proc_hub_children - def run(self) -> None: - self._logger = logger.bind(module='Powerloom|ProcessHub|Core') - - for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: - signal(signame, self.signal_handler) + def _kill_all_children(self, core_workers=True): + self._logger.error('Waiting on spawned callback workers to join...') + for ( + worker_class_name, + unique_worker_entries, + ) in self._spawned_cb_processes_map.items(): + procs = [] + for ( + worker_unique_id, + worker_unique_process_details, + ) in unique_worker_entries.items(): + if worker_unique_process_details is not None and worker_unique_process_details.pid: + self._logger.error( + ( + 'Waiting on spawned callback worker {} | Unique' + ' ID {} | PID {} to join...' + ), + worker_class_name, + worker_unique_id, + worker_unique_process_details.pid, + ) + _ = psutil.Process(pid=worker_unique_process_details.pid) + procs.append(_) + _.terminate() + gone, alive = psutil.wait_procs(procs, timeout=3) + for p in alive: + self._logger.error( + 'Sending SIGKILL to spawned callback worker {} after not exiting on SIGTERM | PID {}', + worker_class_name, + p.pid, + ) + p.kill() + self._spawned_cb_processes_map = dict() + if core_workers: + self._logger.error( + 'Waiting on spawned core workers to join... {}', + self._spawned_processes_map, + ) + procs = [] + for ( + worker_class_name, + worker_pid, + ) in self._spawned_processes_map.items(): + self._logger.error( + 'Spawned process PID to wait on: {}', + worker_pid, + ) + if worker_pid is not None: + self._logger.error( + ( + 'Waiting on spawned core worker {} | PID {} to' + ' join...' + ), + worker_class_name, + worker_pid, + ) + _ = psutil.Process(worker_pid) + procs.append(_) + _.terminate() + gone, alive = psutil.wait_procs(procs, timeout=3) + for p in alive: + self._logger.error( + 'Sending SIGKILL to spawned core worker after not exiting on SIGTERM | PID {}', + p.pid, + ) + p.kill() + self._spawned_processes_map = dict() + def _launch_snapshot_cb_workers(self): self._logger.debug('=' * 80) self._logger.debug('Launching Workers') @@ -316,8 +395,8 @@ def run(self) -> None: ) self._logger.debug( ( - 'Process Hub Core launched process {} for snapshot' - ' worker {} with PID: {}' + 'Process Hub Core launched process {} for' + ' worker type {} with PID: {}' ), unique_name, 'snapshot_workers', @@ -341,7 +420,7 @@ def run(self) -> None: self._logger.debug( ( 'Process Hub Core launched process {} for' - ' worker {} with PID: {}' + ' worker type {} with PID: {}' ), unique_name, 'aggregation_workers', @@ -368,16 +447,106 @@ def run(self) -> None: self._logger.debug( ( 'Process Hub Core launched process {} for' - ' worker {} with PID: {}' + ' worker type {} with PID: {}' ), unique_name, 'delegate_workers', delegate_worker_obj.pid, ) + def _launch_core_worker(self, proc_name, proc_init_kwargs=dict()): + try: + proc_details: dict = PROC_STR_ID_TO_CLASS_MAP[proc_name] + init_kwargs = dict(name=proc_details['name']) + init_kwargs.update(proc_init_kwargs) + if proc_details.get('class'): + proc_obj = proc_details['class'](**init_kwargs) + proc_obj.start() + else: + proc_obj = Process( + target=proc_details['target'], + kwargs=proc_init_kwargs, + ) + proc_obj.start() + self._logger.debug( + 'Process Hub Core launched process for {} with PID: {}', + proc_name, + proc_obj.pid, + ) + self._spawned_processes_map[proc_name] = proc_obj.pid + except Exception as err: + self._logger.opt(exception=True).error( + 'Error while starting process {} | ' + '{}', + proc_name, + str(err), + ) + + def _respawn_all_children(self): + self._kill_all_children() + self._launch_all_children() + self._start_time = time.time() + self._last_epoch_processing_health_check = 0 + + def _launch_all_children(self): + self._logger.debug('=' * 80) + self._logger.debug('Launching Core Workers') + self._launch_snapshot_cb_workers() + for proc_name in PROC_STR_ID_TO_CLASS_MAP.keys(): + self._launch_core_worker(proc_name)
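_kill_all_children follows psutil's terminate-then-kill escalation: SIGTERM every tracked child, give the whole batch a few seconds via psutil.wait_procs, then SIGKILL whatever survives. The same pattern in isolation, as a hypothetical standalone helper (the PID list is illustrative):

```python
import psutil

def terminate_then_kill(pids, timeout=3):
    """SIGTERM each PID, wait up to `timeout` seconds, then SIGKILL survivors."""
    procs = []
    for pid in pids:
        try:
            proc = psutil.Process(pid)
        except psutil.NoSuchProcess:
            continue  # already exited
        proc.terminate()
        procs.append(proc)
    # wait_procs returns (gone, alive) once all exit or the timeout elapses
    gone, alive = psutil.wait_procs(procs, timeout=timeout)
    for proc in alive:
        proc.kill()  # did not honor SIGTERM within the timeout
    return gone, alive
```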
+ @cleanup_proc_hub_children + def run(self) -> None: + self._logger = logger.bind(module='Powerloom|ProcessHub|Core') + + for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]: + signal(signame, self.signal_handler) + + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + resource.setrlimit( + resource.RLIMIT_NOFILE, + (settings.rlimit.file_descriptors, hard), + ) + + self._redis_connection_pool_sync = redis.BlockingConnectionPool(**REDIS_CONN_CONF) + self._redis_conn_sync = redis.Redis(connection_pool=self._redis_connection_pool_sync) + self._anchor_rpc_helper = RpcHelper( + rpc_settings=settings.anchor_chain_rpc + ) + self._anchor_rpc_helper._load_web3_providers_and_rate_limits() + protocol_abi = read_json_file(settings.protocol_state.abi, self._logger) + self._protocol_state_contract = self._anchor_rpc_helper.get_current_node()['web3_client'].eth.contract( + address=to_checksum_address( + settings.protocol_state.address, + ), + abi=protocol_abi, + ) + try: + source_block_time = self._protocol_state_contract.functions.SOURCE_CHAIN_BLOCK_TIME().call() + except Exception as e: + self._logger.exception( + 'Exception in querying protocol state for source chain block time: {}', + e, + ) + else: + self._source_chain_block_time = source_block_time / 10 ** 4 + self._logger.debug('Set source chain block time to {}', self._source_chain_block_time) + + try: + epoch_size = self._protocol_state_contract.functions.EPOCH_SIZE().call() + except Exception as e: + self._logger.exception( + 'Exception in querying protocol state for epoch size: {}', + e, + ) + else: + self._epoch_size = epoch_size + self._launch_snapshot_cb_workers() self._logger.debug( 'Starting Internal Process State reporter for Process Hub Core...', ) + self._start_time = time.time() self._reporter_thread = Thread(target=self.internal_state_reporter) self._reporter_thread.start() self._logger.debug('Starting Process Hub Core...') @@ -443,7 +612,7 @@ def callback(self, dont_use_ch, method, properties, body): self.kill_process(worker_process_details.pid) self._spawned_cb_processes_map[ cb_worker_type - ][worker_unique_id] = SnapshotWorkerDetails( + ][worker_unique_id] = ProcessorWorkerDetails( unique_name=worker_unique_id, pid=None, ) self._logger.info( @@ -455,39 +624,25 @@ def callback(self, dont_use_ch, method, properties, body): self._spawned_processes_map[proc_str_id] = None elif cmd_json.command == 'start': - try: - self._logger.debug( - 'Process Hub Core received start command: {}', cmd_json, - ) - proc_name = cmd_json.proc_str_id - self._logger.debug( - 'Process Hub Core launching process for {}', proc_name, - ) - proc_details: dict = PROC_STR_ID_TO_CLASS_MAP.get(proc_name) - init_kwargs = dict(name=proc_details['name']) - init_kwargs.update(cmd_json.init_kwargs) - if proc_details.get('class'): - proc_obj = proc_details['class'](**init_kwargs) - proc_obj.start() - else: - proc_obj = Process( - target=proc_details['target'], - kwargs=cmd_json.init_kwargs, - ) - proc_obj.start() - self._logger.debug( - 'Process Hub Core launched process for {} with PID: {}', - proc_name, - proc_obj.pid, + self._logger.debug( + 'Process Hub Core received start command: {}', cmd_json, + ) + proc_name = cmd_json.proc_str_id + if not proc_name: + self._logger.error( + 'Received start command without process name', ) - self._spawned_processes_map[proc_name] = proc_obj.pid - except Exception as err: - self._logger.opt(exception=True).error( - ( - f'Error while starting a process:{cmd_json} |' - f' error_msg: {str(err)}' - ), + return + if proc_name not in PROC_STR_ID_TO_CLASS_MAP.keys(): + self._logger.error( + 'Received unrecognized process name to start: {}', proc_name, ) + return + self._logger.debug( + 'Process Hub Core launching process for {}', proc_name, + ) + self._launch_core_worker(proc_name, cmd_json.init_kwargs)
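The rewritten 'start' handler now rejects missing or unrecognized process names before delegating to _launch_core_worker. For context, the message it consumes is the same ProcessHubCommand model the CLI publishes; a hedged example payload (field names taken from the usages in this patch, the process name is illustrative):

```python
from snapshotter.utils.models.message_models import ProcessHubCommand

# 'SystemEventDetector' is assumed here to be a key of PROC_STR_ID_TO_CLASS_MAP,
# matching the process map entries read back in processhub_cmd.py below.
cmd = ProcessHubCommand(
    command='start',
    proc_str_id='SystemEventDetector',
    init_kwargs={},
)
print(cmd.json())  # serialized form published to the process hub command queue
```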
+ elif cmd_json.command == 'restart': try: self._logger.debug( @@ -508,6 +663,8 @@ def callback(self, dont_use_ch, method, properties, body): f' error_msg: {str(err)}' ), ) + elif cmd_json.command == 'respawn': + self._respawn_all_children() if __name__ == '__main__': diff --git a/snapshotter/processhub_cmd.py b/snapshotter/processhub_cmd.py index 14337330..cae2291f 100644 --- a/snapshotter/processhub_cmd.py +++ b/snapshotter/processhub_cmd.py @@ -14,50 +14,74 @@ app = typer.Typer() +def process_up(pid): + """ + Is the process up? + :return: True if process is up + """ + try: + return psutil.Process(pid).is_running() + except psutil.NoSuchProcess: + return False + + @app.command() -def pidStatus(connections: bool = False): - def print_formatted_status(process_name, pid): - try: - process = psutil.Process(pid=pid) - print(f'{pid} -') - print(f'\t name: {process.name()}') - print(f'\t status: {process.status()}') - print(f'\t threads: {process.num_threads()}') - print(f'\t file descriptors: {process.num_fds()}') - print(f'\t memory: {process.memory_info()}') - print(f'\t cpu: {process.cpu_times()}') - print( - f"\t number of connections: {len(process.connections(kind='inet'))}", - ) - if connections: - print( - f"\t number of connections: {process.connections(kind='inet')}", - ) - print('\n') - except Exception as err: - if type(err).__name__ == 'NoSuchProcess': - print(f'{pid} - NoSuchProcess') - print(f'\t name: {process_name}\n') +def processReport(): + connection_pool = redis.BlockingConnectionPool(**REDIS_CONN_CONF) + redis_conn = redis.Redis(connection_pool=connection_pool) + map_raw = redis_conn.hgetall( + name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes', + ) + event_det_pid = map_raw[b'SystemEventDetector'] + print('\n' + '=' * 20 + 'System Event Detector' + '=' * 20) + try: + event_det_pid = int(event_det_pid) + except ValueError: + print('Event detector pid found in process map not a PID: ', event_det_pid) + else: + # event_detector_proc = psutil.Process(event_det_pid) + print('Event detector process running status: ', process_up(event_det_pid)) + + print('\n' + '=' * 20 + 'Worker Processor Distributor' + '=' * 20) + proc_dist_pid = map_raw[b'ProcessorDistributor'] + try: + proc_dist_pid = int(proc_dist_pid) + except ValueError: + print('Processor distributor pid found in process map not a PID: ', proc_dist_pid) + else: + # proc_dist_proc = psutil.Process(proc_dist_pid) + print('Processor distributor process running status: ', process_up(proc_dist_pid)) + + print('\n' + '=' * 20 + 'Worker Processes' + '=' * 20) + cb_worker_map = map_raw[b'callback_workers'] + try: + cb_worker_map = json.loads(cb_worker_map) + except json.JSONDecodeError: + print('Callback worker entries in cache corrupted...', cb_worker_map) + return + for worker_type, worker_details in cb_worker_map.items(): + section_name = worker_type.capitalize() + print('\n' + '*' * 10 + section_name + '*' * 10) + if not worker_details or not isinstance(worker_details, dict): + print(f'No {section_name} workers found in process map: ', worker_details) + continue + for short_id, worker_details in worker_details.items(): + print('\n' + '-' * 5 + short_id + '-' * 5) + proc_pid = worker_details['pid'] + try: + proc_pid = int(proc_pid) + except 
ValueError: + print(f'Process name {worker_details["id"]} pid found in process map not a PID: ', proc_pid) else: - print(f'Unknown Error: {str(err)}') - - r = redis.Redis(**REDIS_CONN_CONF, single_connection_client=True) - print('\n') - for k, v in r.hgetall( - name=f'powerloom:uniswap:{settings.namespace}:{settings.instance_id}:Processes', - ).items(): - key = k.decode('utf-8') - value = v.decode('utf-8') - - if key == 'callback_workers': - value = json.loads(value) - for i, j in value.items(): - print_formatted_status(j['id'], int(j['pid'])) - elif value.isdigit(): - print_formatted_status(key, int(value)) - else: - print(f'# Unknown type of key:{key}, value:{value}') - print('\n') + # proc = psutil.Process(proc_pid) + print('Process name ' + worker_details['id'] + ' running status: ', process_up(proc_pid)) # https://typer.tiangolo.com/tutorial/commands/context/#configuring-the-context @@ -143,5 +167,20 @@ def stop( ) +@app.command() +def respawn(): + c = create_rabbitmq_conn() + typer.secho('Opening RabbitMQ channel...', fg=typer.colors.GREEN) + ch = c.channel() + proc_hub_cmd = ProcessHubCommand( + command='respawn', + ) + processhub_command_publish(ch, proc_hub_cmd.json()) + typer.secho( + f'Sent command to ProcessHubCore | Command: {proc_hub_cmd.json()}', + fg=typer.colors.YELLOW, + ) + + if __name__ == '__main__': app() diff --git a/snapshotter/processor_distributor.py b/snapshotter/processor_distributor.py index 2122972f..04c56832 100644 --- a/snapshotter/processor_distributor.py +++ b/snapshotter/processor_distributor.py @@ -4,6 +4,8 @@ import json import multiprocessing import queue +import resource +from signal import SIGINT, SIGQUIT, SIGTERM, signal import time from collections import defaultdict from functools import partial @@ -115,6 +117,11 @@ def __init__(self, name, **kwargs): transport=self._async_transport, ) + def _signal_handler(self, signum, frame): + if signum in [SIGINT, SIGTERM, SIGQUIT]: + self._core_rmq_consumer.cancel() + + async def _init_redis_pool(self): self._aioredis_pool = RedisPoolCache() await self._aioredis_pool.populate() @@ -721,6 +728,13 @@ def run(self) -> None: self._logger = logger.bind( module=f'Powerloom|Callbacks|ProcessDistributor:{settings.namespace}-{settings.instance_id}', ) + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + resource.setrlimit( + resource.RLIMIT_NOFILE, + (settings.rlimit.file_descriptors, hard), + ) + for signame in [SIGINT, SIGTERM, SIGQUIT]: + signal(signame, self._signal_handler) asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) ev_loop = asyncio.get_event_loop() ev_loop.run_until_complete(self.init_worker()) diff --git a/snapshotter/system_event_detector.py b/snapshotter/system_event_detector.py index 8c59ed9e..8ae771dd 100644 --- a/snapshotter/system_event_detector.py +++ b/snapshotter/system_event_detector.py @@ -2,6 +2,7 @@ import json import multiprocessing import queue +import resource import signal import sys import threading @@ -326,6 +327,11 @@ async def _detect_events(self): @rabbitmq_and_redis_cleanup def run(self): + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + resource.setrlimit( + resource.RLIMIT_NOFILE, + (settings.rlimit.file_descriptors, hard), + ) for signame in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]: signal.signal(signame, self._generic_exit_handler) self._rabbitmq_thread = threading.Thread( diff --git a/snapshotter/utils/aggregation_worker.py b/snapshotter/utils/aggregation_worker.py index e75e3c05..87248bc0 100644 --- a/snapshotter/utils/aggregation_worker.py +++ 
b/snapshotter/utils/aggregation_worker.py @@ -155,15 +155,42 @@ async def _processor_task( snapshot=snapshot, storage_flag=settings.web3storage.upload_aggregates, ) - await self._redis_conn.hset( - name=epoch_id_project_to_state_mapping( - epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value, - ), - mapping={ - project_id: SnapshotterStateUpdate( - status='success', timestamp=int(time.time()), - ).json(), - }, + if not snapshot: + await self._redis_conn.hset( + name=epoch_id_project_to_state_mapping( + epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value, + ), + mapping={ + project_id: SnapshotterStateUpdate( + status='failed', timestamp=int(time.time()), error='Empty snapshot', + ).json(), + }, + ) + notification_message = SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, + projectID=project_id, + epochId=str(msg_obj.epochId), + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': 'Error : Empty snapshot'}), + ) + await send_failure_notifications_async( + client=self._client, message=notification_message, + ) + else: + await self._redis_conn.hset( + name=epoch_id_project_to_state_mapping( + epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value, + ), + mapping={ + project_id: SnapshotterStateUpdate( + status='success', timestamp=int(time.time()), + ).json(), + }, + ) + self._logger.debug( + 'Updated epoch processing status in aggregation worker for project {} for transition {}', + project_id, SnapshotterStates.SNAPSHOT_BUILD.value, ) await self._redis_conn.close() @@ -180,9 +207,7 @@ async def _on_rabbitmq_message(self, message: IncomingMessage): # TODO: Update based on new single project based design if task_type in self._single_project_types: try: - msg_obj: PowerloomSnapshotSubmittedMessage = ( - PowerloomSnapshotSubmittedMessage.parse_raw(message.body) - ) + msg_obj: PowerloomSnapshotSubmittedMessage = PowerloomSnapshotSubmittedMessage.parse_raw(message.body) except ValidationError as e: self._logger.opt(exception=True).error( ( diff --git a/snapshotter/utils/generic_worker.py b/snapshotter/utils/generic_worker.py index e77d10fa..a7992b2c 100644 --- a/snapshotter/utils/generic_worker.py +++ b/snapshotter/utils/generic_worker.py @@ -1,12 +1,13 @@ import asyncio import multiprocessing import resource +from signal import SIGINT, SIGQUIT, SIGTERM import time from functools import partial from typing import Dict from typing import Union from uuid import uuid4 - +from signal import signal from aio_pika import IncomingMessage from aio_pika import Message from aio_pika.pool import Pool @@ -72,6 +73,10 @@ def __init__(self, name, **kwargs): ) self._initialized = False + def _signal_handler(self, signum, frame): + if signum in [SIGINT, SIGTERM, SIGQUIT]: + self._core_rmq_consumer.cancel() + async def _rabbitmq_consumer(self, loop): self._rmq_connection_pool = Pool(get_rabbitmq_robust_connection_async, max_size=5, loop=loop) self._rmq_channel_pool = Pool( @@ -241,6 +246,8 @@ def run(self) -> None: resource.RLIMIT_NOFILE, (settings.rlimit.file_descriptors, hard), ) + for signame in [SIGINT, SIGTERM, SIGQUIT]: + signal(signame, self._signal_handler) ev_loop = asyncio.get_event_loop() self._logger.debug( f'Starting asynchronous callback worker {self._unique_id}...', diff --git a/snapshotter/utils/helper_functions.py b/snapshotter/utils/helper_functions.py index dcf473b6..1baca48f 100644 --- a/snapshotter/utils/helper_functions.py +++ 
b/snapshotter/utils/helper_functions.py @@ -33,49 +33,7 @@ def wrapper(self, *args, **kwargs): # for p in alive: # logger.error(f'killing process: {p.name()}') # p.kill() - logger.error('Waiting on spawned callback workers to join...') - for ( - worker_class_name, - unique_worker_entries, - ) in self._spawned_cb_processes_map.items(): - for ( - worker_unique_id, - worker_unique_process_details, - ) in unique_worker_entries.items(): - if worker_unique_process_details is not None and worker_unique_process_details.pid: - logger.error( - ( - 'Waiting on spawned callback worker {} | Unique' - ' ID {} | PID {} to join...' - ), - worker_class_name, - worker_unique_id, - worker_unique_process_details.pid, - ) - psutil.Process(pid=worker_unique_process_details.pid).wait() - - logger.error( - 'Waiting on spawned core workers to join... {}', - self._spawned_processes_map, - ) - for ( - worker_class_name, - worker_pid, - ) in self._spawned_processes_map.items(): - logger.error( - 'spawned Process Pid to wait on {}', - worker_pid, - ) - if worker_pid is not None: - logger.error( - ( - 'Waiting on spawned core worker {} | PID {} to' - ' join...' - ), - worker_class_name, - worker_pid, - ) - psutil.Process(worker_pid).wait() + self._kill_all_children() logger.error('Finished waiting for all children...now can exit.') finally: logger.error('Finished waiting for all children...now can exit.') diff --git a/snapshotter/utils/models/data_models.py b/snapshotter/utils/models/data_models.py index dc7c7711..efff4551 100644 --- a/snapshotter/utils/models/data_models.py +++ b/snapshotter/utils/models/data_models.py @@ -11,11 +11,6 @@ from snapshotter.utils.callback_helpers import GenericPreloader -class SnapshotWorkerDetails(BaseModel): - unique_name: str - pid: Union[int, None] = None - - class ProcessorWorkerDetails(BaseModel): unique_name: str pid: Union[None, int] = None @@ -34,6 +29,7 @@ class SnapshotterReportState(Enum): SUBMITTED_INCORRECT_SNAPSHOT = 'SUBMITTED_INCORRECT_SNAPSHOT' SHUTDOWN_INITIATED = 'SHUTDOWN_INITIATED' CRASHED_CHILD_WORKER = 'CRASHED_CHILD_WORKER' + UNHEALTHY_EPOCH_PROCESSING = 'UNHEALTHY_EPOCH_PROCESSING' class SnapshotterStates(Enum): @@ -52,9 +48,10 @@ class SnapshotterStateUpdate(BaseModel): class SnapshotterEpochProcessingReportItem(BaseModel): - epochId: int + epochId: int = 0 + epochEnd: int = 0 # map transition like EPOCH_RELEASED to its status - transitionStatus: Dict[str, Union[SnapshotterStateUpdate, None, Dict[str, SnapshotterStateUpdate]]] + transitionStatus: Dict[str, Union[SnapshotterStateUpdate, None, Dict[str, SnapshotterStateUpdate]]] = dict() class SnapshotterIssue(BaseModel): diff --git a/worker_process_report.py b/worker_process_report.py deleted file mode 100644 index 8a10f594..00000000 --- a/worker_process_report.py +++ /dev/null @@ -1,80 +0,0 @@ -import json -import subprocess -import redis -import psutil -import os -from snapshotter.utils.redis.redis_conn import REDIS_CONN_CONF -from snapshotter.settings.config import settings - - -def process_up(pid): - """ - Is the process up? 
- :return: True if process is up - """ - p_ = psutil.Process(pid) - return p_.is_running() - # try: - # return os.waitpid(pid, os.WNOHANG) is not None - # except ChildProcessError: # no child processes - # return False - # try: - # call = subprocess.check_output("pidof '{}'".format(self.processName), shell=True) - # return True - # except subprocess.CalledProcessError: - # return False - - -def main(): - connection_pool = redis.BlockingConnectionPool(**REDIS_CONN_CONF) - redis_conn = redis.Redis(connection_pool=connection_pool) - map_raw = redis_conn.hgetall( - name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes' - ) - event_det_pid = map_raw[b'SystemEventDetector'] - print('\n' + '=' * 20 + 'System Event Detector' + '=' * 20) - try: - event_det_pid = int(event_det_pid) - except ValueError: - print('Event detector pid found in process map not a PID: ', event_det_pid) - else: - # event_detector_proc = psutil.Process(event_det_pid) - print('Event detector process running status: ', process_up(event_det_pid)) - - print('\n' + '=' * 20 + 'Worker Processor Distributor' + '=' * 20) - proc_dist_pid = map_raw[b'ProcessorDistributor'] - try: - proc_dist_pid = int(proc_dist_pid) - except ValueError: - print('Processor distributor pid found in process map not a PID: ', proc_dist_pid) - else: - # proc_dist_proc = psutil.Process(proc_dist_pid) - print('Processor distributor process running status: ', process_up(proc_dist_pid)) - - print('\n' + '=' * 20 + 'Worker Processes' + '=' * 20) - cb_worker_map = map_raw[b'callback_workers'] - try: - cb_worker_map = json.loads(cb_worker_map) - except json.JSONDecodeError: - print('Callback worker entries in cache corrupted...', cb_worker_map) - return - for worker_type, worker_details in cb_worker_map.items(): - section_name = worker_type.capitalize() - print('\n' + '*' * 10 + section_name + '*' * 10) - if not worker_details or not isinstance(worker_details, dict): - print(f'No {section_name} workers found in process map: ', worker_details) - continue - for short_id, worker_details in worker_details.items(): - print('\n' + '-' * 5 + short_id + '-' * 5) - proc_pid = worker_details['pid'] - try: - proc_pid = int(proc_pid) - except ValueError: - print(f'Process name {worker_details["id"]} pid found in process map not a PID: ', proc_pid) - else: - # proc = psutil.Process(proc_pid) - print('Process name ' + worker_details['id'] + ' running status: ', process_up(proc_pid)) - - -if __name__ == '__main__': - main()
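One closing note on the core_api.py hunk at the top of this patch: epochEnd for historical epochs is derived from the current epoch head rather than fetched per epoch, with each step back in epoch ID subtracting one EPOCH_SIZE worth of source-chain blocks. A standalone sketch of that arithmetic with invented numbers:

```python
def estimate_epoch_end(current_end: int, current_epoch_id: int,
                       epoch_id: int, epoch_size: int) -> int:
    """Back-calculate a past epoch's end block from the current epoch head."""
    return current_end - (current_epoch_id - epoch_id) * epoch_size

# If epoch 105 ends at block 21_000 and EPOCH_SIZE is 10, epoch 103
# ended two epochs (20 blocks) earlier:
assert estimate_epoch_end(21_000, 105, 103, 10) == 20_980
```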