跳到主要内容

data_changed 事件

OpcUaSubscription.data_changedList[Callable[[DataChangeEventArgs], None]]. MI 数据变化时由服务端推送触发, 内部按列表顺序串行调用所有回调.

sub.data_changed.append(callback)        # 注册
sub.data_changed.remove(callback) # 取消
sub.data_changed.clear() # 清空
关联事件

DataChangeEventArgs

@dataclass
class DataChangeEventArgs:
monitored_item_handle: int = 0
node_id: str = ""
value_string: Optional[str] = None
data_type_name: Optional[str] = None
status: StatusCode = StatusCode.Bad
source_timestamp: Optional[datetime] = None
server_timestamp: Optional[datetime] = None
字段类型说明
monitored_item_handleint触发的 MI 本地句柄
node_idstr该 MI 的 NodeId
value_stringOptional[str]值的字符串表示 (在 C 层同步抽取, 跨线程安全)
data_type_nameOptional[str]内置数据类型枚举名 (如 Double, Int32)
statusStatusCodeDataValue 的 Status
source_timestampOptional[datetime]数据源时间戳 (UTC)
server_timestampOptional[datetime]服务端时间戳 (UTC)

设计权衡

历史上回调里曾暴露 native DataValue 指针, 但因为 native 内存生命周期跨线程, 经常 use-after-free 闪退. 现在 SDK 在 Publish 线程内同步抽取所有字段为扁平 Python dataclass (str + 枚举 + datetime), 跨线程使用零风险, 但代价是不能拿原始 Variant 做高级操作.

如果需要原始 Variant (如解析复杂 ExtensionObject), 在回调里用 ua.read(e.node_id) 同步重读一次 — 比维护 native 生命周期可靠.


例子

简单日志

sub.data_changed.append(
lambda e: print(f"{e.node_id} = {e.value_string} ({e.data_type_name}) @ {e.source_timestamp}"))

写文件 / 数据库

import threading
file_lock = threading.Lock()

def to_csv(e):
with file_lock:
with open("data.csv", "a") as f:
f.write(f"{e.source_timestamp.isoformat()},{e.node_id},{e.value_string}\n")

sub.data_changed.append(to_csv)

UI 绑定 (PyQt)

from PyQt5.QtCore import QMetaObject, Qt, Q_ARG

def on_change(e):
# 切回 GUI 线程
QMetaObject.invokeMethod(label, "setText", Qt.QueuedConnection,
Q_ARG(str, e.value_string or ""))

sub.data_changed.append(on_change)
必须切回 GUI 线程

PyQt / Tk / wxPython 的 UI 操作必须在主线程, 否则崩溃. PyQt 用 QMetaObject.invokeMethod; Tk 用 root.after(0, ...); asyncio 用 loop.call_soon_threadsafe.


线程模型

  • 回调在 C 层 Publish 线程执行 (Python 端拿 GIL), 单线程串行 (同一订阅内部不会并发)
  • 回调里不要执行长操作 (>100 ms), 否则会阻塞 Publish 队列
  • 回调里抛异常会被 SDK 吃掉 (catch + 内部记录), 不会传播到主线程
  • 跨订阅的回调在不同 Publish 线程, 可能并发 — 共享数据自己加锁

与 ua.events.any 的关系

# 方式 A: 单订阅级
sub.data_changed.append(lambda e: ...)

# 方式 B: 全局统一通道 (所有订阅汇集, 字段为 OpcUaEventEntry)
ua.events.any.append(lambda e: ... if e.category == OpcUaEventCategory.DataChange else None)

两个事件对同一数据点都会触发, 选一个用即可.

下一步