##########################################################
# Copyright (c) 2025 Lara Bergmann, Bielefeld University #
##########################################################
from collections.abc import Callable
from typing import Optional

import numpy as np
import numpy.typing as npt
[docs]
class CorrectiveMovementMeasurement:
    """A utility class to measure corrective movements (overshoot and distance corrections) within one episode.

    An *overshoot correction* is counted when the object was closer to its goal than ``threshold`` in the
    previous step, is no longer closer than ``threshold`` in the current step, and the current distance differs
    from ``threshold`` by more than 2e-5.

    A *distance correction* is counted when the distance to the goal decreases by more than 2e-5 after it had
    previously increased by more than 2e-5.

    :param distance_func: a function to measure the distance between the current position of the object and its goal position.

        - Inputs: 2 numpy arrays containing the current positions of the object and the object's goal positions each with
          shape (1, length position vector) or (length position vector,).
        - Outputs: a single float value or a numpy array with shape (1,) containing the distance values
    :param threshold: the threshold used to determine whether the object has reached its goal position, defaults to 0.05
    """

    # Differences up to this magnitude are treated as numerical noise and never trigger a correction.
    _TOLERANCE = 2e-5

    def __init__(
        self,
        distance_func: Callable[[np.ndarray, np.ndarray], npt.ArrayLike],
        threshold: float = 0.05,
    ) -> None:
        self.distance_func = distance_func
        self.threshold = threshold
        # All per-episode state lives in reset() so that construction and reset cannot drift apart.
        self.reset()

    def reset(self) -> None:
        """Reset counters etc. Should be called at environment reset to measure the number of corrective movements per episode."""
        # overshoot corrections
        self.cnt_overshoot_ep_corrections = 0
        self.success_last_step = False
        # distance corrections
        self.cnt_dist_ep_corrections = 0
        self.last_dist_pos = np.inf
        self.dist_increased = False

    def _measure_distance(self, current_object_pose: np.ndarray, object_target_pose: np.ndarray) -> float:
        """Evaluate ``distance_func`` and return its (first) value as a plain Python float.

        ``distance_func`` may return a Python float, a numpy scalar or an array such as shape (1,); indexing the
        raw result would fail for the scalar cases, so the result is normalised here.
        """
        raw_distance = self.distance_func(current_object_pose, object_target_pose)
        return float(np.asarray(raw_distance).reshape(-1)[0])

    def update_overshoot_corrections(self, current_object_pose: np.ndarray, object_target_pose: np.ndarray) -> None:
        """Check whether an overshoot correction occurred and increase the counter if necessary.

        :param current_object_pose: a numpy array containing the current position of the object.
            Shape: (1, length position vector) or (length position vector,)
        :param object_target_pose: a numpy array containing the target position of the object.
            Shape: (1, length position vector) or (length position vector,)
        """
        dist_pos = self._measure_distance(current_object_pose, object_target_pose)
        success_step = dist_pos < self.threshold
        # Only count leaving the goal region if the distance is not within tolerance of the threshold itself.
        left_goal_region = self.success_last_step and not success_step
        if left_goal_region and abs(dist_pos - self.threshold) > self._TOLERANCE:
            self.cnt_overshoot_ep_corrections += 1
        self.success_last_step = success_step

    def update_distance_corrections(self, current_object_pose: np.ndarray, object_target_pose: np.ndarray) -> None:
        """Check whether a distance correction occurred and increase the counter if necessary.

        :param current_object_pose: a numpy array containing the current position of the object.
            Shape: (1, length position vector) or (length position vector,)
        :param object_target_pose: a numpy array containing the target position of the object.
            Shape: (1, length position vector) or (length position vector,)
        """
        dist_pos = self._measure_distance(current_object_pose, object_target_pose)
        # On the first step last_dist_pos is inf, so the change is inf and only the "decrease" test could
        # pass; it is guarded by dist_increased, which is False after a reset.
        significant_change = abs(self.last_dist_pos - dist_pos) > self._TOLERANCE
        if significant_change and self.last_dist_pos < dist_pos:
            self.dist_increased = True
        elif significant_change and self.dist_increased and dist_pos < self.last_dist_pos:
            # The distance shrinks again after having grown: one corrective movement.
            self.dist_increased = False
            self.cnt_dist_ep_corrections += 1
        self.last_dist_pos = dist_pos

    def get_current_num_overshoot_corrections(self) -> int:
        """Return the current number of overshoot corrections measured within the current episode.

        :return: the current number of overshoot corrections measured within the current episode
        """
        return self.cnt_overshoot_ep_corrections

    def get_current_num_distance_corrections(self) -> int:
        """Return the current number of distance corrections measured within the current episode.

        :return: the current number of distance corrections measured within the current episode
        """
        return self.cnt_dist_ep_corrections
