Controller 控制器

FastSim 的控制层解决一个核心问题:物理仿真必须在主线程单步执行,但控制逻辑(策略、规划、人机交互)往往需要在独立线程中异步运行。 Command 队列是连接两者的桥梁——控制线程将"要做什么"封装成 Command 投入队列,主线程在每帧的固定时机取出并执行。

Command:控制的基本单元

Command 是控制层的最小执行单元,本质上是对"一次仿真主线程操作"的封装。

SimpleCommand

一次性执行一个函数,适合单步控制(设置关节目标、执行抓取等):

python
from fastsim.controllers.commands.simple_command import SimpleCommand

cmd = SimpleCommand(
    func=lambda: robot.set_joint_targets(target_positions),
    sync=True,       # 阻塞直到本帧执行完毕并返回 Result
    priority=10      # 数值越大越先执行
)
result = sim.controller_manager.add_command(cmd)
result.unwrap()      # success=False 时抛出异常

MultiCommand

将一组 SimpleCommand 按帧分批执行,适合轨迹跟踪等多步控制:

python
from fastsim.controllers.commands.multi_command import MultiCommand

# 每帧执行一个 step_commands,直到所有步骤完成
multi_cmd = MultiCommand(
    step_commands=[cmd_frame_1, cmd_frame_2, cmd_frame_3, ...],
    sync=True
)
sim.controller_manager.add_command(multi_cmd)

源码:fastsim/controllers/commands/

调度机制:队列、优先级与 Result

执行时机

每帧 FastSim.step() 的第一件事就是清空并执行命令队列:

text
ControllerManager.run_all_commands()    ← 执行本帧所有 Command
SceneManager.pre_physics_update()       ← 将 Command 设置的目标写入仿真器
UniSim.step()                           ← 物理推进
SceneManager.post_physics_update()      ← 读取最新状态

这意味着:Command 中的操作与物理推进在同一帧内完成,设置目标后当帧即生效。

优先级

同一帧内的多个 Command 按 priority 降序执行。默认 priority=0,需要抢先执行的 Command 可设更高值:

python
# 紧急停止优先于普通轨迹命令
emergency_stop = SimpleCommand(func=stop_all_joints, priority=100)
trajectory_cmd  = SimpleCommand(func=next_waypoint,  priority=0)

Result:统一返回结构

所有 Command 返回 Result 对象(源码:fastsim/controllers/result.py):

字段类型说明
successbool执行是否成功
msgstr错误或状态信息
dataAny返回值(如 IK 结果、关节状态等)
python
result = sim.controller_manager.add_command(cmd)

# 方式一:手动判断
if not result.success:
    print(result.msg)

# 方式二:失败时直接抛异常(推荐用于调试和强约束流程)
result.unwrap()

# 方式三:取返回值
joint_positions = result.data

内置控制器 API

FastSim 提供四类内置控制器,均以 @apiclass 装饰,可通过 Python 直接调用,也可在启用 server 扩展后通过 HTTP 调用。

SpawnableController:通用实体调用

对场景中任意实体(object / robot / sensor)的方法进行安全的反射调用,并自动封装为 Command 在主线程执行。适合需要动态指定操作目标和方法的场景。

python
# 调用机器人的任意方法(参数以字典传入)
result = sim.spawnable_controller.control_robot(
    robot_name="arm",
    function="set_joint_targets",
    parameters={"positions": [0.0, -1.57, 0.0, -1.57, 0.0, 0.0]},
    sync=True
)

同理有 control_object()control_sensor()。源码:fastsim/controllers/spawnable_controller.py


RobotController:轨迹执行与轻量 IK

提供不依赖运动规划器的轨迹跟踪和基础 IK,适合对速度要求高、且不需要碰撞检测的场景。

方法说明
move_robot_along_trajectory(robot_name, trajectory, joint_names, step_size, sync)按帧跟踪关节空间轨迹
solve_ik_no_collision_check(robot_name, base_frame_ee_pose)返回满足末端位姿的关节角(无碰撞检测)
move_robot_to_ee_pose_no_collision_check(robot_name, base_frame_ee_pose, sync)IK + 直接执行(无碰撞检测)

如需碰撞感知的运动规划,请使用 MotionPlanController

源码:fastsim/controllers/robot_controller.py


MotionPlanController:运动规划器接口

调用挂载在机器人上的规划器(如 Curobo),提供完整的运动规划能力。使用前需在机器人配置中启用 use_planner: true

方法说明
solve_ik(robot_name, target_pose, ...)逆运动学(碰撞感知)
solve_fk(robot_name, joint_positions)正运动学
gen_motion_plan(robot_name, target_pose, ...)生成完整运动轨迹
move_robot_to_ee_pose(robot_name, target_pose, sync)规划 + 执行一步完成
attach_objects(robot_name, object_names)将物体附着到末端执行器(影响碰撞计算)
detach_objects(robot_name, object_names)解除附着

源码:fastsim/controllers/motion_plan_controller.py


ActionController:任务动作执行

将配置中定义的 ActionConfig(字典形式)转为 action 实例并执行,可选检查 action 内置的 goal 是否满足。通常由 TaskManager 内部调用,也可单独使用。

python
action_cfg = {
    "robot_name": "arm",
    "stereotype": "pick",
    "extra_parameters": {"object": "cube"}
}
result = sim.action_controller.execute_action(action_cfg, check_goal=True)

源码:fastsim/controllers/action_controller.py

线程模型

FastSim 支持将控制逻辑放在独立的控制线程中运行,通过 FastSim.add_controller_thread() 注册:

python
def my_control_loop(sim):
    while sim.is_running():
        # ✅ 通过 Command 在主线程执行控制
        result = sim.robot_controller.move_robot_to_ee_pose(
            "arm", target_pose, sync=True   # 阻塞直到本帧执行
        )
        result.unwrap()

        # ✅ 通过 request_observations() 在主线程读取传感器数据
        obs = sim.data_collector.request_observations()

        # ❌ 不要在控制线程直接读写仿真对象
        # robot.get_joint_positions()  ← 存在线程安全风险

sim.add_controller_thread(my_control_loop)
sim.start_simulation_loop()

规则总结:

操作控制线程中说明
调用内置控制器(sync=True✅ 安全自动封装为 Command,在主线程执行
request_observations()✅ 安全DataCollector 提供的线程安全采集接口
直接读写仿真实体属性⚠️ 风险需确保与主线程不存在竞争
调用 sim.step()❌ 禁止step 只应由主线程驱动

sync=True 会阻塞控制线程,直到当前帧主线程执行完该 Command 并返回 Result。这是控制线程与主线程通信的标准模式。