from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime, timedelta
from logging import getLogger
from typing import TYPE_CHECKING
import pandas as pd
from eta_utility.connectors.base_classes import Connection, SubscriptionHandler
from eta_utility.connectors.modbus import ModbusConnection
from eta_utility.connectors.node import Node, NodeEmonio, NodeModbus
if TYPE_CHECKING:
from collections.abc import Mapping
from typing import Any
from eta_utility.type_hints import Nodes, TimeStep
log = getLogger(__name__)
[docs]
class EmonioConnection(Connection[NodeEmonio], protocol="emonio"):
"""
Thin wrapper class for the Emonio that uses a modbus TCP Connection.
Internally the Emonio nodes are converted to modbus nodes with
fixed parameters, except for the name, url and channel.
If nodes have specified a phase, the connection will check if the phase is connected.
Additionally, the connection will check for Emonio errors and warnings (max. every minute).
When creating a :class:`~node.NodeEmonio` the :attr:`~node.NodeEmonio.parameter` (and resulting modbus channel)
is set by the name of the node (case insensitive).
See `Available Emonio Nodes` for possible parameter names.
Alternatively, the :attr:`~node.NodeEmonio.address` (modbus channel) can be set manually.
The phase is set by the :attr:`~node.NodeEmonio.phase` attribute of the node.
Possible values are ``a``, ``b``, ``c`` or ``abc``, with ``abc`` being the default.
"""
def __init__(self, url: str, *, nodes: Nodes[NodeEmonio] | None = None, check_error: bool = True) -> None:
    """
    Initialize the Emonio connection.

    The phase status is read once from the device during initialization and cached
    for later phase checks.

    :param url: URL of the Emonio.
    :param nodes: List of nodes to connect to.
    :param check_error: Setting to check for errors and warnings. If False, the error and
        warning registers are not read, neither here nor during node validation.
    :raises ValueError: If error checking is enabled and the Emonio reports an error bit.
    """
    self.nodes_factory = NodeModbusFactory(url)
    # The actual (Modbus) connection which will be used for reading
    self._connection = ModbusConnection(url=url)
    # Dictionary to keep track of the connected phases.
    # Must be populated before super().__init__ because node validation reads it.
    self._phases_connected: dict[str, bool] = self._get_phases_status()
    self.check_error = check_error
    # Backdate the timestamp by the throttle interval so that the first check is not skipped.
    self._last_error_check = datetime.now() - timedelta(minutes=1)
    # Honor the check_error setting here as well (consistent with _validate_nodes).
    if self.check_error:
        self.check_warnings_and_errors()
    # This calls self._validate_nodes(nodes) and checks phases and errors/warnings
    super().__init__(url, nodes=nodes)
@classmethod
def _from_node(cls, node: NodeEmonio, **kwargs: Any) -> EmonioConnection:
    """
    Build an Emonio connection for one single node.

    :param node: The node to connect to.
    :param kwargs: Accepted for interface compatibility.
    :return: The Emonio connection.
    """
    # NOTE(review): kwargs are accepted but not forwarded to the parent implementation;
    # confirm whether that is intentional.
    connection = super()._from_node(node)
    return connection
[docs]
def read(self, nodes: Nodes[NodeEmonio] | None = None) -> pd.DataFrame:
    """
    Read the current values of the given nodes through the wrapped Modbus connection.

    The nodes are validated first, then converted to Modbus nodes and handed
    to the underlying connection.

    :param nodes: List of nodes to read from. If None, the previously selected nodes are used.
    :return: Dataframe with the read values.
    """
    selected: set[NodeEmonio] = self._validate_nodes(nodes)
    modbus_nodes = self._prepare_modbus_nodes(selected)
    return self._connection.read(modbus_nodes)
[docs]
def write(self, values: Mapping[NodeEmonio, Any]) -> None:
    """
    .. warning::
        Not implemented: Writing to Emonio nodes is not supported.

    :param values: Ignored; present only to satisfy the connection interface.
    :raises NotImplementedError: Always.
    """
    message = "Writing to Emonio nodes is not supported"
    raise NotImplementedError(message)
