Source code for eta_utility.servers.opc_ua

from __future__ import annotations

import socket
from collections.abc import Sized
from datetime import datetime
from typing import TYPE_CHECKING

# Async import
import asyncua.sync
import pandas as pd
from asyncua import ua  # , Server as asyncServer

# Sync import
from asyncua.sync import Server, ThreadLoopNotRunning
from asyncua.ua import uaerrors

from eta_utility import ensure_timezone, get_logger, url_parse
from eta_utility.connectors.node import NodeOpcUa

if TYPE_CHECKING:
    import types
    from collections.abc import Mapping
    from typing import Any

    # Sync import
    from asyncua.sync import SyncNode as SyncOpcNode

    # Async import
    # FIXME: add async import: from asyncua import Node as asyncSyncOpcNode
    from eta_utility.type_hints import AnyNode, Nodes

log = get_logger("servers.opcua")


[docs] class OpcUaServer: """Provides an OPC UA server with a number of specified nodes. Each node can contain single values or arrays. :param namespace: Namespace of the OPC UA Server. :param ip: IP Address to listen on (default: None). :param port: Port to listen on (default: 4840). """ def __init__(self, namespace: str | int, ip: str | None = None, port: int = 4840) -> None: #: URL of the OPC UA Server. self.url: str if ip is None: self.url = f"opc.tcp://{socket.gethostbyname(socket.gethostname())}:{port}" else: self.url = f"opc.tcp://{ip}:{port}" log.info(f"Server Address is {self.url}") self._url, _, _ = url_parse(self.url) self._server: Server = Server() self._server.set_endpoint(self.url) self.idx: int = self._server.register_namespace(str(namespace)) #: idx: Namespace of the OPC UA _server log.debug(f'Server Namespace set to "{namespace}"') self._server.set_security_policy([ua.SecurityPolicyType.NoSecurity]) self._server.set_server_name("ETA Utility OPC UA Server") self._server.start()
[docs] def write(self, values: Mapping[AnyNode, Any]) -> None: """Write some values directly to the OPC UA server. :param values: Dictionary of data to write {node.name: value}. """ nodes = self._validate_nodes(set(values.keys())) for node in nodes: var = self._server.get_node(node.opc_id) opc_type = var.get_data_type_as_variant_type() var.set_value(ua.Variant(values[node], opc_type))
[docs] def read(self, nodes: Nodes | None = None) -> pd.DataFrame: """ Read some manually selected values directly from the OPC UA server. :param nodes: List of nodes to read from. :return: pandas.DataFrame containing current values of the OPC UA-variables. :raises RuntimeError: When an error occurs during reading. """ _nodes = self._validate_nodes(nodes) _dikt = {} for node in _nodes: try: opcua_variable = self._server.get_node(node.opc_id) value = opcua_variable.get_value() _dikt[node.name] = [value] except uaerrors.BadNodeIdUnknown: raise RuntimeError( f"The node id ({node.opc_id}) refers to a node that does not exist in the server address space " f"{self.url}. (BadNodeIdUnknown)" ) return pd.DataFrame(_dikt, index=[ensure_timezone(datetime.now())])
[docs] def create_nodes(self, nodes: Nodes) -> None: """Create nodes on the server from a list of nodes. This will try to create the entire node path. :param nodes: List or set of nodes to create. """ def create_object(parent: SyncOpcNode, child: NodeOpcUa) -> SyncOpcNode: children: list[SyncOpcNode] = asyncua.sync._to_sync(parent.tloop, parent.get_children()) for obj in children: ident = obj.nodeid.Identifier if type(obj.nodeid.Identifier) is str else obj.nodeid.Identifier if child.opc_path_str == ident: return obj else: return asyncua.sync._to_sync(parent.tloop, parent.add_object(child.opc_id, child.opc_name)) _nodes = self._validate_nodes(nodes) for node in _nodes: try: if len(node.opc_path) == 0: last_obj = asyncua.sync._to_sync(self._server.tloop, self._server.aio_obj.get_objects_node()) else: # Create SyncNode from asyncNode sync_node = asyncua.sync._to_sync(self._server.tloop, self._server.aio_obj.get_objects_node()) last_obj = create_object(sync_node, node.opc_path[0]) for key in range(1, len(node.opc_path)): last_obj = create_object(last_obj, node.opc_path[key]) init_val: Any if not hasattr(node, "dtype"): init_val = 0.0 elif node.dtype is int: init_val = 0 elif node.dtype is bool: init_val = False elif node.dtype is str: init_val = "" else: init_val = 0.0 last_obj.add_variable(node.opc_id, node.opc_name, init_val) log.debug(f"OPC UA Node created: {node.opc_id}") except uaerrors.BadNodeIdExists: log.warning(f"Node with NodeId : {node.opc_id} could not be created. It already exists.") except RuntimeError as e: raise ConnectionError(str(e)) from e
[docs] def delete_nodes(self, nodes: Nodes) -> None: """Delete the given nodes and their parents (if the parents do not have other children). :param nodes: List or set of nodes to be deleted. """ def delete_node_parents(node: SyncOpcNode, depth: int = 20) -> None: parents = node.get_references(direction=ua.BrowseDirection.Inverse) if not node.get_children(): node.delete(delete_references=True) log.info(f"Deleted Node {node.nodeid} from server {self.url}.") else: log.info(f"Node {node.nodeid} on server {self.url} has remaining children and was not deleted.") for parent in parents: if depth > 0: delete_node_parents(self._server.get_node(parent.NodeId), depth=depth - 1) nodes = self._validate_nodes(nodes) for node in nodes: delete_node_parents(self._server.get_node(node.opc_id))
[docs] def start(self) -> None: """Restart the server after it was stopped.""" self._server.start()
[docs] def stop(self) -> None: """This should always be called, when the server is not needed anymore. It stops the server.""" try: self._server.stop() except AttributeError: # Occurs only if server did not exist and can be ignored. pass except ThreadLoopNotRunning: # Occurs only if server was already stopped (and therefore the ThreadLoop as well) and can be ignored. pass
@property def active(self) -> bool: return self._server.aio_obj.bserver._server._serving
[docs] def allow_remote_admin(self, allow: bool) -> None: """Allow remote administration of the server. :param allow: Set to true to enable remote administration of the server. """ self._server.aio_obj.allow_remote_admin(allow)
def _validate_nodes(self, nodes: Nodes | None) -> set[NodeOpcUa]: """Make sure that nodes are a Set of nodes and that all nodes correspond to the protocol and url of the connection. :param nodes: Sequence of Node objects to validate. :return: Set of valid Node objects for this connection. """ _nodes = None if nodes: if not isinstance(nodes, Sized): nodes = {nodes} # If not using preselected nodes from self.selected_nodes, check if nodes correspond to the connection _nodes = { node for node in nodes if isinstance(node, NodeOpcUa) and node.url_parsed.hostname == self._url.hostname } # Make sure that some nodes remain after the checks and raise an error if there are none. if not _nodes or len(_nodes) == 0: raise ValueError( f"Some nodes to read from/write to must be specified. If nodes were specified, they do not " f"match the connection {self.url}" ) return _nodes def __enter__(self) -> OpcUaServer: return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None ) -> None: self.stop()