Events — ua.events.*
OpcUaSession.events 是 SDK 内部事件统一通道, 把会话生命周期 / 订阅生命周期 / 数据变化 / 通讯异常都汇集到一个对象上, 方便上层 UI 做日志 / 状态栏 / 调试.
与 OPC UA 协议事件区分
ua.events.*— SDK 内部事件 (本节, 与协议无关)ua.subscribe_events(...)— OPC UA 协议事件 (Alarms & Conditions), 见 事件订阅
Python 事件机制
Python 没有 C# EventHandler<T> 强类型机制, SDK 用 List[Callable] 模拟:
ua.events.connected.append(callback) # 注册
ua.events.connected.remove(callback) # 取消
ua.events.connected.clear() # 清空
每个事件就是一个 list, 触发时遍历调用所有 callable. 单个回调抛异常被 SDK 内部 try/except 吞掉, 不影响其他回调和主线程.
完整事件清单
会话生命周期
| 事件 | EventArgs 类 | 字段 | 触发时机 |
|---|---|---|---|
connected | ConnectionEventArgs | endpoint_url, status_code, message, timestamp_utc | 首次连接成功 |
disconnected | ConnectionEventArgs | 同上 | 主动 / 被动断开 |
reconnecting | ConnectionEventArgs | 同上 | 检测到断线, 启动自动重连 |
reconnected | ConnectionEventArgs | 同上 | 自动重连成功 |
keep_alive | KeepAliveEventArgs | server_time_utc, server_status, timestamp_utc | KeepAlive 心跳成功 |
state_changed | SessionStateChangedEventArgs | old_state, new_state, reason, timestamp_utc | 任何 SessionState 变化 |
异常
| 事件 | EventArgs 类 | 字段 | 触发时机 |
|---|---|---|---|
communication_error | CommunicationErrorEventArgs | status_code, category, message, timestamp_utc | 通讯失败 (KeepAlive 失败 / read 异常等) |
subscription_lost | SubscriptionLostEventArgs | subscription_id, reason, will_reconnect, timestamp_utc | 订阅被服务端清理 |
统一通道 events.any
所有类别事件都会同步触发此通道, 字段类 OpcUaEventEntry:
| 字段 | 类型 | 说明 |
|---|---|---|
category | OpcUaEventCategory | 类别枚举 |
severity | OpcUaEventSeverity | 严重度 (Trace/Debug/Info/Warn/Error/Fatal) |
source | str | 事件源 (NodeId / 子组件名) |
message | str | 事件文本 |
status_code | StatusCode | 关联状态码 |
timestamp_utc | datetime | 触发时刻 (UTC) |
original | Any | 强类型 EventArgs (Connected → ConnectionEventArgs 等), 可空 |
OpcUaEventCategory 枚举
events.any 接收的 OpcUaEventEntry.category 完整清单:
Connected, Disconnected, Reconnecting, Reconnected, KeepAlive, StateChanged,
SubscriptionCreated, SubscriptionLost, SubscriptionRestored,
MonitoredItemAdded, MonitoredItemRemoved, SubscriptionCleared,
DataChange, ServerEvent, Alarm,
Read, Write, Browse, Call, HistoryRead,
CommunicationError, SecurityError, ProtocolError,
Info, Diagnostic
OpcUaEventSeverity 枚举
Trace, Debug, Info, Warn, Error, Fatal
用法示例
通用日志钩子 (一行打印 SDK 内所有事件)
def on_any(e):
print(f"[{e.severity.name}] [{e.category.name}] {e.source}: {e.message} ({e.status_code.name})")
ua.events.any.append(on_any)
调试期非常省事 — 不订阅具体事件, 一行 hook 看全部.
状态栏绑定 (PyQt / Tk)
def on_state_changed(e):
# PyQt 必须切回 GUI 线程: QMetaObject.invokeMethod / signal-slot
label.setText(e.new_state.name)
color = "green" if e.new_state == SessionState.Connected else "red"
label.setStyleSheet(f"color: {color}")
ua.events.state_changed.append(on_state_changed)
错误统计
import threading
lock = threading.Lock()
counter = {"err": 0}
def on_err(e):
with lock:
counter["err"] += 1
ua.events.communication_error.append(on_err)
与订阅 data_changed 的关系
订阅 sub.data_changed 仅收到该订阅的事件; ua.events.any 通道里 category == DataChange 的项收到 SDK 下所有订阅的事件 (统一通道). 两者并存:
- 单订阅业务逻辑 →
sub.data_changed - 全局日志 / 录波 / 调试 →
ua.events.any
线程模型
GIL + Publish 线程
事件回调在 C 层 Publish 线程上调用 (Python 端会先拿 GIL). UI 操作必须自行 marshal 到主线程 (PyQt: QMetaObject.invokeMethod / Tk: root.after / asyncio: loop.call_soon_threadsafe).
不要在事件里执行长操作 (>100 ms), 否则会阻塞 Publish 队列. 把数据放队列, 让另一个线程消费:
import queue, threading
q = queue.Queue()
ua.events.any.append(lambda e: q.put(e)) # 短回调
def consumer():
while True:
e = q.get()
# 慢处理 (写文件 / 数据库 / DNN 推理)
threading.Thread(target=consumer, daemon=True).start()