全流程数据采集到 CSV
把"连接 → 自动发现节点 → 订阅 → 数据落盘"串成一段完整程序. 用 ConcurrentQueue 缓冲, 后台线程异步写 CSV, 避免阻塞 Publish 回调. 用 C# 示例.
完整代码
using DarraOpcUa_Client;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
const string endpoint = "opc.tcp://localhost:4840";
const string browseRoot = "ns=2;s=Boiler1";
const string csvPath = "data.csv";
// 1. 连接
using var ua = new DarraOpcUa(endpoint);
ua.Connect();
Console.WriteLine($"[OK] 已连接 {endpoint}");
// 2. Browse 自动发现 Variable 节点
var children = ua.Browse(browseRoot, filter: NodeClass.Variable);
Console.WriteLine($"\n在 {browseRoot} 下发现 {children.Count} 个变量:");
foreach (var c in children)
Console.WriteLine($" {c.DisplayName,-20} {c.NodeId}");
// 3. 后台 CSV 写线程 (Producer / Consumer)
var queue = new ConcurrentQueue<string>();
using var cts = new CancellationTokenSource();
var writerTask = Task.Run(() =>
{
using var sw = new StreamWriter(csvPath, append: false);
sw.WriteLine("timestamp,node_id,value,status");
while (!cts.IsCancellationRequested || !queue.IsEmpty)
{
if (queue.TryDequeue(out var line)) sw.WriteLine(line);
else Thread.Sleep(50);
}
sw.Flush();
});
// 4. 订阅全部发现的节点
using var sub = ua.CreateSubscription(500);
sub.DataChanged += (s, e) =>
{
var ts = (e.SourceTimestamp ?? DateTime.UtcNow).ToString("O");
queue.Enqueue($"{ts},{e.NodeId},{e.ValueString},{e.Status}");
};
var nodeIds = children.Select(c => c.NodeId).ToList();
sub.AddMany(nodeIds);
Console.WriteLine($"\n开始写入 {csvPath}, 按 Ctrl+C 停止...\n");
// 5. 优雅退出 (Ctrl+C)
Console.CancelKeyPress += (s, e) => { e.Cancel = true; cts.Cancel(); };
await Task.Delay(Timeout.Infinite, cts.Token).ContinueWith(_ => { });
// 6. 等队列排空再退出
await writerTask;
Console.WriteLine($"已停止, 数据保存到 {csvPath}");
}
}
分段说明
第 1 步: Browse 自动发现节点
var children = ua.Browse(browseRoot, filter: NodeClass.Variable);
filter: NodeClass.Variable 只列变量, 跳过 Object / Method, 拿来即可订阅. 不知道有什么节点时这一步比硬编码 NodeId 灵活.
第 2 步: 后台 CSV 写线程
var queue = new ConcurrentQueue<string>();
var writerTask = Task.Run(() => { /* 取队列写文件 */ });
关键设计: DataChanged 在 Publish 线程触发, 文件 IO 慢 (磁盘 ms 级 vs 内存 us 级), 直接在回调里写会拖慢 Publish 导致服务端 NotificationQueue 堆积.
正确做法: 回调只 Enqueue (~us 级), 后台线程批量 WriteLine.
第 3 步: 优雅退出
Console.CancelKeyPress += (s, e) => { e.Cancel = true; cts.Cancel(); };
捕获 Ctrl+C, 设置 cancel token. 主循环退出后等 writerTask 把队列排空再 return, 否则尾部数据会丢.
注意事项
- CSV 用
,分隔, 如果 NodeId 或 ValueString 含逗号要转义 (加引号), 这里为简洁省略. - 长期运行建议按日 rotate (
data-2026-04-25.csv) + 异步 gzip, 否则单文件无限增长. - 高频 (>100Hz) 数据建议直接落 SQLite / TimescaleDB, CSV 不适合大量随机查询.