跳到主要内容

Subscription 订阅

OpcUaSubscription 是订阅容器, 管理一批 MonitoredItem (MI), 提供 onDataChanged 回调列表 (Java 用 List<Consumer> 替代 C# event +=).

实现 AutoCloseable, 必须在 try-with-resources 内使用.

子页跳转

一分钟速览

import com.darra.opcua.*;
import java.util.Arrays;
import java.util.List;

// 创建
try (OpcUaSubscription sub = ua.createSubscription(500.0 /* publishingIntervalMs */)) {

// 数据变更回调 (Java 用 List<Consumer> 替代 C# event +=)
sub.onDataChanged.add(e -> System.out.printf(
"%s = %s (%s)%n", e.nodeId, e.valueString, e.status));

// 添加 MI
int h1 = sub.add("ns=2;s=Counter", null);
int h2 = sub.add("ns=2;s=Temperature", null);

// 批量添加
List<String> nodes = Arrays.asList("ns=2;s=T1", "ns=2;s=T2", "ns=2;s=T3");
List<int[]> results = sub.addMany(nodes, null, Enums.AttributeId.Value);

// 改单个 MI 的采样间隔
ModifyMonitoredItemResult mod = sub.modifyItem(h1, 100.0);

// 暂停 / 恢复采集
sub.setMode(h1, Enums.MonitoringMode.Disabled);
sub.setMode(h1, Enums.MonitoringMode.Reporting);

// 移除
sub.remove(h1);

Thread.sleep(60_000); // 让回调跑一会
}

订阅生命周期事件

订阅相关的内部事件全部走统一 ua.events.onAny 通道, Category 区分:

ua.events.onAny.add(e -> {
switch (e.category) {
case SubscriptionCreated: /* sub 创建 */ break;
case MonitoredItemAdded: /* mi 添加 */ break;
case MonitoredItemRemoved: /* mi 移除 */ break;
case SubscriptionLost: /* 订阅丢失 */ break;
case DataChange: /* 数据变更 */ break;
}
});

详见 Session 事件.

OpcUaSubscription 主要 API

方法说明
add(nodeId, Consumer)添加单 MI (Value, samplingInterval=-1)
add(nodeId, Consumer, AttributeId, samplingMs)添加单 MI (全参)
addMany(nodeIds, Consumer, AttributeId)批量添加
remove(int handle)移除单 MI
removeByNodeId(String nodeId)按 NodeId 移除全部对应 MI
modify(double publishingMs, ...)改订阅参数
modifyItem(handle, samplingMs, queueSize, discardOldest)改单 MI 采样
modifyMonitoredItems(List<MonitoredItemModifyRequest>)批量改
setMonitoringMode(MonitoringMode, List<Integer>)批量切 mode
setMode(handle, MonitoringMode)切单 MI mode
setPublishingEnabled(boolean)整条订阅启停
republish(int seqNo)请求重发
pump(int timeoutMs)手动触发一次 Publish
getSession() / getSubscriptionId() / getPublishingIntervalMs() / size() / getMonitoredNodeIds()状态查询
close()释放, 服务端 DeleteSubscription

下一步