Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/testnet_5_pairs' in…
Browse files Browse the repository at this point in the history
…to testnet_5_pairs
  • Loading branch information
xadahiya committed Sep 20, 2023
2 parents 8b759a7 + db0653b commit 61894ca
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 1 deletion.
3 changes: 2 additions & 1 deletion snapshotter/core_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ async def get_snapshotter_epoch_processing_status(
for epoch_id in range(current_epoch_id, current_epoch_id - 30 - 1, -1):
epoch_specific_report = SnapshotterEpochProcessingReportItem.construct()
epoch_specific_report.epochId = epoch_id

epoch_release_status = await redis_conn.get(
epoch_id_epoch_released_key(epoch_id=epoch_id),
)
Expand Down Expand Up @@ -572,7 +573,7 @@ async def get_snapshotter_epoch_processing_status(
epoch_processing_final_report.append(epoch_specific_report)
await redis_conn.set(
epoch_process_report_cached_key,
json.dumps(list(map(lambda x: x.json(), epoch_processing_final_report))),
json.dumps(list(map(lambda x: x.dict(), epoch_processing_final_report))),
ex=60,
)
return paginate(epoch_processing_final_report)
80 changes: 80 additions & 0 deletions snapshotter/worker_process_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import json
import subprocess
import redis
import psutil
import os
from snapshotter.utils.redis.redis_conn import REDIS_CONN_CONF
from snapshotter.settings.config import settings


def process_up(pid):
"""
Is the process up?
:return: True if process is up
"""
p_ = psutil.Process(pid)
return p_.is_running()
# try:
# return os.waitpid(pid, os.WNOHANG) is not None
# except ChildProcessError: # no child processes
# return False
# try:
# call = subprocess.check_output("pidof '{}'".format(self.processName), shell=True)
# return True
# except subprocess.CalledProcessError:
# return False


def main():
connection_pool = redis.BlockingConnectionPool(**REDIS_CONN_CONF)
redis_conn = redis.Redis(connection_pool=connection_pool)
map_raw = redis_conn.hgetall(
name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes'
)
event_det_pid = map_raw[b'SystemEventDetector']
print('\n' + '=' * 20 + 'System Event Detector' + '=' * 20)
try:
event_det_pid = int(event_det_pid)
except ValueError:
print('Event detector pid found in process map not a PID: ', event_det_pid)
else:
# event_detector_proc = psutil.Process(event_det_pid)
print('Event detector process running status: ', process_up(event_det_pid))

print('\n' + '=' * 20 + 'Worker Processor Distributor' + '=' * 20)
proc_dist_pid = map_raw[b'ProcessorDistributor']
try:
proc_dist_pid = int(proc_dist_pid)
except ValueError:
print('Processor distributor pid found in process map not a PID: ', proc_dist_pid)
else:
# proc_dist_proc = psutil.Process(proc_dist_pid)
print('Processor distributor process running status: ', process_up(proc_dist_pid))

print('\n' + '=' * 20 + 'Worker Processes' + '=' * 20)
cb_worker_map = map_raw[b'callback_workers']
try:
cb_worker_map = json.loads(cb_worker_map)
except json.JSONDecodeError:
print('Callback worker entries in cache corrupted...', cb_worker_map)
return
for worker_type, worker_details in cb_worker_map.items():
section_name = worker_type.capitalize()
print('\n' + '*' * 10 + section_name + '*' * 10)
if not worker_details or not isinstance(worker_details, dict):
print(f'No {section_name} workers found in process map: ', worker_details)
continue
for short_id, worker_details in worker_details.items():
print('\n' + '-' * 5 + short_id + '-' * 5)
proc_pid = worker_details['pid']
try:
proc_pid = int(proc_pid)
except ValueError:
print(f'Process name {worker_details["id"]} pid found in process map not a PID: ', proc_pid)
else:
# proc = psutil.Process(proc_pid)
print('Process name ' + worker_details['id'] + ' running status: ', process_up(proc_pid))


if __name__ == '__main__':
main()
80 changes: 80 additions & 0 deletions worker_process_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import json
import subprocess
import redis
import psutil
import os
from snapshotter.utils.redis.redis_conn import REDIS_CONN_CONF
from snapshotter.settings.config import settings


def process_up(pid):
"""
Is the process up?
:return: True if process is up
"""
p_ = psutil.Process(pid)
return p_.is_running()
# try:
# return os.waitpid(pid, os.WNOHANG) is not None
# except ChildProcessError: # no child processes
# return False
# try:
# call = subprocess.check_output("pidof '{}'".format(self.processName), shell=True)
# return True
# except subprocess.CalledProcessError:
# return False


def main():
connection_pool = redis.BlockingConnectionPool(**REDIS_CONN_CONF)
redis_conn = redis.Redis(connection_pool=connection_pool)
map_raw = redis_conn.hgetall(
name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes'
)
event_det_pid = map_raw[b'SystemEventDetector']
print('\n' + '=' * 20 + 'System Event Detector' + '=' * 20)
try:
event_det_pid = int(event_det_pid)
except ValueError:
print('Event detector pid found in process map not a PID: ', event_det_pid)
else:
# event_detector_proc = psutil.Process(event_det_pid)
print('Event detector process running status: ', process_up(event_det_pid))

print('\n' + '=' * 20 + 'Worker Processor Distributor' + '=' * 20)
proc_dist_pid = map_raw[b'ProcessorDistributor']
try:
proc_dist_pid = int(proc_dist_pid)
except ValueError:
print('Processor distributor pid found in process map not a PID: ', proc_dist_pid)
else:
# proc_dist_proc = psutil.Process(proc_dist_pid)
print('Processor distributor process running status: ', process_up(proc_dist_pid))

print('\n' + '=' * 20 + 'Worker Processes' + '=' * 20)
cb_worker_map = map_raw[b'callback_workers']
try:
cb_worker_map = json.loads(cb_worker_map)
except json.JSONDecodeError:
print('Callback worker entries in cache corrupted...', cb_worker_map)
return
for worker_type, worker_details in cb_worker_map.items():
section_name = worker_type.capitalize()
print('\n' + '*' * 10 + section_name + '*' * 10)
if not worker_details or not isinstance(worker_details, dict):
print(f'No {section_name} workers found in process map: ', worker_details)
continue
for short_id, worker_details in worker_details.items():
print('\n' + '-' * 5 + short_id + '-' * 5)
proc_pid = worker_details['pid']
try:
proc_pid = int(proc_pid)
except ValueError:
print(f'Process name {worker_details["id"]} pid found in process map not a PID: ', proc_pid)
else:
# proc = psutil.Process(proc_pid)
print('Process name ' + worker_details['id'] + ' running status: ', process_up(proc_pid))


if __name__ == '__main__':
main()

0 comments on commit 61894ca

Please sign in to comment.