Merge pull request #49 from PowerLoom/feat/health-recovery
Feat/health recovery (testnet lite mode)
xadahiya authored Oct 2, 2023
2 parents 61894ca + 984ae27 commit 6e962f8
Showing 11 changed files with 497 additions and 357 deletions.
22 changes: 19 additions & 3 deletions snapshotter/core_api.py
@@ -1,4 +1,5 @@
import json
from threading import currentThread
from typing import List

from fastapi import Depends
@@ -97,7 +98,7 @@ async def startup_boilerplate():
app.state.ipfs_singleton = AsyncIPFSClientSingleton(settings.ipfs)
await app.state.ipfs_singleton.init_sessions()
app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client

app.state.epoch_size = 0

# Health check endpoint
@app.get('/health')
@@ -540,15 +541,30 @@ async def get_snapshotter_epoch_processing_status(
'message': f'Unable to get current epoch, error: {e}',
}
current_epoch_id = current_epoch['epochId']
if request.app.state.epoch_size == 0:
[epoch_size, ] = await request.app.state.anchor_rpc_helper.web3_call(
[request.app.state.protocol_state_contract.functions.EPOCH_SIZE()],
redis_conn=request.app.state.redis_pool,
)
rest_logger.info(f'Setting Epoch size: {epoch_size}')
request.app.state.epoch_size = epoch_size
for epoch_id in range(current_epoch_id, current_epoch_id - 30 - 1, -1):
epoch_specific_report = SnapshotterEpochProcessingReportItem.construct()
epoch_specific_report.epochId = epoch_id

epoch_release_status = await redis_conn.get(
epoch_id_epoch_released_key(epoch_id=epoch_id),
)
if not epoch_release_status:
continue
epoch_specific_report.epochId = epoch_id
if epoch_id == current_epoch_id:
epoch_specific_report.epochEnd = current_epoch['end']
else:
epoch_specific_report.epochEnd = current_epoch['end'] - (
(current_epoch_id - epoch_id) * request.app.state.epoch_size
)
rest_logger.debug(
f'Epoch End for epoch_id: {epoch_id} is {epoch_specific_report.epochEnd}',
)
epoch_specific_report.transitionStatus = dict()
if epoch_release_status:
epoch_specific_report.transitionStatus['EPOCH_RELEASED'] = SnapshotterStateUpdate(
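A note on the hunk above: a past epoch's end block is extrapolated from the current epoch's end and the on-chain EPOCH_SIZE. A minimal standalone sketch of the same arithmetic (the function name here is illustrative, not part of the API):

def extrapolate_epoch_end(
    current_epoch_end: int,
    current_epoch_id: int,
    epoch_id: int,
    epoch_size: int,
) -> int:
    # mirrors: current_epoch['end'] - (current_epoch_id - epoch_id) * epoch_size
    return current_epoch_end - (current_epoch_id - epoch_id) * epoch_size

# e.g. with an epoch size of 10 blocks, the epoch two IDs behind a current
# epoch ending at block 1000 is reported as ending at block 980
assert extrapolate_epoch_end(1000, 42, 40, 10) == 980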
@@ -65,7 +65,8 @@ async def _calculate_from_scratch(
protocol_state_contract,
project_id: str,
):
# skipping calculate from scratch for pretest
# skipping calculate from scratch for pretest and testnet participants if they are not 'full' nodes
# TODO: check against snapshotter identity on protocol state contract
self._logger.info('skipping calculate from scratch for pretest')
return

493 changes: 325 additions & 168 deletions snapshotter/process_hub_core.py

Large diffs are not rendered by default.

123 changes: 81 additions & 42 deletions snapshotter/processhub_cmd.py
@@ -14,50 +14,74 @@
app = typer.Typer()


def process_up(pid):
"""
Is the process up?
:return: True if process is up
"""
p_ = psutil.Process(pid)
return p_.is_running()
# try:
# return os.waitpid(pid, os.WNOHANG) is not None
# except ChildProcessError: # no child processes
# return False
# try:
# call = subprocess.check_output("pidof '{}'".format(self.processName), shell=True)
# return True
# except subprocess.CalledProcessError:
# return False
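One caveat on the helper above: psutil.Process(pid) raises psutil.NoSuchProcess for a dead PID rather than returning a falsy status, and is_running() returns True for zombie processes. A more defensive variant (a sketch, not what this commit ships) could look like:

import psutil

def process_up_safe(pid: int) -> bool:
    # treat a missing PID or a zombie as 'down'; the shipped helper lets
    # NoSuchProcess propagate to the caller instead
    try:
        proc = psutil.Process(pid)
        return proc.is_running() and proc.status() != psutil.STATUS_ZOMBIE
    except psutil.NoSuchProcess:
        return False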


The removed pidStatus command:

@app.command()
def pidStatus(connections: bool = False):
    def print_formatted_status(process_name, pid):
        try:
            process = psutil.Process(pid=pid)
            print(f'{pid} -')
            print(f'\t name: {process.name()}')
            print(f'\t status: {process.status()}')
            print(f'\t threads: {process.num_threads()}')
            print(f'\t file descriptors: {process.num_fds()}')
            print(f'\t memory: {process.memory_info()}')
            print(f'\t cpu: {process.cpu_times()}')
            print(
                f"\t number of connections: {len(process.connections(kind='inet'))}",
            )
            if connections:
                print(
                    f"\t number of connections: {process.connections(kind='inet')}",
                )
            print('\n')
        except Exception as err:
            if type(err).__name__ == 'NoSuchProcess':
                print(f'{pid} - NoSuchProcess')
                print(f'\t name: {process_name}\n')
            else:
                print(f'Unknown Error: {str(err)}')

    r = redis.Redis(**REDIS_CONN_CONF, single_connection_client=True)
    print('\n')
    for k, v in r.hgetall(
        name=f'powerloom:uniswap:{settings.namespace}:{settings.instance_id}:Processes',
    ).items():
        key = k.decode('utf-8')
        value = v.decode('utf-8')

        if key == 'callback_workers':
            value = json.loads(value)
            for i, j in value.items():
                print_formatted_status(j['id'], int(j['pid']))
        elif value.isdigit():
            print_formatted_status(key, int(value))
        else:
            print(f'# Unknown type of key:{key}, value:{value}')
    print('\n')

The new processReport command that replaces it:

@app.command()
def processReport():
    connection_pool = redis.BlockingConnectionPool(**REDIS_CONN_CONF)
    redis_conn = redis.Redis(connection_pool=connection_pool)
    map_raw = redis_conn.hgetall(
        name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes',
    )
    event_det_pid = map_raw[b'SystemEventDetector']
    print('\n' + '=' * 20 + 'System Event Detector' + '=' * 20)
    try:
        event_det_pid = int(event_det_pid)
    except ValueError:
        print('Event detector pid found in process map not a PID: ', event_det_pid)
    else:
        # event_detector_proc = psutil.Process(event_det_pid)
        print('Event detector process running status: ', process_up(event_det_pid))

    print('\n' + '=' * 20 + 'Worker Processor Distributor' + '=' * 20)
    proc_dist_pid = map_raw[b'ProcessorDistributor']
    try:
        proc_dist_pid = int(proc_dist_pid)
    except ValueError:
        print('Processor distributor pid found in process map not a PID: ', proc_dist_pid)
    else:
        # proc_dist_proc = psutil.Process(proc_dist_pid)
        print('Processor distributor process running status: ', process_up(proc_dist_pid))

    print('\n' + '=' * 20 + 'Worker Processes' + '=' * 20)
    cb_worker_map = map_raw[b'callback_workers']
    try:
        cb_worker_map = json.loads(cb_worker_map)
    except json.JSONDecodeError:
        print('Callback worker entries in cache corrupted...', cb_worker_map)
        return
    for worker_type, worker_details in cb_worker_map.items():
        section_name = worker_type.capitalize()
        print('\n' + '*' * 10 + section_name + '*' * 10)
        if not worker_details or not isinstance(worker_details, dict):
            print(f'No {section_name} workers found in process map: ', worker_details)
            continue
        for short_id, worker_details in worker_details.items():
            print('\n' + '-' * 5 + short_id + '-' * 5)
            proc_pid = worker_details['pid']
            try:
                proc_pid = int(proc_pid)
            except ValueError:
                print(f'Process name {worker_details["id"]} pid found in process map not a PID: ', proc_pid)
            else:
                # proc = psutil.Process(proc_pid)
                print('Process name ' + worker_details['id'] + ' running status: ', process_up(proc_pid))
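For reference, the process map hash that processReport reads decodes to roughly this shape (reconstructed from the reads above; the worker-type keys and id formats are illustrative assumptions):

# powerloom:snapshotter:{namespace}:{instance_id}:Processes, after decoding
process_map = {
    'SystemEventDetector': '12345',   # PID stored as a string
    'ProcessorDistributor': '12346',  # PID stored as a string
    'callback_workers': {             # JSON-encoded in Redis
        'snapshot': {
            'worker-1': {'pid': 12347, 'id': 'snapshot_worker:worker-1'},
        },
        'aggregate': {
            'worker-1': {'pid': 12348, 'id': 'aggregation_worker:worker-1'},
        },
    },
}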


# https://typer.tiangolo.com/tutorial/commands/context/#configuring-the-context
@@ -143,5 +167,20 @@ def stop(
)


@app.command()
def respawn():
c = create_rabbitmq_conn()
typer.secho('Opening RabbitMQ channel...', fg=typer.colors.GREEN)
ch = c.channel()
proc_hub_cmd = ProcessHubCommand(
command='respawn',
)
processhub_command_publish(ch, proc_hub_cmd.json())
typer.secho(
f'Sent command to ProcessHubCore | Command: {proc_hub_cmd.json()}',
fg=typer.colors.YELLOW,
)
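On the receiving end, ProcessHubCore (whose large diff is not rendered above) consumes this command from RabbitMQ. A purely hypothetical dispatch sketch, not the repo's actual handler:

import json

def handle_processhub_command(body: bytes) -> None:
    # hypothetical dispatcher; the real logic lives in
    # snapshotter/process_hub_core.py
    cmd = json.loads(body)
    if cmd.get('command') == 'respawn':
        ...  # terminate tracked children and spawn fresh ones
    elif cmd.get('command') == 'stop':
        ...  # shut down workers without restarting them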


if __name__ == '__main__':
app()
14 changes: 14 additions & 0 deletions snapshotter/processor_distributor.py
@@ -4,6 +4,8 @@
import json
import multiprocessing
import queue
import resource
from signal import SIGINT, SIGQUIT, SIGTERM, signal
import time
from collections import defaultdict
from functools import partial
@@ -115,6 +117,11 @@ def __init__(self, name, **kwargs):
transport=self._async_transport,
)

def _signal_handler(self, signum, frame):
if signum in [SIGINT, SIGTERM, SIGQUIT]:
self._core_rmq_consumer.cancel()


async def _init_redis_pool(self):
self._aioredis_pool = RedisPoolCache()
await self._aioredis_pool.populate()
@@ -721,6 +728,13 @@ def run(self) -> None:
self._logger = logger.bind(
module=f'Powerloom|Callbacks|ProcessDistributor:{settings.namespace}-{settings.instance_id}',
)
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(
resource.RLIMIT_NOFILE,
(settings.rlimit.file_descriptors, hard),
)
for signame in [SIGINT, SIGTERM, SIGQUIT]:
signal(signame, self._signal_handler)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
ev_loop = asyncio.get_event_loop()
ev_loop.run_until_complete(self.init_worker())
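The same two-step boilerplate (raise the file-descriptor soft limit, then trap termination signals) is added to each long-running process in this commit. In isolation the pattern looks like this (a standalone sketch; FD_SOFT_LIMIT stands in for settings.rlimit.file_descriptors and is assumed to be within the hard limit):

import resource
from signal import SIGINT, SIGQUIT, SIGTERM, signal

FD_SOFT_LIMIT = 4096  # stands in for settings.rlimit.file_descriptors

def _handler(signum, frame):
    # mirrors _signal_handler above: tear down the consumer on exit
    print(f'caught signal {signum}, shutting down')

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
# only the soft limit is raised; the hard limit is left untouched
resource.setrlimit(resource.RLIMIT_NOFILE, (FD_SOFT_LIMIT, hard))
for signame in (SIGINT, SIGTERM, SIGQUIT):
    signal(signame, _handler)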
6 changes: 6 additions & 0 deletions snapshotter/system_event_detector.py
@@ -2,6 +2,7 @@
import json
import multiprocessing
import queue
import resource
import signal
import sys
import threading
@@ -326,6 +327,11 @@ async def _detect_events(self):

@rabbitmq_and_redis_cleanup
def run(self):
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(
resource.RLIMIT_NOFILE,
(settings.rlimit.file_descriptors, hard),
)
for signame in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT]:
signal.signal(signame, self._generic_exit_handler)
self._rabbitmq_thread = threading.Thread(
49 changes: 37 additions & 12 deletions snapshotter/utils/aggregation_worker.py
@@ -155,15 +155,42 @@ async def _processor_task(
snapshot=snapshot,
storage_flag=settings.web3storage.upload_aggregates,
)
await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(
epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
),
mapping={
project_id: SnapshotterStateUpdate(
status='success', timestamp=int(time.time()),
).json(),
},
if not snapshot:
await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(
epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
),
mapping={
project_id: SnapshotterStateUpdate(
status='failed', timestamp=int(time.time()), error='Empty snapshot',
).json(),
},
)
notification_message = SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.MISSED_SNAPSHOT.value,
projectID=project_id,
epochId=str(msg_obj.epochId),
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': 'Error : Empty snapshot'}),
)
await send_failure_notifications_async(
client=self._client, message=notification_message,
)
else:
await self._redis_conn.hset(
name=epoch_id_project_to_state_mapping(
epoch_id=msg_obj.epochId, state_id=SnapshotterStates.SNAPSHOT_BUILD.value,
),
mapping={
project_id: SnapshotterStateUpdate(
status='success', timestamp=int(time.time()),
).json(),
},
)
self._logger.debug(
'Updated epoch processing status in aggregation worker for project {} for transition {}',
project_id, SnapshotterStates.SNAPSHOT_BUILD.value,
)
await self._redis_conn.close()
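For reference, the failure notification assembled above serializes to roughly this payload (field values are illustrative placeholders; the enum value for MISSED_SNAPSHOT is an assumption):

notification = {
    'instanceID': 'node-instance-id',
    'issueType': 'MISSED_SNAPSHOT',
    'projectID': 'project-id',
    'epochId': '1234',
    'timeOfReporting': '1696233600.0',
    'extra': '{"issueDetails": "Error : Empty snapshot"}',
}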

@@ -180,9 +207,7 @@ async def _on_rabbitmq_message(self, message: IncomingMessage):
# TODO: Update based on new single project based design
if task_type in self._single_project_types:
try:
msg_obj: PowerloomSnapshotSubmittedMessage = (
PowerloomSnapshotSubmittedMessage.parse_raw(message.body)
)
msg_obj: PowerloomSnapshotSubmittedMessage = PowerloomSnapshotSubmittedMessage.parse_raw(message.body)
except ValidationError as e:
self._logger.opt(exception=True).error(
(
9 changes: 8 additions & 1 deletion snapshotter/utils/generic_worker.py
@@ -1,12 +1,13 @@
import asyncio
import multiprocessing
import resource
from signal import SIGINT, SIGQUIT, SIGTERM
import time
from functools import partial
from typing import Dict
from typing import Union
from uuid import uuid4

from signal import signal
from aio_pika import IncomingMessage
from aio_pika import Message
from aio_pika.pool import Pool
@@ -72,6 +73,10 @@ def __init__(self, name, **kwargs):
)
self._initialized = False

def _signal_handler(self, signum, frame):
if signum in [SIGINT, SIGTERM, SIGQUIT]:
self._core_rmq_consumer.cancel()

async def _rabbitmq_consumer(self, loop):
self._rmq_connection_pool = Pool(get_rabbitmq_robust_connection_async, max_size=5, loop=loop)
self._rmq_channel_pool = Pool(
@@ -241,6 +246,8 @@ def run(self) -> None:
resource.RLIMIT_NOFILE,
(settings.rlimit.file_descriptors, hard),
)
for signame in [SIGINT, SIGTERM, SIGQUIT]:
signal(signame, self._signal_handler)
ev_loop = asyncio.get_event_loop()
self._logger.debug(
f'Starting asynchronous callback worker {self._unique_id}...',
44 changes: 1 addition & 43 deletions snapshotter/utils/helper_functions.py
@@ -33,49 +33,7 @@ def wrapper(self, *args, **kwargs):
# for p in alive:
# logger.error(f'killing process: {p.name()}')
# p.kill()
logger.error('Waiting on spawned callback workers to join...')
for (
worker_class_name,
unique_worker_entries,
) in self._spawned_cb_processes_map.items():
for (
worker_unique_id,
worker_unique_process_details,
) in unique_worker_entries.items():
if worker_unique_process_details is not None and worker_unique_process_details.pid:
logger.error(
(
'Waiting on spawned callback worker {} | Unique'
' ID {} | PID {} to join...'
),
worker_class_name,
worker_unique_id,
worker_unique_process_details.pid,
)
psutil.Process(pid=worker_unique_process_details.pid).wait()

logger.error(
'Waiting on spawned core workers to join... {}',
self._spawned_processes_map,
)
for (
worker_class_name,
worker_pid,
) in self._spawned_processes_map.items():
logger.error(
'spawned Process Pid to wait on {}',
worker_pid,
)
if worker_pid is not None:
logger.error(
(
'Waiting on spawned core worker {} | PID {} to'
' join...'
),
worker_class_name,
worker_pid,
)
psutil.Process(worker_pid).wait()
self._kill_all_children()
logger.error('Finished waiting for all children...now can exit.')
finally:
logger.error('Finished waiting for all children...now can exit.')
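The verbose per-worker psutil wait loops above are collapsed into a single _kill_all_children() call, whose body is not shown in this hunk. A plausible psutil-based equivalent (an assumption, not the repo's code):

import psutil

def kill_all_children(timeout: float = 3.0) -> None:
    # hypothetical stand-in for self._kill_all_children(): terminate every
    # child of the current process, escalating to SIGKILL on stragglers
    children = psutil.Process().children(recursive=True)
    for child in children:
        child.terminate()
    gone, alive = psutil.wait_procs(children, timeout=timeout)
    for child in alive:
        child.kill()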