Source code for eta_utility.eta_x.eta_x

from __future__ import annotations

import os
import pathlib
from contextlib import contextmanager
from datetime import datetime
from logging import getLogger
from typing import TYPE_CHECKING

import numpy as np
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.vec_env import VecNormalize

from eta_utility.eta_x import ConfigOpt, ConfigOptRun
from eta_utility.eta_x.common import (
    CallbackEnvironment,
    initialize_model,
    is_env_closed,
    load_model,
    log_net_arch,
    log_run_info,
    log_to_file,
    merge_callbacks,
    vectorize_environment,
)

if TYPE_CHECKING:
    from collections.abc import Generator, Mapping
    from typing import Any

    from stable_baselines3.common.base_class import BaseAlgorithm
    from stable_baselines3.common.type_aliases import MaybeCallback
    from stable_baselines3.common.vec_env import VecEnv
    from stable_baselines3.common.vec_env.base_vec_env import VecEnvObs

log = getLogger(__name__)


class ETAx:
    """Initialize an optimization model and provide interfaces for optimization, learning and execution (play).

    :param root_path: Root path of the eta_x application (the configuration will be interpreted relative to this).
    :param config_name: Name of the configuration file in the configuration directory (JSON format).
    :param config_overwrite: Dictionary to overwrite selected configurations.
    :param relpath_config: Relative path to the configuration file, starting from the root path.
    """

    def __init__(
        self,
        root_path: str | os.PathLike,
        config_name: str,
        config_overwrite: Mapping[str, Any] | None = None,
        relpath_config: str | os.PathLike = "config/",
    ) -> None:
        # Load configuration for the optimization
        _root_path = root_path if isinstance(root_path, pathlib.Path) else pathlib.Path(root_path)
        _relpath_config = (
            relpath_config if isinstance(relpath_config, pathlib.Path) else pathlib.Path(relpath_config)
        )
        #: Path to the configuration file.
        self.path_config = _root_path / _relpath_config / f"{config_name}"

        #: ConfigOpt object for the optimization run.
        self.config: ConfigOpt = ConfigOpt.from_config_file(self.path_config, root_path, config_overwrite)
        log.setLevel(int(self.config.settings.verbose * 10))

        #: Configuration for an optimization run.
        self.config_run: ConfigOptRun | None = None

        #: The vectorized environments.
        self.environments: VecEnv | VecNormalize | None = None
        #: Vectorized interaction environments.
        self.interaction_env: VecEnv | None = None
        #: The model or algorithm.
        self.model: BaseAlgorithm | None = None
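    # A minimal usage sketch (not part of the module; the root path, configuration name and run names below
    # are hypothetical and only illustrate the intended call order):
    #
    #     experiment = ETAx(root_path="experiments/pendulum", config_name="pendulum_learning")
    #     experiment.learn(series_name="ppo_series", run_name="run_1", run_description="First training run")
    #     experiment.play(series_name="ppo_series", run_name="run_1")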
    @contextmanager
    def prepare_environments_models(
        self,
        series_name: str | None,
        run_name: str | None,
        run_description: str = "",
        reset: bool = False,
        training: bool = False,
    ) -> Generator:
        """Context manager which prepares the run, the environments and the model (if they are not yet
        available) and closes the environments after it exits.

        :param series_name: Name for a series of runs.
        :param run_name: Name for a specific run.
        :param run_description: Description for a specific run.
        :param reset: Flag to determine whether an existing model should be reset.
        :param training: Should preparation be done for training (alternative: playing)?
        """
        if is_env_closed(self.environments) or self.model is None:
            _series_name = series_name if series_name is not None else ""
            _run_name = run_name if run_name is not None else ""
            self.prepare_run(_series_name, _run_name, run_description)

            with self.prepare_environments(training):
                assert (
                    self.environments is not None
                ), "Initialized environments could not be found. Call prepare_environments first."
                self.prepare_model(reset)
                yield
        else:
            # Environments and model are already prepared; the caller keeps managing their lifetime.
            yield
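    # Sketch: using the context manager directly to inspect the prepared objects without learning or playing,
    # assuming an ETAx instance 'experiment' as in the sketch above (series and run names are hypothetical).
    # The environments are closed automatically when the block exits.
    #
    #     with experiment.prepare_environments_models("ppo_series", "inspect_run"):
    #         print(experiment.model)
    #         print(experiment.environments.observation_space)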
    def prepare_run(self, series_name: str, run_name: str, run_description: str = "") -> None:
        """Prepare the learn and play methods by reading the configuration and creating the results folders.

        :param series_name: Name for a series of runs.
        :param run_name: Name for a specific run.
        :param run_description: Description for a specific run.
        """
        self.config_run = ConfigOptRun(
            series=series_name,
            name=run_name,
            description=run_description,
            path_root=self.config.path_root,
            path_results=self.config.path_results,
            path_scenarios=self.config.path_scenarios,
        )
        self.config_run.create_results_folders()

        # Add a file handler to the parent logger to log the terminal output
        log_to_file(config=self.config, config_run=self.config_run)

        log.info("Run prepared successfully.")
    def prepare_model(self, reset: bool = False) -> None:
        """Check for an existing model and load it, or back it up and create a new model.

        :param reset: Flag to determine whether an existing model should be reset.
        """
        self._prepare_model(reset=reset)
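    # Sketch of the manual preparation sequence that learn() and play() run through the context manager above,
    # assuming an ETAx instance 'experiment' (series and run names are hypothetical):
    #
    #     experiment.prepare_run("ppo_series", "manual_run")
    #     with experiment.prepare_environments(training=True):
    #         experiment.prepare_model(reset=False)
    #         ...  # work with experiment.model and experiment.environments here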
    def _prepare_model(self, reset: bool = False) -> None:
        """Check for an existing model and load it, or back it up and create a new model.

        :param reset: Flag to determine whether an existing model should be reset.
        """
        assert self.config_run is not None, (
            "Set the config_run attribute before trying to initialize the model "
            "(for example by calling prepare_run)."
        )
        assert self.environments is not None, (
            "Initialize the environments before trying to initialize the model (for example by calling prepare_run)."
        )

        path_model = self.config_run.path_run_model
        if path_model.is_file() and reset:
            log.info(f"Existing model detected: {path_model}")

            # Rename the existing model to a timestamped backup file next to it.
            bak_name = path_model.parent / (
                f"{path_model.stem}_{datetime.fromtimestamp(path_model.stat().st_mtime).strftime('%Y%m%d_%H%M')}.bak"
            )
            path_model.rename(bak_name)
            log.info(f"Reset is active. Existing model will be backed up. Backup file name: {bak_name}")
        elif path_model.is_file():
            log.info(f"Existing model detected: {path_model}. Loading model.")

            self.model = load_model(
                self.config.setup.agent_class,
                self.environments,
                self.config.settings.agent,
                self.config_run.path_run_model,
                tensorboard_log=self.config.setup.tensorboard_log,
                log_path=self.config_run.path_series_results,
            )
            return

        # Initialize the model if it wasn't loaded from a file
        self.model = initialize_model(
            self.config.setup.agent_class,
            self.config.setup.policy_class,
            self.environments,
            self.config.settings.agent,
            self.config.settings.seed,
            tensorboard_log=self.config.setup.tensorboard_log,
            log_path=self.config_run.path_series_results,
        )
    @contextmanager
    def prepare_environments(self, training: bool = True) -> Generator:
        """Context manager which prepares the environments and closes them after it exits.

        :param training: Should preparation be done for training (alternative: playing)?
        """
        # If the agent specifies the population parameter, the number of environments usually has to be
        # equal to that value as well. See the NSGA-II agent.
        if (
            "population" in self.config.settings.agent
            and self.config.settings.n_environments != self.config.settings.agent["population"]
        ):
            if self.config.settings.n_environments != 1:
                log.warning(
                    f"Agent specifies 'population' parameter but the number of environments "
                    f"({self.config.settings.n_environments}) is not equal to the population. "
                    f"Setting 'n_environments' to {self.config.settings.agent['population']}"
                )
            self.config.settings.n_environments = self.config.settings.agent["population"]

        try:
            self._prepare_environments(training)
            yield
        finally:
            # Close all environments when done (kill processes)
            if self.environments is not None:
                log.debug("Closing environments.")
                self.environments.close()
            else:
                log.error("Environment initialization failed.")

            if self.config.settings.interact_with_env:
                if self.interaction_env is not None:
                    log.debug("Closing interaction environment.")
                    self.interaction_env.close()
                else:
                    log.error("Interaction environment initialization failed.")
    def _prepare_environments(self, training: bool = True) -> None:
        """Vectorize and prepare the environments and potentially the interaction environments.

        :param training: Should preparation be done for training (alternative: playing)?
        """
        # If the agent specifies the population parameter, the number of environments usually has to be
        # equal to that value as well. See the NSGA-II agent.
        if (
            "population" in self.config.settings.agent
            and self.config.settings.n_environments != self.config.settings.agent["population"]
        ):
            if self.config.settings.n_environments != 1:
                log.warning(
                    f"Agent specifies 'population' parameter but the number of environments "
                    f"({self.config.settings.n_environments}) is not equal to the population. "
                    f"Setting 'n_environments' to {self.config.settings.agent['population']}"
                )
            self.config.settings.n_environments = self.config.settings.agent["population"]

        assert self.config_run is not None, (
            "Set the config_run attribute before trying to initialize the environments "
            "(for example by calling prepare_run)."
        )

        env_class = self.config.setup.environment_class
        self.config_run.set_env_info(env_class)

        callback = CallbackEnvironment(self.config.settings.plot_interval)

        # Vectorize the environments
        self.environments = vectorize_environment(
            env_class,
            self.config_run,
            self.config.settings.environment,
            callback,
            self.config.settings.verbose,
            self.config.setup.vectorizer_class,
            self.config.settings.n_environments,
            training=training,
            monitor_wrapper=self.config.setup.monitor_wrapper,
            norm_wrapper_obs=self.config.setup.norm_wrapper_obs,
            norm_wrapper_reward=self.config.setup.norm_wrapper_reward,
        )

        if self.config.settings.interact_with_env:
            # Perform some checks to ensure the interaction environment is configured correctly.
            if self.config.setup.interaction_env_class is None:
                raise ValueError(
                    "If 'interact_with_env' is specified, an interaction env class must be specified as well."
                )
            if self.config.settings.interaction_env is None:
                raise ValueError(
                    "If 'interact_with_env' is specified, the interaction_env settings must be specified as well."
                )

            interaction_env_class = self.config.setup.interaction_env_class
            self.config_run.set_interaction_env_info(interaction_env_class)

            # Vectorize the interaction environment
            self.interaction_env = vectorize_environment(
                interaction_env_class,
                self.config_run,
                self.config.settings.interaction_env,
                callback,
                self.config.settings.verbose,
                training=training,
            )
    def learn(
        self,
        series_name: str | None = None,
        run_name: str | None = None,
        run_description: str = "",
        reset: bool = False,
        callbacks: MaybeCallback = None,
    ) -> None:
        """Start the learning job for an agent with the specified environment.

        :param series_name: Name for a series of runs.
        :param run_name: Name for a specific run.
        :param run_description: Description for a specific run.
        :param reset: Indication whether possibly existing models should be reset. Learning will be continued
                      if a model exists and reset is False.
        :param callbacks: Provide additional callbacks to send to the model.learn() call.
        """
        with self.prepare_environments_models(series_name, run_name, run_description, reset, training=True):
            assert self.config_run is not None, "Run configuration could not be found. Call prepare_run first."
            assert (
                self.environments is not None
            ), "Initialized environments could not be found. Call prepare_environments first."
            assert self.model is not None, "Initialized model could not be found. Call prepare_model first."

            # Log some information about the model and configuration
            log_net_arch(self.model, self.config_run)
            log_run_info(self.config, self.config_run)

            # The genetic algorithm has a slightly different concept for saving since it does not stop
            # between time steps.
            if "n_generations" in self.config.settings.agent:
                save_freq = self.config.settings.save_model_every_x_episodes
                total_timesteps = self.config.settings.agent["n_generations"]
            else:
                # Check if all required config values are present
                if self.config.settings.episode_duration is None:
                    raise ValueError("Missing configuration values for learning: 'episode_duration'.")
                if self.config.settings.sampling_time is None:
                    raise ValueError("Missing configuration values for learning: 'sampling_time'.")
                if self.config.settings.n_episodes_learn is None:
                    raise ValueError("Missing configuration values for learning: 'n_episodes_learn'.")

                # Determine the frequency for periodically saving models and the total number of time steps
                save_freq = int(
                    self.config.settings.episode_duration
                    / self.config.settings.sampling_time
                    * self.config.settings.save_model_every_x_episodes
                )
                total_timesteps = int(
                    self.config.settings.n_episodes_learn
                    * self.config.settings.episode_duration
                    / self.config.settings.sampling_time
                )

            # Set the seed for the environments before starting to learn
            self.environments.seed(self.config.settings.seed)

            callback_learn = merge_callbacks(
                CheckpointCallback(
                    save_freq=save_freq,
                    save_path=str(self.config_run.path_series_results / "models"),
                    name_prefix=self.config_run.name,
                ),
                callbacks,
            )

            # The environments are reset before the learning phase begins; start learning.
            log.info("Start learning process of agent in environment.")
            try:
                self.model.learn(
                    total_timesteps=total_timesteps,
                    callback=callback_learn,
                    tb_log_name=self.config_run.name,
                )
            except OSError:
                filename = str(
                    self.config_run.path_series_results / f"{self.config_run.name}_model_before_error.pkl"
                )
                log.info(f"Saving model to file: {filename}.")
                self.model.save(filename)
                raise

            try:
                log.debug("Resetting environment one more time to call environment callback one last time.")
                self.environments.reset()
            except ValueError as e:
                raise ValueError("An error occurred while resetting the environment.") from e

            # Save the model and, if applicable, the environment normalization data
            log.debug(f"Saving model to file: {self.config_run.path_run_model}.")
            self.model.save(self.config_run.path_run_model)
            if isinstance(self.environments, VecNormalize):
                log.debug(f"Saving environment normalization data to file: {self.config_run.path_vec_normalize}.")
                self.environments.save(str(self.config_run.path_vec_normalize))

            log.info(f"Learning finished: {series_name} / {run_name}")
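    # Sketch: continuing a learning run and passing an additional stable-baselines3 callback through the
    # 'callbacks' parameter, assuming an ETAx instance 'experiment' (episode limit and names are hypothetical):
    #
    #     from stable_baselines3.common.callbacks import StopTrainingOnMaxEpisodes
    #
    #     experiment.learn(
    #         series_name="ppo_series",
    #         run_name="run_2",
    #         reset=False,  # continue learning from the existing model instead of backing it up
    #         callbacks=StopTrainingOnMaxEpisodes(max_episodes=100),
    #     )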
    def play(self, series_name: str | None = None, run_name: str | None = None, run_description: str = "") -> None:
        """Play with previously learned agent model in environment.

        :param series_name: Name for a series of runs.
        :param run_name: Name for a specific run.
        :param run_description: Description for a specific run.
        """
        with self.prepare_environments_models(series_name, run_name, run_description, reset=False, training=False):
            assert self.config_run is not None, "Run configuration could not be found. Call prepare_run first."
            assert (
                self.environments is not None
            ), "Initialized environments could not be found. Call prepare_environments first."
            assert self.model is not None, "Initialized model could not be found. Call prepare_model first."

            if self.config.settings.n_episodes_play is None:
                raise ValueError("Missing configuration value for playing: 'n_episodes_play' in section 'settings'")

            # Log some information about the model and configuration
            log_net_arch(self.model, self.config_run)
            log_run_info(self.config, self.config_run)

            n_episodes_stop = self.config.settings.n_episodes_play

            # Reset the environments before starting to play
            try:
                log.debug("Resetting environments before starting to play.")
                observations = self._reset_envs()
            except ValueError as e:
                raise ValueError(
                    "It is likely that returned observations do not conform to the specified state config."
                ) from e
            n_episodes = 0

            log.debug("Start playing process of agent in environment.")
            if self.config.settings.interact_with_env:
                log.info("Starting agent with environment/optimization interaction.")
            else:
                log.info("Starting without an additional interaction environment.")

            _round_actions = self.config.settings.round_actions
            _scale_actions = (
                self.config.settings.scale_actions if self.config.settings.scale_actions is not None else 1
            )

            while n_episodes < n_episodes_stop:
                try:
                    observations, dones = self._play_step(_round_actions, _scale_actions, observations)
                except BaseException as e:
                    log.error(
                        "Exception occurred during an environment step. Aborting and trying to reset environments."
                    )

                    try:
                        observations = self._reset_envs()
                    except BaseException as followup_exception:
                        raise e from followup_exception
                    log.debug("Environment reset successful - re-raising exception.")
                    raise e

                n_episodes += sum(dones)
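    # Sketch: playing back a trained agent, assuming an ETAx instance 'experiment'. Because
    # prepare_environments_models() is entered with reset=False, an existing model file for this run is loaded
    # via load_model() before playing (names are hypothetical):
    #
    #     experiment.play(series_name="ppo_series", run_name="run_1")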
    def _play_step(
        self, _round_actions: int | None, _scale_actions: float, observations: VecEnvObs
    ) -> tuple[VecEnvObs, np.ndarray]:
        """Perform a single playing step in the environments and return the new observations and done flags."""
        assert self.environments is not None, "Initialized environments could not be found. Call prepare_run first."

        # Set policy prediction to deterministic for playing.
        # Type ignored because typing in eta_x is bad.
        action, _ = self.model.predict(observation=observations, deterministic=True)  # type: ignore

        # Round and scale actions if required
        if _round_actions is not None:
            action = np.round(action * _scale_actions, _round_actions)
        else:
            action *= _scale_actions

        # Some agents (e.g. MPC) can interact with an additional environment
        if self.config.settings.interact_with_env:
            assert (
                self.interaction_env is not None
            ), "Initialized interaction environments could not be found. Call prepare_run first."

            # Perform a step with the interaction environment and update the normal environment with
            # its observations
            observations, _rewards, dones, info = self.interaction_env.step(action)
            observations = np.array(self.environments.env_method("update", observations, indices=0))

            # Make sure to also reset the environment if the interaction_env says it's done. For the interaction
            # env this is done inside the vectorizer.
            for idx in range(self.environments.num_envs):
                if dones[idx]:
                    info[idx]["terminal_observation"] = observations
                    observations[idx] = self._reset_env_interaction(observations)
        else:
            observations, _rewards, dones, info = self.environments.step(action)

        return observations, dones

    def _reset_envs(self) -> VecEnvObs:
        """Reset the environments before playing or learning starts and after it finishes.

        :return: Observations after the reset.
        """
        assert self.environments is not None, "Initialized environments could not be found. Call prepare_run first."
        log.debug("Resetting environments.")
        self.environments.seed(self.config.settings.seed)
        if self.config.settings.interact_with_env:
            assert (
                self.interaction_env is not None
            ), "Initialized interaction environments could not be found. Call prepare_run first."
            self.interaction_env.seed(self.config.settings.seed)
            observations = self.interaction_env.reset()
            return self._reset_env_interaction(observations)

        return self.environments.reset()

    def _reset_env_interaction(self, observations: VecEnvObs) -> VecEnvObs:
        """Reset the environments when interaction with another environment is taking place.

        :param observations: Observations from the interaction env.
        :return: Observations after the reset.
        """
        assert self.environments is not None, "Initialized environments could not be found. Call prepare_run first."
        log.debug("Resetting main environment during environment interaction.")
        try:
            observations = np.array(self.environments.env_method("first_update", observations, indices=0))
        except AttributeError as e:
            # Environments which do not implement 'first_update' fall back to a normal reset.
            if "first_update" in str(e):
                observations = self.environments.reset()
            else:
                raise e

        return observations