"""Source code for eta_utility.timeseries.scenarios."""

from __future__ import annotations

import pathlib
from collections.abc import Sequence, Sized
from datetime import datetime, timedelta
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

from eta_utility import timeseries

if TYPE_CHECKING:
    from collections.abc import Mapping
    from typing import SupportsFloat

    from eta_utility.type_hints import Path, TimeStep


def scenario_from_csv(
    paths: Path | Sequence[Path],
    data_prefixes: Sequence[str] | None = None,
    *,
    start_time: datetime,
    end_time: datetime | None = None,
    total_time: TimeStep | None = None,
    random: np.random.Generator | bool | None = False,
    resample_time: TimeStep | None = None,
    interpolation_method: Sequence[str | None] | str | None = None,
    rename_cols: Mapping[str, str] | None = None,
    prefix_renamed: bool = True,
    infer_datetime_from: str | Sequence[Sequence[int]] | Sequence[str] = "string",
    time_conversion_str: str | Sequence[str] = "%Y-%m-%d %H:%M",
    scaling_factors: Sequence[Mapping[str, SupportsFloat] | None] | Mapping[str, SupportsFloat] | None = None,
) -> pd.DataFrame:
    """Import (possibly multiple) scenario data files from csv files and return them as a single
    pandas data frame. The import function supports column renaming and will slice and resample
    data as specified.

    :raises ValueError: If start and/or end times are outside the scope of the imported scenario
        files.

    .. note::
        The ValueError will only be raised when this is true for all files. If only one file is
        outside the range, an empty series will be returned for that file.

    :param paths: Path(s) to one or more CSV data files. The paths should be fully qualified.
    :param data_prefixes: If more than one file is imported, a list of data_prefixes must be
        supplied such that ambiguity of column names between the files can be avoided. There must
        be one prefix for every imported file, such that a distinct prefix can be prepended to all
        columns of a file.
    :param start_time: Starting time for the scenario import.
    :param end_time: Latest ending time for the scenario import (default: inferred from start_time
        and total_time).
    :param total_time: Total duration of the imported scenario. If given as int this will be
        interpreted as seconds (default: inferred from start_time and end_time).
    :param random: Set to true if a random starting point (within the interval determined by
        start_time and end_time) should be chosen. This will use the environments' random
        generator.
    :param resample_time: Resample the scenario data to the specified interval. If this is
        specified one of 'upsample_fill' or 'downsample_method' must be supplied as well to
        determine how the new data points should be determined. If given as an int, this will be
        interpreted as seconds (default: no resampling).
    :param interpolation_method: Method for interpolating missing data values. Pandas missing data
        handling methods are supported. If a list with one value per file is given, the specified
        method will be selected according to the order of paths.
    :param rename_cols: Rename columns of the imported data. Maps the columns as they appear in the
        data files to new names. Format: {old_name: new_name}.

        .. note::
            The column names are normalized to lowercase and underscores are added in place of
            spaces. Additionally, everything after the first symbol is removed. For example
            "Water Temperature #2" becomes "water_temperature". So if you want to rename the
            column, you need to specify for example: {"water_temperature": "T_W"}.

    :param prefix_renamed: Should prefixes be applied to renamed columns as well? When setting this
        to false make sure that all columns in all loaded scenario files have different names.
        Otherwise, there is a risk of overwriting data.
    :param infer_datetime_from: Specify how datetime values should be converted. 'dates' will use
        pandas to automatically determine the format. 'string' uses the conversion string specified
        in the 'time_conversion_str' parameter. If a two-tuple of the format (row, col) is given,
        data from the specified field in the data files will be used to determine the date format.
    :param time_conversion_str: Time conversion string. This must be specified if the
        infer_datetime_from parameter is set to 'string'. The string should specify the datetime
        format in the python strptime format.
    :param scaling_factors: Scaling factors for each imported column.
    :return: Imported and processed data as pandas.DataFrame.
    """
    # Wrap a single path in a list. str must be checked explicitly: str is Sized, so the
    # generic check alone would let a bare string through and the loop below would then
    # iterate over its individual characters instead of treating it as one path.
    if isinstance(paths, (str, pathlib.Path)) or not isinstance(paths, Sized):
        paths = [paths]
    _paths = [pathlib.Path(path) for path in paths]

    # Interpolation methods need to be a list with one entry per path; a single value
    # (or None) is broadcast to all paths.
    if isinstance(interpolation_method, str) or interpolation_method is None:
        interpolation_method = [interpolation_method] * len(_paths)
    elif len(interpolation_method) != len(_paths):
        raise ValueError("The number of interpolation methods does not match the number of paths.")
    _interpolation_method: list[str | None] = list(interpolation_method)

    # Scaling factors need to be a list with one entry per path. None means "no scaling"
    # and is broadcast to every path; a single mapping is only unambiguous when there is
    # exactly one path.
    if scaling_factors is None:
        scaling_factors = [None] * len(_paths)
    elif not isinstance(scaling_factors, Sequence):
        if len(_paths) > 1:
            raise ValueError("The scaling factors need to be defined for each path")
        scaling_factors = [scaling_factors]
    elif len(scaling_factors) != len(_paths):
        raise ValueError("The number of scaling factors does not match the number of paths.")
    _scaling_factors: list[Mapping[str, SupportsFloat] | None] = list(scaling_factors)

    # Time conversion strings need to be a list; a single string is broadcast to all paths.
    if isinstance(time_conversion_str, str):
        time_conversion_str = [time_conversion_str] * len(_paths)
    elif len(time_conversion_str) != len(_paths):
        raise ValueError("The number of time conversion strings does not match the number of paths.")
    _time_conversion_str: list[str] = list(time_conversion_str)

    # Columns to consider as datetime values (infer_datetime_from) need to be a list;
    # a single string is broadcast to all paths.
    if isinstance(infer_datetime_from, str):
        infer_datetime_from = [infer_datetime_from] * len(_paths)
    _infer_datetime_from: list[str | Sequence[int]] = list(infer_datetime_from)

    # Set defaults and convert values where necessary. Integer durations are interpreted
    # as seconds.
    if total_time is not None:
        total_time = total_time if isinstance(total_time, timedelta) else timedelta(seconds=total_time)
    # If resample_time is None, treat it as "no resampling" (interval of zero).
    resample_time = resample_time or 0
    _resample_time = resample_time if isinstance(resample_time, timedelta) else timedelta(seconds=resample_time)
    _random = random if random is not None else False

    slice_begin, slice_end = timeseries.find_time_slice(
        start_time,
        end_time,
        total_time=total_time,
        random=_random,
        round_to_interval=_resample_time,
    )

    scenario = pd.DataFrame()
    for i, path in enumerate(_paths):
        data = timeseries.df_from_csv(
            path,
            infer_datetime_from=_infer_datetime_from[i],
            time_conversion_str=_time_conversion_str[i],
        )
        data = timeseries.df_resample(
            data,
            _resample_time,
            missing_data=_interpolation_method[i],
        )
        data = data[slice_begin:slice_end].copy()  # type: ignore

        scaling_factor = _scaling_factors[i]
        # The prefix is per-file, not per-column — determine it once outside the loop.
        prefix = data_prefixes[i] if data_prefixes else None
        col_names = {}
        for col in data.columns:
            col_names[col] = _fix_col_name(col, prefix, prefix_renamed, rename_cols)
            if scaling_factor is not None and col in scaling_factor:
                data[col] = data[col].multiply(scaling_factor[col])

        # Rename all columns with the name mapping determined above.
        data = data.rename(columns=col_names)
        scenario = pd.concat((data, scenario), axis=1)

    # Make sure that the resulting frame corresponds to the requested time slice
    # (within one resampling interval at either end).
    if (
        len(scenario) <= 0
        or scenario.first_valid_index() > slice_begin + _resample_time
        or scenario.last_valid_index() < slice_end - _resample_time
    ):
        raise ValueError(
            "The loaded scenario file does not contain enough data for the entire selected time slice. Or the set "
            "scenario times do not correspond to the provided data."
        )

    return scenario
def _fix_col_name( name: str, prefix: str | None = None, prefix_renamed: bool = False, rename_cols: Mapping[str, str] | None = None, ) -> str: """Figure out correct name for the column. :param name: Name to rename. :param prefix: Prefix to prepend to the name. :param prefix_renamed: Prepend prefix if name is renamed? :param rename_cols: Mapping of old names to new names. """ if not prefix_renamed and rename_cols is not None and name in rename_cols: pre = "" name = str(rename_cols[name]) elif prefix_renamed and rename_cols is not None and name in rename_cols: pre = f"{prefix}_" if prefix else "" name = str(rename_cols[name]) else: pre = f"{prefix}_" if prefix is not None else "" name = str(name) return f"{pre}{name}"