添加 MonitoredItem
前置阅读
- 先创建 Subscription, 见 创建 Subscription.
- DataChange 字段请看 DataChange 回调.
- 改采样参数请看 Modify.
sub.add_node
pub fn add_node<F>(&self, node_id: &str, callback: F) -> Result<u32, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;
监控 Value 属性, 跟订阅 publishing 间隔采样.
let h: u32 = sub.add_node("ns=2;s=Temperature", |args| {
println!("{:?} = {:?}", args.node_id, args.value_string);
})?;
返回 u32 MonitoredItem handle (本地句柄, 用于后续 modify_item / remove).
sub.add_node_with
完整参数版本.
pub fn add_node_with<F>(
&self,
node_id: &str,
attr: AttributeId,
sampling_ms: f64,
callback: F,
) -> Result<u32, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;
| 参数 | 默认 | 说明 |
|---|---|---|
node_id | — | 监控的 NodeId |
attr | AttributeId::Value | 监控的 Attribute |
sampling_ms | -1.0 (跟订阅) | 服务端采样间隔. 0.0 = 尽快 |
use darra_opcua::AttributeId;
let h = sub.add_node_with(
"ns=2;s=Pressure",
AttributeId::Value,
100.0, // 100 ms 采样
|args| println!("press = {:?}", args.value_string),
)?;
错误: BAD_NODE_ID_UNKNOWN 等通过 Result::Err 返回.
sub.add_many
批量添加多个 MI, 共享一个回调:
pub fn add_many<F>(
&self,
node_ids: &[&str],
attr: AttributeId,
callback: F,
) -> Result<Vec<(u32, StatusCode)>, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;
返回 Vec<(handle, status)>, 顺序与 node_ids 一致, 失败项 handle == 0.
let nodes: Vec<String> = (1..=100).map(|i| format!("ns=2;s=T{}", i)).collect();
let nodes_ref: Vec<&str> = nodes.iter().map(String::as_str).collect();
let results = sub.add_many(&nodes_ref, AttributeId::Value, |args| {
println!("{:?} = {:?}", args.node_id, args.value_string);
})?;
for (i, (h, st)) in results.iter().enumerate() {
if st.is_good() {
println!(" {} -> handle={}", nodes[i], h);
} else {
println!(" {} FAILED: {}", nodes[i], st);
}
}
设计权衡
C 端 AddNodes API 接受单个 cb + 无 ctx, trampoline 没法反查每个 MI 的 NodeId. 因此实现走循环
add_node_with 路径 (N 次 RPC), 每个 MI 独立 ctx, 共享用户回调用 Arc<Mutex<F>> 包装.
性能略差但语义/内存可靠.
DataChangeEventArgs
pub struct DataChangeEventArgs {
pub monitored_item_handle: u32,
pub node_id: Option<String>,
/// 值的字符串表示 (回调线程预抽取). 若值为 null/解码失败, 此字段为 None.
pub value_string: Option<String>,
/// 内置数据类型枚举名 (如 "Double" / "Int32"). None = Variant null.
pub data_type_name: Option<String>,
pub status: StatusCode,
pub source_timestamp: Option<SystemTime>,
pub server_timestamp: Option<SystemTime>,
}
安全设计
value_string / data_type_name 已在 C 层 Publish 线程同步抽取, 跨线程使用安全.
不再暴露原始 DataValue 指针, 避免 use-after-free 闪退.
详见 DataChange 回调.
移除
sub.remove(h); // 单个
sub.remove_by_node_id("ns=2;s=Temperature"); // 按 NodeId 移除全部对应
不同 Attribute 的监控
除了默认监控 Value, 也能监控 StatusCode / Description 等:
sub.add_node_with("ns=2;s=T1", AttributeId::Value, -1.0, |a| {
println!("value = {:?}", a.value_string);
})?;
sub.add_node_with("ns=2;s=T1", AttributeId::Description, -1.0, |a| {
println!("desc = {:?}", a.value_string);
})?;
闭包要求
| 约束 | 原因 |
|---|---|
FnMut | 闭包可以修改捕获 (用 move + Mutex) |
Send | C Publish 线程调用回调, 不在创建线程 |
'static | items map 长期持有 |
闭包内部如需共享可变状态, 用 Arc<Mutex<T>>:
use std::sync::{Arc, Mutex};
let counter = Arc::new(Mutex::new(0u64));
let counter2 = counter.clone();
sub.add_node("ns=2;s=X", move |_args| {
let mut g = counter2.lock().unwrap();
*g += 1;
})?;