OPC UA → MQTT 桥接
OPC UA 适合本地高质量通讯, MQTT 适合公网大规模订阅. 桥接两者是 IIoT 标准方案. 用 C# + MQTTnet 示例.
配套示例
- 桥到老设备 → OPC UA → Modbus TCP 桥接
- 多 Tag 调优 → 大规模监控
依赖
dotnet add package DarraOpcUa
dotnet add package MQTTnet
完整代码
using DarraOpcUa_Client;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
const string opcuaEndpoint = "opc.tcp://localhost:4840";
const string mqttBroker = "mqtt.example.com";
const int mqttPort = 8883;
const string topicPrefix = "factory1/opcua";
var nodes = new[]
{
"ns=2;s=Boiler1.Temperature",
"ns=2;s=Boiler1.Pressure",
"ns=2;s=Line1.Counter",
};
// 1. MQTT Client 连接
var mqttFactory = new MqttFactory();
var mqtt = mqttFactory.CreateMqttClient();
var mqttOpts = new MqttClientOptionsBuilder()
.WithTcpServer(mqttBroker, mqttPort)
.WithCredentials("factory1", "secret")
.WithTls()
.WithCleanSession()
.Build();
await mqtt.ConnectAsync(mqttOpts);
Console.WriteLine($"[OK] MQTT connected {mqttBroker}:{mqttPort}");
// 2. OPC UA Client 连接
using var ua = new DarraOpcUa(opcuaEndpoint);
ua.Connect();
Console.WriteLine($"[OK] OPC UA connected {opcuaEndpoint}");
// 3. 订阅 + 转 MQTT (JSON payload)
using var sub = ua.CreateSubscription(1_000);
sub.DataChanged += async (s, e) =>
{
var topic = $"{topicPrefix}/{e.NodeId.Replace(";", "_").Replace("=", "_")}";
var payload = JsonSerializer.Serialize(new
{
nodeId = e.NodeId,
value = e.ValueString,
status = e.Status.ToString(),
ts = e.SourceTimestamp,
});
await mqtt.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithRetainFlag()
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build());
Console.WriteLine($" -> {topic}: {e.ValueString}");
};
sub.AddMany(nodes);
Console.WriteLine($"\nBridging {nodes.Length} tags. Ctrl+C 退出...");
await Task.Delay(Timeout.Infinite);
}
}
分段说明
第 1 步: Topic 命名规范
var topic = $"{topicPrefix}/{e.NodeId.Replace(";", "_").Replace("=", "_")}";
关键: NodeId 含
;=这些 MQTT topic 分隔符要替换. 推荐工厂级命名:<plant>/<line>/<device>/<tag>, 而不是直接拼 NodeId, 这样订阅者用 wildcardfactory1/+/Boiler1/#才好用.
第 2 步: QoS 选择
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
| QoS | 含义 | 用途 |
|---|---|---|
| 0 | At most once (可能丢) | 高频遥测 (温度每秒 100 个) |
| 1 | At least once (可能重) | 业务关键值 (默认选这个) |
| 2 | Exactly once (慢) | 计费 / 命令下发 |
第 3 步: Retain Flag
.WithRetainFlag()
让 Broker 保留每个 topic 最新一条, 新订阅者一连上立即拿到当前值, 不用等下一次发布. 状态量 (开关 / 模式) 必须 retain, 高频遥测可以不 retain.
注意事项
- 公网带宽贵 — 高频 Tag 桥接前先在客户端做死区过滤 (变化超过 X% 才推), 否则带宽爆炸.
- 反向 (MQTT 下发指令 →
ua.Write) 在另一个 topic 命名空间 (factory1/cmd/#), 与上行数据分开, 避免 loop. - 对接 Eclipse Sparkplug B 标准时, payload 改 Protobuf 而不是 JSON, 接入云端工具链 (Ignition / HiveMQ) 成本更低.