DataChange 回调
Subscription::add_node / add_many 接受一个闭包作为回调, 每次 MI 数据变化时由服务端推送触发.
pub fn add_node<F>(&self, node_id: &str, callback: F) -> Result<u32, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;
关联事件
- 全局统一通道见
s.events.on_any. - OPC UA 报警 / 条件事件 (协议事件) 见 事件订阅.
DataChangeEventArgs
pub struct DataChangeEventArgs {
pub monitored_item_handle: u32,
pub node_id: Option<String>,
/// 值的字符串表示 (在 C 层同步抽取, 跨线程安全)
pub value_string: Option<String>,
/// 内置数据类型枚举名 (如 "Double", "Int32")
pub data_type_name: Option<String>,
pub status: StatusCode,
pub source_timestamp: Option<SystemTime>,
pub server_timestamp: Option<SystemTime>,
}
| 字段 | 类型 | 说明 |
|---|---|---|
monitored_item_handle | u32 | 触发的 MI 本地句柄 |
node_id | Option<String> | 该 MI 的 NodeId |
value_string | Option<String> | 值的字符串表示 (在 C 层同步抽取, 跨线程安全) |
data_type_name | Option<String> | 内置数据类型枚举名 (如 Double, Int32) |
status | StatusCode | DataValue 的 Status |
source_timestamp | Option<SystemTime> | 数据源时间戳 |
server_timestamp | Option<SystemTime> | 服务端时间戳 |
设计权衡
历史上回调里曾暴露 DataValue 原始指针, 但因为 native 内存生命周期跨线程, 经常
use-after-free 闪退. 现在 SDK 在 Publish 线程内同步抽取所有字段为扁平 Rust 对象
(Option<String> + StatusCode + Option<SystemTime>), 跨线程使用零风险, 但代价是
不能拿原始 Variant 做高级操作.
如果需要原始 Variant (如解析复杂 ExtensionObject), 在回调里用 s.read(&node_id)
同步重读一次 — 比维护 native 生命周期可靠. 不过注意: 回调在 Publish 线程, 重新 RPC
会阻塞下一条通知, 高频场景下不推荐.
例子
简单日志
sub.add_node("ns=2;s=Temperature", |args| {
println!("{:?} = {:?} ({:?}) @ {:?}",
args.node_id, args.value_string, args.data_type_name, args.source_timestamp);
})?;
写文件
use std::sync::{Arc, Mutex};
use std::fs::OpenOptions;
use std::io::Write;
let log_file = Arc::new(Mutex::new(
OpenOptions::new().create(true).append(true).open("data.csv")?
));
let f = log_file.clone();
sub.add_node("ns=2;s=Temperature", move |args| {
if let Ok(mut g) = f.lock() {
let _ = writeln!(g, "{:?},{:?},{:?}",
args.source_timestamp,
args.node_id,
args.value_string);
}
})?;
跨线程通信 (mpsc)
use std::sync::mpsc;
let (tx, rx) = mpsc::channel::<f64>();
sub.add_node("ns=2;s=Temperature", move |args| {
if let Some(s) = &args.value_string {
if let Ok(v) = s.parse::<f64>() {
let _ = tx.send(v);
}
}
})?;
// 主线程收数据 + 更新 UI
while let Ok(v) = rx.recv() {
println!("UI temp = {}", v);
}
计数器 (FnMut + Mutex)
use std::sync::{Arc, Mutex};
let count = Arc::new(Mutex::new(0u64));
let count2 = count.clone();
sub.add_node("ns=2;s=Counter", move |_args| {
let mut g = count2.lock().unwrap();
*g += 1;
})?;
线程模型
- 回调在 C 层 Publish 线程执行, 单线程串行 (同一订阅内部不会并发)
- 回调里不要执行长操作 (>100 ms), 否则会阻塞 Publish 队列
- 回调里
panic!会被 SDKcatch_unwind兜住, 不会跨 FFI 栈帧 - 跨订阅的回调在不同 Publish 线程, 可能并发 — 共享数据自己加锁 / 用
Atomic*
闭包要求详解
| 约束 | 说明 |
|---|---|
FnMut | 闭包可改捕获变量, 但同一时刻只能一处持锁 |
Send | 闭包跨线程, 闭包内部不能持非 Send 引用 (如 Rc) |
'static | items map 长期持有, 闭包不能借用栈上短期数据 |
如果闭包需要持有 GUI 句柄等非 Send 类型, 用 mpsc::channel 把数据传出, GUI 线程消费.
与 s.events.on_any 的关系
// 方式 A: 单订阅级回调
sub.add_node("ns=2;s=X", |args| { /* ... */ })?;
// 方式 B: 全局统一通道 (所有订阅汇集)
s.events.lock().unwrap().on_any.push(Box::new(|e| {
if e.category == darra_opcua::OpcUaEventCategory::DataChange {
println!("any: {} = {} ({})", e.source, e.message, e.status_code);
}
}));
两个事件对同一数据点都会触发, 选一个用即可.