asyncua package#
Subpackages#
Submodules#
asyncua.sync module#
sync API of asyncua
- class asyncua.sync.Client(url: str, timeout: int = 4, tloop=None, sync_wrapper_timeout: float | None = 120)#
Bases:
object
Sync Client, see the doc for the async Client. The sync client has one extra parameter: sync_wrapper_timeout. If no ThreadLoop is provided, this timeout is used to define how long the sync wrapper waits for an async call to return. The default is 120s, which hopefully should fit most applications.
- activate_session(*args, **kwargs)#
- property application_uri#
- browse_nodes(*args, **kwargs)#
- check_connection(*args, **kwargs)#
- close_secure_channel(*args, **kwargs)#
- close_session(*args, **kwargs)#
- connect(*args, **kwargs)#
- connect_and_find_servers(*args, **kwargs)#
- connect_and_find_servers_on_network(*args, **kwargs)#
- connect_and_get_server_endpoints(*args, **kwargs)#
- connect_sessionless(*args, **kwargs)#
- connect_socket(*args, **kwargs)#
- create_session(*args, **kwargs)#
- create_subscription(period: CreateSubscriptionParameters | float, handler: SubHandler, publishing: bool = True) Subscription#
- delete_nodes(*args, **kwargs)#
- delete_subscriptions(*args, **kwargs)#
- export_xml(*args, **kwargs)#
- find_servers(*args, **kwargs)#
- find_servers_on_network(*args, **kwargs)#
- get_endpoints(*args, **kwargs)#
- get_namespace_array(*args, **kwargs)#
- get_namespace_index(*args, **kwargs)#
- get_subscription_revised_params(params: CreateSubscriptionParameters, results: CreateSubscriptionResult) ModifySubscriptionParameters | None#
- import_xml(*args, **kwargs)#
- load_client_certificate(*args, **kwargs)#
- load_data_type_definitions(*args, **kwargs)#
- load_enums(*args, **kwargs)#
- load_private_key(*args, **kwargs)#
- load_type_definitions(*args, **kwargs)#
- open_secure_channel(*args, **kwargs)#
- read_attributes(*args, **kwargs)#
- read_values(*args, **kwargs)#
- register_namespace(*args, **kwargs)#
- register_nodes(*args, **kwargs)#
- register_server(*args, **kwargs)#
- send_hello(*args, **kwargs)#
- set_security(*args, **kwargs)#
- set_security_string(*args, **kwargs)#
- translate_browsepaths(*args, **kwargs)#
- unregister_nodes(*args, **kwargs)#
- unregister_server(*args, **kwargs)#
- write_values(*args, **kwargs)#
- class asyncua.sync.DataTypeDictionaryBuilder(server, idx, ns_urn, dict_name, dict_node_id=None)#
Bases:
object
- create_data_type(*args, **kwargs)#
- property dict_id#
- init(*args, **kwargs)#
- set_dict_byte_string(*args, **kwargs)#
- class asyncua.sync.EventGenerator(tloop, aio_evgen)#
Bases:
object
- property event#
- trigger(time=None, message=None)#
- class asyncua.sync.Server(shelf_file=None, tloop=None, sync_wrapper_timeout: float | None = 120)#
Bases:
object
Sync Server, see the doc for the async Server. The sync server has one extra parameter: sync_wrapper_timeout. If no ThreadLoop is provided, this timeout is used to define how long the sync wrapper waits for an async call to return. The default is 120s, which hopefully should fit most applications.
- create_subscription(period, handler)#
- get_event_generator(*args, **kwargs)#
- get_namespace_array(*args, **kwargs)#
- get_namespace_index(*args, **kwargs)#
- get_node(nodeid)#
- import_xml(*args, **kwargs)#
- link_method(node, callback)#
- load_certificate(*args, **kwargs)#
- load_data_type_definitions(*args, **kwargs)#
- load_enums(*args, **kwargs)#
- load_private_key(*args, **kwargs)#
- load_type_definitions(*args, **kwargs)#
- register_namespace(*args, **kwargs)#
- set_attribute_value_callback(nodeid: NodeId, callback: Callable[[NodeId, AttributeIds], DataValue], attr=AttributeIds.Value) None#
- set_endpoint(url)#
- set_security_IDs(policy_ids)#
- set_security_policy(security_policy, permission_ruleset=None)#
- set_server_name(name)#
- start(*args, **kwargs)#
- stop()#
- write_attribute_value(*args, **kwargs)#
- class asyncua.sync.Subscription(tloop, sub)#
Bases:
object
- create_monitored_items(*args, **kwargs)#
- delete(*args, **kwargs)#
- subscribe_data_change(*args, **kwargs)#
- subscribe_events(*args, **kwargs)#
- unsubscribe(*args, **kwargs)#
- class asyncua.sync.SyncNode(tloop: ThreadLoop, aio_node: Node)#
Bases:
object
- add_data_type(*args, **kwargs)#
- add_folder(*args, **kwargs)#
- add_method(*args, **kwargs)#
- add_object(*args, **kwargs)#
- add_object_type(*args, **kwargs)#
- add_property(*args, **kwargs)#
- add_reference(*args, **kwargs)#
- add_reference_type(*args, **kwargs)#
- add_variable(*args, **kwargs)#
- add_variable_type(*args, **kwargs)#
- call_method(*args, **kwargs)#
- delete(*args, **kwargs)#
- delete_reference(*args, **kwargs)#
- get_access_level(*args, **kwargs)#
- get_child(path: QualifiedName | str | Iterable[QualifiedName | str], return_all: Literal[False] = False) SyncNode#
- get_child(path: QualifiedName | str | Iterable[QualifiedName | str], return_all: Literal[True] = True) List[SyncNode]
- get_children(*args, **kwargs)#
- get_children_by_path(*args, **kwargs)#
- get_children_descriptions(*args, **kwargs)#
- get_data_type_as_variant_type(*args, **kwargs)#
- get_data_value(*args, **kwargs)#
- get_description_refs(*args, **kwargs)#
- get_encoding_refs(*args, **kwargs)#
- get_methods(*args, **kwargs)#
- get_parent(*args, **kwargs)#
- get_path(max_length: int = 20, as_string: Literal[False] = False) List[SyncNode]#
- get_path(max_length: int = 20, as_string: Literal[True] = True) List[str]
- get_properties(*args, **kwargs)#
- get_referenced_nodes(*args, **kwargs)#
- get_references(*args, **kwargs)#
- get_user_access_level(*args, **kwargs)#
- get_value(*args, **kwargs)#
- get_variables(*args, **kwargs)#
- history_read(*args, **kwargs)#
- history_read_events(*args, **kwargs)#
- property nodeid#
- read_array_dimensions(*args, **kwargs)#
- read_attribute(*args, **kwargs)#
- read_attributes(*args, **kwargs)#
- read_browse_name(*args, **kwargs)#
- read_data_type(*args, **kwargs)#
- read_data_type_as_variant_type(*args, **kwargs)#
- read_data_type_definition(*args, **kwargs)#
- read_data_value(*args, **kwargs)#
- read_description(*args, **kwargs)#
- read_display_name(*args, **kwargs)#
- read_event_history(*args, **kwargs)#
- read_event_notifier(*args, **kwargs)#
- read_node_class(*args, **kwargs)#
- read_params(*args, **kwargs)#
- read_raw_history(*args, **kwargs)#
- read_type_definition(*args, **kwargs)#
- read_value(*args, **kwargs)#
- read_value_rank(*args, **kwargs)#
- register(*args, **kwargs)#
- set_attr_bit(*args, **kwargs)#
- set_event_notifier(*args, **kwargs)#
- set_modelling_rule(*args, **kwargs)#
- set_value(*args, **kwargs)#
- set_writable(*args, **kwargs)#
- unregister(*args, **kwargs)#
- unset_attr_bit(*args, **kwargs)#
- write_array_dimensions(*args, **kwargs)#
- write_attribute(*args, **kwargs)#
- write_data_type_definition(*args, **kwargs)#
- write_params(*args, **kwargs)#
- write_value(*args, **kwargs)#
- write_value_rank(*args, **kwargs)#
- class asyncua.sync.ThreadLoop(timeout: float | None = 120)#
Bases:
Thread
- post(coro)#
- run()#
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- start()#
Start the thread’s activity.
It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the same thread object.
- stop()#
- class asyncua.sync.XmlExporter(sync_server)#
Bases:
object
- build_etree(*args, **kwargs)#
- write_xml(*args, **kwargs)#
- asyncua.sync.call_method_full(*args, **kwargs)#
- asyncua.sync.copy_node(*args, **kwargs)#
- asyncua.sync.data_type_to_variant_type(*args, **kwargs)#
- asyncua.sync.instantiate(*args, **kwargs)#
- asyncua.sync.new_enum(*args, **kwargs)#
- asyncua.sync.new_node(sync_node, nodeid)#
given a sync node, create a new SyncNode with the given nodeid
- asyncua.sync.new_struct(*args, **kwargs)#
- asyncua.sync.new_struct_field(name: str, dtype: NodeId | SyncNode | VariantType, array: bool = False, optional: bool = False, description: str = '') StructureField#
- asyncua.sync.sync_async_client_method(aio_func)#
Usage:
```python
from asyncua.client import Client as AsyncClient
from asyncua.sync import Client

with Client('opc.tcp://localhost') as client:
    read_attributes = sync_async_client_method(AsyncClient.read_attributes)(client)
    results = read_attributes(...)
    ...
```
- asyncua.sync.sync_uaclient_method(aio_func)#
Usage:
```python
from asyncua.client.ua_client import UaClient
from asyncua.sync import Client

with Client('opc.tcp://localhost') as client:
    read_attributes = sync_uaclient_method(UaClient.read_attributes)(client)
    results = read_attributes(...)
    ...
```
- asyncua.sync.sync_wrapper(aio_func)#
- asyncua.sync.syncfunc(aio_func)#
decorator for sync function
- asyncua.sync.syncmethod(func)#
decorator for sync methods
asyncua.tools module#
- class asyncua.tools.SubHandler#
Bases:
object
- datachange_notification(node, val, data)#
- event_notification(event)#
- asyncua.tools.add_common_args(parser, default_node='i=84', require_node=False)#
- asyncua.tools.add_minimum_args(parser)#
- asyncua.tools.application_to_strings(app)#
- asyncua.tools.cert_to_string(der)#
- asyncua.tools.embed()#
- asyncua.tools.endpoint_to_strings(ep)#
- async asyncua.tools.get_node(client, args)#
- asyncua.tools.parse_args(parser, requirenodeid=False)#
- asyncua.tools.print_history(o)#
- asyncua.tools.str_to_datetime(s, default=None)#
- asyncua.tools.uacall()#
- asyncua.tools.uaclient()#
- asyncua.tools.uadiscover()#
- asyncua.tools.uageneratestructs()#
- asyncua.tools.uahistoryread()#
- asyncua.tools.uals()#
- asyncua.tools.uaread()#
- asyncua.tools.uaserver()#
- asyncua.tools.uasubscribe()#
- asyncua.tools.uawrite()#