From 8a501a21e146b3e005b4fc044f8e64945c16c178 Mon Sep 17 00:00:00 2001 From: jist <95856749+george0st@users.noreply.github.com> Date: Thu, 24 Oct 2024 12:40:30 +0200 Subject: [PATCH 1/6] Output refactoring --- qgate_perf/parallel_executor.py | 384 ++++++++++++++++---------------- 1 file changed, 197 insertions(+), 187 deletions(-) diff --git a/qgate_perf/parallel_executor.py b/qgate_perf/parallel_executor.py index 13a1c2c..caa24a2 100644 --- a/qgate_perf/parallel_executor.py +++ b/qgate_perf/parallel_executor.py @@ -31,90 +31,94 @@ def _executor_wrapper(func, run_return: RunReturn, run_setup: RunSetup): except Exception as ex: run_return.probe=ParallelProbe(None, f"{type(ex).__name__}: {str(ex)}") +class ParallelOutput: -class ParallelExecutor: - """ Helper for parallel execution of defined function (via start new process with aim to avoid GIL) """ - - def __init__(self, - func, - label = None, - detail_output = True, - output_file = None, - init_each_bulk = False): - """ Setting of execution - - :param func: function for parallel run in format see 'def my_func(run_setup: RunSetup) -> ParallelProbe:' - :param label: text label for parallel run - :param detail_output: provide detailed output from visualization of time, when executor was started - see usage in method create_graph_exec, default is True - :param output_file: output to the file, default is without file - :param init_each_bulk: call 'init_run' before each bulk (useful e.g. change amount of columns in target), - default is False - """ - self._func = func - self._func_wrapper = _executor_wrapper - self._detail_output = detail_output + def __init__(self, label = None, detail_output = True, output_file = None): self._label = label + self._detail_output = detail_output self._output_file = output_file - self._init_each_bulk = init_each_bulk + self._file = None - # Technical point, how to close Process - # in python >= 3.7 Close() as soft closing - # in python < 3.7 Terminate() as hard closing (the Close method does not exist in python lower versions) - self._process_close=True if version.parse(python_version()) >= version.parse("3.7") else False + def open(self): + if self._output_file is not None: + self._file = self._open_output() - # region CORE + def close(self): + if self._file is not None: + self._file.close() + self._file = None - def _coreThreadClassPool(self, threads, return_key, return_dict, run_setup: RunSetup): - try: - with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: - features = [] - for threadKey in range(threads): - run_return=RunReturn(f"{return_key}x{threadKey}", return_dict) - features.append( - executor.submit(self._func_wrapper, self._func, run_return, run_setup)) + def _create_percentile_list(self, run_setup: RunSetup, return_dict): - for future in concurrent.futures.as_completed(features): - future.result() - except Exception as ex: - print(f"SYSTEM ERROR in '_coreThreadClassPool': {type(ex).__name__} - '{str(ex)}'") + percentile_list = {} - def _executeCore(self, run_setup: RunSetup, return_dict, processes=2, threads=2): + # pre-calculation + # iteration cross executors results + for return_key in return_dict: + response = return_dict[return_key] + if response: + if response.exception is None: + # iteration cross all percentiles + for result in response.percentile_results: + if result.count > 0: + # sum of average time for one call + if percentile_list.get(result.percentile, None) is None: + percentile_list[result.percentile] = PercentileSummary(result.percentile, + result.count, + 0, + 0, + result.total_duration / result.count, + result.std, + result.min, + result.max, + 1) + else: + itm = percentile_list[result.percentile] + itm.count += result.count + itm.avrg += result.total_duration / result.count + itm.std += result.std + itm.min = min(result.min, itm.min) + itm.max = max(result.max, itm.max) + itm.executors += 1 + else: + if percentile_list.get(result.percentile, None) is None: + percentile_list[result.percentile] = PercentileSummary(result.percentile, + 0, + 0, + 0, + 0, + 0, + ParallelProbe.MIN_DURATION, + 0, + 0) - proc = [] - # define synch time for run of all executors - run_setup.set_start_time() + # define percentile, if not exist + # if 100 percentile does not exist, create it + if percentile_list.get(1, None) is None: + percentile_list[1] = PercentileSummary(1,0,0, 0, 0,0, 0, 0,0) - try: - if threads == 1: - for process_key in range(processes): - run_return = RunReturn(process_key, return_dict) - p = multiprocessing.Process(target=self._func_wrapper, - args=(self._func, run_return, run_setup)) - proc.append(p) - else: - for process_key in range(processes): - p = multiprocessing.Process(target=self._coreThreadClassPool, - args=(threads, process_key, return_dict, run_setup)) - proc.append(p) + # if expected percentile does not exist, create it + if run_setup.exist("percentile"): + if percentile_list.get(run_setup["percentile"], None) is None: + percentile_list[run_setup["percentile"]] = PercentileSummary(run_setup["percentile"], 0, 0, 0, 0, 0, 0, 0, 0) - # start - for p in proc: - p.start() + # final calculation + for percentile in percentile_list.values(): + if percentile.executors > 0: + # Calc clarification (for better understanding): + # avrg / count = average time for one executor (average is cross all calls and executors) + # 1 / (avrg/count) = average amount of calls per one second (cross executors) + percentile.call_per_sec_raw = 0 if (percentile.avrg / percentile.executors) == 0 else (1 / (percentile.avrg / percentile.executors)) * percentile.executors + percentile.call_per_sec = percentile.call_per_sec_raw * run_setup._bulk_row - # wait for finish - for p in proc: - p.join() - if self._process_close: - p.close() # soft close - else: - p.terminate() # hard close - except Exception as ex: - print(f"SYSTEM ERROR in '_executeCore': '{str(ex)}'") + percentile.avrg = 0 if percentile.executors == 0 else percentile.avrg / percentile.executors + percentile.std = 0 if percentile.executors == 0 else percentile.std / percentile.executors + else: + percentile.min = 0 + percentile.max = 0 - # endregion CORE + return percentile_list - # region PRINT OUTPUT def _open_output(self): dirname = os.path.dirname(self._output_file) if dirname: @@ -122,18 +126,18 @@ def _open_output(self): os.makedirs(dirname, mode=0o777) return open(self._output_file, 'a') - def _print(self, file, out: str, readable_out: str = None): + def print(self, out: str, readable_out: str = None): # print to the file 'out' - if file is not None: - file.write(out + "\n") + if self._file is not None: + self._file.write(out + "\n") # print to the console 'readable_out' or 'out' print(readable_out if readable_out else out) - def _print_header(self, file, run_setup: RunSetup=None): + def print_header(self, run_setup: RunSetup=None): self._start_tasks = datetime.utcnow() - self._print(file, f"############### {self._start_tasks.isoformat(' ')} ###############") + self.print(f"############### {self._start_tasks.isoformat(' ')} ###############") total, free = get_memory() out = {} out[FileMarker.PRF_TYPE] = FileMarker.PRF_HDR_TYPE @@ -157,19 +161,16 @@ def _print_header(self, file, run_setup: RunSetup=None): readable_out[FileMarker.PRF_HDR_AVIALABLE_CPU] = multiprocessing.cpu_count() readable_out[FileMarker.HR_PRF_HDR_MEMORY] = f"{total}/{free}" - - self._print(file, - dumps(out, separators=OutputSetup().json_separator), + self.print(dumps(out, separators=OutputSetup().json_separator), dumps(readable_out, separators = OutputSetup().human_json_separator)) - def _print_footer(self, file, final_state): + def print_footer(self, final_state): seconds = round((datetime.utcnow() - self._start_tasks).total_seconds(), 1) - self._print(file, - f"############### State: {'OK' if final_state else 'Error'}, " + self.print(f"############### State: {'OK' if final_state else 'Error'}, " f" Duration: {get_readable_duration(seconds)} ({seconds}" f" seconds) ###############") - def _print_detail(self, file, run_setup: RunSetup, return_dict, processes, threads, group=''): + def print_detail(self, run_setup: RunSetup, return_dict, processes, threads, group=''): """ Print detail from executors @@ -184,9 +185,8 @@ def _print_detail(self, file, run_setup: RunSetup, return_dict, processes, threa if self._detail_output == True: for return_key in return_dict: parallel_ret = return_dict[return_key] - self._print(file, - f" {str(parallel_ret) if parallel_ret else ParallelProbe.dump_error('SYSTEM overloaded')}", - f" {parallel_ret.readable_str() if parallel_ret else ParallelProbe.readable_dump_error('SYSTEM overloaded')}") + self.print(f" {str(parallel_ret) if parallel_ret else ParallelProbe.dump_error('SYSTEM overloaded')}", + f" {parallel_ret.readable_str() if parallel_ret else ParallelProbe.readable_dump_error('SYSTEM overloaded')}") # new calculation percentile_list = self._create_percentile_list(run_setup, return_dict) @@ -229,15 +229,96 @@ def _print_detail(self, file, run_setup: RunSetup, return_dict, processes, threa readable_out[FileMarker.PRF_CORE_MAX + suffix] = round(result.max, OutputSetup().human_precision) # final dump - self._print(file, - f" {dumps(out, separators = OutputSetup().json_separator)}", + self.print(f" {dumps(out, separators = OutputSetup().json_separator)}", f" {dumps(readable_out, separators = OutputSetup().human_json_separator)}") return percentile_list - # endregion PRINT OUTPUT - def _check_state(self, return_dict): +class ParallelExecutor: + """ Helper for parallel execution of defined function (via start new process with aim to avoid GIL) """ + + def __init__(self, + func, + label = None, + detail_output = True, + output_file = None, + init_each_bulk = False): + """ Setting of execution + + :param func: function for parallel run in format see 'def my_func(run_setup: RunSetup) -> ParallelProbe:' + :param label: text label for parallel run + :param detail_output: provide detailed output from visualization of time, when executor was started + see usage in method create_graph_exec, default is True + :param output_file: output to the file, default is without file + :param init_each_bulk: call 'init_run' before each bulk (useful e.g. change amount of columns in target), + default is False + """ + self._func = func + self._func_wrapper = _executor_wrapper + self._init_each_bulk = init_each_bulk + + self._label = label + self._detail_output = detail_output + self._output_file = output_file + + # Technical point, how to close Process + # in python >= 3.7 Close() as soft closing + # in python < 3.7 Terminate() as hard closing (the Close method does not exist in python lower versions) + self._process_close=True if version.parse(python_version()) >= version.parse("3.7") else False + + # region CORE + + def _coreThreadClassPool(self, threads, return_key, return_dict, run_setup: RunSetup): + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + features = [] + for threadKey in range(threads): + run_return=RunReturn(f"{return_key}x{threadKey}", return_dict) + features.append( + executor.submit(self._func_wrapper, self._func, run_return, run_setup)) + + for future in concurrent.futures.as_completed(features): + future.result() + except Exception as ex: + print(f"SYSTEM ERROR in '_coreThreadClassPool': {type(ex).__name__} - '{str(ex)}'") + + def _executeCore(self, run_setup: RunSetup, return_dict, processes=2, threads=2): + + proc = [] + # define synch time for run of all executors + run_setup.set_start_time() + + try: + if threads == 1: + for process_key in range(processes): + run_return = RunReturn(process_key, return_dict) + p = multiprocessing.Process(target=self._func_wrapper, + args=(self._func, run_return, run_setup)) + proc.append(p) + else: + for process_key in range(processes): + p = multiprocessing.Process(target=self._coreThreadClassPool, + args=(threads, process_key, return_dict, run_setup)) + proc.append(p) + + # start + for p in proc: + p.start() + + # wait for finish + for p in proc: + p.join() + if self._process_close: + p.close() # soft close + else: + p.terminate() # hard close + except Exception as ex: + print(f"SYSTEM ERROR in '_executeCore': '{str(ex)}'") + + # endregion CORE + + def _get_summary_state(self, return_dict): """ Check, if the processing was fine based on check exception in each executor @@ -252,78 +333,6 @@ def _check_state(self, return_dict): return False return True - def _create_percentile_list(self, run_setup: RunSetup, return_dict): - - percentile_list = {} - - # pre-calculation - # iteration cross executors results - for return_key in return_dict: - response = return_dict[return_key] - if response: - if response.exception is None: - # iteration cross all percentiles - for result in response.percentile_results: - if result.count > 0: - # sum of average time for one call - if percentile_list.get(result.percentile, None) is None: - percentile_list[result.percentile] = PercentileSummary(result.percentile, - result.count, - 0, - 0, - result.total_duration / result.count, - result.std, - result.min, - result.max, - 1) - else: - itm = percentile_list[result.percentile] - itm.count += result.count - itm.avrg += result.total_duration / result.count - itm.std += result.std - itm.min = min(result.min, itm.min) - itm.max = max(result.max, itm.max) - itm.executors += 1 - else: - if percentile_list.get(result.percentile, None) is None: - percentile_list[result.percentile] = PercentileSummary(result.percentile, - 0, - 0, - 0, - 0, - 0, - ParallelProbe.MIN_DURATION, - 0, - 0) - - # define percentile, if not exist - # if 100 percentile does not exist, create it - if percentile_list.get(1, None) is None: - percentile_list[1] = PercentileSummary(1,0,0, 0, 0,0, 0, 0,0) - - # if expected percentile does not exist, create it - if run_setup.exist("percentile"): - if percentile_list.get(run_setup["percentile"], None) is None: - percentile_list[run_setup["percentile"]] = PercentileSummary(run_setup["percentile"], 0, 0, 0, 0, 0, 0, 0, 0) - - # final calculation - for percentile in percentile_list.values(): - if percentile.executors > 0: - # Calc clarification (for better understanding): - # avrg / count = average time for one executor (average is cross all calls and executors) - # 1 / (avrg/count) = average amount of calls per one second (cross executors) - percentile.call_per_sec_raw = 0 if (percentile.avrg / percentile.executors) == 0 else (1 / (percentile.avrg / percentile.executors)) * percentile.executors - percentile.call_per_sec = percentile.call_per_sec_raw * run_setup._bulk_row - - percentile.avrg = 0 if percentile.executors == 0 else percentile.avrg / percentile.executors - percentile.std = 0 if percentile.executors == 0 else percentile.std / percentile.executors - else: - percentile.min = 0 - percentile.max = 0 - - return percentile_list - - # region RUN's def run_bulk_executor(self, @@ -377,7 +386,8 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO True - all executions was without exceptions, False - some exceptions. """ performance = PerfResults() - file = None + #file = None + output = ParallelOutput(self._label, self._detail_output, self._output_file) print('Execution...') @@ -385,25 +395,25 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO if self._init_each_bulk: self.init_run(run_setup) - if self._output_file is not None: - file=self._open_output() - - self._print_header(file, run_setup) + # if self._output_file is not None: + # file=self._open_output() + output.open() + output.print_header(run_setup) + #self._print_header(file, run_setup) for executors in executor_list: # execution with multiprocessing.Manager() as manager: return_dict = manager.dict() self._executeCore(run_setup, return_dict, executors[0], executors[1]) - percentile_list = self._print_detail(file, - run_setup, - return_dict, - executors[0], - executors[1], - '' if len(executors) <= 2 else executors[2]) + percentile_list = output.print_detail(run_setup, + return_dict, + executors[0], + executors[1], + '' if len(executors) <= 2 else executors[2]) # check state - state = self._check_state(return_dict) + state = self._get_summary_state(return_dict) if performance_detail: performance.append(PerfResult(state, run_setup.bulk_row, @@ -417,14 +427,13 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO # memory clean gc.collect(generation = 2) - self._print_footer(file, performance.state) + output.print_footer(performance.state) except Exception as ex: - self._print(file,f"SYSTEM ERROR in 'run_executor': {type(ex).__name__} - '{str(ex) if ex is not None else '!! Noname exception !!'}'") + output.print(f"SYSTEM ERROR in 'run_executor': {type(ex).__name__} - '{str(ex) if ex is not None else '!! Noname exception !!'}'") performance.add_state(False) finally: - if file is not None: - file.close() + output.close() return performance @@ -439,7 +448,8 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc True - all executions was without exceptions, False - some exceptions. """ performance = PerfResults() - file = None + output = ParallelOutput(self._label, self._detail_output, self._output_file) + #file = None print('Execution...') @@ -447,19 +457,20 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc if self._init_each_bulk: self.init_run(run_setup) - if self._output_file is not None: - file = self._open_output() + # if self._output_file is not None: + # file = self._open_output() - self._print_header(file, run_setup) + output.open() + output.print_header(run_setup) # Execution with multiprocessing.Manager() as manager: return_dict = manager.dict() self._executeCore(run_setup, return_dict, processes, threads) - percentile_list = self._print_detail(file, run_setup, return_dict, processes, threads) + percentile_list = output.print_detail(run_setup, return_dict, processes, threads) # check state - state = self._check_state(return_dict) + state = self._get_summary_state(return_dict) if performance_detail: performance.append(PerfResult(state, run_setup.bulk_row, @@ -471,14 +482,13 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc performance.add_state(state) gc.collect(generation = 2) - self._print_footer(file, performance.state) + output.print_footer(performance.state) except Exception as e: - self._print(file, f"SYSTEM ERROR in 'run': '{str(e) if e is not None else '!! Noname exception !!'}'") + output.print(f"SYSTEM ERROR in 'run': '{str(e) if e is not None else '!! Noname exception !!'}'") performance.add_state(False) finally: - if file is not None: - file.close() + output.close() return performance From d356bcc9c02859d4b94366ae16618361857ea0ca Mon Sep 17 00:00:00 2001 From: jist <95856749+george0st@users.noreply.github.com> Date: Thu, 24 Oct 2024 12:57:32 +0200 Subject: [PATCH 2/6] Cleaning --- qgate_perf/parallel_executor.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/qgate_perf/parallel_executor.py b/qgate_perf/parallel_executor.py index caa24a2..ee78e9d 100644 --- a/qgate_perf/parallel_executor.py +++ b/qgate_perf/parallel_executor.py @@ -39,7 +39,6 @@ def __init__(self, label = None, detail_output = True, output_file = None): self._output_file = output_file self._file = None - def open(self): if self._output_file is not None: self._file = self._open_output() @@ -386,8 +385,7 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO True - all executions was without exceptions, False - some exceptions. """ performance = PerfResults() - #file = None - output = ParallelOutput(self._label, self._detail_output, self._output_file) + output = None print('Execution...') @@ -395,11 +393,8 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO if self._init_each_bulk: self.init_run(run_setup) - # if self._output_file is not None: - # file=self._open_output() - output.open() + output = ParallelOutput(self._label, self._detail_output, self._output_file) output.print_header(run_setup) - #self._print_header(file, run_setup) for executors in executor_list: # execution @@ -448,8 +443,7 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc True - all executions was without exceptions, False - some exceptions. """ performance = PerfResults() - output = ParallelOutput(self._label, self._detail_output, self._output_file) - #file = None + output = None print('Execution...') @@ -460,7 +454,7 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc # if self._output_file is not None: # file = self._open_output() - output.open() + output = ParallelOutput(self._label, self._detail_output, self._output_file) output.print_header(run_setup) # Execution From 460e5040058c750532ae668ca32db2006ac9bc55 Mon Sep 17 00:00:00 2001 From: jist <95856749+george0st@users.noreply.github.com> Date: Thu, 24 Oct 2024 13:36:01 +0200 Subject: [PATCH 3/6] Update parallel_executor.py --- qgate_perf/parallel_executor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/qgate_perf/parallel_executor.py b/qgate_perf/parallel_executor.py index ee78e9d..3e64183 100644 --- a/qgate_perf/parallel_executor.py +++ b/qgate_perf/parallel_executor.py @@ -39,6 +39,7 @@ def __init__(self, label = None, detail_output = True, output_file = None): self._output_file = output_file self._file = None + def open(self): if self._output_file is not None: self._file = self._open_output() @@ -394,6 +395,7 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO self.init_run(run_setup) output = ParallelOutput(self._label, self._detail_output, self._output_file) + output.open() output.print_header(run_setup) for executors in executor_list: @@ -455,6 +457,7 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc # file = self._open_output() output = ParallelOutput(self._label, self._detail_output, self._output_file) + output.open() output.print_header(run_setup) # Execution From 5e50a7648f646e66e54d93e5879fece0ff95da8b Mon Sep 17 00:00:00 2001 From: jist <95856749+george0st@users.noreply.github.com> Date: Thu, 24 Oct 2024 13:50:58 +0200 Subject: [PATCH 4/6] ... --- qgate_perf/output_result.py | 214 ++++++++++++++++++++++++++++++- qgate_perf/parallel_executor.py | 218 +------------------------------- 2 files changed, 216 insertions(+), 216 deletions(-) diff --git a/qgate_perf/output_result.py b/qgate_perf/output_result.py index 6407b00..1a0e555 100644 --- a/qgate_perf/output_result.py +++ b/qgate_perf/output_result.py @@ -1,5 +1,12 @@ -from qgate_perf.parallel_probe import PercentileSummary - +import multiprocessing +import os.path +from json import dumps +from datetime import datetime +from qgate_perf.file_marker import FileMarker +from qgate_perf.run_setup import RunSetup +from qgate_perf.helper import get_host, get_memory, get_readable_duration +from qgate_perf.parallel_probe import ParallelProbe, PercentileSummary +from qgate_perf.output_setup import OutputSetup class PerfResult: """Output from one performance test""" @@ -35,7 +42,6 @@ def __str__(self): return (f"bundle ({self.bundle_row}x{self.bundle_col}), executor ({self.executor_process}x{self.executor_thread})" f" = {self.percentiles[0].call_per_sec}") - class PerfResults(list): """Output from all performance tests""" @@ -72,3 +78,205 @@ def __str__(self): # TODO: improve output pass +class OutputResults: + + def __init__(self, label = None, detail_output = True, output_file = None): + self._label = label + self._detail_output = detail_output + self._output_file = output_file + self._file = None + + def open(self): + if self._output_file is not None: + self._file = self._open_output() + + def close(self): + if self._file is not None: + self._file.close() + self._file = None + + def _create_percentile_list(self, run_setup: RunSetup, return_dict): + + percentile_list = {} + + # pre-calculation + # iteration cross executors results + for return_key in return_dict: + response = return_dict[return_key] + if response: + if response.exception is None: + # iteration cross all percentiles + for result in response.percentile_results: + if result.count > 0: + # sum of average time for one call + if percentile_list.get(result.percentile, None) is None: + percentile_list[result.percentile] = PercentileSummary(result.percentile, + result.count, + 0, + 0, + result.total_duration / result.count, + result.std, + result.min, + result.max, + 1) + else: + itm = percentile_list[result.percentile] + itm.count += result.count + itm.avrg += result.total_duration / result.count + itm.std += result.std + itm.min = min(result.min, itm.min) + itm.max = max(result.max, itm.max) + itm.executors += 1 + else: + if percentile_list.get(result.percentile, None) is None: + percentile_list[result.percentile] = PercentileSummary(result.percentile, + 0, + 0, + 0, + 0, + 0, + ParallelProbe.MIN_DURATION, + 0, + 0) + + # define percentile, if not exist + # if 100 percentile does not exist, create it + if percentile_list.get(1, None) is None: + percentile_list[1] = PercentileSummary(1,0,0, 0, 0,0, 0, 0,0) + + # if expected percentile does not exist, create it + if run_setup.exist("percentile"): + if percentile_list.get(run_setup["percentile"], None) is None: + percentile_list[run_setup["percentile"]] = PercentileSummary(run_setup["percentile"], 0, 0, 0, 0, 0, 0, 0, 0) + + # final calculation + for percentile in percentile_list.values(): + if percentile.executors > 0: + # Calc clarification (for better understanding): + # avrg / count = average time for one executor (average is cross all calls and executors) + # 1 / (avrg/count) = average amount of calls per one second (cross executors) + percentile.call_per_sec_raw = 0 if (percentile.avrg / percentile.executors) == 0 else (1 / (percentile.avrg / percentile.executors)) * percentile.executors + percentile.call_per_sec = percentile.call_per_sec_raw * run_setup._bulk_row + + percentile.avrg = 0 if percentile.executors == 0 else percentile.avrg / percentile.executors + percentile.std = 0 if percentile.executors == 0 else percentile.std / percentile.executors + else: + percentile.min = 0 + percentile.max = 0 + + return percentile_list + + def _open_output(self): + dirname = os.path.dirname(self._output_file) + if dirname: + if not os.path.exists(dirname): + os.makedirs(dirname, mode=0o777) + return open(self._output_file, 'a') + + def print(self, out: str, readable_out: str = None): + + # print to the file 'out' + if self._file is not None: + self._file.write(out + "\n") + + # print to the console 'readable_out' or 'out' + print(readable_out if readable_out else out) + + def print_header(self, run_setup: RunSetup=None): + self._start_tasks = datetime.utcnow() + self.print(f"############### {self._start_tasks.isoformat(' ')} ###############") + total, free = get_memory() + out = {} + out[FileMarker.PRF_TYPE] = FileMarker.PRF_HDR_TYPE + out[FileMarker.PRF_HDR_LABEL] = self._label if self._label is not None else "Noname" + out[FileMarker.PRF_HDR_BULK] = [run_setup._bulk_row, run_setup._bulk_col] + out[FileMarker.PRF_HDR_DURATION] = run_setup._duration_second + if run_setup.exist('percentile'): + out[FileMarker.PRF_HDR_PERCENTILE] = run_setup['percentile'] + out[FileMarker.PRF_HDR_AVIALABLE_CPU] = multiprocessing.cpu_count() + out[FileMarker.PRF_HDR_MEMORY] = total + out[FileMarker.PRF_HDR_MEMORY_FREE] = free + out[FileMarker.PRF_HDR_HOST] = get_host() + out[FileMarker.PRF_HDR_NOW] = self._start_tasks.isoformat(' ') + + readable_out = {} + readable_out[FileMarker.HR_PRF_HDR_LABEL] = self._label if self._label is not None else "Noname" + readable_out[FileMarker.PRF_HDR_BULK] = [run_setup._bulk_row, run_setup._bulk_col] + readable_out[FileMarker.PRF_HDR_DURATION] = run_setup._duration_second + if run_setup.exist('percentile'): + readable_out[FileMarker.HR_PRF_HDR_PERCENTILE] = run_setup['percentile'] + readable_out[FileMarker.PRF_HDR_AVIALABLE_CPU] = multiprocessing.cpu_count() + readable_out[FileMarker.HR_PRF_HDR_MEMORY] = f"{total}/{free}" + + self.print(dumps(out, separators=OutputSetup().json_separator), + dumps(readable_out, separators = OutputSetup().human_json_separator)) + + def print_footer(self, final_state): + seconds = round((datetime.utcnow() - self._start_tasks).total_seconds(), 1) + self.print(f"############### State: {'OK' if final_state else 'Error'}, " + f" Duration: {get_readable_duration(seconds)} ({seconds}" + f" seconds) ###############") + + def print_detail(self, run_setup: RunSetup, return_dict, processes, threads, group=''): + """ + Print detail from executors + + :param file: Output stream for print + :param run_setup: Setting for executors + :param return_dict: Return values from executors + :param processes: Number of processes + :param threads: Number of threads + :param group: Name of group + :return: Performance, total calls per one second + """ + if self._detail_output == True: + for return_key in return_dict: + parallel_ret = return_dict[return_key] + self.print(f" {str(parallel_ret) if parallel_ret else ParallelProbe.dump_error('SYSTEM overloaded')}", + f" {parallel_ret.readable_str() if parallel_ret else ParallelProbe.readable_dump_error('SYSTEM overloaded')}") + + # new calculation + percentile_list = self._create_percentile_list(run_setup, return_dict) + + # A2A form + out = {} + out[FileMarker.PRF_TYPE] = FileMarker.PRF_CORE_TYPE + out[FileMarker.PRF_CORE_PLAN_EXECUTOR_ALL] = processes * threads + out[FileMarker.PRF_CORE_PLAN_EXECUTOR] = [processes, threads] + out[FileMarker.PRF_CORE_REAL_EXECUTOR] = percentile_list[1].executors #executors + out[FileMarker.PRF_CORE_GROUP] = group + for result in percentile_list.values(): + suffix = f"_{int(result.percentile * 100)}" if result.percentile < 1 else "" + out[FileMarker.PRF_CORE_TOTAL_CALL + suffix] = result.count # ok + out[FileMarker.PRF_CORE_TOTAL_CALL_PER_SEC_RAW + suffix] = result.call_per_sec_raw # ok + out[FileMarker.PRF_CORE_TOTAL_CALL_PER_SEC + suffix] = result.call_per_sec # ok + out[FileMarker.PRF_CORE_AVRG_TIME + suffix] = result.avrg # ok + out[FileMarker.PRF_CORE_STD_DEVIATION + suffix] = result.std # ok + out[FileMarker.PRF_CORE_MIN + suffix] = result.min # ok + out[FileMarker.PRF_CORE_MAX + suffix] = result.max # ok + out[FileMarker.PRF_CORE_TIME_END] = datetime.utcnow().isoformat(' ') + + # human readable form + readable_out = {} + readable_out[FileMarker.HM_PRF_CORE_PLAN_EXECUTOR_ALL] = f"{processes * threads} [{processes},{threads}]" + readable_out[FileMarker.HM_PRF_CORE_REAL_EXECUTOR] = percentile_list[1].executors # executors + readable_out[FileMarker.HM_PRF_CORE_GROUP] = group + for result in percentile_list.values(): + suffix = f"_{int(result.percentile * 100)}" if result.percentile < 1 else "" + #readable_out[FileMarker.HM_PRF_CORE_TOTAL_CALL + suffix] = result.count + readable_out[FileMarker.HM_PRF_CORE_TOTAL_CALL + suffix] = result.count + if result.call_per_sec_raw == result.call_per_sec: + call_readable = f"{round(result.call_per_sec_raw, OutputSetup().human_precision)}" + else: + call_readable = f"{round(result.call_per_sec_raw, OutputSetup().human_precision)}/{round(result.call_per_sec, OutputSetup().human_precision)}" + readable_out[FileMarker.HM_PRF_CORE_TOTAL_CALL_PER_SEC + suffix] = call_readable + readable_out[FileMarker.HM_PRF_CORE_AVRG_TIME + suffix] = round(result.avrg, OutputSetup().human_precision) + readable_out[FileMarker.HM_PRF_CORE_STD_DEVIATION + suffix] = round(result.std, OutputSetup().human_precision) + readable_out[FileMarker.PRF_CORE_MIN + suffix] = round(result.min, OutputSetup().human_precision) + readable_out[FileMarker.PRF_CORE_MAX + suffix] = round(result.max, OutputSetup().human_precision) + + # final dump + self.print(f" {dumps(out, separators = OutputSetup().json_separator)}", + f" {dumps(readable_out, separators = OutputSetup().human_json_separator)}") + + return percentile_list diff --git a/qgate_perf/parallel_executor.py b/qgate_perf/parallel_executor.py index 3e64183..580c47a 100644 --- a/qgate_perf/parallel_executor.py +++ b/qgate_perf/parallel_executor.py @@ -2,18 +2,14 @@ import multiprocessing import os.path import gc -from json import dumps -from datetime import datetime from time import sleep -from qgate_perf.file_marker import FileMarker from qgate_perf.run_setup import RunSetup -from qgate_perf.helper import ExecutorHelper, GraphScope, BundleHelper, get_host, get_memory, get_readable_duration -from qgate_perf.parallel_probe import ParallelProbe, PercentileSummary +from qgate_perf.helper import ExecutorHelper, GraphScope, BundleHelper #, get_host, get_memory, get_readable_duration +from qgate_perf.parallel_probe import ParallelProbe #, PercentileSummary from qgate_perf.run_return import RunReturn from platform import python_version from packaging import version -from qgate_perf.output_setup import OutputSetup -from qgate_perf.output_result import PerfResult, PerfResults +from qgate_perf.output_result import PerfResult, PerfResults, OutputResults def _executor_wrapper(func, run_return: RunReturn, run_setup: RunSetup): @@ -31,210 +27,6 @@ def _executor_wrapper(func, run_return: RunReturn, run_setup: RunSetup): except Exception as ex: run_return.probe=ParallelProbe(None, f"{type(ex).__name__}: {str(ex)}") -class ParallelOutput: - - def __init__(self, label = None, detail_output = True, output_file = None): - self._label = label - self._detail_output = detail_output - self._output_file = output_file - self._file = None - - def open(self): - if self._output_file is not None: - self._file = self._open_output() - - def close(self): - if self._file is not None: - self._file.close() - self._file = None - - def _create_percentile_list(self, run_setup: RunSetup, return_dict): - - percentile_list = {} - - # pre-calculation - # iteration cross executors results - for return_key in return_dict: - response = return_dict[return_key] - if response: - if response.exception is None: - # iteration cross all percentiles - for result in response.percentile_results: - if result.count > 0: - # sum of average time for one call - if percentile_list.get(result.percentile, None) is None: - percentile_list[result.percentile] = PercentileSummary(result.percentile, - result.count, - 0, - 0, - result.total_duration / result.count, - result.std, - result.min, - result.max, - 1) - else: - itm = percentile_list[result.percentile] - itm.count += result.count - itm.avrg += result.total_duration / result.count - itm.std += result.std - itm.min = min(result.min, itm.min) - itm.max = max(result.max, itm.max) - itm.executors += 1 - else: - if percentile_list.get(result.percentile, None) is None: - percentile_list[result.percentile] = PercentileSummary(result.percentile, - 0, - 0, - 0, - 0, - 0, - ParallelProbe.MIN_DURATION, - 0, - 0) - - # define percentile, if not exist - # if 100 percentile does not exist, create it - if percentile_list.get(1, None) is None: - percentile_list[1] = PercentileSummary(1,0,0, 0, 0,0, 0, 0,0) - - # if expected percentile does not exist, create it - if run_setup.exist("percentile"): - if percentile_list.get(run_setup["percentile"], None) is None: - percentile_list[run_setup["percentile"]] = PercentileSummary(run_setup["percentile"], 0, 0, 0, 0, 0, 0, 0, 0) - - # final calculation - for percentile in percentile_list.values(): - if percentile.executors > 0: - # Calc clarification (for better understanding): - # avrg / count = average time for one executor (average is cross all calls and executors) - # 1 / (avrg/count) = average amount of calls per one second (cross executors) - percentile.call_per_sec_raw = 0 if (percentile.avrg / percentile.executors) == 0 else (1 / (percentile.avrg / percentile.executors)) * percentile.executors - percentile.call_per_sec = percentile.call_per_sec_raw * run_setup._bulk_row - - percentile.avrg = 0 if percentile.executors == 0 else percentile.avrg / percentile.executors - percentile.std = 0 if percentile.executors == 0 else percentile.std / percentile.executors - else: - percentile.min = 0 - percentile.max = 0 - - return percentile_list - - def _open_output(self): - dirname = os.path.dirname(self._output_file) - if dirname: - if not os.path.exists(dirname): - os.makedirs(dirname, mode=0o777) - return open(self._output_file, 'a') - - def print(self, out: str, readable_out: str = None): - - # print to the file 'out' - if self._file is not None: - self._file.write(out + "\n") - - # print to the console 'readable_out' or 'out' - print(readable_out if readable_out else out) - - def print_header(self, run_setup: RunSetup=None): - self._start_tasks = datetime.utcnow() - self.print(f"############### {self._start_tasks.isoformat(' ')} ###############") - total, free = get_memory() - out = {} - out[FileMarker.PRF_TYPE] = FileMarker.PRF_HDR_TYPE - out[FileMarker.PRF_HDR_LABEL] = self._label if self._label is not None else "Noname" - out[FileMarker.PRF_HDR_BULK] = [run_setup._bulk_row, run_setup._bulk_col] - out[FileMarker.PRF_HDR_DURATION] = run_setup._duration_second - if run_setup.exist('percentile'): - out[FileMarker.PRF_HDR_PERCENTILE] = run_setup['percentile'] - out[FileMarker.PRF_HDR_AVIALABLE_CPU] = multiprocessing.cpu_count() - out[FileMarker.PRF_HDR_MEMORY] = total - out[FileMarker.PRF_HDR_MEMORY_FREE] = free - out[FileMarker.PRF_HDR_HOST] = get_host() - out[FileMarker.PRF_HDR_NOW] = self._start_tasks.isoformat(' ') - - readable_out = {} - readable_out[FileMarker.HR_PRF_HDR_LABEL] = self._label if self._label is not None else "Noname" - readable_out[FileMarker.PRF_HDR_BULK] = [run_setup._bulk_row, run_setup._bulk_col] - readable_out[FileMarker.PRF_HDR_DURATION] = run_setup._duration_second - if run_setup.exist('percentile'): - readable_out[FileMarker.HR_PRF_HDR_PERCENTILE] = run_setup['percentile'] - readable_out[FileMarker.PRF_HDR_AVIALABLE_CPU] = multiprocessing.cpu_count() - readable_out[FileMarker.HR_PRF_HDR_MEMORY] = f"{total}/{free}" - - self.print(dumps(out, separators=OutputSetup().json_separator), - dumps(readable_out, separators = OutputSetup().human_json_separator)) - - def print_footer(self, final_state): - seconds = round((datetime.utcnow() - self._start_tasks).total_seconds(), 1) - self.print(f"############### State: {'OK' if final_state else 'Error'}, " - f" Duration: {get_readable_duration(seconds)} ({seconds}" - f" seconds) ###############") - - def print_detail(self, run_setup: RunSetup, return_dict, processes, threads, group=''): - """ - Print detail from executors - - :param file: Output stream for print - :param run_setup: Setting for executors - :param return_dict: Return values from executors - :param processes: Number of processes - :param threads: Number of threads - :param group: Name of group - :return: Performance, total calls per one second - """ - if self._detail_output == True: - for return_key in return_dict: - parallel_ret = return_dict[return_key] - self.print(f" {str(parallel_ret) if parallel_ret else ParallelProbe.dump_error('SYSTEM overloaded')}", - f" {parallel_ret.readable_str() if parallel_ret else ParallelProbe.readable_dump_error('SYSTEM overloaded')}") - - # new calculation - percentile_list = self._create_percentile_list(run_setup, return_dict) - - # A2A form - out = {} - out[FileMarker.PRF_TYPE] = FileMarker.PRF_CORE_TYPE - out[FileMarker.PRF_CORE_PLAN_EXECUTOR_ALL] = processes * threads - out[FileMarker.PRF_CORE_PLAN_EXECUTOR] = [processes, threads] - out[FileMarker.PRF_CORE_REAL_EXECUTOR] = percentile_list[1].executors #executors - out[FileMarker.PRF_CORE_GROUP] = group - for result in percentile_list.values(): - suffix = f"_{int(result.percentile * 100)}" if result.percentile < 1 else "" - out[FileMarker.PRF_CORE_TOTAL_CALL + suffix] = result.count # ok - out[FileMarker.PRF_CORE_TOTAL_CALL_PER_SEC_RAW + suffix] = result.call_per_sec_raw # ok - out[FileMarker.PRF_CORE_TOTAL_CALL_PER_SEC + suffix] = result.call_per_sec # ok - out[FileMarker.PRF_CORE_AVRG_TIME + suffix] = result.avrg # ok - out[FileMarker.PRF_CORE_STD_DEVIATION + suffix] = result.std # ok - out[FileMarker.PRF_CORE_MIN + suffix] = result.min # ok - out[FileMarker.PRF_CORE_MAX + suffix] = result.max # ok - out[FileMarker.PRF_CORE_TIME_END] = datetime.utcnow().isoformat(' ') - - # human readable form - readable_out = {} - readable_out[FileMarker.HM_PRF_CORE_PLAN_EXECUTOR_ALL] = f"{processes * threads} [{processes},{threads}]" - readable_out[FileMarker.HM_PRF_CORE_REAL_EXECUTOR] = percentile_list[1].executors # executors - readable_out[FileMarker.HM_PRF_CORE_GROUP] = group - for result in percentile_list.values(): - suffix = f"_{int(result.percentile * 100)}" if result.percentile < 1 else "" - #readable_out[FileMarker.HM_PRF_CORE_TOTAL_CALL + suffix] = result.count - readable_out[FileMarker.HM_PRF_CORE_TOTAL_CALL + suffix] = result.count - if result.call_per_sec_raw == result.call_per_sec: - call_readable = f"{round(result.call_per_sec_raw, OutputSetup().human_precision)}" - else: - call_readable = f"{round(result.call_per_sec_raw, OutputSetup().human_precision)}/{round(result.call_per_sec, OutputSetup().human_precision)}" - readable_out[FileMarker.HM_PRF_CORE_TOTAL_CALL_PER_SEC + suffix] = call_readable - readable_out[FileMarker.HM_PRF_CORE_AVRG_TIME + suffix] = round(result.avrg, OutputSetup().human_precision) - readable_out[FileMarker.HM_PRF_CORE_STD_DEVIATION + suffix] = round(result.std, OutputSetup().human_precision) - readable_out[FileMarker.PRF_CORE_MIN + suffix] = round(result.min, OutputSetup().human_precision) - readable_out[FileMarker.PRF_CORE_MAX + suffix] = round(result.max, OutputSetup().human_precision) - - # final dump - self.print(f" {dumps(out, separators = OutputSetup().json_separator)}", - f" {dumps(readable_out, separators = OutputSetup().human_json_separator)}") - - return percentile_list - - class ParallelExecutor: """ Helper for parallel execution of defined function (via start new process with aim to avoid GIL) """ @@ -394,7 +186,7 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO if self._init_each_bulk: self.init_run(run_setup) - output = ParallelOutput(self._label, self._detail_output, self._output_file) + output = OutputResults(self._label, self._detail_output, self._output_file) output.open() output.print_header(run_setup) @@ -456,7 +248,7 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc # if self._output_file is not None: # file = self._open_output() - output = ParallelOutput(self._label, self._detail_output, self._output_file) + output = OutputResults(self._label, self._detail_output, self._output_file) output.open() output.print_header(run_setup) From 3e4e92504663d8bfd54b3cc57e4f697c950fdae0 Mon Sep 17 00:00:00 2001 From: jist <95856749+george0st@users.noreply.github.com> Date: Thu, 24 Oct 2024 14:21:54 +0200 Subject: [PATCH 5/6] ... --- qgate_perf/output_result.py | 2 +- qgate_perf/parallel_executor.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/qgate_perf/output_result.py b/qgate_perf/output_result.py index 1a0e555..fe1c7fe 100644 --- a/qgate_perf/output_result.py +++ b/qgate_perf/output_result.py @@ -78,7 +78,7 @@ def __str__(self): # TODO: improve output pass -class OutputResults: +class Output: def __init__(self, label = None, detail_output = True, output_file = None): self._label = label diff --git a/qgate_perf/parallel_executor.py b/qgate_perf/parallel_executor.py index 580c47a..22cbf91 100644 --- a/qgate_perf/parallel_executor.py +++ b/qgate_perf/parallel_executor.py @@ -4,12 +4,12 @@ import gc from time import sleep from qgate_perf.run_setup import RunSetup -from qgate_perf.helper import ExecutorHelper, GraphScope, BundleHelper #, get_host, get_memory, get_readable_duration -from qgate_perf.parallel_probe import ParallelProbe #, PercentileSummary +from qgate_perf.helper import ExecutorHelper, GraphScope, BundleHelper +from qgate_perf.parallel_probe import ParallelProbe from qgate_perf.run_return import RunReturn from platform import python_version from packaging import version -from qgate_perf.output_result import PerfResult, PerfResults, OutputResults +from qgate_perf.output_result import PerfResult, PerfResults, Output def _executor_wrapper(func, run_return: RunReturn, run_setup: RunSetup): @@ -186,7 +186,7 @@ def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHO if self._init_each_bulk: self.init_run(run_setup) - output = OutputResults(self._label, self._detail_output, self._output_file) + output = Output(self._label, self._detail_output, self._output_file) output.open() output.print_header(run_setup) @@ -248,7 +248,7 @@ def run(self, processes = 2, threads = 2, run_setup: RunSetup = None, performanc # if self._output_file is not None: # file = self._open_output() - output = OutputResults(self._label, self._detail_output, self._output_file) + output = Output(self._label, self._detail_output, self._output_file) output.open() output.print_header(run_setup) From 8662a178101a9c2ee109d8e61b5c59c7f6827be1 Mon Sep 17 00:00:00 2001 From: jist <95856749+george0st@users.noreply.github.com> Date: Thu, 24 Oct 2024 14:29:21 +0200 Subject: [PATCH 6/6] Update version.py --- qgate_perf/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qgate_perf/version.py b/qgate_perf/version.py index d5c4623..65c5cae 100644 --- a/qgate_perf/version.py +++ b/qgate_perf/version.py @@ -1,3 +1,3 @@ # Store the version here so: -__version__ = '0.4.42' \ No newline at end of file +__version__ = '0.4.43' \ No newline at end of file