Source code for eta_utility.connectors.opc_ua

"""The OPC UA module provides utilities for the flexible creation of OPC UA connections."""

from __future__ import annotations

import asyncio
import concurrent.futures
import socket
from concurrent.futures import (
    CancelledError as ConCancelledError,
    TimeoutError as ConTimeoutError,
)
from contextlib import contextmanager
from datetime import datetime, timedelta
from logging import getLogger
from typing import TYPE_CHECKING

import asyncua.sync
import pandas as pd

# TODO: add async import: from asyncua import Client as asyncClient
# https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/public/eta-utility/-/issues/270
from asyncua import ua

# TODO: add async import: from asyncua.common.subscription import Subscription as asyncSubscription
# https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/public/eta-utility/-/issues/270
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256

# Synchronous imports
from asyncua.sync import Client, Subscription
from asyncua.ua import SecurityPolicy, uaerrors

from eta_utility import KeyCertPair, Suppressor
from eta_utility.connectors.node import NodeOpcUa

from .util import IntervalChecker, RetryWaiter

if TYPE_CHECKING:
    from collections.abc import Generator, Mapping, Sequence
    from typing import Any

    # Sync import
    from asyncua.sync import SyncNode as SyncOpcNode

    # Async import
    # TODO: add async import: from asyncua import Node as asyncSyncOpcNode
    # https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/public/eta-utility/-/issues/270
    from eta_utility.type_hints import Nodes, TimeStep

from .base_classes import Connection, SubscriptionHandler

log = getLogger(__name__)


