Skip to content

Instantly share code, notes, and snippets.

@zarch
Last active September 10, 2024 09:43
Show Gist options
  • Select an option

  • Save zarch/b42bc41349e1bc891fd9741c130e6caf to your computer and use it in GitHub Desktop.

Select an option

Save zarch/b42bc41349e1bc891fd9741c130e6caf to your computer and use it in GitHub Desktop.
Import from xml and populate nodes avoiding duplicates
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.12,<3.13"
# dependencies = [
# "asyncua==1.1.5",
# ]
# ///
"""Testing import/export from and to an xml file."""
import asyncio
import logging
import tempfile
from collections.abc import AsyncGenerator, Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from asyncua import Server, ua
from asyncua.common.node import Node
# Module-level logger shared by every helper in this script.
log = logging.getLogger("syngwopcua")
# Namespace index, as returned by ``Server.register_namespace``.
type Idx = int
# Name (possibly a "/"-separated path) identifying a type in the rules.
type TypeName = str
# Name (possibly a "/"-separated path) identifying a signal in the rules.
type SignalName = str
# Type name -> node representing that type on the server.
type NodesTypes = dict[TypeName, Node]
# Declarative description of the types to create; the helpers read the
# "type", "params" and "nodes" keys of each entry.
type RulesTypes = dict[str, Any]
# Signal name -> node found or created for that signal.
type NodesSignals = dict[SignalName, Node]
# Signal path -> path of the type the signal is built from.
type RulesSignals = dict[SignalName, TypeName]
@dataclass
class TNode:
    """Flat record describing one node met while walking the OPC UA tree.

    Attributes:
        id: Child node handle as returned by ``Node.get_children``; the
            rest of the script reads ``id.nodeid.Identifier`` and
            ``id.nodeid.NamespaceIndex`` from it.
        cls: Node class read from the server for this node.
        cls_type: Lower-case label built by ``filter_nodes`` (for example
            ``"object"`` or ``"type+variable"``).
        bname: Browse name of the node; ``bname.Name`` is used as the
            dictionary key when nodes are re-loaded from XML.
        val: Value read for variables and variable types, else ``None``.
        nd: Node handle obtained through ``Server.get_node``.
    """

    id: Node
    cls: ua.NodeClass
    cls_type: str
    bname: ua.QualifiedName
    val: Any
    nd: Node

    def __str__(self) -> str:
        """Return a one-line ``id | cls_type | bname`` summary."""
        return f"{self.id} | {self.cls_type} | {self.bname}"
async def filter_nodes(
server: Server,
node: Node | None = None,
ns: Iterable[int] | None = None,
id_min: int | None = None,
id_max: int | None = None,
subtype: str = "",
) -> AsyncGenerator[TNode]:
"""Get node tree."""
node = node or server.get_root_node()
for ch_id in await node.get_children():
ch_nodes = None
ch = server.get_node(ch_id)
ch_cl = await ch.read_node_class()
ch_bname = await ch.read_browse_name()
match ch_cl:
case ua.NodeClass.Object:
ch_type = subtype + "object"
ch_nodes = filter_nodes(
server, node=ch, ns=ns, id_min=id_min, id_max=id_max
)
# ch_bname = await ch.read_browse_name()
ch_val = None
case ua.NodeClass.ObjectType:
ch_type = subtype + "object_type"
ch_nodes = filter_nodes(
server,
node=ch,
ns=ns,
id_min=id_min,
id_max=id_max,
subtype="type+",
)
# ch_bname = await ch.get_browse_name()
ch_val = None
case ua.NodeClass.Variable:
ch_type = subtype + "variable"
# ch_bname = await ch.get_browse_name()
ch_val = await ch.get_value()
ch_nodes = None
case ua.NodeClass.VariableType:
ch_type = subtype + "variable_type"
# ch_bname = await ch.get_browse_name()
ch_val = await ch.get_value()
ch_nodes = None
case ua.NodeClass.DataType:
ch_type = subtype + "data_type"
# ch_bname = await ch.get_browse_name()
ch_val = None
ch_nodes = filter_nodes(
server, node=ch, ns=ns, id_min=id_min, id_max=id_max
)
case _:
# log.debug(f"NodeClass of type: {ch_cl}:{ch_cl.name} not supported!")
continue
# check filtering conditions
if (
(id_min is not None and id_min <= ch_id.nodeid.Identifier)
and (id_max is not None and ch_id.nodeid.Identifier <= id_max)
or (ns is not None and ch_id.nodeid.NamespaceIndex in ns)
):
yield TNode(
id=ch_id,
cls=ch_cl,
cls_type=ch_type,
bname=ch_bname,
val=ch_val,
nd=ch,
)
if ch_nodes is not None:
async for tnode in ch_nodes:
yield tnode
async def print_filter(nodes: Iterable[TNode]) -> None:
    """Print the given nodes, one per line, ordered by node identifier.

    The listing is surrounded by one empty line before and after. Nodes
    sharing the same identifier keep their input order.

    Args:
        nodes: Records to print; each must expose ``id.nodeid.Identifier``.
    """
    print()
    # Sort on the identifier only. Sorting ``(identifier, tnode)`` tuples
    # would compare the TNode objects themselves whenever two identifiers
    # are equal, and TNode defines no ordering.
    for tnode in sorted(nodes, key=lambda item: item.id.nodeid.Identifier):
        print(str(tnode))
    print()
