asyncua.server package#

Subpackages#

Submodules#

asyncua.server.address_space module#

class asyncua.server.address_space.AddressSpace#

Bases: object

The address space object stores all the nodes of the OPC-UA server and helper methods. The methods are thread safe

https://reference.opcfoundation.org/Core/docs/Part3/

add_datachange_callback(nodeid: ua.NodeId, attr: ua.AttributeIds, callback: Callable) Tuple[ua.StatusCode, int]#
add_method_callback(methodid: ua.NodeId, callback: Callable)#
clear()#

Delete all nodes in address space

delete_datachange_callback(handle: int)#
dump(path)#

Dump address space as binary to file; note that server must be stopped for this method to work DO NOT DUMP AN ADDRESS SPACE WHICH IS USING A SHELF (load_aspace_shelf), ONLY CACHED NODES WILL GET DUMPED!

generate_nodeid(idx: int | None = None) ua.NodeId#
get(nodeid: ua.NodeId) NodeData | None#
keys()#
load(path)#

Load address space from a binary file, overwriting everything in the current address space

load_aspace_shelf(path)#

Load the standard address space nodes from a python shelve via LazyLoadingDict as needed. The dump() method can no longer be used if the address space is being loaded from a shelf

Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time

make_aspace_shelf(path)#

Make a shelf for containing the nodes from the standard address space; this is typically only done on first start of the server. Subsequent server starts will load the shelf, nodes are then moved to a cache by the LazyLoadingDict class when they are accessed. Saving data back to the shelf is currently NOT supported, it is only used for the default OPC UA standard address space

Note: Intended for slow devices, such as Raspberry Pi, to greatly improve start up time

read_attribute_value(nodeid: NodeId, attr: AttributeIds) DataValue#
set_attribute_value_callback(nodeid: ua.NodeId, attr: ua.AttributeIds, callback: Callable[[ua.NodeId, ua.AttributeIds], ua.DataValue]) ua.StatusCode#
async write_attribute_value(nodeid: NodeId, attr: AttributeIds, value: DataValue) StatusCode#
class asyncua.server.address_space.AttributeService(aspace: AddressSpace)#

Bases: object

This class implements the attribute service set defined in the opc ua standard. https://reference.opcfoundation.org/v104/Core/docs/Part4/5.10.1/

read(params: ua.ReadParameters) List[ua.DataValue]#
async write(params: ua.WriteParameters, user: User = User(role=<UserRole.Admin: 0>, name=None)) List[ua.StatusCode]#
class asyncua.server.address_space.AttributeValue(value: DataValue)#

Bases: object

The class holds the value(s) of an attribute and callbacks.

class asyncua.server.address_space.MethodService(aspace: AddressSpace)#

Bases: object

This class implements the method service set defined in the opc ua standard. https://reference.opcfoundation.org/v104/Core/docs/Part4/5.11.1/

async call(methods)#
stop()#
class asyncua.server.address_space.NodeData(nodeid: NodeId)#

Bases: object

The class is internal to asyncua and holds all the information about a Node.

class asyncua.server.address_space.NodeManagementService(aspace: AddressSpace)#

Bases: object

This class implements the node management service set defined in the opc ua standard. https://reference.opcfoundation.org/v105/Core/docs/Part4/5.7.1/

add_nodes(addnodeitems: List[ua.AddNodesItem], user: User = User(role=<UserRole.Admin: 0>, name=None)) List[ua.AddNodesResult]#
add_references(refs: List[ua.AddReferencesItem], user: User = User(role=<UserRole.Admin: 0>, name=None))#
delete_nodes(deletenodeitems: ua.DeleteNodesParameters, user: User = User(role=<UserRole.Admin: 0>, name=None)) List[ua.StatusCode]#
delete_references(refs: List[ua.DeleteReferencesItem], user: User = User(role=<UserRole.Admin: 0>, name=None)) List[ua.StatusCode]#
try_add_nodes(addnodeitems: List[ua.AddNodesItem], user: User = User(role=<UserRole.Admin: 0>, name=None), check: bool = True)#
try_add_references(refs: List[ua.AddReferencesItem], user: User = User(role=<UserRole.Admin: 0>, name=None))#
class asyncua.server.address_space.ViewService(aspace: AddressSpace)#

Bases: object

This class implements the view service set defined in the opc ua standard. https://reference.opcfoundation.org/v104/Core/docs/Part4/5.8.1/

