Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Сode documentation #26

Merged
merged 12 commits into from
Jun 14, 2023
4 changes: 4 additions & 0 deletions sampo/generator/environment/contractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

def _get_stochastic_counts(pack_count: float, sigma_scaler: float, proportions: dict[str, float] | None,
available_types: list | None = None, rand: Random | None = None) -> dict[str, int]:
"""
Return random quantity of each type of resources. Random value is gotten from Gaussian distribution
"""
available_types = available_types or list(proportions.keys())
counts = {name: prop * pack_count for name, prop in proportions.items() if name in available_types}
stochastic_counts = {
Expand Down Expand Up @@ -51,6 +54,7 @@ def get_contractor(pack_worker_count: float,
available_worker_types: list | None = None, rand: Random | None = None) -> Contractor:
"""
Generates a contractor for a synthetic graph for a given resource scalar and generation parameters

:param pack_worker_count: The number of resource sets
:param sigma_scaler: parameter to calculate the scatter by Gaussian distribution with mean=0 amount from the
transferred proportions
Expand Down
18 changes: 10 additions & 8 deletions sampo/generator/pipeline/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@


def _add_addition_work(probability: float, rand: Random | None = None) -> bool:
"""Return answer, if addition work will be added"""
return IntervalUniform(0, 1).rand_float(rand) <= probability


# TODO: describe
def _get_roads(parents: list[GraphNode], cluster_name: str, dist: float,
rand: Random | None = None) -> dict[str, GraphNode]:
road_nodes = dict()
Expand All @@ -28,7 +30,7 @@ def _get_roads(parents: list[GraphNode], cluster_name: str, dist: float,
road_nodes['final'] = GraphNode(final_r, [(road_nodes['temp'], wr.ATOMIC_ROAD_LEN, EdgeType.LagFinishStart)])
return road_nodes


# TODO: describe
def _get_engineering_preparation(parents: list[GraphNode], cluster_name: str, boreholes_count: int,
rand: Random | None = None) -> GraphNode:
worker_req = wr.mul_borehole_volume(wr.ENGINEERING_PREPARATION, boreholes_count, wr.ENGINEERING_PREPARATION_BASE)
Expand All @@ -38,7 +40,7 @@ def _get_engineering_preparation(parents: list[GraphNode], cluster_name: str, bo
node = GraphNode(work, parents)
return node


# TODO: describe
def _get_power_lines(parents: list[GraphNode], cluster_name: str, dist_line: float,
dist_high_line: float | None = None, rand: Random | None = None) -> list[GraphNode]:
worker_req = wr.scale_reqs(wr.POWER_LINE, dist_line)
Expand All @@ -61,7 +63,7 @@ def _get_power_lines(parents: list[GraphNode], cluster_name: str, dist_line: flo

return power_lines


# TODO: describe
def _get_pipe_lines(parents: list[GraphNode], cluster_name: str, pipe_dists: list[float],
rand: Random | None = None) -> list[GraphNode]:
worker_req_pipe = wr.scale_reqs(wr.PIPE_LINE, pipe_dists[0])
Expand All @@ -85,7 +87,7 @@ def _get_pipe_lines(parents: list[GraphNode], cluster_name: str, pipe_dists: lis
graph_nodes.append(GraphNode(looping, graph_nodes[0:1]))
return graph_nodes


# TODO: describe
def _get_boreholes_equipment_group(parents: list[GraphNode], cluster_name: str, group_ind: int, borehole_count: int,
rand: Random | None = None) -> list[GraphNode]:
metering_install = WorkUnit(uuid_str(rand), "metering installation",
Expand All @@ -105,7 +107,7 @@ def _get_boreholes_equipment_group(parents: list[GraphNode], cluster_name: str,
]
return nodes


# TODO: describe
def _get_boreholes_equipment_shared(parents: list[GraphNode], cluster_name: str,
rand: Random | None = None) -> list[GraphNode]:
water_block = WorkUnit(uuid_str(rand), "block water distribution", wr.WATER_BLOCK,
Expand All @@ -126,7 +128,7 @@ def _get_boreholes_equipment_shared(parents: list[GraphNode], cluster_name: str,
]
return nodes


# TODO: describe
def _get_boreholes(parents: list[GraphNode], cluster_name: str, group_ind: int, borehole_count: int,
rand: Random | None = None) -> list[GraphNode]:
nodes = []
Expand All @@ -136,7 +138,7 @@ def _get_boreholes(parents: list[GraphNode], cluster_name: str, group_ind: int,
nodes.append(GraphNode(borehole_work, parents))
return nodes


# TODO: describe
def _get_boreholes_equipment_general(parents: list[GraphNode], cluster_name: str, pipes_count: int, masts_count: int,
rand: Random | None = None) -> list[GraphNode]:
nodes = []
Expand All @@ -160,7 +162,7 @@ def _get_boreholes_equipment_general(parents: list[GraphNode], cluster_name: str
nodes.append(GraphNode(light_mast_work, parents))
return nodes


# TODO: describe
def _get_handing_stage(parents: list[GraphNode], cluster_name: str, borehole_count: int,
rand: Random | None = None) -> GraphNode:
worker_req = wr.mul_borehole_volume(wr.HANDING_STAGE, borehole_count, wr.HANDING_STAGE_BASE)
Expand Down
4 changes: 3 additions & 1 deletion sampo/generator/pipeline/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def _update_work_name(node: GraphNode, name_to_suffixes: dict[str, list[str]], r
def extend_resources(uniq_resources: int, wg: WorkGraph, rand: Random) -> WorkGraph:
"""
Increases the number of unique resources in WorkGraph

:param uniq_resources: the amount to which you need to increase
:param wg: original WorkGraph
:param rand: Number generator with a fixed seed, or None for no fixed seed
Expand All @@ -63,7 +64,8 @@ def extend_resources(uniq_resources: int, wg: WorkGraph, rand: Random) -> WorkGr

def extend_names(uniq_activities: int, wg: WorkGraph, rand: Random) -> WorkGraph:
"""
Increases the number of unique work names in WorkGraph
Increases the number of unique work names in WorkGraph

:param uniq_activities: the amount to which you need to increase
:param wg: original WorkGraph
:param rand: Number generator with a fixed seed, or None for no fixed seed
Expand Down
10 changes: 5 additions & 5 deletions sampo/generator/pipeline/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

def get_small_graph(cluster_name: str | None = 'C1', rand: Random | None = None) -> WorkGraph:
"""
Creates a small graph of works consisting of 30-50 vertices;
Creates a small graph of works consisting of 30-50 vertices

:param cluster_name: str - the first cluster name
:param rand: Optional[Random] - generator of numbers with a given seed or None
:return:
work_graph: WorkGraph - work graph where count of vertex between 30 and 50
:return: work_graph: WorkGraph - work graph where count of vertex between 30 and 50
"""

s = get_start_stage()
Expand Down Expand Up @@ -73,8 +73,8 @@ def get_graph(mode: SyntheticGraphType | None = SyntheticGraphType.General,
top_border: int | None = 0,
rand: Random | None = None) -> WorkGraph:
"""
Invokes a graph of the given type if at least one positive value of
cluster_counts, addition_cluster_probability, bottom_border is given;
Invokes a graph of the given type if at least one positive value of cluster_counts, addition_cluster_probability, bottom_border is given

:param mode: str - 'general' or 'sequence' or 'parallel - the type of the returned graph
:param cluster_name_prefix: str - cluster name prefix, if the prefix is 'C',
the clusters will be called 'C1', 'C2' etc.
Expand Down
112 changes: 72 additions & 40 deletions sampo/metrics/resources_in_time/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import abstractmethod, ABC
from collections import defaultdict
from typing import Tuple, List, Optional, Union
from typing import Optional, Union
from uuid import uuid4

import numpy as np
Expand All @@ -16,43 +16,60 @@

class ResourceOptimizer(ABC):
@abstractmethod
def optimize(self, wg: WorkGraph, deadline: Time) -> Tuple[Contractor, Time]:
def optimize(self, wg: WorkGraph, deadline: Time) -> tuple[Contractor, Time]:
...


# TODO: remove ScheduleWorkDict option
def max_time_schedule(schedule: Union[ScheduleWorkDict, Schedule]):
return max(work["finish"] for work in schedule.values()) \
if isinstance(schedule, ScheduleWorkDict) \
else max(swork.start_end_time[1] for swork in schedule.works)
if isinstance(schedule, ScheduleWorkDict) \
else max(swork.start_end_time[1] for swork in schedule.works)


def get_schedule_with_time(scheduler: Scheduler, wg: WorkGraph,
agent_counts: np.array, agent_names: List[str]) -> Tuple[Schedule, Time]:
scheduled_works = scheduler.schedule(wg, agents_to_contractors(agent_counts, agent_names))
worker_counts: np.array, worker_names: list[str]) -> tuple[Schedule, Time]:
"""
Build schedule and return it with max execution time

:param scheduler:
:param wg: received WorkGraph
:param worker_counts:
:param worker_names:
:return:
"""
scheduled_works = scheduler.schedule(wg, workers_to_contractors(worker_counts, worker_names))
max_time = max_time_schedule(scheduled_works)
return scheduled_works, max_time


