pybullet_fleet package
Submodules
pybullet_fleet.action module
action.py Base classes and implementations for robot actions.
Provides a unified interface for different types of robot actions: - MoveAction: Path following - PickAction: Object picking with approach - DropAction: Object dropping with approach - WaitAction: Timed waiting (e.g., charging)
Actions are executed through an action queue system in the Agent class.
- pybullet_fleet.action.DEFAULT_MAX_FORCE: float = 500.0
Default maximum motor force (N) for joint/EE control.
- pybullet_fleet.action.DEFAULT_JOINT_TOLERANCE: float = 0.01
Default joint-space tolerance (rad or m) for convergence checks.
- pybullet_fleet.action.DEFAULT_EE_TOLERANCE: float = 0.02
Default Cartesian end-effector tolerance (m) for convergence checks.
- class pybullet_fleet.action.PickPhase(value)
Bases:
EnumPhases of pick action execution.
- INIT = 'init'
- APPROACHING = 'approaching'
- MOVING_TO_PICK = 'moving_to_pick'
- PICKING = 'picking'
- RETREATING = 'retreating'
- class pybullet_fleet.action.DropPhase(value)
Bases:
EnumPhases of drop action execution.
- INIT = 'init'
- APPROACHING = 'approaching'
- MOVING_TO_DROP = 'moving_to_drop'
- DROPPING = 'dropping'
- RETREATING = 'retreating'
- class pybullet_fleet.action.Action
Bases:
ABCBase class for all robot actions.
Actions represent high-level tasks that agents can perform: - MoveAction: Navigate along a path - PickAction: Pick up an object - DropAction: Drop an object - WaitAction: Wait for a duration
Actions are executed through Agent.add_action() or Agent.add_action_sequence(). Each action manages its own state and returns True when complete.
- status
Current execution status
- start_time
When action started (simulation time)
- end_time
When action completed (simulation time)
- error_message
Error description if action failed
- abstractmethod execute(agent, dt)
Execute action for one timestep.
- abstractmethod reset()
Reset action state for re-execution.
- cancel()
Cancel action execution.
- get_duration()
Get action execution duration.
- is_complete()
Check if action has completed (successfully or with failure).
- Return type:
- class pybullet_fleet.action.MoveAction(path, auto_approach=True, final_orientation_align=True, direction=MovementDirection.FORWARD)
Bases:
ActionMove along a path.
Wraps the existing Agent path following functionality into the Action interface.
- Parameters:
path (
Path) – Path object with waypoints to followauto_approach (
bool) – Add approach waypoint if far from start (default: True)final_orientation_align (
bool) – Align to final orientation after reaching end (default: True)direction (
Union[MovementDirection,str]) – Movement direction - MovementDirection.FORWARD (default, face movement direction) or MovementDirection.BACKWARD (maintain orientation, move backward). Can also accept string “forward” or “backward” for convenience.
Example:
# Move forward (normal) path = Path.from_positions([[0, 0, 0], [1, 0, 0], [1, 1, 0]]) action = MoveAction(path=path) agent.add_action(action) # Move backward action = MoveAction(path=path, direction=MovementDirection.BACKWARD) agent.add_action(action)
- path: Path
- auto_approach: bool = True
- final_orientation_align: bool = True
- direction: MovementDirection | str = 'forward'
- execute(agent, dt)
Execute move action.
- Return type:
- reset()
Reset action state.
- class pybullet_fleet.action.JointAction(target_joint_positions, max_force=500.0, tolerance=None)
Bases:
ActionJoint positions control as an Action.
- Parameters:
target_joint_positions (
Union[list,dict]) – List of target joint positions (radians for revolute, metres for prismatic)max_force (
float) – Maximum force to apply (default: 500.0)tolerance (
Union[float,list,dict,None]) –Position tolerance to consider as reached. Accepts a single float (applied to all joints), a list (one per joint), or a dict
{joint_name: tol}. WhenNone(the default), the tolerance is resolved fromagent.joint_toleranceat the firstexecute()call and written back to this attribute so the resolved value is inspectable after the action starts.Note
A scalar tolerance applies the same value to every joint regardless of type. For mixed prismatic / revolute chains, consider specifying per-joint tolerances so that prismatic joints (metres) can be tighter than revolute joints (radians).
wait_time – Optional seconds to wait after reaching target (default: 0.0)
Example:
# Scalar tolerance (same for all joints) action = JointAction(target_joint_positions=[0.0, 1.0, 0.0, 0.0]) agent.add_action(action) # Per-joint tolerance for mixed prismatic/revolute action = JointAction( target_joint_positions={"rail_joint": 0.5, "elbow_to_wrist": -0.3}, tolerance={"rail_joint": 0.005, "elbow_to_wrist": 0.05}, # 5 mm / 0.05 rad )
- max_force: float = 500.0
- execute(agent, dt)
Execute action for one timestep.
- reset()
Reset action state for re-execution.
- class pybullet_fleet.action.PoseAction(target_position, target_orientation=None, end_effector_link=None, max_force=500.0, tolerance=0.02)
Bases:
ActionEnd-effector pose control via inverse kinematics.
Calls
agent.move_end_effector()on the first step. Joint targets are always set and the action waits for joints to settle. After settling, the action completes withCOMPLETEDif the target was reachable, orFAILEDif it was not.toleranceis used for both the IK reachability pre-check (viamove_end_effector()) and the final EE Cartesian distance verification (viaare_ee_at_target()). Joint convergence uses the default joint tolerance (not this value).- Parameters:
target_position (
List[float]) – Target EE position [x, y, z] in world frame.target_orientation (
Optional[tuple]) – Target EE orientation as quaternion [qx, qy, qz, qw] (optional).end_effector_link (
Union[int,str,None]) – Link index (int), name (str), or None for auto-detect.max_force (
float) – Maximum force for motor control (default: 500.0).tolerance (
float) – EE Cartesian distance tolerance in metres (default: 0.02).
Example:
action = PoseAction(target_position=[0.3, 0.0, 0.5]) agent.add_action(action) action = PoseAction( target_position=[0.3, 0.0, 0.5], target_orientation=[0, 0, 0, 1], ) agent.add_action(action)
- max_force: float = 500.0
- tolerance: float = 0.02
- execute(agent, dt)
Execute action for one timestep.
- reset()
Reset action state for re-execution.
- class pybullet_fleet.action.WaitAction(duration, action_type='idle')
Bases:
ActionWait for a specified duration.
Useful for simulating charging, loading, or other time-based operations.
- Parameters:
Example:
# Wait for 5 seconds (charging) action = WaitAction(duration=5.0, action_type="charge") agent.add_action(action)
- duration: float
- action_type: str = 'idle'
- execute(agent, dt)
Execute wait action.
- Return type:
- reset()
Reset action state.
- class pybullet_fleet.action.PickAction(target_object_id=None, target_position=None, search_radius=0.5, use_approach=True, approach_pose=None, approach_offset=1.0, pick_offset=0.0, attach_link=-1, attach_relative_pose=<factory>, joint_targets=None, joint_tolerance=None, joint_max_force=500.0, ee_target_position=None, ee_target_orientation=None, ee_end_effector_link=None, ee_tolerance=0.02, continue_on_ik_failure=True)
Bases:
ActionPick an object and attach it to the robot.
Kinematic implementation: 1. Move to approach pose (if specified or auto-calculated) 2. Teleport object to attachment position 3. Attach object using constraint
- Parameters:
target_object_id (
Optional[int]) – Specific object body ID to pick (optional)target_position (
Optional[List[float]]) – Pick from position - auto-select nearest pickable object (optional)search_radius (
float) – Search radius when using target_position (default: 0.5m)approach_pose (
Optional[Pose]) – Explicit approach pose (optional, auto-calculated if None)approach_offset (
float) – Auto-calculate approach offset distance (default: 1.0m)pick_offset (
float) – Distance from target where pick is executed (default: 0.0m)attach_link (
Union[int,str]) – Link to attach to - can be index (int) or name (str). -1 or “base_link” for base link (default: -1)attach_relative_pose (
Pose) – Position and orientation offset in link’s frame as Pose object
Note
Path visualization is controlled by agent.path_visualize setting.
Example:
# Pick specific object with position offset (using link index) action = PickAction( target_object_id=pallet.body_id, approach_offset=1.0, pick_offset=0.3, attach_link=-1, # Base link attach_relative_pose=Pose.from_euler(0.6, 0, -0.2, roll=np.pi/2, pitch=0, yaw=0) ) agent.add_action(action) # Pick using link name action = PickAction( target_object_id=pallet.body_id, attach_link="sensor_mast", # Attach to named link attach_relative_pose=Pose.from_euler(0, 0, 0.5) ) agent.add_action(action) # Pick nearest object at location action = PickAction( target_position=[5, 0, 0.1], search_radius=0.5 ) agent.add_action(action)
- search_radius: float = 0.5
- use_approach: bool = True
- approach_pose: Pose | None = None
- approach_offset: float = 1.0
- pick_offset: float = 0.0
- attach_relative_pose: Pose
- joint_max_force: float = 500.0
- ee_tolerance: float = 0.02
- continue_on_ik_failure: bool = True
- execute(agent, dt)
Execute pick action.
- Return type:
- reset()
Reset action state.
- class pybullet_fleet.action.DropAction(drop_pose, use_approach=True, approach_pose=None, approach_offset=1.0, drop_offset=0.0, place_gently=True, drop_height=0.1, target_object_id=None, drop_relative_pose=None, joint_targets=None, joint_tolerance=None, joint_max_force=500.0, ee_target_position=None, ee_target_orientation=None, ee_end_effector_link=None, ee_tolerance=0.02, continue_on_ik_failure=True)
Bases:
ActionDrop an attached object at a specified location.
Kinematic implementation: 1. Move to approach pose (if specified or auto-calculated) 2. Detach object 3. Teleport object to drop position
- Parameters:
drop_pose (
Pose) – Where to drop the object (position and orientation). Orientation defaults to no rotation[0, 0, 0, 1].approach_pose (
Optional[Pose]) – Explicit approach pose (optional, auto-calculated if None)approach_offset (
float) – Auto-calculate approach offset distance (default: 0.2m)place_gently (
bool) – Place at exact position vs drop from height (default: True)drop_height (
float) – Height above drop_pose to release if not gentle (default: 0.1m)target_object_id (
Optional[int]) – Specific object to drop (None = drop first attached)
Example:
# Drop at specific location action = DropAction( drop_pose=Pose(position=[10, 5, 0.1]), place_gently=True ) agent.add_action(action) # Drop with specific orientation action = DropAction( drop_pose=Pose(position=[10, 5, 0.1], orientation=[0, 0, 0.7, 0.7]), )
Note
Path visualization is controlled by agent.path_visualize setting.
- drop_pose: Pose
- use_approach: bool = True
- approach_pose: Pose | None = None
- approach_offset: float = 1.0
- drop_offset: float = 0.0
- place_gently: bool = True
- drop_height: float = 0.1
- drop_relative_pose: Pose | None = None
- joint_max_force: float = 500.0
- ee_tolerance: float = 0.02
- continue_on_ik_failure: bool = True
- execute(agent, dt)
Execute drop action.
- Return type:
- reset()
Reset action state.
pybullet_fleet.agent module
core/agent.py Agent class for goal-based position control with max velocity and acceleration constraints. Supports both mobile (use_fixed_base=False) and fixed (use_fixed_base=True) robots. Supports both Mesh and URDF loading.
- class pybullet_fleet.agent.AgentSpawnParams(visual_shape=None, collision_shape=None, initial_pose=<factory>, mass=0.0, pickable=True, name=None, visual_frame_pose=None, collision_frame_pose=None, collision_mode=CollisionMode.NORMAL_3D, user_data=<factory>, urdf_path=None, max_linear_vel=2.0, max_linear_accel=5.0, max_angular_vel=3.0, max_angular_accel=10.0, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, ik_params=None, controller_config=None)
Bases:
SimObjectSpawnParamsAgent spawn parameters extending SimObjectSpawnParams.
These parameters define the physical properties, appearance, and initial state of an agent. They are used by Agent.from_params() and AgentManager.spawn_agents_grid().
Supports three types of robots: 1. Mesh robots: Specify visual_shape (and optionally collision_shape) 2. URDF robots: Specify urdf_path (for robots with joints) 3. Virtual agents: Specify neither (invisible, no collision, useful for tracking/planning)
- Attributes (in addition to SimObjectSpawnParams):
urdf_path: Path to robot URDF file (for URDF-based robots with joints) max_linear_vel: Maximum velocity in m/s - float or [vx, vy, vz] (ignored if use_fixed_base=True) max_linear_accel: Maximum acceleration in m/s² - float or [ax, ay, az] (ignored if use_fixed_base=True) max_angular_vel: Maximum angular velocity in rad/s (for differential drive) max_angular_accel: Maximum angular acceleration in rad/s² (for differential drive, default: 10.0) motion_mode: MotionMode.OMNIDIRECTIONAL (move in any direction) or MotionMode.DIFFERENTIAL (rotate then move forward) use_fixed_base: If True, robot base is fixed and doesn’t move (default: False)
- Inherited from SimObjectSpawnParams:
- name: Optional string name for human-readable identification.
Duplicates allowed - use for debugging/filtering, not unique lookup. For unique identification, use object_id after spawning.
Note
AgentManager.spawn_agents_grid() calculates positions automatically and may override initial_pose.
- motion_mode: MotionMode | str = 'differential'
- use_fixed_base: bool = False
- ik_params: IKParams | None = None
- classmethod from_dict(config)
Create AgentSpawnParams from configuration dictionary.
- Parameters:
config (
Dict[str,Any]) – Dictionary with robot parameters including: - visual_shape/collision_shape or urdf_path - initial_pose: Pose object (optional) - max_linear_vel, max_linear_accel, use_fixed_base, etc. (optional)- Returns:
AgentSpawnParams instance
Example:
config = { 'visual_shape': ShapeParams( shape_type='mesh', mesh_path='robot.obj', mesh_scale=[1.0, 1.0, 1.0], rgba_color=[0.2, 0.2, 0.2, 1.0] ), 'collision_shape': ShapeParams( shape_type='box', half_extents=[0.2, 0.1, 0.2] ), 'initial_pose': Pose.from_xyz(1.0, 2.0, 0.0), 'max_linear_vel': 2.0, 'max_linear_accel': 5.0, 'use_fixed_base': False } params = AgentSpawnParams.from_dict(config) # Or for URDF: config = { 'urdf_path': 'robots/mobile_robot.urdf', 'initial_pose': Pose.from_xyz(1.0, 2.0, 0.0), 'use_fixed_base': False } params = AgentSpawnParams.from_dict(config)
- class pybullet_fleet.agent.IKParams(max_outer_iterations=5, convergence_threshold=0.01, max_inner_iterations=200, residual_threshold=0.0001, reachability_tolerance=0.02, seed_quartiles=(0.25, 0.5, 0.75), ik_joint_names=None)
Bases:
objectInverse kinematics solver configuration.
Controls the multi-seed iterative IK solver used by
Agent._solve_ik()andAgent.move_end_effector().- max_outer_iterations
Maximum refinement iterations per seed.
- convergence_threshold
EE-to-target distance (m) below which the IK solution is accepted early.
- max_inner_iterations
maxNumIterationspassed topybullet.calculateInverseKinematics.
- residual_threshold
residualThresholdpassed topybullet.calculateInverseKinematics.
- reachability_tolerance
Maximum EE distance (m) for
Agent.move_end_effector()to report reachable.
- seed_quartiles
Tuple of joint-range fractions (0–1) used to build diversified rest-pose seeds. Each value
qproduces a seed atlower + q * rangefor every joint.
- ik_joint_names
Explicit tuple of joint names that the IK solver is allowed to move. All other movable joints are locked at their current position. When
None(default), the solver falls back to the automatic heuristic (continuous joints are locked, everything else is free).
Example:
cfg = IKParams(max_outer_iterations=10, seed_quartiles=(0.1, 0.5, 0.9)) arm = Agent.from_urdf("arm.urdf", ik_params=cfg) # Mobile manipulator — solve only for arm joints cfg = IKParams(ik_joint_names=( "mount_to_shoulder", "shoulder_to_elbow", "elbow_to_wrist", "wrist_to_end", )) robot = Agent.from_urdf("mobile_manipulator.urdf", ik_params=cfg)
- max_outer_iterations: int = 5
- convergence_threshold: float = 0.01
- max_inner_iterations: int = 200
- residual_threshold: float = 0.0001
- reachability_tolerance: float = 0.02
- class pybullet_fleet.agent.Agent(body_id, urdf_path=None, max_linear_vel=2.0, max_linear_accel=5.0, max_angular_vel=3.0, max_angular_accel=10.0, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None, mass=None, ik_params=None, joint_tolerance=None)
Bases:
SimObjectAgent class with goal-based position control.
Inherits from SimObject to get attach/detach and callback functionality. Supports both mobile robots (use_fixed_base=False) and fixed robots (use_fixed_base=True). Supports both Mesh and URDF loading.
- Class Constants:
- _KINEMATIC_JOINT_FALLBACK_VELOCITY: Default max joint velocity for
revolute joints (rad/s) used when the URDF
<limit velocity="...">is 0 or missing.- _KINEMATIC_PRISMATIC_FALLBACK_VELOCITY: Default max joint velocity
for prismatic joints (m/s) used when the URDF limit is 0 or missing.
- _DEFAULT_JOINT_TOLERANCE: Default tolerance (rad / m) for joint
target comparison methods.
Features: - Accepts Pose goals (compatible with ROS2 geometry_msgs/Pose) - Max velocity and acceleration constraints (for mobile robots) - Current pose query API - Shared shape optimization for fast spawning - Fixed robot support (non-moving objects) - URDF support with joint control - Attach/detach objects (inherited from SimObject)
Object Identification (inherited from SimObject): - object_id: Unique integer ID (assigned by sim_core, use for lookups) - name: Optional string name (duplicates allowed, use for debugging/filtering)
This class is designed to be eventually controlled by ROS2 nodes, so the API follows ROS message conventions.
- __init__(body_id, urdf_path=None, max_linear_vel=2.0, max_linear_accel=5.0, max_angular_vel=3.0, max_angular_accel=10.0, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None, mass=None, ik_params=None, joint_tolerance=None)
Initialize Agent.
- Parameters:
body_id (
int) – PyBullet body IDurdf_path (
Optional[str]) – Path to robot URDF file (if URDF-based)max_linear_vel (
Union[float,List[float]]) – Maximum velocity (m/s) - float or [vx, vy, vz] for per-axis limitsmax_linear_accel (
Union[float,List[float]]) – Maximum acceleration (m/s²) - float or [ax, ay, az] for per-axis limitsmax_angular_vel (
Union[float,List[float]]) – Maximum angular velocity (rad/s) - float or [yaw, pitch, roll] for per-axis limitsmax_angular_accel (
Union[float,List[float]]) – Maximum angular acceleration (rad/s²) - float or [yaw, pitch, roll] for per-axis limitsmotion_mode (
Union[MotionMode,str]) – MotionMode.OMNIDIRECTIONAL or MotionMode.DIFFERENTIAL driveuse_fixed_base (
bool) – If True, robot base is fixed and doesn’t movecollision_mode (
CollisionMode) – Collision detection mode (default: NORMAL_3D)sim_core – Reference to simulation core (optional)
ik_params (
Optional[IKParams]) – IK solver configuration (default:IKParams()).joint_tolerance (
Union[float,list,dict,None]) – Default joint tolerance for convergence checks. Acceptsfloat(all joints),list(per-joint), ordict({joint_name: tol}). IfNone, uses the class default (0.01). For mixed prismatic / revolute chains, a dict is recommended so prismatic joints (metres) can use a tighter tolerance than revolute joints (radians).
Note
For spawning robots from AgentSpawnParams, use Agent.from_params() instead. For mesh robots, use Agent.from_mesh() factory method. For URDF robots, use Agent.from_urdf() factory method.
- path_visualize: bool
- path_visualize_width: float
- property joint_tolerance: float | list | dict
Default joint tolerance used by convergence checks.
Returns the instance-level tolerance if set, otherwise the class default (
_DEFAULT_JOINT_TOLERANCE = 0.01).For robots with mixed prismatic (metres) and revolute (radians) joints, set this to a
dictso each joint type can have an appropriate tolerance:agent = Agent.from_urdf( ..., joint_tolerance={"rail_joint": 0.005, "elbow_to_wrist": 0.05}, )
- property is_moving: bool
Check if robot is currently moving.
To change movement state, use set_goal_pose(), set_path(), or stop().
- Returns:
True if robot is moving, False otherwise
- Type:
Read-only property
- property motion_mode: MotionMode
Get current motion mode.
To change motion mode, use set_motion_mode() method.
- Returns:
Current MotionMode (OMNIDIRECTIONAL or DIFFERENTIAL)
- Type:
Read-only property
- property velocity: ndarray
Get current velocity vector.
- Returns:
Velocity [vx, vy, vz] in m/s (copy of internal state)
- Type:
Read-only property
- property angular_velocity: float
Get current angular velocity.
- Returns:
Angular velocity in rad/s
- Type:
Read-only property
- property goal_pose: Pose | None
Get current goal pose.
To set a new goal, use set_goal_pose() or set_path().
- Returns:
Current goal Pose, or None if no goal is set
- Type:
Read-only property
- property current_waypoint_index: int
Get current waypoint index in the path.
- Returns:
Current waypoint index (0-based), or 0 if no path is set
- Type:
Read-only property
- property path: List[Pose]
Current waypoint path. Empty when idle.
- Type:
Read-only property
- classmethod from_params(spawn_params, sim_core=None)
Create an Agent from AgentSpawnParams.
This is the recommended way to spawn robots when using AgentSpawnParams. Automatically detects whether to use mesh, URDF, or virtual agent based on spawn_params.
- Parameters:
spawn_params (
AgentSpawnParams) – AgentSpawnParams instance with all robot parameterssim_core – Reference to simulation core (optional)
- Return type:
Agent- Returns:
Agent instance
Example (Mesh):
params = AgentSpawnParams( visual_shape=ShapeParams( shape_type="mesh", mesh_path="robot.obj", mesh_scale=[1.0, 1.0, 1.0], rgba_color=[0.2, 0.2, 0.2, 1.0] ), collision_shape=ShapeParams( shape_type="box", half_extents=[0.2, 0.1, 0.2] ), initial_pose=Pose.from_xyz(1.0, 2.0, 0.0), max_linear_vel=3.0 ) robot = Agent.from_params(spawn_params=params)
Example (URDF):
params = AgentSpawnParams( urdf_path="arm_robot.urdf", initial_pose=Pose.from_xyz(0.0, 0.0, 0.0), use_fixed_base=True ) robot = Agent.from_params(spawn_params=params)
Example (Virtual Agent - invisible, no collision):
params = AgentSpawnParams( initial_pose=Pose.from_xyz(0.0, 0.0, 0.0), max_linear_vel=2.0 ) virtual_agent = Agent.from_params(spawn_params=params)
- classmethod from_dict(config, sim_core=None)
Create an Agent from a raw config dict.
Combines
AgentSpawnParams.from_dict()andfrom_params()in a single call. Subclasses can override to use a customSpawnParamsclass.
- classmethod from_mesh(visual_shape=None, collision_shape=None, pose=None, mass=0.0, max_linear_vel=2.0, max_linear_accel=5.0, max_angular_vel=3.0, max_angular_accel=10.0, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None)
Create a mesh-based Agent with flexible shape control.
- Parameters:
visual_shape (
Optional[ShapeParams]) – ShapeParams for visual geometrycollision_shape (
Optional[ShapeParams]) – ShapeParams for collision geometrypose (
Pose) – Initial Pose (position and orientation)mass (
float) – Robot mass (kg), 0.0 for kinematic controlmax_linear_vel (
Union[float,List[float]]) – Maximum velocity (m/s) - float or [vx, vy, vz]max_linear_accel (
Union[float,List[float]]) – Maximum acceleration (m/s²) - float or [ax, ay, az]max_angular_vel (
Union[float,List[float]]) – Maximum angular velocity (rad/s)max_angular_accel (
Union[float,List[float]]) – Maximum angular acceleration (rad/s²)motion_mode (
Union[MotionMode,str]) – MotionMode.OMNIDIRECTIONAL or MotionMode.DIFFERENTIALuse_fixed_base (
bool) – If True, robot base is fixed and doesn’t movesim_core – Reference to simulation core
- Return type:
Agent- Returns:
Agent instance
Example:
# Mesh visual with rotated frame + box collision agent = Agent.from_mesh( visual_shape=ShapeParams( shape_type="mesh", mesh_path="robot.obj", mesh_scale=[1.0, 1.0, 1.0], rgba_color=[0.2, 0.2, 0.2, 1.0], frame_pose=Pose.from_euler(0, 0, 0, roll=np.pi/2, yaw=np.pi/2) ), collision_shape=ShapeParams( shape_type="box", half_extents=[0.2, 0.1, 0.2] ), pose=Pose.from_xyz(0, 0, 0.5), max_linear_vel=2.0, motion_mode=MotionMode.DIFFERENTIAL )
- classmethod from_urdf(urdf_path, pose=None, mass=None, max_linear_vel=2.0, max_linear_accel=5.0, max_angular_vel=3.0, max_angular_accel=10.0, motion_mode=MotionMode.DIFFERENTIAL, use_fixed_base=False, collision_mode=CollisionMode.NORMAL_3D, sim_core=None, name=None, user_data=None, ik_params=None, joint_tolerance=None)
Create a URDF-based Agent (with joints).
- Parameters:
urdf_path (
str) – Robot model name (e.g."panda") or path to URDF file. Names are resolved viaresolve_urdf()pose (
Pose) – Initial Pose (position and orientation). Defaults to originmass (
Optional[float]) – Mass override. - None (default): Use URDF file’s mass values as-is - 0.0: Override all links to mass=0 for kinematic control (no physics)max_linear_vel (
Union[float,List[float]]) – Maximum velocity (m/s) - float or [vx, vy, vz]max_linear_accel (
Union[float,List[float]]) – Maximum acceleration (m/s²) - float or [ax, ay, az]max_angular_vel (
Union[float,List[float]]) – Maximum angular velocity (rad/s)max_angular_accel (
Union[float,List[float]]) – Maximum angular acceleration (rad/s²)motion_mode (
Union[MotionMode,str]) – MotionMode.OMNIDIRECTIONAL or MotionMode.DIFFERENTIALuse_fixed_base (
bool) – If True, robot base is fixed in spacesim_core – Reference to simulation core
ik_params (
Optional[IKParams]) – IK solver configuration (default:IKParams()).joint_tolerance (
Union[float,list,dict,None]) – Default joint tolerance for convergence. Seejoint_toleranceproperty for details.
- Return type:
Agent- Returns:
Agent instance
Note
mass=None (default): Uses URDF file’s mass values for physics simulation
mass=0.0: Override all links to mass=0 for kinematic control (no physics)
Example:
# Use URDF mass values (physics enabled) — mass defaults to None robot = Agent.from_urdf(urdf_path="arm_robot.urdf") # Resolve by model name robot = Agent.from_urdf(urdf_path="panda", use_fixed_base=True) # Kinematic control (no physics) robot = Agent.from_urdf(urdf_path="arm_robot.urdf", mass=0.0)
- set_goal_pose(goal)
Set goal pose for the robot to move to.
Note: This is ignored if the robot has a fixed base.
- Parameters:
goal (
Pose) – Target Pose (compatible with ROS2 geometry_msgs/Pose)
- set_motion_mode(mode)
Set motion mode for the robot.
Creates and assigns the matching controller (OmniController or DifferentialController). Also initializes legacy TPI instances for backward compatibility.
Note: Motion mode cannot be changed while the robot is moving.
- set_path(path, auto_approach=True, final_orientation_align=True, direction=MovementDirection.FORWARD)
Set a path (list of waypoints) for the robot to follow.
- Parameters:
path (
List[Pose]) – List of Pose waypoints to follow in sequenceauto_approach (
bool) – If True, automatically add straight-line approach from current position to first waypoint if distance > threshold (default: True)final_orientation_align (
bool) – If True, add final rotation to match last waypoint’s orientation after reaching the position (default: True)direction (
Union[MovementDirection,str]) – Movement direction - MovementDirection.FORWARD (face movement direction) or MovementDirection.BACKWARD (maintain current orientation, move backward). Can also accept string “forward” or “backward” for convenience. Note: Only used in differential drive mode (MotionMode.DIFFERENTIAL). In omnidirectional mode, this parameter is ignored.
- add_action(action)
Add an action to the execution queue.
Actions are executed sequentially in the order they are added.
- Parameters:
action (
Action) – Action instance to add to queue
Example:
agent.add_action(MoveAction(path=my_path)) agent.add_action(WaitAction(duration=5.0))
- add_action_sequence(actions)
Add multiple actions to the execution queue.
- Parameters:
actions (
List[Action]) – List of Action instances to add
Example:
agent.add_action_sequence([ MoveAction(path=path1), PickAction(target_object_id=obj.body_id), MoveAction(path=path2), DropAction(drop_pose=Pose(position=[10, 5, 0])) ])
- clear_actions()
Clear all actions from the queue and cancel current action.
- get_current_action()
Get the currently executing action.
- Return type:
Optional[Action]- Returns:
Current Action instance, or None if no action is executing
- get_action_queue_size()
Get the number of actions waiting in the queue.
- Return type:
- Returns:
Number of actions in queue
- is_action_queue_empty()
Check if action queue is empty and no action is currently executing.
- Return type:
- Returns:
True if no actions are queued or executing, False otherwise
- update(dt)
Update robot position towards goal with velocity/acceleration constraints.
Supports two motion modes: - MotionMode.OMNIDIRECTIONAL: Move in any direction without rotating first (smooth trajectories) - MotionMode.DIFFERENTIAL: Rotate to face target, then move forward (smooth trajectories)
Also processes action queue for high-level task execution.
Note: This does nothing if the robot has a fixed base.
- get_velocity()
Get current velocity vector.
- Return type:
- Returns:
Velocity [vx, vy, vz] in m/s
- set_controller(controller=None)
Set or clear the movement controller (Strategy pattern).
When a controller is set,
update()delegates movement tocontroller.compute(agent, dt).Note: A default controller is always assigned in
from_params(). SettingNonedisables all movement.
- stop()
Stop robot movement and clear goal and path.
- is_urdf_robot()
Check if this robot is URDF-based (has joints).
- Return type:
- get_num_joints()
Get number of joints (0 for mesh robots).
- Return type:
- get_joint_state(joint_index)
Get joint state (position and velocity).
For kinematic robots (mass=0), returns the cached position and velocity=0.0, avoiding a PyBullet C call. For physics robots the value is read from
p.getJointState().- Parameters:
joint_index (
int) – Joint index (0-based)- Return type:
- Returns:
(position, velocity) tuple
Example:
pos, vel = robot.get_joint_state(0)
- get_joint_state_by_name(joint_name)
Get (position, velocity) for a single joint by name. Returns (position, velocity) or (0.0, 0.0) if not found.
- Return type:
- get_all_joints_state()
Return a list of (position, velocity) for all joints. Example: [(pos0, vel0), (pos1, vel1), …]
- Return type:
- get_all_joints_state_by_name()
Return a dict of {joint_name: (position, velocity)} for all joints. Example: {joint_name: (pos, vel), …}
- Return type:
- get_joints_state_by_name(joint_names)
Return a dict of {joint_name: (position, velocity)} for the specified joint_names. Example: {joint_name: (pos, vel), …}
- Return type:
- set_joint_target(joint_index, target_position, max_force=500.0)
Set target position for a joint (for URDF robots only).
- Parameters:
Example:
robot.set_joint_target(0, 1.57) # Move first joint to 90 degrees
- set_joint_target_by_name(joint_name, target_position, max_force=500.0)
Set target position for a joint by joint name.
- set_all_joints_targets(target_positions, max_force=500.0)
Set target positions for all joints at once.
- Parameters:
Example:
robot.set_all_joints_targets([0.0, 1.57, -1.57, 0.0])
- set_joints_targets_by_name(joint_targets, max_force=500.0)
Set multiple joint targets by dict {joint_name: target_position} (partial update allowed).
- set_joints_targets(targets, max_force=500.0)
Set joint targets (accepts both list and dict).
- Parameters:
Example:
# Using list robot.set_joints_targets([0.0, 1.57, -1.57, 0.0]) # Using dict robot.set_joints_targets({"joint1": 1.57, "joint2": -1.57})
- is_joint_at_target(joint_index, target, tolerance=None)
Check if a single joint (by index) is within tolerance of the target position.
- Parameters:
joint_index (
int) – Joint index (0-based).target (
float) – Target position (radians for revolute, metres for prismatic).tolerance (
Union[float,list,dict,None]) – Position tolerance —float,list,dict, orNone(→self.joint_tolerance). Resolved via_resolve_joint_tolerance(tolerance, joint_index).
- Return type:
- is_joint_at_target_by_name(joint_name, target, tolerance=None)
Check if a single joint (by name) is within tolerance of the target position.
- Parameters:
joint_name (
str) – Joint name string.target (
float) – Target position (radians for revolute, metres for prismatic).tolerance (
Union[float,list,dict,None]) – Position tolerance —float,list,dict, orNone(→self.joint_tolerance). Resolved via_resolve_joint_tolerance()using the joint’s index.
- Return type:
- are_all_joints_at_targets(target_positions=None, tolerance=None)
Check if joints are at target positions.
- Parameters:
target_positions (
Optional[list]) – List of target positions for each joint. If None, uses_last_joint_targets(last commanded targets).tolerance (
Union[float,list,dict,None]) –float,list,dict, orNone. If None, usesself.joint_tolerance. Passed through to_resolve_joint_tolerance()per joint.Note
A scalar tolerance applies the same value to all joints. For mixed prismatic (metres) / revolute (radians) robots, consider a
dictkeyed by joint name or setself.joint_toleranceto a dict.
- Return type:
- Returns:
True if all joints are within tolerance of their targets.
- are_joints_at_targets_by_name(joint_targets, tolerance=None)
Check if all specified joints (by name) are within tolerance.
- Parameters:
joint_targets (
dict) –{joint_name: target}tolerance (
Union[float,list,dict,None]) –float(all),dict({joint_name: tol}), orNone(→self.joint_tolerance).A
listis also accepted but is resolved by absolute joint index (not by iteration order of joint_targets). For named-joint subsets, prefer adictto avoid unexpected index-based fallback.For mixed prismatic / revolute chains, use a
dictso prismatic joints (metres) can have a tighter tolerance than revolute joints (radians).
- Return type:
- Returns:
True if all joints are within tolerance, False otherwise.
- are_joints_at_targets(targets=None, tolerance=None)
Check if joints are at target positions.
- Parameters:
targets (
Union[list,dict,None]) – List of target positions for all joints, dict{joint_name: position}, or None to use the last commanded targets (_last_joint_targets).tolerance (
Union[float,list,dict,None]) –Tolerance value(s) —
float,list,dict, or None (→self.joint_tolerance).For mixed prismatic / revolute chains, use a
dictso prismatic joints (metres) can have a tighter tolerance than revolute joints (radians):agent.are_joints_at_targets( targets, tolerance={"rail_joint": 0.005, "elbow_to_wrist": 0.05}, )
- Return type:
- Returns:
True if all joints are within tolerance, False otherwise.
Example:
# No args — check last commanded targets with agent's default tolerance if robot.are_joints_at_targets(): print("All joints settled") # Using list if robot.are_joints_at_targets([0.0, 1.57, -1.57, 0.0], tolerance=0.01): print("Reached target") # Using dict if robot.are_joints_at_targets({"joint1": 1.57, "joint2": -1.57}, tolerance=0.01): print("Reached target")
- property ik_params: IKParams
Read-only access to the IK solver configuration.
- move_end_effector(target_position, target_orientation=None, end_effector_link=None, max_force=500.0, tolerance=None)
Set end-effector target position via inverse kinematics.
Solves IK, checks reachability, and calls
set_all_joints_targets()internally. Actual movement happens inupdate()(fire-and-forget, likeset_joint_target()).The joint targets are always set (best-effort), even when the target is unreachable. The return value tells the caller whether the IK solution actually reaches the desired position.
- Parameters:
target_position (
List[float]) – Target EE position [x, y, z] in world frame.target_orientation (
Optional[Tuple[float,float,float,float]]) – Target EE orientation as quaternion [qx, qy, qz, qw].end_effector_link (
Union[int,str,None]) – End-effector link (int, str, or None for auto-detect).max_force (
float) – Maximum force for motor control (physics mode).tolerance (
Optional[float]) – Euclidean distance (m) for reachability check. If None, usesik_params.reachability_tolerance.
- Return type:
- Returns:
True if the IK solution places the EE within tolerance of the target (reachable). False otherwise (best-effort targets are still set).
Example:
reachable = arm.move_end_effector([0.3, 0.0, 0.5]) if not reachable: print("Target may not be fully reachable")
- are_ee_at_target(target_position, target_orientation=None, end_effector_link=None, tolerance=0.02, orientation_tolerance=0.15)
Check whether the end-effector is near the target pose.
Resolves the EE link and delegates to
_check_ee_pose().- Parameters:
target_position (
List[float]) – Target EE position [x, y, z] in world frame.target_orientation (
Optional[Tuple[float,float,float,float]]) – Target EE orientation as quaternion [qx, qy, qz, qw]. If None, only position is checked.end_effector_link (
Union[int,str,None]) – Link index, name, or None (auto-detect).tolerance (
float) – Maximum Euclidean distance (m) to consider reached.orientation_tolerance (
float) – Maximum1 - |dot|between actual and target quaternions (default 0.15).
- Return type:
- Returns:
True if the EE is within tolerance of the target.
- update_attached_objects_kinematics()
For URDF robots: update attached objects’ pose to follow the parent link (kinematics attach). This is called every step for kinematic (mass=0) attached objects.
pybullet_fleet.agent_manager module
core/agent_manager.py Management classes for simulation objects and agents. - SimObjectManager: Base manager for all simulation objects - AgentManager: Extended manager with movement control for agents
- class pybullet_fleet.agent_manager.GridSpawnParams(x_min, x_max, y_min, y_max, spacing, offset, z_min=0, z_max=0)
Bases:
objectGrid parameters for object spawning in 3D space.
- x_min
Minimum X grid index
- x_max
Maximum X grid index
- y_min
Minimum Y grid index
- y_max
Maximum Y grid index
- spacing
Grid spacing [spacing_x, spacing_y, spacing_z] in meters
- offset
Grid offset [offset_x, offset_y, offset_z] in meters
- z_min
Minimum Z grid index (defaults to 0)
- z_max
Maximum Z grid index (defaults to 0 for 2D grids)
Note
Objects spawn in X → Y → Z order. For 2D grids, set z_min=z_max=0 (default). For 3D grids, set z_max > z_min to enable vertical layers.
- x_min: int
- x_max: int
- y_min: int
- y_max: int
- z_min: int = 0
- z_max: int = 0
- classmethod from_dict(config)
Create GridSpawnParams from a configuration dictionary.
Required keys:
x_min,x_max,y_min,y_max,spacing,offset.Optional keys:
z_min(default 0),z_max(default 0).
- class pybullet_fleet.agent_manager.SimObjectManager(sim_core=None, object_class=<class 'pybullet_fleet.sim_object.SimObject'>, enable_profiling=False)
Bases:
Generic[T]Base manager class for simulation objects (SimObject and Agent).
This class provides centralized control of multiple simulation objects.
Features: - Grid-based object spawning - Query all object poses - Get object count
- Type Parameter:
T: Type of objects managed (must be SimObject or its subclass)
- __init__(sim_core=None, object_class=<class 'pybullet_fleet.sim_object.SimObject'>, enable_profiling=False)
Initialize SimObjectManager.
- Parameters:
sim_core – Reference to simulation core (optional)
object_class (
type) – Default class to instantiate when spawning (SimObject or a subclass such as Agent).enable_profiling (
bool) – IfTrue, spawn methods log elapsed time vialogger.info. DefaultFalse. Can be overridden later:mgr.enable_profiling = True.
- enable_profiling: bool
- objects: List[T]
- add_object(obj)
Add an object to the manager.
- Parameters:
obj (
TypeVar(T, bound=SimObject)) – SimObject or Agent instance to add- Return type:
- Returns:
None. The object is silently skipped (with a warning) when its
object_idis invalid (< 0), which typically means it was created without a sim_core.
Note
Objects are automatically registered to sim_core.sim_objects via SimObject.__init__(), so we don’t register them here again.
- spawn_objects_grid(num_objects, grid_params, spawn_params)
Spawn multiple objects in a grid pattern. This is a convenience wrapper around spawn_grid_counts for spawning a single object type with a specified count. :type num_objects:
int:param num_objects: Number of objects to spawn :type grid_params:GridSpawnParams:param grid_params: GridSpawnParams instance with grid configuration :type spawn_params:SimObjectSpawnParams:param spawn_params: SimObjectSpawnParams instance with object parameters
- spawn_grid_mixed(num_objects, grid_params, spawn_params_list)
Spawn multiple SimObjects in a grid pattern with mixed types.
This method allows spawning different types of objects with specified probabilities. If probabilities don’t sum to 1.0, some grid positions may remain empty.
- Parameters:
num_objects (
int) – Maximum number of objects to spawngrid_params (
GridSpawnParams) – GridSpawnParams instance with grid configurationspawn_params_list (
List[Tuple[SimObjectSpawnParams,float]]) – List of (spawn_params, probability) tuples - spawn_params: SimObjectSpawnParams for this type - probability: Spawn probability (0.0 to 1.0) Note: Probabilities don’t need to sum to 1.0
- Return type:
- Returns:
List of spawned SimObject instances
Example:
# Spawn 100 objects: 60% typeA, 30% typeB, 10% empty spawn_params_list = [ (typeA_params, 0.6), (typeB_params, 0.3), ] manager.spawn_grid_mixed(100, grid_params, spawn_params_list)
- spawn_grid_counts(grid_params, spawn_params_count_list)
Spawn objects on a grid according to the specified count for each type.
Each type is spawned exactly the requested number of times. Grid positions are randomly shuffled so that types are distributed uniformly across the grid.
- Parameters:
- Return type:
- Returns:
List of spawned objects
- Raises:
ValueError – If the total count exceeds the number of grid cells.
- spawn_objects_batch(params_list)
Batch API to spawn multiple object instances at once.
When sim_core provides
batch_spawn(), rendering is disabled and the spatial grid rebuild is deferred until all objects are created.
- spawn_from_config(entities_yaml)
Spawn entities from a list of config dicts.
Dispatches to
entity_cls.from_dict()for each entity, which internally calls the appropriateSpawnParams.from_dictandfrom_params. Custom entity classes can overridefrom_dictto use their ownSpawnParams.If a dict has no
typefield,"agent"is assumed for backward compatibility. Supported types:"agent","sim_object", or any custom type registered viaregister_entity_class().
- spawn_from_yaml(yaml_path, key='robots')
Load a YAML file and spawn entities from the specified section.
Reads yaml_path, extracts the key section (default
"robots"), and delegates tospawn_from_config().- Parameters:
- Return type:
- Returns:
List of spawned objects (empty if key is missing).
- Raises:
FileNotFoundError – If yaml_path does not exist.
- get_pose(object_index)
Get current pose of a specific object.
- get_all_poses()
Get current poses of all objects.
- Return type:
List[Pose]- Returns:
List of Pose objects for all objects
- get_poses_dict()
Get current poses of all objects as a dictionary.
- set_pose_all(pose_factory)
Set pose for all objects using a factory function.
Applies pose_factory to each object and immediately calls
set_posewith the returned value. Works for both SimObject and Agent instances.- Parameters:
pose_factory (
Callable[[TypeVar(T, bound=SimObject)],Pose]) – Function that takes an object and returns a target Pose.- Return type:
Example:
# Shift every object 1 m along X manager.set_pose_all(lambda obj: Pose.from_xyz( obj.get_pose().x + 1.0, obj.get_pose().y, obj.get_pose().z, ))
- get_object_count()
Get total number of managed objects.
- Return type:
- class pybullet_fleet.agent_manager.AgentManager(sim_core=None, update_frequency=10.0, object_class=<class 'pybullet_fleet.agent.Agent'>, enable_profiling=False)
Bases:
SimObjectManager[Agent]Extended manager class for agents with update callback system.
This class extends SimObjectManager with agent-specific features: - Grid-based agent spawning with AgentSpawnParams - Pose goal assignment to individual agents - Callback system for custom update logic (goals, state tracking, etc.) - Query moving/stopped agents
Key Features: - Auto-registers update callback to simulation loop - Callback receives (agents, manager, dt) for easy agent operations - Configurable update frequency - Direct access to manager methods within callback
Note: - Agent.update() is automatically called by MultiRobotSimulationCore every step - AgentManager callbacks are for high-level logic (goals, coordination, etc.) - Agent-specific state can be stored in each Agent.user_data dict
- __init__(sim_core=None, update_frequency=10.0, object_class=<class 'pybullet_fleet.agent.Agent'>, enable_profiling=False)
Initialize AgentManager.
- Parameters:
sim_core – Reference to simulation core (optional)
update_frequency (
float) – Default update callback frequency in Hz (default: 10.0)object_class (
type) – Class to instantiate when spawning (default: Agent). Pass an Agent subclass to manage custom agent types.enable_profiling (
bool) – IfTrue, spawn methods log elapsed time. DefaultFalse.
- spawn_agents_grid(num_agents, grid_params, spawn_params)
Spawn agents in a grid. Alias for
spawn_objects_grid().- Return type:
List[Agent]
- spawn_agents_grid_mixed(num_agents, grid_params, spawn_params_list)
Spawn mixed agent types. Alias for
spawn_grid_mixed().- Return type:
List[Agent]
- spawn_agent_grid_counts(grid_params, spawn_params_count_list)
Spawn exact counts per type. Alias for
spawn_grid_counts().- Return type:
List[Agent]
- set_goal_pose(agent_index, goal)
Set goal pose for a specific agent.
- Parameters:
agent_index (
int) – Index of agent in self.objects listgoal (
Pose) – Target Pose
- set_goal_pose_by_body_id(body_id, goal)
Set goal pose for an agent by its PyBullet body ID.
Note
This method uses PyBullet’s internal body_id for compatibility with direct PyBullet API usage. For manager-level operations, prefer using set_goal_pose_by_object_id() instead.
- Parameters:
body_id (
int) – PyBullet body IDgoal (
Pose) – Target Pose
- set_goal_pose_by_object_id(object_id, goal)
Set goal pose for an agent by its simulation object ID.
This is the preferred method for manager-level operations, as it uses the simulation’s unique object_id rather than PyBullet’s internal body_id.
- Parameters:
object_id (
int) – Simulation object ID (from SimObject.object_id)goal (
Pose) – Target Pose
- stop_all()
Stop all agents and clear their goals.
- set_goal_pose_all(goal_factory)
Set goal pose for all agents using a factory function.
This helper method simplifies bulk goal assignment by applying a factory function to each agent. The factory can use agent-specific data (current pose, user_data, etc.) to create customized goals.
- Parameters:
goal_factory (
Callable[[Agent],Pose]) – Function that takes an Agent and returns a Pose- Return type:
Example:
# Set all agents to move to their designated home positions def get_home_goal(robot): return robot.user_data['home_position'] manager.set_goal_pose_all(get_home_goal) # Move all agents 5 meters forward from their current position def move_forward(robot): current_pos = robot.get_pose().position return Pose.from_xyz(current_pos[0] + 5, current_pos[1], current_pos[2]) manager.set_goal_pose_all(move_forward)
Note
This is a convenience wrapper around individual set_goal_pose calls. No performance benefit over explicit loops, but improves code clarity.
- set_joints_targets_all(targets_factory, max_force=500.0)
Set joint targets for all agents using a factory function.
This helper method simplifies bulk joint control by applying a factory function to each agent. Useful for coordinated multi-robot arm movements.
- Parameters:
- Return type:
Example:
# Set all robot arms to neutral position def get_neutral_joints(robot): return [0.0, 0.0, 0.0, 0.0] manager.set_joints_targets_all(get_neutral_joints) # Set custom positions based on robot index def get_custom_joints(robot): idx = robot.user_data.get('index', 0) return [idx * 0.1, 0.0, 0.0, 0.0] manager.set_joints_targets_all(get_custom_joints, max_force=1000.0)
Note
Only applicable to agents with controllable joints (robot arms, etc.). Mobile robots without arm joints will ignore this command.
- get_moving_count()
Get number of agents currently moving.
- Return type:
- add_action_sequence_all(action_factory)
Apply action sequence to all agents using a factory function.
This helper method simplifies bulk action assignment by applying a factory function to each agent. The factory can use agent-specific data (pose, user_data, etc.) to create customized action sequences.
- Parameters:
action_factory (
Callable[[Agent],List[Any]]) – Function that takes an Agent and returns a list of Actions- Return type:
Example:
def create_pick_drop_sequence(robot): target = robot.user_data['target_object'] return [ PickAction(target_object_id=target.body_id), MoveAction(path=Path.from_positions([[10, 10, 0]])), DropAction(drop_pose=Pose(position=[10, 10, 0])) ] # Apply to all agents at once manager.add_action_sequence_all(create_pick_drop_sequence)
Note
This is a convenience wrapper. Internally it still loops through agents, so there’s no performance benefit over explicit loops. The main advantage is code clarity and reduced boilerplate.
- add_action_all(action_factory)
Add a single action to all agents using a factory function.
Similar to add_action_sequence_all, but adds a single action instead of a sequence.
- Parameters:
action_factory (
Callable[[Agent],Any]) – Function that takes an Agent and returns a single Action- Return type:
Example:
def create_move_action(robot): goal_pos = robot.user_data['next_position'] return MoveAction(path=Path.from_positions([goal_pos])) manager.add_action_all(create_move_action)
- register_callback(callback, frequency=None)
Register a callback for custom agent update logic.
Multiple callbacks can be registered, each with their own frequency. This method automatically registers with sim_core if available.
The callback provides access to the AgentManager instance, making it easy to: - Access all agents via manager.objects - Use manager methods (set_goal_pose, query states, etc.) - Store manager-level state in agent.user_data
- The callback should have signature:
callback(manager: AgentManager, dt: float) -> None
- Parameters:
Example (Goal management):
def goal_update_logic(manager, dt): for agent in manager.objects: if not agent.is_moving: # Set new goal for stopped agents new_goal = calculate_next_goal(agent) agent.set_goal_pose(new_goal) manager.register_callback(goal_update_logic, frequency=4.0)
Example (State tracking):
def state_tracker(manager, dt): for agent in manager.objects: # Track agent statistics if 'total_distance' not in agent.user_data: agent.user_data['total_distance'] = 0.0 agent.user_data['total_distance'] += np.linalg.norm(agent.velocity) * dt manager.register_callback(state_tracker, frequency=10.0)
Comparison with sim_core.register_callback():
AgentManager callback: Provides manager reference and filtered agent list
sim_core callback: Provides sim_core reference for all objects
Use AgentManager for agent-specific operations
Use sim_core for general simulation-wide operations
- setup_camera(camera_config=None)
Setup camera to view all managed objects in the simulation.
This method automatically extracts positions from all managed objects and configures the camera to fit them in the view.
- Parameters:
camera_config (
Optional[Dict]) – Dictionary with camera settings (from yaml config). If None, uses default settings.- Return type:
Example:
# Basic usage with default settings agent_manager.setup_camera() # With custom camera config camera_config = { 'camera_mode': 'auto', 'camera_view_type': 'top_down', 'camera_auto_scale': 0.8 } agent_manager.setup_camera(camera_config)
Note
Requires sim_core to be set during AgentManager initialization. Only works when GUI is enabled.
pybullet_fleet.config_utils module
config_utils.py Utility functions for loading and merging configuration files.
- pybullet_fleet.config_utils.load_yaml_config(config_path)
Load a YAML config file and return parsed contents.
General-purpose YAML loader. Works for any YAML config file (simulation, bridge, spawn definitions, etc.).
- Parameters:
config_path (
str) – Path to YAML config file.- Return type:
- Returns:
Parsed config dict.
- Raises:
FileNotFoundError – If config_path does not exist.
- pybullet_fleet.config_utils.merge_configs(base_config, override_config)
Recursively merge two configuration dictionaries.
Values in override_config take precedence over base_config.
- pybullet_fleet.config_utils.load_config(config_paths)
Load and merge configuration from one or more YAML files.
If multiple paths are provided, later configs override earlier ones. Delegates to
load_yaml_config()for each file.- Parameters:
config_paths (
Union[str,List[str]]) – Single path string or list of paths to configuration files. Later configs in the list override earlier ones.- Return type:
- Returns:
Merged configuration dictionary
Examples
# Single config config = load_config(‘config.yaml’)
# Multiple configs (100robots_config overrides config.yaml) config = load_config([‘config.yaml’, ‘100robots_config.yaml’])
pybullet_fleet.core_simulation module
core_simulation.py Reusable core simulation logic for multi-robot PyBullet environments. Integrates generation, management, transport, attach/detach, collision detection, coordinate conversion, occupied judgment, transport path generation, debugging, monitoring, and log control for various robots, pallets, and meshes.
- class pybullet_fleet.core_simulation.SimulationParams(target_rtf=1.0, timestep=0.1, duration=0, gui=True, physics=False, monitor=True, enable_monitor_gui=True, collision_check_frequency=None, log_level='warn', max_steps_per_frame=10, gui_min_fps=30, enable_collision_shapes=False, enable_structure_transparency=False, enable_shadows=True, enable_gui_panel=False, ignore_static_collision=True, enable_time_profiling=False, profiling_interval=100, enable_memory_profiling=False, enable_collision_color_change=False, spatial_hash_cell_size_mode=SpatialHashCellSizeMode.AUTO_INITIAL, spatial_hash_cell_size=None, collision_detection_method=None, collision_margin=0.02, multi_cell_threshold=1.5, enable_floor=True, camera_config=None, model_paths=None, window_width=1024, window_height=768, monitor_width=200, monitor_height=290, monitor_x=-1, monitor_y=-1)
Bases:
objectSimulation configuration parameters.
All fields have sensible defaults for headless kinematics-based simulation. Use
from_config/from_dictto load from YAML files.- target_rtf: float = 1.0
- timestep: float = 0.1
- duration: float = 0
- gui: bool = True
- physics: bool = False
- monitor: bool = True
- enable_monitor_gui: bool = True
- log_level: str = 'warn'
- max_steps_per_frame: int = 10
- gui_min_fps: int = 30
- enable_collision_shapes: bool = False
- enable_structure_transparency: bool = False
- enable_shadows: bool = True
- enable_gui_panel: bool = False
- ignore_static_collision: bool = True
- enable_time_profiling: bool = False
- profiling_interval: int = 100
- enable_memory_profiling: bool = False
- enable_collision_color_change: bool = False
- spatial_hash_cell_size_mode: SpatialHashCellSizeMode = 'auto_initial'
- collision_detection_method: CollisionDetectionMethod | None = None
- collision_margin: float = 0.02
- multi_cell_threshold: float = 1.5
- enable_floor: bool = True
- window_width: int = 1024
- window_height: int = 768
- monitor_width: int = 200
- monitor_height: int = 290
- monitor_x: int = -1
- monitor_y: int = -1
- classmethod from_config(config_path='config.yaml')
- Return type:
SimulationParams
- class pybullet_fleet.core_simulation.MultiRobotSimulationCore(params, collision_color=None)
Bases:
object- classmethod from_yaml(yaml_path='config.yaml')
- Return type:
MultiRobotSimulationCore
- classmethod from_dict(config)
Create MultiRobotSimulationCore from a configuration dictionary.
- sim_time: float
- setup_monitor()
- Return type:
- property sim_objects: List[SimObject]
List of all simulation objects (Agent, SimObject, etc.).
- property agents: List[Agent]
List of Agent instances.
- property params: SimulationParams
Simulation parameters.
- property step_count: int
Current simulation step count.
- property collision_count: int
Total number of collisions detected.
- property enable_memory_profiling: bool
Whether memory profiling is enabled.
- register_callback(callback, frequency=None)
Register a callback function to be called during simulation.
- Parameters:
- Return type:
Example:
def my_callback(sim_core, dt): for obj in sim_core.sim_objects: if isinstance(obj, Agent): # Update agent logic pass sim_core.register_callback(my_callback, frequency=10.0) # 10 Hz
- unregister_callback(callback_func)
Remove a registered callback by function reference.
- start_recording(output='recording.gif', width=800, height=600, fps=15, duration=None, camera_mode='auto', time_base='sim', **kwargs)
Start recording the simulation as an animated GIF or MP4.
Frames are captured via
getCameraImage(ER_TINY_RENDERER)at the specified fps frequency. Works in bothp.DIRECTandp.GUI.Output format is auto-detected from the file extension:
.gif— Animated GIF via Pillow (always available).mp4— H.264 MP4 via imageio + pyav (requirespip install imageio[pyav])
Time bases:
"sim"(default) — Capture at fixed sim-time intervals. Playback always looks like 1× speed."real"— Capture at wall-clock intervals. Playback reflects actual execution speed (e.g. 10× sim → 10× video).
- Parameters:
output (
str) – Output file path (.gifor.mp4). Parent directories created automatically.width (
int) – Frame width in pixels.height (
int) – Frame height in pixels.fps (
int) – Capture frequency and playback FPS.duration (
Optional[float]) – Recording duration in sim-seconds (time_base="sim") or wall-clock seconds (time_base="real").Nonefor manual stop.camera_mode (
str) –"auto"|"gui"|"orbit"|"manual"."auto"is promoted to"gui"when running in GUI mode.time_base (
str) –"sim"or"real". Controls capture timing.**kwargs (
Any) – Passed toSimulationRecorder(e.g.orbit_degrees,camera_params).
- Return type:
- Returns:
SimulationRecorderinstance.
Example:
sim.start_recording("output.gif", duration=4.0) sim.start_recording("output.mp4", duration=4.0, fps=30) sim.start_recording("perf.mp4", duration=4.0, time_base="real") sim.run_simulation(duration=5.0) # File is auto-saved when run_simulation ends
- stop_recording()
Stop recording and save the output file.
- set_collision_check_frequency(frequency=None)
Set the frequency (Hz, number of times per second) for collision detection.
- Parameters:
frequency (
Optional[float]) – Collision check frequency in Hz (times per second) - None: Check every step (highest accuracy, highest cost) - 0: Disable collision checking (no collision detection) - > 0: Check at specified frequency (e.g., 1.0 = once per second)- Return type:
Example:
sim.set_collision_check_frequency(None) # Every step sim.set_collision_check_frequency(1.0) # 1 Hz (once per second) sim.set_collision_check_frequency(10.0) # 10 Hz sim.set_collision_check_frequency(0) # Disable
Note: Higher frequency increases computational cost but improves collision detection accuracy.
- setup_pybullet()
Initialize PyBullet with GUI panels hidden.
- Return type:
- disable_rendering()
Disable rendering during object spawning for better performance.
- Return type:
- enable_rendering()
Enable rendering before starting simulation.
- Return type:
- batch_spawn()
Context manager for optimised batch spawning.
Disables rendering and defers spatial-grid rebuild until the context exits. Use when spawning many objects at once.
Rendering state is saved on entry and restored on exit. If rendering was already disabled (e.g. during the initial setup phase before
run_simulation()), it stays disabled afterbatch_spawn()exits.Example:
with sim_core.batch_spawn(): for params in params_list: Agent.from_params(params, sim_core) # rendering restored to previous state, spatial grid rebuilt once
- set_collision_spatial_hash_cell_size_mode(mode=None, cell_size=None)
Set collision detection spatial hash cell size mode and recalculate/rebuild grid.
This method allows dynamic mode switching and always recalculates the cell_size based on the spatial_hash_cell_size_mode setting (or override via parameters).
- Parameters:
- Return type:
- Returns:
The calculated cell_size (always > 0, falls back to 1.0 if no AABBs available)
Example:
# Recalculate with current mode sim_core.set_collision_spatial_hash_cell_size_mode() # Switch to constant mode with specific size sim_core.set_collision_spatial_hash_cell_size_mode( mode=SpatialHashCellSizeMode.CONSTANT, cell_size=3.0 ) # Switch to auto_adaptive mode sim_core.set_collision_spatial_hash_cell_size_mode( mode=SpatialHashCellSizeMode.AUTO_ADAPTIVE )
- add_object(obj)
Add an object to simulation and register it with all necessary caches.
This method centralizes object registration logic for consistency and maintainability. Called automatically by SimObject.__init__() when sim_core is provided.
- Parameters:
obj (
SimObject) – The SimObject instance to add- Return type:
Example:
# Manual addition (not recommended - SimObject does this automatically) obj = SimObject(body_id=body_id, sim_core=None) sim_core.add_object(obj) # Automatic addition (recommended) obj = SimObject(body_id=body_id, sim_core=sim_core) # Calls add_object() internally
- remove_object(obj)
Remove an object from simulation and clean up all caches.
- Parameters:
obj (
SimObject) – The SimObject instance to remove- Return type:
Example:
obj = sim_core.spawn_robot(...) # ... later ... sim_core.remove_object(obj)
- configure_visualizer(enable_structure_transparency=None, enable_shadows=None)
Configure PyBullet visualizer settings with keyboard control.
- Parameters:
- Return type:
Custom keyboard shortcuts (active during simulation): - Press SPACE to pause/play simulation - Press ‘t’ to toggle structure transparency ON/OFF
PyBullet built-in keyboard shortcuts (always available): - Press ‘w’ to toggle wireframe / collision shape display - Press ‘g’ to toggle grid display - Press ‘j’ to toggle joint axes display
- compute_scene_bounds()
Compute scene bounding box from all sim_object positions.
Returns a
(center, extent)tuple where center is the mean position[cx, cy, cz]and extent is the axis-aligned size[ex, ey, ez](i.e.max - minper axis).When the scene is empty both lists are
[0.0, 0.0, 0.0].This is used by
setup_camera()(GUI) andSimulationRecorder(offscreen) so that both share the same scene-framing logic.
- setup_camera(camera_config=None, entity_positions=None)
Set up camera view based on configuration.
- get_aabbs()
Get AABBs for all simulation objects.
- filter_aabb_pairs(ignore_static=None, moved_objects=None, return_profiling=False)
Filter AABB pairs for collision detection with moved-based optimization. Only checks pairs involving moved objects.
- Parameters:
ignore_static (
Optional[bool]) – If True, skip collision checks with static objectsmoved_objects (
Optional[Set[int]]) – Set of object IDs that moved since last check. If None, uses self._moved_this_step (default behavior)return_profiling (
bool) – If True, include timing information in return value
- Returns:
pairs: List of (obj_id_i, obj_id_j) tuples for candidate collision pairs
timings: dict with get_aabbs, spatial_hashing, aabb_filtering (in ms) if return_profiling=True, else None
- Return type:
Tuple of (pairs, timings)
Example:
# Without profiling pairs, _ = sim.filter_aabb_pairs() # With profiling pairs, timings = sim.filter_aabb_pairs(return_profiling=True) print(f"AABB filtering took {timings['total']:.2f}ms")
- check_collisions(collision_color=None, ignore_static=None, return_profiling=False)
Check for collisions between robots using incremental updates.
Only re-checks collision pairs involving moved objects for efficiency. Maintains _active_collision_pairs as persistent state.
- Parameters:
- Returns:
pairs: List of currently active collision pairs (obj_id_i, obj_id_j)
timings: dict with get_aabbs, spatial_hashing, aabb_filtering, contact_points, total (in ms) if return_profiling=True, else None
- Return type:
Tuple of (pairs, timings)
Example:
# Without profiling active_pairs, _ = sim.check_collisions() # With profiling active_pairs, timings = sim.check_collisions(return_profiling=True) print(f"Collision check took {timings['total']:.2f}ms")
- get_active_collision_pairs()
Get currently active collision pairs.
- Return type:
- Returns:
List of (object_id_i, object_id_j) tuples for all active collisions. Pairs are sorted such that object_id_i < object_id_j.
Example:
# Check if any collisions are active if sim_core.get_active_collision_pairs(): print("Collisions detected!") # Get detailed collision info for obj_id_i, obj_id_j in sim_core.get_active_collision_pairs(): obj_i = sim_core._sim_objects_dict[obj_id_i] obj_j = sim_core._sim_objects_dict[obj_id_j] print(f"Collision: {obj_i} <-> {obj_j}")
- update_monitor()
Update simulation monitor with current statistics.
Calculates actual simulation speed from a 10-second sliding window, logs monitor data via logger, and writes to DataMonitor if enabled.
- Return type:
- initialize_simulation()
Initialize simulation state before running.
This method prepares the simulation for execution by: - Resetting simulation counters (step_count, collision_count, sim_time) - Clearing movement tracking state - Configuring visualizer settings (transparency, collision shapes) - Enabling rendering for GUI mode
Called automatically by run_simulation(), but can also be called manually for custom simulation loops or when restarting simulation.
Example:
# Automatic (recommended) sim.run_simulation(duration=10.0) # Manual initialization for custom loop sim.initialize_simulation() while custom_condition: sim.step_once()
- Return type:
- reset()
Reset simulation to a clean state (no objects, fresh PyBullet world).
Removes all objects from the simulation, resets PyBullet, re-applies physics-engine parameters, optionally reloads the ground plane (only when
enable_flooris True), and re-initialises counters viainitialize_simulation().After calling
reset()the simulation is ready for new objects to be spawned. Callbacks registered viaregister_callback()are preserved (theirlast_execis zeroed byinitialize_simulation).Example:
sim.reset() # All objects removed, PyBullet world cleared agent = Agent.from_params(params, sim) sim.step_once()
- Return type:
- run_simulation(duration=None)
Run the simulation for a specified duration.
- Parameters:
duration (
Optional[float]) – Simulation duration in seconds (simulation time, not real time). If None, uses self._params.duration. If duration <= 0, runs indefinitely.- Return type:
Example:
# Run for 10 seconds (simulation time) sim.run_simulation(duration=10.0) # Run indefinitely (until Ctrl+C or GUI closed) sim.run_simulation(duration=0)
- record_profiling(name, value_ms)
Record a custom profiling measurement (in milliseconds).
Call this from within Agent.update() or callback code to accumulate timing data. Multiple calls per step are summed (e.g., 100 agents each recording their custom_logic time). Auto-registers the field on first use.
The recorded field appears in both _print_profiling_summary() output and step_once(return_profiling=True) results alongside built-in fields.
- Parameters:
- Return type:
Example:
# Inside your custom Agent.update(): t0 = time.perf_counter() self._do_custom_logic(dt) self.sim_core.record_profiling('custom_logic', (time.perf_counter() - t0) * 1000)
- pause()
Pause the simulation. step_once will be skipped while paused.
- Return type:
- resume()
Resume the simulation after a pause.
- Return type:
- property is_paused: bool
Return True if the simulation is currently paused.
- step_once(return_profiling=False)
Execute one simulation step.
Performs agent updates, callbacks, physics step, collision detection, and monitoring. If enable_time_profiling is True, detailed timing information is logged.
- Parameters:
return_profiling (
bool) – If True, return timing breakdown dictionary instead of logging. Useful for external profiling tools.- Return type:
- Returns:
If return_profiling=True, returns dict with timing breakdown in milliseconds:
{ 'agent_update': float, 'callbacks': float, 'step_simulation': float, 'collision_check': float, 'collision_breakdown': { # per-phase detail (present when collision ran) 'get_aabbs': float, 'spatial_hashing': float, 'aabb_filtering': float, 'contact_points': float, 'total': float, }, '<custom_field>': float, # present when custom fields registered 'monitor_update': float, 'total': float, }
Otherwise returns None.
Performance note: time.perf_counter() calls have negligible overhead (<0.1% for 10k objects). The profiling measurements themselves do not significantly impact simulation performance.
- get_memory_usage()
Get current memory usage (requires enable_memory_profiling=True).
- Return type:
- Returns:
Dictionary with current and peak memory usage in MB, or None if not enabled. Keys: ‘current_mb’, ‘peak_mb’
Note
Returns Python heap memory tracked by tracemalloc, NOT RSS (process memory).
tracemalloc tracks Python object allocations, not C extensions or OS-level memory.
For total process memory (RSS), use: psutil.Process().memory_info().rss
Peak is cumulative since last reset_peak() call (interval-based in profiling).
Example:
>>> sim = MultiRobotSimulationCore.from_dict(config) >>> # ... run simulation with enable_memory_profiling=True ... >>> mem = sim.get_memory_usage() >>> if mem: >>> print(f"Python heap: {mem['current_mb']:.2f} MB, Peak: {mem['peak_mb']:.2f} MB")
pybullet_fleet.data_monitor module
Real-time data monitor window for PyBullet simulation Shows simulation statistics in a separate tkinter window
- class pybullet_fleet.data_monitor.DataMonitor(title='Simulation Monitor', enable_gui=True, width=200, height=290, x=-1, y=-1)
Bases:
objectReal-time data monitoring window
- start()
Start the monitor window in a separate thread
- Return type:
- stop()
Stop the monitor window
- Return type:
- write_data(sim_data)
Write simulation data to shared file (called from main simulation).
Uses atomic write (write to temp file + os.replace) to prevent the reader from seeing a truncated / partially-written JSON file.
- pybullet_fleet.data_monitor.main()
Run standalone data monitor
pybullet_fleet.geometry module
Geometric primitives: Pose, Path, and quaternion slerp utilities.
- pybullet_fleet.geometry.Quat
Quaternion in
(x, y, z, w)convention — same as PyBullet.
- class pybullet_fleet.geometry.Pose(position, orientation=<factory>)
Bases:
objectPosition and orientation representation for any object in the simulation. Compatible with ROS2 geometry_msgs/Pose and PyBullet’s (position, orientation) tuples.
Supports both quaternion and Euler angle representations.
- position
[x, y, z] in world coordinates
- orientation
[x, y, z, w] quaternion (defaults to no rotation)
- property x: float
X coordinate.
- property y: float
Y coordinate.
- property z: float
Z coordinate.
- property roll: float
Roll angle in radians (rotation around x-axis).
- property pitch: float
Pitch angle in radians (rotation around y-axis).
- property yaw: float
Yaw angle in radians (rotation around z-axis).
- classmethod from_xyz(x, y, z)
Create Pose from x, y, z coordinates.
- classmethod from_euler(x, y, z, roll=0.0, pitch=0.0, yaw=0.0)
Create Pose from position and Euler angles.
- classmethod from_pybullet(position, orientation)
Create Pose from PyBullet position and orientation tuples.
- as_position_orientation()
Return (position, orientation) tuple for PyBullet.
- as_tuple()
Return (position, orientation) tuple. Alias for as_position_orientation() for convenience.
- as_euler()
Return orientation as Euler angles (roll, pitch, yaw).
- class pybullet_fleet.geometry.Path(waypoints)
Bases:
objectRepresents a path as a sequence of waypoints (Poses).
- waypoints
List of Pose objects representing the path
Example:
# Create from Pose list (recommended) path = Path([ Pose.from_xyz(0, 0, 0), Pose.from_xyz(1, 0, 0), Pose.from_xyz(1, 1, 0) ]) # Create from positions path = Path.from_positions([[0, 0, 0], [1, 0, 0], [1, 1, 0]]) # Create predefined shapes circle = Path.create_circle(center=[0, 0], radius=1.5) square = Path.create_square(center=[0, 0], side_length=2.0)
- waypoints: List[Pose]
- classmethod from_positions(positions, orientation=None)
Create Path from list of positions with optional shared orientation.
- Parameters:
- Return type:
Path- Returns:
Path instance
Example:
path = Path.from_positions([ [0, 0, 0], [1, 0, 0], [1, 1, 0] ])
- classmethod create_rectangle(center, width, height, rpy=[0.0, 0.0, 0.0])
Create a rectangular path in 3D.
- Parameters:
center (
List[float]) – [x, y, z] center position in world coordinateswidth (
float) – Width of rectangle (X direction in local frame)height (
float) – Height of rectangle (Y direction in local frame)rpy (
List[float]) – [roll, pitch, yaw] in radians. Defaults to [0, 0, 0]. When non-zero, the Z+ axis of the robot will be perpendicular to the rectangle plane.
- Return type:
Path- Returns:
Path instance with waypoints at rectangle corners
Example:
rect = Path.create_rectangle(center=[0, 0, 1], width=3.0, height=2.0) tilted_rect = Path.create_rectangle(center=[0, 0, 1], width=3.0, height=2.0, rpy=[0.1, 0.2, 0])
- classmethod create_square(center, side_length, rpy=[0.0, 0.0, 0.0])
Create a square path in 3D.
- Parameters:
- Return type:
Path- Returns:
Path instance with waypoints at square corners
Example:
square = Path.create_square(center=[0, 0, 0], side_length=2.0) tilted_square = Path.create_square(center=[0, 0, 1], side_length=2.0, rpy=[0.1, 0.2, 0])
Note:
This is a wrapper for create_rectangle with width == height.
- classmethod create_ellipse(center, radius_x, radius_y, num_points=16, rpy=[0.0, 0.0, 0.0], start_angle=0.0, end_angle=6.283185307179586)
Create an elliptical path in 3D.
- Parameters:
center (
List[float]) – [x, y, z] center position in world coordinatesradius_x (
float) – Radius along X axis in local frameradius_y (
float) – Radius along Y axis in local framenum_points (
int) – Number of waypoints along the arcrpy (
List[float]) – [roll, pitch, yaw] in radians. Defaults to [0, 0, 0].start_angle (
float) – Start angle [rad] in the local XY-planeend_angle (
float) – End angle [rad] in the local XY-plane
- Return type:
Path- Returns:
Path instance with waypoints along ellipse
Example:
ellipse = Path.create_ellipse(center=[0, 0, 0], radius_x=2.0, radius_y=1.0) tilted_ellipse = Path.create_ellipse(center=[0, 0, 1], radius_x=2.0, radius_y=1.0, rpy=[0, 0.3, 0])
- classmethod create_circle(center, radius, num_points=16, rpy=[0.0, 0.0, 0.0], start_angle=0.0, end_angle=6.283185307179586)
Create a circular path in 3D.
- Parameters:
center (
List[float]) – [x, y, z] center position in world coordinatesradius (
float) – Circle radiusnum_points (
int) – Number of waypoints along the arcrpy (
List[float]) – [roll, pitch, yaw] in radians. Defaults to [0, 0, 0].start_angle (
float) – Start angle [rad] in the local XY-planeend_angle (
float) – End angle [rad] in the local XY-plane
- Return type:
Path- Returns:
Path instance with waypoints along circle
Example:
circle = Path.create_circle(center=[0, 0, 0], radius=1.5) arc = Path.create_circle(center=[0, 0, 0], radius=1.5, start_angle=0, end_angle=np.pi)
Note:
This is a wrapper for create_ellipse with radius_x == radius_y.
- append(other)
Append waypoints from another path in-place.
Note:
This mutates the current Path. For a non-mutating version, use `combined = path1 + path2`.
- Return type:
- classmethod from_paths(paths)
Create a single Path by concatenating multiple paths.
Example:
circle_arc = Path.create_circle(center=[0, 0, 0.5], radius=2.0, num_points=16, start_angle=0.0, end_angle=np.pi/2) line = Path.from_positions([[1, 1, 0.5], [3, 1, 0.5]]) full = Path.from_paths([circle_arc, line])
- Return type:
Path
- get_total_distance()
Calculate total path length.
- Return type:
- Returns:
Total distance in meters
- visualize(show_lines=True, line_color=None, line_width=2.0, show_waypoints=False, show_axes=False, axis_length=0.3, show_points=False, point_size=0.05, lifetime=0.0)
Visualize path with lines and optional waypoint markers.
- Parameters:
show_lines (
bool) – If True, draw lines connecting waypointsline_color (
List[float]) – RGB color for path lines [r, g, b] (0-1 range), None for defaultline_width (
float) – Width of path linesshow_waypoints (
bool) – If True, show waypoint coordinate axes and pointsshow_axes (
bool) – If True, draw X, Y, Z axes at each waypointaxis_length (
float) – Length of coordinate axes in metersshow_points (
bool) – If True, draw spheres at waypoint positionspoint_size (
float) – Radius of waypoint spheres in meterslifetime (
float) – Debug line lifetime (0 = permanent until reset)
- Return type:
- Returns:
List of debug item IDs (for later removal if needed)
Example:
# Just show path lines path = Path.create_circle(center=[0, 0, 0.5], radius=2.0) path.visualize(line_color=[1, 0, 0]) # Show path with waypoint orientations path.visualize(show_waypoints=True, show_axes=True)
- visualize_waypoints(show_axes=True, axis_length=0.3, show_points=True, point_size=0.05, lifetime=0.0)
Visualize waypoints with coordinate axes for debugging.
- Parameters:
show_axes (
bool) – If True, draw X, Y, Z axes at each waypointaxis_length (
float) – Length of coordinate axes in metersshow_points (
bool) – If True, draw spheres at waypoint positionspoint_size (
float) – Radius of waypoint spheres in meterslifetime (
float) – Debug line lifetime (0 = permanent until reset)
- Return type:
- Returns:
List of debug item IDs (for later removal if needed)
Example:
path = Path.create_square(center=[0, 0, 1], side_length=2.0, rpy=[0.1, 0.2, 0]) debug_ids = path.visualize_waypoints(axis_length=0.5)
- pybullet_fleet.geometry.quat_to_rot_matrix(quat)
Convert a quaternion
(x, y, z, w)to a 3×3 rotation matrix.Returns the matrix as a flat 9-element tuple in row-major order:
(r00, r01, r02, r10, r11, r12, r20, r21, r22)
This is allocation-free and suitable for hot-path use.
- Parameters:
quat (
Tuple[float,float,float,float]) – Quaternion(x, y, z, w)— same convention as PyBullet. Automatically normalized if not already unit-length.- Return type:
Tuple[float,float,float,float,float,float,float,float,float]- Returns:
9-element tuple representing the 3×3 rotation matrix (row-major).
- Raises:
ValueError – If the quaternion has zero norm.
- pybullet_fleet.geometry.rotate_vector(vec, quat)
Rotate a 3-D vector by a quaternion.
Equivalent to
scipy.spatial.transform.Rotation.from_quat(quat).apply(vec)but avoids SciPy object allocation, making it suitable for per-tick hot paths.
- pybullet_fleet.geometry.quat_from_rotvec(rotvec)
Convert a rotation vector (axis × angle) to a quaternion
(x, y, z, w).Equivalent to
scipy.spatial.transform.Rotation.from_rotvec(rotvec).as_quat().
- pybullet_fleet.geometry.quat_multiply(q1, q2)
Hamilton product of two quaternions in
(x, y, z, w)convention.Equivalent to
(Rotation.from_quat(q1) * Rotation.from_quat(q2)).as_quat(). For active rotations this corresponds to applying q2 first, then q1.
- pybullet_fleet.geometry.quat_angle_between(q0, q1)
Compute the rotation angle (radians) between two quaternions.
Equivalent to
(Rotation.from_quat(q1) * Rotation.from_quat(q0).inv()).magnitude(). Handles the double-cover:qand-qrepresent the same rotation. Inputs are normalized internally, so non-unit quaternions are accepted.
- class pybullet_fleet.geometry.SlerpPrecomp(dot: float, theta_0: float, sin_theta_0: float, needs_flip: bool)
Bases:
NamedTuplePrecomputed constants for fast quaternion slerp between two keyframes.
- dot: float
Alias for field number 0
- theta_0: float
Alias for field number 1
- sin_theta_0: float
Alias for field number 2
- needs_flip: bool
Alias for field number 3
- pybullet_fleet.geometry.quat_slerp_precompute(q0, q1)
Precompute constants for fast scalar quaternion slerp.
- pybullet_fleet.geometry.quat_slerp(q0, q1, t, precomp)
Fast scalar quaternion slerp for normalized quaternions.
Specialized for the two-keyframe case where q0/q1 are fixed and only t varies per call. Uses math.sin/cos for scalar operations instead of numpy.
Note:
q0 and q1 must be unit quaternions. The normal slerp path preserves unit length by construction; the near-identical fallback normalizes explicitly.
- Parameters:
- Return type:
- Returns:
Interpolated quaternion as numpy array [x, y, z, w].
pybullet_fleet.logging_utils module
Logging utilities with lazy evaluation for expensive operations.
Design intent & usage policy: - The standard Python logger always evaluates f-strings and expensive computations, even if the log level is disabled. - LazyLogger/NamedLazyLogger allow you to pass a lambda, so the message is only evaluated if the log level is enabled. - This prevents unnecessary computation and side effects, improving performance.
Usage policy: - For lightweight constants or simple strings, pass them directly (no lambda needed). - For expensive computations or side effects, always wrap in a lambda. - Both are supported for flexibility and easier migration from standard logging.
See test_logging_utils.py for more usage and test examples.
- class pybullet_fleet.logging_utils.LazyLogger(name=None, logger=None)
Bases:
objectLogger wrapper with lazy evaluation for expensive formatting operations.
This prevents expensive string formatting (e.g., NumPy array conversion) when the log level is not enabled.
Usage:
logger = LazyLogger(__name__) logger.debug(lambda: f"Expensive array: {numpy_array}")
Or with the decorator:
@lazy_log def my_logger(): return logging.getLogger(__name__) logger = my_logger() logger.debug(lambda: f"Array: {arr}")
- __init__(name=None, logger=None)
Initialize LazyLogger.
- debug(msg_func, *args, **kwargs)
Log debug message with lazy evaluation.
- info(msg_func, *args, **kwargs)
Log info message with lazy evaluation.
- warning(msg_func, *args, **kwargs)
Log warning message with lazy evaluation.
- error(msg_func, *args, **kwargs)
Log error message with lazy evaluation.
- critical(msg_func, *args, **kwargs)
Log critical message with lazy evaluation.
- property logger: Logger
Get underlying logger instance.
- isEnabledFor(level)
Check if logging is enabled for the given level.
- Return type:
- class pybullet_fleet.logging_utils.NamedLazyLogger(name=None, logger=None, prefix='')
Bases:
LazyLoggerLazyLogger with a dynamic prefix for instance-level identification.
Designed for SimObject/Agent/Action instances where each log line should include identifiers like object_id and name, without manually embedding them in every log call.
The prefix can be updated at any time via
set_prefix()(e.g. afterobject_idis assigned by sim_core).Usage:
# In SimObject.__init__ self._log = NamedLazyLogger(__name__) self._log.set_prefix(f"[obj:{self.object_id}] ") # In Agent.__init__ (after super().__init__) self._log.set_prefix(f"[Agent:{self.object_id}:{self.name}] ") # Log calls — prefix is prepended automatically self._log.info("Path complete") # => [Agent:3:robot_A] Path complete self._log.debug(lambda: f"pos={self.get_pose().position[:2]}") # => [Agent:3:robot_A] pos=[1.0, 2.0] (only evaluated if DEBUG enabled)
Note
The prefix is stored as a plain string and prepended inside
_log_lazyonly when the level is enabled, so the cost is negligible when the message is filtered out.- __init__(name=None, logger=None, prefix='')
Initialize NamedLazyLogger.
- set_prefix(prefix)
Update the prefix prepended to every log message.
Call this whenever the identifying information changes (e.g. after object_id is assigned).
- Parameters:
prefix (
str) – New prefix string (include trailing space if desired)
- property prefix: str
Get current prefix string.
- pybullet_fleet.logging_utils.get_named_lazy_logger(name, prefix='')
Get a NamedLazyLogger instance.
This is the recommended way to create instance-level loggers for SimObject / Agent / Action classes.
- Parameters:
- Return type:
NamedLazyLogger- Returns:
NamedLazyLogger instance
Example:
# In __init__ self._log = get_named_lazy_logger(__name__, f"[Agent:{self.object_id}] ") self._log.debug(lambda: f"pos={pos}")
- pybullet_fleet.logging_utils.get_lazy_logger(name)
Get a LazyLogger instance for the given name.
This is the recommended way to create lazy loggers.
- Parameters:
name (
str) – Logger name (typically __name__)- Return type:
LazyLogger- Returns:
LazyLogger instance
Example:
logger = get_lazy_logger(__name__) logger.debug(lambda: f"Expensive: {numpy_array}")
- pybullet_fleet.logging_utils.wrap_existing_logger(logger)
Wrap an existing logger with lazy evaluation.
- Parameters:
logger (
Logger) – Existing logging.Logger instance- Return type:
LazyLogger- Returns:
LazyLogger wrapping the existing logger
Example:
standard_logger = logging.getLogger(__name__) lazy_logger = wrap_existing_logger(standard_logger) lazy_logger.debug(lambda: f"Array: {arr}")
pybullet_fleet.sim_object module
sim_object.py Base class for simulation objects with attachment support.
- class pybullet_fleet.sim_object.ShapeParams(shape_type=None, mesh_path=None, mesh_scale=<factory>, half_extents=<factory>, radius=0.5, height=1.0, rgba_color=<factory>, frame_pose=None)
Bases:
objectParameters for visual or collision shape.
Similar to URDF’s <visual> and <collision> tags, this allows full control over shape type, geometry, appearance, and frame offset.
- shape_type
Shape type - “box”, “sphere”, “cylinder”, “mesh”, or None (no shape)
- mesh_path
Path to mesh file (required if shape_type=”mesh”)
- mesh_scale
Mesh scaling [sx, sy, sz] (for mesh shapes)
- half_extents
Box half extents [hx, hy, hz] (for box shapes)
- radius
Sphere/cylinder radius (for sphere/cylinder shapes)
- height
Cylinder height (for cylinder shapes)
- rgba_color
RGBA color [r, g, b, a] (for visual shapes, ignored for collision)
- frame_pose
Shape offset as Pose object (relative to body frame)
Example:
# Box visual visual = ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2], rgba_color=[0.8, 0.2, 0.2, 1.0] ) # Mesh collision collision = ShapeParams( shape_type="mesh", mesh_path="collision.obj", mesh_scale=[1.0, 1.0, 1.0] ) # Sphere with offset sphere = ShapeParams( shape_type="sphere", radius=0.5, rgba_color=[0.0, 1.0, 0.0, 1.0], frame_pose=Pose.from_xyz(0, 0, 0.5) )
- radius: float = 0.5
- height: float = 1.0
- frame_pose: Pose | None = None
- classmethod from_dict(d)
Create ShapeParams from a configuration dictionary.
Recognised keys match the dataclass fields:
shape_type,mesh_path,mesh_scale,half_extents,radius,height,rgba_color. Missing keys use dataclass defaults.
- class pybullet_fleet.sim_object.SimObjectSpawnParams(visual_shape=None, collision_shape=None, initial_pose=<factory>, mass=0.0, pickable=True, name=None, visual_frame_pose=None, collision_frame_pose=None, collision_mode=CollisionMode.NORMAL_3D, user_data=<factory>)
Bases:
objectParameters for spawning a SimObject.
- visual_shape
Visual shape parameters (ShapeParams or None)
- collision_shape
Collision shape parameters (ShapeParams or None)
- initial_pose
Initial Pose (position and orientation) in world coordinates (default: origin)
- mass
Object mass in kg (0.0 for static objects, >0 for dynamic)
- pickable
Whether this object can be picked by robots (default: True)
- name
Optional string name for human-readable identification (default: None). Duplicates allowed - multiple objects can share the same name. Use for debugging, logging, filtering (e.g., “LeftRobot”, “Pallet_A”). For unique identification, use object_id instead.
- user_data
Optional dictionary for custom metadata (default: empty dict)
Note
Managers calculate positions automatically and may override initial_pose.
Example:
# Box visual + sphere collision with name params = SimObjectSpawnParams( visual_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2], rgba_color=[1.0, 0.0, 0.0, 1.0] ), collision_shape=ShapeParams( shape_type="sphere", radius=0.3 ), initial_pose=Pose.from_xyz(0, 0, 1), mass=1.0, name="Pallet_A" # Human-readable name (duplicates allowed) ) obj = SimObject.from_params(params, sim_core) # Later: Use object_id for unique lookup retrieved_obj = sim_core.get_object_by_id(obj.object_id) # Or filter by name (may return multiple objects) pallets = [o for o in sim_core.objects if o.name == "Pallet_A"]
- visual_shape: ShapeParams | None = None
- collision_shape: ShapeParams | None = None
- initial_pose: Pose
- mass: float = 0.0
- pickable: bool = True
- visual_frame_pose: Pose | None = None
- collision_frame_pose: Pose | None = None
- collision_mode: CollisionMode = 'normal_3d'
- classmethod from_dict(config)
Create SimObjectSpawnParams from a configuration dictionary.
Accepts both programmatic dicts (with
ShapeParams/Poseobjects) and raw YAML dicts (withposelist,yaw, and shape sub-dicts).- YAML-style keys:
name (str, required): Object name. visual_shape (dict | ShapeParams, optional): Visual shape definition. collision_shape (dict | ShapeParams, optional): Collision shape. pose (list):
[x, y, z]position. Alternative toinitial_pose. yaw (float): Yaw angle in radians (used withpose). initial_pose (Pose): Pose object (takes precedence overpose). mass (float): Default0.0(static). pickable (bool): DefaultTrue. collision_mode (str | CollisionMode): Default"normal_3d". user_data (dict): Default{}.
- Parameters:
- Return type:
SimObjectSpawnParams- Returns:
SimObjectSpawnParams instance.
- Raises:
ValueError – If
nameis missing.
- class pybullet_fleet.sim_object.SimObject(body_id, sim_core=None, pickable=True, mass=None, collision_mode=CollisionMode.NORMAL_3D, name=None, user_data=None)
Bases:
objectBase class for simulation objects (robots, pallets, obstacles). Supports attachment/detachment and kinematic teleportation.
- Object Identification:
- object_id: Unique integer ID assigned by sim_core (used for lookups and internal tracking).
This is the primary identifier for programmatic access.
- name: Optional string name for human-readable identification (used for debugging, logging, filtering).
Multiple objects can share the same name (duplicates allowed). Not guaranteed to be unique - use object_id for unique identification.
- Parameters:
- object_id
Unique integer ID (assigned by sim_core, use for lookups)
- name
Optional string name (duplicates allowed, use for debugging/filtering)
- body_id
PyBullet body ID
- mass
Object mass in kg
- pickable
Whether this object can be picked by robots
- collision_mode
Collision detection mode (NORMAL_3D, NORMAL_2D, STATIC, DISABLED)
- user_data
Dictionary for custom metadata (default: empty dict)
- attached_objects: List[SimObject]
- set_name(name)
Set the object name and update the log prefix accordingly.
Use this instead of directly setting
self.nameto ensure the log prefix stays in sync.
- classmethod create_shared_shapes(visual_shape=None, collision_shape=None, physics_client_id=0)
Create shared visual and collision shapes for fast object spawning.
This method caches mesh shapes by their parameters, so multiple objects with identical mesh parameters will reuse the same PyBullet shape IDs.
Note: Primitive shapes (box, sphere, cylinder) are NOT cached, as PyBullet handles them very efficiently and caching adds unnecessary overhead.
- Parameters:
- Return type:
- Returns:
(visual_id, collision_id) tuple
Example:
visual_id, collision_id = create_shared_shapes( visual_shape=ShapeParams( shape_type="mesh", mesh_path="visual.obj", mesh_scale=[1.0, 1.0, 1.0], rgba_color=[0.8, 0.2, 0.2, 1.0] ), collision_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2] ), physics_client_id=sim_core.client )
- classmethod from_mesh(visual_shape=None, collision_shape=None, pose=None, mass=0.0, pickable=True, sim_core=None, collision_mode=CollisionMode.NORMAL_3D, name=None, user_data=None)
Create a SimObject with optional visual and collision shapes.
- Parameters:
visual_shape (
Optional[ShapeParams]) – ShapeParams for visual geometrycollision_shape (
Optional[ShapeParams]) – ShapeParams for collision geometrypose (
Pose) – Initial Pose (position and orientation, default: origin)mass (
float) – Object mass (0.0 for static)pickable (
bool) – Whether object can be picked by robotssim_core – Reference to simulation core
name (
Optional[str]) – Optional human-readable name (duplicates allowed)
- Return type:
SimObject- Returns:
SimObject instance
Example:
# Box visual + sphere collision obj = SimObject.from_mesh( visual_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.3, 0.2], rgba_color=[0.8, 0.2, 0.2, 1.0] ), collision_shape=ShapeParams( shape_type="sphere", radius=0.4 ), pose=Pose.from_xyz(0, 0, 1), mass=1.0, sim_core=sim_core )
- classmethod from_params(spawn_params, sim_core=None)
Create a SimObject from SimObjectSpawnParams.
- Parameters:
spawn_params (
SimObjectSpawnParams) – SimObjectSpawnParams instancesim_core – Reference to simulation core
- Return type:
SimObject- Returns:
SimObject instance
Example:
params = SimObjectSpawnParams( visual_shape=ShapeParams( shape_type="mesh", mesh_path="pallet.obj", mesh_scale=[0.5, 0.5, 0.5], rgba_color=[0.8, 0.6, 0.4, 1.0] ), collision_shape=ShapeParams( shape_type="box", half_extents=[0.5, 0.4, 0.1] ), initial_pose=Pose.from_xyz(0, 0, 0.1), mass=1.0 ) obj = SimObject.from_params(params, sim_core)
- classmethod from_dict(config, sim_core=None)
Create a SimObject from a raw config dict.
Combines
SimObjectSpawnParams.from_dict()andfrom_params()in a single call. Subclasses can override to use a customSpawnParamsclass.
- property is_static: bool
Whether this object is static (never moves).
Returns True if collision_mode is STATIC. This property is read-only. To change collision mode, use set_collision_mode() method.
- set_collision_mode(mode)
Change collision detection mode for this object.
This method updates the collision mode and notifies sim_core to update all collision-related caches (AABB, spatial grid, etc.).
- Parameters:
mode (
CollisionMode) – New CollisionMode- Return type:
Example:
# Change to static obj.set_collision_mode(CollisionMode.STATIC) # Disable collision obj.set_collision_mode(CollisionMode.DISABLED) # Enable 2D collision obj.set_collision_mode(CollisionMode.NORMAL_2D)
- get_pose()
Return current position and orientation as Pose object.
Performance optimization:
Kinematic objects (mass=0): Always returns cached pose since position only changes via set_pose(), which updates the cache.
Dynamic objects: Caches within the same simulation timestep to avoid redundant PyBullet API calls. Cache is invalidated when sim_time advances.
- Return type:
Pose
- set_pose(pose, preserve_velocity=True)
Set position and orientation from a Pose object.
- Parameters:
pose (
Pose) – Pose object containing position and orientationpreserve_velocity (
bool) – If True, preserve current velocity after setting pose. If False, velocity is reset to zero (default behavior of PyBullet). For kinematic objects (mass=0), velocity preservation has no effect, so this is automatically set to False for performance.
- Return type:
- Returns:
True if the object moved (position or orientation changed), False otherwise
- set_pose_raw(position, orientation, preserve_velocity=True)
Set position and orientation from raw lists/tuples (no Pose allocation).
This is the fast path for internal use where position and orientation are already available as lists/tuples, avoiding Pose object creation overhead.
- attach_object(obj, parent_link_index=-1, relative_pose=None, joint_type=4)
Attach another SimObject to this object.
- Parameters:
obj (
SimObject) – Object to attachparent_link_index (
Union[int,str]) – Link index (int) or link name (str) to attach to. -1 or"base_link"for the base link.relative_pose (
Optional[Pose]) – Position and orientation offset in parent link’s frame as Pose object (default: None, which uses Pose(position=[0, 0, 0], orientation=[0, 0, 0, 1]))joint_type (
int) – PyBullet joint type (default: JOINT_FIXED)
- Return type:
- Returns:
True if attachment successful, False otherwise
Example:
# Attach to base link at 0.5m in front agent.attach_object(pallet, relative_pose=Pose.from_xyz(0.5, 0, 0)) # Attach with position and orientation agent.attach_object(pallet, relative_pose=Pose.from_euler(0.6, 0, -0.2, roll=1.5708, pitch=0, yaw=0)) # Attach to specific link by index agent.attach_object(box, parent_link_index=2) # Attach to specific link by name agent.attach_object(box, parent_link_index="end_effector")
- detach_object(obj)
Detach an object from this object.
- Parameters:
obj (
SimObject) – Object to detach- Return type:
- Returns:
True if detachment successful, False otherwise
Example:
agent.detach_object(pallet)
- get_attached_objects()
Return list of objects attached to this object.
- Return type:
List[SimObject]- Returns:
List of attached SimObject instances
- is_attached()
Check if this object is attached to another object.
- Return type:
- Returns:
True if attached to another object, False otherwise
- register_callback(callback, frequency=0.25)
Register a callback function to be executed periodically.
- execute_callbacks(current_time)
Execute registered callbacks based on their frequency.
pybullet_fleet.tools module
- pybullet_fleet.tools.body_to_world_velocity_2d(vx_body, vy_body, yaw)
Rotate 2-D body-frame velocity to world frame using yaw only.
- Parameters:
- Return type:
- Returns:
(vx, vy)in world frame.
Example:
>>> body_to_world_velocity_2d(1.0, 0.0, yaw=math.pi / 2) (0.0, 1.0) # facing +Y, forward maps to +Y
- pybullet_fleet.tools.body_to_world_velocity_3d(vx_body, vy_body, vz_body, orientation)
Rotate 3-D body-frame velocity to world frame using full quaternion.
Uses the same quaternion convention as PyBullet:
(x, y, z, w).- Parameters:
- Return type:
- Returns:
(vx, vy, vz)in world frame.
Example:
>>> from scipy.spatial.transform import Rotation >>> q = Rotation.from_euler("z", 90, degrees=True).as_quat() # xyzw >>> body_to_world_velocity_3d(1.0, 0.0, 0.0, tuple(q)) (0.0, 1.0, 0.0)
- pybullet_fleet.tools.normalize_vector_param(value, param_name, dim=3)
Normalize a parameter to a numpy array of specified dimension.
This is a utility function for handling parameters that can be either: - A single float (applied to all dimensions) - A list of floats (one per dimension)
Commonly used for velocity/acceleration limits that can be uniform or per-axis.
- Parameters:
- Return type:
- Returns:
Numpy array of shape (dim,)
- Raises:
ValueError – If list length doesn’t match dimension
Example:
>>> normalize_vector_param(2.0, "max_vel", 3) array([2., 2., 2.]) >>> normalize_vector_param([1.0, 2.0, 3.0], "max_vel", 3) array([1., 2., 3.])
- pybullet_fleet.tools.resolve_link_index(body_id, link)
Resolve link name to index.
- Parameters:
- Return type:
- Returns:
Link index (int). Returns -1 if link is “base_link” or if link name not found.
Example:
>>> index = resolve_link_index(robot.body_id, "sensor_mast") >>> index = resolve_link_index(robot.body_id, 2) # Also accepts int >>> index = resolve_link_index(robot.body_id, "base_link") # Returns -1
- pybullet_fleet.tools.resolve_joint_index(body_id, joint)
Resolve joint name to index.
- Parameters:
- Return type:
- Returns:
Joint index (int). Returns -1 if joint name not found.
Example:
>>> idx = resolve_joint_index(robot.body_id, "elbow_joint") >>> idx = resolve_joint_index(robot.body_id, 2)
- pybullet_fleet.tools.calculate_offset_pose(target_position, current_position, offset, keep_height=True)
Calculate pose at specified offset distance from target.
This unified function can be used for various offset calculations: - Approach poses (approach_offset): Position before reaching target - Pick poses (pick_offset): Position where pick operation occurs - Drop poses (drop_offset): Position where drop operation occurs
- Parameters:
- Return type:
Pose- Returns:
Pose object at the offset position, oriented to face the target
Example:
>>> # Approach pose (1.5m away from target) >>> approach = calculate_offset_pose([5, 0, 0.1], [0, 0, 0.3], offset=1.5) >>> >>> # Pick pose (0.3m away from target) >>> pick = calculate_offset_pose([5, 0, 0.1], [0, 0, 0.3], offset=0.3) >>> >>> # At target position (0m offset) >>> drop = calculate_offset_pose([5, 0, 0.1], [0, 0, 0.3], offset=0.0)
- pybullet_fleet.tools.world_to_grid(pos, spacing, offset=None)
Convert world coordinates to grid indices (always 3D), with offset.
- Parameters:
- Return type:
- Returns:
[ix, iy, iz] grid indices
- pybullet_fleet.tools.grid_to_world(grid, spacing, offset=None)
Convert grid indices to world coordinates (always 3D), with offset.
- Parameters:
- Return type:
- Returns:
[x, y, z] world coordinates
- pybullet_fleet.tools.world_to_grid_2d(pos, spacing)
2D wrapper for world_to_grid.
- pybullet_fleet.tools.grid_to_world_2d(grid, spacing)
2D wrapper for grid_to_world.
- pybullet_fleet.tools.grid_execution(grid_num=None, spacing=None, offset=None, func=None, args=None)
Calls the callback function func(grid_index, world_pos, **args) at each grid point.
- Parameters:
grid_num (
List[int]) – [x_num, y_num, z_num] number of grids in each axis (default: [1, 1, 1])spacing (
List[float]) – [spacing_x, spacing_y, spacing_z] grid spacing (default: [1.0, 1.0, 1.0])offset (
Optional[List[float]]) – [offset_x, offset_y, offset_z] offset (default: [0.0, 0.0, 0.0])func (
Optional[Callable]) – Callback function(grid_index: List[int], world_pos: List[float], **args) - grid_index: [ix, iy, iz] grid indices - world_pos: [x, y, z] world coordinatesargs (
Optional[Dict]) – Additional arguments to pass to the callback function
- Return type:
pybullet_fleet.types module
types.py Common types and enumerations used across the package.
This module contains shared type definitions to avoid circular imports and provide a single source of truth for common types.
- class pybullet_fleet.types.MotionMode(value)
-
Motion mode for robot movement control.
- OMNIDIRECTIONAL
Robot can move in any direction without rotating first (holonomic)
- DIFFERENTIAL
Robot must rotate to face target, then move forward (non-holonomic)
- OMNIDIRECTIONAL = 'omnidirectional'
- DIFFERENTIAL = 'differential'
- class pybullet_fleet.types.PosePhase(value)
-
Phase for TPI pose control (shared by all kinematic controllers).
- ROTATE
Robot is rotating in place (slerp TPI)
- FORWARD
Robot is moving forward in a straight line (distance TPI)
- ROTATE = 'rotate'
- FORWARD = 'forward'
- class pybullet_fleet.types.MovementDirection(value)
-
Movement direction for robot motion.
- Note: This is only used in differential drive mode (MotionMode.DIFFERENTIAL).
In omnidirectional mode, the robot always faces the movement direction.
- FORWARD
Robot X+ axis points towards target, moves forward along path
- BACKWARD
Robot X- axis points towards target (X+ points away), moves along path (robot appears to move “backward” relative to its X+ orientation)
- FORWARD = 'forward'
- BACKWARD = 'backward'
- class pybullet_fleet.types.ActionStatus(value)
Bases:
EnumAction execution status.
- NOT_STARTED
Action has not begun execution
- IN_PROGRESS
Action is currently executing
- COMPLETED
Action completed successfully
- FAILED
Action failed to complete
- CANCELLED
Action was cancelled before completion
- NOT_STARTED = 'not_started'
- IN_PROGRESS = 'in_progress'
- COMPLETED = 'completed'
- FAILED = 'failed'
- CANCELLED = 'cancelled'
- class pybullet_fleet.types.SpatialHashCellSizeMode(value)
Bases:
EnumCollision detection spatial hash cell size calculation mode.
Determines how the spatial hash grid cell size is calculated for efficient broad-phase collision detection.
- CONSTANT
Fixed cell size provided by user (fastest, requires manual tuning)
- AUTO_ADAPTIVE
Recalculate on object add/remove (adaptive, some overhead)
- AUTO_INITIAL
Calculate once at simulation start (default, balanced performance)
- CONSTANT = 'constant'
- AUTO_ADAPTIVE = 'auto_adaptive'
- AUTO_INITIAL = 'auto_initial'
- class pybullet_fleet.types.ControllerMode(value)
-
Internal mode for KinematicController state machine.
- IDLE
No active command — zero velocity, no pose target.
- VELOCITY
Processing cmd_vel input (body-frame velocity).
- POSE
Processing goal_pose / path via TPI trajectory.
- IDLE = 'idle'
- VELOCITY = 'velocity'
- POSE = 'pose'
- class pybullet_fleet.types.CollisionMode(value)
Bases:
EnumCollision detection mode for individual objects.
Determines how an object participates in collision detection: - Spatial dimension (2D vs 3D) - Movement type (static vs dynamic) - Collision detection enablement
- NORMAL_3D
Normal 3D collision detection (default) - Full 3D AABB overlap check - Checks all 27 spatial hash neighbors (3x3x3) - For dynamic/kinematic objects that move in 3D space
- NORMAL_2D
2D collision detection (Z-axis ignored) - 2D AABB overlap check (X, Y only) - Checks only 9 spatial hash neighbors (3x3x1, Z=0) - For ground vehicles, 2D robots
- STATIC
Static object mode (never moves, but has collision) - Registered in collision system during spawn - Never updates AABB/spatial grid (performance optimization) - Examples: walls, shelves, structures
- DISABLED
Collision detection completely disabled (ghost object) - No AABB calculation - No spatial grid registration - No collision checking - PyBullet physics collision also disabled - Examples: decorative objects, visual markers, UI elements
- Note: The actual collision filtering (e.g., ignore_static) is done at
core_simulation level, not in this mode definition.
Example
# Normal 3D robot robot = SimObject(…, collision_mode=CollisionMode.NORMAL_3D)
# 2D ground vehicle vehicle = SimObject(…, collision_mode=CollisionMode.NORMAL_2D)
# Static structure wall = SimObject(…, collision_mode=CollisionMode.STATIC)
# Ghost object (no collision) decoration = SimObject(…, collision_mode=CollisionMode.DISABLED)
- NORMAL_3D = 'normal_3d'
- NORMAL_2D = 'normal_2d'
- STATIC = 'static'
- DISABLED = 'disabled'
- class pybullet_fleet.types.CollisionDetectionMethod(value)
Bases:
EnumCollision detection method for narrow-phase collision checking.
Determines which PyBullet API to use for detecting actual contacts between objects after broad-phase filtering (AABB + spatial hashing).
Design Philosophy (from design document): - Physics OFF (kinematics): Use CLOSEST_POINTS for safety margin detection - Physics ON (physics): Use CONTACT_POINTS for actual contact logging
- CLOSEST_POINTS
Use getClosestPoints() - distance-based, kinematics-safe (RECOMMENDED DEFAULT) Best for: kinematics motion, safety clearance, collision avoidance Works with: Physics ON/OFF, stable with resetBasePositionAndOrientation
- CONTACT_POINTS
Use getContactPoints() - physics contact manifold (PHYSICS MODE) Best for: physics simulation, actual contact logging, debug/reproduction Requires: stepSimulation() to be called regularly Note: Unstable for kinematic-kinematic pairs
- HYBRID
Use getContactPoints for physics, getClosestPoints for kinematic (ADVANCED) Best for: Mixed physics/kinematics with different detection needs Slower due to branching overhead
- Recommended defaults:
physics_enabled=False → CLOSEST_POINTS (default)
physics_enabled=True → CONTACT_POINTS or HYBRID
- CLOSEST_POINTS = 'closest_points'
- CONTACT_POINTS = 'contact_points'
- HYBRID = 'hybrid'
Module contents
pybullet_fleet package General-purpose PyBullet simulation library for multi-robot fleets