from __future__ import annotations
import struct
from asyncio import sleep as async_sleep
from datetime import datetime
from itertools import groupby
from time import sleep
from typing import TYPE_CHECKING
import pandas as pd
from eta_utility.util import ensure_timezone
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from typing import Any, Callable
from eta_utility.type_hints import AnyNode, TimeStep
[docs]
class RetryWaiter:
"""Helper class which keeps track of waiting time before retrying a connection."""
values = [0, 1, 3, 5, 5, 10, 20, 30, 40, 60]
def __init__(self) -> None:
self.counter = 0
[docs]
def tried(self) -> None:
"""Register a retry with the RetryWaiter."""
self.counter += 1
[docs]
def success(self) -> None:
"""Register a successful connection with the RetryWaiter."""
self.counter = 0
@property
def wait_time(self) -> int:
"""Return the time to wait for."""
if self.counter >= len(self.values) - 1:
return self.values[-1]
else:
return self.values[self.counter]
[docs]
def wait(self) -> None:
"""Wait/sleep synchronously."""
sleep(self.wait_time)
[docs]
async def wait_async(self) -> None:
"""Wait/sleep asynchronously - must be awaited."""
await async_sleep(self.wait_time)
[docs]
def decode_modbus_value(
value: Sequence[int], byteorder: str, type_: Callable | None = None, wordorder: str = "big"
) -> Any:
r"""Method to decode incoming modbus values. Strings are always decoded as utf-8 values. If you do not
want this behaviour specify 'bytes' as the data type.
:param value: Current value to be decoded into float.
:param byteorder: Byteorder for decoding i.e. 'little' or 'big' endian.
:param type\_: Type of the output value. See `Python struct format character documentation
<https://docs.python.org/3/library/struct.html#format-characters>` for all possible
format strings (default: f).
:return: Decoded value as a python type.
"""
if byteorder not in ("little", "big"):
raise ValueError(f"Specified an invalid byteorder: '{byteorder}'")
if wordorder not in ("little", "big"):
raise ValueError(f"Specified an invalid wordorder: '{wordorder}'")
bo = "<" if byteorder == "little" else ">"
# Swap words if word order is little endian
if type_ is int or type_ is float:
if wordorder == "little":
value = value[::-1]
dtype, _len = _get_decode_params(value, type_)
# Boolean values don't need decoding
if type_ is bool:
return bool(value[0])
# Determine the format strings for packing and unpacking the received byte sequences. These format strings
# depend on the endianness (determined by bo), the length of the value in bytes and the data type.
pack = f">{len(value):1d}H"
unpack = f"{bo}{_len}{dtype}"
# Convert the value into the appropriate format
val = struct.unpack(unpack, struct.pack(pack, *value))[0]
if type_ is str:
try:
val = type_(val, "utf-8")
except UnicodeDecodeError:
val = ""
elif type_ is not None:
val = type_(val)
else:
val = float(val)
return val
def _get_decode_params(value: Sequence[int], type_: Callable | None = None) -> tuple[str, int]:
if type_ is str or type_ is bytes:
dtype = "s"
_len = len(value) * 2
elif type_ is bool:
dtype = "?"
_len = 1
if _len != len(value):
raise ValueError(f"The length of the received value ({len(value)})does not match the data type {type_}")
elif type_ is int:
_int_types = {1: "b", 2: "h", 4: "i", 8: "q"}
_len = 1
try:
dtype = _int_types[len(value) * 2]
except KeyError:
raise ValueError(f"The length of the received value ({len(value)})does not match the data type {type_}")
elif type_ is float or type_ is None:
_float_types = {2: "e", 4: "f", 8: "d"}
_len = 1
try:
dtype = _float_types[len(value) * 2]
except KeyError:
raise ValueError(f"The length of the received value ({len(value)}) does not match the data type: {type_}")
else:
raise ValueError(f"The given modbus data type was not recognized: {type_}")
return dtype, _len
[docs]
def encode_bits(
value: str | int | float | bytes, byteorder: str, bit_length: int, type_: Callable | None = None
) -> list[int]:
r"""Method to encode python data type to modbus value. This means an array of bytes to send to a
modbus server.
:param value: Current value to be decoded into float.
:param byteorder: Byteorder for decoding i.e. 'little' or 'big' endian.
:param bit_length: Length of the value in bits.
:param type\_: Type of the output value. See `Python struct format character documentation
<https://docs.python.org/3/library/struct.html#format-characters>` for all possible
format strings (default: f).
:return: Decoded value as a python type.
"""
byte_length = bit_length // 8
# Make sure that value is of the type specified by the node.
if type_ is not None:
value = type_(value)
if isinstance(value, int):
if value < 0:
_types = {1: "b", 2: "h", 4: "i", 8: "q"}
else:
_types = {1: "B", 2: "H", 4: "I", 8: "Q"}
try:
_type = _types[byte_length]
except KeyError:
raise ValueError(f"Byte length for integers must be either 1, 2, 4 or 8. Got {byte_length}.")
_len: str | int = ""
elif isinstance(value, float):
_types = {2: "e", 4: "f", 8: "d"}
try:
_type = _types[byte_length]
except KeyError:
raise ValueError(f"Byte length for floats must be either 4 or 8. Got {byte_length}.")
_len = ""
else:
_type = "s"
_len = byte_length
if not isinstance(value, bytes):
value = bytes(value, "utf-8")
_order = {"big": ">", "little": "<"}
try:
bo = _order[byteorder]
except KeyError:
raise ValueError(f"Unknown byte order specified: {byteorder}")
try:
byte = struct.pack(f"{bo}{_len}{_type}", value)
except struct.error:
raise ValueError(f"Could not convert value {value!r} to bits.")
bitstrings = [f"{bin(x)[2:]:0>8}" for x in byte]
return [int(z) for z in "".join(bitstrings)]
[docs]
def bitarray_to_registers(bits: list[int | bool]) -> list[int]:
"""Convert a list of bits into a list of 16 bit 'bytes'."""
# Make sure that _bits is a list of integers, not bools.
_bits = [int(x) for x in bits] if isinstance(bits[0], bool) else bits
b_size = (len(_bits) + 15) // 16
register_list = [0] * b_size
for i in range(0, b_size):
start = i * 16
register_list[i] = int("".join([str(v) for v in _bits[start : start + 16]]), 2)
return register_list
[docs]
def all_equal(iterable: Iterable[Any]) -> bool:
"""Check if all values inside iterable are equal
:param iterable: python iterable
:return: True if all values are equal False elsewhere
"""
g = groupby(iterable)
return bool(next(g, True)) and not bool(next(g, False))
[docs]
class IntervalChecker:
"""Class for the subscription interval checking."""
def __init__(self) -> None:
#: Dictionary that stores the value and the time for checking changes and the time interval
self.node_latest_values: dict[AnyNode, list] = {}
#: :py:func:`eta_utility.util.ensure_timezone`
self._assert_tz_awareness = ensure_timezone
[docs]
def push(
self,
node: AnyNode,
value: Any | pd.Series | Sequence[Any],
timestamp: datetime | pd.DatetimeIndex | TimeStep | None = None,
) -> None:
"""Push value and time in dictionary for a node. If the value doesn't change compared to the previous
timestamp, the push is skipped.
:param node: Node to check.
:param value: Value from the subscription.
:param timestamp: Time of the incoming value of the node.
"""
if node in self.node_latest_values:
if value != self.node_latest_values[node][0]:
self.node_latest_values[node] = [value, timestamp]
elif node.interval is not None:
self.node_latest_values[node] = [value, timestamp]
[docs]
def check_interval_connection(self) -> bool | None:
"""Check the interval between old and new value. If no interval has been defined, the check interval is skipped.
:return: Boolean for the interval check.
"""
# Get the current time to compare the interval
time = self._assert_tz_awareness(datetime.now())
if len(self.node_latest_values) > 0:
for node in self.node_latest_values:
_time_since_last_check = (
(time - self.node_latest_values[node][1]).total_seconds()
if node in self.node_latest_values
else None
)
if node in self.node_latest_values and _time_since_last_check is not None:
if _time_since_last_check <= float(node.interval): # type: ignore
_changed_within_interval = True
else:
_changed_within_interval = False
break
else:
_changed_within_interval = True
else:
_changed_within_interval = True
return _changed_within_interval