Extension 扩展功能
对应目录:demo/extension/,展示录制、回放、Benchmark 与 HTTP 服务化等扩展能力的完整用法。
示例一览
| 目录 | 重点 | 主要文件 |
|---|---|---|
01_server_extension | REST API 服务化与外部调用 | server_extension.py / client.py |
02_benchmark_and_data_collect | 多 episode 策略评估与数据采集 | benchmark.yaml / gt_policy.py |
03_replay_recorded_data | 从录制产物加载并回放轨迹 | load_trajectory.py / replay.yaml |
01:HTTP Server(01_server_extension)
启动
python
# server_extension.py
from fastsim.app import FastSim
sim = FastSim("demo_config.yaml")
sim.start() # server 扩展在后台线程自动启动 Flask 服务
配置中启用 server 扩展:
yaml
extension:
extension_cfg_dict:
my_server:
stereotype: server
enable: true
host: 0.0.0.0
port: 5000
debug: false
客户端调用
仿真运行后,外部程序通过 HTTP POST 调用控制接口:
python
# client.py
import requests
def control_object(base_url, object_name, function, parameters=None):
result = requests.post(
f"{base_url}/spawnable/control_object",
json={"object_name": object_name, "function": function, "parameters": parameters}
).json()
if result["success"]:
return result["data"]
raise Exception(result["msg"])
base_url = "http://localhost:5000"
# 获取关节名称与限位
joint_names = control_object(base_url, "test_laptop", "get_joint_names")
limits = control_object(base_url, "test_laptop", "get_joint_position_limit",
{"joint_names": [joint_names[-1]]})
# 驱动关节运动
import numpy as np
for pos in np.linspace(limits[0][0], limits[0][1], 100):
control_object(base_url, "test_laptop", "set_joint_position",
{"position": [pos], "joint_names": [joint_names[-1]]})
查看所有可用路由:
bash
fastsim show_static_api
02:Benchmark + DataCollect(02_benchmark_and_data_collect)
配置结构
benchmark.yaml 中的 extension 块:
yaml
extension:
extension_cfg_dict:
my_data_collect: # ← 必须在 benchmark 之前
stereotype: data_collect
enable: true
observer_cfgs:
- stereotype: robot_observer
name: my_robot
observe_joint_positions: true
observe_ee_pose: true
observe_gripper_state: true
- stereotype: sensor_observer
name: front_camera
observe_rgb: true
observe_depth: true
observe_intrinsic_matrix: true
observe_extrinsic_matrix: true
- stereotype: object_observer
name: target
observe_position: true
observe_rotation: true
- stereotype: task_observer
name: task
my_benchmark:
stereotype: benchmark
enable: true
data_collector_name: my_data_collect
episode_num: 100
action_frequency: 10.0
timeout_per_episode: 30.0
terminate_sim_when_finished: true
policy:
stereotype: gt_policy # ← 用户自定义策略
robot_name: my_robot
object_name: target
gt_grasp_pose_file: ./grasp_pose_ball_020.json
goals:
- stereotype: pose
name: reach_target
description: Reach the target
pose_A_source: ee
pose_A_params:
robot_name: my_robot
pose_B_source: spawnable
pose_B_params:
spawnable_name: target
position_tolerance: 0.005
自定义策略实现(gt_policy.py)
策略类需实现六个接口方法,并通过 @stereotype 注册:
python
from fastsim.annotations.config_class import configclass, field
from fastsim.annotations.stereotype import stereotype
from fastsim.extensions.benchmark.policy import Policy, PolicyConfig
from fastsim.extensions.benchmark.benchmark import BenchmarkAction, BenchmarkObservation, ControlMode
from fastsim.extensions.benchmark.action import RobotAction
@configclass
@stereotype.register_config("gt_policy")
class GT_PolicyConfig(PolicyConfig):
robot_name: str = field(default="my_robot", required=True)
gt_grasp_pose_file: str = field(default="./grasp_pose.json", required=True)
@stereotype.register_model("gt_policy")
class GT_Policy(Policy):
def reset(self) -> None:
"""每个 episode 开始时重置状态"""
self.trajectory = []
self.traj_index = 0
def warmup(self, obs: BenchmarkObservation) -> None:
"""预热阶段(如加载模型、初始化规划器)"""
pass
def needs_observation(self) -> bool:
"""返回 True 时 benchmark 才会请求新的观测"""
return len(self.trajectory) == 0 or self.traj_index >= len(self.trajectory)
def preprocess_observation(self, obs: BenchmarkObservation) -> dict:
"""将原始观测转为策略所需格式"""
robot_data = obs.get_robot_observations(self.robot_name)["robot_data"]
object_data = obs.get_object_observations(self.object_name)["object_data"]
return {"robot": robot_data, "object": object_data}
def compute_action(self, observation: dict) -> dict:
"""根据观测计算下一步动作"""
# 调用规划器生成轨迹 ...
return {"joint_names": [...], "joint_positions": [...]}
def postprocess_action(self, action: dict) -> BenchmarkAction:
"""将动作封装为 BenchmarkAction"""
benchmark_action = BenchmarkAction()
benchmark_action.add_robot_action(RobotAction(
control_mode=ControlMode.POSITION,
robot_name=self.robot_name,
joint_names=action["joint_names"],
joint_positions=action["joint_positions"],
))
return benchmark_action
运行
bash
cd demo/extension/02_benchmark_and_data_collect
python gt_policy.py # 入口在文件底部的 __main__ 块
03:轨迹回放(03_replay_recorded_data)
从录制产物加载轨迹
FastSim 支持两种轨迹加载方式:
从 HDF5 文件加载(推荐):
python
from fastsim.extensions.common.trajectory.loader.hdf5_trajectory_loader import Hdf5TrajectoryLoaderConfig
from fastsim.extensions.common.trajectory.trajectory_loader_factory import TrajectoryLoaderFactory
cfg = Hdf5TrajectoryLoaderConfig(
stereotype="hdf5",
hdf5_path="/path/to/record.h5",
spawnable_names=["franka_panda", "target_obj"], # 为空则加载全部
)
loader = TrajectoryLoaderFactory.create(cfg)
trajectory = loader.load()
从本地文件系统(LFS)加载:
python
from fastsim.extensions.common.trajectory.loader.lfs_trajectory_loader import LfsTrajectoryLoaderConfig
cfg = LfsTrajectoryLoaderConfig(
stereotype="lfs",
record_root_path="/path/to/recordings/run001",
fps=30.0,
spawnable_names=["franka_panda"],
)
loader = TrajectoryLoaderFactory.create(cfg)
trajectory = loader.load()
在仿真中回放
通过配置文件启用 replay 扩展,仿真启动后自动驱动实体按录制轨迹运动:
yaml
extension:
extension_cfg_dict:
replay:
stereotype: replay
enable: true
hdf5_path: datasets://runs/run001/record.h5
scene_cfg_path: ./simple_scene.yaml
spawnable_names: [franka_panda, target_obj]
bash
fastsim launch_simulation --config replay.yaml