DataChanged 回调
OpcUaSubscription.onDataChanged 是 List<Consumer<DataChangeEventArgs>> (CopyOnWriteArrayList), 在 MI 数据变化时由服务端推送触发.
public final List<Consumer<DataChangeEventArgs>> onDataChanged;
关联事件
- 全局统一通道见
ua.events.onAny(Category=DataChange). - OPC UA 报警 / 条件事件 (协议事件) 见 事件订阅.
DataChangeEventArgs
| 字段 | 类型 | 说明 |
|---|---|---|
monitoredItemHandle | int | 触发的 MI 本地句柄 |
nodeId | String | 该 MI 的 NodeId |
valueString | String | 值的字符串表示 (在 native Publish 线程内同步抽取, 跨线程安全) |
dataTypeName | String | 内置数据类型枚举名 (如 Double, Int32) |
status | Enums.StatusCode | DataValue 的 Status |
sourceTimestampFt | long | 数据源 FileTime, 0 = null |
serverTimestampFt | long | 服务端 FileTime, 0 = null |
arrivedAtUtc | Instant | Java 端记录的到达时间 |
设计权衡
历史上回调里曾暴露 OpcUaDataValue 原始指针, 但因为 native 内存生命周期跨线程, 经常 use-after-free 闪退. 现在 SDK 在 Publish 线程内同步抽取所有字段为扁平 Java 对象 (字符串 + 枚举 + long FileTime), 跨线程使用零风险, 但代价是不能拿原始 OpcUaVariant 做高级操作.
如果需要原始 OpcUaVariant (如解析复杂 ExtensionObject), 在回调里用 ua.read(e.nodeId) 同步重读一次 — 比维护 native 生命周期可靠.
例子
简单日志
sub.onDataChanged.add(e ->
System.out.printf("%s = %s (%s) ft=%d%n",
e.nodeId, e.valueString, e.dataTypeName, e.sourceTimestampFt));
写文件
final Object writeLock = new Object();
sub.onDataChanged.add(e -> {
synchronized (writeLock) {
try (java.io.FileWriter fw = new java.io.FileWriter("data.csv", true)) {
long ft = e.sourceTimestampFt;
Instant t = (ft > 0)
? Instant.ofEpochMilli((ft - 116444736000000000L) / 10000L)
: Instant.now();
fw.write(t + "," + e.nodeId + "," + e.valueString + "\n");
} catch (Exception ex) { /* 日志 */ }
}
});
UI 绑定 (Swing)
sub.onDataChanged.add(e -> {
SwingUtilities.invokeLater(() -> {
labelTemp.setText(e.valueString);
labelStatus.setForeground(e.status == Enums.StatusCode.Good
? Color.GREEN : Color.RED);
});
});
必须 invokeLater
否则跨线程 UI 操作会触发 Swing 模型异常 (在 EDT 之外修改组件).
JavaFX 用 Platform.runLater(() -> { ... }) 等价.
线程模型
- 回调在 C 层 Publish 线程执行, 单线程串行 (同一订阅内部不会并发)
- 回调里不要执行长操作 (>100 ms), 否则会阻塞 Publish 队列
- 回调里抛异常会被 SDK 吞掉 (try-catch ignored), 不会传播到 native
- 跨订阅的回调在不同 Publish 线程, 可能并发 — 共享数据自己加锁
onDataChanged用CopyOnWriteArrayList,add(consumer)/remove(consumer)线程安全, 派发期间也能动态增删
与 ua.events.onAny 的关系
// 方式 A: 单订阅级
sub.onDataChanged.add(e -> /* ... */);
// 方式 B: 全局统一通道 (所有订阅汇集)
ua.events.onAny.add(entry -> {
if (entry.category == Enums.OpcUaEventCategory.DataChange) {
// entry.source = nodeId, entry.message = valueString
}
});
两个事件对同一数据点都会触发, 选一个用即可.