pybullet_fleet package
Submodules
pybullet_fleet.action module
action.py Base classes and implementations for robot actions.
Provides a unified interface for different types of robot actions: - MoveAction: Path following - PickAction: Object picking with approach - DropAction: Object dropping with approach - WaitAction: Timed waiting (e.g., charging)
Actions are executed through an action queue system in the Agent class.
- pybullet_fleet.action.DEFAULT_MOVE_SKIP_THRESHOLD: float = 0.05
Default minimum distance (m) for MOVING_TO_PICK / MOVING_TO_DROP.
When the robot is already closer than this to the pick/drop pose, the MoveAction sub-step is skipped entirely. Prevents edge-case hangs with zero-distance paths.
- pybullet_fleet.action.DEFAULT_MAX_FORCE: float = 500.0
Default maximum motor force (N) for joint/EE control.
- pybullet_fleet.action.DEFAULT_JOINT_TOLERANCE: float = 0.01
Default joint-space tolerance (rad or m) for convergence checks.
- pybullet_fleet.action.DEFAULT_EE_TOLERANCE: float = 0.02
Default Cartesian end-effector tolerance (m) for convergence checks.
- class pybullet_fleet.action.PickPhase(value)
Bases:
EnumPhases of pick action execution.
- INIT = 'init'
- APPROACHING = 'approaching'
- MOVING_TO_PICK = 'moving_to_pick'
- PICKING = 'picking'
- RETREATING = 'retreating'
- class pybullet_fleet.action.DropPhase(value)
Bases:
EnumPhases of drop action execution.
- INIT = 'init'
- APPROACHING = 'approaching'
- MOVING_TO_DROP = 'moving_to_drop'
- DROPPING = 'dropping'
- RETREATING = 'retreating'
- class pybullet_fleet.action.Action
Bases:
ABCBase class for all robot actions.
Actions represent high-level tasks that agents can perform: - MoveAction: Navigate along a path - PickAction: Pick up an object - DropAction: Drop an object - WaitAction: Wait for a duration
Actions are executed through Agent.add_action() or Agent.add_action_sequence(). Each action manages its own state and returns True when complete.
- status
Current execution status
- start_time
When action started (simulation time)
- end_time
When action completed (simulation time)
- error_message
Error description if action failed
- abstractmethod execute(agent, dt)
Execute action for one timestep.
- abstractmethod reset()
Reset action state for re-execution.
- cancel()
Cancel action execution.
- get_duration()
Get action execution duration.
- is_complete()
Check if action has completed (successfully or with failure).
- Return type:
- class pybullet_fleet.action.MoveAction(path, auto_approach=True, final_orientation_align=True, direction=MovementDirection.FORWARD)
Bases:
ActionMove along a path.
Wraps the existing Agent path following functionality into the Action interface.
- Parameters:
path (
Path) – Path object with waypoints to followauto_approach (
bool) – Add approach waypoint if far from start (default: True)final_orientation_align (
bool) – Align to final orientation after reaching end (default: True)direction (
Union[MovementDirection,str]) – Movement direction - MovementDirection.FORWARD (default, face movement direction) or MovementDirection.BACKWARD (maintain orientation, move backward). Can also accept string “forward” or “backward” for convenience.
Example:
# Move forward (normal) path = Path.from_positions([[0, 0, 0], [1, 0, 0], [1, 1, 0]]) action = MoveAction(path=path) agent.add_action(action) # Move backward action = MoveAction(path=path, direction=MovementDirection.BACKWARD) agent.add_action(action)
- path: Path
- auto_approach: bool = True
- final_orientation_align: bool = True
- direction: MovementDirection | str = 'forward'
- execute(agent, dt)
Execute move action.
- Return type:
- reset()
Reset action state.
- class pybullet_fleet.action.JointAction(target_joint_positions, max_force=500.0, tolerance=None)
Bases:
ActionJoint positions control as an Action.
- Parameters:
target_joint_positions (
Union[list,dict]) – List of target joint positions (radians for revolute, metres for prismatic)max_force (
float) – Maximum force to apply (default: 500.0)tolerance (
Union[float,list,dict,None]) –Position tolerance to consider as reached. Accepts a single float (applied to all joints), a list (one per joint), or a dict
{joint_name: tol}. WhenNone(the default), the tolerance is resolved fromagent.joint_toleranceat the firstexecute()call and written back to this attribute so the resolved value is inspectable after the action starts.Note
A scalar tolerance applies the same value to every joint regardless of type. For mixed prismatic / revolute chains, consider specifying per-joint tolerances so that prismatic joints (metres) can be tighter than revolute joints (radians).
wait_time – Optional seconds to wait after reaching target (default: 0.0)
Example:
# Scalar tolerance (same for all joints) action = JointAction(target_joint_positions=[0.0, 1.0, 0.0, 0.0]) agent.add_action(action) # Per-joint tolerance for mixed prismatic/revolute action = JointAction( target_joint_positions={"rail_joint": 0.5, "elbow_to_wrist": -0.3}, tolerance={"rail_joint": 0.005, "elbow_to_wrist": 0.05}, # 5 mm / 0.05 rad )
- max_force: float = 500.0
- execute(agent, dt)
Execute action for one timestep.
- reset()
Reset action state for re-execution.
- class pybullet_fleet.action.PoseAction(target_position, target_orientation=None, end_effector_link=None, max_force=500.0, tolerance=0.02)
Bases:
ActionEnd-effector pose control via inverse kinematics.
Calls
agent.move_end_effector()on the first step. Joint targets are always set and the action waits for joints to settle. After settling, the action completes withCOMPLETEDif the target was reachable, orFAILEDif it was not.toleranceis used for both the IK reachability pre-check (viamove_end_effector()) and the final EE Cartesian distance verification (viaare_ee_at_target()). Joint convergence uses the default joint tolerance (not this value).- Parameters:
target_position (
List[float]) – Target EE position [x, y, z] in world frame.target_orientation (
Optional[tuple]) – Target EE orientation as quaternion [qx, qy, qz, qw] (optional).end_effector_link (
Union[int,str,None]) – Link index (int), name (str), or None for auto-detect.max_force (
float) – Maximum force for motor control (default: 500.0).tolerance (
float) – EE Cartesian distance tolerance in metres (default: 0.02).
Example:
action = PoseAction(target_position=[0.3, 0.0, 0.5]) agent.add_action(action) action = PoseAction( target_position=[0.3, 0.0, 0.5], target_orientation=[0, 0, 0, 1], ) agent.add_action(action)
- max_force: float = 500.0
- tolerance: float = 0.02
- execute(agent, dt)
Execute action for one timestep.
- reset()
Reset action state for re-execution.
- class pybullet_fleet.action.WaitAction(duration, action_type='idle')
Bases:
ActionWait for a specified duration.
Useful for simulating charging, loading, or other time-based operations.
- Parameters:
Example:
# Wait for 5 seconds (charging) action = WaitAction(duration=5.0, action_type="charge") agent.add_action(action)
- duration: float
- action_type: str = 'idle'
- execute(agent, dt)
Execute wait action.
- Return type:
- reset()
Reset action state.
- class pybullet_fleet.action.PickAction(target_object_id=None, target_position=None, search_radius=0.5, use_2d_search=True, use_approach=True, approach_pose=None, approach_offset=1.0, pick_offset=0.0, move_skip_threshold=0.05, attach_link=-1, attach_relative_pose=<factory>, joint_targets=None, joint_tolerance=None, joint_max_force=500.0, ee_target_position=None, ee_target_orientation=None, ee_end_effector_link=None, ee_tolerance=0.02, continue_on_ik_failure=True)
Bases:
ActionPick an object and attach it to the robot.
Kinematic implementation: 1. Move to approach pose (if specified or auto-calculated) 2. Teleport object to attachment position 3. Attach object using constraint
- Parameters:
target_object_id (
Optional[int]) – Specific object body ID to pick (optional)target_position (
Optional[List[float]]) – Pick from position - auto-select nearest pickable object (optional)search_radius (
float) – Search radius when using target_position (default: 0.5m)approach_pose (
Optional[Pose]) – Explicit approach pose (optional, auto-calculated if None)approach_offset (
float) – Auto-calculate approach offset distance (default: 1.0m)pick_offset (
float) – Distance from target where pick is executed (default: 0.0m)attach_link (
Union[int,str]) – Link to attach to - can be index (int) or name (str). -1 or “base_link” for base link (default: -1)attach_relative_pose (
Pose) – Position and orientation offset in link’s frame as Pose object
Note
Path visualization is controlled by agent.path_visualize setting.
Example:
# Pick specific object with position offset (using link index) action = PickAction( target_object_id=pallet.body_id, approach_offset=1.0, pick_offset=0.3, attach_link=-1, # Base link attach_relative_pose=Pose.from_euler(0.6, 0, -0.2, roll=np.pi/2, pitch=0, yaw=0) ) agent.add_action(action) # Pick using link name action = PickAction( target_object_id=pallet.body_id, attach_link="sensor_mast", # Attach to named link attach_relative_pose=Pose.from_euler(0, 0, 0.5) ) agent.add_action(action) # Pick nearest object at location action = PickAction( target_position=[5, 0, 0.1], search_radius=0.5 ) agent.add_action(action)
- search_radius: float = 0.5
- use_2d_search: bool = True
- use_approach: bool = True
- approach_pose: Pose | None = None
- approach_offset: float = 1.0
- pick_offset: float = 0.0
- move_skip_threshold: float = 0.05
- attach_relative_pose: Pose
- joint_max_force: float = 500.0
- ee_tolerance: float = 0.02
- continue_on_ik_failure: bool = True
- execute(agent, dt)
Execute pick action.
- Return type:
- reset()
Reset action state.
- class pybullet_fleet.action.DropAction(drop_pose, use_approach=True, approach_pose=None, approach_offset=1.0, drop_offset=0.0, move_skip_threshold=0.05, place_gently=True, drop_height=0.1, target_object_id=None, drop_relative_pose=None, joint_targets=None, joint_tolerance=None, joint_max_force=500.0, ee_target_position=None, ee_target_orientation=None, ee_end_effector_link=None, ee_tolerance=0.02, continue_on_ik_failure=True)
Bases:
ActionDrop an attached object at a specified location.
Kinematic implementation: 1. Move to approach pose (if specified or auto-calculated) 2. Detach object 3. Teleport object to drop position
- Parameters:
drop_pose (
Pose) – Where to drop the object (position and orientation). Orientation defaults to no rotation[0, 0, 0, 1].approach_pose (
Optional[Pose]) – Explicit approach pose (optional, auto-calculated if None)approach_offset (
float) – Auto-calculate approach offset distance (default: 0.2m)place_gently (
bool) – Place at exact position vs drop from height (default: True)drop_height (
float) – Height above drop_pose to release if not gentle (default: 0.1m)target_object_id (
Optional[int]) – Specific object to drop (None = drop first attached)
Example:
# Drop at specific location action = DropAction( drop_pose=Pose(position=[10, 5, 0.1]), place_gently=True ) agent.add_action(action) # Drop with specific orientation action = DropAction( drop_pose=Pose(position=[10, 5, 0.1], orientation=[0, 0, 0.7, 0.7]), )
Note
Path visualization is controlled by agent.path_visualize setting.
- drop_pose: Pose
- use_approach: bool = True
- approach_pose: Pose | None = None
- approach_offset: float = 1.0
- drop_offset: float = 0.0
- move_skip_threshold: float = 0.05
- place_gently: bool = True
- drop_height: float = 0.1
- drop_relative_pose: Pose | None = None
- joint_max_force: float = 500.0
- ee_tolerance: float = 0.02
- continue_on_ik_failure: bool = True
- execute(agent, dt)
Execute drop action.
- Return type:
- reset()
Reset action state.
pybullet_fleet.agent module
core/agent.py Agent class for goal-based position control with max velocity and acceleration constraints. Supports both mobile (use_fixed_base=False) and fixed (use_fixed_base=True) robots. Supports both Mesh and URDF loading.
- class pybullet_fleet.agent.AgentSpawnParams(visual_shape=None, collision_shape=None, initial_pose=<factory>, mass=0.0, pickable=True, name=None, visual_frame_pose=None, collision_frame_pose=None, collision_mode=CollisionMode.NORMAL_3D, user_data=<factory>, urdf_path=None, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, ik_params=None, controller=None, plugins=None)
Bases:
SimObjectSpawnParamsAgent spawn parameters extending SimObjectSpawnParams.
These parameters define the physical properties, appearance, and initial state of an agent. They are used by Agent.from_params() and AgentManager.spawn_agents_grid().
Supports three types of robots: 1. Mesh robots: Specify visual_shape (and optionally collision_shape) 2. URDF robots: Specify urdf_path (for robots with joints) 3. Virtual agents: Specify neither (invisible, no collision, useful for tracking/planning)
- urdf_path
Path to robot URDF file (for URDF-based robots with joints)
- motion_mode
MotionMode.OMNIDIRECTIONAL (move in any direction) or MotionMode.DIFFERENTIAL (rotate then move forward)
- use_fixed_base
If True, robot base is fixed and doesn’t move (default: False)
- plugins
List of plugin config dicts for per-agent plugins. Each entry is
{"type": "battery", "config": {...}}or{"class": "my.Module", "config": {...}}.
- controller
Movement controller selector / configuration. Accepts a registry-name string (
"omni"), a flat dict, a list of dicts, aControllerParams, or a pre-builtController. Each dict entry uses eithertype:(registry name) orclass:(dotted import path), plus controller-specific config inline. KinematicController entries (omni, differential) replace the base controller; high-level entries (patrol, random_walk) are appended on top of it. A lone high-level entry auto-creates the base frommotion_mode. Example single:{"type": "patrol", "waypoints": [...], "loop": True}Example custom:{"class": "my_pkg.MyController", "max_linear_vel": 1.5}Example list:[{"type": "differential", ...}, {"type": "patrol", ...}]
See
Agent.__init__for thecontrollerargument (selector / configuration for the Agent’s movement controller; supports str / dict / list / Controller / ControllerParams).- Inherited from SimObjectSpawnParams:
- name: Optional string name for human-readable identification.
Duplicates allowed - use for debugging/filtering, not unique lookup. For unique identification, use object_id after spawning.
Note
AgentManager.spawn_agents_grid() calculates positions automatically and may override initial_pose.
- motion_mode: MotionMode | str = 'differential'
- use_fixed_base: bool = False
- ik_params: IKParams | None = None
- controller: str | Dict[str, Any] | List[Dict[str, Any]] | ControllerParams | Controller | None = None
- classmethod from_dict(config)
Create AgentSpawnParams from configuration dictionary.
- Parameters:
Dictionary with robot parameters including:
visual_shape/collision_shapeorurdf_pathinitial_pose:Poseobject (optional)controller: dict / str /None— seecontrollerand the Controller Configuration how-touse_fixed_base,motion_mode,ik_params(optional)
- Returns:
AgentSpawnParams instance
Example:
config = { 'visual_shape': ShapeParams( shape_type='mesh', mesh_path='robot.obj', mesh_scale=[1.0, 1.0, 1.0], rgba_color=[0.2, 0.2, 0.2, 1.0] ), 'collision_shape': ShapeParams( shape_type='box', half_extents=[0.2, 0.1, 0.2] ), 'initial_pose': Pose.from_xyz(1.0, 2.0, 0.0), 'controller': {'max_linear_vel': 2.0, 'max_linear_accel': 5.0}, 'use_fixed_base': False } params = AgentSpawnParams.from_dict(config) # Or for URDF: config = { 'urdf_path': 'robots/mobile_robot.urdf', 'initial_pose': Pose.from_xyz(1.0, 2.0, 0.0), 'use_fixed_base': False } params = AgentSpawnParams.from_dict(config)
- class pybullet_fleet.agent.IKParams(max_outer_iterations=5, convergence_threshold=0.01, max_inner_iterations=200, residual_threshold=0.0001, reachability_tolerance=0.02, seed_quartiles=(0.25, 0.5, 0.75), ik_joint_names=None)
Bases:
objectInverse kinematics solver configuration.
Controls the multi-seed iterative IK solver used by
Agent._solve_ik()andAgent.move_end_effector().- max_outer_iterations
Maximum refinement iterations per seed.
- convergence_threshold
EE-to-target distance (m) below which the IK solution is accepted early.
- max_inner_iterations
maxNumIterationspassed topybullet.calculateInverseKinematics.
- residual_threshold
residualThresholdpassed topybullet.calculateInverseKinematics.
- reachability_tolerance
Maximum EE distance (m) for
Agent.move_end_effector()to report reachable.
- seed_quartiles
Tuple of joint-range fractions (0–1) used to build diversified rest-pose seeds. Each value
qproduces a seed atlower + q * rangefor every joint.
- ik_joint_names
Explicit tuple of joint names that the IK solver is allowed to move. All other movable joints are locked at their current position. When
None(default), the solver falls back to the automatic heuristic (continuous joints are locked, everything else is free).
Example:
cfg = IKParams(max_outer_iterations=10, seed_quartiles=(0.1, 0.5, 0.9)) arm = Agent.from_urdf("arm.urdf", ik_params=cfg) # Mobile manipulator — solve only for arm joints cfg = IKParams(ik_joint_names=( "mount_to_shoulder", "shoulder_to_elbow", "elbow_to_wrist", "wrist_to_end", )) robot = Agent.from_urdf("mobile_manipulator.urdf", ik_params=cfg)
- max_outer_iterations: int = 5
- convergence_threshold: float = 0.01
- max_inner_iterations: int = 200
- residual_threshold: float = 0.0001
- reachability_tolerance: float = 0.02
- class pybullet_fleet.agent.Agent(body_id, urdf_path=None, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None, mass=None, ik_params=None, joint_tolerance=None, controller=None, notify_spawn=True)
Bases:
SimObjectAgent class with goal-based position control.
Inherits from SimObject to get attach/detach and callback functionality. Supports both mobile robots (use_fixed_base=False) and fixed robots (use_fixed_base=True). Supports both Mesh and URDF loading.
- Class Attributes:
- _spawn_params_cls: The SpawnParams dataclass used by config-driven
grid spawning (
AgentSpawnParams).
- Class Constants:
- _KINEMATIC_JOINT_FALLBACK_VELOCITY: Default max joint velocity for
revolute joints (rad/s) used when the URDF
<limit velocity="...">is 0 or missing.- _KINEMATIC_PRISMATIC_FALLBACK_VELOCITY: Default max joint velocity
for prismatic joints (m/s) used when the URDF limit is 0 or missing.
- _DEFAULT_JOINT_TOLERANCE: Default tolerance (rad / m) for joint
target comparison methods.
Features: - Accepts Pose goals (compatible with ROS2 geometry_msgs/Pose) - Max velocity and acceleration constraints (for mobile robots) - Current pose query API - Shared shape optimization for fast spawning - Fixed robot support (non-moving objects) - URDF support with joint control - Attach/detach objects (inherited from SimObject)
Object Identification (inherited from SimObject): - object_id: Unique integer ID (assigned by sim_core, use for lookups) - name: Optional string name (duplicates allowed, use for debugging/filtering)
This class is designed to be eventually controlled by ROS2 nodes, so the API follows ROS message conventions.
- __init__(body_id, urdf_path=None, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None, mass=None, ik_params=None, joint_tolerance=None, controller=None, notify_spawn=True)
Initialize Agent.
- Parameters:
body_id (
int) – PyBullet body IDurdf_path (
Optional[str]) – Path to robot URDF file (if URDF-based)motion_mode (
Union[MotionMode,str]) –MotionMode.OMNIDIRECTIONAL or MotionMode.DIFFERENTIAL drive. Used as the default controller selector when
controllerhas no"type"key. Ifcontroller={"type": ...}is given and disagrees withmotion_mode, a warning is logged andcontroller.typewins.Deprecated since version Prefer:
controller="omni"/controller="differential"(or a fullcontroller={...}dict).motion_modeis retained for backward compatibility and as a coarse “2D mobile base shape” hint for batched controllers.use_fixed_base (
bool) – If True, robot base is fixed and doesn’t movecollision_mode (
CollisionMode) – Collision detection mode (default: NORMAL_3D)sim_core – Reference to simulation core (optional)
ik_params (
Optional[IKParams]) – IK solver configuration (default:IKParams()).joint_tolerance (
Union[float,list,dict,None]) – Default joint tolerance for convergence checks. Acceptsfloat(all joints),list(per-joint), ordict({joint_name: tol}). IfNone, uses the class default (0.01). For mixed prismatic / revolute chains, a dict is recommended so prismatic joints (metres) can use a tighter tolerance than revolute joints (radians).controller (
Union[str,Dict[str,Any],ControllerParams,Controller,None]) –Controller selector / configuration. Accepts:
None— framework defaults;motion_modeselectsOmniControllerorDifferentialController.str— registry name, e.g."omni","differential","patrol". Equivalent to{"type": <str>}.dict—{"type": "...", "max_linear_vel": 1.5, "wheel_separation": 0.3, ...}.ControllerParamsfields populateControllerParams; impl-specific extras (e.g.wheel_separation) flow through to the controller’s__init__. Recommended for YAML / config-file workflows.Controllerinstance — any pre-built controller (built-in or custom). Installed as-is;self.controller_paramsis taken fromcontroller.params. Recommended for Python workflows that need custom controllers or want type-safe construction.ControllerParamsinstance — params only;motion_modestill picks the Omni/Diff class. Use this only when you want to tweak shared params but keep the default class.
Note
For spawning robots from AgentSpawnParams, use Agent.from_params() instead. For mesh robots, use Agent.from_mesh() factory method. For URDF robots, use Agent.from_urdf() factory method.
- path_visualize: bool
- path_visualize_width: float
- controller_params: ControllerParams
- property max_linear_vel: ndarray
- property max_linear_accel: ndarray
- property max_angular_vel: ndarray
- property max_angular_accel: ndarray
- property joint_tolerance: float | list | dict
Default joint tolerance used by convergence checks.
Returns the instance-level tolerance if set, otherwise the class default (
_DEFAULT_JOINT_TOLERANCE = 0.01).For robots with mixed prismatic (metres) and revolute (radians) joints, set this to a
dictso each joint type can have an appropriate tolerance:agent = Agent.from_urdf( ..., joint_tolerance={"rail_joint": 0.005, "elbow_to_wrist": 0.05}, )
- property is_moving: bool
Check if robot is currently moving.
To change movement state, use set_goal_pose(), set_path(), or stop().
- Returns:
True if robot is moving, False otherwise
- Type:
Read-only property
- property battery_soc: float
Current state of charge [0.0, 1.0].
Always returns
1.0when noBatteryPluginis attached.
- property is_charging: bool
Whether the agent is currently charging.
- property battery_plugin
The
BatteryPlugininstance, orNone.
- set_charging(charging)
Start or stop charging.
- property plugins: List[AgentPlugin]
Read-only list of active per-agent plugins.
- add_plugin(plugin)
Attach a per-agent plugin.
- Parameters:
plugin (
AgentPlugin) – AnAgentPlugininstance bound to this agent.- Raises:
ValueError – If plugin is not bound to this agent.
- Return type:
- remove_plugin(plugin_or_cls)
Remove a plugin by instance or class.
When a class is given, the first instance of that class is removed. When an instance is given, that exact instance is removed.
- Parameters:
plugin_or_cls – An
AgentPlugininstance or subclass.- Return type:
- get_plugin(cls)
Return the first plugin matching cls, or
None.
- property motion_mode: MotionMode
Get current motion mode.
To change motion mode, use set_motion_mode() method.
- Returns:
Current MotionMode (OMNIDIRECTIONAL or DIFFERENTIAL)
- Type:
Read-only property
- property velocity: ndarray
Get current velocity vector.
- Returns:
Velocity [vx, vy, vz] in m/s (copy of internal state)
- Type:
Read-only property
- property angular_velocity: float
Get current angular velocity.
- Returns:
Angular velocity in rad/s
- Type:
Read-only property
- property goal_pose: Pose | None
Get current goal pose.
To set a new goal, use set_goal_pose() or set_path().
- Returns:
Current goal Pose, or None if no goal is set
- Type:
Read-only property
- property current_waypoint_index: int
Get current waypoint index in the path.
- Returns:
Current waypoint index (0-based), or 0 if no path is set
- Type:
Read-only property
- property path: List[Pose]
Current waypoint path. Empty when idle.
- Type:
Read-only property
- classmethod from_params(spawn_params, sim_core=None)
Create an Agent from AgentSpawnParams.
This is the recommended way to spawn robots when using AgentSpawnParams. Automatically detects whether to use mesh, URDF, or virtual agent based on spawn_params.
- Parameters:
spawn_params (
AgentSpawnParams) – AgentSpawnParams instance with all robot parameterssim_core – Reference to simulation core (optional)
- Return type:
Agent- Returns:
Agent instance
The recommended way to choose a controller is to set
controller={"type": "omni" | "differential" | ...}explicitly.motion_modestill works as a shortcut and as the default whencontroller.typeis omitted, but if both are given,controller.typewins and a warning is emitted on mismatch.Example (Mesh, omni — recommended explicit form):
params = AgentSpawnParams( visual_shape=ShapeParams( shape_type="mesh", mesh_path="robot.obj", mesh_scale=[1.0, 1.0, 1.0], rgba_color=[0.2, 0.2, 0.2, 1.0] ), collision_shape=ShapeParams( shape_type="box", half_extents=[0.2, 0.1, 0.2] ), initial_pose=Pose.from_xyz(1.0, 2.0, 0.0), controller={"type": "omni", "max_linear_vel": 3.0}, ) robot = Agent.from_params(spawn_params=params)
Example (URDF):
params = AgentSpawnParams( urdf_path="arm_robot.urdf", initial_pose=Pose.from_xyz(0.0, 0.0, 0.0), use_fixed_base=True, ) robot = Agent.from_params(spawn_params=params)
Example (Virtual Agent - invisible, no collision):
params = AgentSpawnParams( initial_pose=Pose.from_xyz(0.0, 0.0, 0.0), controller={"type": "omni", "max_linear_vel": 2.0}, ) virtual_agent = Agent.from_params(spawn_params=params)
- classmethod from_dict(config, sim_core=None)
Create an Agent from a raw config dict.
Combines
AgentSpawnParams.from_dict()andfrom_params()in a single call. Usescls._spawn_params_clsso subclasses (e.g. Door, Elevator) automatically use their own SpawnParams.
- classmethod from_mesh(visual_shape=None, collision_shape=None, pose=None, mass=0.0, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None, controller=None, notify_spawn=True)
Create a mesh-based Agent with flexible shape control.
- Parameters:
visual_shape (
Optional[ShapeParams]) – ShapeParams for visual geometrycollision_shape (
Optional[ShapeParams]) – ShapeParams for collision geometrypose (
Pose) – Initial Pose (position and orientation)mass (
float) – Robot mass (kg), 0.0 for kinematic controlmotion_mode (
Union[MotionMode,str]) – MotionMode.OMNIDIRECTIONAL or MotionMode.DIFFERENTIALuse_fixed_base (
bool) – If True, robot base is fixed and doesn’t movesim_core – Reference to simulation core
controller (
Union[str,Dict[str,Any],ControllerParams,Controller,None]) – Controller selector / config. SeeAgent.__init__()for accepted forms (None/str/dict/ControllerParams).
- Return type:
Agent- Returns:
Agent instance
- classmethod from_urdf(urdf_path, pose=None, mass=None, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None, ik_params=None, joint_tolerance=None, controller=None, notify_spawn=True)
Create a URDF-based Agent (with joints).
- Parameters:
urdf_path (
str) – Robot model name (e.g."panda") or path to URDF file. Names are resolved viaresolve_model()pose (
Pose) – Initial Pose (position and orientation). Defaults to originmass (
Optional[float]) – Mass override. - None (default): Use URDF file’s mass values as-is - 0.0: Override all links to mass=0 for kinematic control (no physics)motion_mode (
Union[MotionMode,str]) – MotionMode.OMNIDIRECTIONAL or MotionMode.DIFFERENTIALuse_fixed_base (
bool) – If True, robot base is fixed in spacesim_core – Reference to simulation core
ik_params (
Optional[IKParams]) – IK solver configuration (default:IKParams()).joint_tolerance (
Union[float,list,dict,None]) – Default joint tolerance for convergence. Seejoint_toleranceproperty for details.controller (
Union[str,Dict[str,Any],ControllerParams,Controller,None]) – Controller selector / config. SeeAgent.__init__()for accepted forms (None/str/dict/ControllerParams).
- Return type:
Agent- Returns:
Agent instance
Note
mass=None (default): Uses URDF file’s mass values for physics simulation
mass=0.0: Override all links to mass=0 for kinematic control (no physics)
Example:
robot = Agent.from_urdf(urdf_path="arm_robot.urdf") robot = Agent.from_urdf(urdf_path="panda", use_fixed_base=True) robot = Agent.from_urdf(urdf_path="arm_robot.urdf", mass=0.0)
- set_goal_pose(goal, direction=None)
Set goal pose for the robot to move to.
Note: This is ignored if the robot has a fixed base.
- set_motion_mode(mode)
Set motion mode for the robot.
Replaces the active controller with the matching default (OmniController or DifferentialController). Any custom controller previously installed via
controller.typeis lost.Note: Motion mode cannot be changed while the robot is moving.
Deprecated since version Prefer:
agent.set_controller(create_controller(name, cfg))for runtime controller swaps.set_motion_modeis kept for backward compatibility and only handles the built-in Omni / Differential defaults.
- set_path(path, auto_approach=True, final_orientation_align=True, direction=None)
Set a path (list of waypoints) for the robot to follow.
- Parameters:
path (
List[Pose]) – List of Pose waypoints to follow in sequenceauto_approach (
bool) – If True, automatically add straight-line approach from current position to first waypoint if distance > threshold (default: True)final_orientation_align (
bool) – If True, add final rotation to match last waypoint’s orientation after reaching the position (default: True)direction (
Union[MovementDirection,str,None]) – Movement direction override, orNoneto use the controller’sdefault_direction. Can be MovementDirection enum or string (“auto”, “forward”, “backward”). Only used in differential drive mode (MotionMode.DIFFERENTIAL). In omnidirectional mode, this parameter is ignored.
- add_action(action)
Add an action to the execution queue.
Actions are executed sequentially in the order they are added.
- Parameters:
action (
Action) – Action instance to add to queue
Example:
agent.add_action(MoveAction(path=my_path)) agent.add_action(WaitAction(duration=5.0))
- add_action_sequence(actions)
Add multiple actions to the execution queue.
- Parameters:
actions (
List[Action]) – List of Action instances to add
Example:
agent.add_action_sequence([ MoveAction(path=path1), PickAction(target_object_id=obj.body_id), MoveAction(path=path2), DropAction(drop_pose=Pose(position=[10, 5, 0])) ])
- clear_actions()
Clear all actions from the queue and cancel current action.
- get_current_action()
Get the currently executing action.
- Return type:
Optional[Action]- Returns:
Current Action instance, or None if no action is executing
- get_action_queue_size()
Get the number of actions waiting in the queue.
- Return type:
- Returns:
Number of actions in queue
- is_action_queue_empty()
Check if action queue is empty and no action is currently executing.
- Return type:
- Returns:
True if no actions are queued or executing, False otherwise
- update(dt)
Update robot position towards goal with velocity/acceleration constraints.
Supports two motion modes: - MotionMode.OMNIDIRECTIONAL: Move in any direction without rotating first (smooth trajectories) - MotionMode.DIFFERENTIAL: Rotate to face target, then move forward (smooth trajectories)
Also processes action queue for high-level task execution.
Note: This does nothing if the robot has a fixed base.
- get_velocity()
Get current velocity vector.
- Return type:
- Returns:
Velocity [vx, vy, vz] in m/s
- set_controller(controller=None)
Set or replace the base (index-0) movement controller.
When a controller is set,
update()delegates movement tocontroller.compute(agent, dt).Note: A default controller is always assigned in
from_params(). SettingNonedisables all movement.
- add_controller(controller)
Append a high-level controller to the chain.
High-level controllers (patrol, random walk) are executed before the base navigation controller in
update().- Parameters:
controller – A
Controllerinstance.- Return type:
- remove_controller(controller_or_cls)
Remove a non-base controller by instance or class.
The base controller (index 0) cannot be removed via this method. Use
set_controller()to replace it.When a class is given, the first instance of that class (excluding the base controller) is removed. When an instance is given, that exact instance is removed.
- Parameters:
controller_or_cls – A Controller instance or subclass.
- Return type:
- get_controller_by_type(cls)
Return the first controller matching cls, or
None.Searches the entire chain (including the base controller).
- property controller
Return the base (index-0) controller, or
Noneif the chain is empty.
- stop()
Stop robot movement and clear goal and path.
- is_urdf_robot()
Check if this robot is URDF-based (has joints).
- Return type:
- get_num_joints()
Get number of joints (0 for mesh robots).
- Return type:
- get_joint_state(joint_index)
Get joint state (position and velocity).
For kinematic robots (mass=0), returns the cached position and velocity=0.0, avoiding a PyBullet C call. For physics robots the value is read from
p.getJointState().- Parameters:
joint_index (
int) – Joint index (0-based)- Return type:
- Returns:
(position, velocity) tuple
Example:
pos, vel = robot.get_joint_state(0)
- get_joint_state_by_name(joint_name)
Get (position, velocity) for a single joint by name. Returns (position, velocity) or (0.0, 0.0) if not found.
- Return type:
- get_all_joints_state()
Return a list of (position, velocity) for all joints. Example: [(pos0, vel0), (pos1, vel1), …]
- Return type:
- get_all_joints_state_by_name()
Return a dict of {joint_name: (position, velocity)} for all joints. Example: {joint_name: (pos, vel), …}
- Return type:
- get_joints_state_by_name(joint_names)
Return a dict of {joint_name: (position, velocity)} for the specified joint_names. Example: {joint_name: (pos, vel), …}
- Return type:
- set_joint_target(joint_index, target_position, max_force=500.0)
Set target position for a joint (for URDF robots only).
- Parameters:
Example:
robot.set_joint_target(0, 1.57) # Move first joint to 90 degrees
- set_joint_target_by_name(joint_name, target_position, max_force=500.0)
Set target position for a joint by joint name.
- set_all_joints_targets(target_positions, max_force=500.0)
Set target positions for all joints at once.
- Parameters:
Example:
robot.set_all_joints_targets([0.0, 1.57, -1.57, 0.0])
- set_joints_targets_by_name(joint_targets, max_force=500.0)
Set multiple joint targets by dict {joint_name: target_position} (partial update allowed).
- set_joints_targets(targets, max_force=500.0)
Set joint targets (accepts both list and dict).
- Parameters:
Example:
# Using list robot.set_joints_targets([0.0, 1.57, -1.57, 0.0]) # Using dict robot.set_joints_targets({"joint1": 1.57, "joint2": -1.57})
- is_joint_at_target(joint_index, target, tolerance=None)
Check if a single joint (by index) is within tolerance of the target position.
- Parameters:
joint_index (
int) – Joint index (0-based).target (
float) – Target position (radians for revolute, metres for prismatic).tolerance (
Union[float,list,dict,None]) – Position tolerance —float,list,dict, orNone(→self.joint_tolerance). Resolved via_resolve_joint_tolerance(tolerance, joint_index).
- Return type:
- is_joint_at_target_by_name(joint_name, target, tolerance=None)
Check if a single joint (by name) is within tolerance of the target position.
- Parameters:
joint_name (
str) – Joint name string.target (
float) – Target position (radians for revolute, metres for prismatic).tolerance (
Union[float,list,dict,None]) – Position tolerance —float,list,dict, orNone(→self.joint_tolerance). Resolved via_resolve_joint_tolerance()using the joint’s index.
- Return type:
- are_all_joints_at_targets(target_positions=None, tolerance=None)
Check if joints are at target positions.
- Parameters:
target_positions (
Optional[list]) – List of target positions for each joint. If None, uses_last_joint_targets(last commanded targets).tolerance (
Union[float,list,dict,None]) –float,list,dict, orNone. If None, usesself.joint_tolerance. Passed through to_resolve_joint_tolerance()per joint.Note
A scalar tolerance applies the same value to all joints. For mixed prismatic (metres) / revolute (radians) robots, consider a
dictkeyed by joint name or setself.joint_toleranceto a dict.
- Return type:
- Returns:
True if all joints are within tolerance of their targets.
- are_joints_at_targets_by_name(joint_targets, tolerance=None)
Check if all specified joints (by name) are within tolerance.
- Parameters:
joint_targets (
dict) –{joint_name: target}tolerance (
Union[float,list,dict,None]) –float(all),dict({joint_name: tol}), orNone(→self.joint_tolerance).A
listis also accepted but is resolved by absolute joint index (not by iteration order of joint_targets). For named-joint subsets, prefer adictto avoid unexpected index-based fallback.For mixed prismatic / revolute chains, use a
dictso prismatic joints (metres) can have a tighter tolerance than revolute joints (radians).
- Return type:
- Returns:
True if all joints are within tolerance, False otherwise.
- are_joints_at_targets(targets=None, tolerance=None)
Check if joints are at target positions.
- Parameters:
targets (
Union[list,dict,None]) – List of target positions for all joints, dict{joint_name: position}, or None to use the last commanded targets (_last_joint_targets).tolerance (
Union[float,list,dict,None]) –Tolerance value(s) —
float,list,dict, or None (→self.joint_tolerance).For mixed prismatic / revolute chains, use a
dictso prismatic joints (metres) can have a tighter tolerance than revolute joints (radians):agent.are_joints_at_targets( targets, tolerance={"rail_joint": 0.005, "elbow_to_wrist": 0.05}, )
- Return type:
- Returns:
True if all joints are within tolerance, False otherwise.
Example:
# No args — check last commanded targets with agent's default tolerance if robot.are_joints_at_targets(): print("All joints settled") # Using list if robot.are_joints_at_targets([0.0, 1.57, -1.57, 0.0], tolerance=0.01): print("Reached target") # Using dict if robot.are_joints_at_targets({"joint1": 1.57, "joint2": -1.57}, tolerance=0.01): print("Reached target")
- property ik_params: IKParams
Read-only access to the IK solver configuration.
- move_end_effector(target_position, target_orientation=None, end_effector_link=None, max_force=500.0, tolerance=None)
Set end-effector target position via inverse kinematics.
Solves IK, checks reachability, and calls
set_all_joints_targets()internally. Actual movement happens inupdate()(fire-and-forget, likeset_joint_target()).The joint targets are always set (best-effort), even when the target is unreachable. The return value tells the caller whether the IK solution actually reaches the desired position.
- Parameters:
target_position (
List[float]) – Target EE position [x, y, z] in world frame.target_orientation (
Optional[Tuple[float,float,float,float]]) – Target EE orientation as quaternion [qx, qy, qz, qw].end_effector_link (
Union[int,str,None]) – End-effector link (int, str, or None for auto-detect).max_force (
float) – Maximum force for motor control (physics mode).tolerance (
Optional[float]) – Euclidean distance (m) for reachability check. If None, usesik_params.reachability_tolerance.
- Return type:
- Returns:
True if the IK solution places the EE within tolerance of the target (reachable). False otherwise (best-effort targets are still set).
Example:
reachable = arm.move_end_effector([0.3, 0.0, 0.5]) if not reachable: print("Target may not be fully reachable")
- are_ee_at_target(target_position, target_orientation=None, end_effector_link=None, tolerance=0.02, orientation_tolerance=0.15)
Check whether the end-effector is near the target pose.
Resolves the EE link and delegates to
_check_ee_pose().- Parameters:
target_position (
List[float]) – Target EE position [x, y, z] in world frame.target_orientation (
Optional[Tuple[float,float,float,float]]) – Target EE orientation as quaternion [qx, qy, qz, qw]. If None, only position is checked.end_effector_link (
Union[int,str,None]) – Link index, name, or None (auto-detect).tolerance (
float) – Maximum Euclidean distance (m) to consider reached.orientation_tolerance (
float) – Maximum1 - |dot|between actual and target quaternions (default 0.15).
- Return type:
- Returns:
True if the EE is within tolerance of the target.
- update_attached_objects_kinematics()
For URDF robots: update attached objects’ pose to follow the parent link (kinematics attach). This is called every step for kinematic (mass=0) attached objects.
pybullet_fleet.agent_manager module
core/agent_manager.py Management classes for simulation objects and agents. - SimObjectManager: Base manager for all simulation objects - AgentManager: Extended manager with movement control for agents
- class pybullet_fleet.agent_manager.GridSpawnParams(x_min, x_max, y_min, y_max, spacing, offset, z_min=0, z_max=0)
Bases:
objectGrid parameters for object spawning in 3D space.
- x_min
Minimum X grid index
- x_max
Maximum X grid index
- y_min
Minimum Y grid index
- y_max
Maximum Y grid index
- spacing
Grid spacing [spacing_x, spacing_y, spacing_z] in meters
- offset
Grid offset [offset_x, offset_y, offset_z] in meters
- z_min
Minimum Z grid index (defaults to 0)
- z_max
Maximum Z grid index (defaults to 0 for 2D grids)
Note
Objects spawn in X → Y → Z order. For 2D grids, set z_min=z_max=0 (default). For 3D grids, set z_max > z_min to enable vertical layers.
- x_min: int
- x_max: int
- y_min: int
- y_max: int
- z_min: int = 0
- z_max: int = 0
- property total_cells: int
(x range) * (y range) * (z range).
- Type:
Total number of grid cells
- classmethod from_dict(config)
Create GridSpawnParams from a configuration dictionary.
Supports two formats:
Explicit ranges (original) — requires
x_min,x_max,y_min,y_max,spacing,offset:grid: x_min: 0 x_max: 9 y_min: 0 y_max: 4 spacing: [3.0, 3.0, 0.0] offset: [0, 0, 0.05]
Count-based (convenience) — requires
countandspacing;columnsandoffsetare optional:grid: count: 50 spacing: [3.0, 3.0] offset: [0, 0, 0.05] columns: 10
When
countis present,from_count()is used to compute the grid ranges.
- classmethod from_count(count, columns=None, spacing=None, offset=None)
Create GridSpawnParams from a total entity count.
Computes
x_maxandy_maxfrom count and columns so that the grid has exactly enough cells. Useful when you know “I want 50 robots” rather than “x_max=9, y_max=4”.- Parameters:
count (
int) – Total number of entities to place.columns (
Optional[int]) – Grid width (X cells) before wrapping to the next row. Defaults toceil(sqrt(count)).spacing (
Optional[List[float]]) –[sx, sy, sz]in metres. Default[1,1,0].offset (
Optional[List[float]]) –[ox, oy, oz]origin offset. Default[0,0,0].
- Return type:
GridSpawnParams- Returns:
GridSpawnParams with
x_min=0,y_min=0,x_max=columns-1,y_max=ceil(count/columns)-1.
- class pybullet_fleet.agent_manager.SimObjectManager(sim_core=None, object_class=<class 'pybullet_fleet.sim_object.SimObject'>, enable_profiling=False)
Bases:
Generic[T]Base manager class for simulation objects (SimObject and Agent).
This class provides centralized control of multiple simulation objects.
Features: - Grid-based object spawning - Query all object poses - Get object count
- Type Parameter:
T: Type of objects managed (must be SimObject or its subclass)
- __init__(sim_core=None, object_class=<class 'pybullet_fleet.sim_object.SimObject'>, enable_profiling=False)
Initialize SimObjectManager.
- Parameters:
sim_core – Reference to simulation core (optional)
object_class (
type) – Default class to instantiate when spawning (SimObject or a subclass such as Agent).enable_profiling (
bool) – IfTrue, spawn methods log elapsed time vialogger.info. DefaultFalse. Can be overridden later:mgr.enable_profiling = True.
- enable_profiling: bool
- objects: List[T]
- add_object(obj)
Add an object to the manager.
- Parameters:
obj (
TypeVar(T, bound=SimObject)) – SimObject or Agent instance to add- Return type:
- Returns:
None. The object is silently skipped (with a warning) when its
object_idis invalid (< 0), which typically means it was created without a sim_core.
Note
Objects are automatically registered to sim_core.sim_objects via SimObject.__init__(), so we don’t register them here again.
- remove_object(obj)
Remove an object from the manager’s tracking lists.
This only removes the object from the manager’s bookkeeping (
objects,body_ids,object_ids). It does not touchsim_coreor PyBullet — that is handled byMultiRobotSimulationCore.remove_object().Called automatically by
sim_core.remove_object()when this manager is registered viasim_core.register_manager().
- clear()
Remove all objects from the manager’s tracking lists.
This is a manager-only operation — it does not remove objects from
sim_coreor PyBullet. Typically called bysim_core.reset()to keep the manager in sync after a full simulation reset.- Return type:
- spawn_objects_grid(num_objects, grid_params, spawn_params)
Spawn multiple objects in a grid pattern. This is a convenience wrapper around spawn_grid_counts for spawning a single object type with a specified count. :type num_objects:
int:param num_objects: Number of objects to spawn :type grid_params:GridSpawnParams:param grid_params: GridSpawnParams instance with grid configuration :type spawn_params:SimObjectSpawnParams:param spawn_params: SimObjectSpawnParams instance with object parameters
- spawn_grid_mixed(num_objects, grid_params, spawn_params_list)
Spawn multiple SimObjects in a grid pattern with mixed types.
This method allows spawning different types of objects with specified probabilities. If probabilities don’t sum to 1.0, some grid positions may remain empty.
- Parameters:
num_objects (
int) – Maximum number of objects to spawngrid_params (
GridSpawnParams) – GridSpawnParams instance with grid configurationspawn_params_list (
List[Tuple[SimObjectSpawnParams,float]]) – List of (spawn_params, probability) tuples - spawn_params: SimObjectSpawnParams for this type - probability: Spawn probability (0.0 to 1.0) Note: Probabilities don’t need to sum to 1.0
- Return type:
- Returns:
List of spawned SimObject instances
Example:
# Spawn 100 objects: 60% typeA, 30% typeB, 10% empty spawn_params_list = [ (typeA_params, 0.6), (typeB_params, 0.3), ] manager.spawn_grid_mixed(100, grid_params, spawn_params_list)
- spawn_grid_counts(grid_params, spawn_params_count_list)
Spawn objects on a grid according to the specified count for each type.
Each type is spawned exactly the requested number of times. Grid positions are randomly shuffled so that types are distributed uniformly across the grid.
- Parameters:
- Return type:
- Returns:
List of spawned objects
- Raises:
ValueError – If the total count exceeds the number of grid cells.
- spawn_objects_batch(params_list)
Batch API to spawn multiple object instances at once.
When sim_core provides
batch_spawn(), rendering is disabled and the spatial grid rebuild is deferred until all objects are created.
- spawn_from_config(entities_yaml)
Spawn entities from a list of config dicts.
Dispatches to
entity_cls.from_dict()for each entity, which internally calls the appropriateSpawnParams.from_dictandfrom_params. Custom entity classes can overridefrom_dictto use their ownSpawnParams.If a dict has no
typefield,"agent"is assumed for backward compatibility. Supported types:"agent","sim_object", or any custom type registered viaregister_entity_class().
- spawn_from_yaml(yaml_path, key='entities')
Load a YAML file and spawn entities from the specified section.
Reads yaml_path, extracts the key section (default
"entities"), and delegates tospawn_from_config().- Parameters:
- Return type:
- Returns:
List of spawned objects (empty if key is missing).
- Raises:
FileNotFoundError – If yaml_path does not exist.
- get_pose(object_index)
Get current pose of a specific object.
- get_all_poses()
Get current poses of all objects.
- Return type:
List[Pose]- Returns:
List of Pose objects for all objects
- get_poses_dict()
Get current poses of all objects as a dictionary.
- set_pose_all(pose_factory)
Set pose for all objects using a factory function.
Applies pose_factory to each object and immediately calls
set_posewith the returned value. Works for both SimObject and Agent instances.- Parameters:
pose_factory (
Callable[[TypeVar(T, bound=SimObject)],Pose]) – Function that takes an object and returns a target Pose.- Return type:
Example:
# Shift every object 1 m along X manager.set_pose_all(lambda obj: Pose.from_xyz( obj.get_pose().x + 1.0, obj.get_pose().y, obj.get_pose().z, ))
- get_object_count()
Get total number of managed objects.
- Return type:
- class pybullet_fleet.agent_manager.AgentManager(sim_core=None, name=None, update_frequency=10.0, object_class=<class 'pybullet_fleet.agent.Agent'>, enable_profiling=False, batch_controller=None, fleet_controller=None)
Bases:
SimObjectManager[Agent]Extended manager class for agents with update callback system.
This class extends SimObjectManager with agent-specific features: - Grid-based agent spawning with AgentSpawnParams - Pose goal assignment to individual agents - Callback system for custom update logic (goals, state tracking, etc.) - Query moving/stopped agents - Optional batch controller for vectorized kinematic updates (
batch_controller=orenable_batch())Key Features: - Auto-registers update callback to simulation loop - Callback receives (agents, manager, dt) for easy agent operations - Configurable update frequency - Direct access to manager methods within callback
Note: - Agent.update() is automatically called by MultiRobotSimulationCore every step - AgentManager callbacks are for high-level logic (goals, coordination, etc.) - Agent-specific state can be stored in each Agent.user_data dict
- __init__(sim_core=None, name=None, update_frequency=10.0, object_class=<class 'pybullet_fleet.agent.Agent'>, enable_profiling=False, batch_controller=None, fleet_controller=None)
Initialize AgentManager.
- Parameters:
sim_core – Reference to simulation core (optional)
name (
Optional[str]) – Optional name for this manager. When set, the manager can be retrieved later viaget_manager().update_frequency (
float) – Default update callback frequency in Hz (default: 10.0)object_class (
type) – Class to instantiate when spawning (default: Agent). Pass an Agent subclass to manage custom agent types.enable_profiling (
bool) – IfTrue, spawn methods log elapsed time. DefaultFalse.batch_controller (
Optional[str]) – Batch controller registry name (e.g."batch_omni","batch_differential") or a dotted import path to a customBatchKinematicControllersubclass (e.g."my_pkg.MyBatchController"). When set, every agent spawned through this manager is automatically registered with the batch controller so movement is driven by vectorizedbatch_advance()rather than per-agentcompute(). Equivalent to callingmanager.enable_batch(batch_controller)after construction.fleet_controller (
Optional[Dict[str,Any]]) – Defaultcontroller:config applied to agents that do not specify their own controller params at spawn time. Accepts the same dict format asAgentSpawnParams.controller(e.g.{"max_linear_vel": 2.0, "max_angular_vel": 3.0}). Per-agentcontroller:values always take precedence.
- enable_batch(mode)
Create and attach a batch controller for mode, replacing any existing one.
Any agents already managed by this instance that are not yet batch-controlled are registered immediately. Agents spawned afterwards are registered automatically via
add_object().- Parameters:
mode (
str) – Batch controller registry name (e.g."batch_omni","batch_differential") or a dotted import path to a customBatchKinematicControllersubclass (e.g."my_pkg.MyBatchController").- Return type:
- Returns:
The newly-created
BatchKinematicController.- Raises:
ValueError – If mode is not recognised.
- disable_batch()
Unregister all managed agents from the batch controller and detach it.
- Return type:
- add_object(obj)
Add an object to the manager.
- Parameters:
obj (
Agent) – SimObject or Agent instance to add- Return type:
- Returns:
None. The object is silently skipped (with a warning) when its
object_idis invalid (< 0), which typically means it was created without a sim_core.
Note
Objects are automatically registered to sim_core.sim_objects via SimObject.__init__(), so we don’t register them here again.
- remove_object(obj)
Remove an object from the manager’s tracking lists.
This only removes the object from the manager’s bookkeeping (
objects,body_ids,object_ids). It does not touchsim_coreor PyBullet — that is handled byMultiRobotSimulationCore.remove_object().Called automatically by
sim_core.remove_object()when this manager is registered viasim_core.register_manager().- Parameters:
obj (
Agent) – The object to remove.- Return type:
- Returns:
True if the object was found and removed, False otherwise.
- spawn_agents_grid(num_agents, grid_params, spawn_params)
Spawn agents in a grid. Alias for
spawn_objects_grid().- Return type:
List[Agent]
- spawn_agents_grid_mixed(num_agents, grid_params, spawn_params_list)
Spawn mixed agent types. Alias for
spawn_grid_mixed().- Return type:
List[Agent]
- spawn_agent_grid_counts(grid_params, spawn_params_count_list)
Spawn exact counts per type. Alias for
spawn_grid_counts().- Return type:
List[Agent]
- set_goal_pose(agent_index, goal)
Set goal pose for a specific agent.
- Parameters:
agent_index (
int) – Index of agent in self.objects listgoal (
Pose) – Target Pose
- set_goal_pose_by_body_id(body_id, goal)
Set goal pose for an agent by its PyBullet body ID.
Note
This method uses PyBullet’s internal body_id for compatibility with direct PyBullet API usage. For manager-level operations, prefer using set_goal_pose_by_object_id() instead.
- Parameters:
body_id (
int) – PyBullet body IDgoal (
Pose) – Target Pose
- set_goal_pose_by_object_id(object_id, goal)
Set goal pose for an agent by its simulation object ID.
This is the preferred method for manager-level operations, as it uses the simulation’s unique object_id rather than PyBullet’s internal body_id.
- Parameters:
object_id (
int) – Simulation object ID (from SimObject.object_id)goal (
Pose) – Target Pose
- stop_all()
Stop all agents and clear their goals.
- set_goal_pose_all(goal_factory)
Set goal pose for all agents using a factory function.
This helper method simplifies bulk goal assignment by applying a factory function to each agent. The factory can use agent-specific data (current pose, user_data, etc.) to create customized goals.
- Parameters:
goal_factory (
Callable[[Agent],Pose]) – Function that takes an Agent and returns a Pose- Return type:
Example:
# Set all agents to move to their designated home positions def get_home_goal(robot): return robot.user_data['home_position'] manager.set_goal_pose_all(get_home_goal) # Move all agents 5 meters forward from their current position def move_forward(robot): current_pos = robot.get_pose().position return Pose.from_xyz(current_pos[0] + 5, current_pos[1], current_pos[2]) manager.set_goal_pose_all(move_forward)
Note
This is a convenience wrapper around individual set_goal_pose calls. No performance benefit over explicit loops, but improves code clarity.
- set_joints_targets_all(targets_factory, max_force=500.0)
Set joint targets for all agents using a factory function.
This helper method simplifies bulk joint control by applying a factory function to each agent. Useful for coordinated multi-robot arm movements.
- Parameters:
- Return type:
Example:
# Set all robot arms to neutral position def get_neutral_joints(robot): return [0.0, 0.0, 0.0, 0.0] manager.set_joints_targets_all(get_neutral_joints) # Set custom positions based on robot index def get_custom_joints(robot): idx = robot.user_data.get('index', 0) return [idx * 0.1, 0.0, 0.0, 0.0] manager.set_joints_targets_all(get_custom_joints, max_force=1000.0)
Note
Only applicable to agents with controllable joints (robot arms, etc.). Mobile robots without arm joints will ignore this command.
- get_moving_count()
Get number of agents currently moving.
- Return type:
- add_action_sequence_all(action_factory)
Apply action sequence to all agents using a factory function.
This helper method simplifies bulk action assignment by applying a factory function to each agent. The factory can use agent-specific data (pose, user_data, etc.) to create customized action sequences.
- Parameters:
action_factory (
Callable[[Agent],List[Any]]) – Function that takes an Agent and returns a list of Actions- Return type:
Example:
def create_pick_drop_sequence(robot): target = robot.user_data['target_object'] return [ PickAction(target_object_id=target.body_id), MoveAction(path=Path.from_positions([[10, 10, 0]])), DropAction(drop_pose=Pose(position=[10, 10, 0])) ] # Apply to all agents at once manager.add_action_sequence_all(create_pick_drop_sequence)
Note
This is a convenience wrapper. Internally it still loops through agents, so there’s no performance benefit over explicit loops. The main advantage is code clarity and reduced boilerplate.
- add_action_all(action_factory)
Add a single action to all agents using a factory function.
Similar to add_action_sequence_all, but adds a single action instead of a sequence.
- Parameters:
action_factory (
Callable[[Agent],Any]) – Function that takes an Agent and returns a single Action- Return type:
Example:
def create_move_action(robot): goal_pos = robot.user_data['next_position'] return MoveAction(path=Path.from_positions([goal_pos])) manager.add_action_all(create_move_action)
- register_callback(callback, frequency=None)
Register a callback for custom agent update logic.
Multiple callbacks can be registered, each with their own frequency. This method automatically registers with sim_core if available.
The callback provides access to the AgentManager instance, making it easy to: - Access all agents via manager.objects - Use manager methods (set_goal_pose, query states, etc.) - Store manager-level state in agent.user_data
- The callback should have signature:
callback(manager: AgentManager, dt: float) -> None
- Parameters:
Example (Goal management):
def goal_update_logic(manager, dt): for agent in manager.objects: if not agent.is_moving: # Set new goal for stopped agents new_goal = calculate_next_goal(agent) agent.set_goal_pose(new_goal) manager.register_callback(goal_update_logic, frequency=4.0)
Example (State tracking):
def state_tracker(manager, dt): for agent in manager.objects: # Track agent statistics if 'total_distance' not in agent.user_data: agent.user_data['total_distance'] = 0.0 agent.user_data['total_distance'] += np.linalg.norm(agent.velocity) * dt manager.register_callback(state_tracker, frequency=10.0)
Comparison with sim_core.register_callback():
AgentManager callback: Provides manager reference and filtered agent list
sim_core callback: Provides sim_core reference for all objects
Use AgentManager for agent-specific operations
Use sim_core for general simulation-wide operations
- setup_camera(camera_config=None)
Setup camera to view all managed objects in the simulation.
This method automatically extracts positions from all managed objects and configures the camera to fit them in the view.
- Parameters:
camera_config (
Optional[Dict]) – Dictionary with camera settings (from yaml config). If None, uses default settings.- Return type:
Example:
# Basic usage with default settings agent_manager.setup_camera() # With custom camera config camera_config = { 'camera_mode': 'auto', 'camera_view_type': 'top_down', 'camera_auto_scale': 0.8 } agent_manager.setup_camera(camera_config)
Note
Requires sim_core to be set during AgentManager initialization. Only works when GUI is enabled.
pybullet_fleet.config_utils module
config_utils.py Utility functions for loading and merging configuration files.
- pybullet_fleet.config_utils.load_yaml_config(config_path)
Load a YAML config file and return parsed contents.
General-purpose YAML loader. Works for any YAML config file (simulation, bridge, spawn definitions, etc.).
- Parameters:
config_path (
str) – Path to YAML config file.- Return type:
- Returns:
Parsed config dict.
- Raises:
FileNotFoundError – If config_path does not exist.
- pybullet_fleet.config_utils.merge_configs(base_config, override_config)
Recursively merge two configuration dictionaries.
Values in override_config take precedence over base_config.
- pybullet_fleet.config_utils.load_config(config_paths)
Load and merge configuration from one or more YAML files.
If multiple paths are provided, later configs override earlier ones. Delegates to
load_yaml_config()for each file.- Parameters:
config_paths (
Union[str,List[str]]) – Single path string or list of paths to configuration files. Later configs in the list override earlier ones.- Return type:
- Returns:
Merged configuration dictionary
Examples
# Single config config = load_config(‘config.yaml’)
# Multiple configs (100robots_config overrides config.yaml) config = load_config([‘config.yaml’, ‘100robots_config.yaml’])
- pybullet_fleet.config_utils.resolve_class(dotted_path)
Import and return a class from a dotted Python path.
- Parameters:
dotted_path (
str) – Fully-qualified class path, e.g."pybullet_fleet.plugins.workcell_plugin.WorkcellPlugin".- Return type:
- Returns:
The resolved class object.
- Raises:
ValueError – If dotted_path is not a valid
module.ClassNamestring.ModuleNotFoundError – If the module cannot be imported.
AttributeError – If the class is not found in the module.
- pybullet_fleet.config_utils.dataclass_from_dict(cls, config, *, aliases=None, converters=None, strict_keys=None)
Construct a dataclass instance from a config dict via field introspection.
For each
init=Truefield in cls, look up the corresponding config key (or its alias) and apply an optional converter. Fields with defaults are optional; fields without defaults are required.Enum fields are auto-wrapped: if the config value is a
strand the field type is anEnumsubclass, the value is converted viaEnumCls(value).- Parameters:
config (
Dict[str,Any]) – Dict of config values (typically from YAML).aliases (
Optional[Dict[str,str]]) –{field_name: config_key}— alternative config keys. For example{"camera_config": "camera"}reads the"camera"key from config into thecamera_configfield.converters (
Optional[Dict[str,Callable]]) –{field_name: callable}— per-field converters. Called asconverter(raw_value)before assignment. A converter returningNoneskips the field (useful for conditionalNone-passthrough, e.g.collision_detection_method).strict_keys (
Optional[List[str]]) – If provided, only these fields are read from config. All other fields use their dataclass defaults.
- Return type:
TypeVar(T)- Returns:
An instance of cls.
- Raises:
ValueError – If a required field (no default) is missing from config.
Example:
@dataclass class MyParams: x: float = 0.0 mode: MyEnum = MyEnum.A params = dataclass_from_dict(MyParams, {"x": 1.5, "mode": "b"}) assert params.x == 1.5 assert params.mode == MyEnum.B
- pybullet_fleet.config_utils.forward_spawn_params(factory_method, spawn_params, *, aliases=None, extra_kwargs=None)
Build kwargs for a factory method from a dataclass spawn_params.
Introspects the factory_method’s signature and forwards matching fields from spawn_params. Fields named differently can be mapped via aliases (e.g.
{"initial_pose": "pose"}).- Parameters:
factory_method (
Callable) – Target class method (e.g.Agent.from_urdf).spawn_params – Dataclass instance with spawn parameters.
aliases (
Optional[Dict[str,str]]) –{spawn_field: factory_param}— field name remapping.extra_kwargs (
Optional[Dict[str,Any]]) – Additional kwargs to merge (e.g.{"sim_core": sc}).
- Return type:
- Returns:
Dict of keyword arguments ready for
factory_method(**result).
Example:
kwargs = forward_spawn_params( Agent.from_urdf, spawn_params, aliases={"initial_pose": "pose"}, extra_kwargs={"sim_core": sim}, ) agent = Agent.from_urdf(**kwargs)
pybullet_fleet.core_simulation module
core_simulation.py Reusable core simulation logic for multi-robot PyBullet environments. Integrates generation, management, transport, attach/detach, collision detection, coordinate conversion, occupied judgment, transport path generation, debugging, monitoring, and log control for various robots, pallets, and meshes.
- class pybullet_fleet.core_simulation.SimulationParams(target_rtf=1.0, timestep=0.1, duration=0, gui=True, physics=False, monitor=True, enable_monitor_gui=True, collision_check_frequency=None, log_level='warn', max_steps_per_frame=10, gui_min_fps=30, enable_collision_shapes=False, enable_structure_transparency=False, enable_shadows=True, enable_gui_panel=False, ignore_static_collision=True, enable_time_profiling=False, profiling_interval=100, enable_memory_profiling=False, enable_collision_color_change=False, spatial_hash_cell_size_mode=SpatialHashCellSizeMode.AUTO_INITIAL, spatial_hash_cell_size=None, collision_detection_method=None, collision_margin=0.02, multi_cell_threshold=1.5, enable_floor=True, camera_config=None, model_paths=None, window_width=1024, window_height=768, monitor_width=200, monitor_height=290, monitor_x=-1, monitor_y=-1)
Bases:
objectSimulation configuration parameters.
All fields have sensible defaults for headless kinematics-based simulation. Use
from_config/from_dictto load from YAML files.- target_rtf: float = 1.0
- timestep: float = 0.1
- duration: float = 0
- gui: bool = True
- physics: bool = False
- monitor: bool = True
- enable_monitor_gui: bool = True
- log_level: str = 'warn'
- max_steps_per_frame: int = 10
- gui_min_fps: int = 30
- enable_collision_shapes: bool = False
- enable_structure_transparency: bool = False
- enable_shadows: bool = True
- enable_gui_panel: bool = False
- ignore_static_collision: bool = True
- enable_time_profiling: bool = False
- profiling_interval: int = 100
- enable_memory_profiling: bool = False
- enable_collision_color_change: bool = False
- spatial_hash_cell_size_mode: SpatialHashCellSizeMode = 'auto_initial'
- collision_detection_method: CollisionDetectionMethod | None = None
- collision_margin: float = 0.02
- multi_cell_threshold: float = 1.5
- enable_floor: bool = True
- window_width: int = 1024
- window_height: int = 768
- monitor_width: int = 200
- monitor_height: int = 290
- monitor_x: int = -1
- monitor_y: int = -1
- classmethod from_config(config_path='config.yaml')
- Return type:
SimulationParams
- class pybullet_fleet.core_simulation.MultiRobotSimulationCore(params, collision_color=None)
Bases:
object- classmethod from_yaml(yaml_path='config.yaml')
Create and initialise a simulation from a YAML config file.
Supports two YAML layouts:
Nested (recommended) —
simulation,world, andentitiessections:simulation: gui: true physics: false world: world_file: "/path/to/office.world" entities: - name: robot0 urdf_path: "robots/mobile_robot.urdf" pose: [0, 0, 0.05] - type: sim_object name: box0 visual_shape: shape_type: box half_extents: [0.3, 0.3, 0.1] pose: [3, 0, 0.1]
Flat (legacy) — all keys at the top level:
gui: true physics: false timestep: 0.1
Custom entity types — register classes via dotted Python paths so
type:dispatch works without any Python code:entity_classes: forklift: "my_ros_pkg.entities.ForkliftAgent" conveyor: "warehouse_sim.conveyor.ConveyorObject" entities: - type: forklift urdf_path: robots/forklift.urdf pose: [0, 0, 0.05]
When a
worldsection is present andenable_flooris not explicitly set in the simulation config, the floor plane is automatically disabled (the world geometry replaces it).- Parameters:
yaml_path (
str) – Path to YAML config file.- Return type:
MultiRobotSimulationCore- Returns:
Fully initialised
MultiRobotSimulationCorewith world loaded and robots spawned (if configured).
- classmethod from_dict(config)
Create MultiRobotSimulationCore from a configuration dictionary.
Supports both flat simulation-params dicts and nested dicts with
simulation,world, andentitiessections. When nested keys are present they are processed the same way asfrom_yaml().An optional
entity_classessection maps names to dotted Python paths for dynamic class registration (seefrom_yaml()for details).
- sim_time: float
- events: EventBus
- setup_monitor()
- Return type:
- property sim_objects: List[SimObject]
List of all simulation objects (Agent, SimObject, etc.).
- property agents: List[Agent]
List of Agent instances.
- property params: SimulationParams
Simulation parameters.
- property step_count: int
Current simulation step count.
- property collision_count: int
Total number of collisions detected.
- property enable_memory_profiling: bool
Whether memory profiling is enabled.
- register_plugin(plugin_or_cls, config=None, frequency=None)
Register a simulation plugin.
- Parameters:
plugin_or_cls (
Union[TypeVar(_SimPluginT, bound= SimPlugin),Type[TypeVar(_SimPluginT, bound= SimPlugin)]]) – Either aSimPlugininstance or aSimPluginsubclass (will be instantiated).config (
Optional[Dict[str,Any]]) – Config dict (only used when plugin_or_cls is a class).frequency (
Optional[float]) – Step frequency in Hz (None= every step).
- Return type:
TypeVar(_SimPluginT, bound= SimPlugin)- Returns:
The registered
SimPlugininstance.- Raises:
TypeError – If plugin_or_cls is not a SimPlugin.
- property plugins: List[SimPlugin]
Read-only list of active plugins.
- register_callback(callback, frequency=None)
Register a callback function to be called during simulation.
- Parameters:
- Return type:
Example:
def my_callback(sim_core, dt): for obj in sim_core.sim_objects: if isinstance(obj, Agent): # Update agent logic pass sim_core.register_callback(my_callback, frequency=10.0) # 10 Hz
- register_manager(manager)
Register a
SimObjectManager(or subclass) with this simulation.Registered managers are automatically synchronised when objects are removed via
remove_object()or the simulation isreset(). The manager’ssim_coreattribute is set to self.A manager can only be registered once; duplicate calls are ignored.
- Parameters:
manager (
SimObjectManager) – The manager instance to register.- Return type:
Example:
from pybullet_fleet.agent_manager import AgentManager mgr = AgentManager() sim.register_manager(mgr) mgr.spawn_agents_grid(10, grid_params, spawn_params) # When sim.remove_object(agent) is called later, # mgr.objects is kept in sync automatically.
- unregister_manager(manager)
Remove a manager from the registry.
- Return type:
- Returns:
True if the manager was found and removed, False otherwise.
- get_manager(name)
Return the registered
AgentManagerwith the given name, orNone.- Parameters:
name (
str) – The name assigned to the manager (name:field in YAML, or thename=argument passed toAgentManager).- Return type:
Example:
sim = MultiRobotSimulationCore.from_yaml("config.yaml") fleet = sim.get_manager("delivery_fleet") for agent in fleet.objects: agent.set_path([goal])
- unregister_callback(callback_func)
Remove a registered callback by function reference.
- start_recording(output='recording.gif', width=800, height=600, fps=15, duration=None, camera_mode='auto', time_base='sim', **kwargs)
Start recording the simulation as an animated GIF or MP4.
Frames are captured via
getCameraImage(ER_TINY_RENDERER)at the specified fps frequency. Works in bothp.DIRECTandp.GUI.Output format is auto-detected from the file extension:
.gif— Animated GIF via Pillow (always available).mp4— H.264 MP4 via imageio + pyav (requirespip install imageio[pyav])
Time bases:
"sim"(default) — Capture at fixed sim-time intervals. Playback always looks like 1× speed."real"— Capture at wall-clock intervals. Playback reflects actual execution speed (e.g. 10× sim → 10× video).
- Parameters:
output (
str) – Output file path (.gifor.mp4). Parent directories created automatically.width (
int) – Frame width in pixels.height (
int) – Frame height in pixels.fps (
int) – Capture frequency and playback FPS.duration (
Optional[float]) – Recording duration in sim-seconds (time_base="sim") or wall-clock seconds (time_base="real").Nonefor manual stop.camera_mode (
str) –"auto"|"gui"|"orbit"|"manual"."auto"is promoted to"gui"when running in GUI mode.time_base (
str) –"sim"or"real". Controls capture timing.**kwargs (
Any) – Passed toSimulationRecorder(e.g.orbit_degrees,camera_params).
- Return type:
- Returns:
SimulationRecorderinstance.
Example:
sim.start_recording("output.gif", duration=4.0) sim.start_recording("output.mp4", duration=4.0, fps=30) sim.start_recording("perf.mp4", duration=4.0, time_base="real") sim.run_simulation(duration=5.0) # File is auto-saved when run_simulation ends
- stop_recording()
Stop recording and save the output file.
- set_collision_check_frequency(frequency=None)
Set the frequency (Hz, number of times per second) for collision detection.
- Parameters:
frequency (
Optional[float]) – Collision check frequency in Hz (times per second) - None: Check every step (highest accuracy, highest cost) - 0: Disable collision checking (no collision detection) - > 0: Check at specified frequency (e.g., 1.0 = once per second)- Return type:
Example:
sim.set_collision_check_frequency(None) # Every step sim.set_collision_check_frequency(1.0) # 1 Hz (once per second) sim.set_collision_check_frequency(10.0) # 10 Hz sim.set_collision_check_frequency(0) # Disable
Note: Higher frequency increases computational cost but improves collision detection accuracy.
- setup_pybullet()
Initialize PyBullet with GUI panels hidden.
- Return type:
- disable_rendering()
Disable rendering during object spawning for better performance.
- Return type:
- enable_rendering()
Enable rendering before starting simulation.
- Return type:
- batch_spawn()
Context manager for optimised batch spawning.
Disables rendering and defers spatial-grid rebuild until the context exits. Use when spawning many objects at once.
Rendering state is saved on entry and restored on exit. If rendering was already disabled (e.g. during the initial setup phase before
run_simulation()), it stays disabled afterbatch_spawn()exits.Example:
with sim_core.batch_spawn(): for params in params_list: Agent.from_params(params, sim_core) # rendering restored to previous state, spatial grid rebuilt once
- set_poses(object_ids, positions, orientations, *, preserve_velocity=True)
Set poses for N objects in one call (batch / high-throughput path).
Use this when poses are already in array form (ROS
PoseArray, vectorized controllers, NumPy pipelines). For single-object writes preferSimObject.set_pose()with aPoseobject — this method intentionally takes raw ndarrays to avoid per-objectPoseallocations on the hot path.Internally honours the same two-phase semantics as
SimObject.set_pose: insidestep_once()the writes are buffered to be flushed in Phase 2, outside they propagate to PyBullet immediately.get_pose/get_posesalways reflect the new values regardless of phase.- Parameters:
object_ids (
Sequence[int]) – N object ids to write to (in matching order).positions (
ndarray) –(N, 3)float array of XYZ positions.orientations (
ndarray) –(N, 4)float array of XYZW quaternions.preserve_velocity (
bool) – If True, preserve current velocities (dynamic objects only; ignored for kinematic objects).
- Raises:
ValueError – if shapes don’t match
(N, 3)/(N, 4)forN = len(object_ids).KeyError – if any
object_idis not registered in this sim.
- Return type:
- get_poses(object_ids)
Return
(positions, orientations)arrays for N objects (batch read).Counterpart of
set_poses(). Returns raw ndarrays for direct use in vectorized math, ROS message construction, or controller state. For single-object reads preferSimObject.get_pose()which returns aPoseobject.Reads each object’s cached pose (no PyBullet API call), so values are always consistent with the two-phase semantics — buffered writes made during the current step are visible immediately.
- set_collision_spatial_hash_cell_size_mode(mode=None, cell_size=None)
Set collision detection spatial hash cell size mode and recalculate/rebuild grid.
This method allows dynamic mode switching and always recalculates the cell_size based on the spatial_hash_cell_size_mode setting (or override via parameters).
- Parameters:
- Return type:
- Returns:
The calculated cell_size (always > 0, falls back to 1.0 if no AABBs available)
Example:
# Recalculate with current mode sim_core.set_collision_spatial_hash_cell_size_mode() # Switch to constant mode with specific size sim_core.set_collision_spatial_hash_cell_size_mode( mode=SpatialHashCellSizeMode.CONSTANT, cell_size=3.0 ) # Switch to auto_adaptive mode sim_core.set_collision_spatial_hash_cell_size_mode( mode=SpatialHashCellSizeMode.AUTO_ADAPTIVE )
- add_object(obj)
Add an object to simulation and register it with all necessary caches.
This method centralizes object registration logic for consistency and maintainability. Called automatically by SimObject.__init__() when sim_core is provided.
- Parameters:
obj (
SimObject) – The SimObject instance to add- Return type:
Example:
# Manual addition (not recommended - SimObject does this automatically) obj = SimObject(body_id=body_id, sim_core=None) sim_core.add_object(obj) # Automatic addition (recommended) obj = SimObject(body_id=body_id, sim_core=sim_core) # Calls add_object() internally
- remove_object(obj)
Remove an object from simulation and clean up all caches.
- Parameters:
obj (
SimObject) – The SimObject instance to remove- Return type:
Example:
obj = sim_core.spawn_robot(...) # ... later ... sim_core.remove_object(obj)
- configure_visualizer(enable_structure_transparency=None, enable_shadows=None)
Configure PyBullet visualizer settings with keyboard control.
- Parameters:
- Return type:
Custom keyboard shortcuts (active during simulation): - Press SPACE to pause/play simulation - Press ‘t’ to toggle structure transparency ON/OFF
Camera controls (always active in GUI mode): - Right-drag to pan camera - Press ‘=’ to zoom in, ‘-’ to zoom out - Press ‘o’ for top-down view
PyBullet built-in keyboard shortcuts (always available): - Press ‘w’ to toggle wireframe / collision shape display - Press ‘g’ to toggle grid display - Press ‘j’ to toggle joint axes display
- compute_scene_bounds()
Compute scene bounding box from all sim_object positions.
Returns a
(center, extent)tuple where center is the mean position[cx, cy, cz]and extent is the axis-aligned size[ex, ey, ez](i.e.max - minper axis).When the scene is empty both lists are
[0.0, 0.0, 0.0].This is used by
setup_camera()(GUI) andSimulationRecorder(offscreen) so that both share the same scene-framing logic.
- setup_camera(camera_config=None, entity_positions=None)
Set up camera view based on configuration.
Valid
camera_modevalues:"auto"(default) — Auto-calculates camera distance and target from entity positions orsim_objects."manual"— Uses explicitcamera_distance,camera_yaw,camera_pitch, andcamera_targetfrom the config.
Interactive controls (right-drag pan, +/- zoom, o=top-down) are always enabled in GUI mode regardless of camera_mode.
- get_aabbs()
Get AABBs for all simulation objects.
- filter_aabb_pairs(ignore_static=None, moved_objects=None, return_profiling=False)
Filter AABB pairs for collision detection with moved-based optimization. Only checks pairs involving moved objects.
- Parameters:
ignore_static (
Optional[bool]) – If True, skip collision checks with static objectsmoved_objects (
Optional[Set[int]]) – Set of object IDs that moved since last check. If None, uses self._moved_this_step (default behavior)return_profiling (
bool) – If True, include timing information in return value
- Returns:
pairs: List of (obj_id_i, obj_id_j) tuples for candidate collision pairs
timings: dict with get_aabbs, spatial_hashing, aabb_filtering (in ms) if return_profiling=True, else None
- Return type:
Tuple of (pairs, timings)
Example:
# Without profiling pairs, _ = sim.filter_aabb_pairs() # With profiling pairs, timings = sim.filter_aabb_pairs(return_profiling=True) print(f"AABB filtering took {timings['total']:.2f}ms")
- check_collisions(collision_color=None, ignore_static=None, return_profiling=False)
Check for collisions between robots using incremental updates.
Only re-checks collision pairs involving moved objects for efficiency. Maintains _active_collision_pairs as persistent state.
- Parameters:
- Returns:
pairs: List of currently active collision pairs (obj_id_i, obj_id_j)
timings: dict with get_aabbs, spatial_hashing, aabb_filtering, contact_points, total (in ms) if return_profiling=True, else None
- Return type:
Tuple of (pairs, timings)
Example:
# Without profiling active_pairs, _ = sim.check_collisions() # With profiling active_pairs, timings = sim.check_collisions(return_profiling=True) print(f"Collision check took {timings['total']:.2f}ms")
- get_active_collision_pairs()
Get currently active collision pairs.
- Return type:
- Returns:
List of (object_id_i, object_id_j) tuples for all active collisions. Pairs are sorted such that object_id_i < object_id_j.
Example:
# Check if any collisions are active if sim_core.get_active_collision_pairs(): print("Collisions detected!") # Get detailed collision info for obj_id_i, obj_id_j in sim_core.get_active_collision_pairs(): obj_i = sim_core._sim_objects_dict[obj_id_i] obj_j = sim_core._sim_objects_dict[obj_id_j] print(f"Collision: {obj_i} <-> {obj_j}")
- update_monitor()
Update simulation monitor with current statistics.
Calculates actual simulation speed from a 10-second sliding window, logs monitor data via logger, and writes to DataMonitor if enabled.
- Return type:
- initialize_simulation()
Initialize simulation state before running.
This method prepares the simulation for execution by: - Resetting simulation counters (step_count, collision_count, sim_time) - Clearing movement tracking state - Configuring visualizer settings (transparency, collision shapes) - Enabling rendering for GUI mode
Called automatically by run_simulation(), but can also be called manually for custom simulation loops or when restarting simulation.
Example:
# Automatic (recommended) sim.run_simulation(duration=10.0) # Manual initialization for custom loop sim.initialize_simulation() while custom_condition: sim.step_once()
- Return type:
- reset()
Reset simulation to a clean state (no objects, fresh PyBullet world).
Removes all objects from the simulation, resets PyBullet, re-applies physics-engine parameters, optionally reloads the ground plane (only when
enable_flooris True), and re-initialises counters viainitialize_simulation().After calling
reset()the simulation is ready for new objects to be spawned. Callbacks registered viaregister_callback()are preserved (theirlast_execis zeroed byinitialize_simulation).Example:
sim.reset() # All objects removed, PyBullet world cleared agent = Agent.from_params(params, sim) sim.step_once()
- Return type:
- load_world(world_config, skip_models=None)
Load world environment from a configuration dictionary.
Supports three sources (checked in priority order):
world_file— a Gazebo.worldfile with<include>elements (buildings, furniture, props).sdf— a single SDF model file (building geometry).mesh_dir— a directory of OBJ/STL mesh files.
- Parameters:
world_config (
Dict[str,Any]) –Dictionary with world loading options:
{ "world_file": "/path/to/office.world", "skip_models": ["TinyRobot", "TeleportDispenser"], } or:: {"sdf": "/path/to/model.sdf"} or:: {"mesh_dir": "/path/to/meshes/"}
skip_models (
Optional[List[str]]) – Model names to skip when loading a.worldfile (e.g. robot names that are spawned separately).
- Return type:
List[SimObject]- Returns:
List of
SimObjectinstances created from the world.
Example:
sim = MultiRobotSimulationCore(SimulationParams(enable_floor=False)) sim.initialize_simulation() objs = sim.load_world({"world_file": "/maps/office.world"})
- spawn_robots_from_config(robots_config)
Spawn robots/entities from a YAML-style configuration list.
Each entry in robots_config is a dict understood by
AgentSpawnParams.from_dict()(or the appropriateSpawnParamsfor the entitytype). SDF models are automatically converted to temporary URDF files viaresolve_sdf_to_urdf(), and model names are resolved throughresolve_model().Grid spawning: If an entry contains a
gridkey, the entry is expanded into multiple entities placed on a grid. Thegriddict is parsed byfrom_dict()and supports two formats:Count-based (convenience):
{ "urdf_path": "robots/mobile_robot.urdf", "motion_mode": "omnidirectional", "grid": { "count": 50, "spacing": [3.0, 3.0], "offset": [0, 0, 0.05], "columns": 10, # optional }, }
Range-based (explicit grid extents):
{ "urdf_path": "robots/mobile_robot.urdf", "grid": { "x_min": 0, "x_max": 9, "y_min": 0, "y_max": 4, "spacing": [3.0, 3.0, 0.0], "offset": [0, 0, 0.05], }, }
Grid entries create a manager internally and register it with this simulation, so objects are automatically synchronised on
remove_object()andreset().Batch processing: Grid entries accept two keys for batch control:
batch_controller— batch controller registry name ("batch_differential","batch_omni") or a dotted"module.ClassName"path to a custom batch controller. Shorthand: creates an unnamedAgentManagerwith a batch controller for this group:{ "urdf_path": "robots/simple_cube.urdf", "motion_mode": "differential", "batch_controller": "batch_differential", "grid": {"count": 100, "spacing": [2, 2]}, }
manager— name of a manager declared in themanagers:config section. All entities in this group are added to that manager:{ "urdf_path": "robots/simple_cube.urdf", "manager": "delivery_fleet", "grid": {"count": 50, "spacing": [2, 2]}, }
Entity type dispatch: If an entry contains a
typekey (e.g."agent","sim_object", or a custom registered type), the corresponding class from the entity registry is used. Default is"agent".- Parameters:
robots_config (
List[Dict[str,Any]]) – List of robot/entity parameter dicts.- Return type:
List[SimObject]- Returns:
List of spawned
SimObject(or subclass) instances.
Example:
sim.spawn_robots_from_config([ # Individual robot {"urdf_path": "robots/mobile_robot.urdf", "pose": [0, 0, 0.05]}, # Grid with batch controller (shorthand) { "urdf_path": "robots/mobile_robot.urdf", "motion_mode": "omnidirectional", "batch_controller": "batch_omni", "grid": {"count": 20, "spacing": [3, 3]}, }, # Grid assigned to a named manager { "urdf_path": "robots/mobile_robot.urdf", "manager": "delivery_fleet", "grid": {"count": 50, "spacing": [2, 2]}, }, # Custom entity type {"type": "forklift", "urdf_path": "robots/forklift.urdf", "pose": [10, 0, 0]}, ])
- run_simulation(duration=None)
Run the simulation for a specified duration.
- Parameters:
duration (
Optional[float]) – Simulation duration in seconds (simulation time, not real time). If None, uses self._params.duration. If duration <= 0, runs indefinitely.- Return type:
Example:
# Run for 10 seconds (simulation time) sim.run_simulation(duration=10.0) # Run indefinitely (until Ctrl+C or GUI closed) sim.run_simulation(duration=0)
- record_profiling(name, value_ms)
Record a custom profiling measurement (in milliseconds).
Call this from within Agent.update() or callback code to accumulate timing data. Multiple calls per step are summed (e.g., 100 agents each recording their custom_logic time). Auto-registers the field on first use.
The recorded field appears in both _print_profiling_summary() output and step_once(return_profiling=True) results alongside built-in fields.
- Parameters:
- Return type:
Example:
# Inside your custom Agent.update(): t0 = time.perf_counter() self._do_custom_logic(dt) self.sim_core.record_profiling('custom_logic', (time.perf_counter() - t0) * 1000)
- pause()
Pause the simulation. step_once will be skipped while paused.
- Return type:
- resume()
Resume the simulation after a pause.
- Return type:
- property is_paused: bool
Return True if the simulation is currently paused.
- step_once(return_profiling=False)
Execute one simulation step.
Performs agent updates, callbacks, physics step, collision detection, and monitoring. If enable_time_profiling is True, detailed timing information is logged.
- Parameters:
return_profiling (
bool) – If True, return timing breakdown dictionary instead of logging. Useful for external profiling tools.- Return type:
- Returns:
If return_profiling=True, returns dict with timing breakdown in milliseconds:
{ 'phase1_update': float, # agent.update + callbacks + plugin on_step 'phase2_pose_flush': float, # flush buffered poses to PyBullet 'phase3_aabb_grid_flush': float, # kinematic AABB + spatial-grid refresh 'agent_update': float, # per-object update loop (subset of phase1; compat) 'callbacks': float, 'step_simulation': float, # p.stepSimulation(); non-zero only if physics=True 'collision_check': float, 'collision_breakdown': { # present only when collision ran 'get_aabbs': float, 'spatial_hashing': float, 'aabb_filtering': float, 'contact_points': float, # narrow phase; non-zero only if physics=True 'total': float, }, 'events_pre_step': float, 'events_post_step': float, 'events_collision': float, 'monitor_update': float, 'total': float, }
Otherwise returns None.
Performance note: time.perf_counter() calls have negligible overhead (<0.1% for 10k objects). The profiling measurements themselves do not significantly impact simulation performance.
- get_memory_usage()
Get current memory usage (requires enable_memory_profiling=True).
- Return type:
- Returns:
Dictionary with current and peak memory usage in MB, or None if not enabled. Keys: ‘current_mb’, ‘peak_mb’
Note
Returns Python heap memory tracked by tracemalloc, NOT RSS (process memory).
tracemalloc tracks Python object allocations, not C extensions or OS-level memory.
For total process memory (RSS), use: psutil.Process().memory_info().rss
Peak is cumulative since last reset_peak() call (interval-based in profiling).
Example:
>>> sim = MultiRobotSimulationCore.from_dict(config) >>> # ... run simulation with enable_memory_profiling=True ... >>> mem = sim.get_memory_usage() >>> if mem: >>> print(f"Python heap: {mem['current_mb']:.2f} MB, Peak: {mem['peak_mb']:.2f} MB")
pybullet_fleet.data_monitor module
Real-time data monitor window for PyBullet simulation Shows simulation statistics in a separate tkinter window
- class pybullet_fleet.data_monitor.DataMonitor(title='Simulation Monitor', enable_gui=True, width=200, height=290, x=-1, y=-1)
Bases:
objectReal-time data monitoring window
- start()
Start the monitor window in a separate thread
- Return type:
- stop()
Stop the monitor window
- Return type:
- write_data(sim_data)
Write simulation data to shared file (called from main simulation).
Uses atomic write (write to temp file + os.replace) to prevent the reader from seeing a truncated / partially-written JSON file.
- pybullet_fleet.data_monitor.main()
Run standalone data monitor
pybullet_fleet.geometry module
Geometric primitives: Pose, Path, and quaternion slerp utilities.
- pybullet_fleet.geometry.Quat
Quaternion in
(x, y, z, w)convention — same as PyBullet.
- pybullet_fleet.geometry.ScalarOrAxes
Scalar limit or per-axis sequence (used for kinematic limits and similar).
- pybullet_fleet.geometry.is_scalar(v)
Return
Trueif v should be treated as a scalar (broadcastable) value.Accepts Python
int/floator a 0-d numpy array. Anything else (list, tuple, 1-d ndarray, …) is treated as per-axis.- Return type:
- pybullet_fleet.geometry.as_axes(v, dim=3)
Coerce v to a
(dim,)numpy array, broadcasting scalars.- Raises:
ValueError – If v is array-like but its shape is not
(dim,).- Return type:
- pybullet_fleet.geometry.projected_axis_limit(limit, direction_unit)
Direction-projected scalar limit from per-axis caps.
Given a per-axis cap
arrand a unit directiond, returns the largest scalarvsuch that|d[i]| * v <= arr[i]for every axis:v_max = 1 / max_i ( |d[i]| / arr[i] )
Used to convert per-axis kinematic limits (velocity or acceleration) into the equivalent scalar bound along a given travel direction.
- pybullet_fleet.geometry.clamp_vec_to_limit(vec, limit)
Clamp a vector against a scalar or per-axis cap.
Semantics depend on the shape of limit:
Scalar ⇒ Euclidean magnitude clamp: if
|vec| > limit, the vector is scaled down to lengthlimitwhile preserving direction. Acts as projection onto a sphere of radiuslimit.Per-axis ⇒ component-wise clamp: each
vec[i]is independently clipped to[-limit[i], +limit[i]]. Acts as projection onto an axis-aligned box.
Non-finite scalar limits (
math.inf) short-circuit and return vec unchanged.
- class pybullet_fleet.geometry.Pose(position, orientation=<factory>)
Bases:
objectPosition and orientation representation for any object in the simulation. Compatible with ROS2 geometry_msgs/Pose and PyBullet’s (position, orientation) tuples.
Supports both quaternion and Euler angle representations.
- position
[x, y, z] in world coordinates
- orientation
[x, y, z, w] quaternion (defaults to no rotation)
- property x: float
X coordinate.
- property y: float
Y coordinate.
- property z: float
Z coordinate.
- property roll: float
Roll angle in radians (rotation around x-axis).
- property pitch: float
Pitch angle in radians (rotation around y-axis).
- property yaw: float
Yaw angle in radians (rotation around z-axis).
- classmethod from_xyz(x, y, z)
Create Pose from x, y, z coordinates.
- classmethod from_euler(x, y, z, roll=0.0, pitch=0.0, yaw=0.0)
Create Pose from position and Euler angles.
- classmethod from_pybullet(position, orientation)
Create Pose from PyBullet position and orientation tuples.
- as_position_orientation()
Return (position, orientation) tuple for PyBullet.
- as_tuple()
Return (position, orientation) tuple. Alias for as_position_orientation() for convenience.
- as_euler()
Return orientation as Euler angles (roll, pitch, yaw).
- class pybullet_fleet.geometry.Path(waypoints)
Bases:
objectRepresents a path as a sequence of waypoints (Poses).
- waypoints
List of Pose objects representing the path
Example:
# Create from Pose list (recommended) path = Path([ Pose.from_xyz(0, 0, 0), Pose.from_xyz(1, 0, 0), Pose.from_xyz(1, 1, 0) ]) # Create from positions path = Path.from_positions([[0, 0, 0], [1, 0, 0], [1, 1, 0]]) # Create predefined shapes circle = Path.create_circle(center=[0, 0], radius=1.5) square = Path.create_square(center=[0, 0], side_length=2.0)
- waypoints: List[Pose]
- classmethod from_positions(positions, orientation=None)
Create Path from list of positions with optional shared orientation.
- Parameters:
- Return type:
Path- Returns:
Path instance
Example:
path = Path.from_positions([ [0, 0, 0], [1, 0, 0], [1, 1, 0] ])
- classmethod create_rectangle(center, width, height, rpy=[0.0, 0.0, 0.0])
Create a rectangular path in 3D.
- Parameters:
center (
List[float]) – [x, y, z] center position in world coordinateswidth (
float) – Width of rectangle (X direction in local frame)height (
float) – Height of rectangle (Y direction in local frame)rpy (
List[float]) – [roll, pitch, yaw] in radians. Defaults to [0, 0, 0]. When non-zero, the Z+ axis of the robot will be perpendicular to the rectangle plane.
- Return type:
Path- Returns:
Path instance with waypoints at rectangle corners
Example:
rect = Path.create_rectangle(center=[0, 0, 1], width=3.0, height=2.0) tilted_rect = Path.create_rectangle(center=[0, 0, 1], width=3.0, height=2.0, rpy=[0.1, 0.2, 0])
- classmethod create_square(center, side_length, rpy=[0.0, 0.0, 0.0])
Create a square path in 3D.
- Parameters:
- Return type:
Path- Returns:
Path instance with waypoints at square corners
Example:
square = Path.create_square(center=[0, 0, 0], side_length=2.0) tilted_square = Path.create_square(center=[0, 0, 1], side_length=2.0, rpy=[0.1, 0.2, 0])
Note:
This is a wrapper for create_rectangle with width == height.
- classmethod create_ellipse(center, radius_x, radius_y, num_points=16, rpy=[0.0, 0.0, 0.0], start_angle=0.0, end_angle=6.283185307179586)
Create an elliptical path in 3D.
- Parameters:
center (
List[float]) – [x, y, z] center position in world coordinatesradius_x (
float) – Radius along X axis in local frameradius_y (
float) – Radius along Y axis in local framenum_points (
int) – Number of waypoints along the arcrpy (
List[float]) – [roll, pitch, yaw] in radians. Defaults to [0, 0, 0].start_angle (
float) – Start angle [rad] in the local XY-planeend_angle (
float) – End angle [rad] in the local XY-plane
- Return type:
Path- Returns:
Path instance with waypoints along ellipse
Example:
ellipse = Path.create_ellipse(center=[0, 0, 0], radius_x=2.0, radius_y=1.0) tilted_ellipse = Path.create_ellipse(center=[0, 0, 1], radius_x=2.0, radius_y=1.0, rpy=[0, 0.3, 0])
- classmethod create_circle(center, radius, num_points=16, rpy=[0.0, 0.0, 0.0], start_angle=0.0, end_angle=6.283185307179586)
Create a circular path in 3D.
- Parameters:
center (
List[float]) – [x, y, z] center position in world coordinatesradius (
float) – Circle radiusnum_points (
int) – Number of waypoints along the arcrpy (
List[float]) – [roll, pitch, yaw] in radians. Defaults to [0, 0, 0].start_angle (
float) – Start angle [rad] in the local XY-planeend_angle (
float) – End angle [rad] in the local XY-plane
- Return type:
Path- Returns:
Path instance with waypoints along circle
Example:
circle = Path.create_circle(center=[0, 0, 0], radius=1.5) arc = Path.create_circle(center=[0, 0, 0], radius=1.5, start_angle=0, end_angle=np.pi)
Note:
This is a wrapper for create_ellipse with radius_x == radius_y.
- append(other)
Append waypoints from another path in-place.
Note:
This mutates the current Path. For a non-mutating version, use `combined = path1 + path2`.
- Return type:
- classmethod from_paths(paths)
Create a single Path by concatenating multiple paths.
Example:
circle_arc = Path.create_circle(center=[0, 0, 0.5], radius=2.0, num_points=16, start_angle=0.0, end_angle=np.pi/2) line = Path.from_positions([[1, 1, 0.5], [3, 1, 0.5]]) full = Path.from_paths([circle_arc, line])
- Return type:
Path
- get_total_distance()
Calculate total path length.
- Return type:
- Returns:
Total distance in meters
- visualize(show_lines=True, line_color=None, line_width=2.0, show_waypoints=False, show_axes=False, axis_length=0.3, show_points=False, point_size=0.05, lifetime=0.0)
Visualize path with lines and optional waypoint markers.
- Parameters:
show_lines (
bool) – If True, draw lines connecting waypointsline_color (
List[float]) – RGB color for path lines [r, g, b] (0-1 range), None for defaultline_width (
float) – Width of path linesshow_waypoints (
bool) – If True, show waypoint coordinate axes and pointsshow_axes (
bool) – If True, draw X, Y, Z axes at each waypointaxis_length (
float) – Length of coordinate axes in metersshow_points (
bool) – If True, draw spheres at waypoint positionspoint_size (
float) – Radius of waypoint spheres in meterslifetime (
float) – Debug line lifetime (0 = permanent until reset)
- Return type:
- Returns:
List of debug item IDs (for later removal if needed)
Example:
# Just show path lines path = Path.create_circle(center=[0, 0, 0.5], radius=2.0) path.visualize(line_color=[1, 0, 0]) # Show path with waypoint orientations path.visualize(show_waypoints=True, show_axes=True)
- visualize_waypoints(show_axes=True, axis_length=0.3, show_points=True, point_size=0.05, lifetime=0.0)
Visualize waypoints with coordinate axes for debugging.
- Parameters:
show_axes (
bool) – If True, draw X, Y, Z axes at each waypointaxis_length (
float) – Length of coordinate axes in metersshow_points (
bool) – If True, draw spheres at waypoint positionspoint_size (
float) – Radius of waypoint spheres in meterslifetime (
float) – Debug line lifetime (0 = permanent until reset)
- Return type:
- Returns:
List of debug item IDs (for later removal if needed)
Example:
path = Path.create_square(center=[0, 0, 1], side_length=2.0, rpy=[0.1, 0.2, 0]) debug_ids = path.visualize_waypoints(axis_length=0.5)
- pybullet_fleet.geometry.quat_to_rot_matrix(quat)
Convert a quaternion
(x, y, z, w)to a 3×3 rotation matrix.Returns the matrix as a flat 9-element tuple in row-major order:
(r00, r01, r02, r10, r11, r12, r20, r21, r22)
This is allocation-free and suitable for hot-path use.
- Parameters:
quat (
Tuple[float,float,float,float]) – Quaternion(x, y, z, w)— same convention as PyBullet. Automatically normalized if not already unit-length.- Return type:
Tuple[float,float,float,float,float,float,float,float,float]- Returns:
9-element tuple representing the 3×3 rotation matrix (row-major).
- Raises:
ValueError – If the quaternion has zero norm.
- pybullet_fleet.geometry.rotate_vector(vec, quat)
Rotate a 3-D vector by a quaternion.
Equivalent to
scipy.spatial.transform.Rotation.from_quat(quat).apply(vec)but avoids SciPy object allocation, making it suitable for per-tick hot paths.
- pybullet_fleet.geometry.quat_from_rotvec(rotvec)
Convert a rotation vector (axis × angle) to a quaternion
(x, y, z, w).Equivalent to
scipy.spatial.transform.Rotation.from_rotvec(rotvec).as_quat().
- pybullet_fleet.geometry.quat_multiply(q1, q2)
Hamilton product of two quaternions in
(x, y, z, w)convention.Equivalent to
(Rotation.from_quat(q1) * Rotation.from_quat(q2)).as_quat(). For active rotations this corresponds to applying q2 first, then q1.
- pybullet_fleet.geometry.quat_angle_between(q0, q1)
Compute the rotation angle (radians) between two quaternions.
Equivalent to
(Rotation.from_quat(q1) * Rotation.from_quat(q0).inv()).magnitude(). Handles the double-cover:qand-qrepresent the same rotation. Inputs are normalized internally, so non-unit quaternions are accepted.
- class pybullet_fleet.geometry.SlerpPrecomp(dot: float, theta_0: float, sin_theta_0: float, needs_flip: bool)
Bases:
NamedTuplePrecomputed constants for fast quaternion slerp between two keyframes.
- dot: float
Alias for field number 0
- theta_0: float
Alias for field number 1
- sin_theta_0: float
Alias for field number 2
- needs_flip: bool
Alias for field number 3
- pybullet_fleet.geometry.quat_slerp_precompute(q0, q1)
Precompute constants for fast scalar quaternion slerp.
- pybullet_fleet.geometry.quat_slerp(q0, q1, t, precomp)
Fast scalar quaternion slerp for normalized quaternions.
Specialized for the two-keyframe case where q0/q1 are fixed and only t varies per call. Uses math.sin/cos for scalar operations instead of numpy.
Note:
q0 and q1 must be unit quaternions. The normal slerp path preserves unit length by construction; the near-identical fallback normalizes explicitly.
- Parameters:
- Return type:
- Returns:
Interpolated quaternion as numpy array [x, y, z, w].
- pybullet_fleet.geometry.quat_slerp_batch(q0, q1, t, dot, theta0, sin_theta0)
Vectorised SLERP for N quaternion pairs.
Evaluates slerp simultaneously for all N agents using NumPy — the hot path for
BatchDifferentialControllerandBatchOmniController.The caller is responsible for shortest-path correction:
q1must already have had its sign flipped wheredot(q0, q1) < 0, anddotmust be the corrected (non-negative) dot product.- Parameters:
q0 (
ndarray) –(N, 4)start quaternions.q1 (
ndarray) –(N, 4)end quaternions (sign-corrected for shortest path).t (
ndarray) –(N,)interpolation parameters in[0, 1].dot (
ndarray) –(N,)dot productsdot(q0, q1) >= 0.theta0 (
ndarray) –(N,)half-anglesacos(dot).sin_theta0 (
ndarray) –(N,)sin(theta0).
- Return type:
- Returns:
(N, 4)interpolated unit quaternions.
pybullet_fleet.logging_utils module
Logging utilities with lazy evaluation for expensive operations.
Design intent & usage policy: - The standard Python logger always evaluates f-strings and expensive computations, even if the log level is disabled. - LazyLogger/NamedLazyLogger allow you to pass a lambda, so the message is only evaluated if the log level is enabled. - This prevents unnecessary computation and side effects, improving performance.
Usage policy: - For lightweight constants or simple strings, pass them directly (no lambda needed). - For expensive computations or side effects, always wrap in a lambda. - Both are supported for flexibility and easier migration from standard logging.
See test_logging_utils.py for more usage and test examples.
- class pybullet_fleet.logging_utils.LazyLogger(name=None, logger=None)
Bases:
objectLogger wrapper with lazy evaluation for expensive formatting operations.
This prevents expensive string formatting (e.g., NumPy array conversion) when the log level is not enabled.
Usage:
logger = LazyLogger(__name__) logger.debug(lambda: f"Expensive array: {numpy_array}")
Or with the decorator:
@lazy_log def my_logger(): return logging.getLogger(__name__) logger = my_logger() logger.debug(lambda: f"Array: {arr}")
- __init__(name=None, logger=None)
Initialize LazyLogger.
- debug(msg_func, *args, **kwargs)
Log debug message with lazy evaluation.
- info(msg_func, *args, **kwargs)
Log info message with lazy evaluation.
- warning(msg_func, *args, **kwargs)
Log warning message with lazy evaluation.
- error(msg_func, *args, **kwargs)
Log error message with lazy evaluation.
- critical(msg_func, *args, **kwargs)
Log critical message with lazy evaluation.
- property logger: Logger
Get underlying logger instance.
- isEnabledFor(level)
Check if logging is enabled for the given level.
- Return type:
- class pybullet_fleet.logging_utils.NamedLazyLogger(name=None, logger=None, prefix='')
Bases:
LazyLoggerLazyLogger with a dynamic prefix for instance-level identification.
Designed for SimObject/Agent/Action instances where each log line should include identifiers like object_id and name, without manually embedding them in every log call.
The prefix can be updated at any time via
set_prefix()(e.g. afterobject_idis assigned by sim_core).Usage:
# In SimObject.__init__ self._log = NamedLazyLogger(__name__) self._log.set_prefix(f"[obj:{self.object_id}] ") # In Agent.__init__ (after super().__init__) self._log.set_prefix(f"[Agent:{self.object_id}:{self.name}] ") # Log calls — prefix is prepended automatically self._log.info("Path complete") # => [Agent:3:robot_A] Path complete self._log.debug(lambda: f"pos={self.get_pose().position[:2]}") # => [Agent:3:robot_A] pos=[1.0, 2.0] (only evaluated if DEBUG enabled)
Note
The prefix is stored as a plain string and prepended inside
_log_lazyonly when the level is enabled, so the cost is negligible when the message is filtered out.- __init__(name=None, logger=None, prefix='')
Initialize NamedLazyLogger.
- set_prefix(prefix)
Update the prefix prepended to every log message.
Call this whenever the identifying information changes (e.g. after object_id is assigned).
- Parameters:
prefix (
str) – New prefix string (include trailing space if desired)
- property prefix: str
Get current prefix string.
- pybullet_fleet.logging_utils.get_named_lazy_logger(name, prefix='')
Get a NamedLazyLogger instance.
This is the recommended way to create instance-level loggers for SimObject / Agent / Action classes.
- Parameters:
- Return type:
NamedLazyLogger- Returns:
NamedLazyLogger instance
Example:
# In __init__ self._log = get_named_lazy_logger(__name__, f"[Agent:{self.object_id}] ") self._log.debug(lambda: f"pos={pos}")
- pybullet_fleet.logging_utils.get_lazy_logger(name)
Get a LazyLogger instance for the given name.
This is the recommended way to create lazy loggers.
- Parameters:
name (
str) – Logger name (typically __name__)- Return type:
LazyLogger- Returns:
LazyLogger instance
Example:
logger = get_lazy_logger(__name__) logger.debug(lambda: f"Expensive: {numpy_array}")
- pybullet_fleet.logging_utils.wrap_existing_logger(logger)
Wrap an existing logger with lazy evaluation.
- Parameters:
logger (
Logger) – Existing logging.Logger instance- Return type:
LazyLogger- Returns:
LazyLogger wrapping the existing logger
Example:
standard_logger = logging.getLogger(__name__) lazy_logger = wrap_existing_logger(standard_logger) lazy_logger.debug(lambda: f"Array: {arr}")
pybullet_fleet.sim_object module
sim_object.py Base class for simulation objects with attachment support.
- class pybullet_fleet.sim_object.ShapeParams(shape_type=None, mesh_path=None, mesh_scale=<factory>, half_extents=<factory>, radius=0.5, height=1.0, rgba_color=<factory>, frame_pose=None)
Bases:
objectParameters for visual or collision shape.
Similar to URDF’s <visual> and <collision> tags, this allows full control over shape type, geometry, appearance, and frame offset.
- shape_type
Shape type - “box”, “sphere”, “cylinder”, “mesh”, or None (no shape)
- mesh_path
Path to mesh file (required if shape_type=”mesh”)
- mesh_scale
Mesh scaling [sx, sy, sz] (for mesh shapes)
- half_extents
Box half extents [hx, hy, hz] (for box shapes)
- radius
Sphere/cylinder radius (for sphere/cylinder shapes)
- height
Cylinder height (for cylinder shapes)
- rgba_color
RGBA color [r, g, b, a] (for visual shapes, ignored for collision)
- frame_pose
Shape offset as Pose object (relative to body frame)
Example:
# Box visual visual = ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2], rgba_color=[0.8, 0.2, 0.2, 1.0] ) # Mesh collision collision = ShapeParams( shape_type="mesh", mesh_path="collision.obj", mesh_scale=[1.0, 1.0, 1.0] ) # Sphere with offset sphere = ShapeParams( shape_type="sphere", radius=0.5, rgba_color=[0.0, 1.0, 0.0, 1.0], frame_pose=Pose.from_xyz(0, 0, 0.5) )
- radius: float = 0.5
- height: float = 1.0
- frame_pose: Pose | None = None
- classmethod from_dict(d)
Create ShapeParams from a configuration dictionary.
Recognised keys match the dataclass fields:
shape_type,mesh_path,mesh_scale,half_extents,radius,height,rgba_color. Missing keys use dataclass defaults.
- class pybullet_fleet.sim_object.SimObjectSpawnParams(visual_shape=None, collision_shape=None, initial_pose=<factory>, mass=0.0, pickable=True, name=None, visual_frame_pose=None, collision_frame_pose=None, collision_mode=CollisionMode.NORMAL_3D, user_data=<factory>)
Bases:
objectParameters for spawning a SimObject.
- visual_shape
Visual shape parameters (ShapeParams or None)
- collision_shape
Collision shape parameters (ShapeParams or None)
- initial_pose
Initial Pose (position and orientation) in world coordinates (default: origin)
- mass
Object mass in kg (0.0 for static objects, >0 for dynamic)
- pickable
Whether this object can be picked by robots (default: True)
- name
Optional string name for human-readable identification (default: None). Duplicates allowed - multiple objects can share the same name. Use for debugging, logging, filtering (e.g., “LeftRobot”, “Pallet_A”). For unique identification, use object_id instead.
- user_data
Optional dictionary for custom metadata (default: empty dict)
Note
Managers calculate positions automatically and may override initial_pose.
Example:
# Box visual + sphere collision with name params = SimObjectSpawnParams( visual_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2], rgba_color=[1.0, 0.0, 0.0, 1.0] ), collision_shape=ShapeParams( shape_type="sphere", radius=0.3 ), initial_pose=Pose.from_xyz(0, 0, 1), mass=1.0, name="Pallet_A" # Human-readable name (duplicates allowed) ) obj = SimObject.from_params(params, sim_core) # Later: Use object_id for unique lookup retrieved_obj = sim_core.get_object_by_id(obj.object_id) # Or filter by name (may return multiple objects) pallets = [o for o in sim_core.objects if o.name == "Pallet_A"]
- visual_shape: ShapeParams | None = None
- collision_shape: ShapeParams | None = None
- initial_pose: Pose
- mass: float = 0.0
- pickable: bool = True
- visual_frame_pose: Pose | None = None
- collision_frame_pose: Pose | None = None
- collision_mode: CollisionMode = 'normal_3d'
- classmethod from_dict(config)
Create SimObjectSpawnParams from a configuration dictionary.
Accepts both programmatic dicts (with
ShapeParams/Poseobjects) and raw YAML dicts (withposelist,yaw, and shape sub-dicts).- YAML-style keys:
name (str, required): Object name. visual_shape (dict | ShapeParams, optional): Visual shape definition. collision_shape (dict | ShapeParams, optional): Collision shape. pose (list):
[x, y, z]position. Alternative toinitial_pose. yaw (float): Yaw angle in radians (used withpose). initial_pose (Pose): Pose object (takes precedence overpose). mass (float): Default0.0(static). pickable (bool): DefaultTrue. collision_mode (str | CollisionMode): Default"normal_3d". user_data (dict): Default{}.
- Parameters:
- Return type:
SimObjectSpawnParams- Returns:
SimObjectSpawnParams instance.
- Raises:
ValueError – If
nameis missing.
- class pybullet_fleet.sim_object.SimObject(body_id, sim_core=None, pickable=True, mass=None, collision_mode=CollisionMode.NORMAL_3D, name=None, user_data=None)
Bases:
objectBase class for simulation objects (robots, pallets, obstacles). Supports attachment/detachment and kinematic teleportation.
- Object Identification:
- object_id: Unique integer ID assigned by sim_core (used for lookups and internal tracking).
This is the primary identifier for programmatic access.
- name: Optional string name for human-readable identification (used for debugging, logging, filtering).
Multiple objects can share the same name (duplicates allowed). Not guaranteed to be unique - use object_id for unique identification.
- Parameters:
- object_id
Unique integer ID (assigned by sim_core, use for lookups)
- name
Optional string name (duplicates allowed, use for debugging/filtering)
- body_id
PyBullet body ID
- mass
Object mass in kg
- pickable
Whether this object can be picked by robots
- collision_mode
Collision detection mode (NORMAL_3D, NORMAL_2D, STATIC, DISABLED)
- user_data
Dictionary for custom metadata (default: empty dict)
- attached_objects: List[SimObject]
- update(dt)
Called every simulation step for objects with
_needs_update = True.Override in subclasses to implement per-step behavior (e.g. device state machines, animated objects).
- property events: EventBus
Per-entity EventBus. Created on first access (lazy).
Use in subclasses to hook into entity-specific events:
class WarehouseRobot(Agent): def __init__(self, ...): super().__init__(...) from pybullet_fleet.events import SimEvents self.events.on(SimEvents.COLLISION_STARTED, self._on_bump) self.events.on(SimEvents.ACTION_COMPLETED, self._on_task_done)
- set_name(name)
Set the object name and update the log prefix accordingly.
Use this instead of directly setting
self.nameto ensure the log prefix stays in sync.
- classmethod create_shared_shapes(visual_shape=None, collision_shape=None, physics_client_id=0)
Create shared visual and collision shapes for fast object spawning.
This method caches mesh shapes by their parameters, so multiple objects with identical mesh parameters will reuse the same PyBullet shape IDs.
Note: Primitive shapes (box, sphere, cylinder) are NOT cached, as PyBullet handles them very efficiently and caching adds unnecessary overhead.
- Parameters:
- Return type:
- Returns:
(visual_id, collision_id) tuple
Example:
visual_id, collision_id = create_shared_shapes( visual_shape=ShapeParams( shape_type="mesh", mesh_path="visual.obj", mesh_scale=[1.0, 1.0, 1.0], rgba_color=[0.8, 0.2, 0.2, 1.0] ), collision_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2] ), physics_client_id=sim_core.client )
- classmethod from_mesh(visual_shape=None, collision_shape=None, pose=None, mass=0.0, pickable=True, sim_core=None, collision_mode=CollisionMode.NORMAL_3D, name=None, user_data=None)
Create a SimObject with optional visual and collision shapes.
- Parameters:
visual_shape (
Optional[ShapeParams]) – ShapeParams for visual geometrycollision_shape (
Optional[ShapeParams]) – ShapeParams for collision geometrypose (
Pose) – Initial Pose (position and orientation, default: origin)mass (
float) – Object mass (0.0 for static)pickable (
bool) – Whether object can be picked by robotssim_core – Reference to simulation core
name (
Optional[str]) – Optional human-readable name (duplicates allowed)
- Return type:
SimObject- Returns:
SimObject instance
Example:
# Box visual + sphere collision obj = SimObject.from_mesh( visual_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2], rgba_color=[0.8, 0.2, 0.2, 1.0] ), collision_shape=ShapeParams( shape_type="sphere", radius=0.4 ), pose=Pose.from_xyz(0, 0, 1), mass=1.0, sim_core=sim_core )
- classmethod from_sdf(sdf_path, sim_core=None, collision_mode=CollisionMode.NORMAL_3D, global_scaling=1.0, name_prefix=None, pickable=False, use_fixed_base=True)
Load an SDF file and wrap each body in a SimObject.
Uses PyBullet’s
p.loadSDF()to load the file and creates a SimObject for each body. All objects are auto-registered tosim_coreif provided.Note
PyBullet’s SDF loader does NOT support
<world>tags,<include>tags, ormodel://URIs. Use for individual SDF models (e.g., pybullet_data’s kiva_shelf, wsg50_gripper).- Parameters:
sdf_path (
str) – Path to SDF file (resolved via resolve_model())sim_core – Simulation core for registration
collision_mode (
CollisionMode) – Collision detection mode for all loaded objectsglobal_scaling (
float) – Uniform scale factorname_prefix (
Optional[str]) – Prefix for names (default: SDF filename stem)pickable (
bool) – Whether objects can be picked upuse_fixed_base (
bool) – If True, fix base (mass=0, kinematic)
- Return type:
List[SimObject]- Returns:
List of SimObject instances (one per body in the SDF)
Example:
shelves = SimObject.from_sdf( 'kiva_shelf/model.sdf', sim_core=sim ) for shelf in shelves: print(f'{shelf.name}: {shelf.get_pose()}')
- classmethod from_params(spawn_params, sim_core=None)
Create a SimObject from SimObjectSpawnParams.
- Parameters:
spawn_params (
SimObjectSpawnParams) – SimObjectSpawnParams instancesim_core – Reference to simulation core
- Return type:
SimObject- Returns:
SimObject instance
Example:
params = SimObjectSpawnParams( visual_shape=ShapeParams( shape_type="mesh", mesh_path="pallet.obj", mesh_scale=[0.5, 0.5, 0.5], rgba_color=[0.8, 0.6, 0.4, 1.0] ), collision_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.4, 0.1] ), initial_pose=Pose.from_xyz(0, 0, 0.1), mass=1.0 ) obj = SimObject.from_params(params, sim_core)
- classmethod from_dict(config, sim_core=None)
Create a SimObject from a raw config dict.
Combines
SimObjectSpawnParams.from_dict()andfrom_params()in a single call. Usescls._spawn_params_clsso subclasses automatically use their own SpawnParams.
- property is_static: bool
Whether this object is static (never moves).
Returns True if collision_mode is STATIC. This property is read-only. To change collision mode, use set_collision_mode() method.
- set_collision_mode(mode)
Change collision detection mode for this object.
This method updates the collision mode and notifies sim_core to update all collision-related caches (AABB, spatial grid, etc.).
- Parameters:
mode (
CollisionMode) – New CollisionMode- Return type:
Example:
# Change to static obj.set_collision_mode(CollisionMode.STATIC) # Disable collision obj.set_collision_mode(CollisionMode.DISABLED) # Enable 2D collision obj.set_collision_mode(CollisionMode.NORMAL_2D)
- get_pose()
Return current position and orientation as Pose object.
Performance optimization:
Kinematic objects (mass=0): Always returns cached pose since position only changes via set_pose(), which updates the cache.
Dynamic objects: Caches within the same simulation timestep to avoid redundant PyBullet API calls. Cache is invalidated when sim_time advances.
- Return type:
Pose
- set_pose(pose, preserve_velocity=True)
Set position and orientation from a Pose object.
Two-phase step behaviour (see
docs/design/two-phase-step/spec.md):Outside ``step_once()`` (immediate path): writes to PyBullet via
resetBasePositionAndOrientationand, for kinematic objects, refreshes the AABB + spatial grid immediately. Same behaviour as legacy callers.Inside ``step_once()`` (buffered path,
sim_core._in_step == True): updates only the internal pose cache and registers this object for the Phase 2 / Phase 3 batch flush.get_pose()returns the new pose immediately (cache is the source of truth), but PyBullet’s view, AABBs, and the spatial grid are not refreshed until the flush runs later in the samestep_once()invocation. A no-op set_pose (same pose as cache) is skipped entirely with zero PyBullet cost.
Attached children (base-link attachments) are propagated recursively through the same code path — so they take the buffered path inside the step and the immediate path outside it.
- Parameters:
pose (
Pose) – Pose object containing position and orientationpreserve_velocity (
bool) – If True, preserve current velocity after setting pose. If False, velocity is reset to zero (default behavior of PyBullet). For kinematic objects (mass=0), velocity preservation has no effect, so this is automatically set to False for performance.
- Return type:
- Returns:
True if the object moved (position or orientation changed), False otherwise
- set_pose_raw(position, orientation, preserve_velocity=True)
Set position and orientation from raw lists/tuples (no Pose allocation).
This is the fast path for internal use where position and orientation are already available as lists/tuples, avoiding Pose object creation overhead.
- attach_object(obj, parent_link_index=-1, relative_pose=None, joint_type=4, keep_world_pose=False)
Attach another SimObject to this object.
- Parameters:
obj (
SimObject) – Object to attachparent_link_index (
Union[int,str]) – Link index (int) or link name (str) to attach to. -1 or"base_link"for the base link.relative_pose (
Optional[Pose]) – Position and orientation offset in parent link’s frame as Pose object (default: None, which uses Pose(position=[0, 0, 0], orientation=[0, 0, 0, 1]))joint_type (
int) – PyBullet joint type (default: JOINT_FIXED)keep_world_pose (
bool) – If True, automatically computerelative_poseso that the child keeps its current world position and orientation. Cannot be combined with an explicitrelative_pose.
- Return type:
- Returns:
True if attachment successful, False otherwise
- Raises:
ValueError – If both
relative_poseandkeep_world_pose=Trueare specified.
Example:
# Attach to base link at 0.5m in front agent.attach_object(pallet, relative_pose=Pose.from_xyz(0.5, 0, 0)) # Keep the child where it currently is in the world agent.attach_object(box, parent_link_index=2, keep_world_pose=True) # Attach to specific link by name agent.attach_object(box, parent_link_index="end_effector")
- detach_object(obj, drop_pose=None, drop_relative_pose=None)
Detach an object from this object.
After detaching, the object can optionally be teleported:
drop_pose— absolute world pose.drop_relative_pose— offset applied to the object’s current (pre-detach) world pose (position and orientation are composed viamultiplyTransforms).Neither — the object stays at its current world position.
These two parameters are mutually exclusive.
- Parameters:
obj (
SimObject) – Object to detachdrop_pose (
Optional[Pose]) – Absolute world pose to teleport the object to after detaching. Mutually exclusive withdrop_relative_pose.drop_relative_pose (
Optional[Pose]) – Pose offset relative to the object’s current (pre-detach) world pose. Mutually exclusive withdrop_pose.
- Return type:
- Returns:
True if detachment successful, False otherwise
- Raises:
ValueError – If both
drop_poseanddrop_relative_poseare specified.
Example:
# Detach and leave in place agent.detach_object(pallet) # Detach and place at absolute world position agent.detach_object(pallet, drop_pose=Pose.from_xyz(10, 5, 0.1)) # Detach and offset from current position (e.g. lower by 0.5m) agent.detach_object(pallet, drop_relative_pose=Pose.from_xyz(0, 0, -0.5))
- get_attached_objects()
Return list of objects attached to this object.
- Return type:
List[SimObject]- Returns:
List of attached SimObject instances
- is_attached()
Check if this object is attached to another object.
- Return type:
- Returns:
True if attached to another object, False otherwise
- find_nearest_pickable(search_radius=1.0)
Find the nearest pickable, unattached SimObject within radius.
Convenience wrapper around
tools.find_nearest()that searches from this object’s current position with the standardpickable and not attachedfilter.
- register_callback(callback, frequency=0.25)
Register a callback function to be executed periodically.
- execute_callbacks(current_time)
Execute registered callbacks based on their frequency.
pybullet_fleet.tools module
- pybullet_fleet.tools.body_to_world_velocity_2d(vx_body, vy_body, yaw)
Rotate 2-D body-frame velocity to world frame using yaw only.
- Parameters:
- Return type:
- Returns:
(vx, vy)in world frame.
Example:
>>> body_to_world_velocity_2d(1.0, 0.0, yaw=math.pi / 2) (0.0, 1.0) # facing +Y, forward maps to +Y
- pybullet_fleet.tools.body_to_world_velocity_3d(vx_body, vy_body, vz_body, orientation)
Rotate 3-D body-frame velocity to world frame using full quaternion.
Uses the same quaternion convention as PyBullet:
(x, y, z, w).- Parameters:
- Return type:
- Returns:
(vx, vy, vz)in world frame.
Example:
>>> from scipy.spatial.transform import Rotation >>> q = Rotation.from_euler("z", 90, degrees=True).as_quat() # xyzw >>> body_to_world_velocity_3d(1.0, 0.0, 0.0, tuple(q)) (0.0, 1.0, 0.0)
- pybullet_fleet.tools.normalize_vector_param(value, param_name, dim=3)
Normalize a parameter to a numpy array of specified dimension.
This is a utility function for handling parameters that can be either: - A single float (applied to all dimensions) - A list of floats (one per dimension)
Commonly used for velocity/acceleration limits that can be uniform or per-axis.
- Parameters:
- Return type:
- Returns:
Numpy array of shape (dim,)
- Raises:
ValueError – If list length doesn’t match dimension
Example:
>>> normalize_vector_param(2.0, "max_vel", 3) array([2., 2., 2.]) >>> normalize_vector_param([1.0, 2.0, 3.0], "max_vel", 3) array([1., 2., 3.])
- pybullet_fleet.tools.resolve_link_index(body_id, link)
Resolve link name to index.
- Parameters:
- Return type:
- Returns:
Link index (int). Returns -1 if link is “base_link” or if link name not found.
Example:
>>> index = resolve_link_index(robot.body_id, "sensor_mast") >>> index = resolve_link_index(robot.body_id, 2) # Also accepts int >>> index = resolve_link_index(robot.body_id, "base_link") # Returns -1
- pybullet_fleet.tools.resolve_joint_index(body_id, joint)
Resolve joint name to index.
- Parameters:
- Return type:
- Returns:
Joint index (int). Returns -1 if joint name not found.
Example:
>>> idx = resolve_joint_index(robot.body_id, "elbow_joint") >>> idx = resolve_joint_index(robot.body_id, 2)
- pybullet_fleet.tools.calculate_offset_pose(target_position, current_position, offset, keep_height=True)
Calculate pose at specified offset distance from target.
This unified function can be used for various offset calculations: - Approach poses (approach_offset): Position before reaching target - Pick poses (pick_offset): Position where pick operation occurs - Drop poses (drop_offset): Position where drop operation occurs
- Parameters:
- Return type:
Pose- Returns:
Pose object at the offset position, oriented to face the target
Example:
>>> # Approach pose (1.5m away from target) >>> approach = calculate_offset_pose([5, 0, 0.1], [0, 0, 0.3], offset=1.5) >>> >>> # Pick pose (0.3m away from target) >>> pick = calculate_offset_pose([5, 0, 0.1], [0, 0, 0.3], offset=0.3) >>> >>> # At target position (0m offset) >>> drop = calculate_offset_pose([5, 0, 0.1], [0, 0, 0.3], offset=0.0)
- pybullet_fleet.tools.world_to_grid(pos, spacing, offset=None)
Convert world coordinates to grid indices (always 3D), with offset.
- Parameters:
- Return type:
- Returns:
[ix, iy, iz] grid indices
- pybullet_fleet.tools.grid_to_world(grid, spacing, offset=None)
Convert grid indices to world coordinates (always 3D), with offset.
- Parameters:
- Return type:
- Returns:
[x, y, z] world coordinates
- pybullet_fleet.tools.world_to_grid_2d(pos, spacing)
2D wrapper for world_to_grid.
- pybullet_fleet.tools.grid_to_world_2d(grid, spacing)
2D wrapper for grid_to_world.
- pybullet_fleet.tools.grid_execution(grid_num=None, spacing=None, offset=None, func=None, args=None)
Calls the callback function func(grid_index, world_pos, **args) at each grid point.
- Parameters:
grid_num (
List[int]) – [x_num, y_num, z_num] number of grids in each axis (default: [1, 1, 1])spacing (
List[float]) – [spacing_x, spacing_y, spacing_z] grid spacing (default: [1.0, 1.0, 1.0])offset (
Optional[List[float]]) – [offset_x, offset_y, offset_z] offset (default: [0.0, 0.0, 0.0])func (
Optional[Callable]) – Callback function(grid_index: List[int], world_pos: List[float], **args) - grid_index: [ix, iy, iz] grid indices - world_pos: [x, y, z] world coordinatesargs (
Optional[Dict]) – Additional arguments to pass to the callback function
- Return type:
- pybullet_fleet.tools.find_nearest(objects, position, search_radius, predicate=None, exclude=None, use_2d=False)
Find the nearest object from position within search_radius.
A generic spatial nearest-neighbour query over objects that expose
get_pose().position. Filtering is done via an optional predicate callable and an exclude instance.- Parameters:
objects (
Iterable[Any]) – Iterable of objects withget_pose().positionattribute.position (
Sequence[float]) –[x, y, z]reference position in world frame.search_radius (
float) – Maximum search distance (metres).predicate (
Optional[Callable[[Any],bool]]) – Optional filter — called with each object, returnTrueto include. WhenNoneall objects are candidates.exclude (
Optional[Any]) – Single object instance to skip (typicallyself).use_2d (
bool) – WhenTrue, compare only XY distance (ignore Z). Useful for finding items on tables/shelves near a floor-level waypoint.
- Return type:
- Returns:
The nearest matching object, or
Noneif nothing found.
Example:
# Find nearest pickable, unattached object from a world position obj = find_nearest( sim_core.sim_objects, position=[5, 0, 0], search_radius=2.0, predicate=lambda o: o.pickable and not o.is_attached(), exclude=agent, )
pybullet_fleet.types module
types.py Common types and enumerations used across the package.
This module contains shared type definitions to avoid circular imports and provide a single source of truth for common types.
- class pybullet_fleet.types.MotionMode(value)
-
Motion mode for robot movement control.
- OMNIDIRECTIONAL
Robot can move in any direction without rotating first (holonomic)
- DIFFERENTIAL
Robot must rotate to face target, then move forward (non-holonomic)
- OMNIDIRECTIONAL = 'omnidirectional'
- DIFFERENTIAL = 'differential'
- class pybullet_fleet.types.PosePhase(value)
-
Phase for TPI pose control (shared by all kinematic controllers).
- ROTATE
Robot is rotating in place (slerp TPI)
- FORWARD
Robot is moving forward in a straight line (distance TPI)
- ROTATE = 'rotate'
- FORWARD = 'forward'
- class pybullet_fleet.types.MovementDirection(value)
-
Movement direction for robot motion.
- Note: This is only used in differential drive mode (MotionMode.DIFFERENTIAL).
In omnidirectional mode, the robot always faces the movement direction.
- FORWARD
Robot X+ axis points towards target, moves forward along path
- BACKWARD
Robot X- axis points towards target (X+ points away), moves along path (robot appears to move “backward” relative to its X+ orientation)
- AUTO
Automatically choose FORWARD or BACKWARD based on the angle between the robot’s current heading and the movement direction. Uses BACKWARD when the goal is more than 90° behind the robot.
- FORWARD = 'forward'
- BACKWARD = 'backward'
- AUTO = 'auto'
- class pybullet_fleet.types.ActionStatus(value)
Bases:
EnumAction execution status.
- NOT_STARTED
Action has not begun execution
- IN_PROGRESS
Action is currently executing
- COMPLETED
Action completed successfully
- FAILED
Action failed to complete
- CANCELLED
Action was cancelled before completion
- NOT_STARTED = 'not_started'
- IN_PROGRESS = 'in_progress'
- COMPLETED = 'completed'
- FAILED = 'failed'
- CANCELLED = 'cancelled'
- class pybullet_fleet.types.SpatialHashCellSizeMode(value)
Bases:
EnumCollision detection spatial hash cell size calculation mode.
Determines how the spatial hash grid cell size is calculated for efficient broad-phase collision detection.
- CONSTANT
Fixed cell size provided by user (fastest, requires manual tuning)
- AUTO_ADAPTIVE
Recalculate on object add/remove (adaptive, some overhead)
- AUTO_INITIAL
Calculate once at simulation start (default, balanced performance)
- CONSTANT = 'constant'
- AUTO_ADAPTIVE = 'auto_adaptive'
- AUTO_INITIAL = 'auto_initial'
- class pybullet_fleet.types.DoorState(value)
-
Door operational state.
- CLOSED
Door is fully closed.
- OPENING
Door is transitioning toward the open position.
- OPEN
Door is fully open.
- CLOSING
Door is transitioning toward the closed position.
- CLOSED = 'closed'
- OPENING = 'opening'
- OPEN = 'open'
- CLOSING = 'closing'
- class pybullet_fleet.types.ControllerMode(value)
-
Internal mode for KinematicController state machine.
- IDLE
No active command — zero velocity, no pose target.
- VELOCITY
Processing cmd_vel input (body-frame velocity).
- POSE
Processing goal_pose / path via TPI trajectory.
- IDLE = 'idle'
- VELOCITY = 'velocity'
- POSE = 'pose'
- class pybullet_fleet.types.CollisionMode(value)
Bases:
EnumCollision detection mode for individual objects.
Determines how an object participates in collision detection: - Spatial dimension (2D vs 3D) - Movement type (static vs dynamic) - Collision detection enablement
- NORMAL_3D
Normal 3D collision detection (default) - Full 3D AABB overlap check - Checks all 27 spatial hash neighbors (3x3x3) - For dynamic/kinematic objects that move in 3D space
- NORMAL_2D
2D collision detection (Z-axis ignored) - 2D AABB overlap check (X, Y only) - Checks only 9 spatial hash neighbors (3x3x1, Z=0) - For ground vehicles, 2D robots
- STATIC
Static object mode (never moves, but has collision) - Registered in collision system during spawn - Never updates AABB/spatial grid (performance optimization) - Examples: walls, shelves, structures
- DISABLED
Collision detection completely disabled (ghost object) - No AABB calculation - No spatial grid registration - No collision checking - PyBullet physics collision also disabled - Examples: decorative objects, visual markers, UI elements
- Note: The actual collision filtering (e.g., ignore_static) is done at
core_simulation level, not in this mode definition.
Example
# Normal 3D robot robot = SimObject(…, collision_mode=CollisionMode.NORMAL_3D)
# 2D ground vehicle vehicle = SimObject(…, collision_mode=CollisionMode.NORMAL_2D)
# Static structure wall = SimObject(…, collision_mode=CollisionMode.STATIC)
# Ghost object (no collision) decoration = SimObject(…, collision_mode=CollisionMode.DISABLED)
- NORMAL_3D = 'normal_3d'
- NORMAL_2D = 'normal_2d'
- STATIC = 'static'
- DISABLED = 'disabled'
- class pybullet_fleet.types.CollisionDetectionMethod(value)
Bases:
EnumCollision detection method for narrow-phase collision checking.
Determines which PyBullet API to use for detecting actual contacts between objects after broad-phase filtering (AABB + spatial hashing).
Design Philosophy (from design document): - Physics OFF (kinematics): Use CLOSEST_POINTS for safety margin detection - Physics ON (physics): Use CONTACT_POINTS for actual contact logging
- CLOSEST_POINTS
Use getClosestPoints() - distance-based, kinematics-safe (RECOMMENDED DEFAULT) Best for: kinematics motion, safety clearance, collision avoidance Works with: Physics ON/OFF, stable with resetBasePositionAndOrientation
- CONTACT_POINTS
Use getContactPoints() - physics contact manifold (PHYSICS MODE) Best for: physics simulation, actual contact logging, debug/reproduction Requires: stepSimulation() to be called regularly Note: Unstable for kinematic-kinematic pairs
- HYBRID
Use getContactPoints for physics, getClosestPoints for kinematic (ADVANCED) Best for: Mixed physics/kinematics with different detection needs Slower due to branching overhead
- Recommended defaults:
physics_enabled=False → CLOSEST_POINTS (default)
physics_enabled=True → CONTACT_POINTS or HYBRID
- CLOSEST_POINTS = 'closest_points'
- CONTACT_POINTS = 'contact_points'
- HYBRID = 'hybrid'
Module contents
pybullet_fleet package General-purpose PyBullet simulation library for multi-robot fleets