Source code for eta_utility.connectors.wetterdienst_connection

from __future__ import annotations

from abc import ABC, abstractmethod
from logging import getLogger
from typing import TYPE_CHECKING, Generic, TypeVar

import pandas as pd
from wetterdienst import Settings
from wetterdienst.provider.dwd.mosmix.api import DwdMosmixRequest
from wetterdienst.provider.dwd.observation.api import DwdObservationRequest

if TYPE_CHECKING:
    from collections.abc import Mapping
    from typing import Any

    from wetterdienst.core.timeseries.result import StationsResult

    from eta_utility.type_hints import Nodes, TimeStep

from datetime import datetime, timedelta

from eta_utility.connectors.node import (
    NodeWetterdienst,
    NodeWetterdienstObservation,
    NodeWetterdienstPrediction,
)

from .base_classes import SeriesConnection, SubscriptionHandler

# Logger shared by all connection classes in this module.
log = getLogger(name=__name__)

# Type variable for the node type a concrete Wetterdienst connection works with.
NW = TypeVar("NW", bound=NodeWetterdienst)


class WetterdienstConnection(Generic[NW], SeriesConnection[NW], ABC):
    """
    The WetterdienstConnection class is a connector to the Wetterdienst API for retrieving weather data.
    This class is an abstract base class and should not be used directly. Instead, use the subclasses
    :class:`WetterdienstObservationConnection` and :class:`WetterdienstPredictionConnection`.

    No URL has to be supplied: a fixed placeholder URL is passed to the base class.

    :param nodes: Nodes to select in connection
    :param settings: Wetterdienst settings object
    :param kwargs: Additional keyword arguments; accepted for interface compatibility and ignored.
    """

    def __init__(
        self,
        *,
        nodes: Nodes[NW] | None = None,
        settings: Settings | None = None,
        **kwargs: Any,
    ) -> None:
        self.settings = Settings(settings=settings)
        # Settings applied to every request made through this connection.
        self.settings.ts_skip_empty = True
        self.settings.ts_si_units = False
        self.settings.ts_humanize = True
        # The URL only satisfies the base class; requests are issued through the wetterdienst library.
        super().__init__("https://opendata.dwd.de/", nodes=nodes)  # dummy url

    @classmethod
    def _from_node(cls, node: NW, **kwargs: Any) -> WetterdienstConnection:
        """Initialize the connection object from a wetterdienst protocol node object.

        :param node: Node to initialize from
        :param kwargs: Extra keyword arguments; only ``settings`` is forwarded to the constructor.
        :return: Connection object created by the base class.
        """
        settings = kwargs.get("settings")
        return super()._from_node(node, settings=settings)

    @abstractmethod
    def read_series(
        self,
        from_time: datetime,
        to_time: datetime,
        nodes: Nodes[NW] | None = None,
        interval: TimeStep = 60,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Abstract base method for read_series(). Is fully implemented in
        :meth:`~eta_utility.connectors.wetterdienst_connection.WetterdienstObservationConnection.read_series` and
        :meth:`~eta_utility.connectors.wetterdienst_connection.WetterdienstPredictionConnection.read_series`.

        The base implementation only logs a warning when ``from_time`` and ``to_time`` carry different
        time zones; subclasses call it before reading any data.

        :param from_time: Starting time to begin reading (included in output).
        :param to_time: Time to stop reading at (not included in output).
        :param nodes: List of nodes to read values from.
        :param interval: interval between time steps. It is interpreted as seconds if given as integer.
        :param kwargs: additional argument list, to be defined by subclasses.
        :return: pandas.DataFrame containing the data read from the connection.
        """
        if from_time.tzinfo != to_time.tzinfo:
            # NOTE(review): the message says from_time's timezone is used, but no conversion happens here;
            # confirm whether the subclasses or wetterdienst are expected to handle the mismatch.
            log.warning(
                "Timezone of from_time and to_time are different. Using from_time timezone: %s", from_time.tzinfo
            )

    def read(self, nodes: Nodes[NW] | None = None) -> pd.DataFrame:
        """
        .. warning::
            Cannot read single values from the Wetterdienst API. Use read_series instead

        :param nodes: List of nodes to read values from
        :raises NotImplementedError: Always.
        :return: Pandas DataFrame containing the data read from the connection
        """
        raise NotImplementedError("Cannot read single values from the Wetterdienst API. Use read_series instead")

    def write(self, values: Mapping[NW, Any], time_interval: timedelta | None = None) -> None:
        """
        .. warning::
            Cannot write to the Wetterdienst API.

        :param values: Dictionary of nodes and data to write. {node: value}
        :param time_interval: Interval between datapoints, default 1s
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("Cannot write to the Wetterdienst API.")

    def subscribe(self, handler: SubscriptionHandler, nodes: Nodes[NW] | None = None, interval: TimeStep = 1) -> None:
        """
        .. warning::
            Not implemented: Cannot subscribe to data from the Wetterdienst API.

        :param handler: SubscriptionHandler object with a push method that accepts node, value pairs
        :param nodes: identifiers for the nodes to subscribe to
        :param interval: interval for receiving new data. It is interpreted as seconds when given as an integer.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("Cannot subscribe to data from the Wetterdienst API.")

    def subscribe_series(
        self,
        handler: SubscriptionHandler,
        req_interval: TimeStep,
        offset: TimeStep | None = None,
        nodes: Nodes[NW] | None = None,
        interval: TimeStep = 1,
        data_interval: TimeStep = 1,
        **kwargs: Any,
    ) -> None:
        """
        .. warning::
            Not implemented: Cannot subscribe to data from the Wetterdienst API.

        :param handler: SubscriptionHandler object with a push method that accepts node, value pairs
        :param req_interval: Duration covered by requested data (time interval). Interpreted as seconds if given as int
        :param offset: Offset from datetime.now from which to start requesting data (time interval).
                       Interpreted as seconds if given as int. Use negative values to go to past timestamps.
        :param nodes: identifiers for the nodes to subscribe to
        :param interval: interval (between requests) for receiving new data.
                         It is interpreted as seconds when given as an integer.
        :param data_interval: Time interval between values in returned data. Interpreted as seconds if given as int.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("Cannot subscribe to data from the Wetterdienst API.")

    def close_sub(self) -> None:
        """
        .. warning::
            Not implemented: Cannot subscribe to data from the Wetterdienst API.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("Cannot subscribe to data from the Wetterdienst API.")

    def retrieve_stations(
        self, node: NodeWetterdienst, request: DwdObservationRequest | DwdMosmixRequest
    ) -> pd.DataFrame:
        """
        Retrieve stations from the Wetterdienst API and return their values as a pandas DataFrame.

        Stations are selected by the node's ``station_id`` if it is set. Otherwise the
        ``number_of_stations`` stations nearest to the node's ``latlon`` are used.

        :param node: Node to retrieve stations for
        :param request: Wetterdienst request object (observation or MOSMIX), containing the station data
        :return: DataFrame indexed by ``date`` with a two-level column index (``Name``, ``station_id``).
        """
        # Retrieve stations. If station_id is provided, use it, otherwise use latlon to get nearest stations
        stations: StationsResult
        if node.station_id is not None:
            stations = request.filter_by_station_id(node.station_id)
        else:
            stations = request.filter_by_rank(node.latlon, rank=node.number_of_stations)

        # Convert to pandas and pivot values so date is the index and
        # node names combined with the station_id are the columns
        result_df: pd.DataFrame = stations.values.all().df.to_pandas()  # noqa: PD011 (stations is not a dataframe)
        result_df = result_df.pivot_table(values="value", columns=("parameter", "station_id"), index="date")

        # Rename the columns to the node names
        result_df = result_df.rename({node.parameter.lower(): node.name}, axis="columns")
        return result_df.rename_axis(("Name", "station_id"), axis="columns")
class WetterdienstObservationConnection(
    WetterdienstConnection[NodeWetterdienstObservation], protocol="wetterdienst_observation"
):
    """
    The WetterdienstObservationConnection class is a connector to the Wetterdienst API for retrieving
    weather observation data. Data can only be read with
    :meth:`~eta_utility.connectors.wetterdienst_connection.WetterdienstObservationConnection.read_series`.
    """

    def read_series(
        self,
        from_time: datetime,
        to_time: datetime,
        nodes: Nodes[NodeWetterdienstObservation] | None = None,
        interval: TimeStep = 60,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Read weather observation data from the Wetterdienst API for the given nodes and time interval.

        :param from_time: Start time for the data retrieval
        :param to_time: End time for the data retrieval
        :param nodes: Nodes to read data from
        :param interval: Time interval between data points in seconds (or a timedelta)
        :return: Pandas DataFrame containing the data read from the connection, resampled to ``interval``
        """
        super().read_series(from_time, to_time, nodes, interval)
        nodes = self._validate_nodes(nodes)
        interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval)

        def _read_node(node: NodeWetterdienstObservation) -> pd.DataFrame:
            # The request resolution comes from the node's own interval, not from the requested ``interval``.
            resolution = NodeWetterdienstObservation.convert_interval_to_resolution(node.interval)
            request: DwdObservationRequest = DwdObservationRequest(
                parameter=node.parameter,
                resolution=resolution,
                start_date=from_time,
                end_date=to_time,
                settings=self.settings,
            )
            return self.retrieve_stations(node, request)

        # The Wetterdienst library uses asyncio, so a ThreadPoolExecutor cannot be used here;
        # the nodes are read one after another instead.
        results = [_read_node(node) for node in nodes]
        result = pd.concat(results, axis=1, sort=False)

        # Convert the data to the requested interval, filling gaps with the last known value.
        return result.asfreq(interval, method="ffill").ffill()