browse(params: ua.BrowseParameters) List[ua.BrowseResult]#
translate_browsepaths_to_nodeids(browsepaths: List[ua.BrowsePath]) List[ua.BrowsePathResult]#

asyncua.server.binary_server_asyncio module#

Socket server forwarding request to internal server

class asyncua.server.binary_server_asyncio.BinaryServer(internal_server: InternalServer, hostname, port, limits: TransportLimits)#

Bases: object

set_policies(policies)#
async start()#
async stop()#
class asyncua.server.binary_server_asyncio.OPCUAProtocol(iserver: InternalServer, policies, clients, closing_tasks, limits: TransportLimits)#

Bases: Protocol

Instantiated for every connection.

connection_lost(ex)#

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)#

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)#

Called when some data is received.

The argument is a bytes object.

asyncua.server.event_generator module#

class asyncua.server.event_generator.EventGenerator(isession: InternalSession)#

Bases: object

Create an event based on an event type. Per default is BaseEventType used. Object members are dynamically created from the base event type and send to client when evebt is triggered (see example code in source)

Arguments to constructor are:

server: The InternalSession object to use for query and event triggering source: The emiting source for the node, either an objectId, NodeId or a Node etype: The event type, either an objectId, a NodeId or a Node object

async init(etype=None, emitting_node=2253, add_generates_event=True)#
async trigger(time_attr=None, message=None, subscription_id=None)#

Trigger the event. This will send a notification to all subscribed clients

asyncua.server.history module#

class asyncua.server.history.HistoryDict(max_history_data_response_size=10000)#

Bases: HistoryStorageInterface

Very minimal history backend storing data in memory using a Python dictionary

async init()#

Async. initialization. Has to be called once after creation.

async new_historized_event(source_id, evtypes, period, count=0)#

Called when historization of events is enabled on server side Returns None

async new_historized_node(node_id, period, count=0)#

Called when a new node is to be historized Returns None

async read_event_history(source_id, start, end, nb_values, evfilter)#

Called when a client make a history read request for events Start time and end time are inclusive Returns a list of Events and a continuation point which is None if all events are read or the SourceTimeStamp of the last rejected event

async read_node_history(node_id, start, end, nb_values)#

Called when a client make a history read request for a node if start or end is missing then nb_values is used to limit query nb_values is the max number of values to read. Ignored if 0 Start time and end time are inclusive Returns a list of DataValues and a continuation point which is None if all nodes are read or the SourceTimeStamp of the last rejected DataValue

async save_event(event)#

Called when a new event has been generated ans should be saved in history Returns None

async save_node_value(node_id, datavalue)#

Called when the value of a historized node has changed and should be saved in history Returns None

async stop()#

Called when the server shuts down Can be used to close database connections etc.

class asyncua.server.history.HistoryManager(iserver)#

Bases: object

async dehistorize(node)#

Remove subscription to the node/source which is being historized

SQL Implementation Only the subscriptions is removed. The historical data remains.

async historize_data_change(node, period=datetime.timedelta(days=7), count=0)#

Subscribe to the nodes’ data changes and store the data in the active storage.

async historize_event(source, period=datetime.timedelta(days=7), count=0)#

Subscribe to the source nodes’ events and store the data in the active storage.

SQL Implementation The default is to historize every event type the source generates, custom event properties are included. At this time there is no way to historize a specific event type. The user software can filter out events which are not desired when reading.

Note that adding custom events to a source node AFTER historizing has been activated is not supported at this time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table must be deleted manually so that a new table with the custom event fields can be created.

async init()#
async read_history(params)#

Read history for a node This is the part AttributeService, but implemented as its own service since it requires more logic than other attribute service methods

set_storage(storage)#

set the desired HistoryStorageInterface which History Manager will use for historizing

async stop()#

call stop methods of active storage interface whenever the server is stopped

update_history(params)#

Update history for a node This is the part AttributeService, but implemented as its own service since it requires more logic than other attribute service methods

class asyncua.server.history.HistoryStorageInterface(max_history_data_response_size=10000)#

Bases: object

Interface of a history backend. Must be implemented by backends

async init()#

Async. initialization. Has to be called once after creation.

async new_historized_event(source_id, evtypes, period, count=0)#

Called when historization of events is enabled on server side Returns None

