Import from xml and populate nodes avoiding duplicates
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.12,<3.13"
# dependencies = [
#     "asyncua==1.1.5",
# ]
# ///
"""Testing import/export from and to an xml file."""
import asyncio
import logging
import tempfile
from collections.abc import AsyncGenerator, Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from asyncua import Server, ua
from asyncua.common.node import Node

log = logging.getLogger("syngwopcua")

type Idx = int
type TypeName = str
type SignalName = str
type NodesTypes = dict[TypeName, Node]
type RulesTypes = dict[str, Any]
type NodesSignals = dict[SignalName, Node]
type RulesSignals = dict[SignalName, TypeName]

@dataclass
class TNode:
    """Class to keep track of the OPCUA tree."""

    id: Node
    cls: ua.NodeClass
    cls_type: str
    bname: ua.QualifiedName
    val: Any
    nd: Node

    def __str__(self) -> str:
        """Return class string."""
        return f"{self.id} | {self.cls_type} | {self.bname}"
async def filter_nodes(
    server: Server,
    node: Node | None = None,
    ns: Iterable[int] | None = None,
    id_min: int | None = None,
    id_max: int | None = None,
    subtype: str = "",
) -> AsyncGenerator[TNode]:
    """Get node tree."""
    node = node or server.get_root_node()
    for ch_id in await node.get_children():
        ch_nodes = None
        ch = server.get_node(ch_id)
        ch_cl = await ch.read_node_class()
        ch_bname = await ch.read_browse_name()
        match ch_cl:
            case ua.NodeClass.Object:
                ch_type = subtype + "object"
                ch_nodes = filter_nodes(
                    server, node=ch, ns=ns, id_min=id_min, id_max=id_max
                )
                # ch_bname = await ch.read_browse_name()
                ch_val = None
            case ua.NodeClass.ObjectType:
                ch_type = subtype + "object_type"
                ch_nodes = filter_nodes(
                    server,
                    node=ch,
                    ns=ns,
                    id_min=id_min,
                    id_max=id_max,
                    subtype="type+",
                )
                # ch_bname = await ch.get_browse_name()
                ch_val = None
            case ua.NodeClass.Variable:
                ch_type = subtype + "variable"
                # ch_bname = await ch.get_browse_name()
                ch_val = await ch.get_value()
                ch_nodes = None
            case ua.NodeClass.VariableType:
                ch_type = subtype + "variable_type"
                # ch_bname = await ch.get_browse_name()
                ch_val = await ch.get_value()
                ch_nodes = None
            case ua.NodeClass.DataType:
                ch_type = subtype + "data_type"
                # ch_bname = await ch.get_browse_name()
                ch_val = None
                ch_nodes = filter_nodes(
                    server, node=ch, ns=ns, id_min=id_min, id_max=id_max
                )
            case _:
                # log.debug(f"NodeClass of type: {ch_cl}:{ch_cl.name} not supported!")
                continue
        # check filtering conditions
        if (
            (id_min is not None and id_min <= ch_id.nodeid.Identifier)
            and (id_max is not None and ch_id.nodeid.Identifier <= id_max)
            or (ns is not None and ch_id.nodeid.NamespaceIndex in ns)
        ):
            yield TNode(
                id=ch_id,
                cls=ch_cl,
                cls_type=ch_type,
                bname=ch_bname,
                val=ch_val,
                nd=ch,
            )
        if ch_nodes is not None:
            async for tnode in ch_nodes:
                yield tnode

async def print_filter(nodes: Iterable[TNode]) -> None:
    """Print filtered nodes."""
    print()
    for _, tnode in sorted([(tnode.id.nodeid.Identifier, tnode) for tnode in nodes]):
        print(str(tnode))
    print()

async def get_server(
    *,
    endpoint: str = "opc.tcp://0.0.0.0:4840/freeopcua/server/",
    uri: str = "http://examples.freeopcua.github.io",
) -> tuple[Server, Idx]:
    """Return a tuple with the OPCUA server and the registered namespace index."""
    log.info(f"Initializing an OPCUA Server on: {endpoint} | {uri}")
    server = Server()
    await server.init()
    log.info(f"Set endpoint {endpoint}")
    server.set_endpoint(endpoint)
    log.info(f"Registering namespace: {uri}")
    idx = await server.register_namespace(uri)
    log.info(f"Idx: {idx}")
    return server, idx

async def find_node_by_browsname(server: Server, idx: Idx, bname: str) -> Node | None:
    """Find node by browse name."""
    # check if the node already exists
    path = [
        ua.QualifiedName(Name=bname, NamespaceIndex=idx),
    ]
    try:
        node = await server.get_node(server.get_root_node()).get_child(path)
    except Exception as exc:
        log.debug(f"{bname} not found, Exception raised: {exc}")
    else:
        return node
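
# NOTE: gen_single_type() appears to be an earlier, self-contained variant of
# get_or_create_type() further below; it is not referenced by main_server() or
# anywhere else in this script.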
async def gen_single_type(
    *,
    server: Server,
    idx: Idx,
    type_name: str,
    type_kws: dict[str, Any],
    parent_node: Node | None = None,
    parent_name: str | None = None,
) -> AsyncGenerator[tuple[TypeName, Node]]:
    """Generate a single ua.Node."""
    nd_type = type_kws["type"]
    nd_nodes = type_kws.get("nodes", {})
    parent_node = parent_node or server.nodes.objects
    bname = type_name if parent_name is None else f"{parent_name}/{type_name}"
    # check if the node already exists
    if node := await find_node_by_browsname(server, idx, bname):
        log.debug(f"{bname} found! Skip node creation: {type(node)}")
        yield bname, node
    else:
        log.debug(f"{bname} not found, creating a new shiny node!")
        match nd_type:
            case "object_type":
                log.info(f"Creating 'object_type' {idx=} | {bname=}")
                new_node = await server.nodes.base_object_type.add_object_type(
                    idx, bname
                )
                yield bname, new_node
            case "variable" | "property":
                params = type_kws.get("params", {})
                val = params.get("val", 0.0)
                varianttype = getattr(ua, params.get("varianttype", "Double"))
                dtype = params.get("datatype", None)
                datatype = getattr(ua, dtype) if dtype else None
                log.info(
                    f"Creating '{nd_type}' {idx=} | {type_name=} | "
                    f"{val=} | {varianttype=} | {datatype=}"
                )
                new_node = await parent_node.add_variable(
                    nodeid=idx,
                    bname=bname,
                    val=val,
                    varianttype=varianttype,
                    datatype=datatype,
                )
                yield bname, new_node
            case "object":
                raise NotImplementedError
            case _:
                raise ValueError(f"{nd_type=}: {type_name=} | {type_kws=}")
        # recursively check the other sub-nodes
        for nd_name, nd_kws in nd_nodes.items():
            async for tpl in gen_single_type(
                server=server,
                idx=idx,
                type_name=nd_name,
                type_kws=nd_kws,
                parent_node=new_node,
                parent_name=bname,
            ):
                yield tpl
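
# get_or_create_type() is where duplicate type creation is avoided: a type is
# first looked up in the in-memory `types` dict, then on the server by browse
# name, and only created when both lookups fail.  The recursion over the
# "nodes" sub-rules runs in every case, so sub-types added to the rules on a
# later run are still created under an already existing parent type.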
async def get_or_create_type(
    *,
    server: Server,
    idx: Idx,
    type_name: str,
    type_kws: dict[str, Any],
    parent_node: Node | None = None,
    parent_name: str | None = None,
    types: NodesTypes | None = None,
) -> AsyncGenerator[tuple[TypeName, Node]]:
    """Get an existing type node or create a new one."""
    types = types or {}
    if type_name in types:
        type_node = types[type_name]
        yield type_name, type_node
    # check if type already exists
    elif type_node := await find_node_by_browsname(server, idx, type_name):
        log.debug(f"Type: {type_name} found! Skip type creation")
        yield type_name, type_node
    else:
        nd_type = type_kws["type"]
        parent_node = parent_node or server.nodes.objects
        log.debug(f"{type_name} not found, creating a new shiny node!")
        match nd_type:
            case "object_type":
                log.info(f"Creating 'object_type' {idx=} | {type_name=}")
                type_node = await server.nodes.base_object_type.add_object_type(
                    idx, type_name
                )
                yield type_name, type_node
            case "variable" | "property":
                params = type_kws.get("params", {})
                val = params.get("val", 0.0)
                varianttype = getattr(ua, params.get("varianttype", "Double"))
                dtype = params.get("datatype", None)
                datatype = getattr(ua, dtype) if dtype else None
                log.info(
                    f"Creating '{nd_type}' {idx=} | {type_name=} | "
                    f"{val=} | {varianttype=} | {datatype=}"
                )
                type_node = await parent_node.add_variable(
                    nodeid=idx,
                    bname=type_name,
                    val=val,
                    varianttype=varianttype,
                    datatype=datatype,
                )
                yield type_name, type_node
            case "object":
                raise NotImplementedError
            case _:
                raise ValueError(f"{nd_type=}: {type_name=} | {type_kws=}")
    if nd_nodes := type_kws.get("nodes", {}):
        # recursively check the other sub-nodes
        for nd_name, nd_kws in nd_nodes.items():
            # print(nd_name, nd_kws)
            async for tpl in get_or_create_type(
                server=server,
                idx=idx,
                type_name=nd_name,
                type_kws=nd_kws,
                parent_node=type_node,
                parent_name=type_name,
                types=types,
            ):
                yield tpl

async def gen_types(
    *,
    server: Server,
    idx: Idx,
    rules_types: dict[str, Any],
    types: NodesTypes | None,
) -> NodesTypes:
    """Generate types from dictionary rules."""
    types = types or {}
    for type_name, type_kws in rules_types.items():
        async for typ_name, typ_node in get_or_create_type(
            server=server,
            idx=idx,
            type_name=type_name,
            type_kws=type_kws,
            parent_node=None,
            parent_name=None,
            types=types,
        ):
            types[typ_name] = typ_node
    return types
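
# create_node() instantiates a concrete node from its type node: Object and
# ObjectType types become objects (add_object with objecttype), Variable and
# VariableType types become writable Double variables whose datatype points at
# the type node's NodeId.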
async def create_node(
    server: Server,
    idx: Idx,
    bname: str,
    nd_type: Node,
    parent_node: Node,
) -> Node:
    """Generate single node."""
    # create the node according to the node class of its type node
    nd_type_cl = await nd_type.read_node_class()
    match nd_type_cl:
        case ua.NodeClass.ObjectType | ua.NodeClass.Object:
            return await parent_node.add_object(
                nodeid=idx,
                bname=bname,
                objecttype=nd_type,
            )
        case ua.NodeClass.VariableType | ua.NodeClass.Variable:
            node = await parent_node.add_variable(
                nodeid=idx,
                bname=bname,
                val=0.0,
                varianttype=ua.VariantType.Double,
                datatype=nd_type.nodeid,
            )
            await node.set_writable(True)
            return node
        case _:
            raise ValueError(
                f"{bname} | {nd_type_cl} => Node type not supported!",
            )
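
# gen_nodes() places every signal under a single folder and processes each
# slash-separated segment of the signal path together with the matching segment
# of its type path: a segment is reused when already present in the local dict
# or on the server, otherwise it is created via create_node() with the type
# resolved from `types`.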
async def gen_nodes(
    folder: str,
    *,
    server: Server,
    idx: Idx,
    rules_signals: RulesSignals,
    types: NodesTypes,
    signals: NodesSignals | None = None,
) -> NodesSignals:
    """Generate signal nodes."""
    nodes = signals or {}
    if folder not in nodes:
        # all signals will be defined inside this folder
        if node_folder := await find_node_by_browsname(server, idx, folder):
            log.debug(f"Folder: {folder} found! Skip node creation")
            nodes[folder] = node_folder
        else:
            nodes[folder] = await server.nodes.objects.add_folder(idx, folder)
    for sname, stype in rules_signals.items():
        s_items = sname.split("/")
        t_items = stype.split("/")
        for j, (s_item, t_item) in enumerate(zip(s_items, t_items, strict=True)):
            bname = "/".join(s_items[: j + 1])
            if bname in nodes:
                # path already created: skip it to avoid duplicating nodes
                continue
            if node := await find_node_by_browsname(server, idx, bname):
                log.debug(f"Node: {bname} found! Skip node creation")
                nodes[bname] = node
                continue
            # find parent node to be used
            parent_node = (
                nodes[folder]
                if j == 0
                else nodes["/".join(s_items[: s_items.index(s_item)])]
            )
            # define node type to be used
            t_path = "/".join(t_items[: j + 1])
            try:
                t_type = types[t_path]
            except KeyError as err:
                raise KeyError(
                    f"{j}> {t_path!r} | {bname} @ {t_item} => type not found!",
                ) from err
            log.debug(f"Node: {bname} not found, creating a new shiny node!")
            nodes[bname] = await create_node(
                server=server,
                idx=idx,
                bname=bname,
                nd_type=t_type,
                parent_node=parent_node,
            )
    return nodes
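
# main_server() ties everything together: create a server, import nodes from a
# previous session when the xml file exists, seed the types/signals dicts from
# the imported nodes (entries whose cls_type contains "type" are treated as
# types), create only what is still missing, and finally export the managed
# nodes back to the same xml file.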
async def main_server(
    folder: str,
    rules_types: RulesTypes,
    rules_signals: RulesSignals,
    *,
    endpoint: str = "opc.tcp://0.0.0.0:4840/freeopcua/server/",
    uri: str = "http://examples.freeopcua.github.io",
    xml: Path | None = None,
) -> tuple[NodesTypes, NodesSignals]:
    """Generate server nodes."""
    server, idx = await get_server(endpoint=endpoint, uri=uri)
    nodes = None
    if xml is not None and xml.exists():
        log.info(f"Importing nodes from {xml}")
        # import nodes from previous sessions
        await server.import_xml(xml.as_posix())
        nodes = [nd async for nd in filter_nodes(server, ns=(idx,))]
        await print_filter(nodes)
    if nodes is not None:
        types: NodesTypes = {
            nd.bname.Name: nd.nd for nd in nodes if "type" in nd.cls_type
        }
        signals: NodesSignals = {
            nd.bname.Name: nd.nd for nd in nodes if "type" not in nd.cls_type
        }
    else:
        types, signals = None, None
    # breakpoint()
    # generate types
    types = await gen_types(
        server=server, idx=idx, rules_types=rules_types, types=types
    )
    # generate signals
    signals = await gen_nodes(
        folder,
        server=server,
        idx=idx,
        rules_signals=rules_signals,
        types=types,
        signals=signals,
    )
    if xml is not None:
        log.info(f"Exporting nodes to: {xml}")
        nodes = [nd async for nd in filter_nodes(server, ns=(idx,))]
        await print_filter(nodes)
        await server.export_xml(
            [
                # server.nodes.objects,
                # server.nodes.root,
                *types.values(),
                *signals.values(),
            ],
            xml.as_posix(),
        )
    # server.start()
    return types, signals
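
# The demo below runs three sessions against the same xml file: a fresh server
# that exports its nodes, a second server that re-imports them with identical
# rules (nothing new should be created), and a third one that adds one extra
# type and signal, which should be the only additions.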
if __name__ == "__main__":
    import copy
    from pprint import pformat as pf

    # log level
    logging.basicConfig(level=logging.WARNING)

    xml = Path(tempfile.gettempdir()) / "opcua_nodes.xml"

    # define the types as nested data structures
    double = {
        "type": "variable",
        "params": {
            "val": 0.0,
            "varianttype": "Double",
        },
    }
    rtypes0 = {
        "ElectricPanel": {
            "type": "object_type",
            "nodes": {
                "ElectricPanel/current_l1": double,
                "ElectricPanel/current_l2": double,
                "ElectricPanel/current_l3": double,
            },
        },
    }
    rsignals0 = {
        "ElPnl0/cur_l1": "ElectricPanel/current_l1",
        "ElPnl0/cur_l2": "ElectricPanel/current_l2",
        "ElPnl0/cur_l3": "ElectricPanel/current_l3",
    }
    rtypes1 = copy.deepcopy(rtypes0)
    rtypes1["ElectricPanel"]["nodes"]["ElectricPanel/active_power"] = double
    rsignals1 = copy.deepcopy(rsignals0)
    rsignals1["ElPnl0/act_pw"] = "ElectricPanel/active_power"

    # Start testing
    if xml.exists():
        print(f"Removing previously generated xml file: {xml}")
        xml.unlink()

    print("Start an empty server")
    print(f"Adding the following types:\n{pf(rtypes0, width=40)}")
    print(f"Adding the following signals:\n{pf(rsignals0, width=40)}")
    types, nodes = asyncio.run(
        main_server(
            folder="factory", rules_types=rtypes0, rules_signals=rsignals0, xml=xml
        )
    )

    print("\n\nStart a server loading from the xml generated in the previous step.")
    types, nodes = asyncio.run(
        main_server(
            folder="factory", rules_types=rtypes0, rules_signals=rsignals0, xml=xml
        )
    )

    print("\n\nStart a server loading from xml and adding new types and signals.")
    print("Add type: ElectricPanel/active_power")
    print("Add signal: ElPnl0/act_pw")
    types, nodes = asyncio.run(
        main_server(
            folder="factory", rules_types=rtypes1, rules_signals=rsignals1, xml=xml
        )
    )

    log.info("Done!")
Code used for asyncua discussion on: Import from xml and populate nodes avoiding duplicates