class WetterdienstPredictionConnection(
    WetterdienstConnection[NodeWetterdienstPrediction], protocol="wetterdienst_prediction"
):
    """
    The WetterdienstPredictionConnection class is a connector to the Wetterdienst API for retrieving
    weather prediction data (MOSMIX). Data can only be read with
    :meth:`~eta_utility.connectors.wetterdienst_connection.WetterdienstPredictionConnection.read_series`.
    """

    def read_series(
        self,
        from_time: datetime,
        to_time: datetime,
        nodes: Nodes[NodeWetterdienstPrediction] | None = None,
        interval: TimeStep = 0,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """
        Read weather prediction data from the Wetterdienst API for the given nodes.

        The interval parameter is not used for prediction data, as predictions are always given hourly.

        :param from_time: Start time for the data retrieval
        :param to_time: End time for the data retrieval
        :param nodes: Nodes to read data from
        :param interval: Not used for prediction data
        :return: Pandas DataFrame containing the data read from the connection (not resampled)
        """
        super().read_series(from_time, to_time, nodes, interval)
        nodes = self._validate_nodes(nodes)

        def _read_node(node: NodeWetterdienstPrediction) -> pd.DataFrame:
            # Build a MOSMIX request of the node's mosmix_type for the requested time span.
            request = DwdMosmixRequest(
                parameter=node.parameter,
                mosmix_type=node.mosmix_type,
                start_date=from_time,
                end_date=to_time,
                settings=self.settings,
            )
            return self.retrieve_stations(node, request)

        # The Wetterdienst library uses asyncio, so a ThreadPoolExecutor cannot be used here;
        # the nodes are read one after another instead.
        results = [_read_node(node) for node in nodes]
        return pd.concat(results, axis=1, sort=False)