[docs]
def subscribe(
    self, handler: SubscriptionHandler, nodes: Nodes[NodeEmonio] | None = None, interval: TimeStep = 1
) -> None:
    """
    Subscribe a handler to the given nodes; the wrapped Modbus connection does all the handling.

    :param handler: The handler to subscribe.
    :param nodes: List of nodes to subscribe to. If None, the previously selected nodes are used.
    :param interval: The interval in seconds to read the values.
    """
    selected: set[NodeEmonio] = self._validate_nodes(nodes)
    modbus_nodes = self._prepare_modbus_nodes(selected)
    self._connection.subscribe(handler, modbus_nodes, interval)
[docs]
def close_sub(self) -> None:
    """Close the subscription of the wrapped Modbus connection."""
    modbus_connection = self._connection
    modbus_connection.close_sub()
def _prepare_modbus_nodes(self, nodes: Nodes[NodeEmonio]) -> list[NodeModbus]:
    """
    Translate Emonio nodes into Modbus nodes using the factory's default parameters.

    Only the name and the address of each Emonio node are carried over; all other
    Modbus settings come from the node factory.

    :param nodes: A single Emonio node or an iterable of Emonio nodes.
    :return: List of modbus nodes (will return empty list when no nodes are passed).
    """
    # A single (non-iterable) node is wrapped so that it can be handled like a collection.
    emonio_nodes = nodes if isinstance(nodes, Iterable) else {nodes}
    modbus_nodes: list[NodeModbus] = []
    for emonio_node in emonio_nodes:
        modbus_nodes.append(self.nodes_factory.get_default_node(emonio_node.name, emonio_node.address))
    return modbus_nodes
def _validate_nodes(self, nodes: Nodes[NodeEmonio] | None) -> set[NodeEmonio]:
    """
    Validate the nodes via the parent class, then run the Emonio specific checks.

    If error checking is enabled, the (throttled) error and warning check is triggered.
    Afterwards the phases of the validated nodes are checked.

    :param nodes: List of nodes to validate.
    :return: Set of validated nodes as returned by the parent class.
    """
    validated: set[NodeEmonio] = super()._validate_nodes(nodes)

    if self.check_error:
        self.check_warnings_and_errors()

    # Every node bound to a specific phase requires that phase to be connected.
    self._check_phases(validated)
    return validated
def _get_phases_status(self) -> dict[str, bool]:
    """
    Read the connection status of the phases ``a``, ``b`` and ``c`` from the Emonio.

    One discrete input node is created per phase (channels 0, 1 and 2) and read
    through the Modbus connection.

    :return: Dictionary built from the first row of the read result.
    """
    phase_nodes = []
    for index, phase in enumerate(("a", "b", "c")):
        phase_nodes.append(self.nodes_factory.get_discrete_input_node(phase, index))

    # Read the status of the phases from the Emonio
    frame = self._connection.read(phase_nodes)

    # Only the first row of the result is used
    status = frame.iloc[0].to_dict()
    log.debug("Connected phases: %s", status)
    return status
def _check_phases(self, nodes: Nodes[NodeEmonio]) -> None:
    """
    Ensure that the phase of every given node is connected.

    Nodes with phase ``abc`` are not checked.

    :param nodes: A single node or a collection of nodes to check.
    :raises ValueError: If the phase of a node is not connected.
    """
    candidates = [nodes] if isinstance(nodes, Node) else nodes
    for candidate in candidates:
        phase = candidate.phase
        # The cached phase status is only looked up for nodes bound to a single phase.
        if phase != "abc" and not self._phases_connected[phase]:
            raise ValueError(f"Phase '{phase}' is not connected.")
[docs]
def check_warnings_and_errors(self) -> None:
    """
    Run the error and warning check unless the last check was less than a minute ago.

    If no previous check is recorded (timestamp is None), the check is always executed.
    The timestamp of the last check is updated before the check itself runs.
    """
    current_time = datetime.now()
    previous_check = self._last_error_check

    throttled = previous_check is not None and current_time - previous_check < timedelta(minutes=1)
    if throttled:
        return

    self._last_error_check = current_time
    self._check_warnings_and_errors()