async new_historized_node(node_id, period, count=0)#

Called when a new node is to be historized Returns None

async read_event_history(source_id, start, end, nb_values, evfilter)#

Called when a client make a history read request for events Start time and end time are inclusive Returns a list of Events and a continuation point which is None if all events are read or the SourceTimeStamp of the last rejected event

async read_node_history(node_id, start, end, nb_values)#

Called when a client make a history read request for a node if start or end is missing then nb_values is used to limit query nb_values is the max number of values to read. Ignored if 0 Start time and end time are inclusive Returns a list of DataValues and a continuation point which is None if all nodes are read or the SourceTimeStamp of the last rejected DataValue

async save_event(event)#

Called when a new event has been generated ans should be saved in history Returns None

async save_node_value(node_id, datavalue)#

Called when the value of a historized node has changed and should be saved in history Returns None

async stop()#

Called when the server shuts down Can be used to close database connections etc.

class asyncua.server.history.SubHandler(storage: HistoryStorageInterface)#

Bases: SubHandler

datachange_notification(node, val, data)#

called for every datachange notification from server

event_notification(event)#

called for every event notification from server

exception asyncua.server.history.UaNodeAlreadyHistorizedError#

Bases: UaError

asyncua.server.history_sql module#

class asyncua.server.history_sql.HistorySQLite(path='history.db', max_history_data_response_size=10000)#

Bases: HistoryStorageInterface

history backend which stores data values and object events in a SQLite database this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in the history database are in binary format (SQLite BLOBs) note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs

async execute_sql_delete(condition: str, args: Iterable, table: str, node_id)#
async init()#

Async. initialization. Has to be called once after creation.

async new_historized_event(source_id, evtypes, period, count=0)#

Called when historization of events is enabled on server side Returns None

async new_historized_node(node_id, period, count=0)#

Called when a new node is to be historized Returns None

async read_event_history(source_id, start, end, nb_values, evfilter)#

Called when a client make a history read request for events Start time and end time are inclusive Returns a list of Events and a continuation point which is None if all events are read or the SourceTimeStamp of the last rejected event

async read_node_history(node_id, start, end, nb_values)#

Called when a client make a history read request for a node if start or end is missing then nb_values is used to limit query nb_values is the max number of values to read. Ignored if 0 Start time and end time are inclusive Returns a list of DataValues and a continuation point which is None if all nodes are read or the SourceTimeStamp of the last rejected DataValue

async save_event(event)#

Called when a new event has been generated ans should be saved in history Returns None

async save_node_value(node_id, datavalue)#

Called when the value of a historized node has changed and should be saved in history Returns None

async stop()#

Called when the server shuts down Can be used to close database connections etc.

asyncua.server.internal_server module#

Internal server implementing opcu-ua interface. Can be used on server side or to implement binary/https opc-ua servers

class asyncua.server.internal_server.InternalServer(user_manager: UserManager | None = None)#

Bases: object

There is one InternalServer for every Server.

add_endpoint(endpoint)#
certificate_validator: Callable[[Certificate, ApplicationDescription], Awaitable[None]] | None#

hook to validate a certificate, raises a ServiceError when not valid

check_user_token(isession, token)#

unpack the username and password for the benefit of the user defined user manager

create_session(name, user=User(role=<UserRole.Anonymous: 1>, name=None), external=False)#
async disable_history_data_change(node)#

Set attribute Historizing of node to False and stop storing data for history

async disable_history_event(source)#

Set attribute History Read of node to False and stop storing data for history

dump_address_space(path)#

Dump current address space to path

async enable_history_data_change(node, period=datetime.timedelta(days=7), count=0)#

Set attribute Historizing of node to True and start storing data for history

async enable_history_event(source, period=datetime.timedelta(days=7), count=0)#

Set attribute History Read of object events to True and start storing data for history

find_servers(params, sockname=None)#
async get_endpoints(params=None, sockname=None)#
get_new_channel_id()#
async init(shelffile=None)#
load_address_space(path)#

Load address space from path

async load_standard_address_space(shelf_file=None)#
read_attribute_value(nodeid, attr=AttributeIds.Value)#

directly read datavalue of the Attribute

register_server(server, conf=None)#
register_server2(params)#
set_attribute_value_callback(nodeid: NodeId, callback: Callable[[NodeId, AttributeIds], DataValue], attr=AttributeIds.Value) None#

