From f52378532ca28315249209a1924f185f1b4905ff Mon Sep 17 00:00:00 2001
From: deuce1957
Date: Tue, 12 Mar 2024 11:19:48 +0100
Subject: [PATCH] Add: Compact Episode Data, Debug: Fix 0-d array

---
 grid2op/Episode/CompactEpisodeData.py    | 312 +++++++++++++++++++++++
 grid2op/Episode/EpisodeReplay.py         |   2 +-
 grid2op/Episode/__init__.py              |   1 +
 grid2op/Runner/aux_fun.py                | 215 +++++++++-------
 grid2op/Runner/runner.py                 |  12 +-
 grid2op/tests/test_CompactEpisodeData.py | 265 +++++++++++++++++++
 grid2op/tests/test_EpisodeData.py        |  12 +-
 7 files changed, 712 insertions(+), 107 deletions(-)
 create mode 100644 grid2op/Episode/CompactEpisodeData.py
 create mode 100644 grid2op/tests/test_CompactEpisodeData.py

diff --git a/grid2op/Episode/CompactEpisodeData.py b/grid2op/Episode/CompactEpisodeData.py
new file mode 100644
index 000000000..3ed6af14a
--- /dev/null
+++ b/grid2op/Episode/CompactEpisodeData.py
@@ -0,0 +1,312 @@
+# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
+# Addition by Xavier Weiss (@DEUCE1957)
+# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
+# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
+# you can obtain one at http://mozilla.org/MPL/2.0/.
+# SPDX-License-Identifier: MPL-2.0
+# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
+import json
+import os
+import warnings
+import copy
+import numpy as np
+import grid2op
+from grid2op.Exceptions import (
+    Grid2OpException,
+    EnvError,
+    IncorrectNumberOfElements,
+    NonFiniteElement,
+)
+from grid2op.Action import ActionSpace
+from grid2op.Observation import ObservationSpace
+
+from pathlib import Path as p
+
+class CompactEpisodeData():
+
+    """
+    This class provides a compact way to serialize/deserialize one episode of a Reinforcement Learning (RL) run.
+    This enables episodes to be replayed, so we can understand the behaviour of the agent.
+    It is compatible with :class:`EpisodeData` through the "asdict()" method.
+
+    If enabled when using the :class:`Runner`, the :class:`CompactEpisodeData`
+    will save the information in a structured and compact way.
+    For each unique environment it stores the following files (once per experiment directory):
+    - "dict_action_space.json"
+    - "dict_attack_space.json"
+    - "dict_env_modification_space.json"
+    - "dict_observation_space.json"
+    Then for each episode it stores a single compressed Numpy archive (.npz) file, identified by the chronics ID (e.g. "003").
+    Inside this archive we find:
+    - "actions": actions taken by the :class:`grid2op.BaseAgent.BaseAgent`; each row of this numpy 2d-array is a vector
+      representation of the action taken by the agent at a particular timestep.
+    - "env_actions": modifications of the powergrid made by the environment. These modifications usually concern hazards,
+      maintenance, as well as changes of the generators' production setpoints or the loads' consumption.
+    - "attacks": actions taken by any opponent present in the RL environment, stored similarly to "actions".
+    - "observations": observations of the class :class:`grid2op.BaseObservation.BaseObservation` made by the
+      :class:`grid2op.Agent.BaseAgent` after taking an action, stored as a numpy 2d-array where each row corresponds to a
+      vector representation of the observation at that timestep. Note this includes the initial timestep, hence this array
+      is 1 row longer than (e.g.) the actions.
+    - "rewards": reward received by the :class:`grid2op.Agent.BaseAgent` from the :class:`grid2op.Environment` at
+      timestep 't', represented as a 1d-array.
+    - "other_rewards": any other rewards logged by the :class:`grid2op.Environment` (but not necessarily passed to the
+      agent), represented as a 2d-array.
+    - "disc_lines": gives which powerlines have been disconnected during the simulation at each time step. The same
+      convention as for "rewards" has been adopted: row 't' records the lines disconnected as a consequence of the
+      :class:`grid2op.BaseAction` taken by the :class:`grid2op.Agent.BaseAgent` at timestep 't'.
+    - "times": gives some information about the processor time spent (in seconds), mainly the time taken by the
+      :class:`grid2op.Agent.BaseAgent` (and especially its method :func:`grid2op.BaseAgent.act`) and the amount of time
+      spent in the :class:`grid2op.Environment.Environment`.
+    All of the above can be read back from disk.
+
+    Parameters
+    ----------
+    env: :class:`grid2op.Environment`
+        The environment we are running, contains most of the metadata required to store the episode.
+    obs: :class:`grid2op.Observation`
+        The initial observation of the environment in the current episode. Used to store the first observation.
+    exp_dir: ``pathlib.Path``
+        Directory where the experiment data (space descriptions and episode archives) is stored.
+    ep_id: ``str``, optional
+        If provided, a previously stored episode with this ID is loaded from ``exp_dir`` instead of allocating new arrays.
+
+    Examples
+    --------
+    Here is an example on how to use the :class:`CompactEpisodeData` class outside of the :class:`grid2op.Runner.Runner`.
+
+    .. code-block:: python
+
+        import time
+        from pathlib import Path as p
+
+        import grid2op
+        from grid2op.Agent import DoNothingAgent
+        from grid2op.Episode import CompactEpisodeData
+
+        env = grid2op.make("rte_case14_realistic")
+        obs = env.reset()
+        ep_id = env.chronics_handler.get_name()
+        data_dir = p.cwd()  # CHANGE THIS TO DESIRED LOCATION ON DISK
+        agent = DoNothingAgent(env.action_space)
+        reward = 0.0
+        episode_store = CompactEpisodeData(env, obs, exp_dir=data_dir)
+        # update() expects 1-based timesteps: the initial observation occupies row 0
+        for t in range(1, env.max_episode_duration() + 1):
+            start = time.perf_counter()
+            act = agent.act(obs, reward)
+            obs, reward, done, info = env.step(act)
+            duration = time.perf_counter() - start
+            episode_store.update(t, env, act, obs, reward, done, duration, info)
+            if done:
+                break
+        # Store Episode Data to file (compactly)
+        episode_store.to_disk()
+        # Load Episode Data from disk by referring to the specific episode ID
+        episode_store = CompactEpisodeData.from_disk(data_dir, ep_id)
+    """
+
+    def __init__(self, env, obs, exp_dir, ep_id:str=None):
+        """
+        Creates a dictionary of Numpy arrays for storing the details of a Grid2Op episode (actions, observations, etc.).
+        Pre-allocating the arrays like this is more efficient than appending to a mutable datatype (like a list).
+        For the initial timestep, an extra observation is stored (the initial state of the Environment).
+
+        Args:
+            env (grid2op.Environment): Current Grid2Op Environment, used to grab static attributes.
+            obs (grid2op.Observation): Initial Observation (before the agent is active).
+            exp_dir (pathlib.Path): Where experiment data is stored.
+            ep_id (str | None): If provided, tries to load a previously stored episode from disk.
+        """
+        if exp_dir is not None:
+            self.exp_dir = p(exp_dir)
+        else:
+            self.exp_dir = None
+        self.array_names = ("actions", "env_actions", "attacks", "observations", "rewards", "other_rewards", "disc_lines", "times")
+        self.space_names = ("observation_space", "action_space", "attack_space", "env_modification_space")
+        if ep_id is None:
+            self.ep_id = env.chronics_handler.get_name()
+            max_no_of_timesteps = int(env.max_episode_duration())
+
+            # Numpy Arrays
+            self.actions = np.full((max_no_of_timesteps, env.action_space.n), fill_value=np.NaN, dtype=np.float16)
+            self.env_actions = np.full((max_no_of_timesteps, env._helper_action_env.n), fill_value=np.NaN, dtype=np.float32)
+            self.attacks = np.full((max_no_of_timesteps, env._opponent_action_space.n), fill_value=0.0, dtype=np.float32)
+            self.observations = np.full((max_no_of_timesteps + 1, len(obs.to_vect())), fill_value=np.NaN, dtype=np.float32)
+            self.rewards = np.full(max_no_of_timesteps, fill_value=np.NaN, dtype=np.float32)
+            self.other_reward_names = list(sorted(env.other_rewards.keys()))
+            self.other_rewards = np.full((max_no_of_timesteps, len(self.other_reward_names)), fill_value=np.NaN, dtype=np.float32)
+            self.disc_lines = np.full((max_no_of_timesteps, env.backend.n_line), fill_value=np.NaN, dtype=np.bool_)
+            self.times = np.full(max_no_of_timesteps, fill_value=np.NaN, dtype=np.float32)
+
+            self.disc_lines_templ = np.full((1, env.backend.n_line), fill_value=False, dtype=np.bool_)
+            # AttackTempl: not used, kept for compatibility with EpisodeData
+            self.attack_templ = np.full((1, env._oppSpace.action_space.size()), fill_value=0.0, dtype=np.float32)
+
+            self.legal = np.full(max_no_of_timesteps, fill_value=True, dtype=np.bool_)
+            self.ambiguous = np.full(max_no_of_timesteps, fill_value=False, dtype=np.bool_)
+            self.n_cols = env.action_space.n + env._helper_action_env.n + len(obs.to_vect()) + env.backend.n_line + env._oppSpace.action_space.size() + 6
+
+            # Store First Observation
+            self.observations[0] = obs.to_vect()
+            self.game_over_timestep = max_no_of_timesteps
+
+            # JSON-serializable Objects
+            self.observation_space = env.observation_space
+            self.action_space = env.action_space
+            self.attack_space = env._opponent_action_space
+            self.env_modification_space = env._helper_action_env
+
+            # Special JSON-Serializable Object: Episode MetaData
+            self.meta = dict(
+                chronics_path=self.ep_id,
+                chronics_max_timestep=max_no_of_timesteps,
+                game_over_timestep=self.game_over_timestep,
+                other_reward_names=self.other_reward_names,
+                grid_path=env._init_grid_path,
+                backend_type=type(env.backend).__name__,
+                env_type=type(env).__name__,
+                env_seed=(env.seed_used.item() if env.seed_used.ndim == 0 else list(env.seed_used)) if isinstance(env.seed_used, np.ndarray) else env.seed_used,
+                agent_seed=self.action_space.seed_used,
+                nb_timestep_played=0,
+                cumulative_reward=0.0,
+            )
+        elif exp_dir is not None:
+            self.load_metadata(ep_id)
+            self.load_spaces()
+            self.load_arrays(ep_id)
+
+    def update(self, t:int, env, action,
+               obs, reward:float, done:bool, duration:float, info):
+        """
+        Update the arrays in the episode store for each step of the environment.
+
+        Args:
+            t (int): Current time step (1-based; the initial observation occupies row 0).
+            env (grid2op.Environment): State of the Environment.
+            action (grid2op.Action): Action the agent took on the Environment.
+            obs (grid2op.Observation): Observed result of the action on the Environment.
+            reward (float): Numeric reward returned by the Environment for the given action.
+            done (bool): Whether the episode ended at this step.
+            duration (float): Time in seconds needed to choose and execute the action.
+            info (dict): Dictionary containing information on the legality and ambiguity of the action.
+
+        Returns:
+            float: The cumulative reward obtained so far in this episode.
+        """
+        self.actions[t - 1] = action.to_vect()
+        self.env_actions[t - 1] = env._env_modification.to_vect()
+        self.observations[t] = obs.to_vect()
+        opp_attack = env._oppSpace.last_attack
+        if opp_attack is not None:
+            self.attacks[t - 1] = opp_attack.to_vect()
+        self.rewards[t - 1] = reward
+        if "disc_lines" in info:
+            arr = info["disc_lines"]
+            if arr is not None:
+                self.disc_lines[t - 1] = arr
+            else:
+                self.disc_lines[t - 1] = self.disc_lines_templ
+        if "rewards" in info:
+            for i, other_reward_name in enumerate(self.other_reward_names):
+                self.other_rewards[t - 1, i] = info["rewards"][other_reward_name]
+        self.times[t - 1] = duration
+        self.legal[t - 1] = not info["is_illegal"]
+        self.ambiguous[t - 1] = info["is_ambiguous"]
+        if done:
+            self.game_over_timestep = t
+        # Update metadata
+        self.meta.update(
+            nb_timestep_played=t,
+            cumulative_reward=self.meta["cumulative_reward"] + float(reward),
+        )
+        return self.meta["cumulative_reward"]
+
+    def asdict(self):
+        """
+        Return the episode store as a dictionary.
+        Compatible with Grid2Op's internal EpisodeData format as keyword arguments.
+        """
+        # Other rewards in Grid2Op's internal EpisodeData are stored as a list of dictionaries
+        # (one per timestep), so we convert to that format here
+        other_rewards = [{other_reward_name: float(self.other_rewards[t, i]) for i, other_reward_name in enumerate(self.other_reward_names)} for t in range(len(self.times))]
+        return dict(actions=self.actions, env_actions=self.env_actions,
+                    observations=self.observations,
+                    rewards=self.rewards,
+                    other_rewards=other_rewards,
+                    disc_lines=self.disc_lines, times=self.times,
+                    disc_lines_templ=self.disc_lines_templ, attack_templ=self.attack_templ,
+                    attack=self.attacks, legal=self.legal, ambiguous=self.ambiguous,
+                    observation_space=self.observation_space, action_space=self.action_space,
+                    attack_space=self.attack_space, helper_action_env=self.env_modification_space)
+
+    def store_metadata(self):
+        """
+        Store this episode's metadata to disk.
+        """
+        with open(self.exp_dir / f"{self.ep_id}_metadata.json", "w", encoding="utf-8") as f:
+            json.dump(self.meta, f, indent=4, sort_keys=True)
+
+    def load_metadata(self, ep_id:str):
+        """
+        Load metadata from a specific episode.
+        """
+        with open(self.exp_dir / f"{ep_id}_metadata.json", "r", encoding="utf-8") as f:
+            self.meta = json.load(f)
+            self.other_reward_names = self.meta["other_reward_names"]
+            self.game_over_timestep = self.meta["game_over_timestep"]
+
+    def store_spaces(self):
+        """
+        Store the Observation, Action, Environment and Opponent spaces to disk.
+ """ + for space_name in self.space_names: + with open(self.exp_dir / f"dict_{space_name}.json", "w", encoding="utf-8") as f: + json.dump(getattr(self, space_name).cls_to_dict(), f, indent=4, sort_keys=True) + + def load_spaces(self): + """ + Load the Observation, Action, Environment and Opponent spaces from disk + """ + for space_name in self.space_names: + with open(self.exp_dir / f"dict_{space_name}.json", "r", encoding="utf-8") as f: + if space_name == "observation_space": + setattr(self, space_name, ObservationSpace.from_dict(json.load(f))) + else: + setattr(self, space_name, ActionSpace.from_dict(json.load(f))) + + def store_arrays(self): + """ + Store compressed versions of the Actions, Observations, Rewards, Attacks and other metadata + to disk as a compressed numpy archive (single file per episode). + """ + np.savez_compressed(self.exp_dir / f"{self.ep_id}.npz", **{array_name: getattr(self, array_name) for array_name in self.array_names}) + + def load_arrays(self, ep_id:str): + """ + Load Actions, Observations, Rewards, Attacks and other metadata from disk + for a specific Episode ID (identified by Chronics name) + """ + arrays = np.load(self.exp_dir / f"{ep_id}.npz") + for array_name in self.array_names: + setattr(self, array_name, arrays[array_name]) + self.ep_id = ep_id + + def to_disk(self): + """ + Store this EpisodeStore object instance to disk (as .json and .npz files) + """ + if self.exp_dir is not None: + # Store Episode metadata + self.store_metadata() + # Store Spaces (values are static, so only save once per experiment) + if len([f for f in self.exp_dir.glob("*.json")]) != 4: + self.store_spaces() + # Store Arrays as Compressed Numpy archive + self.store_arrays() + + @classmethod + def from_disk(cls, path, ep_id:str): + """ + Load EpisodeStore data from disk for a specific episode. + """ + return cls(env=None, obs=None, exp_dir=p(path), ep_id=ep_id) + + @staticmethod + def list_episode(path): + """ + From a given path, extracts the episodes that can be loaded + + Parameters + ---------- + path: ``str`` + The path where to look for data coming from "episode" + + Returns + ------- + res: ``list`` + A list of possible episodes. 
Each element of this list is a tuple: (full_path, episode_name) + """ + return [(str(full_path), full_path.stem) for full_path in path.glob("*.npz")] + + def __len__(self): + return self.game_over_timestep diff --git a/grid2op/Episode/EpisodeReplay.py b/grid2op/Episode/EpisodeReplay.py index 0e9d98a91..83aaafc25 100644 --- a/grid2op/Episode/EpisodeReplay.py +++ b/grid2op/Episode/EpisodeReplay.py @@ -15,7 +15,7 @@ from grid2op.Exceptions import Grid2OpException from grid2op.PlotGrid.PlotMatplot import PlotMatplot from grid2op.Episode.EpisodeData import EpisodeData - +from grid2op.Episode.CompactEpisodeData import CompactEpisodeData class EpisodeReplay(object): """ diff --git a/grid2op/Episode/__init__.py b/grid2op/Episode/__init__.py index 46040fba3..12abb7475 100644 --- a/grid2op/Episode/__init__.py +++ b/grid2op/Episode/__init__.py @@ -1,6 +1,7 @@ __all__ = ["EpisodeData"] from grid2op.Episode.EpisodeData import EpisodeData +from grid2op.Episode.CompactEpisodeData import CompactEpisodeData # Try to import optional module try: diff --git a/grid2op/Runner/aux_fun.py b/grid2op/Runner/aux_fun.py index 2f69d520f..db8b4ba68 100644 --- a/grid2op/Runner/aux_fun.py +++ b/grid2op/Runner/aux_fun.py @@ -14,7 +14,7 @@ from grid2op.Environment import Environment from grid2op.Agent import BaseAgent -from grid2op.Episode import EpisodeData +from grid2op.Episode import EpisodeData, CompactEpisodeData from grid2op.Runner.FakePBar import _FakePbar from grid2op.dtypes import dt_int, dt_float, dt_bool from grid2op.Chronics import ChronicsHandler @@ -79,6 +79,7 @@ def _aux_one_process_parrallel( max_iter=max_iter, agent_seed=agt_seed, detailed_output=add_detailed_output, + use_compact_episode_data=runner.use_compact_episode_data, ) (name_chron, cum_reward, nb_time_step, max_ts, episode_data, nb_highres_sim) = tmp_ id_chron = chronics_handler.get_id() @@ -104,6 +105,7 @@ def _aux_run_one_episode( agent_seed=None, max_iter=None, detailed_output=False, + use_compact_episode_data=False, ): done = False time_step = int(0) @@ -135,96 +137,99 @@ def _aux_run_one_episode( efficient_storing = nb_timestep_max > 0 nb_timestep_max = max(nb_timestep_max, 0) max_ts = nb_timestep_max - if path_save is None and not detailed_output: - # i don't store anything on drive, so i don't need to store anything on memory - nb_timestep_max = 0 + if use_compact_episode_data: + episode = CompactEpisodeData(env, obs, exp_dir=path_save) + else: + if path_save is None and not detailed_output: + # i don't store anything on drive, so i don't need to store anything on memory + nb_timestep_max = 0 - disc_lines_templ = np.full((1, env.backend.n_line), fill_value=False, dtype=dt_bool) + disc_lines_templ = np.full((1, env.backend.n_line), fill_value=False, dtype=dt_bool) - attack_templ = np.full( - (1, env._oppSpace.action_space.size()), fill_value=0.0, dtype=dt_float - ) - - if efficient_storing: - times = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) - rewards = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) - actions = np.full( - (nb_timestep_max, env.action_space.n), fill_value=np.NaN, dtype=dt_float - ) - env_actions = np.full( - (nb_timestep_max, env._helper_action_env.n), - fill_value=np.NaN, - dtype=dt_float, - ) - observations = np.full( - (nb_timestep_max + 1, env.observation_space.n), - fill_value=np.NaN, - dtype=dt_float, - ) - disc_lines = np.full( - (nb_timestep_max, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool - ) - attack = np.full( - (nb_timestep_max, env._opponent_action_space.n), - 
fill_value=0.0, - dtype=dt_float, - ) - legal = np.full(nb_timestep_max, fill_value=True, dtype=dt_bool) - ambiguous = np.full(nb_timestep_max, fill_value=False, dtype=dt_bool) - else: - times = np.full(0, fill_value=np.NaN, dtype=dt_float) - rewards = np.full(0, fill_value=np.NaN, dtype=dt_float) - actions = np.full((0, env.action_space.n), fill_value=np.NaN, dtype=dt_float) - env_actions = np.full( - (0, env._helper_action_env.n), fill_value=np.NaN, dtype=dt_float - ) - observations = np.full( - (0, env.observation_space.n), fill_value=np.NaN, dtype=dt_float + attack_templ = np.full( + (1, env._oppSpace.action_space.size()), fill_value=0.0, dtype=dt_float ) - disc_lines = np.full((0, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool) - attack = np.full( - (0, env._opponent_action_space.n), fill_value=0.0, dtype=dt_float - ) - legal = np.full(0, fill_value=True, dtype=dt_bool) - ambiguous = np.full(0, fill_value=False, dtype=dt_bool) - - need_store_first_act = path_save is not None or detailed_output - if need_store_first_act: - # store observation at timestep 0 + if efficient_storing: - observations[time_step, :] = obs.to_vect() + times = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) + rewards = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) + actions = np.full( + (nb_timestep_max, env.action_space.n), fill_value=np.NaN, dtype=dt_float + ) + env_actions = np.full( + (nb_timestep_max, env._helper_action_env.n), + fill_value=np.NaN, + dtype=dt_float, + ) + observations = np.full( + (nb_timestep_max + 1, env.observation_space.n), + fill_value=np.NaN, + dtype=dt_float, + ) + disc_lines = np.full( + (nb_timestep_max, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool + ) + attack = np.full( + (nb_timestep_max, env._opponent_action_space.n), + fill_value=0.0, + dtype=dt_float, + ) + legal = np.full(nb_timestep_max, fill_value=True, dtype=dt_bool) + ambiguous = np.full(nb_timestep_max, fill_value=False, dtype=dt_bool) else: - observations = np.concatenate((observations, obs.to_vect().reshape(1, -1))) - - episode = EpisodeData( - actions=actions, - env_actions=env_actions, - observations=observations, - rewards=rewards, - disc_lines=disc_lines, - times=times, - observation_space=env.observation_space, - action_space=env.action_space, - helper_action_env=env._helper_action_env, - path_save=path_save, - disc_lines_templ=disc_lines_templ, - attack_templ=attack_templ, - attack=attack, - attack_space=env._opponent_action_space, - logger=logger, - name=env.chronics_handler.get_name(), - force_detail=detailed_output, - other_rewards=[], - legal=legal, - ambiguous=ambiguous, - has_legal_ambiguous=True, - ) - if need_store_first_act: - # I need to manually force in the first observation (otherwise it's not computed) - episode.observations.objects[0] = episode.observations.helper.from_vect( - observations[time_step, :] + times = np.full(0, fill_value=np.NaN, dtype=dt_float) + rewards = np.full(0, fill_value=np.NaN, dtype=dt_float) + actions = np.full((0, env.action_space.n), fill_value=np.NaN, dtype=dt_float) + env_actions = np.full( + (0, env._helper_action_env.n), fill_value=np.NaN, dtype=dt_float + ) + observations = np.full( + (0, env.observation_space.n), fill_value=np.NaN, dtype=dt_float + ) + disc_lines = np.full((0, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool) + attack = np.full( + (0, env._opponent_action_space.n), fill_value=0.0, dtype=dt_float + ) + legal = np.full(0, fill_value=True, dtype=dt_bool) + ambiguous = np.full(0, fill_value=False, 
dtype=dt_bool) + + need_store_first_act = path_save is not None or detailed_output + if need_store_first_act: + # store observation at timestep 0 + if efficient_storing: + observations[time_step, :] = obs.to_vect() + else: + observations = np.concatenate((observations, obs.to_vect().reshape(1, -1))) + + episode = EpisodeData( + actions=actions, + env_actions=env_actions, + observations=observations, + rewards=rewards, + disc_lines=disc_lines, + times=times, + observation_space=env.observation_space, + action_space=env.action_space, + helper_action_env=env._helper_action_env, + path_save=path_save, + disc_lines_templ=disc_lines_templ, + attack_templ=attack_templ, + attack=attack, + attack_space=env._opponent_action_space, + logger=logger, + name=env.chronics_handler.get_name(), + force_detail=detailed_output, + other_rewards=[], + legal=legal, + ambiguous=ambiguous, + has_legal_ambiguous=True, ) - episode.set_parameters(env) + if need_store_first_act: + # I need to manually force in the first observation (otherwise it's not computed) + episode.observations.objects[0] = episode.observations.helper.from_vect( + observations[time_step, :] + ) + episode.set_parameters(env) beg_ = time.perf_counter() @@ -246,26 +251,38 @@ def _aux_run_one_episode( res_env_tmp = env.steps(act) for (obs, reward, done, info), opp_attack in zip(*res_env_tmp): time_step += 1 - cum_reward += _aux_add_data(reward, env, episode, - efficient_storing, - end__, beg__, act, - obs, info, time_step, - opp_attack) + if use_compact_episode_data: + duration = end__ - beg__ + cum_reward = episode.update(time_step, env, act, + obs, reward, done, duration, info) + else: + cum_reward += _aux_add_data(reward, env, episode, + efficient_storing, + end__, beg__, act, + obs, info, time_step, + opp_attack) pbar_.update(1) else: # regular environment obs, reward, done, info = env.step(act) time_step += 1 opp_attack = env._oppSpace.last_attack - cum_reward += _aux_add_data(reward, env, episode, - efficient_storing, - end__, beg__, act, - obs, info, time_step, - opp_attack) + if use_compact_episode_data: + duration = end__ - beg__ + cum_reward = episode.update(time_step, env, act, + obs, reward, done, duration, info) + else: + cum_reward += _aux_add_data(reward, env, episode, + efficient_storing, + end__, beg__, act, + obs, info, time_step, + opp_attack) pbar_.update(1) - episode.set_game_over(time_step) + if not use_compact_episode_data: + episode.set_game_over(time_step) end_ = time.perf_counter() - episode.set_meta(env, time_step, float(cum_reward), env_seed, agent_seed) + if not use_compact_episode_data: + episode.set_meta(env, time_step, float(cum_reward), env_seed, agent_seed) li_text = [ "Env: {:.2f}s", "\t - apply act {:.2f}s", @@ -287,8 +304,8 @@ def _aux_run_one_episode( cum_reward, ) ) - - episode.set_episode_times(env, time_act, beg_, end_) + if not use_compact_episode_data: + episode.set_episode_times(env, time_act, beg_, end_) episode.to_disk() name_chron = env.chronics_handler.get_name() diff --git a/grid2op/Runner/runner.py b/grid2op/Runner/runner.py index 6aa8624f6..1c8dc1637 100644 --- a/grid2op/Runner/runner.py +++ b/grid2op/Runner/runner.py @@ -34,7 +34,7 @@ _aux_one_process_parrallel, ) from grid2op.Runner.basic_logger import DoNothingLog, ConsoleLog -from grid2op.Episode import EpisodeData +from grid2op.Episode import EpisodeData, CompactEpisodeData # on windows if i start using sequential, i need to continue using sequential # if i start using parallel i need to continue using parallel @@ -281,9 +281,11 @@ def 
__init__( kwargs_attention_budget=None, has_attention_budget=False, logger=None, + use_compact_episode_data=False, kwargs_observation=None, observation_bk_class=None, observation_bk_kwargs=None, + # experimental: whether to read from local dir or generate the classes on the fly: _read_from_local_dir=False, _is_test=False, # TODO not implemented !! @@ -344,6 +346,10 @@ def __init__( voltagecontrolerClass: :class:`grid2op.VoltageControler.ControlVoltageFromFile`, optional The controler that will change the voltage setpoints of the generators. + use_compact_episode_data: ``bool``, optional + Whether to use :class:`grid2op.Episode.CompactEpisodeData` instead of :class:`grid2op.Episode.EpisodeData` to store + Episode to disk (allows it to be replayed later). Defaults to False. + # TODO documentation on the opponent # TOOD doc for the attention budget """ @@ -504,6 +510,8 @@ def __init__( else: self.logger = logger.getChild("grid2op_Runner") + self.use_compact_episode_data = use_compact_episode_data + # store _parameters self.init_env_path = init_env_path self.init_grid_path = init_grid_path @@ -749,6 +757,7 @@ def run_one_episode( max_iter=max_iter, agent_seed=agent_seed, detailed_output=detailed_output, + use_compact_episode_data = self.use_compact_episode_data, ) if max_iter is not None: env.chronics_handler.set_max_iter(-1) @@ -1048,6 +1057,7 @@ def _get_params(self): "kwargs_attention_budget": self._kwargs_attention_budget, "has_attention_budget": self._has_attention_budget, "logger": self.logger, + "use_compact_episode_data": self.use_compact_episode_data, "kwargs_observation": self._kwargs_observation, "_read_from_local_dir": self._read_from_local_dir, "_is_test": self._is_test, diff --git a/grid2op/tests/test_CompactEpisodeData.py b/grid2op/tests/test_CompactEpisodeData.py new file mode 100644 index 000000000..e3dc8713a --- /dev/null +++ b/grid2op/tests/test_CompactEpisodeData.py @@ -0,0 +1,265 @@ +# Copyright (c) 2019-2020, RTE (https://www.rte-france.com) +# See AUTHORS.txt +# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0. +# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file, +# you can obtain one at http://mozilla.org/MPL/2.0/. +# SPDX-License-Identifier: MPL-2.0 +# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems. + +import tempfile +import warnings +import pdb +import unittest + +import grid2op +from grid2op.Agent import OneChangeThenNothing +from grid2op.tests.helper_path_test import * +from grid2op.Chronics import Multifolder +from grid2op.Reward import L2RPNReward +from grid2op.Backend import PandaPowerBackend +from grid2op.Runner import Runner +from grid2op.Episode import CompactEpisodeData, EpisodeData +from grid2op.dtypes import dt_float +from grid2op.Agent import BaseAgent +from grid2op.Action import TopologyAction +from grid2op.Parameters import Parameters +from grid2op.Opponent.baseActionBudget import BaseActionBudget +from grid2op.Opponent import RandomLineOpponent + + +DEBUG = True +PATH_ADN_CHRONICS_FOLDER = os.path.abspath( + os.path.join(PATH_CHRONICS, "test_multi_chronics") +) + + +class TestCompactEpisodeData(unittest.TestCase): + def setUp(self): + """ + The case file is a representation of the case14 as found in the ieee14 powergrid. 
+ :return: + """ + self.tolvect = dt_float(1e-2) + self.tol_one = dt_float(1e-5) + self.max_iter = 10 + self.real_reward = dt_float(179.99818) + + self.init_grid_path = os.path.join(PATH_DATA_TEST_PP, "test_case14.json") + self.path_chron = PATH_ADN_CHRONICS_FOLDER + self.parameters_path = None + self.names_chronics_to_backend = { + "loads": { + "2_C-10.61": "load_1_0", + "3_C151.15": "load_2_1", + "14_C63.6": "load_13_2", + "4_C-9.47": "load_3_3", + "5_C201.84": "load_4_4", + "6_C-6.27": "load_5_5", + "9_C130.49": "load_8_6", + "10_C228.66": "load_9_7", + "11_C-138.89": "load_10_8", + "12_C-27.88": "load_11_9", + "13_C-13.33": "load_12_10", + }, + "lines": { + "1_2_1": "0_1_0", + "1_5_2": "0_4_1", + "9_10_16": "8_9_2", + "9_14_17": "8_13_3", + "10_11_18": "9_10_4", + "12_13_19": "11_12_5", + "13_14_20": "12_13_6", + "2_3_3": "1_2_7", + "2_4_4": "1_3_8", + "2_5_5": "1_4_9", + "3_4_6": "2_3_10", + "4_5_7": "3_4_11", + "6_11_11": "5_10_12", + "6_12_12": "5_11_13", + "6_13_13": "5_12_14", + "4_7_8": "3_6_15", + "4_9_9": "3_8_16", + "5_6_10": "4_5_17", + "7_8_14": "6_7_18", + "7_9_15": "6_8_19", + }, + "prods": { + "1_G137.1": "gen_0_4", + "3_G36.31": "gen_2_1", + "6_G63.29": "gen_5_2", + "2_G-56.47": "gen_1_0", + "8_G40.43": "gen_7_3", + }, + } + self.gridStateclass = Multifolder + self.backendClass = PandaPowerBackend + self.runner = Runner( + init_grid_path=self.init_grid_path, + init_env_path=self.init_grid_path, + path_chron=self.path_chron, + parameters_path=self.parameters_path, + names_chronics_to_backend=self.names_chronics_to_backend, + gridStateclass=self.gridStateclass, + backendClass=self.backendClass, + rewardClass=L2RPNReward, + other_rewards={"test": L2RPNReward}, + max_iter=self.max_iter, + name_env="test_episodedata_env", + use_compact_episode_data=True, + ) + + def test_load_ambiguous(self): + f = tempfile.mkdtemp() + + class TestSuitAgent(BaseAgent): + def __init__(self, *args, **kwargs): + BaseAgent.__init__(self, *args, **kwargs) + + def act(self, observation, reward, done=False): + # do a ambiguous action + return self.action_space( + {"set_line_status": [(0, 1)], "change_line_status": [0]} + ) + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore") + with grid2op.make("rte_case14_test", test=True, _add_to_name=type(self).__name__) as env: + my_agent = TestSuitAgent(env.action_space) + runner = Runner( + **env.get_params_for_runner(), + agentClass=None, + agentInstance=my_agent, + use_compact_episode_data=True, + ) + + # test that the right seeds are assigned to the agent + res = runner.run(nb_episode=1, max_iter=self.max_iter, path_save=f) + episode_data = CompactEpisodeData.from_disk(path=f, ep_id=res[0][1]) + assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter + assert len(episode_data.actions) == self.max_iter + assert len(episode_data.observations) == self.max_iter + 1 + assert len(episode_data.env_actions) == self.max_iter + assert len(episode_data.attacks) == self.max_iter + + def test_one_episode_with_saving(self): + f = tempfile.mkdtemp() + ( + episode_name, + cum_reward, + timestep, + max_ts + ) = self.runner.run_one_episode(path_save=f) + episode_data = CompactEpisodeData.from_disk(path=f, ep_id=episode_name) + assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter + assert len(episode_data.other_rewards) == self.max_iter + print("\n\nOther Rewards:", episode_data.other_reward_names) + other_reward_idx = episode_data.other_reward_names.index("test") + other_reward = episode_data.other_rewards[:, 
other_reward_idx] + assert np.all(np.abs(other_reward - episode_data.rewards) <= self.tol_one) + assert np.abs(episode_data.meta["cumulative_reward"] - self.real_reward) <= self.tol_one + + + def test_collection_wrapper_after_run(self): + OneChange = OneChangeThenNothing.gen_next( + {"set_bus": {"lines_or_id": [(1, -1)]}} + ) + runner = Runner( + init_grid_path=self.init_grid_path, + init_env_path=self.init_grid_path, + path_chron=self.path_chron, + parameters_path=self.parameters_path, + names_chronics_to_backend=self.names_chronics_to_backend, + gridStateclass=self.gridStateclass, + backendClass=self.backendClass, + rewardClass=L2RPNReward, + other_rewards={"test": L2RPNReward}, + max_iter=self.max_iter, + name_env="test_episodedata_env", + agentClass=OneChange, + use_compact_episode_data=True, + ) + _, cum_reward, timestep, max_ts, episode_data = runner.run_one_episode( + max_iter=self.max_iter, detailed_output=True + ) + # Check that the type of first action is set bus + assert episode_data.action_space.from_vect(episode_data.actions[0]).get_types()[2] + + def test_len(self): + """test i can use the function "len" of the episode data""" + f = tempfile.mkdtemp() + ( + episode_name, + cum_reward, + timestep, + max_ts + ) = self.runner.run_one_episode(path_save=f) + episode_data = CompactEpisodeData.from_disk(path=f, ep_id=episode_name) + len(episode_data) + + def test_3_episode_with_saving(self): + f = tempfile.mkdtemp() + res = self.runner._run_sequential(nb_episode=3, path_save=f) + for i, episode_name, cum_reward, timestep, total_ts in res: + episode_data = CompactEpisodeData.from_disk(path=f, ep_id=episode_name) + assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter + assert np.abs(episode_data.meta["cumulative_reward"] - self.real_reward) <= self.tol_one + + def test_3_episode_3process_with_saving(self): + f = tempfile.mkdtemp() + nb_episode = 2 + res = self.runner._run_parrallel( + nb_episode=nb_episode, nb_process=2, path_save=f, + ) + assert len(res) == nb_episode + print(f"\n\n{f}\n",'\n'.join([str(elt) for elt in Path(f).glob('*')])) + for i, episode_name, cum_reward, timestep, total_ts in res: + episode_data = CompactEpisodeData.from_disk(path=f, ep_id=episode_name) + assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter + assert np.abs(episode_data.meta["cumulative_reward"] - self.real_reward) <= self.tol_one + + def test_with_opponent(self): + init_budget = 1000 + opponent_attack_duration = 15 + opponent_attack_cooldown = 30 + opponent_budget_per_ts = 0.0 + opponent_action_class = TopologyAction + + LINES_ATTACKED = ["1_3_3", "1_4_4", "3_6_15", "9_10_12", "11_12_13", "12_13_14"] + + p = Parameters() + p.NO_OVERFLOW_DISCONNECTION = True + with warnings.catch_warnings(): + warnings.filterwarnings("ignore") + env = grid2op.make( + "rte_case14_realistic", + test=True, + param=p, + opponent_init_budget=init_budget, + opponent_budget_per_ts=opponent_budget_per_ts, + opponent_attack_cooldown=opponent_attack_cooldown, + opponent_attack_duration=opponent_attack_duration, + opponent_action_class=opponent_action_class, + opponent_budget_class=BaseActionBudget, + opponent_class=RandomLineOpponent, + kwargs_opponent={"lines_attacked": LINES_ATTACKED}, + _add_to_name=type(self).__name__, + ) + env.seed(0) + runner = Runner(**env.get_params_for_runner(), use_compact_episode_data=True) + + f = tempfile.mkdtemp() + res = runner.run( + nb_episode=1, + env_seeds=[4], + agent_seeds=[0], + max_iter=opponent_attack_cooldown - 1, + path_save=f, + ) + + 
episode_data = CompactEpisodeData.from_disk(path=f, ep_id=res[0][1]) + lines_impacted, subs_impacted = episode_data.attack_space.from_vect(episode_data.attacks[0]).get_topological_impact() + assert lines_impacted[3] + + +if __name__ == "__main__": + unittest.main() diff --git a/grid2op/tests/test_EpisodeData.py b/grid2op/tests/test_EpisodeData.py index 15f231979..7cfda9ba7 100644 --- a/grid2op/tests/test_EpisodeData.py +++ b/grid2op/tests/test_EpisodeData.py @@ -133,7 +133,7 @@ def act(self, observation, reward, done=False): # test that the right seeds are assigned to the agent res = runner.run(nb_episode=1, max_iter=self.max_iter, path_save=f) - episode_data = EpisodeData.from_disk(agent_path=f, name=res[0][1]) + episode_data = EpisodeData.from_disk(agent_path=f, ep_id=res[0][1]) assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter assert len(episode_data.actions) == self.max_iter assert len(episode_data.observations) == self.max_iter + 1 @@ -148,7 +148,7 @@ def test_one_episode_with_saving(self): timestep, max_ts ) = self.runner.run_one_episode(path_save=f) - episode_data = EpisodeData.from_disk(agent_path=f, name=episode_name) + episode_data = EpisodeData.from_disk(agent_path=f, ep_id=episode_name) assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter assert len(episode_data.other_rewards) == self.max_iter for other, real in zip(episode_data.other_rewards, episode_data.rewards): @@ -191,14 +191,14 @@ def test_len(self): timestep, max_ts ) = self.runner.run_one_episode(path_save=f) - episode_data = EpisodeData.from_disk(agent_path=f, name=episode_name) + episode_data = EpisodeData.from_disk(agent_path=f, ep_id=episode_name) len(episode_data) def test_3_episode_with_saving(self): f = tempfile.mkdtemp() res = self.runner._run_sequential(nb_episode=3, path_save=f) for i, episode_name, cum_reward, timestep, total_ts in res: - episode_data = EpisodeData.from_disk(agent_path=f, name=episode_name) + episode_data = EpisodeData.from_disk(agent_path=f, ep_id=episode_name) assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter assert ( np.abs( @@ -215,7 +215,7 @@ def test_3_episode_3process_with_saving(self): ) assert len(res) == nb_episode for i, episode_name, cum_reward, timestep, total_ts in res: - episode_data = EpisodeData.from_disk(agent_path=f, name=episode_name) + episode_data = EpisodeData.from_disk(agent_path=f, ep_id=episode_name) assert int(episode_data.meta["chronics_max_timestep"]) == self.max_iter assert ( np.abs( @@ -263,7 +263,7 @@ def test_with_opponent(self): path_save=f, ) - episode_data = EpisodeData.from_disk(agent_path=f, name=res[0][1]) + episode_data = EpisodeData.from_disk(agent_path=f, ep_id=res[0][1]) lines_impacted, subs_impacted = episode_data.attacks[0].get_topological_impact() assert lines_impacted[3]
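
Usage sketch (reviewer note, not part of the patch): the end-to-end flow exercised by the new tests, namely running a
``Runner`` with the ``use_compact_episode_data`` flag added in this patch and loading the stored episode back with
``CompactEpisodeData.from_disk``. The environment name and the temporary output directory are placeholder assumptions;
adjust them to your setup.

.. code-block:: python

    import tempfile

    import grid2op
    from grid2op.Episode import CompactEpisodeData
    from grid2op.Runner import Runner

    # Placeholder environment; any grid2op environment should behave the same way.
    env = grid2op.make("l2rpn_case14_sandbox", test=True)
    save_dir = tempfile.mkdtemp()

    runner = Runner(
        **env.get_params_for_runner(),
        use_compact_episode_data=True,  # new flag introduced by this patch
    )
    res = runner.run(nb_episode=1, max_iter=10, path_save=save_dir)

    # runner.run returns one tuple per episode; index 1 is the episode (chronics) name.
    ep_id = res[0][1]
    episode = CompactEpisodeData.from_disk(path=save_dir, ep_id=ep_id)

    print(episode.meta["cumulative_reward"])  # float, accumulated over the episode
    print(episode.actions.shape)              # (max_ts, action_space.n)
    print(episode.observations.shape)         # (max_ts + 1, observation_space.n)
    print(len(episode))                       # timestep at which the episode ended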
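
On-disk layout sketch (reviewer note, not part of the patch): what ``CompactEpisodeData.to_disk`` writes, following
``store_metadata``, ``store_spaces`` and ``store_arrays`` above. ``save_dir`` and ``ep_id`` are assumed to come from
the previous snippet.

.. code-block:: python

    import json
    from pathlib import Path

    import numpy as np

    save_dir = Path(save_dir)

    # One compressed archive per episode, keyed by the array names defined in the class.
    arrays = np.load(save_dir / f"{ep_id}.npz")
    print(arrays.files)  # ['actions', 'env_actions', 'attacks', 'observations', 'rewards', ...]

    # One metadata JSON per episode ...
    meta = json.loads((save_dir / f"{ep_id}_metadata.json").read_text(encoding="utf-8"))
    print(meta["chronics_max_timestep"], meta["other_reward_names"])

    # ... and four space-description JSONs written once per experiment directory.
    print(sorted(f.name for f in save_dir.glob("dict_*.json")))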