async def get_server(
    *,
    endpoint: str = "opc.tcp://0.0.0.0:4840/freeopcua/server/",
    uri: str = "http://examples.freeopcua.github.io",
) -> tuple[Server, Idx]:
    """Build an initialised OPC UA server and register its namespace.

    The server is initialised and configured but not started.

    Args:
        endpoint: Endpoint URL set on the server.
        uri: Namespace URI to register.

    Returns:
        A ``(server, idx)`` tuple, where ``idx`` is the value returned by
        ``register_namespace`` for ``uri``.
    """
    log.info(f"Initializing an OPCUA Server on: {endpoint} | {uri}")
    opc_server = Server()
    await opc_server.init()

    log.info(f"Set endpoint {endpoint}")
    opc_server.set_endpoint(endpoint)

    log.info(f"Set register namespace: {uri}")
    idx = await opc_server.register_namespace(uri)
    log.info(f"Idx: {idx}")

    return opc_server, idx
async def find_node_by_browsname(server: Server, idx: Idx, bname: str) -> None | Node:
    """Look a node up by browse name, returning ``None`` when that fails.

    The lookup is a single-element browse path resolved from the root node.

    NOTE(review): with a one-element path this presumably only matches
    direct children of the root node - confirm this is the intent.

    Args:
        server: Server to query.
        idx: Namespace index of the browse name.
        bname: Browse name to search for.

    Returns:
        The node found, or ``None`` if the lookup raised any exception
        (the exception is logged at debug level).
    """
    qualified = ua.QualifiedName(Name=bname, NamespaceIndex=idx)
    try:
        root = server.get_node(server.get_root_node())
        return await root.get_child([qualified])
    except Exception as exc:
        # Best effort: a failed lookup simply means "not found" here.
        log.debug(f"{bname} not found, Exception raised: {exc}")
    return None
