from __future__ import annotations
import abc
import inspect
import json
import pathlib
from functools import partial
from typing import TYPE_CHECKING
import torch as th
from attrs import asdict
from stable_baselines3.common.vec_env import DummyVecEnv, VecMonitor, VecNormalize
from eta_utility import dict_get_any, log_add_filehandler
from . import processors
from .policies import NoPolicy
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
from typing import Any, Callable
from gymnasium import Env
from stable_baselines3.common.base_class import BaseAlgorithm, BasePolicy
from stable_baselines3.common.vec_env import VecEnv
from eta_utility.eta_x import ConfigOpt, ConfigOptRun
from eta_utility.eta_x.envs import BaseEnv
from eta_utility.type_hints import AlgoSettings, EnvSettings, Path
from logging import getLogger
log = getLogger(__name__)
def vectorize_environment(
    env: type[BaseEnv],
    config_run: ConfigOptRun,
    env_settings: EnvSettings,
    callback: Callable[[BaseEnv], None],
    verbose: int = 2,
    vectorizer: type[DummyVecEnv] = DummyVecEnv,
    n: int = 1,
    *,
    training: bool = False,
    monitor_wrapper: bool = False,
    norm_wrapper_obs: bool = False,
    norm_wrapper_reward: bool = False,
) -> VecNormalize | VecEnv:
    """Vectorize the environment and automatically apply normalization wrappers if configured. If the environment
    is initialized as an interaction_env it will not have normalization wrappers and use the appropriate configuration
    automatically.

    :param env: Environment class which will be instantiated and vectorized.
    :param config_run: Configuration for a specific optimization run.
    :param env_settings: Configuration settings dictionary for the environment which is being initialized.
    :param callback: Callback to call with an environment instance.
    :param verbose: Logging verbosity to use in the environment. A 'verbose' entry in *env_settings*
        takes precedence if it is not None.
    :param vectorizer: Vectorizer class to use for vectorizing the environments.
    :param n: Number of vectorized environments to create.
    :param training: Flag to identify whether the environment should be initialized for training or playing. If true,
        it will be initialized for training.
    :param monitor_wrapper: Flag to determine whether the environments should be wrapped in a VecMonitor
        (which records episode rewards, e.g. for tensorboard).
    :param norm_wrapper_obs: Flag to determine whether observations from the environments should be normalized.
    :param norm_wrapper_reward: Flag to determine whether rewards from the environments should be normalized.
    :return: Vectorized environments, possibly also wrapped in a normalizer.
    """
    log.debug("Trying to vectorize the environment.")

    # Ensure n is one, if the DummyVecEnv is used (it doesn't support more than one).
    # BUGFIX: 'vectorizer' is a class, so vectorizer.__class__.__name__ is the metaclass name
    # ("type") and the original comparison could never be true. Compare the class' own name.
    if vectorizer.__name__ == "DummyVecEnv" and n != 1:
        n = 1
        log.warning("Setting number of environments to 1 because DummyVecEnv (default) is used.")

    # Work on a copy so the caller's settings dict is not mutated.
    settings = dict(env_settings)
    # BUGFIX: always remove 'verbose' from the settings dict. Previously a value of None was
    # left in place and produced a duplicate 'verbose' keyword argument when creating the env.
    if "verbose" in settings:
        setting_verbose = settings.pop("verbose")
        if setting_verbose is not None:
            verbose = setting_verbose

    # Factory for a single environment instance; environment ids are 1-based.
    def create_env(env_id: int) -> Env:
        return env(env_id=env_id + 1, config_run=config_run, verbose=verbose, callback=callback, **settings)

    envs: VecEnv | VecNormalize
    envs = vectorizer([partial(create_env, i) for i in range(n)])

    # The VecMonitor knows the ep_reward and so this can be logged to tensorboard
    if monitor_wrapper:
        envs = VecMonitor(envs)

    # Automatically normalize the input features
    if norm_wrapper_obs or norm_wrapper_reward:
        # check if normalization data is available and load it if possible, otherwise
        # create a new normalization wrapper.
        if config_run.path_vec_normalize.is_file():
            log.info(
                f"Normalization data detected. Loading running averages into normalization wrapper: \n"
                f"\t {config_run.path_vec_normalize}"
            )
            envs = VecNormalize.load(str(config_run.path_vec_normalize), envs)
            envs.training = training
            envs.norm_obs = norm_wrapper_obs
            envs.norm_reward = norm_wrapper_reward
        else:
            log.info("No Normalization data detected.")
            envs = VecNormalize(envs, training=training, norm_obs=norm_wrapper_obs, norm_reward=norm_wrapper_reward)

    return envs
