From 6647f28140ad5ba26d3795deb5f277b96fcfc074 Mon Sep 17 00:00:00 2001 From: oleg-w570 <73493289+oleg-w570@users.noreply.github.com> Date: Wed, 6 Dec 2023 17:41:57 +0300 Subject: [PATCH] =?UTF-8?q?=D0=90=D1=81=D0=B8=D0=BD=D1=85=D1=80=D0=BE?= =?UTF-8?q?=D0=BD=D0=BD=D0=B0=D1=8F=20=D0=BF=D0=B0=D1=80=D0=B0=D0=BB=D0=BB?= =?UTF-8?q?=D0=B5=D0=BB=D1=8C=D0=BD=D0=B0=D1=8F=20=D1=81=D1=85=D0=B5=D0=BC?= =?UTF-8?q?=D0=B0=20(#166)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * async initial * async up * async second * pep8 * pep8 * switch multiprocessing to multiprocess (part of pathos) * revert gkls example * revert requirements.txt * move async implementation from async_parallel_process to async_calculator * pep8 * redundant code removed * test for async parallel process * gkls async example add multiprocess to requirements * gkls async example * async initial * async up * async second * pep8 * pep8 * switch multiprocessing to multiprocess (part of pathos) * revert gkls example * revert requirements.txt * move async implementation from async_parallel_process to async_calculator * pep8 * redundant code removed * test for async parallel process * gkls async example add multiprocess to requirements * gkls async example * semi-fix for iter-tasks --- examples/GKLS_async_example.py | 31 +++++++ iOpt/method/async_calculator.py | 92 +++++++++++++++++++ iOpt/method/async_parallel_process.py | 88 ++++++++++++++++++ iOpt/method/search_data.py | 4 +- iOpt/method/solverFactory.py | 5 + iOpt/solver_parametrs.py | 2 + requirements.txt | 3 +- .../method/test_async_parallel_process.py | 68 ++++++++++++++ 8 files changed, 291 insertions(+), 2 deletions(-) create mode 100644 examples/GKLS_async_example.py create mode 100644 iOpt/method/async_calculator.py create mode 100644 iOpt/method/async_parallel_process.py create mode 100644 test/iOpt/method/test_async_parallel_process.py diff --git a/examples/GKLS_async_example.py b/examples/GKLS_async_example.py new file mode 100644 index 0000000..529f7f6 --- /dev/null +++ b/examples/GKLS_async_example.py @@ -0,0 +1,31 @@ +from iOpt.output_system.listeners.console_outputers import ConsoleOutputListener +from iOpt.output_system.listeners.static_painters import StaticPainterNDListener +from iOpt.solver import Solver +from iOpt.solver_parametrs import SolverParameters +from problems.GKLS import GKLS + + +def solve_single_gkls(): + """ + Минимизация тестовой функции из GKLS генератора с номером 39 + """ + + # создание объекта задачи + problem = GKLS(dimension=3, functionNumber=39) + + # Формируем параметры решателя + params = SolverParameters(r=4, eps=0.01, number_of_parallel_points=4, async_scheme=True) + + # Создаем решатель + solver = Solver(problem=problem, parameters=params) + + # Добавляем вывод результатов в консоль + cfol = ConsoleOutputListener(mode='full') + solver.add_listener(cfol) + + # Решение задачи + solver.solve() + + +if __name__ == "__main__": + solve_single_gkls() diff --git a/iOpt/method/async_calculator.py b/iOpt/method/async_calculator.py new file mode 100644 index 0000000..12bc489 --- /dev/null +++ b/iOpt/method/async_calculator.py @@ -0,0 +1,92 @@ +import multiprocess as mp + +from iOpt.method.icriterion_evaluate_method import ICriterionEvaluateMethod +from iOpt.method.search_data import SearchDataItem +from iOpt.solver_parametrs import SolverParameters + + +class Worker(mp.Process): + def __init__( + self, + evaluate_method: ICriterionEvaluateMethod, + task_queue: mp.Queue, + done_queue: mp.Queue, + ): + super(Worker, self).__init__() + self.evaluate_method = evaluate_method + self.task_queue = task_queue + self.done_queue = done_queue + + def run(self): + for point in iter(self.task_queue.get, "STOP"): + point = self.evaluate_method.calculate_functionals(point) + self.done_queue.put_nowait(point) + + +class AsyncCalculator: + def __init__( + self, evaluate_method: ICriterionEvaluateMethod, parameters: SolverParameters + ): + self.evaluate_method = evaluate_method + self.task_queue = mp.Queue() + self.done_queue = mp.Queue() + self.workers = [ + Worker(evaluate_method, self.task_queue, self.done_queue) + for _ in range(parameters.number_of_parallel_points) + ] + self.waiting_workers = parameters.number_of_parallel_points + self.waiting_oldpoints: dict[float, SearchDataItem] = dict() + + def start(self) -> None: + for w in self.workers: + w.start() + + def give_point(self, newpoint: SearchDataItem, oldpoint: SearchDataItem) -> None: + self.task_queue.put_nowait(newpoint) + self.waiting_oldpoints[newpoint.get_x()] = oldpoint + oldpoint.blocked = True + + def _take_calculated_point( + self, block: bool + ) -> tuple[SearchDataItem, SearchDataItem]: + newpoint = self.done_queue.get(block=block) + oldpoint = self.waiting_oldpoints.pop(newpoint.get_x()) + oldpoint.blocked = False + return newpoint, oldpoint + + def take_list_of_calculated_points( + self + ) -> list[tuple[SearchDataItem, SearchDataItem]]: + list_points = [] + points = self._take_calculated_point(block=True) + list_points.append(points) + self.waiting_workers = 1 + while not self.done_queue.empty(): + points = self._take_calculated_point(block=False) + list_points.append(points) + self.waiting_workers += 1 + return list_points + + def stop(self) -> list[tuple[SearchDataItem, SearchDataItem]]: + for _ in range(len(self.workers)): + self.task_queue.put_nowait("STOP") + for w in self.workers: + w.join() + list_points = [] + while not self.done_queue.empty(): + points = self._take_calculated_point(block=False) + list_points.append(points) + return list_points + + def calculate_functionals_for_items( + self, points: list[SearchDataItem] + ) -> list[SearchDataItem]: + for point in points: + self.task_queue.put_nowait(point) + points_res = [] + for _ in range(len(points)): + points_res.append(self.done_queue.get()) + points_res.sort(key=lambda p: p.get_x()) + for point, point_r in zip(points, points_res): + self.evaluate_method.copy_functionals(point, point_r) + return points diff --git a/iOpt/method/async_parallel_process.py b/iOpt/method/async_parallel_process.py new file mode 100644 index 0000000..e1e4bf0 --- /dev/null +++ b/iOpt/method/async_parallel_process.py @@ -0,0 +1,88 @@ +import traceback +from datetime import datetime + +from iOpt.evolvent.evolvent import Evolvent +from iOpt.method.async_calculator import AsyncCalculator +from iOpt.method.index_method_calculator import IndexMethodCalculator +from iOpt.method.listener import Listener +from iOpt.method.method import Method +from iOpt.method.optim_task import OptimizationTask +from iOpt.method.process import Process +from iOpt.method.search_data import SearchData +from iOpt.solution import Solution +from iOpt.solver_parametrs import SolverParameters + + +class AsyncParallelProcess(Process): + def __init__( + self, + parameters: SolverParameters, + task: OptimizationTask, + evolvent: Evolvent, + search_data: SearchData, + method: Method, + listeners: list[Listener], + ): + super(AsyncParallelProcess, self).__init__( + parameters, task, evolvent, search_data, method, listeners + ) + self.calculator = AsyncCalculator(IndexMethodCalculator(task), parameters) + + def do_global_iteration(self, number: int = 1) -> None: + done_trials = [] + if self._first_iteration is True: + for listener in self._listeners: + listener.before_method_start(self.method) + done_trials = self.method.first_iteration(self.calculator) + self._first_iteration = False + number -= 1 + + for _ in range(number): + for _ in range(self.calculator.waiting_workers): + newpoint, oldpoint = self.method.calculate_iteration_point() + self.calculator.give_point(newpoint, oldpoint) + self.method.finalize_iteration() + + for newpoint, oldpoint in self.calculator.take_list_of_calculated_points(): + self.method.update_optimum(newpoint) + self.method.renew_search_data(newpoint, oldpoint) + + done_trials.extend( + self.search_data.get_last_items(self.calculator.waiting_workers) + ) + + for listener in self._listeners: + listener.on_end_iteration(done_trials, self.get_results()) + + def solve(self) -> Solution: + """ + Метод позволяет решить задачу оптимизации. Остановка поиска выполняется согласно критерию, + заданному при создании класса Solver. + + :return: Текущая оценка решения задачи оптимизации + """ + self.calculator.start() + + start_time = datetime.now() + try: + while not self.method.check_stop_condition(): + self.do_global_iteration() + except Exception: + print("Exception was thrown") + print(traceback.format_exc()) + + for newpoint, oldpoint in self.calculator.stop(): + self.method.update_optimum(newpoint) + self.method.renew_search_data(newpoint, oldpoint) + + if self.parameters.refine_solution: + self.do_local_refinement(self.parameters.local_method_iteration_count) + + result = self.get_results() + result.solving_time = (datetime.now() - start_time).total_seconds() + + for listener in self._listeners: + status = self.method.check_stop_condition() + listener.on_method_stop(self.search_data, self.get_results(), status) + + return result diff --git a/iOpt/method/search_data.py b/iOpt/method/search_data.py index 01258fb..71e3a30 100644 --- a/iOpt/method/search_data.py +++ b/iOpt/method/search_data.py @@ -47,6 +47,7 @@ def __init__(self, y: Point, x: np.double, self.globalR: np.double = -1.0 self.localR: np.double = -1.0 self.iterationNumber: int = -1 + self.blocked: bool = False def get_x(self) -> np.double: """ @@ -321,7 +322,8 @@ def refill_queue(self): """ self._RGlobalQueue.Clear() for itr in self: - self._RGlobalQueue.insert(itr.globalR, itr) + if not itr.blocked: + self._RGlobalQueue.insert(itr.globalR, itr) # Возвращает текущее число интервалов в дереве def get_count(self) -> int: diff --git a/iOpt/method/solverFactory.py b/iOpt/method/solverFactory.py index ed7cfdf..1b86d02 100644 --- a/iOpt/method/solverFactory.py +++ b/iOpt/method/solverFactory.py @@ -1,6 +1,7 @@ from typing import List from iOpt.evolvent.evolvent import Evolvent +from iOpt.method.async_parallel_process import AsyncParallelProcess from iOpt.method.index_method import IndexMethod from iOpt.method.listener import Listener from iOpt.method.method import Method @@ -65,6 +66,10 @@ def create_process(parameters: SolverParameters, if parameters.number_of_parallel_points == 1: return Process(parameters=parameters, task=task, evolvent=evolvent, search_data=search_data, method=method, listeners=listeners) + elif parameters.async_scheme: + return AsyncParallelProcess(parameters=parameters, task=task, evolvent=evolvent, + search_data=search_data, method=method, listeners=listeners) else: return ParallelProcess(parameters=parameters, task=task, evolvent=evolvent, search_data=search_data, method=method, listeners=listeners) + diff --git a/iOpt/solver_parametrs.py b/iOpt/solver_parametrs.py index b3d84f4..bb36f03 100644 --- a/iOpt/solver_parametrs.py +++ b/iOpt/solver_parametrs.py @@ -16,6 +16,7 @@ def __init__(self, refine_solution: bool = False, start_point: Point = [], number_of_parallel_points: int = 1, + async_scheme: bool = False, timeout: int = -1, proportion_of_global_iterations: float = 0.95 ): @@ -54,6 +55,7 @@ def __init__(self, self.refine_solution = refine_solution self.start_point = start_point self.number_of_parallel_points = number_of_parallel_points + self.async_scheme = async_scheme self.timeout = timeout def to_string(self) -> str: diff --git a/requirements.txt b/requirements.txt index 38db274..b1cac43 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ sphinx_rtd_theme readthedocs-sphinx-search sphinxcontrib-details-directive autodocsumm -pathos \ No newline at end of file +pathos +multiprocess diff --git a/test/iOpt/method/test_async_parallel_process.py b/test/iOpt/method/test_async_parallel_process.py new file mode 100644 index 0000000..b38e7ab --- /dev/null +++ b/test/iOpt/method/test_async_parallel_process.py @@ -0,0 +1,68 @@ +import unittest +from time import sleep + +from iOpt.evolvent.evolvent import Evolvent +from iOpt.method.async_parallel_process import AsyncParallelProcess +from iOpt.method.method import Method +from iOpt.method.optim_task import OptimizationTask +from iOpt.method.search_data import SearchData +from iOpt.problem import Problem +from iOpt.solver_parametrs import SolverParameters +from iOpt.trial import Point, FunctionValue + + +class ProblemForTest(Problem): + def __init__(self): + super().__init__() + self.dimension = 1 + self.number_of_float_variables = 1 + self.number_of_constraints = 0 + self.number_of_objectives = 1 + self.lower_bound_of_float_variables = [-1] + self.upper_bound_of_float_variables = [1] + + def calculate(self, point: Point, function_value: FunctionValue) -> FunctionValue: + sleep(1 - point.float_variables[0]) + function_value.value = point.float_variables[0] ** 2 + return function_value + + +class TestAsyncParallelProcess(unittest.TestCase): + def setUp(self): + param = SolverParameters(number_of_parallel_points=2, iters_limit=4) + problem = ProblemForTest() + task = OptimizationTask(problem) + evolvent = Evolvent( + problem.lower_bound_of_float_variables, + problem.upper_bound_of_float_variables, + problem.number_of_float_variables, + ) + search_data = SearchData(problem) + method = Method(param, task, evolvent, search_data) + self.async_parallel_process = AsyncParallelProcess( + param, task, evolvent, search_data, method, [] + ) + + def test_Solve(self): + self.async_parallel_process.solve() + items = self.async_parallel_process.search_data.get_last_items(5) + self.assertAlmostEqual(-1 / 3, items[0].get_y().float_variables[0]) + self.assertAlmostEqual(1 / 3, items[1].get_y().float_variables[0]) + self.assertAlmostEqual(2 / 3, items[2].get_y().float_variables[0]) + self.assertAlmostEqual(0.0, items[3].get_y().float_variables[0]) + self.assertAlmostEqual(-2 / 3, items[4].get_y().float_variables[0]) + self.assertAlmostEqual( + 0.0, + self.async_parallel_process.search_data.solution.best_trials[0] + .point.float_variables[0], + ) + self.assertAlmostEqual( + 0.0, + self.async_parallel_process.search_data.solution.best_trials[0] + .function_values[0].value, + ) + self.assertEqual(4, self.async_parallel_process.method.iterations_count) + + +if __name__ == "__main__": + unittest.main()