def _check_warnings_and_errors(self) -> None:
    """
    Check the Emonio for errors and warnings.

    This is done by reading the error register (channel 1000) and the warning
    register (channel 1001) from the Emonio and inspecting each of the 16 bits.
    Bits are named ``a`` (lowest bit) to ``p`` (highest bit) in the messages.

    If a warning bit is set, a warning is logged.
    If an error bit is set, the error is logged and a ValueError is raised.

    :raises ValueError: For the lowest error bit that is set.
    """
    error_node = self.nodes_factory.get_warnings_errors_node("Error", 1000)
    warning_node = self.nodes_factory.get_warnings_errors_node("Warning", 1001)
    result = self._connection.read([error_node, warning_node])

    for name, value in result.items():
        # Interpret the value as a raw 16 bit register. Masking keeps the bit pattern
        # correct when the register was decoded as a signed integer and is negative
        # (bin() of a negative number would describe the magnitude, not the register bits).
        register = int(value.iloc[0]) & 0xFFFF
        for i in range(16):
            if not (register >> i) & 1:
                continue
            # chr(97) = "a", chr(98) = "b", ...
            msg = (
                f"{name} bit '{chr(97+i)}' is set. "
                f"See https://wiki.emonio.de/de/Emonio_P3 for more information."
            )
            if name == "Warning":
                log.warning(msg)
            else:
                log.error(msg)
                raise ValueError(msg)
[docs]
class NodeModbusFactory:
"""
The NodeModbusFactory is a factory class that creates NodeModbus objects
with fixed parameters, except: name, url and mb_channel.
Has to be initialized with the url of the Emonio.
It's a helper class for the EmonioConnection to create its modbus nodes.
It also can be used to manually create a NodeModbus object, which has to be read with a ModbusConnection.
(not recommended, use the EmonioConnection instead)
"""
def __init__(self, url: str):
    """
    Prepare the fixed parameter sets used for the different kinds of Emonio modbus nodes.

    :param url: URL of the Emonio; it is part of every parameter set.
    """
    # Settings shared by all node kinds
    shared = {"url": url, "protocol": "modbus", "mb_byteorder": "big"}

    # Measurement values: 32 bit floats in holding registers
    self.default_params = dict(
        shared,
        mb_wordorder="little",
        mb_register="holding",
        dtype="float",
        mb_bit_length=32,
    )
    # Single bit boolean values in discrete input registers
    self.discrete_input_params = dict(
        shared,
        mb_register="discrete_input",
        dtype="bool",
        mb_bit_length=1,
    )
    # 16 bit integers in holding registers (error and warning registers)
    self.warnings_errors_params = dict(
        shared,
        mb_register="holding",
        dtype="int",
        mb_bit_length=16,
    )
def __return_nodes(self, name: str, channel: int, params: dict[str, Any]) -> NodeModbus:
    """
    Create a modbus node from a name, a channel and one of the fixed parameter sets.

    :param name: Name of the node.
    :param channel: Modbus channel of the node.
    :param params: Additional keyword arguments passed on to the node.
    :return: The created modbus node.
    """
    node = NodeModbus(name=name, **params, mb_channel=channel)
    return node
[docs]
def get_default_node(self, name: str, channel: int) -> NodeModbus:
    """
    Create a modbus node for reading Emonio values, based on the default parameter set.

    :param name: Name of the node.
    :param channel: Modbus channel of the node. (Emonio address)
    :return: The created modbus node.
    """
    params = self.default_params
    return self.__return_nodes(name, channel, params)
[docs]
def get_warnings_errors_node(self, name: str, channel: int) -> NodeModbus:
    """
    Create a modbus node for reading the error and warning registers of the Emonio,
    based on the warnings/errors parameter set.

    :param name: Name of the node.
    :param channel: Modbus channel of the node. (Emonio address)
    :return: The created modbus node.
    """
    params = self.warnings_errors_params
    return self.__return_nodes(name, channel, params)