跳到主要内容

DataChanged 回调

OpcUaSubscription.onDataChangedList<Consumer<DataChangeEventArgs>> (CopyOnWriteArrayList), 在 MI 数据变化时由服务端推送触发.

public final List<Consumer<DataChangeEventArgs>> onDataChanged;
关联事件

DataChangeEventArgs

字段类型说明
monitoredItemHandleint触发的 MI 本地句柄
nodeIdString该 MI 的 NodeId
valueStringString值的字符串表示 (在 native Publish 线程内同步抽取, 跨线程安全)
dataTypeNameString内置数据类型枚举名 (如 Double, Int32)
statusEnums.StatusCodeDataValue 的 Status
sourceTimestampFtlong数据源 FileTime, 0 = null
serverTimestampFtlong服务端 FileTime, 0 = null
arrivedAtUtcInstantJava 端记录的到达时间

设计权衡

历史上回调里曾暴露 OpcUaDataValue 原始指针, 但因为 native 内存生命周期跨线程, 经常 use-after-free 闪退. 现在 SDK 在 Publish 线程内同步抽取所有字段为扁平 Java 对象 (字符串 + 枚举 + long FileTime), 跨线程使用零风险, 但代价是不能拿原始 OpcUaVariant 做高级操作.

如果需要原始 OpcUaVariant (如解析复杂 ExtensionObject), 在回调里用 ua.read(e.nodeId) 同步重读一次 — 比维护 native 生命周期可靠.


例子

简单日志

sub.onDataChanged.add(e ->
System.out.printf("%s = %s (%s) ft=%d%n",
e.nodeId, e.valueString, e.dataTypeName, e.sourceTimestampFt));

写文件

final Object writeLock = new Object();
sub.onDataChanged.add(e -> {
synchronized (writeLock) {
try (java.io.FileWriter fw = new java.io.FileWriter("data.csv", true)) {
long ft = e.sourceTimestampFt;
Instant t = (ft > 0)
? Instant.ofEpochMilli((ft - 116444736000000000L) / 10000L)
: Instant.now();
fw.write(t + "," + e.nodeId + "," + e.valueString + "\n");
} catch (Exception ex) { /* 日志 */ }
}
});

UI 绑定 (Swing)

sub.onDataChanged.add(e -> {
SwingUtilities.invokeLater(() -> {
labelTemp.setText(e.valueString);
labelStatus.setForeground(e.status == Enums.StatusCode.Good
? Color.GREEN : Color.RED);
});
});
必须 invokeLater

否则跨线程 UI 操作会触发 Swing 模型异常 (在 EDT 之外修改组件).

JavaFX 用 Platform.runLater(() -> { ... }) 等价.


线程模型

  • 回调在 C 层 Publish 线程执行, 单线程串行 (同一订阅内部不会并发)
  • 回调里不要执行长操作 (>100 ms), 否则会阻塞 Publish 队列
  • 回调里抛异常会被 SDK 吞掉 (try-catch ignored), 不会传播到 native
  • 跨订阅的回调在不同 Publish 线程, 可能并发 — 共享数据自己加锁
  • onDataChangedCopyOnWriteArrayList, add(consumer) / remove(consumer) 线程安全, 派发期间也能动态增删

与 ua.events.onAny 的关系

// 方式 A: 单订阅级
sub.onDataChanged.add(e -> /* ... */);

// 方式 B: 全局统一通道 (所有订阅汇集)
ua.events.onAny.add(entry -> {
if (entry.category == Enums.OpcUaEventCategory.DataChange) {
// entry.source = nodeId, entry.message = valueString
}
});

两个事件对同一数据点都会触发, 选一个用即可.

下一步