def agents_to_contractors(agent_counts: np.array, agent_names: List[str],
vanoha marked this conversation as resolved.
Show resolved Hide resolved
contractor_id: Optional[str or None] = None,
contractor_name: Optional[str] = ""):
def workers_to_contractors(worker_counts: np.array,
worker_names: list[str],
contractor_id: Optional[str or None] = None,
contractor_name: Optional[str] = ""):
contractor_id = contractor_id or uuid4()
# TODO Remove...
# TODO: remove try
try:
workers = [Worker(str(uuid4()), name, count) for name, count in zip(agent_names, agent_counts)]
except Exception:
print('обосрамс...')
workers = [Worker(str(uuid4()), name, count) for name, count in zip(worker_names, worker_counts)]
workers_dict = {(w.name, w.productivity_class): w for w in workers}
return [Contractor(contractor_id, contractor_name, agent_names, [], workers_dict, {})]
return [Contractor(contractor_id, contractor_name, worker_names, [], workers_dict, {})]


def is_resources_good(wg: WorkGraph,
agent_counts: np.array, agent_names: List[str],
scheduler: Scheduler, deadline: Time) -> bool:
worker_counts: np.array,
worker_names: list[str],
scheduler: Scheduler,
deadline: Time) -> bool:
"""
Return if schedule with given set of resources will be done before deadline

:param wg:
:param worker_counts:
:param worker_names:
:param scheduler:
:param deadline:
:return:
"""
try:
_, schedule_time = get_schedule_with_time(scheduler, wg, agent_counts, agent_names)
_, schedule_time = get_schedule_with_time(scheduler, wg, worker_counts, worker_names)
return schedule_time <= deadline
# TODO: remove
except AssertionError:
Expand All @@ -62,52 +79,67 @@ def is_resources_good(wg: WorkGraph,
return False


def find_min_agents(wg: WorkGraph, max_workers: int) -> Tuple[np.array, List[str]]:
min_agents = defaultdict(lambda: max_workers)
def find_min_workers(wg: WorkGraph, max_workers: int) -> tuple[np.array, list[str]]:
"""
Find minimum amount of resource of certain type among each worker and build list of workers with minimum amount
certain type of resource

:param wg:
:param max_workers:
:return:
"""
min_workers = defaultdict(lambda: max_workers)
for node in wg.nodes:
for req in node.work_unit.worker_reqs:
min_agents[req.kind] = min(min_agents[req.kind], req.min_count)
min_counts_list, agent_names = list(zip(*[(count, name) for name, count in min_agents.items()]))
min_workers[req.kind] = min(min_workers[req.kind], req.min_count)
min_counts_list, worker_names = list(zip(*[(count, name) for name, count in min_workers.items()]))
min_counts: np.array = np.array(min_counts_list, dtype=int)
return min_counts, agent_names
return min_counts, worker_names


def get_minimal_counts_by_schedule(scheduled_works: Schedule, agent_names: List[str]) -> np.array:
def get_minimal_counts_by_schedule(scheduled_works: Schedule, worker_names: list[str]) -> np.array:
workers_intervals = get_workers_intervals(scheduled_works)
max_used_counts = defaultdict(int)
for name_index in workers_intervals:
name, index = name_index.split(SPLITTER)
max_used_counts[name] = max(max_used_counts[name], int(index))
optimal_counts = np.array([max_used_counts[name] + 1 for name in agent_names], dtype=int)
optimal_counts = np.array([max_used_counts[name] + 1 for name in worker_names], dtype=int)
return optimal_counts


def init_borders(wg: WorkGraph, scheduler: Scheduler, deadline: Time,
def init_borders(wg: WorkGraph,
scheduler: Scheduler,
deadline: Time,
worker_factor: int,
max_workers: int,
right_agents: WorkerContractorPool or None) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], List[str]]:
left_counts, agent_names = find_min_agents(wg, max_workers)
if right_agents is None:
right_workers: WorkerContractorPool or None) -> tuple[
Optional[np.ndarray], Optional[np.ndarray], list[str]]:
left_counts, worker_names = find_min_workers(wg, max_workers)
if right_workers is None:
right_counts: np.ndarray = left_counts * worker_factor
else:
right_counts = np.array([right_agents[name] for name in agent_names])
right_counts = np.array([right_workers[name] for name in worker_names])

