asyncua package#

Subpackages#

Submodules#

asyncua.sync module#

sync API of asyncua

class asyncua.sync.Client(url: str, timeout: int = 4, tloop=None, sync_wrapper_timeout: float | None = 120)#

Bases: object

Sync Client, see doc for async Client the sync client has one extra parameter: sync_wrapper_timeout. if no ThreadLoop is provided this timeout is used to define how long the sync wrapper waits for an async call to return. defualt is 120s and hopefully should fit most applications

activate_session(*args, **kwargs)#
property application_uri#
browse_nodes(*args, **kwargs)#
check_connection(*args, **kwargs)#
close_secure_channel(*args, **kwargs)#
close_session(*args, **kwargs)#
connect(*args, **kwargs)#
connect_and_find_servers(*args, **kwargs)#
connect_and_find_servers_on_network(*args, **kwargs)#
connect_and_get_server_endpoints(*args, **kwargs)#
connect_sessionless(*args, **kwargs)#
connect_socket(*args, **kwargs)#
create_session(*args, **kwargs)#
create_subscription(period: CreateSubscriptionParameters | float, handler: SubHandler, publishing: bool = True) Subscription#
delete_nodes(*args, **kwargs)#
delete_subscriptions(*args, **kwargs)#
disconnect() None#
disconnect_sessionless() None#
disconnect_socket() None#
export_xml(*args, **kwargs)#
find_servers(*args, **kwargs)#
find_servers_on_network(*args, **kwargs)#
get_endpoints(*args, **kwargs)#
get_keepalive_count(period: float) int#
get_namespace_array(*args, **kwargs)#
get_namespace_index(*args, **kwargs)#
get_node(nodeid: SyncNode | NodeId | str | int) SyncNode#
get_objects_node() SyncNode#
get_root_node() SyncNode#
get_server_node() SyncNode#
get_subscription_revised_params(params: CreateSubscriptionParameters, results: CreateSubscriptionResult) ModifySubscriptionParameters | None#
import_xml(*args, **kwargs)#
load_client_certificate(*args, **kwargs)#
load_data_type_definitions(*args, **kwargs)#
load_enums(*args, **kwargs)#
load_private_key(*args, **kwargs)#
load_type_definitions(*args, **kwargs)#
open_secure_channel(*args, **kwargs)#
read_attributes(*args, **kwargs)#
read_values(*args, **kwargs)#
register_namespace(*args, **kwargs)#
register_nodes(*args, **kwargs)#
register_server(*args, **kwargs)#
send_hello(*args, **kwargs)#
server_policy_id(token_type: UserTokenType, default: str) str#
server_policy_uri(token_type: UserTokenType) str#
set_locale(locale: Sequence[str]) None#
set_password(pwd: str) None#
set_security(*args, **kwargs)#
set_security_string(*args, **kwargs)#
set_user(username: str) None#
translate_browsepaths(*args, **kwargs)#
unregister_nodes(*args, **kwargs)#
unregister_server(*args, **kwargs)#
write_values(*args, **kwargs)#
class asyncua.sync.DataTypeDictionaryBuilder(server, idx, ns_urn, dict_name, dict_node_id=None)#

Bases: object

create_data_type(*args, **kwargs)#
property dict_id#
init(*args, **kwargs)#
set_dict_byte_string(*args, **kwargs)#
class asyncua.sync.EventGenerator(tloop, aio_evgen)#

Bases: object

property event#
trigger(time=None, message=None)#
class asyncua.sync.Server(shelf_file=None, tloop=None, sync_wrapper_timeout: float | None = 120)#

Bases: object

Sync Server, see doc for async Server the sync server has one extra parameter: sync_wrapper_timeout. if no ThreadLoop is provided this timeout is used to define how long the sync wrapper waits for an async call to return. defualt is 120s and hopefully should fit most applications