async def gen_single_type(
*,
server: Server,
idx: Idx,
type_name: str,
type_kws: dict[str, Any],
parent_node: ua.NodeClass | None = None,
parent_name: str | None = None,
) -> AsyncGenerator[tuple[TypeName, Node]]:
"""Generate a single ua.Node."""
nd_type = type_kws["type"]
nd_nodes = type_kws.get("nodes", {})
parent_node = parent_node or server.nodes.objects
bname = type_name if parent_name is None else f"{parent_name}/{type_name}"
# check if the node already exists
if node := await find_node_by_browsname(server, idx, bname):
log.debug(f"{bname} found! Skip node creation: {type(node)}")
yield bname, node
else:
log.debug(f"{bname} not found, creating a new shiny node!")
match nd_type:
case "object_type":
log.info(f"Creating 'object_type' {idx=} | {bname=}")
new_node = await server.nodes.base_object_type.add_object_type(
idx, bname
)
yield bname, new_node
case "variable" | "property":
params = type_kws.get("params", {})
val = params.get("val", 0.0)
varianttype = getattr(ua, params.get("varianttype", "Double"))
dtype = params.get("datatype", None)
datatype = getattr(ua, dtype) if dtype else None
log.info(
f"Creating '{nd_type}' {idx=} | {type_name=} | "
f"{val=} | {varianttype=} | {datatype=}"
)
new_node = await parent_node.add_variable(
nodeid=idx,
bname=bname,
val=val,
varianttype=varianttype,
datatype=datatype,
)
yield bname, new_node
case "object":
raise NotImplementedError
case _:
raise ValueError(f"{nd_type=}: {type_name=} | {type_kws=}")
# resursivelly check the other sub-nodes
for nd_name, nd_kws in nd_nodes.items():
async for tpl in gen_single_type(
server=server,
idx=idx,
type_name=nd_name,
type_kws=nd_kws,
parent_node=new_node,
parent_name=bname,
):
yield tpl
async def get_or_create_type(
*,
server: Server,
idx: Idx,
type_name: str,
type_kws: dict[str, Any],
parent_node: ua.NodeClass | None = None,
parent_name: str | None = None,
types: NodesTypes | None = None,
) -> AsyncGenerator[tuple[TypeName, Node]]:
"""Generate a single ua.Node."""
if type_name in types:
type_node = types[type_name]
yield type_name, type_node
# check if type already exists
elif type_node := await find_node_by_browsname(server, idx, type_name):
log.debug(f"Type: {type_name} found! Skip type creation")
yield type_name, type_node
else:
nd_type = type_kws["type"]
parent_node = parent_node or server.nodes.objects
log.debug(f"{type_name} not found, creating a new shiny node!")
match nd_type:
case "object_type":
log.info(f"Creating 'object_type' {idx=} | {type_name=}")
type_node = await server.nodes.base_object_type.add_object_type(
idx, type_name
)
yield type_name, type_node
case "variable" | "property":
params = type_kws.get("params", {})
val = params.get("val", 0.0)
varianttype = getattr(ua, params.get("varianttype", "Double"))
dtype = params.get("datatype", None)
datatype = getattr(ua, dtype) if dtype else None
log.info(
f"Creating '{nd_type}' {idx=} | {type_name=} | "
f"{val=} | {varianttype=} | {datatype=}"
)
type_node = await parent_node.add_variable(
nodeid=idx,
bname=type_name,
val=val,
varianttype=varianttype,
datatype=datatype,
)
yield type_name, type_node
case "object":
raise NotImplementedError
case _:
raise ValueError(f"{nd_type=}: {type_name=} | {type_kws=}")
if nd_nodes := type_kws.get("nodes", {}):
# resursivelly check the other sub-nodes
for nd_name, nd_kws in nd_nodes.items():
# print(nd_name, nd_kws)
async for tpl in get_or_create_type(
server=server,
idx=idx,
type_name=nd_name,
type_kws=nd_kws,
parent_node=type_node,
parent_name=type_name,
types=types,
):
yield tpl
async def gen_types(
    *,
    server: Server,
    idx: Idx,
    rules_types: dict[str, Any],
    types: NodesTypes | None,
) -> NodesTypes:
    """Resolve every type rule and collect the resulting nodes by name.

    Args:
        server: Server on which types are looked up and created.
        idx: Namespace index used for lookups and for new nodes.
        rules_types: Type rules, keyed by type name.
        types: Type nodes already known; a falsy value starts from an
            empty mapping.

    Returns:
        The mapping of type name to node, extended with every node yielded
        by ``get_or_create_type``.
    """
    known = types or {}
    for rule_name, rule_kws in rules_types.items():
        resolved = get_or_create_type(
            server=server,
            idx=idx,
            type_name=rule_name,
            type_kws=rule_kws,
            parent_node=None,
            parent_name=None,
            types=known,
        )
        # Store each node as soon as it is yielded: the generator consults
        # the same mapping while it recurses.
        async for name, node in resolved:
            known[name] = node
    return known
