跳到主要内容

Events — ua.events.*

OpcUaSession.events 是 SDK 内部事件统一通道, 把会话生命周期 / 订阅生命周期 / 数据变化 / 通讯异常都汇集到一个对象上, 方便上层 UI 做日志 / 状态栏 / 调试.

与 OPC UA 协议事件区分
  • ua.events.* — SDK 内部事件 (本节, 与协议无关)
  • ua.subscribe_events(...) — OPC UA 协议事件 (Alarms & Conditions), 见 事件订阅

Python 事件机制

Python 没有 C# EventHandler<T> 强类型机制, SDK 用 List[Callable] 模拟:

ua.events.connected.append(callback)        # 注册
ua.events.connected.remove(callback) # 取消
ua.events.connected.clear() # 清空

每个事件就是一个 list, 触发时遍历调用所有 callable. 单个回调抛异常被 SDK 内部 try/except 吞掉, 不影响其他回调和主线程.


完整事件清单

会话生命周期

事件EventArgs 类字段触发时机
connectedConnectionEventArgsendpoint_url, status_code, message, timestamp_utc首次连接成功
disconnectedConnectionEventArgs同上主动 / 被动断开
reconnectingConnectionEventArgs同上检测到断线, 启动自动重连
reconnectedConnectionEventArgs同上自动重连成功
keep_aliveKeepAliveEventArgsserver_time_utc, server_status, timestamp_utcKeepAlive 心跳成功
state_changedSessionStateChangedEventArgsold_state, new_state, reason, timestamp_utc任何 SessionState 变化

异常

事件EventArgs 类字段触发时机
communication_errorCommunicationErrorEventArgsstatus_code, category, message, timestamp_utc通讯失败 (KeepAlive 失败 / read 异常等)
subscription_lostSubscriptionLostEventArgssubscription_id, reason, will_reconnect, timestamp_utc订阅被服务端清理

统一通道 events.any

所有类别事件都会同步触发此通道, 字段类 OpcUaEventEntry:

字段类型说明
categoryOpcUaEventCategory类别枚举
severityOpcUaEventSeverity严重度 (Trace/Debug/Info/Warn/Error/Fatal)
sourcestr事件源 (NodeId / 子组件名)
messagestr事件文本
status_codeStatusCode关联状态码
timestamp_utcdatetime触发时刻 (UTC)
originalAny强类型 EventArgs (Connected → ConnectionEventArgs 等), 可空

OpcUaEventCategory 枚举

events.any 接收的 OpcUaEventEntry.category 完整清单:

Connected, Disconnected, Reconnecting, Reconnected, KeepAlive, StateChanged,
SubscriptionCreated, SubscriptionLost, SubscriptionRestored,
MonitoredItemAdded, MonitoredItemRemoved, SubscriptionCleared,
DataChange, ServerEvent, Alarm,
Read, Write, Browse, Call, HistoryRead,
CommunicationError, SecurityError, ProtocolError,
Info, Diagnostic

OpcUaEventSeverity 枚举

Trace, Debug, Info, Warn, Error, Fatal

用法示例

通用日志钩子 (一行打印 SDK 内所有事件)

def on_any(e):
print(f"[{e.severity.name}] [{e.category.name}] {e.source}: {e.message} ({e.status_code.name})")

ua.events.any.append(on_any)

调试期非常省事 — 不订阅具体事件, 一行 hook 看全部.

状态栏绑定 (PyQt / Tk)

def on_state_changed(e):
# PyQt 必须切回 GUI 线程: QMetaObject.invokeMethod / signal-slot
label.setText(e.new_state.name)
color = "green" if e.new_state == SessionState.Connected else "red"
label.setStyleSheet(f"color: {color}")

ua.events.state_changed.append(on_state_changed)

错误统计

import threading
lock = threading.Lock()
counter = {"err": 0}

def on_err(e):
with lock:
counter["err"] += 1

ua.events.communication_error.append(on_err)

与订阅 data_changed 的关系

订阅 sub.data_changed 仅收到该订阅的事件; ua.events.any 通道里 category == DataChange 的项收到 SDK 下所有订阅的事件 (统一通道). 两者并存:

  • 单订阅业务逻辑 → sub.data_changed
  • 全局日志 / 录波 / 调试 → ua.events.any

线程模型

GIL + Publish 线程

事件回调在 C 层 Publish 线程上调用 (Python 端会先拿 GIL). UI 操作必须自行 marshal 到主线程 (PyQt: QMetaObject.invokeMethod / Tk: root.after / asyncio: loop.call_soon_threadsafe).

不要在事件里执行长操作 (>100 ms), 否则会阻塞 Publish 队列. 把数据放队列, 让另一个线程消费:

import queue, threading
q = queue.Queue()

ua.events.any.append(lambda e: q.put(e)) # 短回调

def consumer():
while True:
e = q.get()
# 慢处理 (写文件 / 数据库 / DNN 推理)

threading.Thread(target=consumer, daemon=True).start()

下一步