Subscription 订阅
OpcUaSubscription 是订阅容器, 管理一批 MonitoredItem (MI), 提供 onDataChanged 回调列表 (Java 用 List<Consumer> 替代 C# event +=).
实现 AutoCloseable, 必须在 try-with-resources 内使用.
子页跳转
- 创建订阅请参考 创建 Subscription.
- 添加 / 批量添加 MonitoredItem 请参考 添加 MonitoredItem.
- 改 publishingInterval / lifetime / sampling 请参考 Modify.
- 切 Disabled / Sampling / Reporting 请参考 SetMonitoringMode.
- 请求重发指定 SeqNo 请参考 Republish.
- onDataChanged 字段与线程模型请参考 DataChanged 回调.
- OPC UA 报警 / 事件订阅请参考 事件订阅.
一分钟速览
import com.darra.opcua.*;
import java.util.Arrays;
import java.util.List;
// 创建
try (OpcUaSubscription sub = ua.createSubscription(500.0 /* publishingIntervalMs */)) {
// 数据变更回调 (Java 用 List<Consumer> 替代 C# event +=)
sub.onDataChanged.add(e -> System.out.printf(
"%s = %s (%s)%n", e.nodeId, e.valueString, e.status));
// 添加 MI
int h1 = sub.add("ns=2;s=Counter", null);
int h2 = sub.add("ns=2;s=Temperature", null);
// 批量添加
List<String> nodes = Arrays.asList("ns=2;s=T1", "ns=2;s=T2", "ns=2;s=T3");
List<int[]> results = sub.addMany(nodes, null, Enums.AttributeId.Value);
// 改单个 MI 的采样间隔
ModifyMonitoredItemResult mod = sub.modifyItem(h1, 100.0);
// 暂停 / 恢复采集
sub.setMode(h1, Enums.MonitoringMode.Disabled);
sub.setMode(h1, Enums.MonitoringMode.Reporting);
// 移除
sub.remove(h1);
Thread.sleep(60_000); // 让回调跑一会
}
订阅生命周期事件
订阅相关的内部事件全部走统一 ua.events.onAny 通道, Category 区分:
ua.events.onAny.add(e -> {
switch (e.category) {
case SubscriptionCreated: /* sub 创建 */ break;
case MonitoredItemAdded: /* mi 添加 */ break;
case MonitoredItemRemoved: /* mi 移除 */ break;
case SubscriptionLost: /* 订阅丢失 */ break;
case DataChange: /* 数据变更 */ break;
}
});
详见 Session 事件.
OpcUaSubscription 主要 API
| 方法 | 说明 |
|---|---|
add(nodeId, Consumer) | 添加单 MI (Value, samplingInterval=-1) |
add(nodeId, Consumer, AttributeId, samplingMs) | 添加单 MI (全参) |
addMany(nodeIds, Consumer, AttributeId) | 批量添加 |
remove(int handle) | 移除单 MI |
removeByNodeId(String nodeId) | 按 NodeId 移除全部对应 MI |
modify(double publishingMs, ...) | 改订阅参数 |
modifyItem(handle, samplingMs, queueSize, discardOldest) | 改单 MI 采样 |
modifyMonitoredItems(List<MonitoredItemModifyRequest>) | 批量改 |
setMonitoringMode(MonitoringMode, List<Integer>) | 批量切 mode |
setMode(handle, MonitoringMode) | 切单 MI mode |
setPublishingEnabled(boolean) | 整条订阅启停 |
republish(int seqNo) | 请求重发 |
pump(int timeoutMs) | 手动触发一次 Publish |
getSession() / getSubscriptionId() / getPublishingIntervalMs() / size() / getMonitoredNodeIds() | 状态查询 |
close() | 释放, 服务端 DeleteSubscription |