跳到主要内容

OPC UA → MQTT 桥接

OPC UA 适合本地高质量通讯, MQTT 适合公网大规模订阅. 桥接两者是 IIoT 标准方案. 用 C# + MQTTnet 示例.

配套示例

依赖

dotnet add package DarraOpcUa
dotnet add package MQTTnet

完整代码

using DarraOpcUa_Client;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

class Program
{
static async Task Main()
{
const string opcuaEndpoint = "opc.tcp://localhost:4840";
const string mqttBroker = "mqtt.example.com";
const int mqttPort = 8883;
const string topicPrefix = "factory1/opcua";

var nodes = new[]
{
"ns=2;s=Boiler1.Temperature",
"ns=2;s=Boiler1.Pressure",
"ns=2;s=Line1.Counter",
};

// 1. MQTT Client 连接
var mqttFactory = new MqttFactory();
var mqtt = mqttFactory.CreateMqttClient();
var mqttOpts = new MqttClientOptionsBuilder()
.WithTcpServer(mqttBroker, mqttPort)
.WithCredentials("factory1", "secret")
.WithTls()
.WithCleanSession()
.Build();
await mqtt.ConnectAsync(mqttOpts);
Console.WriteLine($"[OK] MQTT connected {mqttBroker}:{mqttPort}");

// 2. OPC UA Client 连接
using var ua = new DarraOpcUa(opcuaEndpoint);
ua.Connect();
Console.WriteLine($"[OK] OPC UA connected {opcuaEndpoint}");

// 3. 订阅 + 转 MQTT (JSON payload)
using var sub = ua.CreateSubscription(1_000);
sub.DataChanged += async (s, e) =>
{
var topic = $"{topicPrefix}/{e.NodeId.Replace(";", "_").Replace("=", "_")}";
var payload = JsonSerializer.Serialize(new
{
nodeId = e.NodeId,
value = e.ValueString,
status = e.Status.ToString(),
ts = e.SourceTimestamp,
});

await mqtt.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithRetainFlag()
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build());

Console.WriteLine($" -> {topic}: {e.ValueString}");
};

sub.AddMany(nodes);
Console.WriteLine($"\nBridging {nodes.Length} tags. Ctrl+C 退出...");
await Task.Delay(Timeout.Infinite);
}
}

分段说明

第 1 步: Topic 命名规范

var topic = $"{topicPrefix}/{e.NodeId.Replace(";", "_").Replace("=", "_")}";

关键: NodeId 含 ; = 这些 MQTT topic 分隔符要替换. 推荐工厂级命名: <plant>/<line>/<device>/<tag>, 而不是直接拼 NodeId, 这样订阅者用 wildcard factory1/+/Boiler1/# 才好用.

第 2 步: QoS 选择

.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
QoS含义用途
0At most once (可能丢)高频遥测 (温度每秒 100 个)
1At least once (可能重)业务关键值 (默认选这个)
2Exactly once (慢)计费 / 命令下发

第 3 步: Retain Flag

.WithRetainFlag()

让 Broker 保留每个 topic 最新一条, 新订阅者一连上立即拿到当前值, 不用等下一次发布. 状态量 (开关 / 模式) 必须 retain, 高频遥测可以不 retain.

注意事项

  • 公网带宽贵 — 高频 Tag 桥接前先在客户端做死区过滤 (变化超过 X% 才推), 否则带宽爆炸.
  • 反向 (MQTT 下发指令 → ua.Write) 在另一个 topic 命名空间 (factory1/cmd/#), 与上行数据分开, 避免 loop.
  • 对接 Eclipse Sparkplug B 标准时, payload 改 Protobuf 而不是 JSON, 接入云端工具链 (Ignition / HiveMQ) 成本更低.

相关链接