def initialize_model(
    algo: type[BaseAlgorithm],
    policy: type[BasePolicy],
    envs: VecEnv | VecNormalize,
    algo_settings: AlgoSettings,
    seed: int | None = None,
    *,
    tensorboard_log: bool = False,
    log_path: Path | None = None,
) -> BaseAlgorithm:
    """Initialize a new model or algorithm.

    :param algo: Algorithm to initialize.
    :param policy: The policy that should be used by the algorithm.
    :param envs: The environment which the algorithm operates on.
    :param algo_settings: Additional settings for the algorithm. The dictionary is not modified.
    :param seed: Random seed to be used by the algorithm (ignored with a warning if the
        algorithm does not accept a 'seed' parameter).
    :param tensorboard_log: Flag to enable logging to tensorboard.
    :param log_path: Path for the tensorboard log. Only required if tensorboard_log is True.
    :return: Initialized model.
    :raises ValueError: If tensorboard logging is enabled but no log path is given.
    """
    log.debug(f"Trying to initialize model: {algo.__name__}")
    _log_path = log_path if log_path is None or isinstance(log_path, pathlib.Path) else pathlib.Path(log_path)

    # tensorboard logging
    algo_kwargs = {}
    if tensorboard_log:
        if _log_path is None:
            raise ValueError("If tensorboard logging is enabled, a path for results must be specified as well.")
        log.info(f"Tensorboard logging is enabled. Log file: {_log_path}")
        log.info(
            f"Please run the following command in the console to start tensorboard: \n"
            f'tensorboard --logdir "{_log_path}" --port 6006'
        )
        algo_kwargs = {"tensorboard_log": str(_log_path)}

    # Work on a copy so the caller's settings dict is not mutated by the seed handling below
    # (the original setdefault/del modified algo_settings in place).
    settings = dict(algo_settings)
    settings.setdefault("seed", seed)

    # Drop 'seed' if the agent accepts neither a 'seed' parameter nor **kwargs.
    algo_params = inspect.signature(algo).parameters
    if "seed" not in algo_params and inspect.Parameter.VAR_KEYWORD not in {p.kind for p in algo_params.values()}:
        del settings["seed"]
        log.warning(
            f"'seed' is not a valid parameter for agent {algo.__name__}. This default parameter will be ignored."
        )

    # create model instance
    return algo(policy, envs, **settings, **algo_kwargs)  # type: ignore
def load_model(
    algo: type[BaseAlgorithm],
    envs: VecEnv | VecNormalize,
    algo_settings: AlgoSettings,
    path_model: Path,
    *,
    tensorboard_log: bool = False,
    log_path: Path | None = None,
) -> BaseAlgorithm:
    """Load an existing model from disk.

    :param algo: Algorithm type of the model to be loaded.
    :param envs: The environment which the algorithm operates on.
    :param algo_settings: Additional settings for the algorithm.
    :param path_model: Path to load the model from.
    :param tensorboard_log: Flag to enable logging to tensorboard.
    :param log_path: Path for the tensorboard log. Only required if tensorboard_log is True.
    :return: The loaded model.
    :raises OSError: If the model file does not exist or cannot be read.
    :raises ValueError: If tensorboard logging is enabled but no log path is given.
    """
    log.debug(f"Trying to load existing model: {path_model}")

    # Normalize both paths to pathlib.Path objects.
    _path_model = pathlib.Path(path_model)
    _log_path = None if log_path is None else pathlib.Path(log_path)

    if not _path_model.exists():
        raise OSError(f"Model couldn't be loaded. Path not found: {_path_model}")

    # Tensorboard logging setup.
    algo_kwargs: dict[str, str] = {}
    if tensorboard_log:
        if _log_path is None:
            raise ValueError("If tensorboard logging is enabled, a path for results must be specified as well.")
        log.info(f"Tensorboard logging is enabled. Log file: {_log_path}")
        log.info(
            f"Please run the following command in the console to start tensorboard: \n"
            f"tensorboard --logdir '{_log_path}' --port 6006"
        )
        algo_kwargs = {"tensorboard_log": str(_log_path)}

    # Let the algorithm deserialize the model; re-raise file problems with more context.
    try:
        model = algo.load(_path_model, envs, **algo_settings, **algo_kwargs)  # type: ignore
        log.debug("Model loaded successfully.")
    except OSError as e:
        raise OSError(f"Model couldn't be loaded: {e.strerror}. Filename: {e.filename}") from e
    return model