if is_resources_good(wg, left_counts, agent_names, scheduler, deadline):
if is_resources_good(wg, left_counts, worker_names, scheduler, deadline):
# wg is resolved in time by minimal set of workers
return left_counts, None, agent_names
return left_counts, None, worker_names

if not is_resources_good(wg, right_counts, agent_names, scheduler, deadline):
if not is_resources_good(wg, right_counts, worker_names, scheduler, deadline):
# wg is not resolved in time by any set of workers
return None, right_counts, agent_names
return left_counts, right_counts, agent_names
return None, right_counts, worker_names

return left_counts, right_counts, worker_names


def prepare_answer(counts: np.array, agent_names: List[str], wg: WorkGraph, scheduler: Scheduler,
def prepare_answer(counts: np.array,
worker_names: list[str],
wg: WorkGraph,
scheduler: Scheduler,
dry_resources: bool):
schedule, max_time = get_schedule_with_time(scheduler, wg, counts, agent_names)
schedule, max_time = get_schedule_with_time(scheduler, wg, counts, worker_names)
if dry_resources:
optimal_counts = get_minimal_counts_by_schedule(schedule, agent_names)
optimal_counts = get_minimal_counts_by_schedule(schedule, worker_names)
else:
optimal_counts = counts
contractor = agents_to_contractors(optimal_counts, agent_names)[0]
contractor = workers_to_contractors(optimal_counts, worker_names)[0]
return contractor, max_time
Loading