Set a callback function to the Attribute that returns a value for read_attribute_value() instead of the written value. Note that it does not trigger the datachange_callbacks unlike write_attribute_value().

set_user_manager(user_manager)#

set up a function which that will check for authorize users. Input function takes username and password as parameters and returns True of user is allowed access, False otherwise.

async setup_condition_methods()#
async setup_nodes()#

Set up some nodes as defined by spec

async start()#
async stop()#
subscribe_server_callback(event, handle)#

Create a subscription from event to handle

unsubscribe_server_callback(event, handle)#

Remove a subscription from event to handle

async write_attribute_value(nodeid, datavalue, attr=AttributeIds.Value)#

directly write datavalue to the Attribute, bypassing some checks and structure creation, so it is a little faster

class asyncua.server.internal_server.ServerDesc(serv, cap=None)#

Bases: object

asyncua.server.internal_session module#

class asyncua.server.internal_session.InternalSession(internal_server: InternalServer, aspace: ~asyncua.server.address_space.AddressSpace, submgr: ~asyncua.server.subscription_service.SubscriptionService, name, user=User(role=<UserRole.Anonymous: 1>, name=None), external=False)#

Bases: AbstractSession

activate_session(params, peer_certificate)#
add_method_callback(methodid, callback)#
async add_nodes(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.7.2/

This Service is used to add one or more Nodes into the AddressSpace hierarchy. Using this Service, each Node is added as the TargetNode of a HierarchicalReference to ensure that the AddressSpace is fully connected and that the Node is added as a child within the AddressSpace hierarchy (see OPC 10000-3).

async add_references(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.7.3/

This Service is used to add one or more References to one or more Nodes. The NodeClass is an input parameter that is used to validate that the Reference to be added matches the NodeClass of the TargetNode. This parameter is not validated if the Reference refers to a TargetNode in a remote Server.

async browse(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.8.2/

This Service is used to discover the References of a specified Node. The browse can be further limited by the use of a View. This Browse Service also supports a primitive filtering capability.

async browse_next(parameters: BrowseNextParameters) List[BrowseResult]#

https://reference.opcfoundation.org/Core/Part4/v104/5.8.3/

This Service is used to request the next set of Browse or BrowseNext response information that is too large to be sent in a single response. “Too large” in this context means that the Server is not able to return a larger response or that the number of results to return exceeds the maximum number of results to return that was specified by the Client in the original Browse request. The BrowseNext shall be submitted on the same Session that was used to submit the Browse or BrowseNext that is being continued.

async call(params)#

COROUTINE

async close_session(delete_subs=True)#
async create_monitored_items(params: CreateMonitoredItemsParameters)#

Returns Future

async create_session(params: CreateSessionParameters, sockname: Tuple[str, int] | None = None)#
async create_subscription(params, callback, request_callback=None)#

https://reference.opcfoundation.org/Core/Part4/v104/5.13.2/

This Service is used to create a Subscription.

Subscriptions monitor a set of MonitoredItems for Notifications and return them to the Client in response to Publish requests.

async delete_monitored_items(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.12.6/

This Service is used to remove one or more MonitoredItems of a Subscription. When a MonitoredItem is deleted, its triggered item links are also deleted.

async delete_nodes(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.7.4/

This Service is used to delete one or more Nodes from the AddressSpace.

async delete_references(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.7.5/

This Service is used to delete one or more References of a Node.

async delete_subscriptions(ids)#

https://reference.opcfoundation.org/Core/Part4/v104/5.13.8/

This Service is invoked to delete one or more Subscriptions that belong to the Client’s Session.

async get_endpoints(params=None, sockname=None)#
async history_read(params) List[HistoryReadResult]#

https://reference.opcfoundation.org/Core/Part4/v104/5.10.3/

This Service is used to read historical values or Events of one or more Nodes. For constructed Attribute values whose elements are indexed, such as an array, this Service allows Clients to read the entire set of indexed values as a composite, to read individual elements or to read ranges of elements of the composite. Servers may make historical values available to Clients using this Service, although the historical values themselves are not visible in the AddressSpace.

is_activated() bool#
max_connections = 1000#
async modify_monitored_items(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.12.3/

This Service is used to modify MonitoredItems of a Subscription. Changes to the MonitoredItem settings shall be applied immediately by the Server. They take effect as soon as practical but not later than twice the new revisedSamplingInterval.

modify_subscription(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.13.3/

This Service is used to modify a Subscription.

publish(acks: Iterable[SubscriptionAcknowledgement] | None = None)#
async read(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.10.2/

This Service is used to read one or more Attributes of one or more Nodes. For constructed Attribute values whose elements are indexed, such as an array, this Service allows Clients to read the entire set of indexed values as a composite, to read individual elements or to read ranges of elements of the composite.

async register_nodes(nodes: List[NodeId]) List[NodeId]#

https://reference.opcfoundation.org/Core/Part4/v104/5.8.5/

A Server often has no direct access to the information that it manages. Variables or services might be in underlying systems where additional effort is required to establish a connection to these systems. The RegisterNodes Service can be used by Clients to register the Nodes that they know they will access repeatedly (e.g. Write, Call). It allows Servers to set up anything needed so that the access operations will be more efficient. Clients can expect performance improvements when using registered NodeIds, but the optimization measures are vendor-specific. For Variable Nodes Servers shall concentrate their optimization efforts on the Value Attribute.

republish(params)#
async transfer_subscriptions(params: TransferSubscriptionsParameters) List[TransferResult]#

https://reference.opcfoundation.org/Core/Part4/v104/5.13.7/

This Service is used to transfer a Subscription and its MonitoredItems from one Session to another. For example, a Client may need to reopen a Session and then transfer its Subscriptions to that Session. It may also be used by one Client to take over a Subscription from another Client by transferring the Subscription to its Session.

async translate_browsepaths_to_nodeids(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.8.4/

This Service is used to request that the Server translates one or more browse paths to NodeIds. Each browse path is constructed of a starting Node and a RelativePath. The specified starting Node identifies the Node from which the RelativePath is based. The RelativePath contains a sequence of ReferenceTypes and BrowseNames.

async unregister_nodes(nodes: List[NodeId]) List[NodeId]#

https://reference.opcfoundation.org/Core/Part4/v104/5.8.6/

This Service is used to unregister NodeIds that have been obtained via the RegisterNodes service.

async write(params)#

https://reference.opcfoundation.org/Core/Part4/v104/5.10.4/

This Service is used to write values to one or more Attributes of one or more Nodes. For constructed Attribute values whose elements are indexed, such as an array, this Service allows Clients to write the entire set of indexed values as a composite, to write individual elements or to write ranges of elements of the composite.

class asyncua.server.internal_session.SessionState(value)#

Bases: Enum

An enumeration.

Activated = 1#
Closed = 2#
Created = 0#

asyncua.server.internal_subscription module#

server side implementation of a subscription object

class asyncua.server.internal_subscription.InternalSubscription(data: CreateSubscriptionResult, aspace: AddressSpace, callback, request_callback=None)#

Bases: object

Server internal subscription. Runs the publication loop and stores the Publication Results until they are acknowledged.

async enqueue_datachange_event(mid: int, eventdata: MonitoredItemNotification, maxsize: int)#

Enqueue a monitored item data change. :param mid: Monitored Item Id :param eventdata: Monitored Item Notification :param maxsize: Max queue size (0: No limit)

async enqueue_event(mid: int, eventdata: EventFieldList, maxsize: int)#

Enqueue a event. :param mid: Monitored Item Id :param eventdata: Event Field List :param maxsize: Max queue size (0: No limit)

async enqueue_statuschange(code)#

Enqueue a status change. :param code:

has_published_results()#
publish(acks: Iterable[int])#

Reset publish cycle count, acknowledge PublishResults. :param acks: Sequence number of the PublishResults to acknowledge

async publish_results(requestdata=None)#

Publish all enqueued data changes, events and status changes though the callback. This method gets first called without publish request from subscription loop. It tries to get a publish request itself (if needed). If it doesn’t succeed, method gets queued to be called back with publish request when one is available.

republish(nb)#
async start()#
async stop()#

asyncua.server.monitored_item_service module#

server side implementation of a subscription object

class asyncua.server.monitored_item_service.MonitoredItemData#

Bases: object

class asyncua.server.monitored_item_service.MonitoredItemService(isub, aspace: AddressSpace)#

Bases: object

Implements monitored item service for one subscription

async create_monitored_items(params: CreateMonitoredItemsParameters)#
async datachange_callback(handle: int, value: DataValue, error=None)#
delete_all_monitored_items()#
delete_monitored_items(ids)#
modify_monitored_items(params: ModifyMonitoredItemsParameters)#
async trigger_datachange(handle, nodeid, attr)#
async trigger_event(event, mid=None)#
async trigger_statuschange(code)#
class asyncua.server.monitored_item_service.MonitoredItemValues#

Bases: object

get_current_datavalue() DataValue | None#
get_old_datavalue() DataValue | None#
set_current_datavalue(cur_val: DataValue)#
class asyncua.server.monitored_item_service.WhereClauseEvaluator(logger, aspace: AddressSpace, whereclause)#

Bases: object

eval(event)#

asyncua.server.server module#

High level interface to pure python OPC-UA server

class asyncua.server.server.Server(iserver: InternalServer | None = None, user_manager=None)#

Bases: object

High level Server class

This class creates an asyncua server with default values

Create your own namespace and then populate your server address space using use the get_root() or get_objects() to get Node objects. and get_event_object() to fire events. Then start server. See server-example.py All methods are threadsafe

If you need more flexibility you call directly the Ua Service methods on the iserver or iserver.isession object members.

During startup the standard address space will be constructed, which may be time-consuming when running a server on a less powerful device (e.g. a Raspberry Pi). In order to improve startup performance, an optional path to a cache file can be passed to the server constructor. If the parameter is defined, the address space will be loaded from the cache file or the file will be created if it does not exist yet. As a result the first startup will be even slower due to the cache file generation but all further start ups will be significantly faster. ┌────────┐ │ Server │ ── BinaryServer ── OPCUAProtocol ── UaProcessor │ │ ── InternalServer ── InternalSession └────────┘ ── SubscriptionService

Variables:
  • product_uri

  • name

  • default_timeout – timeout in milliseconds for sessions and secure channel

  • iserverInternalServer instance

  • bserver – binary protocol server BinaryServer

  • nodes – shortcuts to common nodes - Shortcuts instance

  • socket_address – A tuple of IP address and port describing the server socket address. Used when the IP address of the network interface is different from the endpoint IP offered to the client during discovery. Helpful when the server is running behind NAT or inside a Docker container, where the client connects to an external IP, while the server listens on some internal IP.

allow_remote_admin(allow)#

Enable or disable the builtin Admin user from network clients

async create_custom_data_type(idx, name, basetype=24, properties=None, description=None)#
async create_custom_event_type(idx, name, basetype=2041, properties=None)#
async create_custom_object_type(idx, name, basetype=58, properties=None, variables=None, methods=None)#
async create_custom_variable_type(idx, name, basetype=62, properties=None, variables=None, methods=None)#
async create_subscription(period, handler)#

Create a subscription. Returns a Subscription object which allow to subscribe to events or data changes on server :param period: Period in milliseconds :param handler: A class instance - see SubHandler base class for details

async dehistorize_node_data_change(node)#

Stop historizing supplied nodes; see history module :param node: node or list of nodes that can be historized (UA variables/properties)

async dehistorize_node_event(node)#

Stop historizing events from node (typically a UA object); see history module :param node: node or list of nodes that can be historized (UA objects)

async delete_nodes(nodes, recursive=False)#
static determine_security_level(security_policy_uri: str, security_mode: MessageSecurityMode) Byte#

Determine the security level of an EndPoint. The security level indicates how secure an EndPoint is, compared to other EndPoints of the same server. Value 0 is a special value; EndPoint isn’t recommended, typical for ua.MessageSecurityMode.None_.

See Part 4 7.10

To the determine the level the value of ua.SecurityPolicyType is used. The enum values already correspond to and The Enum ua.SecurityPolicyType already contains a value per Enum entry that already correspond to

Args:

security_policy (ua.SecurityPolicy): the used policy security_mode (ua.MessageSecurityMode): the used security mode for the messages.

Returns:

ua.Byte: the returned security level

disable_clock(val: bool = True)#

for debugging you may want to disable clock that write every second to address space

async export_xml(nodes, path, export_values: bool = False)#

Export defined nodes to xml :param export_value: export values from variants

async export_xml_by_ns(path: str, namespaces: list | None = None, export_values: bool = False)#

Export nodes of one or more namespaces to an XML file. Namespaces used by nodes are always exported for consistency. :param path: name of the xml file to write :param namespaces: list of string uris or int indexes of the namespace to export, :param export_values: export values from variants

if not provide all ns are used except 0

async find_servers(uris=None)#

find_servers. mainly implemented for symmetry with client

get_application_uri()#
async get_endpoints()#
async get_event_generator(etype=None, emitting_node=2253)#

Returns an event object using an event type from address space. Use this object to fire events

async get_namespace_array()#

get all namespace defined in server

async get_namespace_index(uri)#

get index of a namespace using its uri

get_node(nodeid: Node | NodeId | str | int) Node#

Get a specific node using NodeId object or a string representing a NodeId

get_objects_node()#

Get Objects node of server. Returns a Node object.

get_root_node()#

Get Root node of server. Returns a Node object.

async historize_node_data_change(node, period=datetime.timedelta(days=7), count=0)#

Start historizing supplied nodes; see history module :param node: node or list of nodes that can be historized (variables/properties) :param period: time delta to store the history; older data will be deleted from the storage :param count: number of changes to store in the history

async historize_node_event(node, period=datetime.timedelta(days=7), count: int = 0)#

Start historizing events from node (typically a UA object); see history module :param node: node or list of nodes that can be historized (UA objects) :param period: time delta to store the history; older data will be deleted from the storage :param count: number of events to store in the history

async import_xml(path=None, xmlstring=None, strict_mode=True)#

Import nodes defined in xml

async init(shelf_file=None)#

Link a python function to a UA method in the address space; required when a UA method has been imported to the address space via XML; the python executable must be linked manually :param node: UA method node :param callback: python function that the UA method will call

async load_certificate(path_or_content: str | bytes, format: str | None = None)#

load server certificate from file, either pem or der

async load_data_type_definitions(node=None)#

Load custom types (custom structures/extension objects) definition from server Generate Python classes for custom structures/extension objects defined in server These classes will be available in ua module

async load_enums()#

load UA structures and generate python Enums in ua module for custom enums in server

async load_private_key(path_or_content: str | Path | bytes, password=None, format=None)#
async load_type_definitions(nodes=None)#

load custom structures from our server. Server side this can be used to create python objects from custom structures imported through xml into server

static lookup_security_level_for_policy_type(security_policy_type: SecurityPolicyType) Byte#

Returns the security level for an ua.SecurityPolicyType.

This is endpoint & server implementation specific!

Returns:

ua.Byte: the found security level

read_attribute_value(nodeid, attr=AttributeIds.Value)#

directly read datavalue of the Attribute

async register_namespace(uri) int#

Register a new namespace. Nodes should in custom namespace, not 0.

async register_to_discovery(url: str = 'opc.tcp://localhost:4840', period: int = 60, discovery_configuration=None)#

Register to an OPC-UA Discovery server. Registering must be renewed at least every 10 minutes, so this method will use our asyncio thread to re-register every period seconds if period is 0 registration is not automatically renewed

async set_application_uri(uri: str)#

Set application/server URI. This uri is supposed to be unique. If you intend to register your server to a discovery server, it really should be unique in your system! default is : “urn:freeopcua:python:server

set_attribute_value_callback(nodeid: NodeId, callback: Callable[[NodeId, AttributeIds], DataValue], attr=AttributeIds.Value) None#

Set a callback function to the Attribute that returns a value for read_attribute_value() instead of the written value. Note that it does not trigger the datachange_callbacks unlike write_attribute_value().

async set_build_info(product_uri, manufacturer_name, product_name, software_version, build_number, build_date)#
set_certificate_validator(validator: Callable[[Certificate, ApplicationDescription], Awaitable[None]])#

Assign a method to be called when certificate needs to be validated.

Function is called with certificate and application description and should raise the correct status code when invalid.

async def example_validation_method(certificate: x509.Certificate, app_description: ua.ApplicationDescription):

… if not_valid_condition:

raise ServiceError(ua.StatusCodes.BadCertificateInvalid)

set_endpoint(url)#
set_force_server_timestamp(force_server_timestamp: bool)#

Enables or disables automatically setting ServerTimestamp on Value attributes

set_match_discovery_client_ip(match_discovery_client_ip: bool)#

Enables or disables the matching of an endpoint IP to a client IP during discovery.

When True (default), the IP address of endpoints sent during the discovery is modified to an IP address of the server network interface used to communicate with the client. Disabling comes handy when the real client IP is different from the client IP that the server sees (e.g., behind NAT or inside Docker container). Do not call unless you know what you are doing.

set_match_discovery_endpoint_url(match_discovery_endpoint_url: bool)#

Enables or disables the matching of the EndpointUrl request parameter during discovery.

When True (default), the host/port of endpoints sent during the discovery is modified to the host/port which is specified in the EndpointUrl request parameter.

set_security_IDs(policy_ids)#

Method setting up the security endpoints for identification of clients. During server object initialization, all possible endpoints are enabled:

self._policyIDs = [“Anonymous”, “Basic256Sha256”, “Username”]

E.g. to limit the number of IDs and disable anonymous clients:

set_security_IDs([“Basic256Sha256”])

(Implementation for ID check is currently not finalized…)

set_security_policy(security_policy, permission_ruleset=None)#

Method setting up the security policies for connections to the server, where security_policy is a list of integers. During server initialization, all endpoints are enabled:

security_policy = [

ua.SecurityPolicyType.NoSecurity, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, ua.SecurityPolicyType.Basic256Sha256_Sign

]

E.g. to limit the number of endpoints and disable no encryption:

set_security_policy([

ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

set_server_name(name)#
async start()#

Start to listen on network

async stop()#

Stop server

subscribe_server_callback(event, handle)#
async unregister_from_discovery(url: str = 'opc.tcp://localhost:4840', discovery_configuration=None)#

stop registration thread

unsubscribe_server_callback(event, handle)#
async write_attribute_value(nodeid, datavalue, attr=AttributeIds.Value)#

directly write datavalue to the Attribute, bypassing some checks and structure creation, so it is a little faster

asyncua.server.subscription_service module#

server side implementation of subscription service

class asyncua.server.subscription_service.SubscriptionService(aspace: AddressSpace)#

Bases: object

Manages subscriptions on the server side. There is one SubscriptionService instance for every Server/InternalServer.

async condition_refresh(*args)#
async create_monitored_items(params: CreateMonitoredItemsParameters)#
async create_subscription(params, callback, request_callback=None)#
delete_monitored_items(params)#
async delete_subscriptions(ids)#
modify_monitored_items(params)#
modify_subscription(params)#
publish(acks: Iterable[SubscriptionAcknowledgement])#
republish(params)#
async trigger_event(event, subscription_id=None)#

asyncua.server.uaprocessor module#

class asyncua.server.uaprocessor.PublishRequestData(requesthdr=None, seqhdr=None)#

Bases: object

class asyncua.server.uaprocessor.UaProcessor(internal_server: InternalServer, transport, limits: TransportLimits)#

Bases: object

Processor for OPC UA messages. Implements the OPC UA protocol for the server side.

async close()#

to be called when client has disconnected to ensure we really close everything we should

async forward_publish_response(result: PublishResult, requestdata: PublishRequestData)#

Try to send a PublishResponse with the given PublishResult.

get_publish_request(subscription_id: UInt32)#
open_secure_channel(algohdr, seqhdr, body)#
async process(header, body)#
async process_message(seqhdr, body)#

Process incoming messages.

send_response(requesthandle, seqhdr, response, msgtype=b'MSG')#
set_policies(policies)#

asyncua.server.user_managers module#

class asyncua.server.user_managers.CertificateUserManager#

Bases: object

Certificate user manager, takes a certificate handler with its associated users and provides those users.

async add_admin(certificate_path: Path, name: str, format: str | None = None)#
async add_role(certificate_path: Path, user_role: UserRole, name: str, format: str | None = None)#
async add_user(certificate_path: Path, name: str, format: str | None = None)#
get_user(iserver, username=None, password=None, certificate=None)#
class asyncua.server.user_managers.PermissiveUserManager#

Bases: object

get_user(iserver, username=None, password=None, certificate=None)#

Default user_manager, does nothing much but check for admin

class asyncua.server.user_managers.UserManager#

Bases: object

get_user(iserver, username=None, password=None, certificate=None)#

asyncua.server.users module#

Implement user management here.

class asyncua.server.users.User(role: asyncua.server.users.UserRole = <UserRole.Anonymous: 1>, name: Optional[str] = None)#

Bases: object

name: str | None = None#
role: UserRole = 1#
class asyncua.server.users.UserRole(value)#

Bases: Enum

User Roles

Admin = 0#
Anonymous = 1#
User = 3#