Source code for eta_utility.servers.opc_ua

from __future__ import annotations

import socket
from datetime import datetime
from logging import getLogger
from typing import TYPE_CHECKING

# Async import
import asyncua.sync
import pandas as pd
from asyncua import ua  # , Server as asyncServer

# Sync import
from asyncua.sync import Server, ThreadLoopNotRunning
from asyncua.ua import uaerrors

from eta_utility import ensure_timezone, url_parse
from eta_utility.connectors.node import NodeOpcUa

if TYPE_CHECKING:
    import types
    from collections.abc import Mapping
    from typing import Any

    # Sync import
    from asyncua.sync import SyncNode as SyncOpcNode
    from typing_extensions import Self

    # Async import
    # TODO: add async import: from asyncua import Node as asyncSyncOpcNode
    # https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/public/eta-utility/-/issues/270
    from eta_utility.type_hints import Nodes

log = getLogger(__name__)


[docs] class OpcUaServer: """Provides an OPC UA server with a number of specified nodes. Each node can contain single values or arrays. :param namespace: Namespace of the OPC UA Server. :param ip: IP Address to listen on (default: None). :param port: Port to listen on (default: 4840). """ def __init__(self, namespace: str | int, ip: str | None = None, port: int = 4840) -> None: #: URL of the OPC UA Server. self.url: str if ip is None: try: host = socket.gethostbyname(socket.gethostname()) except socket.gaierror: host = "127.0.0.1" self.url = f"opc.tcp://{host}:{port}" else: self.url = f"opc.tcp://{ip}:{port}" log.info(f"Server Address is {self.url}") self._url, _, _ = url_parse(self.url) self._server: Server = Server() self._server.set_endpoint(self.url) self.idx: int = self._server.register_namespace(str(namespace)) #: idx: Namespace of the OPC UA _server log.debug(f'Server Namespace set to "{namespace}"') self._server.set_security_policy([ua.SecurityPolicyType.NoSecurity]) self._server.set_server_name("ETA Utility OPC UA Server") self._server.start()
[docs] def write(self, values: Mapping[NodeOpcUa, Any]) -> None: """Write some values directly to the OPC UA server. :param values: Dictionary of data to write {node.name: value}. """ nodes = self._validate_nodes(set(values.keys())) for node in nodes: var = self._server.get_node(node.opc_id) try: opc_type = var.read_data_type_as_variant_type() except asyncua.sync.ThreadLoopNotRunning as e: raise ConnectionError(f"Server {self} is not running.") from e var.write_value(ua.Variant(values[node], opc_type))
[docs] def read(self, nodes: NodeOpcUa | Nodes[NodeOpcUa] | None = None) -> pd.DataFrame: """ Read some manually selected values directly from the OPC UA server. :param nodes: Single node or list/set of nodes to read from. :return: pandas.DataFrame containing current values of the OPC UA-variables. :raises RuntimeError: When an error occurs during reading. """ _nodes = self._validate_nodes(nodes) _dikt = {} for node in _nodes: try: opcua_variable = self._server.get_node(node.opc_id) value = opcua_variable.read_value() _dikt[node.name] = [value] except uaerrors.BadNodeIdUnknown as e: raise RuntimeError( f"The node id ({node.opc_id}) refers to a node that does not exist in the server address space " f"{self.url}. (BadNodeIdUnknown)" ) from e return pd.DataFrame(_dikt, index=[ensure_timezone(datetime.now())])
[docs] def create_nodes(self, nodes: Nodes[NodeOpcUa]) -> None: """Create nodes on the server from a list of nodes. This will try to create the entire node path. :param nodes: List or set of nodes to create. """ def create_object(parent: SyncOpcNode, opc_name: str, opc_path_str: str, opc_id: str) -> SyncOpcNode: children: list[SyncOpcNode] = asyncua.sync._to_sync(parent.tloop, parent.get_children()) for child in children: ident = child.nodeid.Identifier if isinstance(ident, str) and ident.strip() == opc_path_str: return child return asyncua.sync._to_sync(parent.tloop, parent.add_object(opc_id, opc_name)) _nodes = self._validate_nodes(nodes) for node in _nodes: try: split_path = node.opc_path_str.split(".") # type: ignore # If the path starts with a dot, the dot belongs to the root node and is not a separator if node.opc_path_str.startswith("."): # type: ignore split_path = node.opc_path_str.rsplit(".", maxsplit=len(split_path) - 2) # type: ignore # Create SyncNode from asyncNode last_obj = asyncua.sync._to_sync(self._server.tloop, self._server.aio_obj.get_objects_node()) for i in range(len(split_path) - 1): _opc_name = split_path[i].strip(" .") _opc_path_str = ".".join(split_path[: i + 1]) _opc_id = f"ns={node.opc_ns};s={_opc_path_str}" last_obj = create_object(last_obj, _opc_name, _opc_path_str, _opc_id) init_val: Any if not hasattr(node, "dtype"): init_val = 0.0 elif node.dtype is int: init_val = 0 elif node.dtype is bool: init_val = False elif node.dtype is str: init_val = "" else: init_val = 0.0 last_obj.add_variable(node.opc_id, node.opc_name, init_val) log.debug(f"OPC UA Node created: {node.opc_id}") except uaerrors.BadNodeIdExists: log.warning(f"Node with NodeId : {node.opc_id} could not be created. It already exists.") except RuntimeError as e: raise ConnectionError(str(e)) from e
[docs] def delete_nodes(self, nodes: Nodes[NodeOpcUa]) -> None: """Delete the given nodes and their parents (if the parents do not have other children). :param nodes: List or set of nodes to be deleted. """ def delete_node_parents(node: SyncOpcNode, depth: int = 20) -> None: parents = node.get_references(direction=ua.BrowseDirection.Inverse) if not node.get_children(): node.delete(delete_references=True) log.info(f"Deleted Node {node.nodeid} from server {self.url}.") else: log.info(f"Node {node.nodeid} on server {self.url} has remaining children and was not deleted.") for parent in parents: if depth > 0: delete_node_parents(self._server.get_node(parent.NodeId), depth=depth - 1) nodes = self._validate_nodes(nodes) for node in nodes: delete_node_parents(self._server.get_node(node.opc_id))
[docs] def start(self) -> None: """Restart the server after it was stopped.""" self._server.start()
[docs] def stop(self) -> None: """This should always be called, when the server is not needed anymore. It stops the server.""" try: self._server.stop() except AttributeError: # Occurs only if server did not exist and can be ignored. pass except ThreadLoopNotRunning: # Occurs only if server was already stopped (and therefore the ThreadLoop as well) and can be ignored. pass
@property def active(self) -> bool: return self._server.aio_obj.bserver._server._serving
[docs] def allow_remote_admin(self, allow: bool) -> None: """Allow remote administration of the server. :param allow: Set to true to enable remote administration of the server. """ self._server.aio_obj.allow_remote_admin(allow)
def _validate_nodes(self, nodes: NodeOpcUa | Nodes[NodeOpcUa] | None) -> set[NodeOpcUa]: """Make sure that nodes are a Set of nodes and that all nodes correspond to the protocol and url of the connection. :param nodes: Sequence of Node objects to validate. :return: Set of valid Node objects for this connection. """ _nodes = None if nodes: # If not using preselected nodes from self.selected_nodes, check if nodes correspond to the connection nodes = {nodes} if isinstance(nodes, NodeOpcUa) else nodes _nodes = { node for node in nodes if isinstance(node, NodeOpcUa) and node.url_parsed.hostname == self._url.hostname } # Make sure that some nodes remain after the checks and raise an error if there are none. if not _nodes or len(_nodes) == 0: raise ValueError( f"Some nodes to read from/write to must be specified. If nodes were specified, they do not " f"match the connection {self.url}" ) return _nodes def __enter__(self) -> Self: return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None ) -> None: self.stop()