Events — s.events.lock().unwrap().*
Session.events: Arc<Mutex<OpcUaEvents>> 是 SDK 内部事件统一通道, 把会话生命周期 /
订阅生命周期 / 数据变化 / 通讯异常都汇集到一个对象上, 方便上层 UI 做日志 / 状态栏 / 调试.
注册回调通过 lock() 获取可变引用, 把 Box<dyn Fn(...)> push 到对应字段:
s.events.lock().unwrap().on_connected.push(Box::new(|e| {
println!("connected to {}", e.endpoint_url);
}));
与 OPC UA 协议事件区分
s.events.*— SDK 内部事件 (本节, 与协议无关)s.subscribe_events(...)— OPC UA 协议事件 (Alarms & Conditions), 见 事件订阅
完整事件清单
会话生命周期
| 字段 | EventArgs 字段 | 触发时机 |
|---|---|---|
on_connected | ConnectionEventArgs { endpoint_url, status_code, message, timestamp_utc } | 首次连接成功 |
on_disconnected | 同上 | 主动 / 被动断开 |
on_reconnecting | 同上 | 检测到断线, 启动自动重连 |
on_reconnected | 同上 | 自动重连成功 |
on_keep_alive | KeepAliveEventArgs { server_time_utc, server_status, timestamp_utc } | KeepAlive 心跳成功 |
on_state_changed | SessionStateChangedEventArgs { old_state, new_state, reason, timestamp_utc } | 任何 SessionState 变化 |
通讯异常
| 字段 | EventArgs | 触发时机 |
|---|---|---|
on_communication_error | CommunicationErrorEventArgs { status_code, category, message, timestamp_utc } | 通讯失败 (KeepAlive / Read 异常等) |
on_subscription_lost | SubscriptionLostEventArgs { subscription_id, reason, will_reconnect, timestamp_utc } | 订阅被服务端清理 |
统一通道 (Any)
on_any 接收所有内部事件 (含 DataChange / Read / Write / Browse / Call /
HistoryRead / 订阅生命周期), 用 OpcUaEventEntry 表示:
pub struct OpcUaEventEntry {
pub category: OpcUaEventCategory,
pub severity: OpcUaEventSeverity,
pub source: String,
pub message: String,
pub status_code: StatusCode,
pub timestamp_utc: SystemTime,
}
OpcUaEventCategory 枚举
pub enum OpcUaEventCategory {
Unknown,
Connected, Disconnected, Reconnecting, Reconnected, KeepAlive, StateChanged,
SubscriptionCreated, SubscriptionLost, SubscriptionRestored,
MonitoredItemAdded, MonitoredItemRemoved, SubscriptionCleared,
DataChange, ServerEvent, Alarm,
Read, Write, Browse, Call, HistoryRead,
CommunicationError, SecurityError, ProtocolError,
Info, Diagnostic,
}
OpcUaEventSeverity 枚举
pub enum OpcUaEventSeverity { Trace, Debug, Info, Warn, Error, Fatal }
用法示例
通用日志钩子 (一行打印 SDK 内所有事件)
s.events.lock().unwrap().on_any.push(Box::new(|e| {
println!("[{:?}] [{:?}] {}: {} ({})",
e.severity, e.category, e.source, e.message, e.status_code);
}));
调试期非常省事 — 不订阅具体事件, 一行 hook 看全部.
状态机回调 + 跨线程通信
use std::sync::mpsc;
let (tx, rx) = mpsc::channel::<bool>();
let tx2 = tx.clone();
s.events.lock().unwrap().on_state_changed.push(Box::new(move |e| {
let connected = matches!(e.new_state, darra_opcua::SessionState::Connected);
let _ = tx2.send(connected);
}));
s.connect()?;
while let Ok(connected) = rx.recv() {
if connected { println!("UI 上更新连接状态"); break; }
}
错误统计
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
let count = Arc::new(AtomicU32::new(0));
let count2 = count.clone();
s.events.lock().unwrap().on_communication_error.push(Box::new(move |_e| {
count2.fetch_add(1, Ordering::SeqCst);
}));
与订阅回调的关系
订阅回调 (sub.add_node(..., callback)) 仅收到该订阅的事件; on_any 会收到 SDK 下
所有订阅的 DataChange 事件 (统一通道). 两者并存, 上层按场景选用:
- 单订阅业务逻辑 →
sub.add_node(..., cb) - 全局日志 / 录波 / 调试 →
on_any
闭包要求
所有 listener 是 Box<dyn Fn(...) + Send + Sync>:
Fn(不是FnMut): 闭包不能直接修改捕获变量, 用Mutex/Atomic*/mpsc::channel把可变性外移Send + Sync: 闭包内部不能持非线程安全的引用- 实际生命周期需要
'static(push 后被 Vec 拥有)
如需可变状态, 用 Mutex<T>:
use std::sync::{Arc, Mutex};
let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let log2 = log.clone();
s.events.lock().unwrap().on_state_changed.push(Box::new(move |e| {
let line = format!("{:?} -> {:?}", e.old_state, e.new_state);
if let Ok(mut g) = log2.lock() { g.push(line); }
}));
线程模型
回调线程
事件回调在 C 层 Publish 线程 / KeepAlive 线程 上调用, 不在调用方线程. 回调里:
- 不能执行长操作 (>100 ms), 否则阻塞 Publish 队列
- panic 会被 SDK
catch_unwind兜住, 不会跨 FFI 栈帧 - 跨线程访问需要自己加
Mutex/ 用Atomic*/ 通过mpsc::channel传出
UI 库 (egui / iced / Tauri 等) 通常要求在主线程更新, 把数据通过 mpsc::Sender 传到主线程再渲染.
完整示例
use darra_opcua::{Session, SessionState};
let mut s = Session::new("opc.tcp://localhost:4840")?;
{
let mut e = s.events.lock().unwrap();
e.on_connected.push(Box::new(|e| {
println!("CONNECTED: {} ({})", e.endpoint_url, e.status_code);
}));
e.on_disconnected.push(Box::new(|e| {
println!("DISCONNECTED: {} ({})", e.endpoint_url, e.status_code);
}));
e.on_reconnecting.push(Box::new(|e| {
println!("RECONNECTING: {}", e.endpoint_url);
}));
e.on_state_changed.push(Box::new(|e| {
println!("STATE: {:?} -> {:?}", e.old_state, e.new_state);
}));
}
s.connect()?;