create_subscription(period, handler)#
disable_clock(val: bool = True)#
get_event_generator(*args, **kwargs)#
get_namespace_array(*args, **kwargs)#
get_namespace_index(*args, **kwargs)#
get_node(nodeid)#
import_xml(*args, **kwargs)#
load_certificate(*args, **kwargs)#
load_data_type_definitions(*args, **kwargs)#
load_enums(*args, **kwargs)#
load_private_key(*args, **kwargs)#
load_type_definitions(*args, **kwargs)#
register_namespace(*args, **kwargs)#
set_attribute_value_callback(nodeid: NodeId, callback: Callable[[NodeId, AttributeIds], DataValue], attr=AttributeIds.Value) None#
set_endpoint(url)#
set_security_IDs(policy_ids)#
set_security_policy(security_policy, permission_ruleset=None)#
set_server_name(name)#
start(*args, **kwargs)#
stop()#
write_attribute_value(*args, **kwargs)#
class asyncua.sync.Shortcuts(tloop, aio_server)#

Bases: object

HasComponent: SyncNode#
HasEncoding: SyncNode#
HasProperty: SyncNode#
Organizes: SyncNode#
base_data_type: SyncNode#
base_event_type: SyncNode#
base_object_type: SyncNode#
base_structure_type: SyncNode#
base_union_type: SyncNode#
base_variable_type: SyncNode#
data_types: SyncNode#
enum_data_type: SyncNode#
event_types: SyncNode#
folder_type: SyncNode#
namespace_array: SyncNode#
namespaces: SyncNode#
object_types: SyncNode#
objects: SyncNode#
opc_binary: SyncNode#
option_set_type: SyncNode#
reference_types: SyncNode#
root: SyncNode#
server: SyncNode#
server_state: SyncNode#
service_level: SyncNode#
types: SyncNode#
variable_types: SyncNode#
class asyncua.sync.Subscription(tloop, sub)#

Bases: object

create_monitored_items(*args, **kwargs)#
delete(*args, **kwargs)#
subscribe_data_change(*args, **kwargs)#
subscribe_events(*args, **kwargs)#
unsubscribe(*args, **kwargs)#
class asyncua.sync.SyncNode(tloop: ThreadLoop, aio_node: Node)#

Bases: object

add_data_type(*args, **kwargs)#
add_folder(*args, **kwargs)#
add_method(*args, **kwargs)#
add_object(*args, **kwargs)#
add_object_type(*args, **kwargs)#
add_property(*args, **kwargs)#
add_reference(*args, **kwargs)#
add_reference_type(*args, **kwargs)#
add_variable(*args, **kwargs)#
add_variable_type(*args, **kwargs)#
call_method(*args, **kwargs)#
delete(*args, **kwargs)#
delete_reference(*args, **kwargs)#
get_access_level(*args, **kwargs)#
get_child(path: QualifiedName | str | Iterable[QualifiedName | str], return_all: Literal[False] = False) SyncNode#
get_child(path: QualifiedName | str | Iterable[QualifiedName | str], return_all: Literal[True] = True) List[SyncNode]
get_children(*args, **kwargs)#
get_children_by_path(*args, **kwargs)#
get_children_descriptions(*args, **kwargs)#
get_data_type_as_variant_type(*args, **kwargs)#
get_data_value(*args, **kwargs)#
get_description_refs(*args, **kwargs)#
get_encoding_refs(*args, **kwargs)#
get_methods(*args, **kwargs)#
get_parent(*args, **kwargs)#
get_path(max_length: int = 20, as_string: Literal[False] = False) List[SyncNode]#
get_path(max_length: int = 20, as_string: Literal[True] = True) List[str]
get_properties(*args, **kwargs)#
get_referenced_nodes(*args, **kwargs)#
get_references(*args, **kwargs)#
get_user_access_level(*args, **kwargs)#
get_value(*args, **kwargs)#
get_variables(*args, **kwargs)#
history_read(*args, **kwargs)#
history_read_events(*args, **kwargs)#
property nodeid#
read_array_dimensions(*args, **kwargs)#
read_attribute(*args, **kwargs)#
read_attributes(*args, **kwargs)#
read_browse_name(*args, **kwargs)#
read_data_type(*args, **kwargs)#
read_data_type_as_variant_type(*args, **kwargs)#
read_data_type_definition(*args, **kwargs)#
read_data_value(*args, **kwargs)#
read_description(*args, **kwargs)#
read_display_name(*args, **kwargs)#
read_event_history(*args, **kwargs)#
read_event_notifier(*args, **kwargs)#
read_node_class(*args, **kwargs)#
read_params(*args, **kwargs)#
read_raw_history(*args, **kwargs)#
read_type_definition(*args, **kwargs)#
read_value(*args, **kwargs)#
read_value_rank(*args, **kwargs)#
register(*args, **kwargs)#
set_attr_bit(*args, **kwargs)#
set_event_notifier(*args, **kwargs)#
set_modelling_rule(*args, **kwargs)#
set_value(*args, **kwargs)#
set_writable(*args, **kwargs)#
unregister(*args, **kwargs)#
unset_attr_bit(*args, **kwargs)#
write_array_dimensions(*args, **kwargs)#
write_attribute(*args, **kwargs)#
write_data_type_definition(*args, **kwargs)#
write_params(*args, **kwargs)#
write_value(*args, **kwargs)#
write_value_rank(*args, **kwargs)#
class asyncua.sync.ThreadLoop(timeout: float | None = 120)#

