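Extension Features

Directory: demo/extension/. These demos show end-to-end usage of the extension features: recording, replay, benchmarking, and exposing the simulation as an HTTP service.

Examples at a glance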

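| Directory | Focus | Main files |
| --- | --- | --- |
| 01_server_extension | REST API service and external calls | server_extension.py / client.py |
| 02_benchmark_and_data_collect | Multi-episode policy evaluation and data collection | benchmark.yaml / gt_policy.py |
| 03_replay_recorded_data | Loading and replaying trajectories from recorded output | load_trajectory.py / replay.yaml |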

01: HTTP Server (01_server_extension)

Startup

python
# server_extension.py
from fastsim.app import FastSim

sim = FastSim("demo_config.yaml")
sim.start()   # the server extension starts the Flask service automatically in a background thread

Enable the server extension in the configuration:

yaml
extension:
  extension_cfg_dict:
    my_server:
      stereotype: server
      enable: true
      host: 0.0.0.0
      port: 5000
      debug: false
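
Because the server extension starts in a background thread, a client launched right after the simulation may try to connect before the port is open. The sketch below shows one way to wait for that, using only the standard library. The helper name `wait_for_port` and its defaults are illustrative assumptions, not part of FastSim, and a successful TCP connect only shows that something is listening, not that every route is ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.2) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout.

    Hypothetical helper: FastSim itself is not known to ship anything like this.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With the configuration above, a client could call `wait_for_port("localhost", 5000)` before sending its first request.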

Client calls

Once the simulation is running, external programs call the control interface via HTTP POST:

python
# client.py
import numpy as np
import requests


def control_object(base_url, object_name, function, parameters=None):
    """POST one control call and return its data; raise if the server reports failure."""
    result = requests.post(
        f"{base_url}/spawnable/control_object",
        json={"object_name": object_name, "function": function, "parameters": parameters},
        timeout=10,  # avoid hanging forever if the server is not reachable
    ).json()
    if result["success"]:
        return result["data"]
    raise RuntimeError(result["msg"])


base_url = "http://localhost:5000"

# Get the joint names and the position limits of the last joint
joint_names = control_object(base_url, "test_laptop", "get_joint_names")
limits = control_object(base_url, "test_laptop", "get_joint_position_limit",
                        {"joint_names": [joint_names[-1]]})

# Drive the joint across its position limits in 100 steps
for pos in np.linspace(limits[0][0], limits[0][1], 100):
    control_object(base_url, "test_laptop", "set_joint_position",
                   {"position": [float(pos)], "joint_names": [joint_names[-1]]})
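
To exercise client logic without a running simulation, the request body and the response handling can be separated from the network call. The sketch below assumes only what client.py shows: a request body with `object_name`, `function`, and `parameters`, and a response with `success`, `data`, and `msg`. The names `build_payload`, `unwrap_response`, and `ControlError` are hypothetical.

```python
from typing import Any, Optional


class ControlError(RuntimeError):
    """Raised when the server reports success == False (hypothetical name)."""


def build_payload(object_name: str, function: str, parameters: Optional[dict] = None) -> dict:
    """Assemble the JSON body that client.py posts to /spawnable/control_object."""
    return {"object_name": object_name, "function": function, "parameters": parameters}


def unwrap_response(result: dict) -> Any:
    """Return result["data"] on success, otherwise raise with the server's message."""
    if result.get("success"):
        return result.get("data")
    raise ControlError(result.get("msg", "unknown error"))
```

Both functions are pure, so they can be tested with hand-written dictionaries in place of real server responses.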

List all available routes:

bash
fastsim show_static_api

02: Benchmark + DataCollect (02_benchmark_and_data_collect)

Configuration structure

The extension block in benchmark.yaml:

yaml
extension:
  extension_cfg_dict:
    my_data_collect:              # ← must be declared before the benchmark entry
      stereotype: data_collect
      enable: true
      observer_cfgs:
        - stereotype: robot_observer
          name: my_robot
          observe_joint_positions: true
          observe_ee_pose: true
          observe_gripper_state: true
        - stereotype: sensor_observer
          name: front_camera
          observe_rgb: true
          observe_depth: true
          observe_intrinsic_matrix: true
          observe_extrinsic_matrix: true
        - stereotype: object_observer
          name: target
          observe_position: true
          observe_rotation: true
        - stereotype: task_observer
          name: task

    my_benchmark:
      stereotype: benchmark
      enable: true
      data_collector_name: my_data_collect
      episode_num: 100
      action_frequency: 10.0
      timeout_per_episode: 30.0
      terminate_sim_when_finished: true
      policy:
        stereotype: gt_policy         # ← user-defined policy
        robot_name: my_robot
        object_name: target
        gt_grasp_pose_file: ./grasp_pose_ball_020.json
      goals:
        - stereotype: pose
          name: reach_target
          description: Reach the target
          pose_A_source: ee
          pose_A_params:
            robot_name: my_robot
          pose_B_source: spawnable
          pose_B_params:
            spawnable_name: target
          position_tolerance: 0.005
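
The ordering constraint noted in the configuration (the data collector must appear before the benchmark that references it) is easy to violate when editing by hand. The sketch below checks it on an already parsed configuration. It assumes `extension_cfg_dict` has been loaded into a plain Python dict that keeps the YAML key order; the function name `check_collector_order` is hypothetical, and FastSim may perform its own validation.

```python
def check_collector_order(extension_cfg_dict: dict) -> list:
    """Return a list of problems; an empty list means the ordering looks consistent."""
    problems = []
    seen_collectors = set()
    # Python dicts preserve insertion order, so iteration follows declaration order.
    for name, cfg in extension_cfg_dict.items():
        kind = cfg.get("stereotype")
        if kind == "data_collect":
            seen_collectors.add(name)
        elif kind == "benchmark":
            wanted = cfg.get("data_collector_name")
            if wanted is not None and wanted not in seen_collectors:
                problems.append(
                    f"{name}: data_collector_name '{wanted}' is not declared before this entry"
                )
    return problems
```

For the benchmark.yaml shown above, the check returns an empty list because `my_data_collect` precedes `my_benchmark`.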

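Custom policy implementation (gt_policy.py)

A policy class must implement six interface methods (reset, warmup, needs_observation, preprocess_observation, compute_action, postprocess_action) and be registered via @stereotype: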

python
from fastsim.annotations.config_class import configclass, field
from fastsim.annotations.stereotype import stereotype
from fastsim.extensions.benchmark.policy import Policy, PolicyConfig
from fastsim.extensions.benchmark.benchmark import BenchmarkAction, BenchmarkObservation, ControlMode
from fastsim.extensions.benchmark.action import RobotAction

@configclass
@stereotype.register_config("gt_policy")
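class GT_PolicyConfig(PolicyConfig):
    robot_name: str = field(default="my_robot", required=True)
    object_name: str = field(default="target", required=True)  # set in benchmark.yaml, read by preprocess_observation
    gt_grasp_pose_file: str = field(default="./grasp_pose.json", required=True)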

@stereotype.register_model("gt_policy")
class GT_Policy(Policy):
    def reset(self) -> None:
        """Reset state at the start of each episode."""
        self.trajectory = []
        self.traj_index = 0

    def warmup(self, obs: BenchmarkObservation) -> None:
        """Warm-up phase (e.g. load a model, initialize a planner)."""
        pass

    def needs_observation(self) -> bool:
        """The benchmark requests a new observation only when this returns True."""
        return len(self.trajectory) == 0 or self.traj_index >= len(self.trajectory)

    def preprocess_observation(self, obs: BenchmarkObservation) -> dict:
        """Convert the raw observation into the format the policy needs."""
        robot_data = obs.get_robot_observations(self.robot_name)["robot_data"]
        object_data = obs.get_object_observations(self.object_name)["object_data"]
        return {"robot": robot_data, "object": object_data}

    def compute_action(self, observation: dict) -> dict:
        """Compute the next action from the observation."""
        # Call the planner to generate a trajectory ...
        return {"joint_names": [...], "joint_positions": [...]}

    def postprocess_action(self, action: dict) -> BenchmarkAction:
        """Wrap the action in a BenchmarkAction."""
        benchmark_action = BenchmarkAction()
        benchmark_action.add_robot_action(RobotAction(
            control_mode=ControlMode.POSITION,
            robot_name=self.robot_name,
            joint_names=action["joint_names"],
            joint_positions=action["joint_positions"],
        ))
        return benchmark_action
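
The `trajectory` and `traj_index` attributes above suggest a buffered pattern: plan once, then hand out one waypoint per step until the plan is used up. The standalone sketch below illustrates only that bookkeeping. It does not import FastSim, the planner `plan_linear` is a hypothetical stand-in (plain linear interpolation in joint space), and the flat `joint_positions` observation format is an assumption made for the example.

```python
from typing import List, Optional


def plan_linear(start: List[float], goal: List[float], steps: int) -> List[List[float]]:
    """Hypothetical planner: linear interpolation from start to goal, excluding start."""
    return [
        [s + (g - s) * k / steps for s, g in zip(start, goal)]
        for k in range(1, steps + 1)
    ]


class BufferedPolicySketch:
    """Shows the trajectory / traj_index bookkeeping only; not a FastSim Policy."""

    def __init__(self, goal: List[float], steps: int = 4):
        self.goal = goal
        self.steps = steps
        self.reset()

    def reset(self) -> None:
        self.trajectory: List[List[float]] = []
        self.traj_index = 0

    def needs_observation(self) -> bool:
        # True when nothing is planned yet or the plan has been fully consumed.
        return self.traj_index >= len(self.trajectory)

    def compute_action(self, observation: Optional[dict] = None) -> dict:
        if self.needs_observation():
            if observation is None:
                raise ValueError("a fresh observation is required to replan")
            # Replan from the observed joint positions and restart the buffer.
            self.trajectory = plan_linear(observation["joint_positions"], self.goal, self.steps)
            self.traj_index = 0
        waypoint = self.trajectory[self.traj_index]
        self.traj_index += 1
        return {"joint_positions": waypoint}
```

With `steps=2`, the first call needs an observation and returns the midpoint; the second call reuses the buffered plan and returns the goal, after which `needs_observation()` is True again.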

Run

bash
cd demo/extension/02_benchmark_and_data_collect
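python gt_policy.py   # the entry point is the __main__ block at the bottom of the file

03: Trajectory Replay (03_replay_recorded_data)

Loading trajectories from recorded output

FastSim supports two ways to load a trajectory:

From an HDF5 file (recommended):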

python
from fastsim.extensions.common.trajectory.loader.hdf5_trajectory_loader import Hdf5TrajectoryLoaderConfig
from fastsim.extensions.common.trajectory.trajectory_loader_factory import TrajectoryLoaderFactory

cfg = Hdf5TrajectoryLoaderConfig(
    stereotype="hdf5",
    hdf5_path="/path/to/record.h5",
    spawnable_names=["franka_panda", "target_obj"],   # an empty list loads everything
)
loader = TrajectoryLoaderFactory.create(cfg)
trajectory = loader.load()

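From the local file system (LFS):

python
from fastsim.extensions.common.trajectory.loader.lfs_trajectory_loader import LfsTrajectoryLoaderConfig
from fastsim.extensions.common.trajectory.trajectory_loader_factory import TrajectoryLoaderFactory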

cfg = LfsTrajectoryLoaderConfig(
    stereotype="lfs",
    record_root_path="/path/to/recordings/run001",
    fps=30.0,
    spawnable_names=["franka_panda"],
)
loader = TrajectoryLoaderFactory.create(cfg)
trajectory = loader.load()
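
When a script has to accept either kind of recording, the choice between the two loader configs can be made from the path. The sketch below only builds keyword arguments that mirror the fields used in the two examples above; it does not import FastSim. The helper name `loader_kwargs_for`, the suffix rule, and the default `fps` of 30.0 are assumptions made for illustration.

```python
from pathlib import Path


def loader_kwargs_for(path: str, spawnable_names=None, fps: float = 30.0) -> dict:
    """Pick keyword arguments for one of the two loader configs based on the path."""
    names = list(spawnable_names or [])
    if Path(path).suffix.lower() in {".h5", ".hdf5"}:
        # Looks like an HDF5 recording file.
        return {"stereotype": "hdf5", "hdf5_path": path, "spawnable_names": names}
    # Assumption: anything else is treated as a recording directory for the LFS loader.
    return {"stereotype": "lfs", "record_root_path": path, "fps": fps, "spawnable_names": names}
```

The result could then be passed on as `Hdf5TrajectoryLoaderConfig(**kwargs)` or `LfsTrajectoryLoaderConfig(**kwargs)`, depending on `kwargs["stereotype"]`.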

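Replaying in the simulation

Enable the replay extension in the configuration file. Once the simulation starts, the extension automatically drives the entities along the recorded trajectory: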

yaml
extension:
  extension_cfg_dict:
    replay:
      stereotype: replay
      enable: true
      hdf5_path: datasets://runs/run001/record.h5
      scene_cfg_path: ./simple_scene.yaml
      spawnable_names: [franka_panda, target_obj]

bash
fastsim launch_simulation --config replay.yaml