############################################################################
# Copyright (c) 2024 Cedric Grothues & Lara Bergmann, Bielefeld University #
############################################################################
from collections import OrderedDict
from collections.abc import MutableMapping
from typing import Any
import gymnasium as gym
import mujoco
import numpy as np
import shapely
import shapely.geometry as sg
from gymnasium import logger
from magbotsim import BasicMagBotSingleAgentEnv, MoverImpedanceController
from magbotsim.utils import benchmark_utils, mujoco_utils
DEFAULT_OBJECT_TYPES = [
'square_box',
'box',
'cylinder',
't_shape',
'l_shape',
'plus_shape',
]
DEFAULT_OBJECT_RANGES = {
'width': (0.06, 0.06), # [m]
'height': (0.06, 0.06), # [m]
'depth': (0.02, 0.02), # [m]
'segment_width': (0.02, 0.02), # [m]
'radius': (0.05, 0.05), # [m]
'mass': (0.3, 0.3), # [kg]
}
[docs]
class StateBasedGlobalPushingEnv(BasicMagBotSingleAgentEnv):
"""An object pushing environment.
:param num_movers: the number of movers in the environment, defaults to 1
:param mover_params: a dictionary that can be used to specify the mass and size of each mover using the keys 'mass' or 'size',
defaults to None. To use the same mass and size for each mover, the mass can be specified as a single float value and the
size as a numpy array of shape (3,). However, the movers can also be of different types, i.e. different masses and sizes.
In this case, the mass and size should be specified as numpy arrays of shapes (num_movers,) and (num_movers,3),
respectively. If set to None or only one key is specified, both mass and size or the missing value are set to the following
default values:
- mass: 1.24 [kg]
- size: [0.155/2, 0.155/2, 0.012/2] (x,y,z) [m] (note: half-size)
:param layout_tiles: a numpy array of shape (height, width) that specifies the layout of the tiles, defaults to None. If None, a
4x3 grid of tiles is used.
:param initial_mover_zpos: the initial distance between the bottom of the mover and the top of a tile, defaults to 0.003
:param std_noise: the standard deviation of a Gaussian with zero mean used to add noise, defaults to 1e-5. The standard
deviation can be used to add noise to the mover's position, velocity and acceleration. If you want to use different
standard deviations for position, velocity and acceleration use a numpy array of shape (3,); otherwise use a single float
value, meaning the same standard deviation is used for all three values.
:param render_mode: the mode that is used to render the frames ('human', 'rgb_array' or None), defaults to 'human'. If set to
None, no viewer is initialized and used, i.e. no rendering. This can be useful to speed up training.
:param render_every_cycle: whether to call 'render' after each integrator step in the ``step()`` method, defaults to False.
Rendering every cycle leads to a smoother visualization of the scene, but can also be computationally expensive. Thus, this
parameter provides the possibility to speed up training and evaluation. Regardless of this parameter, the scene is always
rendered after 'num_cycles' have been executed if ``render_mode != None``.
:param num_cycles: the number of control cycles for which to apply the same action, defaults to 40
:param collision_params: a dictionary that can be used to specify the following collision parameters, defaults to None:
- collision shape (key: 'shape'): can be 'box' or 'circle', defaults to 'circle'
- size of the collision shape (key: 'size'), defaults to 0.11 [m]:
- collision shape 'circle':
a single float value which corresponds to the radius of the circle, or a numpy array of shape (num_movers,) to specify
individual values for each mover
- collision shape 'box':
a numpy array of shape (2,) to specify x and y half-size of the box, or a numpy array of shape (num_movers, 2) to
specify individual sizes for each mover
- additional size offset (key: 'offset'), defaults to 0.0 [m]: an additional safety offset that is added to the size of the
collision shape. Think of this offset as increasing the size of a mover by a safety margin.
- additional wall offset (key: 'offset_wall'), defaults to 0.0 [m]: an additional safety offset that is added to the size
of the collision shape to detect wall collisions. Think of this offset as moving the wall, i.e. the edge of a tile
without an adjacent tile, closer to the center of the tile.
:param object_types: a sequence of strings that specifies which object types can be used, defaults to DEFAULT_OBJECT_TYPES.
Valid object types are: 'square_box', 'box', 'cylinder', 't_shape', 'l_shape', 'plus_shape'
:param object_ranges: a dictionary that specifies the range of object parameters for random sampling, defaults to
DEFAULT_OBJECT_RANGES.
Keys can include 'width', 'height', 'depth', 'segment_width', 'radius', 'mass'. Each value is a tuple (min_value, max_value)
:param object_sliding_friction: the sliding friction coefficient of the object, defaults to 1.0
:param object_torsional_friction: the torsional friction coefficient of the object, defaults to 0.005
:param v_max: the maximum velocity, defaults to 2.0 [m/s]
:param a_max: the maximum acceleration, defaults to 10.0 [m/s²]
:param j_max: the maximum jerk (only used if 'learn_jerk=True'), defaults to 100.0 [m/s³]
:param learn_jerk: whether to learn the jerk, defaults to False. If set to False, the acceleration is learned, i.e. the policy
output.
:param learn_pose: whether to learn both position and orientation of the object, defaults to False. If set to False, only
position is learned.
:param early_termination_steps: the number of consecutive steps at goal after which the episode terminates early, defaults to None
(no early termination)
:param max_position_err: the position threshold used to determine whether the object has reached its goal position, defaults
to 0.05 [m]
:param min_coverage: the minimum coverage ratio for goal achievement when learn_pose=True, defaults to 0.9
:param collision_penalty: the reward penalty applied when a collision occurs, defaults to -10.0
:param per_step_penalty: the small negative reward applied at each time step to encourage efficiency, defaults to -0.01
:param object_at_goal_reward: the positive reward given when the object reaches the goal without collisions, defaults to 1.0
:param use_mj_passive_viewer: whether the MuJoCo passive_viewer should be used, defaults to False. If set to False, the Gymnasium
MuJoCo WindowViewer with custom overlays is used.
"""
def __init__(
self,
num_movers: int = 1,
mover_params: dict[str, Any] | None = None,
layout_tiles: np.ndarray | None = None,
initial_mover_zpos: float = 0.003,
std_noise: np.ndarray | float = 1e-5,
render_mode: str | None = 'human',
render_every_cycle: bool = False,
num_cycles: int = 40,
collision_params: dict[str, Any] | None = None,
object_type: str = 'square_box',
object_ranges: MutableMapping[str, tuple[float, float]] = DEFAULT_OBJECT_RANGES,
v_max: float = 2.0,
a_max: float = 10.0,
j_max: float = 100.0,
object_sliding_friction: float = 1.0,
object_torsional_friction: float = 0.005,
learn_jerk: bool = False,
learn_pose: bool = False,
early_termination_steps: int | None = None,
max_position_err: float = 0.05,
min_coverage: float = 0.9,
collision_penalty: float = -10.0,
per_step_penalty: float = -0.01,
object_at_goal_reward: float = 1.0,
use_mj_passive_viewer: bool = False,
) -> None:
self.num_movers = num_movers
self.learn_jerk = learn_jerk
self.learn_pose = learn_pose
self.early_termination_steps = early_termination_steps
# tile configuration
if layout_tiles is None:
layout_tiles = np.ones((4, 3))
# position threshold in m
self.max_position_err = max_position_err
self.min_coverage = min_coverage
# rewards
self.collision_penalty = collision_penalty
self.per_step_penalty = per_step_penalty
self.object_at_goal_reward = object_at_goal_reward
self.steps_at_goal = 0
# object parameters
self.object_xy_start_pos = np.array([0.12, 0.36])
self.object_xy_goal_pos = np.array([0.36, 0.36])
self.object_noise_xy_pos = 1e-5
self.object_noise_yaw = 1e-5
self.object_start_yaw = 0.0
self.object_goal_yaw = 0.0
self.object_sliding_friction = object_sliding_friction
self.object_torsional_friction = object_torsional_friction
self.impedance_controllers = None
assert isinstance(object_type, str) and object_type in DEFAULT_OBJECT_TYPES
assert isinstance(object_ranges, MutableMapping) and all(len(value) == 2 for value in object_ranges.values())
self.object_type = object_type
self.object_ranges = object_ranges
self.object_width = 0
self.object_height = 0
self.object_depth = 0
self.object_radius = 0
self.object_segment_width = 0
self.object_mass = 0
self._sample_object_params(options={})
self.continuous_layout = np.all(layout_tiles == 1)
super().__init__(
layout_tiles=layout_tiles,
num_movers=num_movers,
tile_params=None,
mover_params=mover_params,
initial_mover_zpos=initial_mover_zpos,
std_noise=std_noise,
render_mode=render_mode,
default_cam_config=self._compute_default_cam_config(layout_tiles),
render_every_cycle=render_every_cycle,
num_cycles=num_cycles,
collision_params=collision_params,
custom_model_xml_strings=None,
use_mj_passive_viewer=use_mj_passive_viewer,
)
# maximum velocity, acceleration and jerk
self.v_max = v_max
self.a_max = a_max
self.j_max = j_max
# observation space
max_x = np.max(self.x_pos_tiles) + (self.tile_size[0] / 2)
max_y = np.max(self.y_pos_tiles) + (self.tile_size[1] / 2)
if not self.learn_pose:
low_goals = np.zeros(2)
high_goals = np.array([max_x, max_y])
else:
low_goals = np.array([0, 0, -1, -1])
high_goals = np.array([max_x, max_y, 1, 1])
self.observation_space = gym.spaces.Dict(
{
'observation': gym.spaces.Box(
low=-np.inf,
high=np.inf,
shape=(self.num_movers * (2 + int(self.learn_jerk)) * 2,),
dtype=np.float64,
),
'achieved_goal': gym.spaces.Box(low=low_goals, high=high_goals, dtype=np.float64),
'desired_goal': gym.spaces.Box(low=low_goals, high=high_goals, dtype=np.float64),
}
)
action_scale = self.j_max if self.learn_jerk else self.a_max
self.action_space = gym.spaces.Box(low=-action_scale, high=action_scale, shape=(self.num_movers * 2,), dtype=np.float64)
# minimum and maximum possible mover (x,y)-positions
safety_margin = self.c_size + self.c_size_offset_wall + self.c_size_offset + 0.03
self.min_xy_pos = np.zeros(2) + safety_margin
self.max_xy_pos = (
np.array([np.max(self.x_pos_tiles) + (self.tile_size[0] / 2), np.max(self.y_pos_tiles) + (self.tile_size[1] / 2)])
- safety_margin
)
# minimum and maximum possible object (x,y)-positions
actual_safety_margin = safety_margin + 0.02
self.object_min_xy_pos = self.min_xy_pos + actual_safety_margin
self.object_max_xy_pos = self.max_xy_pos - actual_safety_margin
self.object_goal_min_xy_pos = self.min_xy_pos + actual_safety_margin
self.object_goal_max_xy_pos = self.max_xy_pos - actual_safety_margin
# impedance contoller
self.impedance_controllers = [
MoverImpedanceController(
model=self.model,
mover_joint_name=self.mover_joint_names[mover_idx],
joint_mask=np.array([0, 0, 1, 1, 1, 1]),
translational_stiffness=np.array([1.0, 1.0, 100.0]),
rotational_stiffness=np.array([0.1, 0.1, 1]),
)
for mover_idx in range(self.num_movers)
]
self.reload_model()
# minimum distance between object and mover after env reset
max_side = self.object_radius if self.object_type == 'cylinder' else max(self.object_width, self.object_height)
max_size = self.mover_size.max(axis=0) if self.mover_size.ndim == 2 else self.mover_size
side_coll = np.linalg.norm(max_side + max_size[:2], ord=2)
c_offset = self.c_size + self.c_size_offset if self.c_shape == 'circle' else np.linalg.norm(self.c_size + self.c_size_offset, ord=2)
self.min_mover_object_dist = np.maximum(side_coll, c_offset)
self.min_object_goal_dist = 0.15
# corrective movements
self.cm_measurement = benchmark_utils.CorrectiveMovementMeasurement(
distance_func=self._calc_eucl_dist_xy, threshold=self.max_position_err
)
# throughput
self.time_to_success_s = -1
self.num_elapsed_cycles = 0
self.success_counter = 0
def _compute_default_cam_config(
self,
layout_tiles: np.ndarray,
tile_params: dict[str, Any] | None = None,
factor: float = 1.1,
) -> dict[str, Any]:
if tile_params and 'size' in tile_params:
tile_size = tile_params['size'][:2]
else:
tile_size = np.array([0.12, 0.12])
x, y = layout_tiles.shape * tile_size
diag = np.sqrt(x**2 + y**2)
distance = (diag * factor) / np.cos(np.radians(-65.0))
return {
'distance': distance,
'azimuth': 90.0,
'elevation': -65.0,
'lookat': np.array([x, y, 0.067]),
}
[docs]
def update_cached_actuator_mujoco_data(self) -> None:
"""Update all cached information about MuJoCo actuators, such as names and ids."""
self.mover_actuator_x_names = mujoco_utils.get_mujoco_type_names(self.model, obj_type='actuator', name_pattern='mover_actuator_x')
self.mover_actuator_y_names = mujoco_utils.get_mujoco_type_names(self.model, obj_type='actuator', name_pattern='mover_actuator_y')
self.mover_actuator_x_ids = np.zeros((len(self.mover_actuator_x_names),), dtype=np.int32)
self.mover_actuator_y_ids = np.zeros((len(self.mover_actuator_x_names),), dtype=np.int32)
for idx_a, actuator_x_name in enumerate(self.mover_actuator_x_names):
self.mover_actuator_x_ids[idx_a] = self.model.actuator(actuator_x_name).id
self.mover_actuator_y_ids[idx_a] = self.model.actuator(self.mover_actuator_y_names[idx_a]).id
for mover_idx in range(self.num_movers):
self.impedance_controllers[mover_idx].update_cached_actuator_mujoco_data(self.model)
# object
object_joint_name = mujoco_utils.get_mujoco_type_names(self.model, obj_type='joint', name_pattern='object')[0]
self.object_joint_qpos_adr, _, self.object_joint_qpos_ndim, _ = mujoco_utils.get_joint_addrs_and_ndims(
self.model, object_joint_name
)
def _object_geom_xml(self) -> str:
"""Generate MuJoCo XML geometry definition for the current object.
:return: a string containing the MuJoCo XML geometry definition
"""
def box_volume(width: float, height: float, depth: float) -> float:
"""Calculate the volume of a box geometry.
:param width: the width (x-direction half-size) of the box
:param height: the height (y-direction half-size) of the box
:param depth: the depth (z-direction half-size) of the box
:return: the volume of the box in cubic meters (accounts for half-sizes by factor of 8)
"""
return 8 * width * height * depth
def object_masses(shape: str) -> tuple[float, float]:
"""Calculate mass distribution for object components.
For simple shapes (box, cylinder), returns all mass to the main component.
For complex shapes, distributes mass proportionally based on volume.
:param shape: the object shape type
:return: a tuple of (main_body_mass, segment_mass)
"""
if shape in {'square_box', 'box', 'cylinder'}:
return (self.object_mass, 0)
b_volume = box_volume(self.object_segment_width, self.object_height, self.object_depth)
s_volume = box_volume(segment_width_half, self.object_segment_width, self.object_depth)
volume = b_volume + {'t_shape': 2, 'plus_shape': 2, 'l_shape': 1}[shape] * s_volume
return (
self.object_mass * (b_volume / volume),
self.object_mass * (s_volume / volume),
)
def box_geom(width: float, height: float, depth: float, mass: float, x: float = 0, y: float = 0, z: float = 0) -> str:
"""Generate XML for a box geometry.
:param width: the width (half-size) of the box
:param height: the height (half-size) of the box
:param depth: the depth (half-size) of the box
:param mass: the mass of the box
:param x: the x-position offset from parent body center, defaults to 0
:param y: the y-position offset from parent body center, defaults to 0
:param z: the z-position offset from parent body center, defaults to 0
:return: a string containing the MuJoCo XML for the box geometry
"""
pos_str = f' pos="{x} {y} {z}"' if any([x, y, z]) else ''
return f'\t\t\t<geom type="box" size="{width} {height} {depth}" mass="{mass}" {pos_str} />'
def cylinder_geom(radius: float, depth: float, mass: float) -> str:
"""Generate XML for a cylinder geometry.
:param radius: the radius of the cylinder
:param depth: the height (half-size) of the cylinder
:param mass: the mass of the cylinder
:return: a string containing the MuJoCo XML for the cylinder geometry
"""
return f'\t\t\t<geom type="cylinder" size="{radius} {depth}" mass="{mass}" />'
segment_width_half = (self.object_width - self.object_segment_width) / 2
segment_pos = segment_width_half + self.object_segment_width
masses = object_masses(self.object_type)
match self.object_type:
case 'square_box':
return box_geom(self.object_width, self.object_width, self.object_depth, masses[0])
case 'box':
return box_geom(self.object_width, self.object_height, self.object_depth, masses[0])
case 'cylinder':
return cylinder_geom(self.object_radius, self.object_depth, masses[0])
case 't_shape':
return '\n'.join(
[
box_geom(self.object_segment_width, self.object_height, self.object_depth, masses[0]),
box_geom(
segment_width_half,
self.object_segment_width,
self.object_depth,
masses[1],
-segment_pos,
self.object_height - self.object_segment_width,
0,
),
box_geom(
segment_width_half,
self.object_segment_width,
self.object_depth,
masses[1],
segment_pos,
self.object_height - self.object_segment_width,
0,
),
]
)
case 'l_shape':
return '\n'.join(
[
box_geom(self.object_segment_width, self.object_height, self.object_depth, masses[0]),
box_geom(
segment_width_half,
self.object_segment_width,
self.object_depth,
masses[1],
segment_pos,
-1 * (self.object_height - self.object_segment_width),
0,
),
]
)
case 'plus_shape':
return '\n'.join(
[
box_geom(self.object_segment_width, self.object_height, self.object_depth, masses[0]),
box_geom(segment_width_half, self.object_segment_width, self.object_depth, masses[1], -segment_pos, 0, 0),
box_geom(segment_width_half, self.object_segment_width, self.object_depth, masses[1], segment_pos, 0, 0),
]
)
case shape:
raise ValueError(f'Unsupported object shape {shape}')
@property
def _object_xml(self) -> str:
"""Generate the complete MuJoCo XML definition for the object body.
:return: a string containing the complete MuJoCo XML body definition including joint and geometry
"""
geom_xml = self._object_geom_xml()
goal_xml = geom_xml if self.learn_pose else '\n\t\t\t<geom name="object_goal" type="sphere" material="red" size="0.02" />'
return (
f'\n\t\t<!-- object -->\n\t\t<body name="object" '
f'pos="{self.object_xy_start_pos[0]} {self.object_xy_start_pos[1]} {self.object_depth}" '
f'euler="0 0 {self.object_start_yaw}" childclass="object">'
f'\n\t\t\t<joint name="object_joint" type="free" damping="0.01" />'
f'\n{geom_xml}\n\t\t</body>\n\t\t<body name="object_goal" '
f'pos="{self.object_xy_goal_pos[0]} {self.object_xy_goal_pos[1]} {self.object_depth}" '
f'euler="0 0 {self.object_goal_yaw}" childclass="object_goal">\n{goal_xml}\n\t\t</body>'
)
[docs]
def _custom_xml_string_callback(self, custom_model_xml_strings: dict | None) -> dict[str, str]:
"""Generate custom XML strings for the environment model.
This method is called during model initialization to add custom objects and modifications
to the base MuJoCo model.
:return: a dict of XML strings to be inserted into the model
"""
if custom_model_xml_strings is None:
custom_model_xml_strings = {}
# actuators
if self.impedance_controllers is not None:
mover_actuator_list = ['\n\t<actuator>', '\t\t<!-- mover actuators -->']
for idx_mover in range(self.num_movers):
joint_name = self.mover_joint_names[idx_mover]
mover_mass = self.mover_mass if isinstance(self.mover_mass, float) else self.mover_mass[idx_mover]
if self.learn_jerk:
mover_actuator_list.extend(
[
f'\t\t<general name="mover_actuator_x_{idx_mover}" joint="{joint_name}" gear="1 0 0 0 0 0" '
f'dyntype="integrator" gaintype="fixed" gainprm="{mover_mass} 0 0" biastype="none" actearly="true"/>',
f'\t\t<general name="mover_actuator_y_{idx_mover}" joint="{joint_name}" gear="0 1 0 0 0 0" '
f'dyntype="integrator" gaintype="fixed" gainprm="{mover_mass} 0 0" biastype="none" actearly="true"/>',
]
)
else:
mover_actuator_list.extend(
[
f'\t\t<general name="mover_actuator_x_{idx_mover}" joint="{joint_name}" gear="1 0 0 0 0 0" dyntype="none" '
f'gaintype="fixed" gainprm="{mover_mass} 0 0" biastype="none"/>',
f'\t\t<general name="mover_actuator_y_{idx_mover}" joint="{joint_name}" gear="0 1 0 0 0 0" '
f'dyntype="none" gaintype="fixed" gainprm="{mover_mass} 0 0" biastype="none"/>',
]
)
impedance_controller = self.impedance_controllers[idx_mover]
mover_actuator_list.append(impedance_controller.generate_actuator_xml_string(idx_mover=idx_mover))
mover_actuator_list.append('\t</actuator>')
custom_outworldbody_xml_str = custom_model_xml_strings.get('custom_outworldbody_xml_str', None)
mover_actuator_xml_str = '\n'.join(mover_actuator_list)
if custom_outworldbody_xml_str is not None:
custom_outworldbody_xml_str += mover_actuator_xml_str
else:
custom_outworldbody_xml_str = mover_actuator_xml_str
custom_model_xml_strings['custom_outworldbody_xml_str'] = custom_outworldbody_xml_str
custom_model_xml_strings['custom_default_xml_str'] = (
'\n\t\t<default class="object">\n\t\t\t<geom material="red" group="2" '
f'friction="{self.object_sliding_friction} {self.object_torsional_friction} 0.0001" priority="1" />\n\t\t</default>'
'\n\t\t<default class="object_goal">\n\t\t\t<geom material="red_transparent" contype="0" conaffinity="0" group="1" />'
'\n\t\t</default>'
)
# object
custom_worldbody_xml_str = custom_model_xml_strings.get('custom_worldbody_xml_str', None)
if custom_worldbody_xml_str is not None:
custom_worldbody_xml_str += self._object_xml
else:
custom_worldbody_xml_str = self._object_xml
custom_model_xml_strings['custom_worldbody_xml_str'] = custom_worldbody_xml_str
return custom_model_xml_strings
[docs]
def reload_model(self, mover_start_xy_pos: np.ndarray | None = None) -> None:
"""Generate a new model XML string with new start positions for mover and object and a new object goal position and reload the
model. In this environment, it is necessary to reload the model to ensure that the actuators work as expected.
:param mover_start_xy_pos: None or a numpy array of shape (num_movers,2) containing the (x,y) starting positions of each mover,
defaults to None. If set to None, the movers will be placed in the center of the tiles that are added to the XML string
first.
"""
custom_model_xml_strings = self._custom_xml_string_callback(
custom_model_xml_strings=self.custom_model_xml_strings_before_cb,
)
model_xml_str = self.generate_model_xml_string(
mover_start_xy_pos=mover_start_xy_pos,
mover_goal_xy_pos=None,
custom_xml_strings=custom_model_xml_strings,
)
self.model = mujoco.MjModel.from_xml_string(model_xml_str)
self.data = mujoco.MjData(self.model)
mujoco.mj_forward(self.model, self.data)
# update cached mujoco data
self.update_cached_mover_mujoco_data()
self.update_cached_actuator_mujoco_data()
if self.render_mode is not None:
self.viewer_collection.reload_model(self.model, self.data)
self.render()
def _sample_object_params(self, options: dict[str, Any]) -> None:
"""Sample random object parameters from the configured ranges.
:param options: a dictionary of parameter overrides. Keys can include 'object_type', 'object_start_yaw',
'object_goal_yaw', and any object property like 'object_width', 'object_height', etc.
"""
self.object_start_yaw = options.get('object_start_yaw', self.np_random.uniform(low=-np.pi, high=np.pi))
self.object_goal_yaw = options.get('object_goal_yaw', self.np_random.uniform(low=-np.pi, high=np.pi))
for prop in ['width', 'height', 'depth', 'radius', 'segment_width', 'mass']:
setattr(self, f'object_{prop}', options.get(f'object_{prop}', self.np_random.uniform(*self.object_ranges[prop])))
def _sample_position_on_tile(self) -> np.ndarray:
"""Sample a random position within a random valid tile, constrained to not extend beyond tile edges.
:return: the random position on a tile.
"""
valid_tiles = np.argwhere(self.layout_tiles == 1)
tile = valid_tiles[self.np_random.choice(len(valid_tiles))]
tx, ty = tile[0], tile[1]
center_x = self.x_pos_tiles[tx, ty]
center_y = self.y_pos_tiles[tx, ty]
has_left = ty > 0 and self.layout_tiles[tx, ty - 1] == 1
has_right = ty < self.layout_tiles.shape[1] - 1 and self.layout_tiles[tx, ty + 1] == 1
has_up = tx > 0 and self.layout_tiles[tx - 1, ty] == 1
has_down = tx < self.layout_tiles.shape[0] - 1 and self.layout_tiles[tx + 1, ty] == 1
x_offset = self.np_random.uniform(-self.tile_size[0] if has_up else 0, self.tile_size[0] if has_down else 0)
y_offset = self.np_random.uniform(-self.tile_size[1] if has_left else 0, self.tile_size[1] if has_right else 0)
return np.array([center_x + x_offset, center_y + y_offset])
[docs]
def _reset_callback(self, options: dict[str, Any] | None = None) -> None:
"""Reset the start positions of the movers and object and the object goal position and reload the model. It is ensured that the
new start positions of the movers are collision-free, i.e. no wall collision, no collision with other movers and no collision
with the object. In addition, the object's start position is chosen such that the mover fits between the wall and the object.
This is important to ensure that the object can be pushed in all directions.
:param options: can be used to override mover and object parameters
"""
options = options or {}
# sample new mover start positions
start_qpos = np.zeros((self.num_movers, 7))
start_qpos[:, 2] = self.initial_mover_zpos
start_qpos[:, 3] = 1 # Quaternion (1,0,0,0)
if self.continuous_layout:
self.object_xy_goal_pos = options.get(
'object_xy_goal_pos',
self.np_random.uniform(low=self.object_goal_min_xy_pos, high=self.object_goal_max_xy_pos, size=(2,)),
)
self._sample_object_params(options)
if 'mover_xy_start_pos' in options.keys():
start_qpos[:, :2] = options['mover_xy_start_pos']
if 'object_xy_start_pos' in options.keys():
self.object_xy_start_pos = options['object_xy_start_pos']
else:
# sample a new start position for the object and ensure that it does not collide with the mover
cnt = 0
all_ok = False
while not all_ok:
cnt += 1
if cnt > 0 and cnt % 100 == 0:
logger.warn(
f'Trying to find a start position for the object. No valid configuration found within {cnt} trails. '
'Consider choosing more tiles.'
)
# choose a new start position for the mover
if 'mover_xy_start_pos' in options.keys():
start_qpos[:, :2] = options['mover_xy_start_pos']
else:
start_qpos[:, :2] = self.np_random.uniform(low=self.min_xy_pos, high=self.max_xy_pos, size=(self.num_movers, 2))
pos_ok = self.qpos_is_valid(qpos=start_qpos, c_size=self.c_size, add_safety_offset=True)
mover_collision = self.check_mover_collision(
mover_names=self.mover_names, c_size=self.c_size, add_safety_offset=True, mover_qpos=start_qpos
)
if not self.continuous_layout:
self.object_xy_start_pos = self._sample_position_on_tile()
self.object_xy_goal_pos = options.get('object_xy_goal_pos', self._sample_position_on_tile())
else:
self.object_xy_start_pos = self.np_random.uniform(low=self.object_min_xy_pos, high=self.object_max_xy_pos, size=(2,))
mover_object_dist_ok = (
np.linalg.norm(self.object_xy_start_pos - start_qpos[:, :2], ord=2, axis=1) > self.min_mover_object_dist
)
object_goal_dist_ok = np.linalg.norm(self.object_xy_start_pos - self.object_xy_goal_pos, ord=2) > self.min_object_goal_dist
all_ok = not mover_collision and pos_ok.all() and mover_object_dist_ok.all() and object_goal_dist_ok
self.object_xy_start_pos = self.object_xy_start_pos.flatten()
self.steps_at_goal = 0
# reload model with new start pos and goal pos
self.reload_model(mover_start_xy_pos=start_qpos[:, :2])
# reset corrective movement measurement
self.cm_measurement.reset()
# reset throughput measurement
self.time_to_success_s = -1
self.num_elapsed_cycles = 0
self.success_counter = 0
[docs]
def _before_mujoco_step_callback(self, action: np.ndarray) -> None:
"""Apply the next action, i.e. it sets the jerk or acceleration, ensuring the minimum and maximum velocity and acceleration
(for one cycle).
:param action: a numpy array of shape (num_movers * 2,), which specifies the next action (jerk or acceleration)
"""
action = action.reshape((self.num_movers, 2))
vel = self.get_mover_qvel(mover_names=self.mover_names, add_noise=True)[:, :2]
if self.learn_jerk:
acc = self.get_mover_qacc(mover_names=self.mover_names, add_noise=False)[:, :2]
next_acc_tmp, next_jerk = self.ensure_max_dyn_val(current_values=acc, max_value=self.a_max, next_derivs=action)
_, next_acc = self.ensure_max_dyn_val(current_values=vel, max_value=self.v_max, next_derivs=next_acc_tmp)
if (next_acc_tmp != next_acc).any():
next_jerk = (next_acc - acc) / self.cycle_time
ctrl = next_jerk.copy()
else:
_, next_acc = self.ensure_max_dyn_val(current_values=vel, max_value=self.v_max, next_derivs=action)
ctrl = next_acc.copy()
self.data.ctrl[self.mover_actuator_x_ids] = ctrl[:, 0]
self.data.ctrl[self.mover_actuator_y_ids] = ctrl[:, 1]
assert self.impedance_controllers is not None
for mover_idx in range(self.num_movers):
self.impedance_controllers[mover_idx].update(
model=self.model,
data=self.data,
pos_d=np.array(
[
0,
0,
self.initial_mover_zpos + self.mover_size[mover_idx, 2] if self.mover_size.ndim == 2 else self.mover_size[2],
]
),
quat_d=np.array([1, 0, 0, 0]),
)
[docs]
def _after_mujoco_step_callback(self):
"""Check whether corrective movements (overshoot or distance corrections) occurred and increase the corresponding counter if
necessary.
"""
current_object_pose = self.data.qpos[self.object_joint_qpos_adr : self.object_joint_qpos_adr + self.object_joint_qpos_ndim]
current_object_pose = self.data.qpos[self.object_joint_qpos_adr : self.object_joint_qpos_adr + self.object_joint_qpos_ndim]
object_achieved_goal = current_object_pose[:2]
object_desired_goal = self.object_xy_goal_pos.copy()
# corrective movements
self.cm_measurement.update_distance_corrections(current_object_pose=object_achieved_goal, object_target_pose=object_desired_goal)
self.cm_measurement.update_overshoot_corrections(current_object_pose=object_achieved_goal, object_target_pose=object_desired_goal)
# throughput
if self.success_counter < 150:
self.num_elapsed_cycles += 1
object_geoms = None
if self.learn_pose:
object_geoms = self._geoms(name='object')[np.newaxis]
w, x, y, z = current_object_pose[3:]
object_yaw = np.arctan2(2 * (w * z + x * y), 1 - 2 * (y * y + z * z))
object_achieved_goal = np.append(object_achieved_goal, [np.sin(object_yaw), np.cos(object_yaw)])
object_desired_goal = np.append(object_desired_goal, [np.sin(self.object_goal_yaw), np.cos(self.object_goal_yaw)])
goal_reached, _ = self._is_goal_reached(object_achieved_goal[np.newaxis], object_desired_goal[np.newaxis], object_geoms)
if isinstance(goal_reached, np.ndarray):
assert goal_reached.shape == (1,)
goal_reached = goal_reached[0]
if goal_reached:
if self.success_counter == 0:
self.time_to_success_s = self.num_elapsed_cycles * (1 / 1000) # in s
self.success_counter += 1
else:
self.success_counter = 0
[docs]
def compute_terminated(
self, achieved_goal: np.ndarray, desired_goal: np.ndarray, info: dict[str, Any] | None = None
) -> np.ndarray | bool:
"""Compute whether the episode should terminate.
The episode terminates if there is a collision between movers, a collision with a wall, or if early termination conditions are
met (when configured).
:param achieved_goal: a numpy array of shape (batch_size, length achieved_goal) or (length achieved_goal,) containing the
already achieved (x,y)-positions of an object
:param desired_goal: a numpy array of shape (batch_size, length desired_goal) or (length desired_goal,) containing the
(x,y) goal positions of an object
:param info: a dictionary containing auxiliary information, defaults to None
:return:
- if batch_size > 1:
a numpy array of shape (batch_size,). An entry is True if the state is terminal, False otherwise
- if batch_size = 1:
True if the state is terminal, False otherwise
"""
assert info is not None
early_termination = self.early_termination_steps is not None and info['steps_at_goal'] >= self.early_termination_steps
return bool(info['mover_collision'] or info['wall_collision'] or early_termination)
[docs]
def compute_truncated(
self, achieved_goal: np.ndarray, desired_goal: np.ndarray, info: dict[str, Any] | None = None
) -> np.ndarray | bool:
"""Check whether the truncation condition is satisfied. The truncation condition (a time limit in this environment) is
automatically checked by the Gymnasium TimeLimit Wrapper, which is why this method always returns False.
:param achieved_goal: a numpy array of shape (batch_size, length achieved_goal) or (length achieved_goal,) containing the
already achieved (x,y)-positions of an object
:param desired_goal: a numpy array of shape (batch_size, length desired_goal) or (length desired_goal,) containing the
(x,y) goal positions of an object
:param info: a dictionary containing auxiliary information, defaults to None
:return:
- if batch_size > 1:
a numpy array of shape (batch_size,) in which all entries are False
- if batch_size = 1:
False
"""
batch_size = achieved_goal.shape[0] if len(achieved_goal.shape) > 1 else 1
return np.array([False] * batch_size) if batch_size > 1 else False
def _geom_to_shapely(self, geom: np.ndarray) -> sg.Polygon:
"""Convert MuJoCo geometry data to a Shapely polygon.
:param geom_data: a numpy array containing [type, size_x, size_y, pos_x, pos_y]
where type indicates geometry type (6=box, 5=cylinder)
:return: a Shapely Polygon representing the geometry
"""
type, sx, sy, px, py = geom
match np.int8(type):
case mujoco.mjtGeom.mjGEOM_CYLINDER: # type: ignore
shape = sg.Polygon([(sx * np.cos(angle), sx * np.sin(angle)) for angle in np.linspace(0, 2 * np.pi, 32)])
case mujoco.mjtGeom.mjGEOM_BOX: # type: ignore
shape = sg.box(-sx, -sy, sx, sy)
case _:
raise ValueError(f'Unsupported geometry type: {type}')
return shapely.affinity.translate(shape, px, py)
def _body_to_shapely(
self,
geoms: np.ndarray,
pose: np.ndarray,
) -> sg.MultiPolygon:
"""Convert a MuJoCo body to a Shapely polygon representation.
:param body_name: the name of the MuJoCo body to convert
:return: a Shapely Polygon representing the union of all geometries in the body
"""
x, y, sin_yaw, cos_yaw = pose.squeeze()
yaw = np.arctan2(sin_yaw, cos_yaw)
return shapely.affinity.translate(
shapely.affinity.rotate(
sg.MultiPolygon([self._geom_to_shapely(geoms[i]) for i in range(geoms.shape[0]) if geoms[i].any()]),
yaw,
origin='center',
use_radians=True,
),
x,
y,
)
def _object_coverage(
self,
achieved_goal: np.ndarray,
desired_goal: np.ndarray,
object_geoms: np.ndarray,
) -> np.ndarray:
"""Calculate the coverage ratio between achieved and desired object poses.
Used in pose learning mode to determine how well the object's current pose
matches the desired pose by computing the intersection area ratio.
:param achieved_goal: a numpy array containing the current object pose [x, y, yaw]
:param desired_goal: a numpy array containing the target object pose [x, y, yaw]
:param object_geoms: a numpy array containing geometry data for the object
:return: the coverage ratio between 0.0 and 1.0, where 1.0 means perfect overlap
"""
object_polys = [self._body_to_shapely(geoms, pose) for geoms, pose in zip(object_geoms, achieved_goal, strict=True)]
goal_polys = [self._body_to_shapely(geoms, pose) for geoms, pose in zip(object_geoms, desired_goal, strict=True)]
return np.array(
[
np.clip(goal_poly.intersection(object_poly).area / goal_poly.area, 0, 1)
for object_poly, goal_poly in zip(object_polys, goal_polys, strict=True)
]
)
def _is_goal_reached(
self,
achieved_goal: np.ndarray,
desired_goal: np.ndarray,
object_geoms: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Check whether the object has reached its goal position or pose.
The goal achievement condition depends on the learning mode:
- Position mode (learn_pose=False): Uses Euclidean distance threshold
- Pose mode (learn_pose=True): Uses coverage ratio threshold
:param achieved_goal: a numpy array containing the current object state. For position mode,
shape (2,) with [x, y]. For pose mode, shape (3,) with [x, y, yaw]
:param desired_goal: a numpy array containing the target object state. Same shape as achieved_goal
:param object_geoms: a numpy array containing geometry data for the object, defaults to None.
Required when learn_pose=True, ignored otherwise
:return: True if the goal is reached according to the current learning mode, False otherwise. Additionally, the coverage
(if ``learn_pose=True``) or the distance to the goal (if ``learn_pose=False``) is returned.
"""
if self.learn_pose:
assert object_geoms is not None
coverage = self._object_coverage(
achieved_goal=achieved_goal,
desired_goal=desired_goal,
object_geoms=object_geoms,
)
return coverage >= self.min_coverage, coverage
else:
dist_goal = self._calc_eucl_dist_xy(
achieved_goal=achieved_goal,
desired_goal=desired_goal,
)
return dist_goal <= self.max_position_err, dist_goal
[docs]
def compute_reward(
self,
achieved_goal: np.ndarray,
desired_goal: np.ndarray,
info: dict[str, Any] | None = None,
) -> np.ndarray | float:
"""Compute the immediate reward.
:param achieved_goal: a numpy array of shape (batch_size, length achieved_goal) or (length achieved_goal,) containing the
already achieved (x,y)-positions of an object
:param desired_goal: a numpy array of shape (batch_size, length desired_goal) or (length desired_goal,) containing the
(x,y) goal positions of an object
:param info: a dictionary containing auxiliary information, defaults to None
:return: a single float value or a numpy array of shape (batch_size,) containing the immediate rewards
"""
assert achieved_goal is not None and desired_goal is not None and info is not None
batch_size, mover_collisions, wall_collisions, object_geoms = self._preprocess_info_dict(info=info)
if batch_size == 1:
achieved_goal = achieved_goal.reshape(batch_size, -1)
desired_goal = desired_goal.reshape(batch_size, -1)
has_collision = mover_collisions | wall_collisions
goal_reached, dist_or_coverage = self._is_goal_reached(
achieved_goal,
desired_goal,
object_geoms,
)
reward = self.per_step_penalty * np.ones(shape=goal_reached.shape)
reward[has_collision] = self.collision_penalty
if not self.learn_pose:
reward[np.logical_and(goal_reached, np.logical_not(has_collision))] = self.object_at_goal_reward
else:
coverage_largerzero = np.logical_and(dist_or_coverage > 0, np.logical_not(has_collision))
reward[coverage_largerzero] = dist_or_coverage[coverage_largerzero]
return reward
def _get_obs(self) -> dict[str, np.ndarray] | np.ndarray:
"""Return an observation based on the current state of the environment.
:return: a dictionary containing the following keys and values:
- 'observation':
- if ``learn_jerk=True``:
a numpy array of shape (2*2,) containing the (x,y)-position, (x,y)-velocities and (x,y)-accelerations of the mover
- if ``learn_jerk=False``:
a numpy array of shape (2,) containing the (x,y)-position and (x,y)-velocities and of the mover
- 'achieved_goal':
a numpy array of shape (2,) containing the current (x,y)-position of the object
- 'desired_goal':
a numpy array of shape (2,) containing the desired (x,y)-position of the object
"""
# observation
mover_xy_pos = self.get_mover_qpos(mover_names=self.mover_names, add_noise=True)[:, :2]
mover_xy_velos = self.get_mover_qvel(mover_names=self.mover_names, add_noise=True)[:, :2]
if self.learn_jerk:
# no noise, because only SetAcc is available in a real system
mover_xy_accs = self.get_mover_qacc(mover_names=self.mover_names, add_noise=False)[:, :2]
observation = np.concatenate((mover_xy_pos, mover_xy_velos, mover_xy_accs), axis=0)
else:
observation = np.concatenate((mover_xy_pos, mover_xy_velos), axis=0)
# achieved goal
object_qpos = self.data.qpos[self.object_joint_qpos_adr : self.object_joint_qpos_adr + self.object_joint_qpos_ndim]
achieved_goal = object_qpos[:2] + self.rng_noise.normal(loc=0.0, scale=self.object_noise_xy_pos, size=2)
# desired goal
desired_goal = self.object_xy_goal_pos.copy()
if self.learn_pose:
w, x, y, z = object_qpos[3:]
object_yaw = np.arctan2(2 * (w * z + x * y), 1 - 2 * (y * y + z * z))
achieved_goal = np.append(
achieved_goal,
[
np.sin(object_yaw + self.rng_noise.normal(loc=0.0, scale=self.object_noise_yaw)),
np.cos(object_yaw + self.rng_noise.normal(loc=0.0, scale=self.object_noise_yaw)),
],
)
desired_goal = np.append(desired_goal, [np.sin(self.object_goal_yaw), np.cos(self.object_goal_yaw)])
return OrderedDict(
[
('observation', observation.flatten()),
('achieved_goal', achieved_goal),
('desired_goal', desired_goal),
]
)
def _geoms(self, name: str) -> np.ndarray:
"""Extract geometry information from a MuJoCo body.
Iterates through all geometries in the model and collects those belonging to the
specified body. For each geometry, extracts type, size, and position information.
:param name: the name of the MuJoCo body to extract geometries from
:return: a numpy array of shape (max_num_geoms, 5) containing geometry data.
Each row contains [type, size_x, size_y, pos_x, pos_y]. Unused rows are zero-filled.
Geometry types: 6=box, 5=cylinder
"""
body = self.model.body(name)
max_num_geoms = 3
geoms = np.zeros((max_num_geoms, 5))
geom_idx = 0
for geom_id in range(self.model.ngeom):
if self.model.geom_bodyid[geom_id] == body.id:
if geom_idx >= max_num_geoms:
logger.warn(
f"Body '{name}' has more than {max_num_geoms} geometries. Only the first {max_num_geoms} will be processed."
)
break
geom = self.model.geom(geom_id)
geoms[geom_idx, 0] = geom.type
geoms[geom_idx, 1:3] = geom.size[:2]
geoms[geom_idx, 3:5] = geom.pos[:2]
geom_idx += 1
return geoms
def _get_info(
self,
mover_collision: bool,
wall_collision: bool,
other_collision: bool,
achieved_goal: np.ndarray,
desired_goal: np.ndarray,
collision_info: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Return a dictionary that contains auxiliary information.
:param mover_collision: whether there is a collision between two movers
:param wall_collision: whether there is a collision between a mover and a wall
:param other_collision: whether there are other collisions besides wall or mover collisions, e.g. collisions with an obstacle
(not used in this environment)
:param achieved_goal: a numpy array of shape (length achieved_goal,) containing the already achieved (x,y)-position of the
object
:param desired_goal: a numpy array of shape (length achieved_goal,) containing the desired (x,y)-position of the object
:param collision_info: a dictionary that is intended to contain additional information about collisions, e.g.
collisions with obstacles. Defaults to None (not used in this environment)
:return: the info dictionary with keys 'is_success', 'mover_collision' and 'wall_collision'
"""
assert achieved_goal is not None and desired_goal is not None
info_dict: dict[str, bool | np.ndarray | int] = {
'mover_collision': mover_collision,
'wall_collision': wall_collision,
}
object_geoms = None
if self.learn_pose:
info_dict['object_geoms'] = self._geoms(name='object')
object_geoms = info_dict['object_geoms'][np.newaxis]
goal_reached, _ = self._is_goal_reached(achieved_goal[np.newaxis], desired_goal[np.newaxis], object_geoms)
if isinstance(goal_reached, np.ndarray):
assert goal_reached.shape == (1,)
goal_reached = goal_reached[0]
self.steps_at_goal = self.steps_at_goal + 1 if goal_reached else 0
is_success = bool(goal_reached) and not bool(mover_collision) and not bool(wall_collision)
info_dict.update(
{
'is_success': is_success,
'steps_at_goal': self.steps_at_goal,
'num_overshoot_corrections': self.get_current_num_overshoot_corrections(),
'num_distance_corrections': self.get_current_num_distance_corrections(),
'time_to_success_s': self.time_to_success_s,
}
)
return info_dict
[docs]
def close(self) -> None:
"""Close the environment."""
super().close()
[docs]
def ensure_max_dyn_val(self, current_values: np.ndarray, max_value: float, next_derivs: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""Ensure the minimum and maximum dynamic values.
:param current_values: the current velocity or acceleration specified as a numpy array of shape (2,) or
(num_checks,2)
:param max_value: the maximum velocity or acceleration (float)
:param next_derivs: the next derivative (acceleration or jerk) used for one integrator step specified as a numpy array of
shape (2,) or (num_checks,2)
:return: the next velocity or acceleration and the next derivative (acceleration or jerk) corresponding to the next action
that must be applied to ensure the minimum and maximum dynamics (each of shape (num_checks,2))
"""
if len(current_values.shape) == 1:
current_values = current_values.reshape((1, -1))
if len(next_derivs.shape) == 1:
next_derivs = next_derivs.reshape((1, -1))
next_values = np.zeros((current_values.shape[0], 2))
next_derivs_new = np.zeros((current_values.shape[0], 2))
next_values_tmp = self.cycle_time * next_derivs + current_values
norm_next_values_tmp = np.linalg.norm(next_values_tmp, ord=2, axis=1)
mask_norm = norm_next_values_tmp >= max_value
next_values[np.bitwise_not(mask_norm), :] = next_values_tmp[np.bitwise_not(mask_norm), :]
next_derivs_new[np.bitwise_not(mask_norm), :] = next_derivs[np.bitwise_not(mask_norm), :]
if mask_norm.any():
next_values[mask_norm] = max_value * np.divide(
next_values_tmp[mask_norm],
np.broadcast_to(norm_next_values_tmp[mask_norm, np.newaxis], (np.sum(mask_norm), 2)),
)
next_derivs_new[mask_norm] = (next_values[mask_norm] - current_values[mask_norm]) / self.cycle_time
return next_values, next_derivs_new
def _calc_eucl_dist_xy(self, achieved_goal: np.ndarray, desired_goal: np.ndarray) -> np.ndarray:
"""Calculate the Euclidean distance.
:param achieved_goal: a numpy array of shape (batch_size, length achieved_goal) or (length achieved_goal,) containing the
already achieved (x,y)-positions of an object
:param desired_goal: a numpy array of shape (batch_size, length desired_goal) or (length desired_goal,) containing the
(x,y) goal positions of an object
:return: a numpy array of shape (batch_size,), which contains the distances between the achieved and the desired goals
"""
batch_size = achieved_goal.shape[0] if len(achieved_goal.shape) > 1 else 1
if batch_size == 1:
achieved_goal = achieved_goal.reshape(batch_size, -1)
desired_goal = desired_goal.reshape(batch_size, -1)
return np.linalg.norm(achieved_goal - desired_goal, ord=2, axis=1)
def _preprocess_info_dict(
self,
info: np.ndarray | dict[str, Any] | None,
) -> tuple[int, np.ndarray, np.ndarray, np.ndarray | None]:
"""Extract information about mover collisions, wall collisions and the batch size from the info dictionary.
:param info: the info dictionary or an array of info dictionary to be preprocessed. All dictionaries must contain the keys
'mover_collision' and 'wall_collision'.
:return: the batch_size (int), a numpy array of shape (batch_size,) containing the mover collision values (bool),
a numpy array of shape (batch_size,) containing the wall collision values (bool), information about object geoms (if pose
task, else None)
"""
if isinstance(info, np.ndarray):
batch_size = info.shape[0]
mover_collisions = np.zeros(batch_size).astype(bool)
wall_collisions = np.zeros(batch_size).astype(bool)
if self.learn_pose:
object_geoms = np.zeros((batch_size, 3, 5), np.float32)
for i in range(0, batch_size):
mover_collisions[i] = info[i]['mover_collision']
wall_collisions[i] = info[i]['wall_collision']
if self.learn_pose:
object_geoms[i] = info[i]['object_geoms']
else:
assert isinstance(info, dict)
batch_size = 1
mover_collisions = np.array([info['mover_collision']])
wall_collisions = np.array([info['wall_collision']])
if self.learn_pose:
object_geoms = np.array([info['object_geoms']])
return batch_size, mover_collisions, wall_collisions, object_geoms if self.learn_pose else None
[docs]
def get_current_num_overshoot_corrections(self) -> int:
"""Return the current number of overshoot corrections measured within the current episode.
:return: the current number of overshoot corrections measured within the current episode
"""
return self.cm_measurement.get_current_num_overshoot_corrections()
[docs]
def get_current_num_distance_corrections(self) -> int:
"""Return the current number of distance corrections measured within the current episode.
:return: the current number of distance corrections measured within the current episode
"""
return self.cm_measurement.get_current_num_distance_corrections()