扩展功能配置

extension 块通过 extension_cfg_dict 声明需要启用的扩展实例。每个扩展以键名标识(如 data_collectrecord),键名同时也是运行时通过 ExtensionManager.get_enabled_extension() 检索该实例的唯一名称。

源码:fastsim/configs/extension_cfg.pyExtensionConfig)、fastsim/extensions/extension_cfg.pyExtensionBaseConfig

通用字段

所有扩展配置均包含以下基础字段:

字段类型说明
stereotypestr扩展类型标识(data_collect / record / replay / server / benchmark
enablebool是否在仿真启动时自动调用 enable(sim)

启用顺序与依赖

extension_cfg_dict 按键的声明顺序依次启用。以下依赖关系须在配置中保证顺序:

text
data_collect  ──→  record
              └──→  benchmark(若依赖观测采集)

recordenable() 时会主动查找已启用的 DataCollector 实例。若 data_collect 尚未启用,运行时会立即报错。必须将 data_collect 置于 record 之前。


data_collect:观测采集

配置类:fastsim/extensions/data_collect/data_collector_cfg.py

data_collect 按配置注册 observer,每个 observer 负责从场景中采集特定类型的数据(RGB 图像、深度图、关节状态、末端位姿等)。

字段类型说明
observer_cfgslist[ObserverConfig]要注册的 observer 列表;每个 observer 通过 stereotype 确定采集类型

内置 observer stereotype:

Stereotype采集目标典型字段
robot_observer机器人状态name(机器人实例名)、observe_joint_positionsobserve_ee_poseobserve_gripper_state
sensor_observer传感器数据name(传感器实例名)、observe_rgbobserve_depthobserve_intrinsic_matrix
object_observer物体状态name(物体实例名)、observe_positionobserve_rotationobserve_scale
task_observer任务状态name

注意:extension_cfg_dict 中的键名(如 my_data_collect)即为该扩展实例的运行时名称,recordbenchmarkdata_collector_name 字段须与之对应。


record:录制与导出

配置类:fastsim/extensions/record/recorder_cfg.py

record 依赖 data_collect,将每帧采集的 Observation 异步写入本地文件系统,并在仿真终止时执行后处理,生成结构化产物(HDF5、视频等)。

字段类型说明
backend_root_pathstr录制文件写入根目录(支持 root_key:// 路径)
record_backendstr存储后端类型,如 lfs
record_fpsint录制帧率(节流依据)
postprocess_listlist[str]终止后处理类型:hdf5 / video / png_depth / preview_video
use_recorder_stepbool是否使用 recorder 自己的步进计数(而非全局仿真步)
data_collector_namestrDataCollector 扩展实例名(默认 data_collect
record_observer_nameslist[str]限定录制的 observer 名称(为空则录制全部)
record_task_summarybool是否在终止时导出 task_summary.json

导出目录结构:

text
{backend_root_path}/
└─ {group}/{name}/
   ├─ 000001/
   │   ├─ rgb.jpeg
   │   ├─ depth.npy
   │   └─ state.json
   ├─ 000002/
   │   └─ ...
   └─ record.h5

replay:轨迹回放

配置类:fastsim/extensions/replay/replay_cfg.py

replay 从 record.h5 中读取轨迹,并在仿真中驱动实体按录制轨迹运动,用于可视化验证或生成对比视频。

字段类型说明
hdf5_pathstrrecord.h5 文件路径(支持 root_key:// 路径)
scene_cfg_pathstr回放时使用的场景配置文件路径
spawnable_nameslist[str]限定加载的 observer/实体名称(为空则加载全部)
replay_camera_cfgslist[ReplayCameraConfig]回放附加相机配置(可选)

server:REST API 服务化

配置类:fastsim/extensions/servitize/server_cfg.py

server 启动 Flask HTTP 服务,将所有标注 @apiclass 的控制器方法自动注册为 POST /{class}/{method} 路由,使外部程序可通过网络调用 FastSim 控制接口。

字段类型说明
hoststr监听地址(通常 0.0.0.0
portint监听端口(1024–65535)
debugboolFlask debug 模式开关

benchmark:Episode 评估

配置类:fastsim/extensions/benchmark/benchmark_cfg.py

benchmark 在独立控制线程中以 episode 为单位循环运行策略,并在每个 episode 结束后评估 goal 条件,输出成功率等统计指标。

字段类型说明
episode_numint总 episode 数
action_frequencyfloat策略控制频率(Hz)
timeout_per_episodefloat | null单 episode 超时时间(秒),为 null 则不超时
goalslist[GoalConfig]每个 episode 的评估目标集合
policyPolicyConfig策略配置,stereotype 指定策略类型,由用户自定义实现
data_collector_namestrDataCollector 扩展实例名
required_observer_nameslist[str]benchmark 运行所需的 observer 名称
terminate_sim_when_finishedbool全部 episodes 完成后是否终止仿真

自定义 Policy

benchmark 所使用的策略需要用户自己提供。做法是:

  1. @configclass + @stereotype.register_config 定义配置类,继承 PolicyConfig
  2. @stereotype.register_model 定义策略类,继承 Policy,实现以下接口方法
方法调用时机说明
reset()每个 episode 开始前重置策略内部状态
warmup(benchmark_observation)episode 开始后的预热阶段可用于初始化规划器等耗时操作
needs_observation() -> bool每个控制步返回 True 时 benchmark 才会请求新的观测
preprocess_observation(benchmark_observation) -> dict获取到观测后将原始 BenchmarkObservation 转为策略所需格式
compute_action(observation: dict) -> dict每个控制步根据处理后的观测计算动作
postprocess_action(action: dict) -> BenchmarkAction动作计算后将动作封装为 BenchmarkAction,指定控制模式与目标关节
python
from fastsim.annotations.config_class import configclass, field
from fastsim.annotations.stereotype import stereotype
from fastsim.extensions.benchmark.policy import Policy, PolicyConfig
from fastsim.extensions.benchmark.benchmark import BenchmarkAction, BenchmarkObservation, ControlMode
from fastsim.extensions.benchmark.action import RobotAction

@configclass
@stereotype.register_config("my_policy")
class MyPolicyConfig(PolicyConfig):
    robot_name: str = field(default="arm", required=True)
    model_path: str = field(default="", required=True)

@stereotype.register_model("my_policy")
class MyPolicy(Policy):
    def __init__(self, config: MyPolicyConfig):
        super().__init__(config)
        self.robot_name = config.robot_name

    def reset(self) -> None:
        # 每个 episode 开始时重置状态
        pass

    def warmup(self, benchmark_observation: BenchmarkObservation) -> None:
        # 预热(如初始化规划器)
        pass

    def needs_observation(self) -> bool:
        return True

    def preprocess_observation(self, benchmark_observation: BenchmarkObservation) -> dict:
        robot_obs = benchmark_observation.get_robot_observations(self.robot_name)["robot_data"]
        return {"joint_positions": robot_obs["joint_positions"]}

    def compute_action(self, observation: dict) -> dict:
        # 根据观测计算目标关节位置
        return {"joint_names": [...], "joint_positions": [...]}

    def postprocess_action(self, action: dict) -> BenchmarkAction:
        benchmark_action = BenchmarkAction()
        benchmark_action.add_robot_action(RobotAction(
            control_mode=ControlMode.POSITION,
            robot_name=self.robot_name,
            joint_names=action["joint_names"],
            joint_positions=action["joint_positions"],
        ))
        return benchmark_action

在配置中通过 policy.stereotype 引用,并传入配置类中定义的字段:

yaml
my_benchmark:
  stereotype: benchmark
  enable: true
  data_collector_name: my_data_collect   # ← 对应 data_collect 实例的键名
  episode_num: 100
  action_frequency: 10.0
  timeout_per_episode: 30.0
  terminate_sim_when_finished: true
  policy:
    stereotype: my_policy               # ← 对应 @stereotype.register_config/model("my_policy")
    robot_name: arm
    model_path: assets://checkpoints/policy.pt
  goals:
    - stereotype: pose
      name: reach_target
      description: Reach the target pose
      pose_A_source: ee
      pose_A_params:
        robot_name: arm
      pose_B_source: spawnable
      pose_B_params:
        spawnable_name: cube
      position_tolerance: 0.005

完整配置示例

yaml
extension:
  extension_cfg_dict:
    my_data_collect:                       # ← 键名即为运行时实例名,record/benchmark 的 data_collector_name 须与此一致
      stereotype: data_collect
      enable: true
      observer_cfgs:
        - stereotype: robot_observer
          name: arm
          observe_joint_positions: true
          observe_ee_pose: true
          observe_gripper_state: true
        - stereotype: sensor_observer
          name: wrist_cam
          observe_rgb: true
          observe_depth: true
          observe_intrinsic_matrix: true
          observe_extrinsic_matrix: true
        - stereotype: object_observer
          name: cube
          observe_position: true
          observe_rotation: true

    record:
      stereotype: record
      enable: true
      backend_root_path: datasets://runs/run001
      record_backend: lfs
      record_fps: 30
      postprocess_list: [hdf5, video]
      use_recorder_step: true
      data_collector_name: my_data_collect
      record_task_summary: true

    server:
      stereotype: server
      enable: false
      host: 0.0.0.0
      port: 8080
      debug: false