diff --git a/python/dendro/sdk/_resource_utilization_log_reader.py b/python/dendro/sdk/_resource_utilization_log_reader.py index 5c9fc93..75cd5b0 100644 --- a/python/dendro/sdk/_resource_utilization_log_reader.py +++ b/python/dendro/sdk/_resource_utilization_log_reader.py @@ -22,6 +22,7 @@ def _resource_utilization_log_reader(outq: queue.Queue, exit_event: threading.Ev net_io_counters = psutil.net_io_counters(pernic=False, nowrap=True) gpu_loads = _get_gpu_loads() log_record = { + 'type': 'utilization_event', 'timestamp': time.time(), 'cpu': { 'percent': cpu_percent diff --git a/python/dendro/sdk/_run_job_parent_process.py b/python/dendro/sdk/_run_job_parent_process.py index c0d4d35..e81840b 100644 --- a/python/dendro/sdk/_run_job_parent_process.py +++ b/python/dendro/sdk/_run_job_parent_process.py @@ -1,5 +1,6 @@ import sys import os +import json from typing import Union, Optional, Dict, Any import threading import queue @@ -53,11 +54,26 @@ def _run_job_parent_process(*, job_id: str, job_private_key: str, app_executable resource_utilization_log_upload_url: Union[str, None] = None resource_utilization_log_upload_url_timestamp = 0 + # detect custom events in the console output + # of the form ##dendro-event##{...} + def on_detect_custom_event(evt): + nonlocal all_resource_utilization_log + nonlocal resource_utilization_log_changed + log_record = { + 'type': 'custom_event', + 'timestamp': time.time(), + 'event': evt + } + all_resource_utilization_log += (json.dumps(log_record) + '\n').encode('utf-8') # important to encode this string + resource_utilization_log_changed = True + custom_event_detector = CustomEventDetector(on_detect_custom_event) + def check_for_new_console_output(): nonlocal console_out_q nonlocal last_newline_index_in_console_output nonlocal all_console_output nonlocal console_output_changed + nonlocal custom_event_detector while True: try: # get the latest output from the job @@ -69,6 +85,7 @@ def check_for_new_console_output(): # handle carriage return (e.g. in progress bar) all_console_output = all_console_output[:last_newline_index_in_console_output + 1] all_console_output += x + custom_event_detector.process_bytes(x) console_output_changed = True except queue.Empty: break @@ -471,3 +488,26 @@ def _debug_log(msg: str): timestamp_str = time.strftime('%Y-%m-%d %H:%M:%S') with open('dendro-job.log', 'a', encoding='utf-8') as f: f.write(f'{timestamp_str} {msg}\n') + +# Look for the events of the form ##dendro-event##{...} in the console output +class CustomEventDetector: + def __init__(self, on_detect_custom_event): + self._on_detect_custom_event = on_detect_custom_event + self._last_line = b'' + def process_bytes(self, x: bytes): + for i in range(len(x)): + self._process_byte(x[i:i + 1]) + def _process_byte(self, x: bytes): + if x == b'\n': + self._process_line(self._last_line) + self._last_line = b'' + else: + self._last_line += x + def _process_line(self, line: bytes): + if line.startswith(b'##dendro-event##'): + evt_json = line[len(b'##dendro-event##'):] + try: + evt = json.loads(evt_json) + self._on_detect_custom_event(evt) + except: # noqa: E722 + pass diff --git a/src/pages/ProjectPage/ResourceUtilizationView/ResourceUtilizationView.tsx b/src/pages/ProjectPage/ResourceUtilizationView/ResourceUtilizationView.tsx index f726e61..d504759 100644 --- a/src/pages/ProjectPage/ResourceUtilizationView/ResourceUtilizationView.tsx +++ b/src/pages/ProjectPage/ResourceUtilizationView/ResourceUtilizationView.tsx @@ -10,6 +10,7 @@ type ResourceUtilizationViewProps = { type ResourceUtilizationLog = ResourceUtilizationLogLine[] type ResourceUtilizationLogLine = { + type: 'utilization_event', timestamp: number, cpu: { percent: number @@ -48,6 +49,10 @@ type ResourceUtilizationLogLine = { gpu: { loads: number[] } | null +} | { + type: 'custom_event', + timestamp: number, + event: any } const useResourceUtilizationLog = (job: DendroJob) => { @@ -97,9 +102,13 @@ const ResourceUtilizationView: FunctionComponent = const {resourceUtilizationLog, refreshResourceUtilizationLog} = useResourceUtilizationLog(job) const referenceTime = resourceUtilizationLog && resourceUtilizationLog.length > 0 ? resourceUtilizationLog[0].timestamp : 0 + const resourceUtilizationLogFiltered: (ResourceUtilizationLogLine & {type: 'utilization_event'})[] = useMemo(() => { + return resourceUtilizationLog.filter(l => l.type === 'utilization_event') as any + }, [resourceUtilizationLog]) + const handleDownloadCsv = useCallback(() => { const headerLine = "timestamp,cpu_percent,memory_used,memory_total,network_sent,network_received,disk_read,disk_write" - const lines = resourceUtilizationLog.map(l => { + const lines = resourceUtilizationLogFiltered.map(l => { const cpuPercent = l.cpu.percent const memoryUsed = l.virtual_memory.used / 1024 / 1024 / 1024 const memoryTotal = l.virtual_memory.total / 1024 / 1024 / 1024 @@ -117,7 +126,7 @@ const ResourceUtilizationView: FunctionComponent = a.href = url a.download = `resource_utilization_${job.jobId}.csv` a.click() - }, [resourceUtilizationLog, job]) + }, [resourceUtilizationLogFiltered, job]) const handleDownloadJsonl = useCallback(() => { const lines = resourceUtilizationLog.map(l => JSON.stringify(l)) @@ -145,7 +154,7 @@ const ResourceUtilizationView: FunctionComponent = series={[ { label: 'CPU percent', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: l.cpu.percent })), @@ -160,7 +169,7 @@ const ResourceUtilizationView: FunctionComponent = series={[ { label: 'Memory used', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: l.virtual_memory.used / 1024 / 1024 / 1024 })), @@ -168,7 +177,7 @@ const ResourceUtilizationView: FunctionComponent = }, { label: 'Total memory', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: l.virtual_memory.total / 1024 / 1024 / 1024 })), @@ -183,7 +192,7 @@ const ResourceUtilizationView: FunctionComponent = series={[ { label: 'GPU load', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: l.gpu ? l.gpu.loads.reduce((a, b) => a + b, 0) : 0 })), @@ -199,7 +208,7 @@ const ResourceUtilizationView: FunctionComponent = series={[ { label: 'Network sent', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.net_io_counters?.bytes_sent || 0) / 1024 / 1024 / 1024 })), @@ -207,7 +216,7 @@ const ResourceUtilizationView: FunctionComponent = }, { label: 'Network received', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.net_io_counters?.bytes_recv || 0) / 1024 / 1024 / 1024 })), @@ -222,7 +231,7 @@ const ResourceUtilizationView: FunctionComponent = series={[ { label: 'Network sent', - data: cumulativeToInstantaneous(resourceUtilizationLog.map(l => ({ + data: cumulativeToInstantaneous(resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.net_io_counters?.bytes_sent || 0) / 1024 / 1024 }))), @@ -230,7 +239,7 @@ const ResourceUtilizationView: FunctionComponent = }, { label: 'Network received', - data: cumulativeToInstantaneous(resourceUtilizationLog.map(l => ({ + data: cumulativeToInstantaneous(resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.net_io_counters?.bytes_recv || 0) / 1024 / 1024 }))), @@ -247,7 +256,7 @@ const ResourceUtilizationView: FunctionComponent = series={[ { label: 'Disk read', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.disk_io_counters?.read_bytes || 0) / 1024 / 1024 })), @@ -255,7 +264,7 @@ const ResourceUtilizationView: FunctionComponent = }, { label: 'Disk write', - data: resourceUtilizationLog.map(l => ({ + data: resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.disk_io_counters?.write_bytes || 0) / 1024 / 1024 })), @@ -270,7 +279,7 @@ const ResourceUtilizationView: FunctionComponent = series={[ { label: 'Disk read', - data: cumulativeToInstantaneous(resourceUtilizationLog.map(l => ({ + data: cumulativeToInstantaneous(resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.disk_io_counters?.read_bytes || 0) / 1024 / 1024 }))), @@ -278,7 +287,7 @@ const ResourceUtilizationView: FunctionComponent = }, { label: 'Disk write', - data: cumulativeToInstantaneous(resourceUtilizationLog.map(l => ({ + data: cumulativeToInstantaneous(resourceUtilizationLogFiltered.map(l => ({ x: l.timestamp, y: (l.disk_io_counters?.write_bytes || 0) / 1024 / 1024 }))),