Source code for eta_utility.eta_x.envs.base_env_sim

from __future__ import annotations

import abc
import pathlib
import time
from logging import getLogger
from typing import TYPE_CHECKING

import numpy as np

from eta_utility.eta_x.envs import BaseEnv
from eta_utility.simulators import FMUSimulator

if TYPE_CHECKING:
    from collections.abc import Mapping
    from datetime import datetime
    from typing import Any, Callable

    from eta_utility.eta_x import ConfigOptRun
    from eta_utility.type_hints import ObservationType, StepResult, TimeStep

log = getLogger(__name__)


[docs] class BaseEnvSim(BaseEnv, abc.ABC): """Base class for FMU Simulation models environments. :param env_id: Identification for the environment, useful when creating multiple environments. :param config_run: Configuration of the optimization run. :param verbose: Verbosity to use for logging. :param callback: callback which should be called after each episode. :param scenario_time_begin: Beginning time of the scenario. :param scneario_time_end: Ending time of the scenario. :param episode_duration: Duration of the episode in seconds. :param sampling_time: Duration of a single time sample / time step in seconds. :param model_parameters: Parameters for the mathematical model. :param sim_steps_per_sample: Number of simulation steps to perform during every sample. :param render_mode: Renders the environments to help visualise what the agent see, examples modes are "human", "rgb_array", "ansi" for text. :param kwargs: Other keyword arguments (for subclasses). """ @property @abc.abstractmethod def fmu_name(self) -> str: """Name of the FMU file""" return "" def __init__( self, env_id: int, config_run: ConfigOptRun, verbose: int = 2, callback: Callable | None = None, *, scenario_time_begin: datetime | str, scenario_time_end: datetime | str, episode_duration: TimeStep | str, sampling_time: TimeStep | str, model_parameters: Mapping[str, Any] | None = None, sim_steps_per_sample: int | str = 1, render_mode: str | None = None, **kwargs: Any, ) -> None: super().__init__( env_id=env_id, config_run=config_run, verbose=verbose, callback=callback, scenario_time_begin=scenario_time_begin, scenario_time_end=scenario_time_end, episode_duration=episode_duration, sampling_time=sampling_time, render_mode=render_mode, **kwargs, ) #: Number of simulation steps to be taken for each sample. This must be a divisor of 'sampling_time'. self.sim_steps_per_sample: int = int(sim_steps_per_sample) #: The FMU is expected to be placed in the same folder as the environment self.path_fmu: pathlib.Path = self.path_env / (self.fmu_name + ".fmu") #: Configuration for the FMU model parameters, that need to be set for initialization of the Model. self.model_parameters: Mapping[str, int | float] | None = model_parameters #: Instance of the FMU. This can be used to directly access the eta_utility.FMUSimulator interface. self.simulator: FMUSimulator
[docs] def _init_simulator(self, init_values: Mapping[str, int | float] | None = None) -> None: """Initialize the simulator object. Make sure to call _names_from_state before this or to otherwise initialize the names array. This can also be used to reset the simulator after an episode is completed. It will reuse the same simulator object and reset it to the given initial values. :param init_values: Dictionary of initial values for some FMU variables. """ assert self.state_config is not None, "Set state_config before calling _init_simulator function." _init_vals = {} if init_values is None else init_values if hasattr(self, "simulator") and isinstance(self.simulator, FMUSimulator): self.simulator.reset(_init_vals) else: # Instance of the FMU. This can be used to directly access the eta_utility.FMUSimulator interface. self.simulator = FMUSimulator( self.env_id, self.path_fmu, start_time=0.0, stop_time=self.episode_duration, step_size=float(self.sampling_time / self.sim_steps_per_sample), names_inputs=[str(self.state_config.map_ext_ids[name]) for name in self.state_config.ext_inputs], names_outputs=[str(self.state_config.map_ext_ids[name]) for name in self.state_config.ext_outputs], init_values=_init_vals, )
[docs] def simulate(self, state: Mapping[str, float]) -> tuple[dict[str, float], bool, float]: """Perform a simulator step and return data as specified by the is_ext_observation parameter of the state_config. :param state: State of the environment before the simulation. :return: Output of the simulation, boolean showing whether all simulation steps where successful, time elapsed during simulation. """ assert self.state_config is not None, "Set state_config before calling simulate function." # generate FMU input from current state step_inputs = [] for key in self.state_config.ext_inputs: try: value = state[key] if isinstance(value, (int, float)): scale_config = self.state_config.ext_scale[key] value = value / scale_config["multiply"] - scale_config["add"] step_inputs.append(value) except KeyError as e: raise KeyError(f"{e!s} is unavailable in environment state.") from e sim_time_start = time.time() step_success = True try: step_output = self.simulator.step(step_inputs) except Exception as e: step_success = False log.error(e) # stop timer for simulation step time debugging sim_time_elapsed = time.time() - sim_time_start # save step_outputs into data_store output = {} if step_success: for idx, name in enumerate(self.state_config.ext_outputs): value = step_output[idx] if isinstance(value, (int, float)): scale_config = self.state_config.ext_scale[name] value = (value + scale_config["add"]) * scale_config["multiply"] output[name] = value return output, step_success, sim_time_elapsed
[docs] def step(self, action: np.ndarray) -> StepResult: """Perform one time step and return its results. This is called for every event or for every time step during the simulation/optimization run. It should utilize the actions as supplied by the agent to determine the new state of the environment. The method must return a five-tuple of observations, rewards, terminated, truncated, info. This also updates self.state and self.state_log to store current state information. .. note:: This function always returns 0 reward. Therefore, it must be extended if it is to be used with reinforcement learning agents. If you need to manipulate actions (discretization, policy shaping, ...) do this before calling this function. If you need to manipulate observations and rewards, do this after calling this function. :param action: Actions to perform in the environment. :return: The return value represents the state of the environment after the step was performed. * **observations**: A numpy array with new observation values as defined by the observation space. Observations is a np.array() (numpy array) with floating point or integer values. * **reward**: The value of the reward function. This is just one floating point value. * **terminated**: Boolean value specifying whether an episode has been completed. If this is set to true, the reset function will automatically be called by the agent or by eta_i. * **truncated**: Boolean, whether the truncation condition outside the scope is satisfied. Typically, this is a timelimit, but could also be used to indicate an agent physically going out of bounds. Can be used to end the episode prematurely before a terminal state is reached. If true, the user needs to call the `reset` function. * **info**: Provide some additional info about the state of the environment. The contents of this may be used for logging purposes in the future but typically do not currently serve a purpose. """ self._actions_valid(action) step_success, sim_time_elapsed = self._update_state(action) self.state_log.append(self.state) terminated = self._done() or not step_success info: dict[str, Any] = {"sim_time_elapsed": sim_time_elapsed} # Render the environment at each step if self.render_mode is not None: self.render() return self._observations(), 0, terminated, False, info
[docs] def _update_state(self, action: np.ndarray) -> tuple[bool, float]: """Take additional_state, execute simulation and get state information from scenario. This function updates self.state and increments the step counter. .. warning:: You have to update self.state_log with the entire state before leaving the step to store the state information. :param action: Actions to perform in the environment. :return: Success of the simulation, time taken for simulation. """ assert self.state_config is not None, "Set state_config before calling _update_state function." # Store actions new_state = {} if self.additional_state is None else self.additional_state for idx, act in enumerate(self.state_config.actions): new_state[act] = action[idx] step_success, sim_time_elapsed = False, 0.0 # simulate one time step and store the results. for i in range(self.sim_steps_per_sample): # do multiple FMU steps in one environment-step sim_result, step_success, sim_time_elapsed = self.simulate({**self.state, **new_state}) new_state.update(sim_result) # Append intermediate simulation results to the state_log if i < self.sim_steps_per_sample - 1: self.state_log.append({**self.state, **new_state}) self.n_steps += 1 # Update scenario and environment state new_state.update(self.get_scenario_state()) self.state = new_state return step_success, sim_time_elapsed
[docs] def reset( self, *, seed: int | None = None, options: dict[str, Any] | None = None, ) -> tuple[ObservationType, dict[str, Any]]: """Resets the environment to an initial internal state, returning an initial observation and info. This method generates a new starting state often with some randomness to ensure that the agent explores the state space and learns a generalised policy about the environment. This randomness can be controlled with the ``seed`` parameter otherwise if the environment already has a random number generator and :meth:`reset` is called with ``seed=None``, the RNG is not reset. When using the environment in conjunction with *stable_baselines3*, the vectorized environment will take care of seeding your custom environment automatically. For Custom environments, the first line of :meth:`reset` should be ``super().reset(seed=seed)`` which implements the seeding correctly. .. note :: Don't forget to store and reset the episode_timer. :param seed: The seed that is used to initialize the environment's PRNG (`np_random`). If the environment does not already have a PRNG and ``seed=None`` (the default option) is passed, a seed will be chosen from some source of entropy (e.g. timestamp or /dev/urandom). However, if the environment already has a PRNG and ``seed=None`` is passed, the PRNG will *not* be reset. If you pass an integer, the PRNG will be reset even if it already exists. (default: None) :param options: Additional information to specify how the environment is reset (optional, depending on the specific environment) (default: None) :return: Tuple of observation and info. The observation of the initial state will be an element of :attr:`observation_space` (typically a numpy array) and is analogous to the observation returned by :meth:`step`. Info is a dictionary containing auxiliary information complementing ``observation``. It should be analogous to the ``info`` returned by :meth:`step`. """ assert self.state_config is not None, "Set state_config before calling reset function." super().reset(seed=seed, options=options) # reset the FMU after every episode with new parameters self._init_simulator(self.model_parameters) # Update scenario data, read values from the fmu without time step and store the results start_obs = [] for obs in self.state_config.ext_outputs: start_obs.append(str(self.state_config.map_ext_ids[obs])) output = self.simulator.read_values(start_obs) self.state = {} if self.additional_state is None else self.additional_state for idx, name in enumerate(self.state_config.ext_outputs): value = output[idx] if isinstance(value, (int, float)): scale_config = self.state_config.ext_scale[name] value = (value + scale_config["add"]) * scale_config["multiply"] self.state[name] = value self.state.update(self.get_scenario_state()) self.state_log.append(self.state) # Render the environment when calling the reset function if self.render_mode is not None: self.render() return self._observations(), {}
[docs] def close(self) -> None: """Close the environment. This should always be called when an entire run is finished. It should be used to close any resources (i.e. simulation models) used by the environment. Default behavior for the Simulation environment is to close the FMU object. """ self.simulator.close() # close the FMU