Merge pull request #48 from george0st/changes
Add core evaluation tests
george0st authored Oct 3, 2024
2 parents 2b0c534 + e28c39b commit a884167
Showing 7 changed files with 222 additions and 67 deletions.
1 change: 1 addition & 0 deletions qgate_perf/file_format.py
@@ -35,6 +35,7 @@ class FileFormat:
HR_PRF_DETAIL_CALLS = "call"
HR_PRF_DETAIL_AVRG = "avr"
HR_PRF_DETAIL_STDEV = "std"
HR_PRF_DETAIL_TOTAL = "tduration"

# core output
PRF_CORE_TYPE = "core"
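Note: the new HR_PRF_DETAIL_TOTAL key pairs with the one-line fix in qgate_perf/parallel_probe.py below, which switches the human-readable detail output to this constant. As a sketch, a human-readable detail record might then look like this (values are assumed for illustration; min/max fields omitted):

{"call": 50, "avr": 0.1, "std": 0.002, "tduration": 5.0}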
14 changes: 14 additions & 0 deletions qgate_perf/output_performance.py
@@ -0,0 +1,14 @@

class OutputPerformance:
def __init__(self, row, col, process, thread, calls_sec):

self.bundle_row = row
self.bundle_col = col

self.executor_process = process
self.executor_thread = thread

self.calls_sec = calls_sec

def __str__(self):
return f"bundle ({self.bundle_row}x{self.bundle_col}), executor ({self.executor_process}x{self.executor_thread}) = {self.calls_sec}"
58 changes: 44 additions & 14 deletions qgate_perf/parallel_executor.py
@@ -15,6 +15,7 @@
from packaging import version
from contextlib import suppress
from qgate_perf.output_setup import OutputSetup
from qgate_perf.output_performance import OutputPerformance


def _executor_wrapper(func, run_return: RunReturn, run_setup: RunSetup):
@@ -230,6 +231,7 @@ def _print_detail(self, file, run_setup: RunSetup, return_dict, processes, threa
:param processes: Number of processes
:param threads: Number of threads
:param group: Name of group
:return: Performance, total calls per one second
"""
sum_avrg_time = 0
sum_deviation = 0
@@ -257,8 +259,6 @@ def _print_detail(self, file, run_setup: RunSetup, return_dict, processes, threa
# 1 / (sum_avrg_time/count) = average amount of calls per one second (cross executors)
total_call_per_sec = 0 if (sum_avrg_time / executors) == 0 else (1 / (sum_avrg_time / executors)) * executors * run_setup._bulk_row

# TODO: Save to the output buffer final value of performance, for possible check in unit tests, etc.

out = {
FileFormat.PRF_TYPE: FileFormat.PRF_CORE_TYPE,
FileFormat.PRF_CORE_PLAN_EXECUTOR_ALL: processes * threads,
@@ -284,6 +284,8 @@ def _print_detail(self, file, run_setup: RunSetup, return_dict, processes, threa
f" {json.dumps(out)}",
f" {json.dumps(readable_out, separators = OutputSetup().human_json_separator)}")

return total_call_per_sec

def _open_output(self):
dirname = os.path.dirname(self._output_file)
if dirname:
@@ -293,7 +295,7 @@ def _open_output(self):

def _executeCore(self, run_setup: RunSetup, return_dict, processes=2, threads=2):

from qgate_perf.run_return import RunReturn
#from qgate_perf.run_return import RunReturn

proc = []

@@ -333,21 +335,24 @@ def _executeCore(self, run_setup: RunSetup, return_dict, processes=2, threads=2)
print(f"SYSTEM ERROR in '_executeCore': '{str(ex)}'")

def run_bulk_executor(self,
bulk_list= BundleHelper.ROW_1_COL_10_100,
executor_list= ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHORT,
run_setup: RunSetup=None,
sleep_between_bulks=0) -> bool:
bulk_list = BundleHelper.ROW_1_COL_10_100,
executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHORT,
run_setup: RunSetup = None,
sleep_between_bulks = 0,
return_performance = False):
""" Run cykle of bulks in cycle of sequences for function execution
:param bulk_list: list of bulks for execution in format [[rows, columns], ...]
:param executor_list: list of executors for execution in format [[processes, threads, 'label'], ...]
:param run_setup: setup of execution
:param sleep_between_bulks: sleep between bulks
:param return_performance: also return performance; the return value becomes (state, performance) (default is False)
:return: return state, True - all executions were without exceptions,
False - some exceptions
"""
final_state = True
count = 0
performance = []
for bulk in bulk_list:

# sleep before other bulk
@@ -357,24 +362,37 @@

# execute
run_setup.set_bulk(bulk[0], bulk[1])
if not self.run_executor(executor_list, run_setup):
final_state=False

if return_performance:
state, bulk_performance = self.run_executor(executor_list, run_setup, return_performance)
if not state:
final_state=False
for bulk_perf in bulk_performance:
performance.append(bulk_perf)
else:
if not self.run_executor(executor_list, run_setup):
final_state=False
# memory clean
gc.collect()

if return_performance:
return final_state, performance
return final_state

def run_executor(self, executor_list= ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHORT,
run_setup: RunSetup=None) -> bool:
""" Run executor sequencies
def run_executor(self, executor_list = ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHORT,
run_setup: RunSetup = None,
return_performance = False):
""" Run executor sequences
:param executor_list: list of executors for execution in format [[processes, threads, 'label'], ...]
:param run_setup: setup of execution
:param return_performance: also return performance; the return value becomes (state, performance) (default is False)
:return: return state, True - all executions were without exceptions,
False - some exceptions
"""
file = None
final_state=True
final_state = True
performance = []
print('Execution...')

try:
@@ -391,15 +409,24 @@ def run_executor(self, executor_list= ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHOR
with multiprocessing.Manager() as manager:
return_dict = manager.dict()
self._executeCore(run_setup, return_dict, executors[0], executors[1])
self._print_detail(file,
calls_sec = self._print_detail(file,
run_setup,
return_dict,
executors[0],
executors[1],
'' if len(executors) <= 2 else executors[2])
if return_performance:
performance.append(OutputPerformance(run_setup.bulk_row,
run_setup.bulk_col,
executors[0],
executors[1],
calls_sec))
if not self._final_state(return_dict):
final_state=False

# memory clean
gc.collect(generation = 1)

self._print_footer(file, final_state)

except Exception as ex:
Expand All @@ -408,6 +435,9 @@ def run_executor(self, executor_list= ExecutorHelper.PROCESS_2_8_THREAD_1_4_SHOR
finally:
if file is not None:
file.close()

if return_performance:
return final_state, performance
return final_state

def run(self, processes=2, threads=2, run_setup: RunSetup=None) -> bool:
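The throughput formula in _print_detail can be checked by hand. A minimal sketch with assumed numbers (two single-thread executors, 0.1 s average call duration, bulk_row = 1), matching test_expected_output2 below:

sum_avrg_time = 0.2    # assumed: sum of average call durations across both executors (0.1 s each)
executors = 2          # processes * threads
bulk_row = 1
# 1 / (sum_avrg_time / executors) = average calls per second of a single executor,
# scaled by the number of executors and by the rows in the bundle
total_call_per_sec = (1 / (sum_avrg_time / executors)) * executors * bulk_row
print(total_call_per_sec)    # 20.0, inside the 19..21 band asserted by the test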
2 changes: 1 addition & 1 deletion qgate_perf/parallel_probe.py
@@ -132,7 +132,7 @@ def readable_str(self, compact_form = True):
FileFormat.PRF_DETAIL_MIN: round(self.min_duration, OutputSetup().human_precision),
FileFormat.PRF_DETAIL_MAX: round(self.max_duration, OutputSetup().human_precision),
FileFormat.HR_PRF_DETAIL_STDEV: round(self.standard_deviation, OutputSetup().human_precision),
FileFormat.PRF_DETAIL_TOTAL: round(self.total_duration, OutputSetup().human_precision)
FileFormat.HR_PRF_DETAIL_TOTAL: round(self.total_duration, OutputSetup().human_precision)
}, separators = OutputSetup().human_json_separator if compact_form else (', ', ': '))
else:
return ParallelProbe.readable_dump_error(self.exception, self.pid, self.counter)
160 changes: 160 additions & 0 deletions tests/test_core_evaluation.py
@@ -0,0 +1,160 @@
import os
import unittest
import logging
from qgate_perf.parallel_executor import ParallelExecutor
from qgate_perf.parallel_probe import ParallelProbe
from qgate_perf.run_setup import RunSetup
from qgate_perf.executor_helper import ExecutorHelper
from qgate_perf.run_return import RunReturn
from qgate_perf.bundle_helper import BundleHelper
from qgate_perf.output_setup import OutputSetup
import time
from os import path
import shutil

def prf_calibration_onehundred_ms(run_setup: RunSetup) -> ParallelProbe:
""" Function for performance testing"""

# init (contains executor synchronization, if needed)
probe = ParallelProbe(run_setup)

if run_setup.is_init:
print(f"!!! INIT CALL !!! {run_setup.bulk_row} x {run_setup.bulk_col}")

while (True):

# START - performance measure for specific part of code
probe.start()

for r in range(run_setup.bulk_row * run_setup.bulk_col):
time.sleep(0.1)

# STOP - performance measure for specific part of code
if probe.stop():
break

if run_setup.param("generate_error"):
raise Exception('Simulated error')

# return outputs
return probe

class TestCaseCoreEvaluation(unittest.TestCase):
"""
Test whether the calculation of performance is correct
IMPORTANT (main ideas)
- all cases will have similar calls per second
- a different test duration does not change performance (calls per second)
- performance is affected by bundle size and number of executors
"""
OUTPUT_ADR = "../output/test_perf/"

@classmethod
def setUpClass(cls):
shutil.rmtree(cls.OUTPUT_ADR, True)

@classmethod
def tearDownClass(cls):
pass

def test_expected_output1(self):

generator = ParallelExecutor(prf_calibration_onehundred_ms,
label = "GIL_impact",
detail_output = True,
output_file = path.join(self.OUTPUT_ADR, "perf_gil_impact_test.txt"))

# first
setup=RunSetup(duration_second = 1, start_delay = 0)
state, perf = generator.run_bulk_executor(bulk_list = [[1,1]],
executor_list = [[1,1]],
run_setup = setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 9 and perf[0].calls_sec <= 11)

# second
setup=RunSetup(duration_second = 2, start_delay = 0)
state, perf = generator.run_bulk_executor(bulk_list = [[1,1]],
executor_list = [[1,1]],
run_setup = setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 9 and perf[0].calls_sec <= 11)

# third
setup=RunSetup(duration_second = 10, start_delay = 0)
state, perf = generator.run_bulk_executor(bulk_list = [[1,1]],
executor_list = [[1,1]],
run_setup = setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 9 and perf[0].calls_sec <= 11)

def test_expected_output2(self):
generator = ParallelExecutor(prf_calibration_onehundred_ms,
label="GIL_impact",
detail_output=True,
output_file=path.join(self.OUTPUT_ADR, "perf_gil_impact_test.txt"))
# first
setup=RunSetup(duration_second=1, start_delay=0)
state, perf = generator.run_bulk_executor(bulk_list=[[1,1]],
executor_list=[[2,1]],
run_setup=setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 19 and perf[0].calls_sec <= 21)

# second
setup=RunSetup(duration_second=2, start_delay=0)
state, perf = generator.run_bulk_executor(bulk_list=[[1,1]],
executor_list=[[2,1]],
run_setup=setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 19 and perf[0].calls_sec <= 21)

# third
setup=RunSetup(duration_second=10, start_delay=0)
state, perf = generator.run_bulk_executor(bulk_list=[[1,1]],
executor_list=[[2,1]],
run_setup=setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 19 and perf[0].calls_sec <= 21)

def test_expected_output3(self):
generator = ParallelExecutor(prf_calibration_onehundred_ms,
label="GIL_impact",
detail_output=True,
output_file=path.join(self.OUTPUT_ADR, "perf_gil_impact_test.txt"))

# first
setup=RunSetup(duration_second=1, start_delay=0)
state, perf = generator.run_bulk_executor(bulk_list=[[1,1]],
executor_list=[[4,1]],
run_setup=setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 39 and perf[0].calls_sec <= 41)

# second
setup=RunSetup(duration_second=2, start_delay=0)
state, perf = generator.run_bulk_executor(bulk_list=[[1,1]],
executor_list=[[4,1]],
run_setup=setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 39 and perf[0].calls_sec <= 41)

# third
setup=RunSetup(duration_second=10, start_delay=0)
state, perf = generator.run_bulk_executor(bulk_list=[[1,1]],
executor_list=[[4,1]],
run_setup=setup,
return_performance = True)
self.assertTrue(state)
self.assertTrue(perf[0].calls_sec >= 39 and perf[0].calls_sec <= 41)

1 change: 1 addition & 0 deletions tests/test_internal.py
@@ -16,6 +16,7 @@


class TestCaseInternal(unittest.TestCase):
"""Only small internal checks"""
@classmethod
def setUpClass(cls):
pass