read_history_at_time
给一组离散时间点, 服务端按 "前一个 / 后一个 / 线性插值" 算法返回那个时刻的值. 用于"对齐时序":
前置 / 配套
- 想拉聚合统计请用 read_history_processed.
- 想拉区间所有原始点请用 read_history (Raw).
签名
def read_history_at_time(self,
node_id: str,
times: List[datetime],
use_simple_bounds: bool = True) -> List[OpcUaDataValue]: ...
| 参数 | 说明 |
|---|---|
node_id | 目标 NodeId |
times | 离散时间点列表 (UTC datetime) |
use_simple_bounds | True = 阶梯型 (用 ≤ t 的最近点); False = 线性插值 (server 必须支持) |
用法
import datetime as dt
# 拉过去 24 小时, 每整点的温度
today_midnight = dt.datetime.now(dt.timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0)
hours = [today_midnight + dt.timedelta(hours=h) for h in range(25)]
values = ua.read_history_at_time("ns=2;s=Temperature", hours)
for t, dv in zip(hours, values):
with dv:
print(f"{t.strftime('%H:%M')}: {dv.value}")
与 read_history_processed 的区别
| API | 返回 |
|---|---|
| read_history_at_time | 每个时间点对应一个内插值 (不聚合) |
| read_history_processed | 每个子区间一个聚合值 (Avg / Min / Max / ...) |
例 "拉 24 小时, 每小时一个值":
read_history_at_time: 输入 25 个整点, 返回 25 个内插点 (每个整点的值, 可能是相邻原始点的线性插值)read_history_processed: aggregate=Average, interval=1h, 返回 24 个区间的平均值
应用场景
- 时序对齐: 不同传感器采样时间不同, 要拉到同一时刻才能比较
- 数据库导出: 整点抽样
- 报表: 每天 24 行