class OpcUaConnection(Connection[NodeOpcUa], protocol="opcua"):
    """The OPC UA Connection class allows reading and writing from and to OPC UA servers. Additionally,
    it implements a subscription method, which reads continuously in a specified interval.

    :param url: URL of the OPC UA Server.
    :param usr: Username in OPC UA for login.
    :param pwd: Password in OPC UA for login.
    :param nodes: List of nodes to use for all operations.
    :param key_cert: Key and certificate pair. When given, a Basic256Sha256 secured connection is tried before
        falling back to an unsecured connection.
    :param kwargs: Accepted for interface compatibility; not used by this class.
    """

    def __init__(
        self,
        url: str,
        usr: str | None = None,
        pwd: str | None = None,
        *,
        nodes: Nodes[NodeOpcUa] | None = None,
        key_cert: KeyCertPair | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(url, usr, pwd, nodes=nodes)

        if self._url.scheme != "opc.tcp":
            raise ValueError("Given URL is not a valid OPC url (scheme: opc.tcp).")

        # The client object is created lazily by _connect (declared here for type checkers only).
        self.connection: Client
        self._connected = False
        # Waiter used between (re-)connection attempts.
        self._retry = RetryWaiter()
        # Separate waiter used when the subscription stops delivering changed values.
        self._retry_interval_checker = RetryWaiter()
        # Seconds between two connection checks inside the subscription loop.
        self._conn_check_interval = 1

        # Subscription state. _sub and _sub_task only exist after a subscription has been started.
        self._sub: Subscription
        self._subbed_nodes: list[int] = []
        self._sub_task: asyncio.Task
        self._subscription_open: bool = False
        self._subscription_nodes: set[NodeOpcUa] = set()
        self.connection_interval_checker = IntervalChecker()

        self._key_cert: KeyCertPair | None = key_cert
        # Set to False once a secure connection attempt was rejected, so later attempts go insecure directly.
        self._try_secure_connect = True

    @classmethod
    def _from_node(
        cls, node: NodeOpcUa, usr: str | None = None, pwd: str | None = None, **kwargs: Any
    ) -> OpcUaConnection:
        """Initialize the connection object from an OpcUa protocol Node object.

        :param node: Node to initialize from.
        :param usr: Username to use.
        :param pwd: Password to use.
        :param kwargs: Only ``key_cert`` is read and forwarded; all other keyword arguments are ignored.
        :return: OpcUaConnection object.
        """
        key_cert = kwargs.get("key_cert")
        return super()._from_node(node, usr=usr, pwd=pwd, key_cert=key_cert)

    @classmethod
    def from_ids(
        cls,
        ids: Sequence[str],
        url: str,
        usr: str | None = None,
        pwd: str | None = None,
    ) -> OpcUaConnection:
        """Initialize the connection object from an OPC UA protocol through the node IDs.

        Each id is used both as the name and as the ``opc_id`` of the created node.

        :param ids: Identification of the Node.
        :param url: URL for connection.
        :param usr: Username in OPC UA for login.
        :param pwd: Password in OPC UA for login.
        :return: OpcUaConnection object.
        """
        nodes = [NodeOpcUa(name=opc_id, usr=usr, pwd=pwd, url=url, protocol="opcua", opc_id=opc_id) for opc_id in ids]
        return cls(nodes[0].url, usr, pwd, nodes=nodes)

    def read(self, nodes: Nodes[NodeOpcUa] | None = None) -> pd.DataFrame:
        """Read some manually selected values from OPC UA capable controller.

        :param nodes: List of nodes to read from.
        :return: pandas.DataFrame containing current values of the OPC UA-variables.
        :raises ConnectionError: When an error occurs during reading.
        """
        _nodes = self._validate_nodes(nodes)

        def read_node(node: NodeOpcUa) -> dict[str, list]:
            """Read a single node and return it as a one-column mapping {name: [value]}."""
            try:
                opcua_variable = self.connection.get_node(node.opc_id)
                value = opcua_variable.get_value()
                if node.dtype is not None:
                    try:
                        value = node.dtype(value)
                    except (ValueError, TypeError) as e:
                        # TypeError (e.g. int(None)) is a failed typecast just like ValueError.
                        raise ConnectionError(
                            f"Failed to typecast value '{value}' at {node.name} to {node.dtype.__name__}."
                        ) from e
                return {node.name: [value]}
            except uaerrors.BadNodeIdUnknown:
                raise ConnectionError(
                    f"The node id ({node.opc_id}) refers to a node that does not exist in the server address space "
                    f"{self.url}. (BadNodeIdUnknown)"
                ) from None
            except RuntimeError as e:
                raise ConnectionError(str(e)) from e

        values: dict[str, list] = {}
        with self._connection(), concurrent.futures.ThreadPoolExecutor() as executor:
            results = executor.map(read_node, _nodes)
        # Leaving the executor context waits for all reads; errors surface while iterating the results.
        for result in results:
            values.update(result)

        return pd.DataFrame(values, index=[self._assert_tz_awareness(datetime.now())])

    def write(self, values: Mapping[NodeOpcUa, Any]) -> None:
        """Writes some manually selected values on OPC UA capable controller.

        :param values: Dictionary of nodes and data to write {node: value}.
        :raises ConnectionError: When an error occurs during writing.
        """
        nodes = self._validate_nodes(set(values.keys()))

        with self._connection():
            for node in nodes:
                try:
                    opcua_variable = self.connection.get_node(node.opc_id)
                    opcua_variable_type = opcua_variable.get_data_type_as_variant_type()
                    # Cast to the node's dtype (if any) before wrapping the value in the server's variant type.
                    value = node.dtype(values[node]) if node.dtype is not None else values[node]
                    opcua_variable.set_value(ua.DataValue(ua.Variant(value, opcua_variable_type)))
                except uaerrors.BadNodeIdUnknown as e:
                    raise ConnectionError(
                        f"The node id ({node.opc_id}) refers to a node that does not exist in the server address space "
                        f"{self.url}. (BadNodeIdUnknown)"
                    ) from e
                except RuntimeError as e:
                    raise ConnectionError(str(e)) from e

    def create_nodes(self, nodes: Nodes[NodeOpcUa]) -> None:
        """Create nodes on the server from a list of nodes. This will try to create the entire node path.

        :param nodes: List or set of nodes to create.
        :raises ConnectionError: When an error occurs during node creation.
        """

        def create_object(parent: SyncOpcNode, child: NodeOpcUa) -> SyncOpcNode:
            """Return the child object below parent, creating it if no existing child matches its path."""
            children: list[SyncOpcNode] = asyncua.sync._to_sync(parent.tloop, parent.get_children())
            for obj in children:
                ident = obj.nodeid.Identifier
                ident = ident.strip() if isinstance(ident, str) else ident
                if child.opc_path_str == ident:
                    return obj
            return asyncua.sync._to_sync(parent.tloop, parent.add_object(child.opc_id, child.opc_name))

        _nodes = self._validate_nodes(nodes)

        with self._connection():
            for node in _nodes:
                try:
                    # Start at the server's objects node and walk/create every element of the node path.
                    last_obj = asyncua.sync._to_sync(
                        self.connection.tloop, self.connection.aio_obj.get_objects_node()
                    )
                    for path_node in node.opc_path:
                        last_obj = create_object(last_obj, path_node)

                    # Pick an initial value matching the node's dtype; anything else starts as float 0.0.
                    dtype = getattr(node, "dtype", None)
                    init_val: Any
                    if dtype is int:
                        init_val = 0
                    elif dtype is bool:
                        init_val = False
                    elif dtype is str:
                        init_val = ""
                    else:
                        init_val = 0.0

                    last_obj.add_variable(node.opc_id, node.opc_name, init_val)
                    log.debug(f"OPC UA Node created: {node.opc_id}")
                except uaerrors.BadNodeIdExists:
                    log.warning(f"Node with NodeId : {node.opc_id} could not be created. It already exists.")
                except RuntimeError as e:
                    raise ConnectionError(str(e)) from e

    def delete_nodes(self, nodes: Nodes[NodeOpcUa]) -> None:
        """Delete the given nodes and their parents (if the parents do not have other children).

        :param nodes: List or set of nodes to be deleted.
        :raises ConnectionError: If deletion of nodes fails.
        """

        def delete_node_parents(node: SyncOpcNode, depth: int = 20) -> None:
            """Delete node if it has no children, then visit its parents (at most depth levels up)."""
            # Collect the inverse references before the node (and its references) may be deleted.
            parents = node.get_references(direction=ua.BrowseDirection.Inverse)
            if not node.get_children():
                node.delete(delete_references=True)
                log.info(f"Deleted Node {node.nodeid} from server {self.url}.")
            else:
                log.info(f"Node {node.nodeid} on server {self.url} has remaining children and was not deleted.")

            if depth > 0:
                for parent in parents:
                    delete_node_parents(self.connection.get_node(parent.NodeId), depth=depth - 1)

        _nodes = self._validate_nodes(nodes)

        with self._connection():
            for node in _nodes:
                try:
                    delete_node_parents(self.connection.get_node(node.opc_id))
                except uaerrors.BadNodeIdUnknown as e:
                    raise ConnectionError(
                        f"The node id ({node.opc_id}) refers to a node that does not exist in the server address space "
                        f"{self.url}. (BadNodeIdUnknown)"
                    ) from e
                except RuntimeError as e:
                    raise ConnectionError(str(e)) from e

    def subscribe(
        self, handler: SubscriptionHandler, nodes: Nodes[NodeOpcUa] | None = None, interval: TimeStep = 1
    ) -> None:
        """Subscribe to nodes and call handler when new data is available. Basic architecture of the subscription is
        the client- server communication via subscription notify. This function works asynchronously. Subscriptions
        must always be closed using the close_sub function (use try, finally!).

        :param handler: SubscriptionHandler object with a push method that accepts node, value pairs.
        :param nodes: Identifiers for the nodes to subscribe to.
        :param interval: Interval for receiving new data. It is interpreted as seconds when given as an integer.
        """
        _nodes = self._validate_nodes(nodes)
        interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval)

        self._subscription_nodes.update(_nodes)

        if self._subscription_open:
            # Adding nodes to subscription is enough to include them in the query. Do not start an additional loop
            # if one already exists
            return

        self._subscription_open = True

        loop = asyncio.get_event_loop()
        self._sub_task = loop.create_task(
            self._subscription_loop(
                _OPCSubHandler(handler=handler, interval_check_handler=self.connection_interval_checker),
                float(interval.total_seconds()),
            )
        )

    async def _subscription_loop(self, handler: _OPCSubHandler, interval: float) -> None:
        """The subscription loop makes sure that the subscription is reset in case the server generates an error.

        :param handler: Handler object with a push function to receive data.
        :param interval: Interval for requesting data in seconds.
        """
        subscribed = False
        while self._subscription_open:
            try:
                if not self._connected:
                    await self._retry.wait_async()
                    try:
                        self._connect()
                    except ConnectionError as e:
                        log.warning(f"Retrying connection to {self.url} after: {e}.")
                        continue

                elif self._connected and not subscribed:
                    try:
                        # asyncua is given the publishing interval as interval * 1000.
                        self._sub = self.connection.create_subscription(interval * 1000, handler)
                        subscribed = True
                    except RuntimeError as e:
                        subscribed = False
                        log.warning(f"Unable to subscribe to server {self.url} - Retrying: {e}.")
                        self._disconnect()
                        continue

                    for node in self._subscription_nodes:
                        try:
                            handler.add_node(node.opc_id, node)  # type: ignore
                            self._subbed_nodes.append(
                                self._sub.subscribe_data_change(self.connection.get_node(node.opc_id))
                            )
                        except RuntimeError as e:
                            log.warning(f"Could not subscribe to node '{node.name}' on server {self.url}, error: {e}")

            except BaseException as e:
                # Deliberately broad: any failure resets the connection state and the loop retries.
                # NOTE(review): this also catches a task cancellation raised at the await above; the loop then
                # ends through its condition because close_sub clears _subscription_open before cancelling.
                # Reset the message for every exception so that neither an unbound name nor a stale message
                # from an earlier iteration is used below.
                msg = ""
                if isinstance(e, (ConnectionAbortedError, ConnectionResetError)):
                    msg = f"Subscription to the OPC UA server {self.url} is unexpectedly terminated."
                if isinstance(e, TimeoutError):
                    msg = f"OPC UA client for server {self.url} doesn't receive a response from the server."
                if isinstance(e, ConCancelledError):
                    msg = (
                        f"Connection to OPC UA-Server {self.url} was terminated "
                        "during connection establishment or maintenance."
                    )

                log.error(f"Handling exception ({e}) for server {self.url}.")
                if msg:
                    msg += " Trying to reconnect."
                    log.info(msg)

                subscribed = False
                self._connected = False

            # Exit point in case the connection operates normally.
            if not self._check_connection():
                # Push Nan for every node
                for node in self._subscription_nodes:
                    handler.handler.push(node=node, value=float("nan"), timestamp=datetime.now())
                subscribed = False
                self._connected = False
                self._disconnect()
            elif self._connected and subscribed:
                _changed_within_interval = self.connection_interval_checker.check_interval_connection()
                if not _changed_within_interval:
                    subscribed = False
                    self._connected = False
                    log.warning(
                        f"The subscription connection for {self.url} doesn't change the values "
                        "anymore. Trying to reconnect."
                    )
                    self._disconnect()
                    self._retry_interval_checker.tried()
                    await self._retry_interval_checker.wait_async()
                else:
                    self._retry_interval_checker.success()

            await asyncio.sleep(self._conn_check_interval)

    def close_sub(self) -> None:
        """Close an open subscription."""
        # Clear the flag first so the subscription loop terminates on its next condition check.
        self._subscription_open = False
        try:
            self._sub.unsubscribe(self._subbed_nodes)
        except Exception as e:
            # Best effort: the subscription may never have existed or the server may already be gone.
            log.debug(f"Unsubscribing from nodes on server {self.url} failed: {e}.")
        finally:
            self._subbed_nodes = []

        try:
            self._sub_task.cancel()
            self._sub.delete()
        except (TimeoutError, ConTimeoutError):
            # Must precede the OSError handler: TimeoutError is an OSError subclass.
            log.debug(f"Timeout occurred while trying to close the subscription to server {self.url}.")
        except (OSError, RuntimeError) as e:
            log.debug(f"Deleting subscription for server {self.url} failed.")
            log.debug(f"Server {self.url} returned error: {e}.")
        except AttributeError:
            # Occurs if the subscription did not exist and can be ignored.
            pass
        except asyncua.sync.ThreadLoopNotRunning:
            # Occurs if the subscription (and therefore the thread loop) was already closed and can be ignored.
            pass

        self._disconnect()

    def _connect(self) -> None:
        """Connect to server. This will try to securely connect using Basic256SHA256 method before trying an
        insecure connection.

        :raises ConnectionError: When the host is not found, times out, cancels or rejects the connection.
        """
        if not hasattr(self, "connection"):
            # Do not reinitialize connection if it already exists
            self.connection = Client(self.url)

        self._connected = False

        if self.usr is not None:
            self.connection.set_user(self.usr)
        if self.pwd is not None:
            self.connection.set_password(self.pwd)

        self._retry.tried()

        def _connect_insecure() -> None:
            """Reset the client to the default (empty) security policy and connect."""
            self.connection.aio_obj.security_policy = SecurityPolicy()
            self.connection.aio_obj.uaclient.set_security(self.connection.aio_obj.security_policy)
            self.connection.connect()

        def _connect_secure() -> None:
            """Connect with Basic256Sha256; fall back to an insecure connection if the server rejects it."""
            assert self._key_cert is not None
            try:
                self.connection.set_security(
                    SecurityPolicyBasic256Sha256, self._key_cert.cert_path, self._key_cert.key_path
                )
                with Suppressor():
                    self.connection.connect()
            except ua.uaerrors.BadSecurityPolicyRejected:
                self._try_secure_connect = False
                _connect_insecure()
            except ua.UaError as e:
                if "No matching endpoints" in str(e):
                    self._try_secure_connect = False
                    _connect_insecure()
                else:
                    raise
            except (TimeoutError, ConTimeoutError, asyncio.exceptions.TimeoutError) as e:
                self._try_secure_connect = False
                raise ConnectionError("Host timeout during secure connect") from e

        try:
            if self._key_cert is not None and self._try_secure_connect:
                _connect_secure()
            else:
                _connect_insecure()
        except (socket.herror, socket.gaierror) as e:
            raise ConnectionError(f"Host not found: {self.url}") from e
        except (socket.timeout, TimeoutError, ConTimeoutError, asyncio.exceptions.TimeoutError) as e:
            raise ConnectionError(f"Host timeout: {self.url}") from e
        except ConCancelledError as e:
            raise ConnectionError(f"Connection cancelled by host: {self.url}") from e
        except (RuntimeError, ConnectionError) as e:
            raise ConnectionError(f"OPC Connection Error: {self.url}: {e!s}") from e
        else:
            log.debug(f"Connected to OPC UA server: {self.url}")
            self._connected = True
            self._retry.success()

    def _check_connection(self) -> bool:
        """Check the connection by reading the server state node; disconnect when the check fails.

        :return: True if the connection is still usable, False otherwise.
        """
        if self._connected:
            try:
                self.connection.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State)).get_value()
            except AttributeError:
                self._connected = False
                log.debug(f"Connection to server {self.url} did not exist - connection check failed.")
            except BaseException as e:
                # Any failure while reading the state counts as a lost connection.
                self._connected = False
                log.error(f"Error while checking connection to server {self.url}: {e}.")
            else:
                self._connected = True

        if not self._connected:
            self._disconnect()

        return self._connected

    def _disconnect(self) -> None:
        """Disconnect from server."""
        self._connected = False
        try:
            self.connection.disconnect()
        except (ConCancelledError, ConnectionAbortedError):
            log.debug(f"Connection to {self.url} already closed by server.")
        except (OSError, RuntimeError) as e:
            log.debug(f"Closing connection to server {self.url} failed")
            log.debug(f"Connection to {self.url} returned an error while closing the connection: {e}")
        except AttributeError:
            # Raised when no client object was ever created.
            log.debug(f"Connection to server {self.url} already closed.")

    @contextmanager
    def _connection(self) -> Generator:
        """Connect to the server and return a context manager that automatically disconnects when finished."""
        try:
            self._connect()
            yield None
        finally:
            self._disconnect()

    def _validate_nodes(self, nodes: Nodes[NodeOpcUa] | None) -> set[NodeOpcUa]:
        """Validate nodes through the base class and keep only the OPC UA nodes.

        :param nodes: Nodes to validate, passed on to the base class validation.
        :return: Set of validated nodes that are NodeOpcUa instances.
        """
        vnodes = super()._validate_nodes(nodes)
        return {node for node in vnodes if isinstance(node, NodeOpcUa)}
