跳到主要内容

Events — s.events.lock().unwrap().*

Session.events: Arc<Mutex<OpcUaEvents>> 是 SDK 内部事件统一通道, 把会话生命周期 / 订阅生命周期 / 数据变化 / 通讯异常都汇集到一个对象上, 方便上层 UI 做日志 / 状态栏 / 调试.

注册回调通过 lock() 获取可变引用, 把 Box<dyn Fn(...)> push 到对应字段:

s.events.lock().unwrap().on_connected.push(Box::new(|e| {
println!("connected to {}", e.endpoint_url);
}));
与 OPC UA 协议事件区分
  • s.events.* — SDK 内部事件 (本节, 与协议无关)
  • s.subscribe_events(...) — OPC UA 协议事件 (Alarms & Conditions), 见 事件订阅

完整事件清单

会话生命周期

字段EventArgs 字段触发时机
on_connectedConnectionEventArgs { endpoint_url, status_code, message, timestamp_utc }首次连接成功
on_disconnected同上主动 / 被动断开
on_reconnecting同上检测到断线, 启动自动重连
on_reconnected同上自动重连成功
on_keep_aliveKeepAliveEventArgs { server_time_utc, server_status, timestamp_utc }KeepAlive 心跳成功
on_state_changedSessionStateChangedEventArgs { old_state, new_state, reason, timestamp_utc }任何 SessionState 变化

通讯异常

字段EventArgs触发时机
on_communication_errorCommunicationErrorEventArgs { status_code, category, message, timestamp_utc }通讯失败 (KeepAlive / Read 异常等)
on_subscription_lostSubscriptionLostEventArgs { subscription_id, reason, will_reconnect, timestamp_utc }订阅被服务端清理

统一通道 (Any)

on_any 接收所有内部事件 (含 DataChange / Read / Write / Browse / Call / HistoryRead / 订阅生命周期), 用 OpcUaEventEntry 表示:

pub struct OpcUaEventEntry {
pub category: OpcUaEventCategory,
pub severity: OpcUaEventSeverity,
pub source: String,
pub message: String,
pub status_code: StatusCode,
pub timestamp_utc: SystemTime,
}

OpcUaEventCategory 枚举

pub enum OpcUaEventCategory {
Unknown,
Connected, Disconnected, Reconnecting, Reconnected, KeepAlive, StateChanged,
SubscriptionCreated, SubscriptionLost, SubscriptionRestored,
MonitoredItemAdded, MonitoredItemRemoved, SubscriptionCleared,
DataChange, ServerEvent, Alarm,
Read, Write, Browse, Call, HistoryRead,
CommunicationError, SecurityError, ProtocolError,
Info, Diagnostic,
}

OpcUaEventSeverity 枚举

pub enum OpcUaEventSeverity { Trace, Debug, Info, Warn, Error, Fatal }

用法示例

通用日志钩子 (一行打印 SDK 内所有事件)

s.events.lock().unwrap().on_any.push(Box::new(|e| {
println!("[{:?}] [{:?}] {}: {} ({})",
e.severity, e.category, e.source, e.message, e.status_code);
}));

调试期非常省事 — 不订阅具体事件, 一行 hook 看全部.

状态机回调 + 跨线程通信

use std::sync::mpsc;

let (tx, rx) = mpsc::channel::<bool>();
let tx2 = tx.clone();

s.events.lock().unwrap().on_state_changed.push(Box::new(move |e| {
let connected = matches!(e.new_state, darra_opcua::SessionState::Connected);
let _ = tx2.send(connected);
}));

s.connect()?;
while let Ok(connected) = rx.recv() {
if connected { println!("UI 上更新连接状态"); break; }
}

错误统计

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

let count = Arc::new(AtomicU32::new(0));
let count2 = count.clone();
s.events.lock().unwrap().on_communication_error.push(Box::new(move |_e| {
count2.fetch_add(1, Ordering::SeqCst);
}));

与订阅回调的关系

订阅回调 (sub.add_node(..., callback)) 仅收到该订阅的事件; on_any 会收到 SDK 下 所有订阅的 DataChange 事件 (统一通道). 两者并存, 上层按场景选用:

  • 单订阅业务逻辑 → sub.add_node(..., cb)
  • 全局日志 / 录波 / 调试 → on_any

闭包要求

所有 listener 是 Box<dyn Fn(...) + Send + Sync>:

  • Fn (不是 FnMut): 闭包不能直接修改捕获变量, 用 Mutex / Atomic* / mpsc::channel 把可变性外移
  • Send + Sync: 闭包内部不能持非线程安全的引用
  • 实际生命周期需要 'static (push 后被 Vec 拥有)

如需可变状态, 用 Mutex<T>:

use std::sync::{Arc, Mutex};

let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let log2 = log.clone();
s.events.lock().unwrap().on_state_changed.push(Box::new(move |e| {
let line = format!("{:?} -> {:?}", e.old_state, e.new_state);
if let Ok(mut g) = log2.lock() { g.push(line); }
}));

线程模型

回调线程

事件回调在 C 层 Publish 线程 / KeepAlive 线程 上调用, 不在调用方线程. 回调里:

  • 不能执行长操作 (>100 ms), 否则阻塞 Publish 队列
  • panic 会被 SDK catch_unwind 兜住, 不会跨 FFI 栈帧
  • 跨线程访问需要自己加 Mutex / 用 Atomic* / 通过 mpsc::channel 传出

UI 库 (egui / iced / Tauri 等) 通常要求在主线程更新, 把数据通过 mpsc::Sender 传到主线程再渲染.


完整示例

use darra_opcua::{Session, SessionState};

let mut s = Session::new("opc.tcp://localhost:4840")?;
{
let mut e = s.events.lock().unwrap();
e.on_connected.push(Box::new(|e| {
println!("CONNECTED: {} ({})", e.endpoint_url, e.status_code);
}));
e.on_disconnected.push(Box::new(|e| {
println!("DISCONNECTED: {} ({})", e.endpoint_url, e.status_code);
}));
e.on_reconnecting.push(Box::new(|e| {
println!("RECONNECTING: {}", e.endpoint_url);
}));
e.on_state_changed.push(Box::new(|e| {
println!("STATE: {:?} -> {:?}", e.old_state, e.new_state);
}));
}
s.connect()?;

下一步