跳到主要内容

DataChange 回调

Subscription::add_node / add_many 接受一个闭包作为回调, 每次 MI 数据变化时由服务端推送触发.

pub fn add_node<F>(&self, node_id: &str, callback: F) -> Result<u32, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;
关联事件

DataChangeEventArgs

pub struct DataChangeEventArgs {
pub monitored_item_handle: u32,
pub node_id: Option<String>,
/// 值的字符串表示 (在 C 层同步抽取, 跨线程安全)
pub value_string: Option<String>,
/// 内置数据类型枚举名 (如 "Double", "Int32")
pub data_type_name: Option<String>,
pub status: StatusCode,
pub source_timestamp: Option<SystemTime>,
pub server_timestamp: Option<SystemTime>,
}
字段类型说明
monitored_item_handleu32触发的 MI 本地句柄
node_idOption<String>该 MI 的 NodeId
value_stringOption<String>值的字符串表示 (在 C 层同步抽取, 跨线程安全)
data_type_nameOption<String>内置数据类型枚举名 (如 Double, Int32)
statusStatusCodeDataValue 的 Status
source_timestampOption<SystemTime>数据源时间戳
server_timestampOption<SystemTime>服务端时间戳

设计权衡

历史上回调里曾暴露 DataValue 原始指针, 但因为 native 内存生命周期跨线程, 经常 use-after-free 闪退. 现在 SDK 在 Publish 线程内同步抽取所有字段为扁平 Rust 对象 (Option<String> + StatusCode + Option<SystemTime>), 跨线程使用零风险, 但代价是 不能拿原始 Variant 做高级操作.

如果需要原始 Variant (如解析复杂 ExtensionObject), 在回调里用 s.read(&node_id) 同步重读一次 — 比维护 native 生命周期可靠. 不过注意: 回调在 Publish 线程, 重新 RPC 会阻塞下一条通知, 高频场景下不推荐.


例子

简单日志

sub.add_node("ns=2;s=Temperature", |args| {
println!("{:?} = {:?} ({:?}) @ {:?}",
args.node_id, args.value_string, args.data_type_name, args.source_timestamp);
})?;

写文件

use std::sync::{Arc, Mutex};
use std::fs::OpenOptions;
use std::io::Write;

let log_file = Arc::new(Mutex::new(
OpenOptions::new().create(true).append(true).open("data.csv")?
));

let f = log_file.clone();
sub.add_node("ns=2;s=Temperature", move |args| {
if let Ok(mut g) = f.lock() {
let _ = writeln!(g, "{:?},{:?},{:?}",
args.source_timestamp,
args.node_id,
args.value_string);
}
})?;

跨线程通信 (mpsc)

use std::sync::mpsc;

let (tx, rx) = mpsc::channel::<f64>();

sub.add_node("ns=2;s=Temperature", move |args| {
if let Some(s) = &args.value_string {
if let Ok(v) = s.parse::<f64>() {
let _ = tx.send(v);
}
}
})?;

// 主线程收数据 + 更新 UI
while let Ok(v) = rx.recv() {
println!("UI temp = {}", v);
}

计数器 (FnMut + Mutex)

use std::sync::{Arc, Mutex};

let count = Arc::new(Mutex::new(0u64));
let count2 = count.clone();

sub.add_node("ns=2;s=Counter", move |_args| {
let mut g = count2.lock().unwrap();
*g += 1;
})?;

线程模型

  • 回调在 C 层 Publish 线程执行, 单线程串行 (同一订阅内部不会并发)
  • 回调里不要执行长操作 (>100 ms), 否则会阻塞 Publish 队列
  • 回调里 panic! 会被 SDK catch_unwind 兜住, 不会跨 FFI 栈帧
  • 跨订阅的回调在不同 Publish 线程, 可能并发 — 共享数据自己加锁 / 用 Atomic*

闭包要求详解

约束说明
FnMut闭包可改捕获变量, 但同一时刻只能一处持锁
Send闭包跨线程, 闭包内部不能持非 Send 引用 (如 Rc)
'staticitems map 长期持有, 闭包不能借用栈上短期数据

如果闭包需要持有 GUI 句柄等非 Send 类型, 用 mpsc::channel 把数据传出, GUI 线程消费.


与 s.events.on_any 的关系

// 方式 A: 单订阅级回调
sub.add_node("ns=2;s=X", |args| { /* ... */ })?;

// 方式 B: 全局统一通道 (所有订阅汇集)
s.events.lock().unwrap().on_any.push(Box::new(|e| {
if e.category == darra_opcua::OpcUaEventCategory::DataChange {
println!("any: {} = {} ({})", e.source, e.message, e.status_code);
}
}));

两个事件对同一数据点都会触发, 选一个用即可.

下一步