[docs]
class EnergyEfficiencyMeasurement:
    r"""A utility class to measure energy efficiency :math:`W` based on weighted sum of jerk, acceleration, and velocity.

    .. math::
        W = \sum_{m=0}^{M} w_j\cdot |j(m,t)| + w_a\cdot |a(m,t)| + w_v \cdot |v(m,t)|,

    where :math:`w_j, w_a, w_v \in\mathbb{R}` are the weights and :math:`M\in\mathbb{N}` is the number of movers for which
    to measure the energy efficiency.

    This class computes a weighted sum of the absolute values of jerk, acceleration, and velocity for all movers
    to assess energy efficiency. Lower values indicate more energy-efficient movements. The absolute values are
    summed over the x and y components of every mover.

    :param weight_jerk: weight for jerk component in the energy efficiency metric, defaults to 15.0
    :param weight_acceleration: weight for acceleration component in the energy efficiency metric, defaults to 5.0
    :param weight_velocity: weight for velocity component in the energy efficiency metric, defaults to 1.0
    :param num_movers: number of movers in the system, defaults to 1
    :param dt: time step size for numerical differentiation, defaults to 0.01
    """

    def __init__(
        self,
        weight_jerk: float = 15.0,
        weight_acceleration: float = 5.0,
        weight_velocity: float = 1.0,
        num_movers: int = 1,
        dt: float = 0.01,
    ) -> None:
        self.weight_jerk = weight_jerk
        self.weight_acceleration = weight_acceleration
        self.weight_velocity = weight_velocity
        self.num_movers = num_movers
        self.dt = dt
        self.reset()

    def reset(self) -> None:
        """Reset all measurements. Should be called at environment reset."""
        self._cumulative_energy_metric = 0.0
        self._min_energy_metric = None
        self._max_energy_metric = None
        self.step_count = 0
        self.last_velocities = None
        self.last_accelerations = None

    def _as_mover_array(self, values: npt.ArrayLike) -> np.ndarray:
        """Convert an array-like to an array of shape (num_movers, 2)."""
        return np.asarray(values).reshape(self.num_movers, 2)

    def _compute_weighted_energy_metric(
        self,
        velocities: npt.ArrayLike,
        accelerations: Optional[npt.ArrayLike] = None,
        jerk: Optional[npt.ArrayLike] = None,
    ) -> float:
        """Compute the weighted energy efficiency metric for the current step.

        Besides returning the metric, this method stores the given velocities (and accelerations, if provided)
        as the values of the last step and updates the per-step minimum and maximum.

        :param velocities: velocity array(-like) reshapeable to (num_movers, 2) containing x,y velocities
        :param accelerations: acceleration array(-like) reshapeable to (num_movers, 2), can be None; the
            acceleration term is 0 in that case
        :param jerk: jerk array(-like) reshapeable to (num_movers, 2), can be None; it is then computed as the
            finite difference of the current and the previously stored accelerations divided by ``dt``, or taken
            as 0 if either of them is unavailable
        :return: weighted energy efficiency metric value for current step
        """
        velocities = self._as_mover_array(velocities)
        velocity_metric = np.sum(np.abs(velocities))

        if accelerations is not None:
            accelerations = self._as_mover_array(accelerations)
            acceleration_metric = np.sum(np.abs(accelerations))
        else:
            acceleration_metric = 0.0

        if jerk is not None:
            # An explicitly supplied jerk always takes precedence over the finite-difference estimate.
            jerk_metric = np.sum(np.abs(self._as_mover_array(jerk)))
        elif accelerations is not None and self.last_accelerations is not None:
            estimated_jerk = (accelerations - self.last_accelerations) / self.dt
            jerk_metric = np.sum(np.abs(estimated_jerk))
        else:
            jerk_metric = 0.0

        # Remember this step's values; last_accelerations is kept unchanged when no accelerations are given.
        if accelerations is not None:
            self.last_accelerations = accelerations.copy()
        self.last_velocities = velocities.copy()

        energy_metric = (
            self.weight_velocity * velocity_metric
            + self.weight_acceleration * acceleration_metric
            + self.weight_jerk * jerk_metric
        )

        if self._min_energy_metric is None or energy_metric < self._min_energy_metric:
            self._min_energy_metric = energy_metric
        if self._max_energy_metric is None or energy_metric > self._max_energy_metric:
            self._max_energy_metric = energy_metric
        return energy_metric

    def update(
        self,
        velocities: npt.ArrayLike,
        accelerations: Optional[npt.ArrayLike] = None,
        jerk: Optional[npt.ArrayLike] = None,
    ) -> None:
        """Update the cumulative energy efficiency measurement.

        :param velocities: velocity array(-like) reshapeable to (num_movers, 2) containing x,y velocities
        :param accelerations: acceleration array(-like) reshapeable to (num_movers, 2), can be None
        :param jerk: jerk array(-like) reshapeable to (num_movers, 2), can be None (will be computed from
            the accelerations of the current and the previous step if both are available)
        """
        self._cumulative_energy_metric += self._compute_weighted_energy_metric(velocities, accelerations, jerk)
        self.step_count += 1

    @property
    def cumulative_energy_metric(self) -> float:
        """Get the cumulative energy efficiency metric for the current episode.

        :return: cumulative energy efficiency metric
        """
        return self._cumulative_energy_metric

    @property
    def average_energy_metric(self) -> float:
        """Get the average energy efficiency metric per step for the current episode.

        :return: average energy efficiency metric per step, or 0.0 if no steps recorded
        """
        if self.step_count > 0:
            return self._cumulative_energy_metric / self.step_count
        return 0.0

    @property
    def min_energy_metric(self) -> Optional[float]:
        """Get the minimum energy efficiency metric per step for the current episode.

        :return: minimum energy efficiency metric per step, or None if no steps recorded
        """
        return self._min_energy_metric

    @property
    def max_energy_metric(self) -> Optional[float]:
        """Get the maximum energy efficiency metric per step for the current episode.

        :return: maximum energy efficiency metric per step, or None if no steps recorded
        """
        return self._max_energy_metric