def log_to_file(config: ConfigOpt, config_run: ConfigOptRun) -> None:
    """Log output in terminal to the run_info file.

    :param config: Configuration to figure out the logging settings.
    :param config_run: Configuration for this optimization run.
    """
    file_path = config_run.path_log_output
    if not config.settings.log_to_file:
        return
    # Best effort: failing to attach the file handler must not abort the run.
    try:
        log_add_filehandler(filename=file_path)
    except Exception:
        log.error("Log file could not be created.")
def log_run_info(config: ConfigOpt, config_run: ConfigOptRun) -> None:
    """Save run configuration to the run_info file.

    :param config: Configuration for the framework.
    :param config_run: Configuration for this optimization run.
    """

    class _RunInfoEncoder(json.JSONEncoder):
        # Serialize paths as plain strings, drop (abstract) classes, and fall back
        # to repr() for anything else json cannot handle natively.
        def default(self, o: object) -> object:
            if isinstance(o, pathlib.Path):
                return str(o)
            if isinstance(o, abc.ABCMeta):
                return None
            return repr(o)

    with config_run.path_run_info.open("w") as f:
        try:
            json.dump({**asdict(config_run), **asdict(config)}, f, indent=4, cls=_RunInfoEncoder)
        except TypeError:
            log.warning("Log file could not be created because of non-serializable input in config.")
        else:
            log.info("Log file successfully created.")
def deserialize_net_arch(
    net_arch: Sequence[Mapping[str, Any]], in_features: int, device: th.device | str = "auto"
) -> th.nn.Sequential:
    """Deserialize_net_arch can take a list of dictionaries describing a sequential torch network and deserialize
    it by instantiating the corresponding classes.

    An example for a possible net_arch would be:

    .. code-block::

        [{"layer": "Linear", "out_features": 60},
         {"activation_func": "Tanh"},
         {"layer": "Linear", "out_features": 60},
         {"activation_func": "Tanh"}]

    One key of the dictionary should be either 'layer', 'activation_func' or 'process'. If the 'layer' key is present,
    a layer from the :py:mod:`torch.nn` module is instantiated, if the 'activation_func' key is present, the
    value will be instantiated as an activation function from :py:mod:`torch.nn`. If the key 'process' is present,
    the value will be interpreted as a data processor from :py:mod:`eta_utility.eta_x.common.processors`.
    All other keys of each dictionary will be used as keyword parameters to the instantiation of the layer,
    activation function or processor.

    Only the number of input features for the first layer must be specified (using the 'in_features') parameter.
    The function will then automatically determine the number of input features for all other layers in the
    sequential network.

    :param net_arch: List of dictionaries describing the network architecture.
    :param in_features: Number of input features for the first layer.
    :param device: Torch device to use for training the network.
    :return: Sequential torch network.
    :raises TypeError: If a layer/activation/processor cannot be instantiated with the given keyword arguments.
    :raises ValueError: If a dictionary contains none of the keys 'process', 'layer' or 'activation_func'.
    """
    network = th.nn.Sequential()
    _features = in_features  # running input-feature count for the next layer

    for net in net_arch:
        _net = dict(net)  # copy so pop() does not mutate the caller's mapping

        if "process" in net:
            process = getattr(processors, _net.pop("process"))
            params = inspect.signature(process).parameters.keys()
            # The "Split" process must be treated differently, because it needs to be deserialized recursively.
            # BUGFIX: the original wrote {"net_arch" and "sizes"}, which is the single-element set
            # {"sizes"} ('and' returns its second operand). Require BOTH parameters to be present.
            if {"net_arch", "sizes"} <= params:
                sizes = process.get_full_sizes(_features, _net["sizes"])
                _net["net_arch"] = [deserialize_net_arch(e, sizes[i], device) for i, e in enumerate(_net["net_arch"])]
            try:
                # Pass the running feature count if the processor expects it.
                if {"in_channels", "in_features"} & params:
                    network.append(process(_features, **_net))
                else:
                    network.append(process(**_net))
            except TypeError as e:
                raise TypeError(f"Could not instantiate processing module {process.__name__}: {e}") from e
        elif "layer" in net:
            layer = getattr(th.nn, _net.pop("layer"))
            try:
                # Set the number of input features if required by the layer class
                if {"in_channels", "in_features"} & inspect.signature(layer).parameters.keys():
                    network.append(layer(_features, **_net))
                else:
                    network.append(layer(**_net))
            except TypeError as e:
                raise TypeError(f"Could not instantiate layer module {layer.__name__}: {e}") from e
        elif "activation_func" in net:
            activation_func = _net.pop("activation_func")
            try:
                network.append(getattr(th.nn, activation_func)(**_net))
            except TypeError as e:
                raise TypeError(f"Could not instantiate activation function module {activation_func}: {e}") from e
        else:
            raise ValueError(f"Unknown process or layer type: {net}.")

        # Track the output feature count so the next layer's input size can be inferred.
        _features = dict_get_any(_net, "out_channels", "out_features", fail=False, default=_features)

    network.to(device)
    return network