async def create_node(
    server: Server,
    idx: Idx,
    bname: str,
    nd_type: Node,
    parent_node: Node,
) -> Node:
    """Create one node under ``parent_node``, shaped after ``nd_type``.

    Args:
        server: Server owning the nodes (not used by the body).
        idx: Namespace index for the new node.
        bname: Browse name of the new node.
        nd_type: Node whose node class selects what gets created.
        parent_node: Node that receives the new child.

    Returns:
        The new object, or the new writable variable initialised to 0.0.

    Raises:
        ValueError: If the node class of ``nd_type`` is neither an object
            (type) nor a variable (type).
    """
    nd_type_cl = await nd_type.read_node_class()

    if nd_type_cl in (ua.NodeClass.ObjectType, ua.NodeClass.Object):
        return await parent_node.add_object(
            nodeid=idx,
            bname=bname,
            objecttype=nd_type,
        )

    if nd_type_cl in (ua.NodeClass.VariableType, ua.NodeClass.Variable):
        variable = await parent_node.add_variable(
            nodeid=idx,
            bname=bname,
            val=0.0,
            varianttype=ua.VariantType.Double,
            datatype=nd_type.nodeid,
        )
        await variable.set_writable(True)
        return variable

    raise ValueError(
        f"{bname} | {nd_type_cl} => Node type not supported!",
    )
async def gen_nodes(
    folder: str,
    *,
    server: Server,
    idx: Idx,
    rules_signals: RulesSignals,
    types: NodesTypes,
    signals: NodesSignals | None = None,
) -> NodesSignals:
    """Find or create the folder and every signal node listed in the rules.

    Each rule maps a "/"-separated signal path to a "/"-separated type path
    of the same depth. The signal path is walked prefix by prefix; a prefix
    already known (in ``signals`` or found on the server) is reused,
    otherwise a node is created from the type at the matching type-path
    prefix.

    Args:
        folder: Browse name of the folder holding the top-level signals.
        server: Server on which nodes are looked up and created.
        idx: Namespace index used for lookups and for new nodes.
        rules_signals: Signal path -> type path.
        types: Known type nodes, keyed by type path.
        signals: Nodes already known; a falsy value starts from an empty
            mapping.

    Returns:
        Mapping of folder/signal path to node.

    Raises:
        KeyError: If a type path prefix is missing from ``types``.
        ValueError: If a signal path and its type path differ in depth
            (raised by ``zip(..., strict=True)``).
    """
    nodes = signals or {}
    if folder not in nodes:
        # All signals are defined inside this folder.
        if node_folder := await find_node_by_browsname(server, idx, folder):
            log.debug(f"Folder: {folder} found! Skip node creation")
            nodes[folder] = node_folder
        else:
            nodes[folder] = await server.nodes.objects.add_folder(idx, folder)

    for sname, stype in rules_signals.items():
        s_items = sname.split("/")
        t_items = stype.split("/")
        for j, (s_item, t_item) in enumerate(zip(s_items, t_items, strict=True)):
            bname = "/".join(s_items[: j + 1])
            if bname in nodes:
                # Path already handled: skip it to avoid duplicate nodes.
                continue
            if node := await find_node_by_browsname(server, idx, bname):
                log.debug(f"Node: {bname} found! Skip node creation")
                nodes[bname] = node
                continue
            # The parent is the prefix one level up. Use the loop position,
            # not ``s_items.index(s_item)``: the latter returns the first
            # occurrence and is wrong when a component name repeats.
            parent_node = nodes[folder] if j == 0 else nodes["/".join(s_items[:j])]
            # Type to instantiate: the type-path prefix of the same depth.
            t_path = "/".join(t_items[: j + 1])
            try:
                t_type = types[t_path]
            except KeyError as err:
                raise KeyError(
                    f"{j}> {t_path!r} | {bname} @ {t_item} => type not found!",
                ) from err
            log.debug(f"Node: {bname} not found, creating a new shiny node!")
            nodes[bname] = await create_node(
                server=server,
                idx=idx,
                bname=bname,
                nd_type=t_type,
                parent_node=parent_node,
            )
    return nodes
