跳到主要内容

read_history_at_time

给一组离散时间点, 服务端按 "前一个 / 后一个 / 线性插值" 算法返回那个时刻的值. 用于"对齐时序":

前置 / 配套

签名

def read_history_at_time(self,
node_id: str,
times: List[datetime],
use_simple_bounds: bool = True) -> List[OpcUaDataValue]: ...
参数说明
node_id目标 NodeId
times离散时间点列表 (UTC datetime)
use_simple_boundsTrue = 阶梯型 (用 ≤ t 的最近点); False = 线性插值 (server 必须支持)

用法

import datetime as dt

# 拉过去 24 小时, 每整点的温度
today_midnight = dt.datetime.now(dt.timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0)
hours = [today_midnight + dt.timedelta(hours=h) for h in range(25)]

values = ua.read_history_at_time("ns=2;s=Temperature", hours)
for t, dv in zip(hours, values):
with dv:
print(f"{t.strftime('%H:%M')}: {dv.value}")

与 read_history_processed 的区别

API返回
read_history_at_time每个时间点对应一个内插值 (不聚合)
read_history_processed每个子区间一个聚合值 (Avg / Min / Max / ...)

例 "拉 24 小时, 每小时一个值":

  • read_history_at_time: 输入 25 个整点, 返回 25 个内插点 (每个整点的值, 可能是相邻原始点的线性插值)
  • read_history_processed: aggregate=Average, interval=1h, 返回 24 个区间的平均值

应用场景

  • 时序对齐: 不同传感器采样时间不同, 要拉到同一时刻才能比较
  • 数据库导出: 整点抽样
  • 报表: 每天 24 行

下一步