Source code for eta_utility.connectors.forecast_solar

"""
This module provides a read-only REST API connector to the forecast.solar API.

You can obtain an estimate of solar production for a specific location, defined by latitude and longitude,
and a specific plane orientation, defined by declination and azimuth, based on the installed module power.

Supported endpoints include: "Estimate", "Historic", and "Clearsky":

**Estimate Solar Production**
The `estimate` endpoint provides the forecast for today and the upcoming days, depending on the account model.

**Historic Solar Production**
The `historic` endpoint calculates the average solar production for a given day based on historical weather data,
excluding current weather conditions.

**Clear Sky Solar Production**
The `clearsky` endpoint calculates the theoretically possible solar production assuming no cloud cover.

For more information, visit the `forecast.solar API documentation <https://doc.forecast.solar/start>`_.
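
Illustrative usage (a sketch, not a verbatim excerpt): the keyword arguments of ``NodeForecastSolar``
used below (latitude, longitude, declination, azimuth, kwp) are assumptions for illustration and should
be checked against the node documentation::

    from datetime import datetime, timedelta

    from eta_utility.connectors.forecast_solar import ForecastSolarConnection
    from eta_utility.connectors.node import NodeForecastSolar

    # Hypothetical plane definition; parameter names and values are illustrative only.
    node = NodeForecastSolar(
        "solar_roof",
        "https://api.forecast.solar",
        "forecast_solar",
        latitude=49.86,
        longitude=8.65,
        declination=30,
        azimuth=0,
        kwp=10,
    )

    start = datetime.now().astimezone()  # timezone-aware local time
    connection = ForecastSolarConnection(nodes=[node])
    forecast = connection.read_series(
        from_time=start,
        to_time=start + timedelta(hours=12),
        nodes=[node],
        interval=900,  # 15-minute steps, given in seconds
    )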

"""

from __future__ import annotations

import concurrent.futures
import traceback
from collections.abc import Mapping
from datetime import datetime, timedelta
from logging import getLogger
from typing import TYPE_CHECKING

import pandas as pd
import requests
from requests_cache import DO_NOT_CACHE, CachedSession

from eta_utility.connectors.node import NodeForecastSolar
from eta_utility.timeseries import df_interpolate
from eta_utility.util import round_timestamp

from .base_classes import SeriesConnection, SubscriptionHandler

if TYPE_CHECKING:
    from types import TracebackType
    from typing import Any, ClassVar

    from typing_extensions import Self

    from eta_utility.type_hints import AnyNode, Nodes, TimeStep


log = getLogger(__name__)


class ForecastSolarConnection(SeriesConnection, protocol="forecast_solar"):
    """ForecastSolarConnection is a class to download multiple features from the Forecast.Solar database
    as timeseries.

    :param url: URL of the server with scheme (https://).
    :param usr: Not needed for Forecast.Solar.
    :param pwd: Not needed for Forecast.Solar.
    :param api_key: Token for API authentication.
    :param nodes: Nodes to select in connection.
    """

    _baseurl: ClassVar[str] = "https://api.forecast.solar"
    _time_format: ClassVar[str] = "%Y-%m-%dT%H:%M:%SZ"
    _headers: ClassVar[dict[str, str]] = {"Content-Type": "application/json"}

    def __init__(
        self,
        url: str = _baseurl,
        *,
        api_key: str = "None",
        url_params: dict[str, Any] | None = None,
        query_params: dict[str, Any] | None = None,
        nodes: Nodes | None = None,
    ) -> None:
        super().__init__(url, None, None, nodes=nodes)

        #: URL parameters for the Forecast.Solar API
        self.url_params: dict[str, Any] | None = url_params
        #: Query parameters for the Forecast.Solar API
        self.query_params: dict[str, Any] | None = query_params
        #: Key to use the Forecast.Solar API. If the API key is "None", only the public functions are usable.
        self._api_key: str = api_key
        #: Cached session to handle the requests
        self._session: CachedSession = CachedSession(
            cache_name="eta_utility/connectors/requests_cache/forecast_solar_cache",
            urls_expire_after={
                "https://api.forecast.solar*": 900,  # 15 minutes
                "*": DO_NOT_CACHE,  # Don't cache other URLs
            },
            allowable_codes=(200, 400, 401, 403),
            use_cache_dir=True,
        )

    @classmethod
    def _from_node(cls, node: AnyNode, **kwargs: Any) -> ForecastSolarConnection:
        """Initialize the connection object from a Forecast.Solar protocol node object.

        :param node: Node to initialize from.
        :param kwargs: Keyword arguments for API authentication, where "api_key" is required.
        :return: ForecastSolarConnection object.
        """
        api_key = kwargs.get("api_key", "None")

        return super()._from_node(node, api_key=api_key)

    def _read_node(self, node: NodeForecastSolar) -> pd.DataFrame:
        """Download data from the Forecast.Solar database.

        :param node: Node to read values from.
        :return: pandas.DataFrame containing the data read from the connection.
        """
        url, query_params = node.url, node._query_params
        query_params["time"] = "utc"

        raw_response = self._raw_request("GET", url, params=query_params, headers=self._headers)
        response = raw_response.json()

        timestamps = pd.to_datetime(list(response["result"].keys()))
        watts = response["result"].values()

        data = pd.DataFrame(
            data=watts,
            index=timestamps.tz_convert(self._local_tz),
            dtype="float64",
        )
        data.index.name = "Time (with timezone)"
        data.columns = [node.name]

        return data

    def _select_data(
        self, results: pd.DataFrame, from_time: pd.Timestamp | None = None, to_time: pd.Timestamp | None = None
    ) -> tuple[pd.DataFrame, pd.Timestamp]:
        """The Forecast.Solar API returns the data for the whole day. Select data only for the time interval.

        :param results: pandas.DataFrame containing the raw data read from the connection.
        :param from_time: Starting time to begin reading (included in output).
        :param to_time: Time to stop reading at (included in output).
        :return: pandas.DataFrame containing the selected data read from the connection and the current timestamp.
        """
        now = pd.Timestamp.now().tz_localize(self._local_tz)

        if isinstance(from_time, pd.Timestamp):
            previous_time = from_time.floor("15T") if self._api_key != "None" else from_time.floor("h")
        else:
            # When from_time is None, start from the current (floored) timestamp
            previous_time = now.floor("15T") if self._api_key != "None" else now.floor("h")

        if isinstance(to_time, pd.Timestamp):
            next_time = (
                to_time.floor("15T") + timedelta(minutes=15)
                if self._api_key != "None"
                else to_time.floor("h") + timedelta(minutes=60)
            )
        else:
            # When to_time is None, select only a single interval after previous_time
            next_time = (
                previous_time + timedelta(minutes=15)
                if self._api_key != "None"
                else previous_time + timedelta(minutes=60)
            )

        if previous_time not in results.index:
            results.loc[previous_time] = 0
        if next_time not in results.index:
            results.loc[next_time] = 0
        results = results.sort_index()

        return results.loc[previous_time:next_time], now
    def read(self, nodes: Nodes | None = None) -> pd.DataFrame:
        """Return forecast data from the Forecast.Solar database.

        :param nodes: List of nodes to read values from.
        :return: pandas.DataFrame containing the data read from the connection.
        """
        nodes = self._validate_nodes(nodes)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = executor.map(self._read_node, nodes)

            # Filter out empty or all-NA DataFrames
            filtered_results = [df for df in results if not df.empty and not df.isna().all().all()]

            # Concatenate the filtered DataFrames
            values = pd.concat(filtered_results, axis=1, sort=False)

        values, now = self._select_data(values)

        # Insert the current timestamp and sort the index, then interpolate linearly to the current time
        values.loc[now] = pd.NA
        values = values.sort_index()

        return pd.DataFrame(values.interpolate(method="linear").loc[now])
    def write(self, values: Mapping[AnyNode, Any]) -> None:
        """
        .. warning::
            Cannot write values to the Forecast.Solar API, since it is read-only.

        :raises NotImplementedError:
        """
        raise NotImplementedError("Write is not implemented for Forecast.Solar.")
    def subscribe(self, handler: SubscriptionHandler, nodes: Nodes | None = None, interval: TimeStep = 1) -> None:
        """
        .. warning::
            Cannot read single values from the Forecast.Solar API. Use read_series instead.

        :raises NotImplementedError:
        """
        raise NotImplementedError("Subscribe is not implemented for Forecast.Solar.")
    def read_series(
        self, from_time: datetime, to_time: datetime, nodes: Nodes | None = None, interval: TimeStep = 1, **kwargs: Any
    ) -> pd.DataFrame:
        """Return a time series of forecast data from the Forecast.Solar database.

        :param nodes: List of nodes to read values from.
        :param from_time: Starting time to begin reading (included in output).
        :param to_time: Time to stop reading at (not included in output).
        :param interval: Interval between time steps. It is interpreted as seconds if given as integer.
        :param kwargs: Other parameters (ignored by this connector).
        :return: Pandas DataFrame containing the data read from the connection.
        """
        _interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval)
        nodes = self._validate_nodes(nodes)
        from_time = pd.Timestamp(round_timestamp(from_time, _interval.total_seconds())).tz_convert(self._local_tz)
        to_time = pd.Timestamp(round_timestamp(to_time, _interval.total_seconds())).tz_convert(self._local_tz)

        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = executor.map(self._read_node, nodes)

            # Filter out empty or all-NA DataFrames
            filtered_results = [df for df in results if not df.empty and not df.isna().all().all()]

            # Concatenate the filtered DataFrames
            values = pd.concat(filtered_results, axis=1, sort=False)

        values, _ = self._select_data(values, from_time, to_time)
        values = df_interpolate(values, _interval)
        return values.loc[from_time:to_time]  # type: ignore
    def subscribe_series(
        self,
        handler: SubscriptionHandler,
        req_interval: TimeStep,
        offset: TimeStep | None = None,
        nodes: Nodes | None = None,
        interval: TimeStep = 1,
        data_interval: TimeStep = 1,
        **kwargs: Any,
    ) -> None:
        """
        .. warning::
            Cannot read single values from the Forecast.Solar API. Use read_series instead.

        :raises NotImplementedError:
        """
        raise NotImplementedError("Subscribe series is not implemented for Forecast.Solar.")
    def close_sub(self) -> None:
        """
        .. warning::
            Cannot read single values from the Forecast.Solar API. Use read_series instead.

        :raises NotImplementedError:
        """
        raise NotImplementedError("Close subscription is not implemented for Forecast.Solar.")
    async def _subscription_loop(
        self,
        handler: SubscriptionHandler,
        interval: TimeStep,
        req_interval: TimeStep,
        offset: TimeStep,
        data_interval: TimeStep,
    ) -> None:
        """
        .. warning::
            Cannot read single values from the Forecast.Solar API. Use read_series instead.

        :raises NotImplementedError:
        """
        raise NotImplementedError("Subscription loop is not implemented for Forecast.Solar.")
    def timestr_from_datetime(self, dt: datetime) -> str:
        """Create a Forecast.Solar compatible time string.

        :param dt: Datetime object to convert to string.
        :return: Forecast.Solar compatible time string.
        """
        return dt.isoformat(sep="T", timespec="seconds").replace(":", "%3A").replace("+", "%2B")
    def _raw_request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
        """Perform a Forecast.Solar request and handle possibly resulting errors.

        :param method: HTTP request method.
        :param url: URL for the request.
        :param kwargs: Additional arguments for the request.
        :return: Response from the request.
        """
        if self._api_key == "None":
            log.info("The api_key is None, so only the public functions of the Forecast.Solar API are available.")

        response = self._session.request(method, url, **kwargs)
        # Check for request errors
        response.raise_for_status()

        return response

    def _validate_nodes(self, nodes: Nodes | None) -> set[NodeForecastSolar]:  # type: ignore
        vnodes = super()._validate_nodes(nodes)
        _nodes = set()
        for node in vnodes:
            if isinstance(node, NodeForecastSolar):
                _nodes.add(node)

        return _nodes
    @classmethod
    def route_valid(cls, nodes: Nodes) -> bool:
        """Check if node routes make up a valid route, using the Forecast.Solar API's check endpoint.

        :param nodes: List of nodes to check.
        :return: True if all node routes are valid.
        """
        conn = ForecastSolarConnection()
        nodes = conn._validate_nodes(nodes)

        for node in nodes:
            _check_node = node.evolve(api_key=None, endpoint="check", data=None)
            try:
                conn._raw_request("GET", _check_node.url, headers=conn._headers)
            except requests.exceptions.HTTPError as e:
                log.error(f"\nRoute of node: {node.name} could not be verified: \n{e}")
                return False

        # If no request error occurred, the routes are valid
        return True
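    # Illustrative usage (a sketch, not part of the original module): ``route_valid`` can serve as a
    # pre-flight check before requesting data, e.g.:
    #
    #     if ForecastSolarConnection.route_valid(nodes):
    #         values = ForecastSolarConnection(nodes=nodes).read(nodes)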
    @classmethod
    def calculate_watt_hours_period(cls, df: pd.DataFrame, watts_column: str) -> pd.DataFrame:
        """
        Calculate watt hours for each period based on the average watts provided.
        Assumes the DataFrame is indexed by timestamps.
        """
        # Calculate the duration of each period in hours
        df["period_hours"] = df.index.to_series().diff().dt.total_seconds() / 3600
        df["period_hours"].iloc[0] = df["period_hours"].mean()  # Handle the first NaN

        # Calculate watt_hours for each period
        df["watt_hours_period"] = df[watts_column] * df["period_hours"]
        return df.drop(columns=["period_hours"])
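    # Worked example (for illustration only): with a 15-minute index spacing, ``period_hours`` is 0.25,
    # so an average of 2000 W over one period contributes 2000 * 0.25 = 500 Wh to ``watt_hours_period``.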
    @classmethod
    def summarize_watt_hours_over_day(cls, df: pd.DataFrame) -> pd.DataFrame:
        """
        Sum up watt hours over each day.
        """
        df["date"] = df.index.date
        daily_energy = df.groupby("date")["watt_hours_period"].sum().reset_index()
        daily_energy.columns = ["date", "watt_hours"]
        return daily_energy
    @classmethod
    def get_dataframe_of_values(
        cls, df: pd.DataFrame, watts_column: str = "watts"
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Process the original DataFrame and return the DataFrame with watt_hours_period added as well as a
        second DataFrame with the watt hours summarized over each day.
        """
        watt_hours = cls.calculate_watt_hours_period(df, watts_column)
        daily_sum = cls.summarize_watt_hours_over_day(watt_hours)
        return watt_hours, daily_sum
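    # Illustrative usage (a sketch; node and column names are assumptions): given a watts forecast
    # returned by ``read_series``, the energy helpers can be chained as follows:
    #
    #     df = connection.read_series(start, start + timedelta(days=1), nodes=[node])
    #     df = df.rename(columns={node.name: "watts"})
    #     per_period, per_day = ForecastSolarConnection.get_dataframe_of_values(df, watts_column="watts")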
    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: TracebackType | None,
    ) -> bool:
        if exc_type is not None:
            traceback.print_exception(exc_type, exc_value, tb)
            return False

        try:
            self._session.close()
        except Exception as e:
            log.error(f"Error closing the connection: {e}")
        return True

    def __del__(self) -> None:
        try:
            self._session.close()
        finally:
            pass