Bases: Thread

post(coro)#
run()#

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start()#

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

stop()#
exception asyncua.sync.ThreadLoopNotRunning#

Bases: Exception

class asyncua.sync.XmlExporter(sync_server)#

Bases: object

build_etree(*args, **kwargs)#
write_xml(*args, **kwargs)#
asyncua.sync.call_method_full(*args, **kwargs)#
asyncua.sync.copy_node(*args, **kwargs)#
asyncua.sync.data_type_to_variant_type(*args, **kwargs)#
asyncua.sync.instantiate(*args, **kwargs)#
asyncua.sync.new_enum(*args, **kwargs)#
asyncua.sync.new_node(sync_node, nodeid)#

given a sync node, create a new SyncNode with the given nodeid

asyncua.sync.new_struct(*args, **kwargs)#
asyncua.sync.new_struct_field(name: str, dtype: NodeId | SyncNode | VariantType, array: bool = False, optional: bool = False, description: str = '') StructureField#
asyncua.sync.sync_async_client_method(aio_func)#

Usage:

```python from asyncua.client import Client as AsyncClient from asyncua.sync import Client

with Client(‘otp.tcp://localhost’) as client:

read_attributes = sync_async_client_method(AsyncClient.read_attributes)(client) results = read_attributes(…) …

```

asyncua.sync.sync_uaclient_method(aio_func)#

Usage:

```python from asyncua.client.ua_client import UaClient from asyncua.sync import Client

with Client(‘otp.tcp://localhost’) as client:

read_attributes = sync_uaclient_method(UaClient.read_attributes)(client) results = read_attributes(…) …

```

asyncua.sync.sync_wrapper(aio_func)#
asyncua.sync.syncfunc(aio_func)#

decorator for sync function

asyncua.sync.syncmethod(func)#

decorator for sync methods

asyncua.tools module#

class asyncua.tools.SubHandler#

Bases: object

datachange_notification(node, val, data)#
event_notification(event)#
asyncua.tools.add_common_args(parser, default_node='i=84', require_node=False)#
asyncua.tools.add_minimum_args(parser)#
asyncua.tools.application_to_strings(app)#
asyncua.tools.cert_to_string(der)#
asyncua.tools.embed()#
asyncua.tools.endpoint_to_strings(ep)#
async asyncua.tools.get_node(client, args)#
asyncua.tools.parse_args(parser, requirenodeid=False)#
asyncua.tools.print_history(o)#
asyncua.tools.str_to_datetime(s, default=None)#
asyncua.tools.uacall()#
asyncua.tools.uaclient()#
asyncua.tools.uadiscover()#
asyncua.tools.uageneratestructs()#
asyncua.tools.uahistoryread()#
asyncua.tools.uals()#
asyncua.tools.uaread()#
asyncua.tools.uaserver()#
asyncua.tools.uasubscribe()#
asyncua.tools.uawrite()#