跳到主要内容

添加 MonitoredItem

前置阅读

sub.add_node

pub fn add_node<F>(&self, node_id: &str, callback: F) -> Result<u32, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;

监控 Value 属性, 跟订阅 publishing 间隔采样.

let h: u32 = sub.add_node("ns=2;s=Temperature", |args| {
println!("{:?} = {:?}", args.node_id, args.value_string);
})?;

返回 u32 MonitoredItem handle (本地句柄, 用于后续 modify_item / remove).

sub.add_node_with

完整参数版本.

pub fn add_node_with<F>(
&self,
node_id: &str,
attr: AttributeId,
sampling_ms: f64,
callback: F,
) -> Result<u32, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;
参数默认说明
node_id监控的 NodeId
attrAttributeId::Value监控的 Attribute
sampling_ms-1.0 (跟订阅)服务端采样间隔. 0.0 = 尽快
use darra_opcua::AttributeId;

let h = sub.add_node_with(
"ns=2;s=Pressure",
AttributeId::Value,
100.0, // 100 ms 采样
|args| println!("press = {:?}", args.value_string),
)?;

错误: BAD_NODE_ID_UNKNOWN 等通过 Result::Err 返回.


sub.add_many

批量添加多个 MI, 共享一个回调:

pub fn add_many<F>(
&self,
node_ids: &[&str],
attr: AttributeId,
callback: F,
) -> Result<Vec<(u32, StatusCode)>, OpcUaError>
where F: FnMut(&DataChangeEventArgs) + Send + 'static;

返回 Vec<(handle, status)>, 顺序与 node_ids 一致, 失败项 handle == 0.

let nodes: Vec<String> = (1..=100).map(|i| format!("ns=2;s=T{}", i)).collect();
let nodes_ref: Vec<&str> = nodes.iter().map(String::as_str).collect();

let results = sub.add_many(&nodes_ref, AttributeId::Value, |args| {
println!("{:?} = {:?}", args.node_id, args.value_string);
})?;

for (i, (h, st)) in results.iter().enumerate() {
if st.is_good() {
println!(" {} -> handle={}", nodes[i], h);
} else {
println!(" {} FAILED: {}", nodes[i], st);
}
}
设计权衡

C 端 AddNodes API 接受单个 cb + 无 ctx, trampoline 没法反查每个 MI 的 NodeId. 因此实现走循环 add_node_with 路径 (N 次 RPC), 每个 MI 独立 ctx, 共享用户回调用 Arc<Mutex<F>> 包装. 性能略差但语义/内存可靠.


DataChangeEventArgs

pub struct DataChangeEventArgs {
pub monitored_item_handle: u32,
pub node_id: Option<String>,
/// 值的字符串表示 (回调线程预抽取). 若值为 null/解码失败, 此字段为 None.
pub value_string: Option<String>,
/// 内置数据类型枚举名 (如 "Double" / "Int32"). None = Variant null.
pub data_type_name: Option<String>,
pub status: StatusCode,
pub source_timestamp: Option<SystemTime>,
pub server_timestamp: Option<SystemTime>,
}
安全设计

value_string / data_type_name 已在 C 层 Publish 线程同步抽取, 跨线程使用安全. 不再暴露原始 DataValue 指针, 避免 use-after-free 闪退.

详见 DataChange 回调.


移除

sub.remove(h);                          // 单个
sub.remove_by_node_id("ns=2;s=Temperature"); // 按 NodeId 移除全部对应

不同 Attribute 的监控

除了默认监控 Value, 也能监控 StatusCode / Description 等:

sub.add_node_with("ns=2;s=T1", AttributeId::Value, -1.0, |a| {
println!("value = {:?}", a.value_string);
})?;
sub.add_node_with("ns=2;s=T1", AttributeId::Description, -1.0, |a| {
println!("desc = {:?}", a.value_string);
})?;

闭包要求

约束原因
FnMut闭包可以修改捕获 (用 move + Mutex)
SendC Publish 线程调用回调, 不在创建线程
'staticitems map 长期持有

闭包内部如需共享可变状态, 用 Arc<Mutex<T>>:

use std::sync::{Arc, Mutex};
let counter = Arc::new(Mutex::new(0u64));
let counter2 = counter.clone();
sub.add_node("ns=2;s=X", move |_args| {
let mut g = counter2.lock().unwrap();
*g += 1;
})?;

下一步