asyncua.client.ha package#

Submodules#

asyncua.client.ha.common module#

exception asyncua.client.ha.common.ClientNotFound#

Bases: Exception

asyncua.client.ha.common.batch(iterable, size)#
async asyncua.client.ha.common.event_wait(evt, timeout) bool#
asyncua.client.ha.common.get_digest(conf) str#

asyncua.client.ha.ha_client module#

class asyncua.client.ha.ha_client.ConnectionStates(value)#

Bases: IntEnum

OPC UA Part 4 - Services Release Section 6.6.2.4.2 ServiceLevel

DEGRADED = 2#
HEALTHY = 200#
IN_MAINTENANCE = 0#
NO_DATA = 1#
class asyncua.client.ha.ha_client.HaClient(config: HaConfig, security: HaSecurityConfig | None = None)#

Bases: object

The HaClient is responsible for managing non-transparent server redundancy. The two servers must have:

  • Identical NodeIds

  • Identical browse path and AddressSpace structure

  • Identical Service Level logic

  • However nodes in the server local namespace can differ

  • Time synchronization (e.g NTP)

It starts the OPC-UA clients and connect to the server that fits in the HaMode selected.

HEALTHY_STATE = 200#
async create_subscription(period: int, handler: Any) str#
async debug_status()#

Return the class attribute for troubleshooting purposes

async delete_subscriptions(sub_names: List[str]) None#
async failover_warm(primary: Client | None, secondaries: Iterable[Client]) None#
generate_sub_name() Generator[str, None, None]#

Asyncio unsafe - yield names for subscriptions.

get_client_by_url(url) Client#
get_client_warm_mode() List[Client]#
get_clients() List[Client]#
async get_serving_client(clients: List[Client], serving_client: Client | None) Client | None#

Returns the client with the higher service level.

The service level reference is taken from the active_client, thus we prevent failing over when mutliple clients return the same number.

async group_clients_by_health() Tuple[List[Client], List[Client]]#
property ha_mode: str#
async hook_on_reconnect(**kwargs)#
async hook_on_subscribe(**kwargs)#
async hook_on_unsubscribe(**kwargs)#
async reconnect(client: Client) None#

Reconnect a client of the HA set and add its URL to the reset list.

property session_timeout: int#
set_security(policy: Type[SecurityPolicy], certificate: CertProperties, private_key: CertProperties, server_certificate: CertProperties | None = None, mode: MessageSecurityMode = MessageSecurityMode.SignAndEncrypt) None#
async start() None#
async stop()#
async subscribe_data_change(sub_name: str, nodes: Iterable[Node] | Iterable[str], attr=AttributeIds.Value, queuesize=0) None#
async unsubscribe(nodes: Iterable[Node] | Iterable[str]) None#
property urls: List[str]#
class asyncua.client.ha.ha_client.HaConfig(ha_mode: ~asyncua.client.ha.ha_client.HaMode, keepalive_timer: int = 15, manager_timer: int = 15, reconciliator_timer: int = 15, session_timeout: int = 60, request_timeout: int = 30, secure_channel_timeout: int = 3600, session_name: str = 'HaClient', urls: ~typing.List[str] = <factory>)#

Bases: object

Parameters for the HaClient constructor. Timers and timeouts are all in seconds.

ha_mode: HaMode#
keepalive_timer: int = 15#
manager_timer: int = 15#
reconciliator_timer: int = 15#
request_timeout: int = 30#
secure_channel_timeout: int = 3600#
session_name: str = 'HaClient'#
session_timeout: int = 60#
urls: List[str]#
class asyncua.client.ha.ha_client.HaManager(ha_client: HaClient, timer: int | None = None)#

Bases: object

The manager handles individual client connections according to the selected HaMode

async reconnect_warm() None#

Reconnect disconnected clients

async run() None#
set_loop_timer(timer: int | None)#
async stop() None#
async update_state_warm() None#
class asyncua.client.ha.ha_client.HaMode(value)#

Bases: IntEnum

An enumeration.

COLD = 0#
HOT_A = 2#
HOT_B = 3#
WARM = 1#
class asyncua.client.ha.ha_client.HaSecurityConfig(policy: Type[asyncua.ua.uaprotocol_hand.SecurityPolicy] | None = None, certificate: asyncua.crypto.uacrypto.CertProperties | None = None, private_key: asyncua.crypto.uacrypto.CertProperties | None = None, server_certificate: asyncua.crypto.uacrypto.CertProperties | None = None, mode: asyncua.ua.uaprotocol_auto.MessageSecurityMode | None = None)#

