跳到主要内容

大规模监控 (1000+ 节点)

工厂级 SCADA 经常要监控成千上万个 Tag. 关键不是 SDK 性能, 是怎么用 — 多订阅分流 / AddMany 批量 / 异步消费. 用 C# 示例.

配套示例

完整代码

using DarraOpcUa_Client;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;

class Program
{
static async Task Main()
{
const string endpoint = "opc.tcp://localhost:4840";

// 模拟: 1500 个 Tag, 按更新频率分三组
var fastTags = Enumerable.Range(1, 100 ).Select(i => $"ns=2;s=Fast.T{i}").ToList(); // 100ms
var mediumTags = Enumerable.Range(1, 800 ).Select(i => $"ns=2;s=Medium.T{i}").ToList(); // 1s
var slowTags = Enumerable.Range(1, 600 ).Select(i => $"ns=2;s=Slow.T{i}").ToList(); // 30s

using var ua = new DarraOpcUa(endpoint);
ua.Connect();
Console.WriteLine($"[OK] 已连接 {endpoint}");

// 1. 异步消费队列
var channel = Channel.CreateUnbounded<DataChangeEventArgs>();
long counter = 0;
_ = Task.Run(async () =>
{
await foreach (var e in channel.Reader.ReadAllAsync())
{
Interlocked.Increment(ref counter);
// TODO: 在这里写数据库 / 文件 / 转发
}
});

// 2. 三个订阅, 不同 publishingInterval
using var fast = ua.CreateSubscription(100);
using var medium = ua.CreateSubscription(1_000);
using var slow = ua.CreateSubscription(30_000);

EventHandler<DataChangeEventArgs> handler = (s, e) => channel.Writer.TryWrite(e);
fast.DataChanged += handler;
medium.DataChanged += handler;
slow.DataChanged += handler;

// 3. AddMany 一次 RPC 加 N 个监控项
fast .AddMany(fastTags);
medium.AddMany(mediumTags);
slow .AddMany(slowTags);
Console.WriteLine($"已订阅 {fastTags.Count + mediumTags.Count + slowTags.Count} 个节点 (3 个 Subscription)");

// 4. 每分钟报告吞吐
var timer = new Timer(_ =>
{
var n = Interlocked.Exchange(ref counter, 0);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} 上一分钟 DataChanges = {n} ({n / 60.0:F0}/s)");
}, null, 60_000, 60_000);

Console.WriteLine("\n运行中, Ctrl+C 退出...");
await Task.Delay(Timeout.Infinite);
}
}

分段说明

第 1 步: 多 Subscription 按频率分流

using var fast   = ua.CreateSubscription(100);     // 100ms 推
using var medium = ua.CreateSubscription(1_000); // 1s 推
using var slow = ua.CreateSubscription(30_000); // 30s 推

核心原则: 不同更新频率的 Tag 分到不同 Subscription. 高频 Tag 放慢订阅 = 浪费带宽 (服务端凑批延迟); 低频 Tag 放快订阅 = 浪费 CPU (服务端反复采样). 经验值: 单 Subscription 不超过 1000 MI, 超过分多个.

第 2 步: AddMany 批量 RPC

fast.AddMany(fastTags);   // 1 次 RPC, ~10ms

循环 Add(tag) 是 N 次 RPC, AddMany 是 1 次. 100 个 Tag 差距是 50 倍.

第 3 步: 回调异步消费

EventHandler<DataChangeEventArgs> handler = (s, e) => channel.Writer.TryWrite(e);

回调在 Publish 线程, 任何阻塞 (写文件 / 写数据库 / 同步 HTTP) 都会拖慢 Publish, 服务端 NotificationQueue 满了会丢数据. 正确做法: 回调只 TryWrite 入 Channel (~us), 后台 Task 异步消费.

注意事项

  • 单 Session 极限: 一般认为 ~10000 MI 是单 Session 上限, 再多建议起多个 Session 分到多个 CPU 核.
  • KeepAliveCount / LifetimeCount 默认能用, 长肥管道 (高带宽高延迟) 可以适当调大避免 Publish 抖动误判断线.
  • DataChange 用 Interlocked.Increment 计数监控吞吐, 比定时拉 ServerStatus 更直接.

相关链接