Skip to content

Commit

Permalink
Асинхронная параллельная схема (#166)
Browse files Browse the repository at this point in the history
* async initial

* async up

* async second

* pep8

* pep8

* switch multiprocessing to multiprocess (part of pathos)

* revert gkls example

* revert requirements.txt

* move async implementation from async_parallel_process to async_calculator

* pep8

* redundant code removed

* test for async parallel process

* gkls async example
add multiprocess to requirements

* gkls async example

* async initial

* async up

* async second

* pep8

* pep8

* switch multiprocessing to multiprocess (part of pathos)

* revert gkls example

* revert requirements.txt

* move async implementation from async_parallel_process to async_calculator

* pep8

* redundant code removed

* test for async parallel process

* gkls async example
add multiprocess to requirements

* gkls async example

* semi-fix for iter-tasks
  • Loading branch information
oleg-w570 authored Dec 6, 2023
1 parent 35f8cac commit 6647f28
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 2 deletions.
31 changes: 31 additions & 0 deletions examples/GKLS_async_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from iOpt.output_system.listeners.console_outputers import ConsoleOutputListener
from iOpt.output_system.listeners.static_painters import StaticPainterNDListener
from iOpt.solver import Solver
from iOpt.solver_parametrs import SolverParameters
from problems.GKLS import GKLS


def solve_single_gkls():
"""
Минимизация тестовой функции из GKLS генератора с номером 39
"""

# создание объекта задачи
problem = GKLS(dimension=3, functionNumber=39)

# Формируем параметры решателя
params = SolverParameters(r=4, eps=0.01, number_of_parallel_points=4, async_scheme=True)

# Создаем решатель
solver = Solver(problem=problem, parameters=params)

# Добавляем вывод результатов в консоль
cfol = ConsoleOutputListener(mode='full')
solver.add_listener(cfol)

# Решение задачи
solver.solve()


if __name__ == "__main__":
solve_single_gkls()
92 changes: 92 additions & 0 deletions iOpt/method/async_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import multiprocess as mp

from iOpt.method.icriterion_evaluate_method import ICriterionEvaluateMethod
from iOpt.method.search_data import SearchDataItem
from iOpt.solver_parametrs import SolverParameters


class Worker(mp.Process):
def __init__(
self,
evaluate_method: ICriterionEvaluateMethod,
task_queue: mp.Queue,
done_queue: mp.Queue,
):
super(Worker, self).__init__()
self.evaluate_method = evaluate_method
self.task_queue = task_queue
self.done_queue = done_queue

def run(self):
for point in iter(self.task_queue.get, "STOP"):
point = self.evaluate_method.calculate_functionals(point)
self.done_queue.put_nowait(point)


class AsyncCalculator:
def __init__(
self, evaluate_method: ICriterionEvaluateMethod, parameters: SolverParameters
):
self.evaluate_method = evaluate_method
self.task_queue = mp.Queue()
self.done_queue = mp.Queue()
self.workers = [
Worker(evaluate_method, self.task_queue, self.done_queue)
for _ in range(parameters.number_of_parallel_points)
]
self.waiting_workers = parameters.number_of_parallel_points
self.waiting_oldpoints: dict[float, SearchDataItem] = dict()

def start(self) -> None:
for w in self.workers:
w.start()

def give_point(self, newpoint: SearchDataItem, oldpoint: SearchDataItem) -> None:
self.task_queue.put_nowait(newpoint)
self.waiting_oldpoints[newpoint.get_x()] = oldpoint
oldpoint.blocked = True

def _take_calculated_point(
self, block: bool
) -> tuple[SearchDataItem, SearchDataItem]:
newpoint = self.done_queue.get(block=block)
oldpoint = self.waiting_oldpoints.pop(newpoint.get_x())
oldpoint.blocked = False
return newpoint, oldpoint

def take_list_of_calculated_points(
self
) -> list[tuple[SearchDataItem, SearchDataItem]]:
list_points = []
points = self._take_calculated_point(block=True)
list_points.append(points)
self.waiting_workers = 1
while not self.done_queue.empty():
points = self._take_calculated_point(block=False)
list_points.append(points)
self.waiting_workers += 1
return list_points

def stop(self) -> list[tuple[SearchDataItem, SearchDataItem]]:
for _ in range(len(self.workers)):
self.task_queue.put_nowait("STOP")
for w in self.workers:
w.join()
list_points = []
while not self.done_queue.empty():
points = self._take_calculated_point(block=False)
list_points.append(points)
return list_points

def calculate_functionals_for_items(
self, points: list[SearchDataItem]
) -> list[SearchDataItem]:
for point in points:
self.task_queue.put_nowait(point)
points_res = []
for _ in range(len(points)):
points_res.append(self.done_queue.get())
points_res.sort(key=lambda p: p.get_x())
for point, point_r in zip(points, points_res):
self.evaluate_method.copy_functionals(point, point_r)
return points
88 changes: 88 additions & 0 deletions iOpt/method/async_parallel_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import traceback
from datetime import datetime

from iOpt.evolvent.evolvent import Evolvent
from iOpt.method.async_calculator import AsyncCalculator
from iOpt.method.index_method_calculator import IndexMethodCalculator
from iOpt.method.listener import Listener
from iOpt.method.method import Method
from iOpt.method.optim_task import OptimizationTask
from iOpt.method.process import Process
from iOpt.method.search_data import SearchData
from iOpt.solution import Solution
from iOpt.solver_parametrs import SolverParameters


class AsyncParallelProcess(Process):
def __init__(
self,
parameters: SolverParameters,
task: OptimizationTask,
evolvent: Evolvent,
search_data: SearchData,
method: Method,
listeners: list[Listener],
):
super(AsyncParallelProcess, self).__init__(
parameters, task, evolvent, search_data, method, listeners
)
self.calculator = AsyncCalculator(IndexMethodCalculator(task), parameters)

def do_global_iteration(self, number: int = 1) -> None:
done_trials = []
if self._first_iteration is True:
for listener in self._listeners:
listener.before_method_start(self.method)
done_trials = self.method.first_iteration(self.calculator)
self._first_iteration = False
number -= 1

for _ in range(number):
for _ in range(self.calculator.waiting_workers):
newpoint, oldpoint = self.method.calculate_iteration_point()
self.calculator.give_point(newpoint, oldpoint)
self.method.finalize_iteration()

for newpoint, oldpoint in self.calculator.take_list_of_calculated_points():
self.method.update_optimum(newpoint)
self.method.renew_search_data(newpoint, oldpoint)

done_trials.extend(
self.search_data.get_last_items(self.calculator.waiting_workers)
)

for listener in self._listeners:
listener.on_end_iteration(done_trials, self.get_results())

def solve(self) -> Solution:
"""
Метод позволяет решить задачу оптимизации. Остановка поиска выполняется согласно критерию,
заданному при создании класса Solver.
:return: Текущая оценка решения задачи оптимизации
"""
self.calculator.start()

start_time = datetime.now()
try:
while not self.method.check_stop_condition():
self.do_global_iteration()
except Exception:
print("Exception was thrown")
print(traceback.format_exc())

for newpoint, oldpoint in self.calculator.stop():
self.method.update_optimum(newpoint)
self.method.renew_search_data(newpoint, oldpoint)

if self.parameters.refine_solution:
self.do_local_refinement(self.parameters.local_method_iteration_count)

result = self.get_results()
result.solving_time = (datetime.now() - start_time).total_seconds()

for listener in self._listeners:
status = self.method.check_stop_condition()
listener.on_method_stop(self.search_data, self.get_results(), status)

return result
4 changes: 3 additions & 1 deletion iOpt/method/search_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, y: Point, x: np.double,
self.globalR: np.double = -1.0
self.localR: np.double = -1.0
self.iterationNumber: int = -1
self.blocked: bool = False

def get_x(self) -> np.double:
"""
Expand Down Expand Up @@ -321,7 +322,8 @@ def refill_queue(self):
"""
self._RGlobalQueue.Clear()
for itr in self:
self._RGlobalQueue.insert(itr.globalR, itr)
if not itr.blocked:
self._RGlobalQueue.insert(itr.globalR, itr)

# Возвращает текущее число интервалов в дереве
def get_count(self) -> int:
Expand Down
5 changes: 5 additions & 0 deletions iOpt/method/solverFactory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List

from iOpt.evolvent.evolvent import Evolvent
from iOpt.method.async_parallel_process import AsyncParallelProcess
from iOpt.method.index_method import IndexMethod
from iOpt.method.listener import Listener
from iOpt.method.method import Method
Expand Down Expand Up @@ -65,6 +66,10 @@ def create_process(parameters: SolverParameters,
if parameters.number_of_parallel_points == 1:
return Process(parameters=parameters, task=task, evolvent=evolvent,
search_data=search_data, method=method, listeners=listeners)
elif parameters.async_scheme:
return AsyncParallelProcess(parameters=parameters, task=task, evolvent=evolvent,
search_data=search_data, method=method, listeners=listeners)
else:
return ParallelProcess(parameters=parameters, task=task, evolvent=evolvent,
search_data=search_data, method=method, listeners=listeners)

2 changes: 2 additions & 0 deletions iOpt/solver_parametrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self,
refine_solution: bool = False,
start_point: Point = [],
number_of_parallel_points: int = 1,
async_scheme: bool = False,
timeout: int = -1,
proportion_of_global_iterations: float = 0.95
):
Expand Down Expand Up @@ -54,6 +55,7 @@ def __init__(self,
self.refine_solution = refine_solution
self.start_point = start_point
self.number_of_parallel_points = number_of_parallel_points
self.async_scheme = async_scheme
self.timeout = timeout

def to_string(self) -> str:
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ sphinx_rtd_theme
readthedocs-sphinx-search
sphinxcontrib-details-directive
autodocsumm
pathos
pathos
multiprocess
68 changes: 68 additions & 0 deletions test/iOpt/method/test_async_parallel_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import unittest
from time import sleep

from iOpt.evolvent.evolvent import Evolvent
from iOpt.method.async_parallel_process import AsyncParallelProcess
from iOpt.method.method import Method
from iOpt.method.optim_task import OptimizationTask
from iOpt.method.search_data import SearchData
from iOpt.problem import Problem
from iOpt.solver_parametrs import SolverParameters
from iOpt.trial import Point, FunctionValue


class ProblemForTest(Problem):
def __init__(self):
super().__init__()
self.dimension = 1
self.number_of_float_variables = 1
self.number_of_constraints = 0
self.number_of_objectives = 1
self.lower_bound_of_float_variables = [-1]
self.upper_bound_of_float_variables = [1]

def calculate(self, point: Point, function_value: FunctionValue) -> FunctionValue:
sleep(1 - point.float_variables[0])
function_value.value = point.float_variables[0] ** 2
return function_value


class TestAsyncParallelProcess(unittest.TestCase):
def setUp(self):
param = SolverParameters(number_of_parallel_points=2, iters_limit=4)
problem = ProblemForTest()
task = OptimizationTask(problem)
evolvent = Evolvent(
problem.lower_bound_of_float_variables,
problem.upper_bound_of_float_variables,
problem.number_of_float_variables,
)
search_data = SearchData(problem)
method = Method(param, task, evolvent, search_data)
self.async_parallel_process = AsyncParallelProcess(
param, task, evolvent, search_data, method, []
)

def test_Solve(self):
self.async_parallel_process.solve()
items = self.async_parallel_process.search_data.get_last_items(5)
self.assertAlmostEqual(-1 / 3, items[0].get_y().float_variables[0])
self.assertAlmostEqual(1 / 3, items[1].get_y().float_variables[0])
self.assertAlmostEqual(2 / 3, items[2].get_y().float_variables[0])
self.assertAlmostEqual(0.0, items[3].get_y().float_variables[0])
self.assertAlmostEqual(-2 / 3, items[4].get_y().float_variables[0])
self.assertAlmostEqual(
0.0,
self.async_parallel_process.search_data.solution.best_trials[0]
.point.float_variables[0],
)
self.assertAlmostEqual(
0.0,
self.async_parallel_process.search_data.solution.best_trials[0]
.function_values[0].value,
)
self.assertEqual(4, self.async_parallel_process.method.iterations_count)


if __name__ == "__main__":
unittest.main()

0 comments on commit 6647f28

Please sign in to comment.