扩展功能配置
extension 块通过 extension_cfg_dict 声明需要启用的扩展实例。每个扩展以键名标识(如 data_collect、record),键名同时也是运行时通过 ExtensionManager.get_enabled_extension() 检索该实例的唯一名称。
源码:fastsim/configs/extension_cfg.py(ExtensionConfig)、fastsim/extensions/extension_cfg.py(ExtensionBaseConfig)
通用字段
所有扩展配置均包含以下基础字段:
| 字段 | 类型 | 说明 |
|---|---|---|
stereotype | str | 扩展类型标识(data_collect / record / replay / server / benchmark) |
enable | bool | 是否在仿真启动时自动调用 enable(sim) |
启用顺序与依赖
extension_cfg_dict 按键的声明顺序依次启用。以下依赖关系须在配置中保证顺序:
data_collect ──→ record
└──→ benchmark(若依赖观测采集)
record 在 enable() 时会主动查找已启用的 DataCollector 实例。若 data_collect 尚未启用,运行时会立即报错。必须将 data_collect 置于 record 之前。
data_collect:观测采集
配置类:fastsim/extensions/data_collect/data_collector_cfg.py
data_collect 按配置注册 observer,每个 observer 负责从场景中采集特定类型的数据(RGB 图像、深度图、关节状态、末端位姿等)。
| 字段 | 类型 | 说明 |
|---|---|---|
observer_cfgs | list[ObserverConfig] | 要注册的 observer 列表;每个 observer 通过 stereotype 确定采集类型 |
内置 observer stereotype:
| Stereotype | 采集目标 | 典型字段 |
|---|---|---|
robot_observer | 机器人状态 | name(机器人实例名)、observe_joint_positions、observe_ee_pose、observe_gripper_state 等 |
sensor_observer | 传感器数据 | name(传感器实例名)、observe_rgb、observe_depth、observe_intrinsic_matrix 等 |
object_observer | 物体状态 | name(物体实例名)、observe_position、observe_rotation、observe_scale |
task_observer | 任务状态 | name |
注意:
extension_cfg_dict中的键名(如my_data_collect)即为该扩展实例的运行时名称,record和benchmark的data_collector_name字段须与之对应。
record:录制与导出
配置类:fastsim/extensions/record/recorder_cfg.py
record 依赖 data_collect,将每帧采集的 Observation 异步写入本地文件系统,并在仿真终止时执行后处理,生成结构化产物(HDF5、视频等)。
| 字段 | 类型 | 说明 |
|---|---|---|
backend_root_path | str | 录制文件写入根目录(支持 root_key:// 路径) |
record_backend | str | 存储后端类型,如 lfs |
record_fps | int | 录制帧率(节流依据) |
postprocess_list | list[str] | 终止后处理类型:hdf5 / video / png_depth / preview_video |
use_recorder_step | bool | 是否使用 recorder 自己的步进计数(而非全局仿真步) |
data_collector_name | str | DataCollector 扩展实例名(默认 data_collect) |
record_observer_names | list[str] | 限定录制的 observer 名称(为空则录制全部) |
record_task_summary | bool | 是否在终止时导出 task_summary.json |
导出目录结构:
{backend_root_path}/
└─ {group}/{name}/
├─ 000001/
│ ├─ rgb.jpeg
│ ├─ depth.npy
│ └─ state.json
├─ 000002/
│ └─ ...
└─ record.h5
replay:轨迹回放
配置类:fastsim/extensions/replay/replay_cfg.py
replay 从 record.h5 中读取轨迹,并在仿真中驱动实体按录制轨迹运动,用于可视化验证或生成对比视频。
| 字段 | 类型 | 说明 |
|---|---|---|
hdf5_path | str | record.h5 文件路径(支持 root_key:// 路径) |
scene_cfg_path | str | 回放时使用的场景配置文件路径 |
spawnable_names | list[str] | 限定加载的 observer/实体名称(为空则加载全部) |
replay_camera_cfgs | list[ReplayCameraConfig] | 回放附加相机配置(可选) |
server:REST API 服务化
配置类:fastsim/extensions/servitize/server_cfg.py
server 启动 Flask HTTP 服务,将所有标注 @apiclass 的控制器方法自动注册为 POST /{class}/{method} 路由,使外部程序可通过网络调用 FastSim 控制接口。
| 字段 | 类型 | 说明 |
|---|---|---|
host | str | 监听地址(通常 0.0.0.0) |
port | int | 监听端口(1024–65535) |
debug | bool | Flask debug 模式开关 |
benchmark:Episode 评估
配置类:fastsim/extensions/benchmark/benchmark_cfg.py
benchmark 在独立控制线程中以 episode 为单位循环运行策略,并在每个 episode 结束后评估 goal 条件,输出成功率等统计指标。
| 字段 | 类型 | 说明 |
|---|---|---|
episode_num | int | 总 episode 数 |
action_frequency | float | 策略控制频率(Hz) |
timeout_per_episode | float | null | 单 episode 超时时间(秒),为 null 则不超时 |
goals | list[GoalConfig] | 每个 episode 的评估目标集合 |
policy | PolicyConfig | 策略配置,stereotype 指定策略类型,由用户自定义实现 |
data_collector_name | str | DataCollector 扩展实例名 |
required_observer_names | list[str] | benchmark 运行所需的 observer 名称 |
terminate_sim_when_finished | bool | 全部 episodes 完成后是否终止仿真 |
自定义 Policy
benchmark 所使用的策略需要用户自己提供。做法是:
- 用
@configclass+@stereotype.register_config定义配置类,继承PolicyConfig - 用
@stereotype.register_model定义策略类,继承Policy,实现以下接口方法
| 方法 | 调用时机 | 说明 |
|---|---|---|
reset() | 每个 episode 开始前 | 重置策略内部状态 |
warmup(benchmark_observation) | episode 开始后的预热阶段 | 可用于初始化规划器等耗时操作 |
needs_observation() -> bool | 每个控制步 | 返回 True 时 benchmark 才会请求新的观测 |
preprocess_observation(benchmark_observation) -> dict | 获取到观测后 | 将原始 BenchmarkObservation 转为策略所需格式 |
compute_action(observation: dict) -> dict | 每个控制步 | 根据处理后的观测计算动作 |
postprocess_action(action: dict) -> BenchmarkAction | 动作计算后 | 将动作封装为 BenchmarkAction,指定控制模式与目标关节 |
from fastsim.annotations.config_class import configclass, field
from fastsim.annotations.stereotype import stereotype
from fastsim.extensions.benchmark.policy import Policy, PolicyConfig
from fastsim.extensions.benchmark.benchmark import BenchmarkAction, BenchmarkObservation, ControlMode
from fastsim.extensions.benchmark.action import RobotAction
@configclass
@stereotype.register_config("my_policy")
class MyPolicyConfig(PolicyConfig):
robot_name: str = field(default="arm", required=True)
model_path: str = field(default="", required=True)
@stereotype.register_model("my_policy")
class MyPolicy(Policy):
def __init__(self, config: MyPolicyConfig):
super().__init__(config)
self.robot_name = config.robot_name
def reset(self) -> None:
# 每个 episode 开始时重置状态
pass
def warmup(self, benchmark_observation: BenchmarkObservation) -> None:
# 预热(如初始化规划器)
pass
def needs_observation(self) -> bool:
return True
def preprocess_observation(self, benchmark_observation: BenchmarkObservation) -> dict:
robot_obs = benchmark_observation.get_robot_observations(self.robot_name)["robot_data"]
return {"joint_positions": robot_obs["joint_positions"]}
def compute_action(self, observation: dict) -> dict:
# 根据观测计算目标关节位置
return {"joint_names": [...], "joint_positions": [...]}
def postprocess_action(self, action: dict) -> BenchmarkAction:
benchmark_action = BenchmarkAction()
benchmark_action.add_robot_action(RobotAction(
control_mode=ControlMode.POSITION,
robot_name=self.robot_name,
joint_names=action["joint_names"],
joint_positions=action["joint_positions"],
))
return benchmark_action
在配置中通过 policy.stereotype 引用,并传入配置类中定义的字段:
my_benchmark:
stereotype: benchmark
enable: true
data_collector_name: my_data_collect # ← 对应 data_collect 实例的键名
episode_num: 100
action_frequency: 10.0
timeout_per_episode: 30.0
terminate_sim_when_finished: true
policy:
stereotype: my_policy # ← 对应 @stereotype.register_config/model("my_policy")
robot_name: arm
model_path: assets://checkpoints/policy.pt
goals:
- stereotype: pose
name: reach_target
description: Reach the target pose
pose_A_source: ee
pose_A_params:
robot_name: arm
pose_B_source: spawnable
pose_B_params:
spawnable_name: cube
position_tolerance: 0.005
完整配置示例
extension:
extension_cfg_dict:
my_data_collect: # ← 键名即为运行时实例名,record/benchmark 的 data_collector_name 须与此一致
stereotype: data_collect
enable: true
observer_cfgs:
- stereotype: robot_observer
name: arm
observe_joint_positions: true
observe_ee_pose: true
observe_gripper_state: true
- stereotype: sensor_observer
name: wrist_cam
observe_rgb: true
observe_depth: true
observe_intrinsic_matrix: true
observe_extrinsic_matrix: true
- stereotype: object_observer
name: cube
observe_position: true
observe_rotation: true
record:
stereotype: record
enable: true
backend_root_path: datasets://runs/run001
record_backend: lfs
record_fps: 30
postprocess_list: [hdf5, video]
use_recorder_step: true
data_collector_name: my_data_collect
record_task_summary: true
server:
stereotype: server
enable: false
host: 0.0.0.0
port: 8080
debug: false