Extension 扩展
Extension 是 FastSim 的可插拔功能层。录制、回放、数据采集、HTTP 服务化、Benchmark 等能力均以扩展的形式存在——它们不侵入仿真核心,而是通过在生命周期的特定时机注册回调来介入仿真流程。
扩展如何运作
每个扩展实现 Extension.enable(sim) 方法,在初始化时向仿真实例注册所需的回调:
setup_all_before_loop()
└─ ExtensionManager.enable_all_extensions()
├─ data_collect.enable(sim) → 注册 observer,准备采集接口
├─ record.enable(sim) → 注册 step_callback + terminate_callback
├─ server.enable(sim) → 启动 Flask 服务线程
└─ benchmark.enable(sim) → 注册控制线程,驱动 episode 循环
扩展挂接的时机:
| 回调类型 | 触发时机 | 典型使用方 |
|---|---|---|
step_callback | 每帧 post_physics_update 之后 | record(逐帧写入数据) |
terminate_callback | 仿真终止时 | record(后处理导出 HDF5/视频) |
controller_thread | 独立线程,与主循环并行 | benchmark(episode 编排) |
源码:fastsim/extensions/extension.py、fastsim/extensions/extension_manager.py
启用顺序与依赖
扩展按配置字典的键顺序依次启用,顺序错误会导致运行时报错。当前已知的依赖关系:
data_collect ──────→ record
└──→ benchmark(若依赖观测采集)
record 在 enable() 时会主动获取已启用的 DataCollector 实例。如果 data_collect 尚未启用,会立即报错。
正确的配置顺序:
extension:
extension_cfg_dict:
data_collect: # ← 必须在 record 之前
enable: true
...
record:
enable: true
...
server:
enable: true
...
内置扩展
data_collect:观测采集
data_collect 是录制与 Benchmark 的基础设施。它按配置注册一组 observer,每个 observer 负责从场景中采集特定类型的数据(RGB 图像、深度图、关节状态、末端位姿等),并将其打包为 Observation 对象。
两种采集接口:
| 方法 | 适用场景 |
|---|---|
collect_observations() | 在仿真主线程中直接调用 |
request_observations() | 在控制线程中安全请求,内部封装为 Command |
Observation 的数据格式按 observer 类型决定:JSON(状态数据)、JPEG(RGB 图像)、NPY(深度/数组数据)。
源码:fastsim/extensions/data_collect/
record:录制与导出
record 依赖 data_collect,将每帧采集的 Observation 异步写入本地文件系统,并在仿真终止时执行后处理,生成结构化产物。
生命周期挂接:
每帧 step_callback:
DataCollector.collect_observations()
→ 异步写入原始帧数据(rgb.jpeg / depth.npy / state.json)
terminate_callback:
postprocess()
→ 合并帧数据 → 生成 record.h5 + 视频 + 预览视频
导出目录结构:
recordings/
└─ {group}/
└─ {name}/
├─ 000001/
│ ├─ rgb.jpeg
│ ├─ depth.npy
│ └─ state.json
├─ 000002/
│ └─ ...
└─ record.h5 ← 后处理合并产物
postprocess_list 配置字段决定生成哪些产物(h5、视频、深度可视化等)。
源码:fastsim/extensions/record/
replay:轨迹回放
replay 从 record.h5 中读取轨迹数据,并在仿真中驱动实体按录制轨迹运动,用于可视化验证、数据审查或生成对比视频。
核心对象:
TrajectoryLoader:将 HDF5 文件解析为Trajectory对象,按 observer 分TrajectoryTrackReplayer:消费 TrajectoryTrack,逐帧通过 Command 驱动对应实体
HDF5 文件结构:
record.h5
├─ joint_states/ ← 关节状态轨迹
│ ├─ timestamps
│ └─ data
├─ ee_pose/ ← 末端位姿轨迹
└─ wrist_cam/ ← 相机图像序列
可通过 spawnable_names 配置限制只回放指定实体的轨迹。
源码:fastsim/extensions/replay/
server:REST API 服务化
server 扩展启动一个 Flask HTTP 服务,将所有标注了 @apiclass 的控制器方法自动注册为 HTTP POST 路由,使得外部程序可以通过网络调用 FastSim 的控制接口。
路由规则:
POST /{apiclass_name}/{method_name}
例如:POST /robot_controller/move_robot_to_ee_pose
请求与响应格式:
// 请求体
{ "robot_name": "arm", "target_pose": [...] }
// 响应体(Result 的 JSON 序列化)
{ "success": true, "msg": "", "data": {...} }
启用后,FastSim 的完整控制能力均可通过网络调用,适合与外部策略网络、ROS 节点或 Web 前端集成。
源码:fastsim/extensions/servitize/、fastsim/annotations/api_class.py
benchmark:Episode 评估
benchmark 以 episode 为单位循环运行策略,并在每个 episode 结束后评估 goal 条件是否满足,输出成功率等统计指标。它在独立控制线程中运行,不阻塞主仿真循环。
典型流程:
控制线程:
for episode in range(num_episodes):
reset_scene() ← 随机化或固定初始状态
policy.run(sim) ← 执行策略
result = evaluate_goals(sim) ← 检查目标是否达成
log_result(episode, result) ← 记录结果
generate_report() ← 输出统计报告
配置字段定义见 fastsim/extensions/benchmark/benchmark_cfg.py,详细用法参考示例:Benchmark 与数据采集。
Record → Replay 完整数据链路
以下是从录制到回放的完整端到端流程:
① 启用 data_collect
注册 observer(关节状态、相机图像、末端位姿等)
② 启用 record
每帧自动采集并异步写入原始帧数据
③ 仿真运行 + 终止
terminate_callback 触发 postprocess,合并生成 record.h5
④ 启用 replay,指向 record.h5
TrajectoryLoader 解析轨迹
Replayer 逐帧驱动实体按录制轨迹运动
⑤(可选)导出对比视频
原始录制视频 vs 回放渲染视频
这条链路覆盖了机器人学习数据采集的完整流程:在仿真中执行演示 → 录制传感器数据与状态 → 导出为标准格式 → 可视化验证数据质量。