class _OPCSubHandler:
    """Adapter that lets an OPC UA subscription feed an *eta_utility* subscription handler.

    :param handler: *eta_utility* style subscription handler.
    :param interval_check_handler: Interval checker which receives every pushed value as well.
    """

    def __init__(self, handler: SubscriptionHandler, interval_check_handler: IntervalChecker) -> None:
        self._node_interval_to_check = interval_check_handler
        # Maps the OPC UA id of each subscribed node to its eta_utility node object.
        self._sub_nodes: dict[str | int, NodeOpcUa] = {}
        self.handler = handler

    def add_node(self, opc_id: str | int, node: NodeOpcUa) -> None:
        """Register a node for the subscription so incoming notifications can be mapped back to it.

        :param opc_id: Identifier under which the node is stored.
        :param node: Node object belonging to the identifier.
        """
        self._sub_nodes.update({opc_id: node})

    def datachange_notification(self, node: NodeOpcUa, val: Any, data: Any) -> None:
        """Receive a data change from the OPC UA subscription and forward it.

        The value is pushed to the eta_utility subscription handler first and to the interval checker
        afterwards, both with the same timestamp.

        :param node: Node Object, which was subscribed to and which has sent an updated value. Its string
            representation is used to look up the registered node.
        :param val: New value of OPC UA node.
        :param data: Raw data of OPC UA (not used).
        """
        stamp = self.handler._assert_tz_awareness(datetime.now())
        registered = self._sub_nodes[str(node)]

        self.handler.push(registered, val, stamp)
        self._node_interval_to_check.push(node=registered, value=val, timestamp=stamp)

    def status_change_notification(self, status: ua.StatusChangeNotification) -> None:
        """Status change notifications are accepted and ignored."""

    def event_notification(self, event: ua.EventNotificationList) -> None:
        """Event notifications are accepted and ignored."""