# Planning benchmark configuration: a list of 10 mover counts followed by a list of 10 binary layout grids.
# NOTE(review): presumably entry i of BENCHMARK_PLANNING_NUM_MOVERS is the number of movers used with
# BENCHMARK_PLANNING_LAYOUTS[i] — confirm against the code that consumes these constants.
BENCHMARK_PLANNING_NUM_MOVERS = [3, 3, 4, 4, 4, 3, 4, 4, 5, 5]
# Each layout is a 2D int8 array of 0/1 entries.
# NOTE(review): presumably 1 marks a cell with a tile and 0 a cell without one — TODO confirm.
BENCHMARK_PLANNING_LAYOUTS: list[npt.NDArray[np.int8]] = [
# index 0: 4x3, all ones
np.ones((4, 3), dtype=np.int8),
# index 1: 4x3
np.array(
[
[1, 1, 1],
[1, 1, 1],
[0, 1, 1],
[1, 1, 1],
],
dtype=np.int8,
),
# index 2: 8x5
np.array(
[
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
[0, 0, 0, 1, 1],
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
[1, 1, 0, 0, 0],
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 3: 7x7
np.array(
[
[0, 0, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 1, 1],
[1, 1, 1, 0, 1, 1, 1],
[1, 1, 1, 1, 1, 1, 1],
[0, 0, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
],
dtype=np.int8,
),
# index 4: 8x6
np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 0, 0, 0, 1, 1],
[1, 1, 0, 0, 0, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 5: 11x6
np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 6: 8x6
np.array(
[
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 0, 0],
],
dtype=np.int8,
),
# index 7: 7x7
np.array(
[
[1, 1, 1, 0, 1, 1, 1],
[1, 1, 1, 0, 1, 1, 1],
[1, 1, 0, 0, 0, 1, 1],
[1, 1, 0, 0, 0, 1, 1],
[1, 1, 0, 0, 0, 1, 1],
[1, 1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 8: 8x6
np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 1],
[1, 0, 1, 1, 1, 1],
[0, 0, 1, 1, 1, 1],
[0, 0, 0, 1, 0, 0],
],
dtype=np.int8,
),
# index 9: 8x8
np.array(
[
[1, 1, 1, 1, 1, 0, 0, 0],
[1, 1, 1, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 1, 1, 0],
[0, 0, 1, 1, 1, 1, 1, 1],
[1, 0, 0, 1, 1, 0, 1, 1],
[1, 1, 1, 1, 1, 0, 1, 1],
[1, 1, 1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1, 1, 0],
],
dtype=np.int8,
),
]
# Pushing benchmark configuration: a list of mover counts followed by a list of binary layout grids.
# NOTE(review): BENCHMARK_PUSHING_NUM_MOVERS has 11 entries, whereas BENCHMARK_PUSHING_LAYOUTS defines
# 10 layouts. If the two lists are meant to be paired by index (presumably, as for the planning benchmark),
# one of them is off by one — verify which entry is surplus or which layout is missing.
BENCHMARK_PUSHING_NUM_MOVERS = [1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2]
# Each layout is a 2D int8 array of 0/1 entries.
# NOTE(review): presumably 1 marks a cell with a tile and 0 a cell without one — TODO confirm.
BENCHMARK_PUSHING_LAYOUTS: list[npt.NDArray[np.int8]] = [
# index 0: 3x3, all ones
np.ones((3, 3), dtype=np.int8),
# index 1: 4x3
np.array(
[
[1, 1, 1],
[1, 1, 1],
[0, 1, 1],
[1, 1, 1],
],
dtype=np.int8,
),
# index 2: 8x5
np.array(
[
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
[0, 0, 0, 1, 1],
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
[1, 1, 0, 0, 0],
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 3: 7x7
np.array(
[
[0, 0, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 1, 1],
[1, 1, 1, 0, 1, 1, 1],
[1, 1, 1, 1, 1, 1, 1],
[0, 0, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 0, 0],
],
dtype=np.int8,
),
# index 4: 8x6
np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 0, 0, 0, 1, 1],
[1, 1, 0, 0, 0, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 5: 11x6
np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[0, 0, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 6: 8x6
np.array(
[
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 0, 0],
[1, 1, 1, 0, 0, 0],
],
dtype=np.int8,
),
# index 7: 7x7
np.array(
[
[1, 1, 1, 0, 1, 1, 1],
[1, 1, 1, 0, 1, 1, 1],
[1, 1, 0, 0, 0, 1, 1],
[1, 1, 0, 0, 0, 1, 1],
[1, 1, 0, 0, 0, 1, 1],
[1, 1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1, 1],
],
dtype=np.int8,
),
# index 8: 8x6
np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 1],
[1, 0, 1, 1, 1, 1],
[0, 0, 1, 1, 1, 1],
[0, 0, 0, 1, 0, 0],
],
dtype=np.int8,
),
# index 9: 8x8
np.array(
[
[1, 1, 1, 1, 1, 0, 0, 0],
[1, 1, 1, 1, 1, 1, 0, 0],
[0, 0, 1, 1, 1, 1, 1, 0],
[0, 0, 1, 1, 1, 1, 1, 1],
[1, 0, 0, 1, 1, 0, 1, 1],
[1, 1, 1, 1, 1, 0, 1, 1],
[1, 1, 1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1, 1, 0],
],
dtype=np.int8,
),
]