Source code for eta_utility.servers.modbus

from __future__ import annotations

import socket
from collections.abc import Sized
from datetime import datetime
from logging import getLogger
from typing import TYPE_CHECKING

import pandas as pd
from pyModbusTCP.server import ModbusServer as BaseModbusServer

from eta_utility import ensure_timezone, url_parse
from eta_utility.connectors.node import NodeModbus
from eta_utility.connectors.util import (
    bitarray_to_registers,
    decode_modbus_value,
    encode_bits,
)

if TYPE_CHECKING:
    import types
    from collections.abc import Mapping
    from typing import Any

    from typing_extensions import Self

    from eta_utility.type_hints import Nodes

log = getLogger(__name__)


[docs] class ModbusServer: """Provides a Modbus server with a number of specified nodes. When building a data structure make sure to consider the following. Numbers (integers and flaots) will be stored depending on the byte_length setting of the Modbus node. This is 2 by default and means that each number will take up 16 bits. This affects, how many "channels" are needed for each number. You have to ensure not to overwrite parts of a number by leaving enough channels after the start of a number empty. :param ip: IP Address to listen on (default: None). :param port: Port to listen on (default: 502). :param big_endian: The server will encode values as big endian by default. If you would like to have little endian encoding instead, set this to False. """ def __init__(self, ip: str | None = None, port: int = 502, big_endian: bool = True) -> None: #: URL of the Modbus Server. self.url: str if ip is None: try: host = socket.gethostbyname(socket.gethostname()) except socket.gaierror: host = "127.0.0.1" self.url = f"modbus.tcp://{host}:{port}" else: self.url = f"modbus.tcp://{ip}:{port}" log.info(f"Server Address is {self.url}") self._url, _, _ = url_parse(self.url) self._big_endian = big_endian self._server: BaseModbusServer = BaseModbusServer(self._url.hostname, self._url.port, no_block=True) self.start()
[docs] def write(self, values: Mapping[NodeModbus, Any]) -> None: """Write some values directly to the Modbus server. This function supports writing int, float and string objects. If you have another object, convert it to bytes before writing. :param values: Dictionary of data to write {node.name: value}. """ nodes = self._validate_nodes(set(values.keys())) srv_info = BaseModbusServer.ServerInfo() for node in nodes: byteorder = "big" if self._big_endian else "little" if not isinstance(values[node], list): bits = encode_bits(values[node], byteorder, node.mb_bit_length, node.dtype) else: bits = values[node] if node.mb_register == "coils": self._server.data_hdl.write_coils(node.mb_channel, bits, srv_info) elif node.mb_register == "holding": bits = bitarray_to_registers(bits) # If the wordorder is little, the bits have to be reversed. if node.mb_wordorder == "little": bits = bits[::-1] self._server.data_hdl.write_h_regs(node.mb_channel, bits, srv_info)
[docs] def read(self, nodes: Nodes[NodeModbus] | None = None) -> pd.DataFrame: """ Read some manually selected values directly from the Modbusserver. :param nodes: List of nodes to read from. :return: pandas.DataFrame containing current values of the Modbus-variables. :raises RuntimeError: When an error occurs during reading. """ _nodes = self._validate_nodes(nodes) srv_info = BaseModbusServer.ServerInfo() results = {} for node in _nodes: if node.mb_register == "holding": val = self._server.data_hdl.read_h_regs(node.mb_channel, node.mb_bit_length // 16, srv_info) elif node.mb_register == "coils": val = self._server.data_hdl.read_coils(node.mb_channel, node.mb_bit_length, srv_info) elif node.mb_register == "discrete_input": val = self._server.data_hdl.read_d_inputs(node.mb_channel, node.mb_bit_length, srv_info) elif node.mb_register == "input": val = self._server.data_hdl.read_i_regs(node.mb_channel, node.mb_bit_length // 16, srv_info) else: raise ValueError(f"The specified register type is not supported: {node.mb_register}") if val.ok and (node.mb_register in ("holding", "input")): byteorder = "big" if self._big_endian else "little" results[node.name] = decode_modbus_value(val.data, byteorder, node.dtype, node.mb_wordorder) elif val.ok and isinstance(val.data, list): if len(val.data) > 1: for idx, value in enumerate(val.data): results[f"{node.name}_{idx}"] = value else: results[node.name] = val.data[0] elif val.ok: results[node.name] = val.data else: raise RuntimeError("Could not decode bits from ModbusServer.") return pd.DataFrame(results, index=[ensure_timezone(datetime.now())])
[docs] def start(self) -> None: """Restart the server after it was stopped.""" if not self._server.is_run: self._server.start()
[docs] def stop(self) -> None: """This should always be called, when the server is not needed anymore. It stops the server.""" try: self._server.stop() except AttributeError: # Occurs only if server did not exist and can be ignored. pass
@property def active(self) -> bool: return self._server.is_run def _validate_nodes(self, nodes: Nodes[NodeModbus] | None) -> set[NodeModbus]: """Make sure that nodes are a Set of nodes and that all nodes correspond to the protocol and url of the connection. :param nodes: Sequence of Node objects to validate. :return: Set of valid Node objects for this connection. """ _nodes = None if nodes: if not isinstance(nodes, Sized): nodes = {nodes} # If not using preselected nodes from self.selected_nodes, check if nodes correspond to the connection _nodes = { node for node in nodes if isinstance(node, NodeModbus) and node.url_parsed.hostname == self._url.hostname } # Make sure that some nodes remain after the checks and raise an error if there are none. if not _nodes or len(_nodes) == 0: raise ValueError( f"Some nodes to read from/write to must be specified. If nodes were specified, they do not " f"match the connection {self.url}" ) return _nodes def __enter__(self) -> Self: return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None ) -> None: self.stop()