Extension 扩展

Extension 是 FastSim 的可插拔功能层。录制、回放、数据采集、HTTP 服务化、Benchmark 等能力均以扩展的形式存在——它们不侵入仿真核心,而是通过在生命周期的特定时机注册回调来介入仿真流程。

扩展如何运作

每个扩展实现 Extension.enable(sim) 方法,在初始化时向仿真实例注册所需的回调:

text
setup_all_before_loop()
  └─ ExtensionManager.enable_all_extensions()
       ├─ data_collect.enable(sim)   → 注册 observer,准备采集接口
       ├─ record.enable(sim)         → 注册 step_callback + terminate_callback
       ├─ server.enable(sim)         → 启动 Flask 服务线程
       └─ benchmark.enable(sim)      → 注册控制线程,驱动 episode 循环

扩展挂接的时机:

回调类型触发时机典型使用方
step_callback每帧 post_physics_update 之后record(逐帧写入数据)
terminate_callback仿真终止时record(后处理导出 HDF5/视频)
controller_thread独立线程,与主循环并行benchmark(episode 编排)

源码:fastsim/extensions/extension.pyfastsim/extensions/extension_manager.py

启用顺序与依赖

扩展按配置字典的键顺序依次启用,顺序错误会导致运行时报错。当前已知的依赖关系:

text
data_collect  ──────→  record
                  └──→  benchmark(若依赖观测采集)

recordenable() 时会主动获取已启用的 DataCollector 实例。如果 data_collect 尚未启用,会立即报错。

正确的配置顺序:

yaml
extension:
  extension_cfg_dict:
    data_collect:          # ← 必须在 record 之前
      enable: true
      ...
    record:
      enable: true
      ...
    server:
      enable: true
      ...

内置扩展

data_collect:观测采集

data_collect 是录制与 Benchmark 的基础设施。它按配置注册一组 observer,每个 observer 负责从场景中采集特定类型的数据(RGB 图像、深度图、关节状态、末端位姿等),并将其打包为 Observation 对象。

两种采集接口:

方法适用场景
collect_observations()在仿真主线程中直接调用
request_observations()在控制线程中安全请求,内部封装为 Command

Observation 的数据格式按 observer 类型决定:JSON(状态数据)、JPEG(RGB 图像)、NPY(深度/数组数据)。

源码:fastsim/extensions/data_collect/


record:录制与导出

record 依赖 data_collect,将每帧采集的 Observation 异步写入本地文件系统,并在仿真终止时执行后处理,生成结构化产物。

生命周期挂接:

text
每帧 step_callback:
  DataCollector.collect_observations()
  → 异步写入原始帧数据(rgb.jpeg / depth.npy / state.json)

terminate_callback:
  postprocess()
  → 合并帧数据 → 生成 record.h5 + 视频 + 预览视频

导出目录结构:

text
recordings/
└─ {group}/
   └─ {name}/
      ├─ 000001/
      │   ├─ rgb.jpeg
      │   ├─ depth.npy
      │   └─ state.json
      ├─ 000002/
      │   └─ ...
      └─ record.h5       ← 后处理合并产物

postprocess_list 配置字段决定生成哪些产物(h5、视频、深度可视化等)。

源码:fastsim/extensions/record/


replay:轨迹回放

replay 从 record.h5 中读取轨迹数据,并在仿真中驱动实体按录制轨迹运动,用于可视化验证、数据审查或生成对比视频。

核心对象:

  • TrajectoryLoader:将 HDF5 文件解析为 Trajectory 对象,按 observer 分 TrajectoryTrack
  • Replayer:消费 TrajectoryTrack,逐帧通过 Command 驱动对应实体

HDF5 文件结构:

text
record.h5
├─ joint_states/          ← 关节状态轨迹
│   ├─ timestamps
│   └─ data
├─ ee_pose/               ← 末端位姿轨迹
└─ wrist_cam/             ← 相机图像序列

可通过 spawnable_names 配置限制只回放指定实体的轨迹。

源码:fastsim/extensions/replay/


server:REST API 服务化

server 扩展启动一个 Flask HTTP 服务,将所有标注了 @apiclass 的控制器方法自动注册为 HTTP POST 路由,使得外部程序可以通过网络调用 FastSim 的控制接口。

路由规则:

text
POST /{apiclass_name}/{method_name}

例如:POST /robot_controller/move_robot_to_ee_pose

请求与响应格式:

json
// 请求体
{ "robot_name": "arm", "target_pose": [...] }

// 响应体(Result 的 JSON 序列化)
{ "success": true, "msg": "", "data": {...} }

启用后,FastSim 的完整控制能力均可通过网络调用,适合与外部策略网络、ROS 节点或 Web 前端集成。

源码:fastsim/extensions/servitize/fastsim/annotations/api_class.py


benchmark:Episode 评估

benchmark 以 episode 为单位循环运行策略,并在每个 episode 结束后评估 goal 条件是否满足,输出成功率等统计指标。它在独立控制线程中运行,不阻塞主仿真循环。

典型流程:

text
控制线程:
  for episode in range(num_episodes):
      reset_scene()                   ← 随机化或固定初始状态
      policy.run(sim)                 ← 执行策略
      result = evaluate_goals(sim)    ← 检查目标是否达成
      log_result(episode, result)     ← 记录结果
  generate_report()                   ← 输出统计报告

配置字段定义见 fastsim/extensions/benchmark/benchmark_cfg.py,详细用法参考示例:Benchmark 与数据采集

Record → Replay 完整数据链路

以下是从录制到回放的完整端到端流程:

text
① 启用 data_collect
   注册 observer(关节状态、相机图像、末端位姿等)

② 启用 record
   每帧自动采集并异步写入原始帧数据

③ 仿真运行 + 终止
   terminate_callback 触发 postprocess,合并生成 record.h5

④ 启用 replay,指向 record.h5
   TrajectoryLoader 解析轨迹
   Replayer 逐帧驱动实体按录制轨迹运动

⑤(可选)导出对比视频
   原始录制视频 vs 回放渲染视频

这条链路覆盖了机器人学习数据采集的完整流程:在仿真中执行演示 → 录制传感器数据与状态 → 导出为标准格式 → 可视化验证数据质量。