Source code for eta_utility.eta_x.envs.base_env

from __future__ import annotations

import abc
import inspect
import pathlib
import time
from datetime import datetime, timedelta
from logging import getLogger
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
from gymnasium import Env

from eta_utility import timeseries
from eta_utility.eta_x.envs.state import StateConfig
from eta_utility.util import csv_export

if TYPE_CHECKING:
    from collections.abc import Mapping, Sequence
    from typing import Any, Callable

    from eta_utility.eta_x import ConfigOptRun
    from eta_utility.type_hints import ObservationType, Path, StepResult, TimeStep


log = getLogger(__name__)


[docs] class BaseEnv(Env, abc.ABC): """Abstract environment definition, providing some basic functionality for concrete environments to use. The class implements and adapts functions from gymnasium.Env. It provides additional functionality as required by the ETA-X framework and should be used as the starting point for new environments. The initialization of this superclass performs many of the necessary tasks, required to specify a concrete environment. Read the documentation carefully to understand, how new environments can be developed, building on this starting point. There are some attributes that must be set and some methods that must be implemented to satisfy the interface. This is required to create concrete environments. The required attributes are: - **version**: Version number of the environment. - **description**: Short description string of the environment. - **action_space**: The action space of the environment (see also gymnasium.spaces for options). - **observation_space**: The observation space of the environment (see also gymnasium.spaces for options). The gymnasium interface requires the following methods for the environment to work correctly within the framework. Consult the documentation of each method for more detail. - **step()** - **reset()** - **close()** :param env_id: Identification for the environment, useful when creating multiple environments. :param config_run: Configuration of the optimization run. :param verbose: Verbosity to use for logging. :param callback: callback that should be called after each episode. :param state_modification_callback: callback that should be called after state setup, before logging the state. :param scenario_time_begin: Beginning time of the scenario. :param scenario_time_end: Ending time of the scenario. :param episode_duration: Duration of the episode in seconds. :param sampling_time: Duration of a single time sample / time step in seconds. :param render_mode: Renders the environments to help visualise what the agent see, examples modes are "human", "rgb_array", "ansi" for text. :param kwargs: Other keyword arguments (for subclasses). """ @property @abc.abstractmethod def version(self) -> str: """Version of the environment""" return "" @property @abc.abstractmethod def description(self) -> str: """Long description of the environment""" return "" def __init__( self, env_id: int, config_run: ConfigOptRun, verbose: int = 2, callback: Callable | None = None, state_modification_callback: Callable | None = None, *, scenario_time_begin: datetime | str, scenario_time_end: datetime | str, episode_duration: TimeStep | str, sampling_time: TimeStep | str, sim_steps_per_sample: int | str = 1, render_mode: str | None = None, **kwargs: Any, ) -> None: super().__init__() #: Verbosity level used for logging. self.verbose: int = verbose log.setLevel(int(verbose * 10)) # Set some standard path settings #: Information about the optimization run and information about the paths. #: For example, it defines path_results and path_scenarios. self.config_run: ConfigOptRun = config_run #: Path for storing results. self.path_results: pathlib.Path = self.config_run.path_series_results #: Path for the scenario data. self.path_scenarios: pathlib.Path | None = self.config_run.path_scenarios #: Path of the environment file. self.path_env: pathlib.Path for f in inspect.stack(): if "__class__" in f.frame.f_locals and f.frame.f_locals["__class__"] is self.__class__: self.path_env = pathlib.Path(f.filename).parent #: Callback can be used for logging and plotting. self.callback: Callable | None = callback #: Callback can be used for modifying the state at each time step. self.state_modification_callback: Callable | None = state_modification_callback # Store some important settings #: ID of the environment (useful for vectorized environments). self.env_id: int = int(env_id) #: Name of the current optimization run. self.run_name: str = self.config_run.name #: Number of completed episodes. self.n_episodes: int = 0 #: Current step of the model (number of completed steps) in the current episode. self.n_steps: int = 0 #: Current step of the model (total over all episodes). self.n_steps_longtime: int = 0 #: Render mode for rendering the environment self.render_mode: str | None = render_mode # Set some standard environment settings #: Duration of one episode in seconds. self.episode_duration: float = float( episode_duration if not isinstance(episode_duration, timedelta) else episode_duration.total_seconds() ) #: Sampling time (interval between optimization time steps) in seconds. self.sampling_time: float = float( sampling_time if not isinstance(sampling_time, timedelta) else sampling_time.total_seconds() ) #: Number of time steps (of width sampling_time) in each episode. self.n_episode_steps: int = int(self.episode_duration // self.sampling_time) #: Duration of the scenario for each episode (for total time imported from csv). self.scenario_duration: float = self.episode_duration + self.sampling_time #: Beginning time of the scenario. self.scenario_time_begin: datetime if isinstance(scenario_time_begin, datetime): self.scenario_time_begin = scenario_time_begin else: self.scenario_time_begin = datetime.strptime(scenario_time_begin, "%Y-%m-%d %H:%M") #: Ending time of the scenario (should be in the format %Y-%m-%d %H:%M). self.scenario_time_end: datetime if isinstance(scenario_time_end, datetime): self.scenario_time_end = scenario_time_end else: self.scenario_time_end = datetime.strptime(scenario_time_end, "%Y-%m-%d %H:%M") # Check if scenario begin and end times make sense if self.scenario_time_begin > self.scenario_time_end: raise ValueError("Start time of the scenario should be smaller than or equal to end time.") #: The time series DataFrame contains all time series scenario data. It can be filled by the #: import_scenario method. self.timeseries: pd.DataFrame = pd.DataFrame() #: Data frame containing the currently valid range of time series data. self.ts_current: pd.DataFrame = pd.DataFrame() #: Configuration to describe what the environment state looks like. self.state_config: StateConfig | None = None # Store data logs and log other information #: Episode timer (stores the start time of the episode). self.episode_timer: float = time.time() #: Current state of the environment. self.state: dict[str, float] #: Additional state information to append to the state during stepping and reset self.additional_state: dict[str, float] | None = None #: Log of the environment state. self.state_log: list[dict[str, float]] = [] #: Log of the environment state over multiple episodes. self.state_log_longtime: list[list[dict[str, float]]] = [] #: Some specific current environment settings / other data, apart from state. self.data: dict[str, Any] #: Log of specific environment settings / other data, apart from state for the episode. self.data_log: list[dict[str, Any]] = [] #: Log of specific environment settings / other data, apart from state, over multiple episodes. self.data_log_longtime: list[list[dict[str, Any]]] #: Number of simulation steps to be taken for each sample. This must be a divisor of 'sampling_time'. self.sim_steps_per_sample: int = int(sim_steps_per_sample)
[docs] def import_scenario(self, *scenario_paths: Mapping[str, Any], prefix_renamed: bool = True) -> pd.DataFrame: """Load data from csv into self.timeseries_data by using scenario_from_csv :param scenario_paths: One or more scenario configuration dictionaries (or a list of dicts), which each contain a path for loading data from a scenario file. The dictionary should have the following structure, with <X> denoting the variable value: .. note :: [{*path*: <X>, *prefix*: <X>, *interpolation_method*: <X>, *resample_method*: <X>, *scale_factors*: {col_name: <X>}, *rename_cols*: {col_name: <X>}, *infer_datetime_cols*: <X>, *time_conversion_str*: <X>}] * **path**: Path to the scenario file (relative to scenario_path). * **prefix**: Prefix for all columns in the file, useful if multiple imported files have the same column names. * **interpolation_method**: A pandas interpolation method, required if the frequency of values must be increased in comparison to the files' data. (e.g.: 'linear' or 'pad'). * **scale_factors**: Scaling factors for specific columns. This can be useful for example, if a column contains data in kilowatt and should be imported in watts. In this case, the scaling factor for the column would be 1000. * **rename_cols**: Mapping of column names from the file to new names for the imported data. * **infer_datetime_cols**: Number of the column which contains datetime data. If this value is not present, the time_conversion_str variable will be used to determine the datetime format. * **time_conversion_str**: Time conversion string, determining the datetime format used in the imported file (default: %Y-%m-%d %H:%M). :param prefix_renamed: Determine whether the prefix is also applied to renamed columns. :return: Data Frame of the imported and formatted scenario data. """ paths = [] prefix = [] int_methods = [] scale_factors = [] rename_cols = {} infer_datetime_from = [] time_conversion_str = [] for path in scenario_paths: paths.append(self.path_scenarios / path["path"]) prefix.append(path.get("prefix", None)) int_methods.append(path.get("interpolation_method", None)) scale_factors.append(path.get("scale_factors", None)) (rename_cols.update(path.get("rename_cols", {})),) infer_datetime_from.append(path.get("infer_datetime_cols", "string")) time_conversion_str.append(path.get("time_conversion_str", "%Y-%m-%d %H:%M")) self.ts_current = timeseries.scenario_from_csv( paths=paths, resample_time=self.sampling_time, start_time=self.scenario_time_begin, end_time=self.scenario_time_end, total_time=self.scenario_duration, random=self.np_random, interpolation_method=int_methods, scaling_factors=scale_factors, rename_cols=rename_cols, prefix_renamed=prefix_renamed, infer_datetime_from=infer_datetime_from, time_conversion_str=time_conversion_str, ) return self.ts_current
[docs] def get_scenario_state(self) -> dict[str, Any]: """Get scenario data for the current time step of the environment, as specified in state_config. This assumes that scenario data in self.ts_current is available and scaled correctly. :return: Scenario data for current time step. """ assert self.state_config is not None, "StateConfig must be specified in the environment." scenario_state = {} for scen in self.state_config.scenarios: scenario_state[scen] = self.ts_current[self.state_config.map_scenario_ids[scen]].iloc[self.n_steps] return scenario_state
[docs] @abc.abstractmethod def step(self, action: np.ndarray) -> StepResult: """Perform one time step and return its results. This is called for every event or for every time step during the simulation/optimization run. It should utilize the actions as supplied by the agent to determine the new state of the environment. The method must return a five-tuple of observations, rewards, terminated, truncated, info. .. note :: Do not forget to increment n_steps and n_steps_longtime. :param action: Actions taken by the agent. :return: The return value represents the state of the environment after the step was performed. * **observations**: A numpy array with new observation values as defined by the observation space. Observations is a np.array() (numpy array) with floating point or integer values. * **reward**: The value of the reward function. This is just one floating point value. * **terminated**: Boolean value specifying whether an episode has been completed. If this is set to true, the reset function will automatically be called by the agent or by eta_i. * **truncated**: Boolean, whether the truncation condition outside the scope is satisfied. Typically, this is a timelimit, but could also be used to indicate an agent physically going out of bounds. Can be used to end the episode prematurely before a terminal state is reached. If true, the user needs to call the `reset` function. * **info**: Provide some additional info about the state of the environment. The contents of this may be used for logging purposes in the future but typically do not currently serve a purpose. """ raise NotImplementedError("Cannot step an abstract Environment.")
[docs] def _actions_valid(self, action: np.ndarray) -> None: """Check whether the actions are within the specified action space. :param action: Actions taken by the agent. :raise: RuntimeError, when the actions are not inside of the action space. """ if self.action_space.shape is not None and self.action_space.shape != action.shape: raise RuntimeError( f"Agent action {action} (shape: {action.shape})" f" does not correspond to shape of environment action space (shape: {self.action_space.shape})." )
[docs] def _create_new_state(self, additional_state: dict[str, float] | None) -> None: """Take some initial values and create a new environment state object, stored in self.state. :param additional_state: Values to initialize the state. """ self.state = {} if additional_state is None else additional_state
[docs] def _actions_to_state(self, actions: np.ndarray) -> None: """Gather actions and store them in self.state. :param actions: Actions taken by the agent. """ assert self.state_config is not None, "Set state_config before calling _actions_to_state function." for idx, act in enumerate(self.state_config.actions): self.state[act] = actions[idx]
[docs] def _observations(self) -> np.ndarray: """Determine the observations list from environment state. This uses state_config to determine all observations. :return: Observations for the agent as determined by state_config. """ assert self.state_config is not None, "Set state_config before calling _observations function." return np.array([self.state[name] for name in self.state_config.observations], dtype=np.float64)
[docs] def _done(self) -> bool: """Check if the episode is over or not using the number of steps (n_steps) and the total number of steps in an episode (n_episode_steps). :return: boolean showing, whether the episode is done. """ return self.n_steps >= self.n_episode_steps
[docs] def reset( self, *, seed: int | None = None, options: dict[str, Any] | None = None, ) -> tuple[ObservationType, dict[str, Any]]: """Resets the environment to an initial internal state, returning an initial observation and info. This method generates a new starting state often with some randomness to ensure that the agent explores the state space and learns a generalised policy about the environment. This randomness can be controlled with the ``seed`` parameter otherwise if the environment already has a random number generator and :meth:`reset` is called with ``seed=None``, the RNG is not reset. When using the environment in conjunction with *stable_baselines3*, the vectorized environment will take care of seeding your custom environment automatically. For Custom environments, the first line of :meth:`reset` should be ``super().reset(seed=seed)`` which implements the seeding correctly. .. note :: Don't forget to store and reset the episode_timer by calling self._reset_state() if you overwrite this function. :param seed: The seed that is used to initialize the environment's PRNG (`np_random`). If the environment does not already have a PRNG and ``seed=None`` (the default option) is passed, a seed will be chosen from some source of entropy (e.g. timestamp or /dev/urandom). However, if the environment already has a PRNG and ``seed=None`` is passed, the PRNG will *not* be reset. If you pass an integer, the PRNG will be reset even if it already exists. (default: None) :param options: Additional information to specify how the environment is reset (optional, depending on the specific environment) (default: None) :return: Tuple of observation and info. The observation of the initial state will be an element of :attr:`observation_space` (typically a numpy array) and is analogous to the observation returned by :meth:`step`. Info is a dictionary containing auxiliary information complementing ``observation``. It should be analogous to the ``info`` returned by :meth:`step`. """ self._reset_state() return super().reset(seed=seed, options=options)
[docs] def _reduce_state_log(self) -> list[dict[str, float]]: """Removes unwanted parameters from state_log before storing in state_log_longtime :return: The return value is a list of dictionaries, where the parameters that should not be stored were removed """ assert self.state_config is not None, "Set state_config before calling reduced_state_log." dataframe = pd.DataFrame(self.state_log) return dataframe.drop(columns=list(set(dataframe.keys()) - self.state_config.add_to_state_log)).to_dict( "records" )
[docs] def _reset_state(self) -> None: """Store episode statistics and reset episode counters.""" if self.n_steps > 0: if self.callback is not None: self.callback(self) # Store some logging data self.n_episodes += 1 # store reduced_state_log in state_log_longtime self.state_log_longtime.append(self._reduce_state_log()) self.n_steps_longtime += self.n_steps # Reset episode variables self.n_steps = 0 self.state_log = []
[docs] @abc.abstractmethod def close(self) -> None: """Close the environment. This should always be called when an entire run is finished. It should be used to close any resources (i.e. simulation models) used by the environment. """ raise NotImplementedError("Cannot close an abstract Environment.")
[docs] @abc.abstractmethod def render(self) -> None: """Render the environment The set of supported modes varies per environment. Some environments do not support rendering at all. By convention in Farama *gymnasium*, if mode is: * human: render to the current display or terminal and return nothing. Usually for human consumption. * rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video. * ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation. The text can include newlines and ANSI escape sequences (e.g. for colors). """ raise NotImplementedError("Cannot render an abstract Environment.")
[docs] @classmethod def get_info(cls) -> tuple[str, str]: """Get info about environment. :return: Tuple of version and description. """ return cls.version, cls.description # type: ignore
[docs] def export_state_log( self, path: Path, names: Sequence[str] | None = None, *, sep: str = ";", decimal: str = ".", ) -> None: """Extension of csv_export to include timeseries on the data :param names: Field names used when data is a Matrix without column names. :param sep: Separator to use between the fields. :param decimal: Sign to use for decimal points. """ start_time = datetime.fromtimestamp(self.episode_timer) step = self.sampling_time / self.sim_steps_per_sample timerange = [start_time + timedelta(seconds=(k * step)) for k in range(len(self.state_log))] csv_export(path=path, data=self.state_log, index=timerange, names=names, sep=sep, decimal=decimal)