Source code for eta_utility.connectors.entso_e

"""Utility functions for connecting to the ENTSO-E Transparency database and for reading data. This connector
does not have the ability to write data.
"""

from __future__ import annotations

import concurrent.futures
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
import requests
from lxml import etree
from lxml.builder import E
from requests_cache import DO_NOT_CACHE, CachedSession

from eta_utility.connectors.node import NodeEntsoE
from eta_utility.timeseries import df_resample, df_time_slice
from eta_utility.util import dict_search, round_timestamp

if TYPE_CHECKING:
    from collections.abc import Mapping
    from typing import Any, Final

    from eta_utility.type_hints import Nodes, TimeStep

from .base_classes import SeriesConnection, SubscriptionHandler

log = getLogger(__name__)


class ENTSOEConnection(SeriesConnection[NodeEntsoE], protocol="entsoe"):
    """ENTSOEConnection is a class to download multiple features from the ENTSO-E transparency platform
    database as timeseries. The platform contains data about the European electricity markets.

    :param url: URL of the server with scheme (https://web-api.tp.entsoe.eu/)
    :param usr: Username for login to the platform (usually not required - default: None)
    :param pwd: Password for login to the platform (usually not required - default: None)
    :param api_token: Token for API authentication
    :param nodes: Nodes to select in connection
    """

    API_PATH: str = "/api"

    def __init__(
        self,
        url: str = "https://web-api.tp.entsoe.eu/",
        *,
        api_token: str,
        nodes: Nodes[NodeEntsoE] | None = None,
    ) -> None:
        url = url + self.API_PATH
        self._api_token: str = api_token

        super().__init__(url, None, None, nodes=nodes)

        self._node_ids: str | None = None
        self.config = _ConnectionConfiguration()

        self._session: CachedSession = CachedSession(
            cache_name="eta_utility/connectors/requests_cache/entso_e_cache",
            urls_expire_after={
                "https://web-api.tp.entsoe.eu/*": timedelta(minutes=15),
                "*": DO_NOT_CACHE,  # Don't cache other URLs
            },
            allowable_codes=(200, 400, 401),
            use_cache_dir=True,
        )

    @classmethod
    def _from_node(cls, node: NodeEntsoE, **kwargs: Any) -> ENTSOEConnection:
        """Initialize the connection object from an ENTSO-E protocol node object.

        :param node: Node to initialize from
        :param kwargs: Keyword arguments for API authentication, where "api_token" is required
        :return: ENTSOEConnection object
        """
        if "api_token" not in kwargs:
            raise AttributeError("Missing required function parameter api_token.")
        api_token = kwargs["api_token"]

        return super()._from_node(node, api_token=api_token)
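    # Illustrative usage sketch (comment only, not part of the module). The token value is a
    # placeholder; the nodes argument is optional and nodes can also be passed to read_series later.
    #
    #   connection = ENTSOEConnection(api_token="<your-entsoe-api-token>")
    #
    # Note that responses are cached locally for 15 minutes (see the CachedSession configuration
    # above), so identical requests repeated within that window do not hit the API again.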
    def read(self, nodes: Nodes[NodeEntsoE] | None = None) -> pd.DataFrame:
        """
        .. warning::
            Cannot read single values from ENTSO-E transparency platform. Use read_series instead.

        :param nodes: List of nodes to read values from
        :return: Pandas DataFrame containing the data read from the connection
        """
        raise NotImplementedError(
            "Cannot read single values from ENTSO-E transparency platform. Use read_series instead"
        )
    def write(
        self, values: Mapping[NodeEntsoE, Mapping[datetime, Any]], time_interval: timedelta | None = None
    ) -> None:
        """
        .. warning::
            Cannot write to ENTSO-E transparency platform.

        :param values: Dictionary of nodes and data to write. {node: value}
        :param time_interval: Interval between datapoints (i.e. between "From" and "To" in EnEffCo Upload),
            default 1s
        """
        raise NotImplementedError("Cannot write to ENTSO-E transparency platform.")
    def subscribe(
        self, handler: SubscriptionHandler, nodes: Nodes[NodeEntsoE] | None = None, interval: TimeStep = 1
    ) -> None:
        """Subscribe to nodes and call handler when new data is available. This will return only the last
        available values.

        :param handler: SubscriptionHandler object with a push method that accepts node, value pairs
        :param interval: Interval for receiving new data. It is interpreted as seconds when given as an integer.
        :param nodes: Identifiers for the nodes to subscribe to
        """
        self.subscribe_series(handler=handler, req_interval=1, nodes=nodes, interval=interval, data_interval=interval)
    def _handle_xml(self, xml_content: bytes) -> dict[str, dict[str, list[pd.Series]]]:
        """Transform XML data from the request response into a dictionary containing resolutions and
        time series for the node.

        :param xml_content: XML data
        :return: Dictionary with resolutions and time series data
        """
        parser = etree.XMLParser(load_dtd=False, ns_clean=True, remove_pis=True)
        xml_data = etree.XML(xml_content, parser)
        ns = xml_data.nsmap

        data: dict[str, dict[str, list[pd.Series]]] = {}
        request_type = xml_data.find(".//type", namespaces=ns).text
        timeseries = xml_data.findall(".//TimeSeries", namespaces=ns)

        for ts in timeseries:
            # Day-Ahead Price
            if request_type == "A44":
                col_name = "Price"
            # Actual Generation per Type
            if request_type == "A75":
                psr_type = ts.find(".//MktPSRType", namespaces=ns).find("psrType", namespaces=ns).text
                col_name = dict_search(self.config.psr_types, psr_type)
                if ts.find(".//inBiddingZone_Domain.mRID", namespaces=ns) is not None:
                    col_name = col_name + "_Generation"
                elif ts.find(".//outBiddingZone_Domain.mRID", namespaces=ns) is not None:
                    col_name = col_name + "_Consumption"

            # contains the data points
            period = ts.find(".//Period", namespaces=ns)
            # datetime range of the data points
            time_interval = period.find(".//timeInterval", namespaces=ns).getchildren()
            resolution = period.find(".//resolution", namespaces=ns).text[2:4]  # truncate strings such as "PT60M"
            datetime_range = pd.date_range(
                datetime.strptime(time_interval[0].text, "%Y-%m-%dT%H:%MZ"),
                datetime.strptime(time_interval[1].text, "%Y-%m-%dT%H:%MZ"),
                freq=resolution + "min",
                inclusive="left",
            )

            points = period.findall(".//Point", namespaces=ns)
            ts_data = [point.getchildren()[-1].text for point in points]

            # Handle missing data points
            if len(ts_data) < len(datetime_range):
                indices = set(range(len(datetime_range)))
                for point in points:
                    indices.remove(int(point.getchildren()[0].text) - 1)
                for miss in indices:
                    ts_data.insert(miss, np.nan)

            s = pd.Series(data=ts_data, index=datetime_range, name=col_name)
            s.index = s.index.tz_localize(tz="UTC")  # ENTSO-E always returns UTC

            if resolution not in data:
                data[resolution] = {}
            if col_name not in data[resolution]:
                data[resolution][col_name] = []
            data[resolution][col_name].append(s.astype(float))

        return data
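    # Sketch of the structure returned by _handle_xml (illustrative values only). The outer key is the
    # minute string extracted from the ISO 8601 resolution, e.g. "60" for "PT60M"; the inner key is the
    # column name derived from the request type, e.g. "Price" for day-ahead prices (A44):
    #
    #   {"60": {"Price": [<pd.Series of floats indexed by UTC timestamps>]}}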
    def read_series(
        self,
        from_time: datetime,
        to_time: datetime,
        nodes: Nodes[NodeEntsoE] | None = None,
        interval: TimeStep = 1,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Download timeseries data from the ENTSO-E database.

        :param nodes: List of nodes to read values from
        :param from_time: Starting time to begin reading (included in output)
        :param to_time: Time to stop reading at (not included in output)
        :param interval: Interval between time steps. It is interpreted as seconds if given as an integer.
        :return: Pandas DataFrame containing the data read from the connection
        """
        nodes = self._validate_nodes(nodes)
        interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval)
        from_time = round_timestamp(from_time, interval.total_seconds())
        to_time = round_timestamp(to_time, interval.total_seconds())

        if from_time.tzinfo != to_time.tzinfo:
            log.warning(
                f"Timezones of from_time and to_time are different. Using from_time timezone: {from_time.tzinfo}"
            )

        def read_node(node: NodeEntsoE) -> pd.DataFrame:
            params = self.config.create_params(node, from_time, to_time)

            result = self._raw_request(params)
            data = self._handle_xml(result.content)

            df_dict = {}
            # All resolutions are resampled separately and concatenated into one dataframe at the end
            for resolution in data:
                data_resolution = {
                    f"{node.name}_{column}": pd.concat(series) for column, series in data[resolution].items()
                }
                df_resolution = pd.DataFrame.from_dict(data_resolution, orient="columns")
                # ENTSO-E always returns data in UTC time; convert to the same time zone as from_time
                df_resolution.index = df_resolution.index.tz_convert(tz=from_time.tzinfo)
                df_resolution = df_resample(df_resolution, interval, missing_data="fillna")
                df_resolution = df_time_slice(df_resolution, from_time, to_time)
                df_dict[resolution] = df_resolution

            value_df = pd.concat(df_dict.values(), axis=1, keys=df_dict.keys())
            return value_df.swaplevel(axis=1)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = executor.map(read_node, nodes)

        return pd.concat(results, axis=1, sort=False)
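    # Illustrative read_series sketch (comment only). The exact NodeEntsoE constructor arguments shown
    # here are an assumption for demonstration; "Price" must be a key of _DOC_TYPES and "DEU-LUX" a key
    # of _BIDDING_ZONES (see _ConnectionConfiguration below). The API token is a placeholder.
    #
    #   node = NodeEntsoE(
    #       "day_ahead", "https://web-api.tp.entsoe.eu/", "entsoe", endpoint="Price", bidding_zone="DEU-LUX"
    #   )
    #   connection = ENTSOEConnection(api_token="<your-entsoe-api-token>", nodes=[node])
    #   df = connection.read_series(
    #       from_time=datetime(2022, 2, 15, tzinfo=timezone.utc),
    #       to_time=datetime(2022, 2, 16, tzinfo=timezone.utc),
    #       interval=timedelta(hours=1),
    #   )
    #
    # The returned DataFrame has MultiIndex columns of the form (f"{node.name}_{column}", resolution),
    # e.g. ("day_ahead_Price", "60"), as built by read_node above.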
    def subscribe_series(
        self,
        handler: SubscriptionHandler,
        req_interval: TimeStep,
        offset: TimeStep | None = None,
        nodes: Nodes[NodeEntsoE] | None = None,
        interval: TimeStep = 1,
        data_interval: TimeStep = 1,
        **kwargs: Any,
    ) -> None:
        """
        .. warning::
            Not implemented: Cannot subscribe to data from the ENTSO-E transparency platform.

        :param handler: SubscriptionHandler object with a push method that accepts node, value pairs
        :param req_interval: Duration covered by requested data (time interval). Interpreted as seconds
            if given as int.
        :param offset: Offset from datetime.now from which to start requesting data (time interval).
            Interpreted as seconds if given as int. Use negative values to go to past timestamps.
        :param data_interval: Time interval between values in returned data. Interpreted as seconds
            if given as int.
        :param interval: Interval (between requests) for receiving new data. It is interpreted as seconds
            when given as an integer.
        :param nodes: Identifiers for the nodes to subscribe to
        """
        raise NotImplementedError("Cannot subscribe to data from the ENTSO-E transparency platform.")
    def close_sub(self) -> None:
        """
        .. warning::
            Not implemented: Cannot subscribe to data from the ENTSO-E transparency platform.
        """
        raise NotImplementedError("Cannot subscribe to data from the ENTSO-E transparency platform.")
    def _raw_request(self, params: Mapping[str, str], **kwargs: Mapping[str, Any]) -> requests.Response:
        """Perform an ENTSO-E request and handle possibly resulting errors.

        :param params: Parameters to identify the endpoint
        :param kwargs: Additional arguments for the request
        :return: Request response
        """
        # Prepare the headers and the XML request body.
        headers = {"Content-Type": "application/xml", "SECURITY_TOKEN": self._api_token}

        xml = self.config.xml_head()
        for param, val in params.items():
            xml.append(self.config.xml_param(param, val))

        response = self._session.post(self.url, data=etree.tostring(xml), headers=headers, **kwargs)

        if response.status_code == 400:
            try:
                parser = etree.XMLParser(load_dtd=False, ns_clean=True, remove_pis=True)
                e_msg = etree.XML(response.content, parser)
                ns = e_msg.nsmap
                e_code = e_msg.find(".//Reason", namespaces=ns).find("code", namespaces=ns).text
                e_text = e_msg.find(".//Reason", namespaces=ns).find("text", namespaces=ns).text
                response.reason = f"ENTSO-E Error {response.status_code} ({e_code}: {e_text})"
            except Exception:
                pass

        response.raise_for_status()
        return response
class _ConnectionConfiguration:
    """Auxiliary class to configure the parameters for establishing a connection to the ENTSO-E API.

    Currently, the connection class only supports two types of data requests through the method read_series:
    **Energy price day ahead** and **Actual energy generation per type**. All available data requests are
    listed in the _DOC_TYPES class attribute, but each of them requires a mandatory set of parameters to
    establish the connection, which can be found in the ENTSO-E documentation_.

    .. _documentation:
        https://transparency.entsoe.eu/content/static_content/Static%20content/web%20api/Guide.html
    """

    #: XML Namespace for the API
    _XMLNS: Final[str] = "urn:iec62325.351:tc57wg16:451-5:statusrequestdocument:4:0"

    #: Bidding zones is a mapping of three letter ISO country codes to bidding zones.
    _BIDDING_ZONES: Final[dict[str, str]] = {
        "DEU": "10Y1001A1001A83F",
        "DEU-AUT-LUX": "10Y1001A1001A63L",
        "ALB": "10YAL-KESH-----5",
        "AUT": "10YAT-APG------L",
        "BLR": "10Y1001A1001A51S",
        "BEL": "10YBE----------2",
        "BIH": "10YBA-JPCC-----D",
        "BGR": "10YCA-BULGARIA-R",
        "CZE-DEU-SVK": "10YDOM-CZ-DE-SKK",
        "HRV": "10YHR-HEP------M",
        "CYP": "10YCY-1001A0003J",
        "CZE": "10YCZ-CEPS-----N",
        "DEU-LUX": "10Y1001A1001A82H",
        "DNK_west": "10YDK-1--------W",
        "DNK_central": "10YDK-2--------M",
        "EST": "10Y1001A1001A39I",
        "FIN": "10YFI-1--------U",
        "MKD": "10YMK-MEPSO----8",
        "FRA": "10YFR-RTE------C",
        "GB": "17Y0000009369493",
        "GRC": "10YGR-HTSO-----Y",
        "HUN": "10YHU-MAVIR----U",
        "IRL": "10Y1001A1001A59C",
        "ITA_brindisi": "10Y1001A1001A699",
        "ITA_calabria": "10Y1001C--00096J",
        "ITA_central_north": "10Y1001A1001A70O",
        "ITA_central_south": "10Y1001A1001A71M",
        "ITA_foggia": "10Y1001A1001A72K",
        "ITA-GRC": "10Y1001A1001A66F",
        "ITA_malta": "10Y1001A1001A877",
        "ITA_north": "10Y1001A1001A73I",
        "ITA-AUT": "10Y1001A1001A80L",
        "ITA-CHE": "10Y1001A1001A68B",
        "ITA-FRA": "10Y1001A1001A81J",
        "ITA-SVN": "10Y1001A1001A67D",
        "ITA_priolo": "10Y1001A1001A76C",
        "ITA_rossano": "10Y1001A1001A77A",
        "ITA_sardinia": "10Y1001A1001A74G",
        "ITA_sicily": "10Y1001A1001A75E",
        "ITA_south": "10Y1001A1001A788",
        "RUS_kaliningrad": "10Y1001A1001A50U",
        "LVA": "10YLV-1001A00074",
        "LTU": "10YLT-1001A0008Q",
        "LUX": "10YLU-CEGEDEL-NQ",
        "MLT": "10Y1001A1001A93C",
        "MNE": "10YCS-CG-TSO---S",
        "GBR": "10YGB----------A",
        "NLD": "10YNL----------L",
        "NOR_1": "10YNO-1--------2",
        "NOR_2": "10YNO-2--------T",
        "NOR_3": "10YNO-3--------J",
        "NOR_4": "10YNO-4--------9",
        "NOR_5": "10Y1001A1001A48H",
        "POL": "10YPL-AREA-----S",
        "PRT": "10YPT-REN------W",
        "MDA": "10Y1001A1001A990",
        "ROU": "10YRO-TEL------P",
        "RUS": "10Y1001A1001A49F",
        "SWE_1": "10Y1001A1001A44P",
        "SWE_2": "10Y1001A1001A45N",
        "SWE_3": "10Y1001A1001A46L",
        "SWE_4": "10Y1001A1001A47J",
        "SRB": "10YCS-SERBIATSOV",
        "SVK": "10YSK-SEPS-----K",
        "SVN": "10YSI-ELES-----O",
        "ESP": "10YES-REE------0",
        "SWE": "10YSE-1--------K",
        "CHE": "10YCH-SWISSGRIDZ",
        "TUR": "10YTR-TEIAS----W",
        "UKR": "10Y1001C--00003F",
    }

    _MARKET_AGREEMENTS: Final[dict[str, str]] = {
        "Daily": "A01",
        "Weekly": "A02",
        "Monthly": "A03",
        "Yearly": "A04",
        "Total": "A05",
        "Long term": "A06",
        "Intraday": "A07",
        "Hourly": "A13",
    }

    _AUCTION_TYPES: Final[dict[str, str]] = {
        "Implicit": "A01",
        "Explicit": "A02",
    }

    _AUCTION_CATEGORIES: Final[dict[str, str]] = {
        "Base": "A01",
        "Peak": "A02",
        "Off Peak": "A03",
        "Hourly": "A04",
    }

    _PSR_TYPES: Final[dict[str, str]] = {
        "Mixed": "A03",
        "Generation": "A04",
        "Load": "A05",
        "Biomass": "B01",
        "Fossil Brown coal/Lignite": "B02",
        "Fossil Coal-derived gas": "B03",
        "Fossil Gas": "B04",
        "Fossil Hard coal": "B05",
        "Fossil Oil": "B06",
        "Fossil Oil shale": "B07",
        "Fossil Peat": "B08",
        "Geothermal": "B09",
        "Hydro Pumped Storage": "B10",
        "Hydro Run-of-river and poundage": "B11",
        "Hydro Water Reservoir": "B12",
        "Marine": "B13",
        "Nuclear": "B14",
        "Other renewable": "B15",
        "Solar": "B16",
        "Waste": "B17",
        "Wind Offshore": "B18",
        "Wind Onshore": "B19",
        "Other": "B20",
        "AC Link": "B21",
        "DC Link": "B22",
        "Substation": "B23",
        "Transformer": "B24",
    }

    _BUSINESS_TYPES: Final[dict[str, str]] = {
        "General Capacity Information": "A25",
        "Already allocated capacity (AAC)": "A29",
        "Requested capacity (without price)": "A43",
        "System Operator redispatching": "A46",
        "Planned maintenance": "A53",
        "Unplanned outage": "A54",
        "Internal redispatch": "A85",
        "Frequency containment reserve": "A95",
        "Automatic frequency restoration reserve": "A96",
        "Manual frequency restoration reserve": "A97",
        "Replacement reserve": "A98",
        "Interconnector network evolution": "B01",
        "Interconnector network dismantling": "B02",
        "Counter trade": "B03",
        "Congestion costs": "B04",
        "Capacity allocated (including price)": "B05",
        "Auction revenue": "B07",
        "Total nominated capacity": "B08",
        "Net position": "B09",
        "Congestion income": "B10",
        "Production unit": "B11",
        "Area Control Error": "B33",
        "Procured capacity": "B95",
        "Shared Balancing Reserve Capacity": "C22",
        "Share of reserve capacity": "C23",
        "Actual reserve capacity": "C24",
    }

    _PROCESS_TYPES: Final[dict[str, str]] = {
        "Day ahead": "A01",
        "Intra day incremental": "A02",
        "Realised": "A16",
        "Intraday total": "A18",
        "Week ahead": "A31",
        "Month ahead": "A32",
        "Year ahead": "A33",
        "Synchronisation process": "A39",
        "Intraday process": "A40",
        "Replacement reserve": "A46",
        "Manual frequency restoration reserve": "A47",
        "Automatic frequency restoration reserve": "A51",
        "Frequency containment reserve": "A52",
        "Frequency restoration reserve": "A56",
    }

    _DOC_STATES: Final[dict[str, str]] = {
        "Intermediate": "A01",
        "Final": "A02",
        "Active": "A05",
        "Cancelled": "A09",
        "Withdrawn": "A13",
        "Estimated": "X01",
    }

    _DOC_TYPES: Final[dict[str, str]] = {
        "FinalisedSchedule": "A09",
        "AggregatedEnergyDataReport": "A11",
        "AcquiringSystemOperatorReserveSchedule": "A15",
        "Bid": "A24",
        "AllocationResult": "A25",
        "Capacity": "A26",
        "AgreedCapacity": "A31",
        "ReserveAllocationResult": "A38",
        "Price": "A44",
        "EstimatedNetTransferCapacity": "A61",
        "RedispatchNotice": "A63",
        "SystemTotalLoad": "A65",
        "InstalledGenerationPerType": "A68",
        "WindAndSolarForecast": "A69",
        "LoadForecastMargin": "A70",
        "GenerationForecast": "A71",
        "ReservoirFillingInformation": "A72",
        "ActualGeneration": "A73",
        "WindAndSolarGeneration": "A74",
        "ActualGenerationPerType": "A75",
        "LoadUnavailability": "A76",
        "ProductionUnavailability": "A77",
        "TransmissionUnavailability": "A78",
        "OffshoreGridInfrastructureUnavailability": "A79",
        "GenerationUnavailability": "A80",
        "ContractedReserves": "A81",
        "AcceptedOffers": "A82",
        "ActivatedBalancingQuantities": "A83",
        "ActivatedBalancingPrices": "A84",
        "ImbalancePrices": "A85",
        "ImbalanceVolume": "A86",
        "FinancialSituation": "A87",
        "CrossBorderBalancing": "A88",
        "ContractedReservePrices": "A89",
        "InterconnectionNetworkExpansion": "A90",
        "CounterTradeNotice": "A91",
        "CongestionCosts": "A92",
        "DCLinkCapacity": "A93",
        "NonEUAllocations": "A94",
        "Configuration": "A95",
        "FlowBasedAllocations": "B11",
    }

    def create_params(self, node: NodeEntsoE, from_time: datetime, to_time: datetime) -> dict[str, str]:
        """Create the request parameters object according to the API specifications and handle the
        configuration parameters for each type of connection.

        :param node: ENTSO-E node
        :param from_time: Starting time
        :param to_time: End time
        :return: Dictionary with parameters
        """
        if node.endpoint not in self._DOC_TYPES:
            raise ValueError(f"Unsupported endpoint for ENTSO-E connection: {node.endpoint}.")

        params = {"DocumentType": node.endpoint}
        if node.endpoint == "ActualGenerationPerType":
            params["ProcessType"] = "Realised"
            params["In_Domain"] = node.bidding_zone
        elif node.endpoint == "Price":
            params["ProcessType"] = "Day ahead"
            params["In_Domain"] = node.bidding_zone
            params["Out_Domain"] = node.bidding_zone
        else:
            raise NotImplementedError(f"Endpoint not available: {node.endpoint}")

        # Round down at from_time and up at to_time to receive all necessary values from ENTSO-E,
        # which always uses full hours.
        rounded_from_time_utc = round_timestamp(from_time.astimezone(timezone.utc), 3600) - timedelta(hours=1)
        rounded_to_time_utc = round_timestamp(to_time.astimezone(timezone.utc), 3600)
        params["TimeInterval"] = (
            f"{rounded_from_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')}/"
            f"{rounded_to_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')}"
        )

        return params

    def xml_head(self) -> etree.ElementTree:
        """Create the header of the XML data for the POST method.

        :return: Tree of elements with the pre-defined values for the request
        """
        now = datetime.utcnow()

        # Prepare XML header data
        data = E("StatusRequest_MarketDocument", xmlns=self._XMLNS)
        data.append(E("mRID", f"Request_{now.isoformat(sep='T', timespec='seconds')}"))
        data.append(E("type", "A59"))
        data.append(E("sender_MarketParticipant.mRID", "10X1001A1001A450", codingScheme="A01"))
        data.append(E("sender_MarketParticipant.marketRole.type", "A07"))
        data.append(E("receiver_MarketParticipant.mRID", "10X1001A1001A450", codingScheme="A01"))
        data.append(E("receiver_MarketParticipant.marketRole.type", "A32"))
        data.append(E("createdDateTime", f"{now.isoformat(sep='T', timespec='seconds')}Z"))

        return data

    def xml_param(self, parameter: str, value: str) -> etree.Element:
        """Map parameters to request values for the XML document.

        :return: Tree with parameters
        """
        if parameter in {"Contract_MarketAgreement.Type", "Type_MarketAgreement.Type"}:
            value = self._MARKET_AGREEMENTS[value]
        elif parameter == "Auction.Type":
            value = self._AUCTION_TYPES[value]
        elif parameter == "Auction.Category":
            value = self._AUCTION_CATEGORIES[value]
        elif parameter == "PsrType":
            value = self._PSR_TYPES[value]
        elif parameter == "BusinessType":
            value = self._BUSINESS_TYPES[value]
        elif parameter == "ProcessType":
            value = self._PROCESS_TYPES[value]
        elif parameter == "DocStatus":
            value = self._DOC_STATES[value]
        elif parameter == "DocumentType":
            value = self._DOC_TYPES[value]
        elif parameter in {"In_Domain", "Out_Domain"}:
            value = self._BIDDING_ZONES[value]

        return E("AttributeInstanceComponent", E("attribute", parameter), E("attributeValue", value))

    @property
    def psr_types(self) -> dict[str, str]:
        return self._PSR_TYPES

    @property
    def doc_types(self) -> dict[str, str]:
        return self._DOC_TYPES
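# Rough sketch of the request document assembled by _raw_request from xml_head() and xml_param()
# (abridged; the mRID, timestamps and parameter values are illustrative examples for a "Price" request):
#
#   <StatusRequest_MarketDocument xmlns="urn:iec62325.351:tc57wg16:451-5:statusrequestdocument:4:0">
#     <mRID>Request_2022-02-15T00:00:00</mRID>
#     <type>A59</type>
#     ...
#     <AttributeInstanceComponent>
#       <attribute>DocumentType</attribute>
#       <attributeValue>A44</attributeValue>
#     </AttributeInstanceComponent>
#     <AttributeInstanceComponent>
#       <attribute>TimeInterval</attribute>
#       <attributeValue>2022-02-14T23:00:00Z/2022-02-16T00:00:00Z</attributeValue>
#     </AttributeInstanceComponent>
#   </StatusRequest_MarketDocument>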