def log_net_arch(model: BaseAlgorithm, config_run: ConfigOptRun) -> None:
    """Store network architecture or policy information in a file. This requires the model to be initialized,
    otherwise it will raise a ValueError.

    :param model: The algorithm whose network architecture is stored.
    :param config_run: Optimization run configuration (which contains info about the file to store info in).
    :raises: ValueError.
    """
    # Only write the file once; skip entirely for uninitialized models or NoPolicy.
    if not config_run.path_net_arch.exists() and model.policy is not None and model.policy.__class__ is not NoPolicy:
        with pathlib.Path(config_run.path_net_arch).open("w") as f:
            f.write(str(model.policy))
        # BUGFIX: message previously read "information store successfully" (grammar).
        log.info(f"Net arch / Policy information stored successfully in: {config_run.path_net_arch}.")
    elif config_run.path_net_arch.exists():
        log.info(f"Net arch / Policy information already exists in {config_run.path_net_arch}")
def is_vectorized_env(env: BaseEnv | VecEnv | VecNormalize | None) -> bool:
    """Check if an environment is vectorized.

    Vectorized (stable-baselines3 style) environments expose a ``num_envs`` attribute,
    which is used as the distinguishing feature here.

    :param env: The environment to check.
    :return: True if the environment is vectorized, False otherwise (including for None).
    """
    return env is not None and hasattr(env, "num_envs")
def is_env_closed(env: BaseEnv | VecEnv | VecNormalize | None) -> bool:
    """Check whether an environment has been closed.

    A missing environment (None) counts as closed. Otherwise the environment's own
    ``closed`` attribute is used if present; wrapped environments are unwrapped via
    their ``venv`` attribute and checked recursively.

    :param env: The environment to check.
    :return: True if the environment is closed (or None), False otherwise.
    """
    if env is None:
        return True

    _missing = object()  # sentinel: distinguishes "attribute absent" from falsy values
    closed = getattr(env, "closed", _missing)
    if closed is not _missing:
        return closed

    inner = getattr(env, "venv", _missing)
    if inner is not _missing:
        return is_env_closed(inner)

    return False
def episode_results_path(series_results_path: Path, run_name: str, episode: int, env_id: int = 1) -> pathlib.Path:
    """Generate a filepath which can be used for storing episode results of a specific environment as a csv file.

    Name is of the format: ThisRun_001_01.csv (run name _ episode number _ environment id .csv)

    :param series_results_path: Path for results of the series of optimization runs.
    :param run_name: Name of the optimization run.
    :param episode: Number of the episode the environment is working on.
    :param env_id: Identification of the environment.
    :return: Path to the csv results file.
    """
    base = pathlib.Path(series_results_path)
    file_name = episode_name_string(run_name, episode, env_id) + ".csv"
    return base / file_name
def episode_name_string(run_name: str, episode: int, env_id: int = 1) -> str:
    """Generate a name which can be used to pre or postfix files from a specific episode and run of an environment.

    Name is of the format: ThisRun_001_01 (run name _ episode number _ environment id)

    :param run_name: Name of the optimization run.
    :param episode: Number of the episode the environment is working on.
    :param env_id: Identification of the environment.
    :return: Formatted name string.
    """
    # Zero-pad the episode number to three characters and the environment id to two.
    episode_part = format(episode, "0>#3")
    env_part = format(env_id, "0>#2")
    return "_".join((run_name, episode_part, env_part))