async def main_server(
    folder: str,
    rules_types: RulesTypes,
    rules_signals: RulesSignals,
    *,
    endpoint: str = "opc.tcp://0.0.0.0:4840/freeopcua/server/",
    uri: str = "http://examples.freeopcua.github.io",
    xml: Path | None = None,
) -> tuple[NodesTypes, NodesSignals]:
    """Populate a server from the rules, re-using nodes saved in ``xml``.

    When ``xml`` points to an existing file its nodes are imported first,
    and those in the registered namespace seed the known types and signals.
    Missing types and signals are then created from the rules. When ``xml``
    is given, the resulting nodes are exported back to it. The server is
    not started.

    Args:
        folder: Browse name of the folder holding the signals.
        rules_types: Type rules.
        rules_signals: Signal path -> type path rules.
        endpoint: Endpoint URL set on the server.
        uri: Namespace URI to register.
        xml: File used to import previous nodes from and export nodes to.

    Returns:
        ``(types, signals)`` mappings of name to node.
    """
    server, idx = await get_server(endpoint=endpoint, uri=uri)

    types: NodesTypes | None = None
    signals: NodesSignals | None = None
    if xml is not None and xml.exists():
        log.info(f"Importing nodes from {xml}")
        # Bring back the nodes saved by a previous session.
        await server.import_xml(xml.as_posix())
        imported = [nd async for nd in filter_nodes(server, ns=(idx,))]
        await print_filter(imported)
        # Split by label: anything whose label mentions "type" is a type.
        types, signals = {}, {}
        for tnode in imported:
            bucket = types if "type" in tnode.cls_type else signals
            bucket[tnode.bname.Name] = tnode.nd

    types = await gen_types(
        server=server, idx=idx, rules_types=rules_types, types=types
    )
    signals = await gen_nodes(
        folder,
        server=server,
        idx=idx,
        rules_signals=rules_signals,
        types=types,
        signals=signals,
    )

    if xml is not None:
        log.info(f"Exporting nodes to: {xml}")
        current = [nd async for nd in filter_nodes(server, ns=(idx,))]
        await print_filter(current)
        to_export = [*types.values(), *signals.values()]
        await server.export_xml(to_export, xml.as_posix())

    return types, signals
if __name__ == "__main__":
    import copy
    from pprint import pformat as pf

    # Demo: run the server set-up three times against the same XML file to
    # show that re-importing does not duplicate nodes.
    logging.basicConfig(level=logging.WARNING)
    xml = Path(tempfile.gettempdir()) / "opcua_nodes.xml"

    # Rule shared by every Double variable type below.
    double = {
        "type": "variable",
        "params": {
            "val": 0.0,
            "varianttype": "Double",
        },
    }
    # First generation of rules: three currents on an electric panel.
    rtypes0 = {
        "ElectricPanel": {
            "type": "object_type",
            "nodes": {
                "ElectricPanel/current_l1": double,
                "ElectricPanel/current_l2": double,
                "ElectricPanel/current_l3": double,
            },
        },
    }
    rsignals0 = {
        "ElPnl0/cur_l1": "ElectricPanel/current_l1",
        "ElPnl0/cur_l2": "ElectricPanel/current_l2",
        "ElPnl0/cur_l3": "ElectricPanel/current_l3",
    }
    # Second generation: same rules plus one extra type and signal.
    rtypes1 = copy.deepcopy(rtypes0)
    rtypes1["ElectricPanel"]["nodes"]["ElectricPanel/active_power"] = double
    rsignals1 = copy.deepcopy(rsignals0)
    rsignals1["ElPnl0/act_pw"] = "ElectricPanel/active_power"

    # Always start from a clean slate.
    if xml.exists():
        print(f"Removing previous generated xml file: {xml}")
        xml.unlink()

    # Each scenario: banner lines to print, then the rules to apply.
    scenarios = (
        (
            (
                "Start an empty server",
                f"Adding the following types:\n{pf(rtypes0, width=40)}",
                f"Adding the following signals:\n{pf(rsignals0, width=40)}",
            ),
            rtypes0,
            rsignals0,
        ),
        (
            ("\n\nStart a server loading from xml generated from previous step.",),
            rtypes0,
            rsignals0,
        ),
        (
            (
                "\n\nStart a server loading from xml and adding new types and signals.",
                "Add type: ElectricPanel/active_power",
                "Add signal: ElPnl0/act_pw",
            ),
            rtypes1,
            rsignals1,
        ),
    )
    for banner, rules_t, rules_s in scenarios:
        for line in banner:
            print(line)
        types, nodes = asyncio.run(
            main_server(
                folder="factory", rules_types=rules_t, rules_signals=rules_s, xml=xml
            )
        )
    log.info("Done!")
@zarch
Copy link
Author

zarch commented Aug 30, 2024

Code used for asyncua discussion on: Import from xml and populate nodes avoiding duplicates

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment