diff --git a/configs/agents/rl/push_cube/gym_config.json b/configs/agents/rl/push_cube/gym_config.json index b78e7f3..766d12c 100644 --- a/configs/agents/rl/push_cube/gym_config.json +++ b/configs/agents/rl/push_cube/gym_config.json @@ -1,129 +1,193 @@ -{ - "id": "PushCubeRL", - "max_episodes": 5, - "env": { - "num_envs": 128, - "sim_steps_per_control": 4, - "events": { - "randomize_cube": { - "func": "randomize_rigid_object_pose", - "mode": "reset", - "params": { - "entity_cfg": {"uid": "cube"}, - "position_range": [[-0.2, -0.2, 0.0], [0.2, 0.2, 0.0]], - "relative_position": true +{ + "id": "PushCubeRL", + "max_episodes": 5, + "env": { + "num_envs": 128, + "sim_steps_per_control": 4, + "events": { + "randomize_cube": { + "func": "randomize_rigid_object_pose", + "mode": "reset", + "params": { + "entity_cfg": { + "uid": "cube" + }, + "position_range": [ + [-0.2, -0.2, 0.0], + [0.2, 0.2, 0.0] + ], + "relative_position": true + } + }, + "randomize_goal": { + "func": "randomize_target_pose", + "mode": "reset", + "params": { + "position_range": [ + [-0.3, -0.3, 0.05], + [0.3, 0.3, 0.05] + ], + "relative_position": false, + "store_key": "goal_pose" + } + } + }, + "observations": { + "robot_qpos": { + "func": "normalize_robot_joint_data", + "mode": "modify", + "name": "robot/qpos", + "params": { + "joint_ids": [0, 1, 2, 3, 4, 5] + } + }, + "robot_ee_pos": { + "func": "get_robot_eef_pose", + "mode": "add", + "name": "robot/ee_pos", + "params": { + "part_name": "arm" + } + }, + "cube_pos": { + "func": "get_rigid_object_pose", + "mode": "add", + "name": "object/cube_pos", + "params": { + "entity_cfg": { + "uid": "cube" + } + } + }, + "goal_pos": { + "func": "target_position", + "mode": "add", + "name": "object/goal_pos", + "params": { + "target_pose_key": "goal_pose" + } + } + }, + "rewards": { + "reaching_reward": { + "func": "reaching_behind_object", + "mode": "add", + "weight": 0.1, + "params": { + "object_cfg": { + "uid": "cube" + }, + "target_pose_key": "goal_pose", + "behind_offset": 0.015, + "height_offset": 0.015, + "distance_scale": 5.0, + "part_name": "arm" + } + }, + "place_reward": { + "func": "incremental_distance_to_target", + "mode": "add", + "weight": 1.0, + "params": { + "source_entity_cfg": { + "uid": "cube" + }, + "target_pose_key": "goal_pose", + "tanh_scale": 10.0, + "positive_weight": 2.0, + "negative_weight": 0.5, + "use_xy_only": true + } + }, + "action_penalty": { + "func": "action_smoothness_penalty", + "mode": "add", + "weight": 0.01, + "params": {} + }, + "success_bonus": { + "func": "success_reward", + "mode": "add", + "weight": 10.0, + "params": {} + } + }, + "extensions": { + "obs_mode": "state", + "episode_length": 100, + "joint_limits": 0.5, + "action_scale": 0.1, + "success_threshold": 0.1 } - } }, - "observations": {}, - "extensions": { - "obs_mode": "state", - "episode_length": 100, - "joint_limits": 0.5, - "action_scale": 0.1, - "success_threshold": 0.1, - "reaching_reward_weight": 0.1, - "place_reward_weight": 2.0, - "place_penalty_weight": 0.5, - "action_penalty_weight": 0.01, - "success_bonus_weight": 10.0 - } - }, - "robot": { - "uid": "Manipulator", - "urdf_cfg": { - "components": [ - { - "component_type": "arm", - "urdf_path": "UniversalRobots/UR10/UR10.urdf", - "transform": [ - [1.0, 0.0, 0.0, 0.0], - [0.0, 1.0, 0.0, 0.0], - [0.0, 0.0, 1.0, 0.0], - [0.0, 0.0, 0.0, 1.0] - ] + "robot": { + "uid": "Manipulator", + "urdf_cfg": { + "components": [ + { + "component_type": "arm", + "urdf_path": "UniversalRobots/UR10/UR10.urdf" + }, + { + "component_type": "hand", 
+ "urdf_path": "DH_PGI_140_80/DH_PGI_140_80.urdf" + } + ] }, - { - "component_type": "hand", - "urdf_path": "DH_PGI_140_80/DH_PGI_140_80.urdf", - "transform": [ - [1.0, 0.0, 0.0, 0.0], - [0.0, 1.0, 0.0, 0.0], - [0.0, 0.0, 1.0, 0.0], - [0.0, 0.0, 0.0, 1.0] - ] + "init_pos": [0.0, 0.0, 0.0], + "init_rot": [0.0, 0.0, 0.0], + "init_qpos": [0.0, -1.57, 1.57, -1.57, -1.57, 0.0, 0.04, 0.04], + "drive_pros": { + "drive_type": "force", + "stiffness": 100000.0, + "damping": 1000.0, + "max_velocity": 2.0, + "max_effort": 500.0 + }, + "solver_cfg": { + "arm": { + "class_type": "PytorchSolver", + "end_link_name": "ee_link", + "root_link_name": "base_link", + "tcp": [ + [1.0, 0.0, 0.0, 0.0], + [0.0, 1.0, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.16], + [0.0, 0.0, 0.0, 1.0] + ] + } + }, + "control_parts": { + "arm": ["JOINT[1-6]"] } - ] - }, - "init_pos": [0.0, 0.0, 0.0], - "init_rot": [0.0, 0.0, 0.0], - "init_qpos": [0.0, -1.57, 1.57, -1.57, -1.57, 0.0, 0.04, 0.04], - "drive_pros": { - "drive_type": "force", - "stiffness": 100000.0, - "damping": 1000.0, - "max_velocity": 2.0, - "max_effort": 500.0 - }, - "solver_cfg": { - "arm": { - "class_type": "PytorchSolver", - "end_link_name": "ee_link", - "root_link_name": "base_link", - "tcp": [ - [1.0, 0.0, 0.0, 0.0], - [0.0, 1.0, 0.0, 0.0], - [0.0, 0.0, 1.0, 0.16], - [0.0, 0.0, 0.0, 1.0] - ] - } }, - "control_parts": { - "arm": ["JOINT[1-6]"] - } - }, - "sensor": [], - "light": { - }, - "background": [ - { - "uid": "goal_sphere", - "shape": { - "shape_type": "Sphere", - "radius": 0.02 - }, - "body_type": "kinematic", - "init_pos": [-0.9, -0.6, 0.05], - "attrs": { - "enable_collision": false, - "mass": 0.0 - } - } - ], - "rigid_object": [ - { - "uid": "cube", - "shape": { - "shape_type": "Cube", - "size": [0.1, 0.1, 0.1] - }, - "body_type": "dynamic", - "init_pos": [-0.6, -0.4, 0.05], - "attrs": { - "mass": 10.0, - "static_friction": 3.0, - "dynamic_friction": 2.0, - "linear_damping": 2.0, - "angular_damping": 2.0, - "contact_offset": 0.003, - "rest_offset": 0.001, - "restitution": 0.1, - "max_depenetration_velocity": 10.0, - "max_linear_velocity": 1.0, - "max_angular_velocity": 1.0 - } - } - ], - "rigid_object_group": [], - "articulation": [] + "sensor": [], + "light": {}, + "background": [], + "rigid_object": [ + { + "uid": "cube", + "shape": { + "shape_type": "Cube", + "size": [0.1, 0.1, 0.1] + }, + "body_type": "dynamic", + "init_pos": [-0.6, -0.4, 0.05], + "attrs": { + "mass": 10.0, + "static_friction": 3.0, + "dynamic_friction": 2.0, + "linear_damping": 2.0, + "angular_damping": 2.0, + "contact_offset": 0.003, + "rest_offset": 0.001, + "restitution": 0.1, + "max_depenetration_velocity": 10.0, + "max_linear_velocity": 1.0, + "max_angular_velocity": 1.0 + } + } + ], + "rigid_object_group": [], + "articulation": [] } diff --git a/configs/agents/rl/push_cube/train_config.json b/configs/agents/rl/push_cube/train_config.json index f1558fd..3c3bda0 100644 --- a/configs/agents/rl/push_cube/train_config.json +++ b/configs/agents/rl/push_cube/train_config.json @@ -1,64 +1,67 @@ -{ - "trainer": { - "exp_name": "push_cube_ppo", - "gym_config": "configs/agents/rl/push_cube/gym_config.json", - "seed": 42, - "device": "cuda:0", - "headless": true, - "iterations": 1000, - "rollout_steps": 1024, - "eval_freq": 2, - "save_freq": 200, - "use_wandb": false, - "wandb_project_name": "embodychain-push_cube", - "events": { - "eval": { - "record_camera": { - "func": "record_camera_data_async", - "mode": "interval", - "interval_step": 1, - "params": { - "name": "main_cam", - "resolution": 
[640, 480], - "eye": [-1.4, 1.4, 2.0], - "target": [0, 0, 0], - "up": [0, 0, 1], - "intrinsics": [600, 600, 320, 240], - "save_path": "./outputs/videos/eval" - } +{ + "trainer": { + "exp_name": "push_cube_ppo", + "gym_config": "configs/agents/rl/push_cube/gym_config.json", + "seed": 42, + "device": "cuda:0", + "headless": false, + "enable_rt": false, + "gpu_id": 0, + "num_envs": 8, + "iterations": 1000, + "rollout_steps": 1024, + "eval_freq": 200, + "save_freq": 200, + "use_wandb": true, + "wandb_project_name": "embodychain-push_cube", + "events": { + "eval": { + "record_camera": { + "func": "record_camera_data_async", + "mode": "interval", + "interval_step": 1, + "params": { + "name": "main_cam", + "resolution": [640, 480], + "eye": [-1.4, 1.4, 2.0], + "target": [0, 0, 0], + "up": [0, 0, 1], + "intrinsics": [600, 600, 320, 240], + "save_path": "./outputs/videos/eval" + } + } + } } - } - } - }, - "policy": { - "name": "actor_critic", - "actor": { - "type": "mlp", - "network_cfg": { - "hidden_sizes": [256, 256], - "activation": "relu" - } }, - "critic": { - "type": "mlp", - "network_cfg": { - "hidden_sizes": [256, 256], - "activation": "relu" - } - } - }, - "algorithm": { - "name": "ppo", - "cfg": { - "learning_rate": 0.0001, - "n_epochs": 10, - "batch_size": 8192, - "gamma": 0.99, - "gae_lambda": 0.95, - "clip_coef": 0.2, - "ent_coef": 0.01, - "vf_coef": 0.5, - "max_grad_norm": 0.5 + "policy": { + "name": "actor_critic", + "actor": { + "type": "mlp", + "network_cfg": { + "hidden_sizes": [256, 256], + "activation": "relu" + } + }, + "critic": { + "type": "mlp", + "network_cfg": { + "hidden_sizes": [256, 256], + "activation": "relu" + } + } + }, + "algorithm": { + "name": "ppo", + "cfg": { + "learning_rate": 0.0001, + "n_epochs": 10, + "batch_size": 8192, + "gamma": 0.99, + "gae_lambda": 0.95, + "clip_coef": 0.2, + "ent_coef": 0.01, + "vf_coef": 0.5, + "max_grad_norm": 0.5 + } } - } -} \ No newline at end of file +} diff --git a/docs/source/overview/sim/sim_manager.md b/docs/source/overview/sim/sim_manager.md index d2f19be..2eca587 100644 --- a/docs/source/overview/sim/sim_manager.md +++ b/docs/source/overview/sim/sim_manager.md @@ -112,12 +112,23 @@ In this mode, the physics simulation stepping is automatically handling by the p > When in automatic update mode, user are recommanded to use CPU `sim_device` for simulation. -### Mainly used methods +## Mainly used methods - **`SimulationManager.update(physics_dt=None, step=1)`**: Steps the physics simulation with optional custom time step and number of steps. If `physics_dt` is None, uses the configured physics time step. - **`SimulationManager.enable_physics(enable: bool)`**: Enable or disable physics simulation. - **`SimulationManager.set_manual_update(enable: bool)`**: Set manual update mode for physics. + +## Multiple instances + +`SimulationManager` supports multiple instances, allowing separate simulation worlds to run independently. Each instance maintains its own simulation state, assets, and configurations. + +- To get the current number of `SimulationManager` instances: `SimulationManager.get_instance_num()` +- To get a specific instance: `SimulationManager.get_instance(instance_id)`. + +> Currently, multiple instances are not supported with the ray tracing rendering backend. Support for this is planned for a future release.
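For context, a minimal usage sketch of the multi-instance API documented above (an editor's illustration, not part of this diff). It assumes zero-based integer instance IDs, that the simulation worlds were already created elsewhere, and that manual update mode is enabled; only `get_instance_num`, `get_instance`, and `update` are taken from the documentation:

```python
from embodichain.lab.sim import SimulationManager

# Number of independent simulation worlds currently alive.
count = SimulationManager.get_instance_num()

# Fetch each instance and step its physics independently.
for instance_id in range(count):  # assumes zero-based integer IDs
    sim = SimulationManager.get_instance(instance_id)
    sim.update(step=1)  # documented signature: update(physics_dt=None, step=1)
```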
+ + For more methods and details, refer to the [SimulationManager](https://dexforce.github.io/EmbodiChain/api_reference/embodichain/embodichain.lab.sim.html#embodichain.lab.sim.SimulationManager) documentation. ### Related Tutorials diff --git a/embodichain/agents/rl/algo/ppo.py b/embodichain/agents/rl/algo/ppo.py index afd636e..5253e89 100644 --- a/embodichain/agents/rl/algo/ppo.py +++ b/embodichain/agents/rl/algo/ppo.py @@ -17,7 +17,7 @@ import torch from typing import Dict, Any, Tuple, Callable -from embodichain.agents.rl.utils import AlgorithmCfg +from embodichain.agents.rl.utils import AlgorithmCfg, flatten_dict_observation from embodichain.agents.rl.buffer import RolloutBuffer from embodichain.utils import configclass from .base import BaseAlgorithm @@ -102,6 +102,10 @@ def collect_rollout( reward = reward.float() done = done.bool() + # Flatten dict observation from ObservationManager if needed + if isinstance(next_obs, dict): + next_obs = flatten_dict_observation(next_obs) + # Add to buffer self.buffer.add(current_obs, actions, reward, done, value, log_prob) diff --git a/embodichain/agents/rl/train.py b/embodichain/agents/rl/train.py index e87d462..ca3c299 100644 --- a/embodichain/agents/rl/train.py +++ b/embodichain/agents/rl/train.py @@ -32,7 +32,7 @@ from embodichain.agents.rl.utils.trainer import Trainer from embodichain.utils import logger from embodichain.lab.gym.envs.tasks.rl import build_env -from embodichain.lab.gym.utils.gym_utils import config_to_rl_cfg +from embodichain.lab.gym.utils.gym_utils import config_to_cfg from embodichain.utils.utility import load_json from embodichain.utils.module_utils import find_function_from_modules from embodichain.lab.sim import SimulationManagerCfg @@ -60,6 +60,9 @@ def main(): eval_freq = int(trainer_cfg.get("eval_freq", 10000)) save_freq = int(trainer_cfg.get("save_freq", 50000)) headless = bool(trainer_cfg.get("headless", True)) + enable_rt = bool(trainer_cfg.get("enable_rt", False)) + gpu_id = int(trainer_cfg.get("gpu_id", 0)) + num_envs = trainer_cfg.get("num_envs", None) wandb_project_name = trainer_cfg.get("wandb_project_name", "embodychain-generic") # Device @@ -120,7 +123,11 @@ def main(): logger.log_info(f"Current working directory: {Path.cwd()}") gym_config_data = load_json(str(gym_config_path)) - gym_env_cfg = config_to_rl_cfg(gym_config_data) + gym_env_cfg = config_to_cfg(gym_config_data) + + # Override num_envs from train config if provided + if num_envs is not None: + gym_env_cfg.num_envs = num_envs # Ensure sim configuration mirrors runtime overrides if gym_env_cfg.sim_cfg is None: @@ -135,24 +142,28 @@ def main(): else: gym_env_cfg.sim_cfg.sim_device = torch.device("cpu") gym_env_cfg.sim_cfg.headless = headless + gym_env_cfg.sim_cfg.enable_rt = enable_rt + gym_env_cfg.sim_cfg.gpu_id = gpu_id logger.log_info( - f"Loaded gym_config from {gym_config_path} (env_id={gym_env_cfg.env_id}, headless={gym_env_cfg.sim_cfg.headless}, sim_device={gym_env_cfg.sim_cfg.sim_device})" + f"Loaded gym_config from {gym_config_path} (env_id={gym_config_data['id']}, num_envs={gym_env_cfg.num_envs}, headless={gym_env_cfg.sim_cfg.headless}, enable_rt={gym_env_cfg.sim_cfg.enable_rt}, sim_device={gym_env_cfg.sim_cfg.sim_device})" ) - env = build_env(gym_env_cfg.env_id, base_env_cfg=gym_env_cfg) + env = build_env(gym_config_data["id"], base_env_cfg=gym_env_cfg) eval_gym_env_cfg = deepcopy(gym_env_cfg) eval_gym_env_cfg.num_envs = 4 eval_gym_env_cfg.sim_cfg.headless = True - eval_env = build_env(eval_gym_env_cfg.env_id, base_env_cfg=eval_gym_env_cfg) 
+ eval_env = build_env(gym_config_data["id"], base_env_cfg=eval_gym_env_cfg) # Build Policy via registry policy_name = policy_block["name"] # Build Policy via registry (actor/critic must be explicitly defined in JSON when using actor_critic) if policy_name.lower() == "actor_critic": - obs_dim = env.observation_space.shape[-1] + # Get observation dimension from flattened observation space + # flattened_observation_space returns Box space for RL training + obs_dim = env.flattened_observation_space.shape[-1] action_dim = env.action_space.shape[-1] actor_cfg = policy_block.get("actor") @@ -167,7 +178,7 @@ def main(): policy = build_policy( policy_block, - env.observation_space, + env.flattened_observation_space, env.action_space, device, actor=actor, @@ -175,7 +186,7 @@ def main(): ) else: policy = build_policy( - policy_block, env.observation_space, env.action_space, device + policy_block, env.flattened_observation_space, env.action_space, device ) # Build Algorithm via factory diff --git a/embodichain/agents/rl/utils/__init__.py b/embodichain/agents/rl/utils/__init__.py index f6f9f4f..e6f9e57 100644 --- a/embodichain/agents/rl/utils/__init__.py +++ b/embodichain/agents/rl/utils/__init__.py @@ -15,7 +15,9 @@ # ---------------------------------------------------------------------------- from .config import AlgorithmCfg +from .helper import flatten_dict_observation __all__ = [ "AlgorithmCfg", + "flatten_dict_observation", ] diff --git a/embodichain/agents/rl/utils/helper.py b/embodichain/agents/rl/utils/helper.py new file mode 100644 index 0000000..3021a31 --- /dev/null +++ b/embodichain/agents/rl/utils/helper.py @@ -0,0 +1,52 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2025 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +import torch + + +def flatten_dict_observation(input_dict: dict) -> torch.Tensor: + """ + Flatten hierarchical dict observations from ObservationManager. + + Recursively traverse nested dicts, collect all tensor values, + flatten each to (num_envs, -1), and concatenate in sorted key order. + + Args: + input_dict: Nested dict structure, e.g. 
{"robot": {"qpos": tensor, "ee_pos": tensor}, "object": {...}} + + Returns: + Concatenated flat tensor of shape (num_envs, total_dim) + """ + obs_list = [] + + def _collect_tensors(d, prefix=""): + """Recursively collect tensors from nested dicts in sorted order.""" + for key in sorted(d.keys()): + full_key = f"{prefix}/{key}" if prefix else key + value = d[key] + if isinstance(value, dict): + _collect_tensors(value, full_key) + elif isinstance(value, torch.Tensor): + # Flatten tensor to (num_envs, -1) shape + obs_list.append(value.flatten(start_dim=1)) + + _collect_tensors(input_dict) + + if not obs_list: + raise ValueError("No tensors found in observation dict") + + result = torch.cat(obs_list, dim=-1) + return result diff --git a/embodichain/agents/rl/utils/trainer.py b/embodichain/agents/rl/utils/trainer.py index 88c0490..34dc191 100644 --- a/embodichain/agents/rl/utils/trainer.py +++ b/embodichain/agents/rl/utils/trainer.py @@ -25,6 +25,7 @@ import wandb from embodichain.lab.gym.envs.managers.event_manager import EventManager +from .helper import flatten_dict_observation class Trainer: @@ -74,22 +75,22 @@ def __init__( # initial obs (assume env returns torch tensors already on target device) obs, _ = self.env.reset() - self.obs = obs # Initialize algorithm's buffer - self.observation_space = getattr(self.env, "observation_space", None) - self.action_space = getattr(self.env, "action_space", None) - obs_dim = ( - self.observation_space.shape[-1] - if self.observation_space - else self.obs.shape[-1] - ) - action_dim = self.action_space.shape[-1] if self.action_space else None + # Flatten dict observations from ObservationManager to tensor for RL algorithms + if isinstance(obs, dict): + obs_tensor = flatten_dict_observation(obs) + obs_dim = obs_tensor.shape[-1] + num_envs = obs_tensor.shape[0] + # Store flattened observation for RL training + self.obs = obs_tensor + + action_space = getattr(self.env, "action_space", None) + action_dim = action_space.shape[-1] if action_space else None if action_dim is None: raise RuntimeError( "Env must expose action_space with shape for buffer initialization." 
) - num_envs = self.obs.shape[0] if self.obs.ndim == 2 else 1 # Algorithm manages its own buffer self.algorithm.initialize_buffer(num_steps, num_envs, obs_dim, action_dim) @@ -160,8 +161,9 @@ def on_step(obs, actions, reward, done, info, next_obs): self.curr_len[done_idx] = 0 # Update global step and observation + # next_obs is already flattened in algorithm's collect_rollout self.obs = next_obs - self.global_step += next_obs.shape[0] if next_obs.ndim == 2 else 1 + self.global_step += next_obs.shape[0] if isinstance(info, dict): rewards_dict = info.get("rewards") @@ -226,6 +228,8 @@ def _eval_once(self, num_episodes: int = 5): returns = [] for _ in range(num_episodes): obs, _ = self.eval_env.reset() + obs = flatten_dict_observation(obs) + done_any = torch.zeros( obs.shape[0] if obs.ndim == 2 else 1, dtype=torch.bool, @@ -235,8 +239,12 @@ def _eval_once(self, num_episodes: int = 5): ep_ret = torch.zeros(num_envs_eval, dtype=torch.float32, device=self.device) while not done_any.any(): actions, _, _ = self.policy.get_action(obs, deterministic=True) - result = self.eval_env.step(actions) - obs, reward, terminated, truncated, info = result + obs, reward, terminated, truncated, info = self.eval_env.step(actions) + + # Flatten dict observation from step + if isinstance(obs, dict): + obs = flatten_dict_observation(obs) + done = terminated | truncated reward = reward.float() done_any = done diff --git a/embodichain/lab/gym/envs/base_env.py b/embodichain/lab/gym/envs/base_env.py index c2ab6d8..9974360 100644 --- a/embodichain/lab/gym/envs/base_env.py +++ b/embodichain/lab/gym/envs/base_env.py @@ -15,6 +15,7 @@ # ---------------------------------------------------------------------------- import torch +import numpy as np import gymnasium as gym from typing import Dict, List, Union, Tuple, Any, Sequence @@ -173,6 +174,21 @@ def observation_space(self) -> gym.spaces.Space: self.single_observation_space, n=self.num_envs ) + @cached_property + def flattened_observation_space(self) -> gym.spaces.Box: + """Flattened observation space for RL training. + + Returns a Box space by computing total dimensions from nested dict observations. + This is needed because RL algorithms (PPO, SAC, etc.) require flat vector inputs. + """ + from embodichain.agents.rl.utils.helper import flatten_dict_observation + + flattened_obs = flatten_dict_observation(self._init_raw_obs) + total_dim = flattened_obs.shape[-1] + return gym.spaces.Box( + low=-np.inf, high=np.inf, shape=(total_dim,), dtype=np.float32 + ) + @cached_property def action_space(self) -> gym.spaces.Space: if self.num_envs == 1: @@ -418,6 +434,30 @@ def check_truncated(self, obs: EnvObs, info: Dict[str, Any]) -> torch.Tensor: """ return torch.zeros(self.num_envs, dtype=torch.bool, device=self.device) + def _extend_reward( + self, + rewards: torch.Tensor, + obs: EnvObs, + action: EnvAction, + info: Dict[str, Any], + **kwargs, + ) -> torch.Tensor: + """Extend the reward computation. + + Overwrite this function to extend or modify the reward computation. + + Args: + rewards: The base reward tensor. + obs: The observation from the environment. + action: The action applied to the robot agent. + info: The info dictionary. + **kwargs: Additional keyword arguments. + + Returns: + The extended reward tensor. + """ + return rewards + def get_reward( self, obs: EnvObs, @@ -438,7 +478,9 @@ def get_reward( The reward for the current step. 
""" - return torch.zeros(self.num_envs, dtype=torch.float32, device=self.device) + rewards = torch.zeros(self.num_envs, dtype=torch.float32, device=self.device) + + return rewards def _step_action(self, action: EnvAction) -> EnvAction: """Set action control command into simulation. @@ -502,6 +544,9 @@ def step( obs = self.get_obs(**kwargs) info = self.get_info(**kwargs) rewards = self.get_reward(obs=obs, action=action, info=info) + rewards = self._extend_reward( + rewards=rewards, obs=obs, action=action, info=info + ) terminateds = torch.logical_or( info.get( diff --git a/embodichain/lab/gym/envs/embodied_env.py b/embodichain/lab/gym/envs/embodied_env.py index 18c23e0..e6f54d6 100644 --- a/embodichain/lab/gym/envs/embodied_env.py +++ b/embodichain/lab/gym/envs/embodied_env.py @@ -42,6 +42,7 @@ from embodichain.lab.gym.envs.managers import ( EventManager, ObservationManager, + RewardManager, DatasetManager, ) from embodichain.lab.gym.utils.registration import register_env @@ -91,6 +92,13 @@ class EnvLightCfg: Please refer to the :class:`embodichain.lab.gym.managers.ObservationManager` class for more details. """ + rewards: Union[object, None] = None + """Reward settings. Defaults to None, in which case no reward computation is performed through + the reward manager. + + Please refer to the :class:`embodichain.lab.gym.managers.RewardManager` class for more details. + """ + dataset: Union[object, None] = None """Dataset settings. Defaults to None, in which case no dataset collection is performed. @@ -175,6 +183,9 @@ def _init_sim_state(self, **kwargs): if self.cfg.observations: self.observation_manager = ObservationManager(self.cfg.observations, self) + if self.cfg.rewards: + self.reward_manager = RewardManager(self.cfg.rewards, self) + if self.cfg.dataset: self.dataset_manager = DatasetManager(self.cfg.dataset, self) @@ -283,6 +294,21 @@ def _extend_obs(self, obs: EnvObs, **kwargs) -> EnvObs: obs = self.observation_manager.compute(obs) return obs + def _extend_reward( + self, + rewards: torch.Tensor, + obs: EnvObs, + action: EnvAction, + info: Dict[str, Any], + **kwargs, + ) -> torch.Tensor: + if self.reward_manager: + rewards, reward_info = self.reward_manager.compute( + obs=obs, action=action, info=info + ) + info["rewards"] = reward_info + return rewards + def _prepare_scene(self, **kwargs) -> None: self._setup_lights() self._setup_background() @@ -306,6 +332,10 @@ def _initialize_episode( if "reset" in self.event_manager.available_modes: self.event_manager.apply(mode="reset", env_ids=env_ids) + # reset reward manager for environments that need a reset + if self.cfg.rewards: + self.reward_manager.reset(env_ids=env_ids) + def _step_action(self, action: EnvAction) -> EnvAction: """Set action control command into simulation. 
diff --git a/embodichain/lab/gym/envs/managers/__init__.py b/embodichain/lab/gym/envs/managers/__init__.py index e38f4f2..88c96ef 100644 --- a/embodichain/lab/gym/envs/managers/__init__.py +++ b/embodichain/lab/gym/envs/managers/__init__.py @@ -19,10 +19,12 @@ SceneEntityCfg, EventCfg, ObservationCfg, + RewardCfg, DatasetFunctorCfg, ) from .manager_base import Functor, ManagerBase from .event_manager import EventManager from .observation_manager import ObservationManager +from .reward_manager import RewardManager from .dataset_manager import DatasetManager from .datasets import * diff --git a/embodichain/lab/gym/envs/managers/cfg.py b/embodichain/lab/gym/envs/managers/cfg.py index 07888c9..6aeb280 100644 --- a/embodichain/lab/gym/envs/managers/cfg.py +++ b/embodichain/lab/gym/envs/managers/cfg.py @@ -231,7 +231,7 @@ def resolve(self, scene: SimulationManager): def _resolve_joint_names(self, scene: SimulationManager): # convert joint names to indices based on regex if self.joint_names is not None or self.joint_ids != slice(None): - entity: Articulation = scene[self.uid] + entity: Articulation = scene.get_articulation(self.uid) # -- if both are not their default values, check if they are valid if self.joint_names is not None and self.joint_ids != slice(None): if isinstance(self.joint_names, str): @@ -272,7 +272,7 @@ def _resolve_joint_names(self, scene: SimulationManager): def _resolve_body_names(self, scene: SimulationManager): # convert body names to indices based on regex if self.body_names is not None or self.body_ids != slice(None): - entity: RigidObject = scene[self.uid] + entity: RigidObject = scene.get_rigid_object(self.uid) # -- if both are not their default values, check if they are valid if self.body_names is not None and self.body_ids != slice(None): if isinstance(self.body_names, str): @@ -311,6 +311,29 @@ def _resolve_body_names(self, scene: SimulationManager): self.body_names = [entity.body_names[i] for i in self.body_ids] +@configclass +class RewardCfg(FunctorCfg): + """Configuration for a reward functor. + + The reward functor is used to compute rewards for the environment. The `mode` attribute + determines how the reward is combined with existing rewards. + """ + + mode: Literal["add", "replace"] = "add" + """The mode for the reward computation. + + - `add`: The reward is added to the existing total reward. + - `replace`: The reward replaces the total reward (useful for single reward functions). + """ + + weight: float = 1.0 + """The weight multiplier for this reward term. + + This value is used to scale the reward before adding it to the total reward. + Default is 1.0 (no scaling). + """ + + @configclass class DatasetFunctorCfg(FunctorCfg): """Configuration for dataset collection functors. diff --git a/embodichain/lab/gym/envs/managers/observations.py b/embodichain/lab/gym/envs/managers/observations.py index d7ae016..7e500af 100644 --- a/embodichain/lab/gym/envs/managers/observations.py +++ b/embodichain/lab/gym/envs/managers/observations.py @@ -252,6 +252,51 @@ def compute_semantic_mask( return torch.stack(masks, dim=-1) +def get_robot_eef_pose( + env: "EmbodiedEnv", + obs: EnvObs, + part_name: str | None = None, + position_only: bool = False, +) -> torch.Tensor: + """Get robot end-effector pose using forward kinematics. + + Args: + env: The environment instance. + obs: The observation dictionary. + part_name: The name of the control part. If None, uses default part. + position_only: If True, returns only position (3D). If False, returns full pose (4x4 matrix). 
+ + Returns: + A tensor of shape (num_envs, 3) if position_only=True, or (num_envs, 4, 4) otherwise. + """ + robot = env.robot + + if part_name is not None: + joint_ids = robot.get_joint_ids(part_name) + qpos = robot.body_data.qpos[:, joint_ids] + ee_pose = robot.compute_fk(name=part_name, qpos=qpos, to_matrix=True) + else: + qpos = robot.get_qpos() + ee_pose = robot.compute_fk(qpos=qpos, to_matrix=True) + + if position_only: + return ee_pose[:, :3, 3] + return ee_pose + + +def target_position( + env: "EmbodiedEnv", + obs: EnvObs, + target_pose_key: str = "goal_pose", +) -> torch.Tensor: + """Get virtual target position from env state.""" + state_attr = f"_{target_pose_key}s" + if hasattr(env, state_attr): + target_poses = getattr(env, state_attr) + return target_poses[:, :3, 3] + return torch.zeros(env.num_envs, 3, device=env.device) + + class compute_exteroception(Functor): """Compute the exteroception for the observation space. diff --git a/embodichain/lab/gym/envs/managers/randomization/spatial.py b/embodichain/lab/gym/envs/managers/randomization/spatial.py index 2437e88..c3e32d3 100644 --- a/embodichain/lab/gym/envs/managers/randomization/spatial.py +++ b/embodichain/lab/gym/envs/managers/randomization/spatial.py @@ -266,3 +266,86 @@ def randomize_robot_qpos( robot.set_qpos(qpos=current_qpos, env_ids=env_ids, joint_ids=joint_ids) env.sim.update(step=100) + + +def randomize_target_pose( + env: EmbodiedEnv, + env_ids: Union[torch.Tensor, None], + position_range: tuple[list[float], list[float]], + rotation_range: tuple[list[float], list[float]] | None = None, + relative_position: bool = False, + relative_rotation: bool = False, + reference_entity_cfg: SceneEntityCfg | None = None, + store_key: str = "target_pose", +) -> None: + """Randomize a virtual target pose and store in env state for use in get_info(). + + This function generates random target poses without requiring a physical object in the scene. + The generated poses are stored in env and should be exposed in get_info() for reward functors. + + Args: + env (EmbodiedEnv): The environment instance. + env_ids (Union[torch.Tensor, None]): The environment IDs to apply the randomization. + position_range (tuple[list[float], list[float]]): The range for the position randomization. + rotation_range (tuple[list[float], list[float]] | None): The range for the rotation randomization. + The rotation is represented as Euler angles (roll, pitch, yaw) in degree. + relative_position (bool): Whether to randomize the position relative to a reference entity. Default is False. + relative_rotation (bool): Whether to randomize the rotation relative to a reference entity. Default is False. + reference_entity_cfg (SceneEntityCfg | None): The reference entity for relative randomization. + If None and relative mode is True, uses world origin. + store_key (str): The key to store the target pose in env state. Default is "target_pose". + The pose will be stored in env._{store_key}s and should be exposed in info[store_key]. 
+ """ + num_instance = len(env_ids) + + # Get reference pose if needed + if relative_position or relative_rotation: + if reference_entity_cfg is not None: + # Get reference entity pose + ref_obj = env.sim.get_rigid_object(reference_entity_cfg.uid) + if ref_obj is not None: + ref_pose = ref_obj.get_local_pose(to_matrix=True)[env_ids] + init_pos = ref_pose[:, :3, 3] + init_rot = ref_pose[:, :3, :3] + else: + # Fallback to world origin + init_pos = torch.zeros(num_instance, 3, device=env.device) + init_rot = ( + torch.eye(3, device=env.device) + .unsqueeze(0) + .repeat(num_instance, 1, 1) + ) + else: + # Use world origin as reference + init_pos = torch.zeros(num_instance, 3, device=env.device) + init_rot = ( + torch.eye(3, device=env.device).unsqueeze(0).repeat(num_instance, 1, 1) + ) + else: + # Absolute randomization, init values won't be used + init_pos = torch.zeros(num_instance, 3, device=env.device) + init_rot = ( + torch.eye(3, device=env.device).unsqueeze(0).repeat(num_instance, 1, 1) + ) + + # Generate random pose + pose = get_random_pose( + init_pos=init_pos, + init_rot=init_rot, + position_range=position_range, + rotation_range=rotation_range, + relative_position=relative_position, + relative_rotation=relative_rotation, + ) + + # Store in env state (to be exposed via get_info) + state_attr = f"_{store_key}" + if not hasattr(env, state_attr): + setattr( + env, + state_attr, + torch.zeros(env.num_envs, 4, 4, device=env.device, dtype=torch.float32), + ) + + target_poses = getattr(env, state_attr) + target_poses[env_ids] = pose diff --git a/embodichain/lab/gym/envs/managers/reward_manager.py b/embodichain/lab/gym/envs/managers/reward_manager.py new file mode 100644 index 0000000..b3e46de --- /dev/null +++ b/embodichain/lab/gym/envs/managers/reward_manager.py @@ -0,0 +1,229 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2025 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +"""Reward manager for orchestrating reward computation in reinforcement learning tasks.""" + +from __future__ import annotations + +import inspect +import torch +from collections.abc import Sequence +from prettytable import PrettyTable +from typing import TYPE_CHECKING, Union + +from embodichain.utils import logger +from .manager_base import ManagerBase +from .cfg import RewardCfg + +if TYPE_CHECKING: + from embodichain.lab.gym.envs import EmbodiedEnv + + +class RewardManager(ManagerBase): + """Manager for orchestrating reward computation in reinforcement learning tasks. + + The reward manager computes rewards based on the current state of the environment and actions. + It supports multiple reward terms that can be combined through weighted summation. + + The reward manager offers two modes of operation: + - `add`: This mode computes a reward term and adds it to the total reward (weighted by the term's weight). 
+ - `replace`: This mode replaces the total reward with the computed value (useful for single reward functions). + + Note: The config key is used as the unique identifier and display name for each reward functor. + """ + + _env: EmbodiedEnv + """The environment instance.""" + + def __init__(self, cfg: object, env: EmbodiedEnv): + """Initialize the reward manager. + + Args: + cfg: A configuration object or dictionary (``dict[str, RewardCfg]``). + env: An environment object. + """ + + self._mode_functor_names: dict[str, list[str]] = dict() + self._mode_functor_cfgs: dict[str, list[RewardCfg]] = dict() + self._mode_class_functor_cfgs: dict[str, list[RewardCfg]] = dict() + + # call the base class (this will parse the functors config) + super().__init__(cfg, env) + + def __str__(self) -> str: + """Returns: A string representation for reward manager.""" + functor_num = sum(len(v) for v in self._mode_functor_names.values()) + msg = f" contains {functor_num} active reward terms.\n" + + # add info on each mode + for mode in self._mode_functor_names: + # create table for functor information + table = PrettyTable() + table.title = f"Active Reward Terms in Mode: '{mode}'" + + table.field_names = ["Index", "Name", "Weight"] + table.align["Name"] = "l" + for index, name in enumerate(self._mode_functor_names[mode]): + functor_cfg = self._mode_functor_cfgs[mode][index] + weight = getattr(functor_cfg, "weight", 1.0) + table.add_row([index, name, f"{weight:.3f}"]) + + # convert table to string + msg += table.get_string() + msg += "\n" + return msg + + @property + def active_functors(self) -> dict[str, list[str]]: + """Name of active reward functors. + + The keys are the modes of reward computation and the values are the names of the reward functors. + """ + return self._mode_functor_names + + def reset(self, env_ids: Union[Sequence[int], None] = None) -> dict[str, float]: + """Reset reward terms that are stateful (implemented as classes). + + Args: + env_ids: The environment indices to reset. If None, all environments are reset. + + Returns: + An empty dictionary (no logging needed for reset). + """ + # call all functors that are classes + for mode_cfg in self._mode_class_functor_cfgs.values(): + for functor_cfg in mode_cfg: + functor_cfg.func.reset(env_ids=env_ids) + + # nothing to log here + return {} + + def compute( + self, + obs: "EnvObs", + action: "EnvAction", + info: dict, + ) -> tuple[torch.Tensor, dict[str, torch.Tensor]]: + """Compute the total reward by calling each reward functor. + + This function iterates over all the reward functors and calls them to compute individual + reward terms. The terms are then combined according to their mode and weight. + + Args: + obs: The observation from the environment. + action: The action applied to the robot. + info: Additional information dictionary. + + Returns: + A tuple containing: + - total_reward: The total reward for each environment (shape: [num_envs]). + - reward_info: A dictionary mapping reward term names to their values for logging. + + Raises: + ValueError: If the mode is not supported. 
+ """ + # initialize total reward + total_reward = torch.zeros(self._env.num_envs, device=self._env.device) + reward_info = {} + + # iterate over all the reward functors + for mode, functor_cfgs in self._mode_functor_cfgs.items(): + for functor_name, functor_cfg in zip( + self._mode_functor_names[mode], functor_cfgs + ): + functor_cfg: RewardCfg + + # compute reward term + reward_term = functor_cfg.func( + self._env, obs=obs, action=action, info=info, **functor_cfg.params + ) + + # ensure reward is a tensor + if not isinstance(reward_term, torch.Tensor): + reward_term = torch.tensor( + reward_term, device=self._env.device, dtype=torch.float32 + ) + + # apply weight from config + weighted_reward = reward_term * functor_cfg.weight + + # combine reward based on mode + if mode == "add": + total_reward += weighted_reward + elif mode == "replace": + total_reward = weighted_reward + else: + logger.log_error(f"Unsupported reward mode '{mode}'.") + + # store for logging (use unweighted value for clarity) + reward_info[functor_name] = reward_term + + return total_reward, reward_info + + def get_functor_cfg(self, functor_name: str) -> RewardCfg: + """Gets the configuration for the specified functor. + + The method finds the functor by name by searching through all the modes. + It then returns the configuration of the functor with the first matching name. + + Args: + functor_name: The name of the reward functor. + + Returns: + The configuration of the reward functor. + + Raises: + ValueError: If the functor name is not found. + """ + for mode, functors in self._mode_functor_names.items(): + if functor_name in functors: + return self._mode_functor_cfgs[mode][functors.index(functor_name)] + logger.log_error(f"Reward functor '{functor_name}' not found.") + + def _prepare_functors(self): + # check if config is dict already + if isinstance(self.cfg, dict): + cfg_items = self.cfg.items() + else: + cfg_items = self.cfg.__dict__.items() + # iterate over all the functors + for functor_name, functor_cfg in cfg_items: + # check for non config + if functor_cfg is None: + continue + # check for valid config type + if not isinstance(functor_cfg, RewardCfg): + raise TypeError( + f"Configuration for the functor '{functor_name}' is not of type RewardCfg." + f" Received: '{type(functor_cfg)}'." + ) + + # resolve common parameters + self._resolve_common_functor_cfg(functor_name, functor_cfg, min_argc=4) + + # check if mode is a new mode + if functor_cfg.mode not in self._mode_functor_names: + # add new mode + self._mode_functor_names[functor_cfg.mode] = list() + self._mode_functor_cfgs[functor_cfg.mode] = list() + self._mode_class_functor_cfgs[functor_cfg.mode] = list() + # add functor name and parameters + self._mode_functor_names[functor_cfg.mode].append(functor_name) + self._mode_functor_cfgs[functor_cfg.mode].append(functor_cfg) + + # check if the functor is a class + if inspect.isclass(functor_cfg.func): + self._mode_class_functor_cfgs[functor_cfg.mode].append(functor_cfg) diff --git a/embodichain/lab/gym/envs/managers/rewards.py b/embodichain/lab/gym/envs/managers/rewards.py new file mode 100644 index 0000000..5cfbef5 --- /dev/null +++ b/embodichain/lab/gym/envs/managers/rewards.py @@ -0,0 +1,632 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2025 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +"""Common reward functors for reinforcement learning tasks.""" + +from __future__ import annotations + +import torch +from typing import TYPE_CHECKING + +from embodichain.lab.gym.envs.managers.cfg import SceneEntityCfg + +if TYPE_CHECKING: + from embodichain.lab.gym.envs import EmbodiedEnv + + +def distance_between_objects( + env: EmbodiedEnv, + obs: dict, + action: torch.Tensor, + info: dict, + source_entity_cfg: SceneEntityCfg = None, + target_entity_cfg: SceneEntityCfg = None, + exponential: bool = False, + sigma: float = 1.0, +) -> torch.Tensor: + """Reward based on distance between two rigid objects. + + Encourages the source object to get closer to the target object. Can use either + linear negative distance or exponential Gaussian-shaped reward. + + Args: + source_entity_cfg: Configuration for the source object (e.g., {"uid": "cube"}) + target_entity_cfg: Configuration for the target object (e.g., {"uid": "goal_sphere"}) + exponential: If True, use exponential reward exp(-d²/2σ²), else use -distance + sigma: Standard deviation for exponential reward (controls reward spread) + + Returns: + Reward tensor of shape (num_envs,). Higher when objects are closer. + - Linear mode: ranges from -inf to 0 (0 when objects touch) + - Exponential mode: ranges from 0 to 1 (1 when objects touch) + + Example: + ```json + { + "func": "distance_between_objects", + "weight": 0.5, + "params": { + "source_entity_cfg": {"uid": "cube"}, + "target_entity_cfg": {"uid": "target"}, + "exponential": true, + "sigma": 0.2 + } + } + ``` + """ + # get source entity position + source_obj = env.sim.get_rigid_object(source_entity_cfg.uid) + source_pos = source_obj.get_local_pose(to_matrix=True)[:, :3, 3] + + # get target entity position + target_obj = env.sim.get_rigid_object(target_entity_cfg.uid) + target_pos = target_obj.get_local_pose(to_matrix=True)[:, :3, 3] + + # compute distance + distance = torch.norm(source_pos - target_pos, dim=-1) + + # compute reward + if exponential: + # exponential reward: exp(-distance^2 / (2 * sigma^2)) + reward = torch.exp(-(distance**2) / (2 * sigma**2)) + else: + # negative distance reward + reward = -distance + + return reward + + +def joint_velocity_penalty( + env: EmbodiedEnv, + obs: dict, + action: torch.Tensor, + info: dict, + robot_uid: str = "robot", + joint_ids: slice | list[int] | None = None, + part_name: str | None = None, +) -> torch.Tensor: + """Penalize high joint velocities to encourage smooth motion. + + Computes the L2 norm of joint velocities and returns negative value as penalty. + Useful for preventing jerky or unstable robot movements. + + Args: + robot_uid: Robot entity UID in simulation (default: "robot") + joint_ids: Specific joint indices to penalize. Takes priority over part_name. + Example: [0, 1, 2] or slice(0, 6) + part_name: Control part name (e.g., "arm"). Used only if joint_ids is None. + Will penalize all joints in the specified part. + + Returns: + Penalty tensor of shape (num_envs,). Always negative or zero. 
+ Magnitude increases with joint velocity (larger velocity = more negative). + + Example: + ```json + { + "func": "joint_velocity_penalty", + "weight": 0.001, + "params": { + "robot_uid": "robot", + "part_name": "arm" + } + } + ``` + """ + robot = env.sim.get_robot(robot_uid) + + # get joint velocities + if joint_ids is not None: + qvel = robot.get_qvel()[:, joint_ids] + elif part_name is not None: + qvel = robot.get_qvel(name=part_name) + else: + qvel = robot.get_qvel() + + # compute L2 norm of joint velocities + velocity_norm = torch.norm(qvel, dim=-1) + + # negative penalty (higher velocity = more negative reward) + return -velocity_norm + + +def action_smoothness_penalty( + env: EmbodiedEnv, + obs: dict, + action: torch.Tensor, + info: dict, +) -> torch.Tensor: + """Penalize large action changes between consecutive timesteps. + + Encourages smooth control commands by penalizing sudden changes in actions. + Stores previous action in env._reward_states for comparison. + + Returns: + Penalty tensor of shape (num_envs,). Zero on first call (no previous action), + negative on subsequent calls (larger change = more negative). + + Note: + This function maintains state across calls using env._reward_states['prev_actions']. + State is automatically reset when the environment resets. + + Example: + ```json + { + "func": "action_smoothness_penalty", + "weight": 0.01, + "params": {} + } + ``` + """ + # Use dictionary-based state management + if not hasattr(env, "_reward_states"): + env._reward_states = {} + + # compute difference between current and previous action + if "prev_actions" in env._reward_states: + action_diff = action - env._reward_states["prev_actions"] + penalty = -torch.norm(action_diff, dim=-1) + else: + # no previous action, no penalty + penalty = torch.zeros(env.num_envs, device=env.device) + + # store current action for next step + env._reward_states["prev_actions"] = action.clone() + + return penalty + + +def joint_limit_penalty( + env: EmbodiedEnv, + obs: dict, + action: torch.Tensor, + info: dict, + robot_uid: str = "robot", + joint_ids: slice | list[int] = slice(None), + margin: float = 0.1, +) -> torch.Tensor: + """Penalize robot joints that are close to their position limits. + + Prevents joints from reaching their physical limits, which can cause instability + or singularities. Penalty increases as joints approach limits within the margin. + + Args: + robot_uid: Robot entity UID in simulation (default: "robot") + joint_ids: Joint indices to monitor (default: all joints) + margin: Normalized distance threshold (0 to 1). Penalty applied when joint + is within this fraction of its range from either limit. + Example: 0.1 means penalty when within 10% of limits. + + Returns: + Penalty tensor of shape (num_envs,). Always negative or zero. + Sum of penalties across all monitored joints. 
+ + Example: + ```json + { + "func": "joint_limit_penalty", + "weight": 0.01, + "params": { + "robot_uid": "robot", + "joint_ids": [0, 1, 2, 3, 4, 5], + "margin": 0.1 + } + } + ``` + """ + robot = env.sim.get_robot(robot_uid) + + # get joint positions and limits + qpos = robot.get_qpos()[:, joint_ids] + qpos_limits = robot.get_qpos_limits()[:, joint_ids, :] + + # compute normalized position in range [0, 1] + qpos_normalized = (qpos - qpos_limits[:, :, 0]) / ( + qpos_limits[:, :, 1] - qpos_limits[:, :, 0] + ) + + # compute distance to limits (minimum of distance to lower and upper limit) + dist_to_lower = qpos_normalized + dist_to_upper = 1.0 - qpos_normalized + dist_to_limit = torch.min(dist_to_lower, dist_to_upper) + + # penalize joints within margin of limits + penalty_mask = dist_to_limit < margin + penalty = torch.where( + penalty_mask, + -(margin - dist_to_limit), # negative penalty + torch.zeros_like(dist_to_limit), + ) + + # sum over all joints + return penalty.sum(dim=-1) + + +def orientation_alignment( + env: EmbodiedEnv, + obs: dict, + action: torch.Tensor, + info: dict, + source_entity_cfg: SceneEntityCfg = None, + target_entity_cfg: SceneEntityCfg = None, +) -> torch.Tensor: + """Reward rotational alignment between two rigid objects. + + Encourages the source object's orientation to match the target object's orientation. + Uses rotation matrix trace to measure alignment. + + Args: + source_entity_cfg: Configuration for the source object (e.g., {"uid": "cube"}) + target_entity_cfg: Configuration for the target object (e.g., {"uid": "reference"}) + + Returns: + Reward tensor of shape (num_envs,). Ranges from -1 to 1. + - 1.0: Perfect alignment (same orientation) + - 0.0: 90° rotation difference + - -1.0: 180° rotation difference (opposite orientation) + + Example: + ```json + { + "func": "orientation_alignment", + "weight": 0.5, + "params": { + "source_entity_cfg": {"uid": "object"}, + "target_entity_cfg": {"uid": "goal_object"} + } + } + ``` + """ + # get source entity rotation matrix + source_obj = env.sim.get_rigid_object(source_entity_cfg.uid) + source_rot = source_obj.get_local_pose(to_matrix=True)[:, :3, :3] + + # get target entity rotation matrix + target_obj = env.sim.get_rigid_object(target_entity_cfg.uid) + target_rot = target_obj.get_local_pose(to_matrix=True)[:, :3, :3] + + # compute rotation difference + rot_diff = torch.bmm(source_rot, target_rot.transpose(-1, -2)) + + # trace of rotation matrix difference (measure of alignment) + # trace = 1 + 2*cos(theta) for rotation by angle theta + # normalized to range [0, 1] where 1 is perfect alignment + trace = rot_diff.diagonal(dim1=-2, dim2=-1).sum(-1) + alignment = (trace - 1.0) / 2.0 # normalize to [-1, 1] + + return alignment + + +def success_reward( + env: EmbodiedEnv, + obs: dict, + action: torch.Tensor, + info: dict, +) -> torch.Tensor: + """Sparse bonus reward when task succeeds. + + Provides a fixed reward when the task success condition is met. + Reads success status from info['success'] which should be set by the environment. + + Returns: + Reward tensor of shape (num_envs,). + - 1.0 when successful + - 0.0 when not successful or if 'success' key missing + + Note: + The environment's get_info() must populate info['success'] with a boolean + tensor indicating success status for each environment. 
+ + Example: + ```json + { + "func": "success_reward", + "weight": 10.0, + "params": {} + } + ``` + """ + # Check if success info is available in info dict + if "success" in info: + success = info["success"] + if isinstance(success, bool): + success = torch.tensor([success], device=env.device, dtype=torch.bool) + elif not isinstance(success, torch.Tensor): + success = torch.tensor(success, device=env.device, dtype=torch.bool) + else: + # No success info available + return torch.zeros(env.num_envs, device=env.device) + + # return reward + return torch.where( + success, + torch.ones(env.num_envs, device=env.device), + torch.zeros(env.num_envs, device=env.device), + ) + + +def reaching_behind_object( + env: EmbodiedEnv, + obs: dict, + action: torch.Tensor, + info: dict, + object_cfg: SceneEntityCfg = None, + target_pose_key: str = "goal_pose", + behind_offset: float = 0.015, + height_offset: float = 0.015, + distance_scale: float = 5.0, + part_name: str = None, +) -> torch.Tensor: + """Reward for positioning end-effector behind object for pushing. + + Encourages the robot's end-effector to reach a position behind the object along + the object-to-goal direction. Useful for push manipulation tasks. + + Args: + object_cfg: Configuration for the object to push (e.g., {"uid": "cube"}) + target_pose_key: Key in info dict for goal pose (default: "goal_pose") + Can be (num_envs, 3) position or (num_envs, 4, 4) transform + behind_offset: Distance behind object to reach (in meters, default: 0.015) + height_offset: Additional height above object (in meters, default: 0.015) + distance_scale: Scaling factor for tanh function (higher = steeper, default: 5.0) + part_name: Robot part name for FK computation (e.g., "arm") + + Returns: + Reward tensor of shape (num_envs,). Ranges from 0 to 1. + - 1.0: End-effector at ideal pushing position + - 0.0: End-effector far from ideal position + + Example: + ```json + { + "func": "reaching_behind_object", + "weight": 0.1, + "params": { + "object_cfg": {"uid": "cube"}, + "target_pose_key": "goal_pose", + "behind_offset": 0.015, + "height_offset": 0.015, + "distance_scale": 5.0, + "part_name": "arm" + } + } + ``` + """ + # get end effector position from robot FK + robot = env.robot + joint_ids = robot.get_joint_ids(part_name) + qpos = robot.get_qpos()[:, joint_ids] + ee_pose = robot.compute_fk(name=part_name, qpos=qpos, to_matrix=True) + ee_pos = ee_pose[:, :3, 3] + + # get object position + obj = env.sim.get_rigid_object(object_cfg.uid) + obj_pos = obj.get_local_pose(to_matrix=True)[:, :3, 3] + + # get goal position from info + if target_pose_key not in info: + raise ValueError( + f"Target pose '{target_pose_key}' not found in info dict. " + f"Make sure to provide it in get_info()." 
+ ) + + target_poses = info[target_pose_key] + if target_poses.dim() == 2: # (num_envs, 3) + goal_pos = target_poses + else: # (num_envs, 4, 4) + goal_pos = target_poses[:, :3, 3] + + # compute push direction (from object to goal) + push_direction = goal_pos - obj_pos + push_dir_norm = torch.norm(push_direction, dim=-1, keepdim=True) + 1e-6 + push_dir_normalized = push_direction / push_dir_norm + + # compute target "behind" position + height_vec = torch.tensor( + [0, 0, height_offset], device=env.device, dtype=torch.float32 + ) + target_pos = obj_pos - behind_offset * push_dir_normalized + height_vec + + # distance to target position + ee_to_target_dist = torch.norm(ee_pos - target_pos, dim=-1) + + # tanh-shaped reward (1.0 when at target, 0.0 when far) + reward = 1.0 - torch.tanh(distance_scale * ee_to_target_dist) + + return reward + + +def distance_to_target( + env: "EmbodiedEnv", + obs: dict, + action: torch.Tensor, + info: dict, + source_entity_cfg: SceneEntityCfg = None, + target_pose_key: str = "target_pose", + exponential: bool = False, + sigma: float = 1.0, + use_xy_only: bool = False, +) -> torch.Tensor: + """Reward based on absolute distance to a virtual target pose. + + Encourages an object to get closer to a target pose specified in the info dict. + Unlike incremental_distance_to_target, this provides direct distance-based reward. + + Args: + source_entity_cfg: Configuration for the object (e.g., {"uid": "cube"}) + target_pose_key: Key in info dict for target pose (default: "target_pose") + Can be (num_envs, 3) position or (num_envs, 4, 4) transform + exponential: If True, use exponential reward exp(-d²/2σ²), else use -distance + sigma: Standard deviation for exponential reward (default: 1.0) + use_xy_only: If True, ignore z-axis and only consider horizontal distance + + Returns: + Reward tensor of shape (num_envs,). + - Linear mode: -distance (negative, approaches 0 when close) + - Exponential mode: exp(-d²/2σ²) (0 to 1, approaches 1 when close) + + Example: + ```json + { + "func": "distance_to_target", + "weight": 0.5, + "params": { + "source_entity_cfg": {"uid": "cube"}, + "target_pose_key": "goal_pose", + "exponential": false, + "use_xy_only": true + } + } + ``` + """ + # get source entity position + source_obj = env.sim.get_rigid_object(source_entity_cfg.uid) + source_pos = source_obj.get_local_pose(to_matrix=True)[:, :3, 3] + + # get target position from info + if target_pose_key not in info: + raise ValueError( + f"Target pose '{target_pose_key}' not found in info dict. " + f"Make sure to provide it in get_info()." 
+ ) + + target_poses = info[target_pose_key] + if target_poses.dim() == 2: # (num_envs, 3) + target_pos = target_poses + else: # (num_envs, 4, 4) + target_pos = target_poses[:, :3, 3] + + # compute distance + if use_xy_only: + distance = torch.norm(source_pos[:, :2] - target_pos[:, :2], dim=-1) + else: + distance = torch.norm(source_pos - target_pos, dim=-1) + + # compute reward + if exponential: + # exponential reward: exp(-distance^2 / (2 * sigma^2)) + reward = torch.exp(-(distance**2) / (2 * sigma**2)) + else: + # negative distance reward + reward = -distance + + return reward + + +def incremental_distance_to_target( + env: "EmbodiedEnv", + obs: dict, + action: torch.Tensor, + info: dict, + source_entity_cfg: SceneEntityCfg = None, + target_pose_key: str = "target_pose", + tanh_scale: float = 10.0, + positive_weight: float = 1.0, + negative_weight: float = 1.0, + use_xy_only: bool = False, +) -> torch.Tensor: + """Incremental reward for progress toward a virtual target pose. + + Rewards the robot for getting closer to the target compared to previous timestep. + Stores previous distance in env._reward_states for comparison. Uses tanh shaping + to normalize rewards and supports asymmetric weighting for approach vs. retreat. + + Args: + source_entity_cfg: Configuration for the object (e.g., {"uid": "cube"}) + target_pose_key: Key in info dict for target pose (default: "target_pose") + Can be (num_envs, 3) position or (num_envs, 4, 4) transform + tanh_scale: Scaling for tanh normalization (higher = more sensitive, default: 10.0) + positive_weight: Multiplier for reward when getting closer (default: 1.0) + negative_weight: Multiplier for penalty when moving away (default: 1.0) + use_xy_only: If True, ignore z-axis and only consider horizontal distance + + Returns: + Reward tensor of shape (num_envs,). Zero on first call, then: + - Positive when getting closer (scaled by positive_weight) + - Negative when moving away (scaled by negative_weight) + - Magnitude bounded by tanh function + + Note: + This function maintains state using env._reward_states[f"prev_dist_{uid}_{key}"]. + State is automatically reset when the environment resets. + + Example: + ```json + { + "func": "incremental_distance_to_target", + "weight": 1.0, + "params": { + "source_entity_cfg": {"uid": "cube"}, + "target_pose_key": "goal_pose", + "tanh_scale": 10.0, + "positive_weight": 2.0, + "negative_weight": 0.5, + "use_xy_only": true + } + } + ``` + """ + # get source entity position + source_obj = env.sim.get_rigid_object(source_entity_cfg.uid) + source_pos = source_obj.get_local_pose(to_matrix=True)[:, :3, 3] + + # get target position from info + if target_pose_key not in info: + raise ValueError( + f"Target pose '{target_pose_key}' not found in info dict. " + f"Make sure to provide it in get_info()." 
+        )
+
+    target_poses = info[target_pose_key]
+    if target_poses.dim() == 2:  # (num_envs, 3)
+        target_pos = target_poses
+    else:  # (num_envs, 4, 4)
+        target_pos = target_poses[:, :3, 3]
+
+    # compute current distance
+    if use_xy_only:
+        current_dist = torch.norm(source_pos[:, :2] - target_pos[:, :2], dim=-1)
+    else:
+        current_dist = torch.norm(source_pos - target_pos, dim=-1)
+
+    # initialize previous distance on first call
+    # Use dictionary-based state management for better organization
+    if not hasattr(env, "_reward_states"):
+        env._reward_states = {}
+
+    state_key = f"prev_dist_{source_entity_cfg.uid}_{target_pose_key}"
+    if state_key not in env._reward_states:
+        env._reward_states[state_key] = current_dist.clone()
+        return torch.zeros(env.num_envs, device=env.device)
+
+    # compute distance delta (positive = getting closer)
+    prev_dist = env._reward_states[state_key]
+    distance_delta = prev_dist - current_dist
+
+    # apply tanh shaping
+    distance_delta_normalized = torch.tanh(tanh_scale * distance_delta)
+
+    # asymmetric weighting
+    reward = torch.where(
+        distance_delta_normalized >= 0,
+        positive_weight * distance_delta_normalized,
+        negative_weight * distance_delta_normalized,
+    )
+
+    # update previous distance
+    env._reward_states[state_key] = current_dist.clone()
+
+    return reward
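Because this term rewards the change in distance rather than the distance itself, its scale is set by tanh_scale and the asymmetric weights, not by how far the object starts from the goal. A minimal sketch of that shaping on plain tensors (the real term additionally caches per-environment previous distances in env._reward_states, which is omitted here):

```python
import torch

def incremental_reward(prev_dist, curr_dist, tanh_scale=10.0, positive_weight=2.0, negative_weight=0.5):
    # positive delta means the object moved closer to the goal since the previous step
    delta = torch.tanh(tanh_scale * (prev_dist - curr_dist))
    return torch.where(delta >= 0, positive_weight * delta, negative_weight * delta)

prev = torch.tensor([0.30, 0.30])
curr = torch.tensor([0.27, 0.33])  # env 0 approaches the goal, env 1 retreats
print(incremental_reward(prev, curr))  # ~tensor([0.5827, -0.1457])
```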
diff --git a/embodichain/lab/gym/envs/rl_env_cfg.py b/embodichain/lab/gym/envs/rl_env_cfg.py
deleted file mode 100644
index 3444874..0000000
--- a/embodichain/lab/gym/envs/rl_env_cfg.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# ----------------------------------------------------------------------------
-# Copyright (c) 2021-2025 DexForce Technology Co., Ltd.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ----------------------------------------------------------------------------
-
-from typing import Any, Dict
-
-from embodichain.lab.gym.envs.embodied_env import EmbodiedEnvCfg
-from embodichain.utils import configclass
-
-
-@configclass
-class RLEnvCfg(EmbodiedEnvCfg):
-    """Extended configuration for RL environments built from gym-style specs."""
-
-    env_id: str = ""
-    extensions: Dict[str, Any] = {}
-
-    @classmethod
-    def from_dict(cls, d):
-        """Create an instance from a dictionary."""
-        return cls(**d)
diff --git a/embodichain/lab/gym/envs/tasks/rl/__init__.py b/embodichain/lab/gym/envs/tasks/rl/__init__.py
index f8cf303..be52afc 100644
--- a/embodichain/lab/gym/envs/tasks/rl/__init__.py
+++ b/embodichain/lab/gym/envs/tasks/rl/__init__.py
@@ -18,10 +18,10 @@
 from copy import deepcopy
 
 from embodichain.lab.gym.utils import registration as env_registry
-from embodichain.lab.gym.envs.rl_env_cfg import RLEnvCfg
+from embodichain.lab.gym.envs.embodied_env import EmbodiedEnvCfg
 
 
-def build_env(env_id: str, base_env_cfg: RLEnvCfg):
+def build_env(env_id: str, base_env_cfg: EmbodiedEnvCfg):
     """Create env from registry id, auto-inferring cfg class (EnvName -> EnvNameCfg)."""
     env = env_registry.make(env_id, cfg=deepcopy(base_env_cfg))
     return env
diff --git a/embodichain/lab/gym/envs/tasks/rl/push_cube.py b/embodichain/lab/gym/envs/tasks/rl/push_cube.py
index 4aef16c..072574f 100644
--- a/embodichain/lab/gym/envs/tasks/rl/push_cube.py
+++ b/embodichain/lab/gym/envs/tasks/rl/push_cube.py
@@ -38,27 +38,13 @@ def __init__(self, cfg=None, **kwargs):
         if cfg is None:
             cfg = EmbodiedEnvCfg()
 
-        extensions = getattr(cfg, "extensions", {}) or {}
-
-        # cfg.sim_cfg.enable_rt = True
-
-        defaults = {
-            "success_threshold": 0.1,
-            "reaching_reward_weight": 0.1,
-            "place_reward_weight": 2.0,
-            "place_penalty_weight": 0.5,
-            "action_penalty_weight": 0.01,
-            "success_bonus_weight": 10.0,
-        }
-        for name, default in defaults.items():
-            value = extensions.get(name, getattr(cfg, name, default))
-            setattr(cfg, name, value)
-            setattr(self, name, getattr(cfg, name))
-
-        self.last_cube_goal_dist = None
-
         super().__init__(cfg, **kwargs)
 
+    @property
+    def goal_pose(self) -> torch.Tensor:
+        """Get current goal poses (4x4 matrices) for all environments."""
+        return self._goal_pose
+
     def _draw_goal_marker(self):
         """Draw axis marker at goal position for visualization."""
         goal_sphere = self.sim.get_rigid_object("goal_sphere")
@@ -87,32 +73,10 @@ def _draw_goal_marker(self):
         )
         self.sim.draw_marker(cfg=marker_cfg)
 
-    def _init_sim_state(self, **kwargs):
-        super()._init_sim_state(**kwargs)
-        self.single_action_space = spaces.Box(
-            low=-self.joint_limits,
-            high=self.joint_limits,
-            shape=(6,),
-            dtype=np.float32,
-        )
-        if self.obs_mode == "state":
-            self.single_observation_space = spaces.Box(
-                low=-np.inf, high=np.inf, shape=(15,), dtype=np.float32
-            )
-
     def _initialize_episode(
         self, env_ids: Sequence[int] | None = None, **kwargs
     ) -> None:
         super()._initialize_episode(env_ids=env_ids, **kwargs)
-        cube = self.sim.get_rigid_object("cube")
-
-        # Calculate previous distance (for incremental reward) based on current (possibly randomized) pose
-        cube_pos = cube.body_data.pose[:, :3]
-        goal_sphere = self.sim.get_rigid_object("goal_sphere")
-        goal_pos = goal_sphere.body_data.pose[
-            :, :3
-        ]  # Get actual goal positions for each environment
-        self.last_cube_goal_dist = torch.norm(cube_pos[:, :2] - goal_pos[:, :2], dim=1)
 
         # Draw marker at goal position
         # self._draw_goal_marker()
@@ -128,84 +92,29 @@ def _step_action(self, action: EnvAction) -> EnvAction:
         self.robot.set_qpos(qpos=target_qpos)
         return scaled_action
 
-    def get_obs(self, **kwargs) -> EnvObs:
-        qpos_all = self.robot.body_data.qpos[:, :6]
-        ee_pose_matrix = self.robot.compute_fk(
-            name="arm", qpos=qpos_all, to_matrix=True
-        )
-        ee_pos_all = ee_pose_matrix[:, :3, 3]
+    def get_info(self, **kwargs) -> Dict[str, Any]:
         cube = self.sim.get_rigid_object("cube")
-        cube_pos_all = cube.body_data.pose[:, :3]
-        # Get actual goal positions for each environment
-        goal_sphere = self.sim.get_rigid_object("goal_sphere")
-        goal_pos_all = goal_sphere.body_data.pose[:, :3]
-        if self.obs_mode == "state":
-            return torch.cat([qpos_all, ee_pos_all, cube_pos_all, goal_pos_all], dim=1)
-        return {
-            "robot": {"qpos": qpos_all, "ee_pos": ee_pos_all},
-            "object": {"cube_pos": cube_pos_all, "goal_pos": goal_pos_all},
-        }
+        cube_pos = cube.body_data.pose[:, :3]
 
-    def get_reward(
-        self, obs: EnvObs, action: EnvAction, info: Dict[str, Any]
-    ) -> torch.Tensor:
-        if self.obs_mode == "state":
-            ee_pos = obs[:, 6:9]
-            cube_pos = obs[:, 9:12]
-            goal_pos = obs[:, 12:15]
+        # Get goal position from event-managed goal pose
+        if self.goal_pose is not None:
+            goal_pos = self.goal_pose[:, :3, 3]
+            xy_distance = torch.norm(cube_pos[:, :2] - goal_pos[:, :2], dim=1)
+            is_success = xy_distance < self.success_threshold
         else:
-            ee_pos = obs["robot"]["ee_pos"]
-            cube_pos = obs["object"]["cube_pos"]
-            goal_pos = obs["object"]["goal_pos"]
-        push_direction = goal_pos - cube_pos
-        push_dir_norm = torch.norm(push_direction, dim=1, keepdim=True) + 1e-6
-        push_dir_normalized = push_direction / push_dir_norm
-        push_pose = (
-            cube_pos
-            - 0.015 * push_dir_normalized
-            + torch.tensor([0, 0, 0.015], device=self.device, dtype=torch.float32)
-        )
-        ee_to_push_dist = torch.norm(ee_pos - push_pose, dim=1)
-        reaching_reward_raw = 1.0 - torch.tanh(5.0 * ee_to_push_dist)
-        reaching_reward = self.reaching_reward_weight * reaching_reward_raw
-        cube_to_goal_dist = torch.norm(cube_pos[:, :2] - goal_pos[:, :2], dim=1)
-        distance_delta = 10.0 * (self.last_cube_goal_dist - cube_to_goal_dist)
-        distance_delta_normalized = torch.tanh(distance_delta)
-        place_reward = torch.where(
-            distance_delta_normalized >= 0,
-            self.place_reward_weight * distance_delta_normalized,
-            self.place_penalty_weight * distance_delta_normalized,
-        )
-        self.last_cube_goal_dist = cube_to_goal_dist
-        action_magnitude = torch.norm(action, dim=1)
-        action_penalty = -self.action_penalty_weight * action_magnitude
-        success_bonus_raw = info["success"].float()
-        success_bonus = self.success_bonus_weight * success_bonus_raw
-        reward = reaching_reward + place_reward + action_penalty + success_bonus
-        # Organize reward components in a dedicated "rewards" dict
-        # This allows trainer to easily identify and log reward components
-        if "rewards" not in info:
-            info["rewards"] = {}
-        info["rewards"]["reaching_reward"] = reaching_reward
-        info["rewards"]["place_reward"] = place_reward
-        info["rewards"]["action_penalty"] = action_penalty
-        info["rewards"]["success_bonus"] = success_bonus
-        return reward
+            # Goal not yet set by randomize_target_pose event (e.g., before first reset)
+            xy_distance = torch.zeros(self.cfg.num_envs, device=self.device)
+            is_success = torch.zeros(
+                self.cfg.num_envs, device=self.device, dtype=torch.bool
+            )
 
-    def get_info(self, **kwargs) -> Dict[str, Any]:
-        cube = self.sim.get_rigid_object("cube")
-        cube_pos = cube.body_data.pose[:, :3]
-        # Get actual goal positions for each environment
-        goal_sphere = self.sim.get_rigid_object("goal_sphere")
-        goal_pos = goal_sphere.body_data.pose[:, :3]
-        xy_distance = torch.norm(cube_pos[:, :2] - goal_pos[:, :2], dim=1)
-        is_success = xy_distance < self.success_threshold
         info = {
             "success": is_success,
             "fail": torch.zeros(
                 self.cfg.num_envs, device=self.device, dtype=torch.bool
            ),
             "elapsed_steps": self._elapsed_steps,
+            "goal_pose": self.goal_pose,
         }
         info["metrics"] = {
             "distance_to_goal": xy_distance,
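The contract between get_info and the reward terms above is just a shared info dict: the env publishes goal_pose as a batch of 4x4 transforms, and reward functors read it via info[target_pose_key]. A small sketch of that hand-off with stand-in tensors (no simulator involved; num_envs and success_threshold are arbitrary here):

```python
import torch

num_envs, success_threshold = 4, 0.1

# stand-ins for the event-managed goal poses (num_envs, 4, 4) and the simulated cube positions
goal_pose = torch.eye(4).repeat(num_envs, 1, 1)
goal_pose[:, :2, 3] = torch.rand(num_envs, 2) * 0.2
cube_pos = torch.rand(num_envs, 3) * 0.2

goal_pos = goal_pose[:, :3, 3]  # translation column of the homogeneous transform
xy_distance = torch.norm(cube_pos[:, :2] - goal_pos[:, :2], dim=1)
info = {
    "success": xy_distance < success_threshold,
    "goal_pose": goal_pose,  # consumed by reward terms as info[target_pose_key]
}
print(info["success"], xy_distance)
```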
diff --git a/embodichain/lab/gym/utils/gym_utils.py b/embodichain/lab/gym/utils/gym_utils.py
index 6ebb6d6..dff96e9 100644
--- a/embodichain/lab/gym/utils/gym_utils.py
+++ b/embodichain/lab/gym/utils/gym_utils.py
@@ -323,24 +323,6 @@ def cat_tensor_with_ids(
     return out
 
 
-def config_to_rl_cfg(config: dict) -> "RLEnvCfg":
-    """Parse gym-style configuration dict into an RL-ready config object."""
-
-    from embodichain.lab.gym.envs.rl_env_cfg import RLEnvCfg
-
-    # Use config_to_cfg to parse shared fields
-    env_cfg = config_to_cfg(config)
-    # Convert to RLEnvCfg if needed
-    if not isinstance(env_cfg, RLEnvCfg):
-        env_cfg = RLEnvCfg.from_dict(env_cfg.__dict__)
-    # RL-specific fields
-    env_cfg.env_id = config.get("id")
-    env_cfg.num_envs = config["env"].get("num_envs", env_cfg.num_envs)
-    env_cfg.extensions = deepcopy(config.get("env", {}).get("extensions", {}))
-    # Add any RL-specific parsing here
-    return env_cfg
-
-
 def config_to_cfg(config: dict) -> "EmbodiedEnvCfg":
     """Parser configuration file into cfgs for env initialization.
 
@@ -364,6 +346,7 @@ def config_to_cfg(config: dict) -> "EmbodiedEnvCfg":
         SceneEntityCfg,
         EventCfg,
         ObservationCfg,
+        RewardCfg,
         DatasetFunctorCfg,
     )
     from embodichain.utils import configclass
@@ -452,6 +435,7 @@ class ComponentCfg:
             env_cfg.articulation.append(cfg)
 
     env_cfg.sim_steps_per_control = config["env"].get("sim_steps_per_control", 4)
+    env_cfg.extensions = deepcopy(config.get("env", {}).get("extensions", {}))
 
     # TODO: support more env events, eg, grasp pose generation, mesh preprocessing, etc.
 
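With reward weights now carried by the reward terms themselves, the extensions dict parsed here is left for env-level knobs such as episode_length or success_threshold. The diff does not show where those values become attributes; the deleted PushCube __init__ did it inline, and a generic version of that pattern looks roughly like this (class name and defaults are illustrative only):

```python
from copy import deepcopy

config = {"env": {"extensions": {"episode_length": 100, "success_threshold": 0.1}}}

# mirrors the parsing line above: tolerate a missing "env" or "extensions" block
extensions = deepcopy(config.get("env", {}).get("extensions", {}))

class ExtensionAwareEnv:
    """Hypothetical consumer that surfaces extension values as attributes with defaults."""

    DEFAULTS = {"episode_length": 200, "success_threshold": 0.05, "action_scale": 0.1}

    def __init__(self, extensions: dict):
        for name, default in self.DEFAULTS.items():
            setattr(self, name, extensions.get(name, default))

env = ExtensionAwareEnv(extensions)
print(env.episode_length, env.success_threshold, env.action_scale)  # 100 0.1 0.1
```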
@@ -472,8 +456,6 @@ class ComponentCfg:
                 raise_if_not_found=True,
             )
 
-            from embodichain.lab.gym.envs.managers import DatasetFunctorCfg
-
             dataset = DatasetFunctorCfg(
                 func=dataset_func,
                 mode=dataset_params_modified["mode"],
@@ -546,6 +528,47 @@ class ComponentCfg:
 
             setattr(env_cfg.observations, obs_name, observation)
 
+    env_cfg.rewards = ComponentCfg()
+    if "rewards" in config["env"]:
+        # Define modules to search for reward functions
+        reward_modules = [
+            "embodichain.lab.gym.envs.managers.rewards",
+        ]
+
+        for reward_name, reward_params in config["env"]["rewards"].items():
+            reward_params_modified = deepcopy(reward_params)
+
+            # Handle entity_cfg parameters
+            for param_key in [
+                "entity_cfg",
+                "source_entity_cfg",
+                "target_entity_cfg",
+                "end_effector_cfg",
+                "object_cfg",
+                "goal_cfg",
+                "reference_entity_cfg",
+            ]:
+                if param_key in reward_params["params"]:
+                    entity_cfg = SceneEntityCfg(
+                        **reward_params_modified["params"][param_key]
+                    )
+                    reward_params_modified["params"][param_key] = entity_cfg
+
+            # Find the function from multiple modules using the utility function
+            reward_func = find_function_from_modules(
+                reward_params["func"],
+                reward_modules,
+                raise_if_not_found=True,
+            )
+
+            reward = RewardCfg(
+                func=reward_func,
+                mode=reward_params_modified["mode"],
+                params=reward_params_modified["params"],
+            )
+
+            setattr(env_cfg.rewards, reward_name, reward)
+
     return env_cfg
 
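Each reward entry is resolved to a callable by name before being wrapped in a RewardCfg. find_function_from_modules is defined elsewhere in the codebase and is not shown in this diff; a rough stand-in that illustrates the module-search behavior (name, signature, and error handling here are assumptions):

```python
import importlib
from typing import Callable, Optional

def find_function_by_name(name: str, modules: list, raise_if_not_found: bool = True) -> Optional[Callable]:
    """Illustrative stand-in: return the first callable attribute `name` found in `modules`."""
    for module_path in modules:
        module = importlib.import_module(module_path)
        func = getattr(module, name, None)
        if callable(func):
            return func
    if raise_if_not_found:
        raise ValueError(f"Function '{name}' not found in modules: {modules}")
    return None

# e.g. resolving a reward term from this config against the rewards module:
# reward_func = find_function_by_name(
#     "incremental_distance_to_target", ["embodichain.lab.gym.envs.managers.rewards"]
# )
```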
" + ) + # Cache paths self._sim_cache_dir = SIM_CACHE_DIR self._material_cache_dir = MATERIAL_CACHE_DIR @@ -278,41 +277,52 @@ def __init__( self._build_multiple_arenas(sim_config.num_envs) @classmethod - def get_instance(cls) -> SimulationManager: - """Get the singleton instance of SimulationManager. + def get_instance(cls, instance_id: int = 0) -> SimulationManager: + """Get the instance of SimulationManager by id. + + Args: + instance_id (int): The instance id. Defaults to 0. Returns: - SimulationManager: The singleton instance. + SimulationManager: The instance. Raises: RuntimeError: If the instance has not been created yet. """ - if cls._instance is None: + if instance_id not in cls._instances: logger.log_error( - "SimulationManager has not been instantiated yet. " - "Create an instance first using SimulationManager(sim_config)." + f"SimulationManager (id={instance_id}) has not been instantiated yet. " + f"Create an instance first using SimulationManager(sim_config, instance_id={instance_id})." ) - return cls._instance + return cls._instances[instance_id] + + @classmethod + def get_instance_num(cls) -> int: + """Get the number of instantiated SimulationManager instances. + + Returns: + int: The number of instances. + """ + return len(cls._instances) @classmethod - def reset(cls) -> None: - """Reset the singleton instance. + def reset(cls, instance_id: int = 0) -> None: + """Reset the instance. This allows creating a new instance with different configuration. """ - if cls._instance is not None: - logger.log_info("Resetting SimulationManager singleton instance.") - cls._instance = None - cls._is_initialized = False + if instance_id in cls._instances: + logger.log_info(f"Resetting SimulationManager instance {instance_id}.") + del cls._instances[instance_id] @classmethod - def is_instantiated(cls) -> bool: - """Check if the singleton instance has been created. + def is_instantiated(cls, instance_id: int = 0) -> bool: + """Check if the instance has been created. Returns: bool: True if the instance exists, False otherwise. """ - return cls._instance is not None + return instance_id in cls._instances @property def num_envs(self) -> int: @@ -1560,4 +1570,4 @@ def destroy(self) -> None: self._env.clean() self._world.quit() - self.reset() + SimulationManager.reset(self.instance_id)