Bases: object

certificate: CertProperties | None = None#
mode: MessageSecurityMode | None = None#
policy: Type[SecurityPolicy] | None = None#
private_key: CertProperties | None = None#
server_certificate: CertProperties | None = None#
class asyncua.client.ha.ha_client.KeepAlive(client, server, timer)#

Bases: object

Ping the server status regularly to check its health

async run() None#
async stop() None#
class asyncua.client.ha.ha_client.ServerInfo(url: str, status: asyncua.client.ha.ha_client.ConnectionStates = <ConnectionStates.NO_DATA: 1>)#

Bases: object

status: ConnectionStates = 1#
url: str#

asyncua.client.ha.reconciliator module#

class asyncua.client.ha.reconciliator.Method(value)#

Bases: Enum

Map the actions to the lower level object methods

ADD_MI = 'subscribe_data_change'#
ADD_SUB = 'create_subscription'#
DEL_MI = 'unsubscribe'#
DEL_SUB = 'delete_subscription'#
MONITORING = 'set_monitoring_mode'#
PUBLISHING = 'set_publishing_mode'#
class asyncua.client.ha.reconciliator.Reconciliator(timer: int, ha_client: HaClient)#

Bases: object

Reconciliator is a side-task of HaClient. It regularly applies the HaClient subscription configurations to actual OPC-UA objects.

After a successfull reconciliation and if all the client status are >= HEALTHY_STATE, the ideal_map is equal to the real_map.

BATCH_MI_SIZE = 1000#
add_to_map(url: str, action: Method, fut: Task, **kwargs) None#
change_mode(url: str, action: Method, val: bool | MonitoringMode, fut: Task, **kwargs) None#
async debug_status() None#

Return the class attribute for troubleshooting purposes

del_from_map(url: str, action: Method, fut: Task, **kwargs) None#
hook_add_to_map(fut: Task, url: str, action: Method, **kwargs)#

placeholder for easily superclass the HaClient and implement custom logic

hook_add_to_map_error(url: str, action: Method, fut: Task, **kwargs)#

placeholder for easily superclass the HaClient and implement custom logic

hook_del_from_map(fut: Task, url: str, **kwargs)#

placeholder for easily superclass the HaClient and implement custom logic

hook_mi_request(url: str, sub_name: str, nodes: Set[SortedDict], action: Method)#

placeholder for easily superclass the HaClient and implement custom logic

init_hooks() None#

Implement hooks for custom actions like collecting metrics or triggering external events.

async reconciliate() None#

Identify the differences between the ideal and the real_map and take actual actions on the underlying OPC-UA objects.

We only tries to reconciliate healthy clients, since most of the unhealthy clients will end up resubscribing and their map will be cleared anyway.

Reconciliator steps are ordered this way:

1 - Resubscribe newly reconnected clients 2 - Identify gap with healthy client configurations 3 - Remove/Add subscription 4 - Add nodes to subscriptions 5 - Update publishing/monitoring options

async resubscribe() None#

Remove all the subscriptions from the real_map.

Deleting them from the remote server would be helpless because they are tied to a deleted session, however they should eventually time out.

async run() None#
async stop() None#
async update_nodes(real_map: Dict[str, SortedDict], ideal_map: Dict[str, SortedDict], targets: Set[str]) None#
async update_subscription_modes(real_map: Dict[str, SortedDict], ideal_map: Dict[str, SortedDict], targets: Set[str]) None#
async update_subscriptions(real_map, ideal_map, targets: Set[str]) None#

asyncua.client.ha.virtual_subscription module#

class asyncua.client.ha.virtual_subscription.NodeAttr(attr: asyncua.ua.attribute_ids.AttributeIds | None = None, queuesize: int = 0)#

Bases: object

attr: AttributeIds | None = None#
queuesize: int = 0#
class asyncua.client.ha.virtual_subscription.VirtualSubscription(period: int, handler: Any, publishing: bool, monitoring: asyncua.ua.uaprotocol_auto.MonitoringMode, nodes: sortedcontainers.sorteddict.SortedDict = <factory>)#

Bases: object

get_nodes() Set[str]#
handler: Any#
monitoring: MonitoringMode#
nodes: SortedDict#
period: int#
publishing: bool#
set_monitoring_mode(mode: MonitoringMode) None#
set_publishing_mode(mode: bool) None#
subscribe_data_change(nodes: Iterable[str], attr: AttributeIds, queuesize: int) None#
unsubscribe(nodes: Iterable[str]) None#