data_changed 事件
OpcUaSubscription.data_changed 是 List[Callable[[DataChangeEventArgs], None]]. MI 数据变化时由服务端推送触发, 内部按列表顺序串行调用所有回调.
sub.data_changed.append(callback) # 注册
sub.data_changed.remove(callback) # 取消
sub.data_changed.clear() # 清空
关联事件
- 全局统一通道见
ua.events.any. - OPC UA 报警 / 条件事件 (协议事件) 见 事件订阅.
DataChangeEventArgs
@dataclass
class DataChangeEventArgs:
monitored_item_handle: int = 0
node_id: str = ""
value_string: Optional[str] = None
data_type_name: Optional[str] = None
status: StatusCode = StatusCode.Bad
source_timestamp: Optional[datetime] = None
server_timestamp: Optional[datetime] = None
| 字段 | 类型 | 说明 |
|---|---|---|
monitored_item_handle | int | 触发的 MI 本地句柄 |
node_id | str | 该 MI 的 NodeId |
value_string | Optional[str] | 值的字符串表示 (在 C 层同步抽取, 跨线程安全) |
data_type_name | Optional[str] | 内置数据类型枚举名 (如 Double, Int32) |
status | StatusCode | DataValue 的 Status |
source_timestamp | Optional[datetime] | 数据源时间戳 (UTC) |
server_timestamp | Optional[datetime] | 服务端时间戳 (UTC) |
设计权衡
历史上回调里曾暴露 native DataValue 指针, 但因为 native 内存生命周期跨线程, 经常 use-after-free 闪退. 现在 SDK 在 Publish 线程内同步抽取所有字段为扁平 Python dataclass (str + 枚举 + datetime), 跨线程使用零风险, 但代价是不能拿原始 Variant 做高级操作.
如果需要原始 Variant (如解析复杂 ExtensionObject), 在回调里用 ua.read(e.node_id) 同步重读一次 — 比维护 native 生命周期可靠.
例子
简单日志
sub.data_changed.append(
lambda e: print(f"{e.node_id} = {e.value_string} ({e.data_type_name}) @ {e.source_timestamp}"))
写文件 / 数据库
import threading
file_lock = threading.Lock()
def to_csv(e):
with file_lock:
with open("data.csv", "a") as f:
f.write(f"{e.source_timestamp.isoformat()},{e.node_id},{e.value_string}\n")
sub.data_changed.append(to_csv)
UI 绑定 (PyQt)
from PyQt5.QtCore import QMetaObject, Qt, Q_ARG
def on_change(e):
# 切回 GUI 线程
QMetaObject.invokeMethod(label, "setText", Qt.QueuedConnection,
Q_ARG(str, e.value_string or ""))
sub.data_changed.append(on_change)
必须切回 GUI 线程
PyQt / Tk / wxPython 的 UI 操作必须在主线程, 否则崩溃. PyQt 用 QMetaObject.invokeMethod; Tk 用 root.after(0, ...); asyncio 用 loop.call_soon_threadsafe.
线程模型
- 回调在 C 层 Publish 线程执行 (Python 端拿 GIL), 单线程串行 (同一订阅内部不会并发)
- 回调里不要执行长操作 (>100 ms), 否则会阻塞 Publish 队列
- 回调里抛异常会被 SDK 吃掉 (catch + 内部记录), 不会传播到主线程
- 跨订阅的回调在不同 Publish 线程, 可能并发 — 共享数据自己加锁
与 ua.events.any 的关系
# 方式 A: 单订阅级
sub.data_changed.append(lambda e: ...)
# 方式 B: 全局统一通道 (所有订阅汇集, 字段为 OpcUaEventEntry)
ua.events.any.append(lambda e: ... if e.category == OpcUaEventCategory.DataChange else None)
两个事件对同一数据点都会触发, 选一个用即可.