import sys from legged_gym import LEGGED_GYM_ROOT_...
Creado el: 8 de mayo de 2025
Creado el: 8 de mayo de 2025
import sys
from legged_gym import LEGGED_GYM_ROOT_DIR
import os
import isaacgym
from isaacgym import gymapi
from legged_gym.envs import *
from legged_gym.utils import get_args, export_policy_as_jit, export_policy_as_onnx, task_registry, Logger
import numpy as np
import torch
from termcolor import colored
import hydra
from omegaconf import DictConfig, OmegaConf
from easydict import EasyDict
from legged_gym.utils.helpers import class_to_dict
import xml.etree.ElementTree as ET
NOROSPY = False
try:
import rospy
except:
NOROSPY = True
command_state = {
'vel_forward': 0.0,
'vel_side': 0.0,
'orientation': 0.0,
}
override = False
EXPORT_ONNX = True
def dict_compare(d1, d2):
d1_keys = set(d1.keys())
d2_keys = set(d2.keys())
shared_keys = d1_keys.intersection(d2_keys)
added = d1_keys - d2_keys
removed = d2_keys - d1_keys
modified = {o : (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
same = set(o for o in shared_keys if d1[o] == d2[o])
return added, removed, modified, same
def change_joint_types_to_fixed(urdf_path, joint_names, tmp_urdf_path):
# 读取并解析URDF文件
tree = ET.parse(urdf_path)
root = tree.getroot()
textjoint_names_set = set(joint_names) found_joints = set() # 查找所有joint元素 for joint in root.findall('joint'): joint_name = joint.get('name') if joint_name in joint_names_set: joint.set('type', 'fixed') found_joints.add(joint_name) print(f"Joint '{joint_name}' changed to fixed.") # 检查是否有未找到的joint not_found_joints = joint_names_set - found_joints for missing_joint in not_found_joints: print(f"Joint '{missing_joint}' not found!") # 保存修改后的URDF文件 tree.write(tmp_urdf_path, encoding='utf-8', xml_declaration=True)
@hydra.main(
version_base=None,
config_path="../cfg",
config_name="config_base",
)
def play(cfg_hydra: DictConfig) -> None:
cfg_hydra = EasyDict(OmegaConf.to_container(cfg_hydra, resolve=True))
cfg_hydra.physics_engine = gymapi.SIM_PHYSX
textif "fixed_joints" in cfg_hydra: #todo xmh 只有teacher的情况 fixed_joints = cfg_hydra.fixed_joints num_fixed_joints = len(fixed_joints) cfg_hydra.num_fixed = num_fixed_joints asset_urdf_path = cfg_hydra.asset.file tmp_output_path = os.path.dirname(asset_urdf_path) tmp_urdf_path = os.path.join(tmp_output_path, "tmp.urdf") change_joint_types_to_fixed(asset_urdf_path, fixed_joints, tmp_urdf_path) #cfg_hydra.asset.file = "/home/sjtu_smartcar/Desktop/xmh/robotics/human2humanoid/resources/robots/ym_humanoid/urdf/ym_new_origin_fixed.urdf" #cfg_hydra.asset.file = "/home/sjtu_smartcar/Desktop/xmh/robotics/human2humanoid/resources/robots/ym_humanoid/urdf/ym_new_origin_fixed_body.urdf" cfg_hydra.asset.file = tmp_urdf_path fixed_link_names = [link.replace('Joint', 'Link') for link in fixed_joints] if "Body_Link2" in fixed_link_names: cfg_hydra.domain_rand.randomize_base_com = False not_randomize_link_body_names_count = 0 for fixed_link_name in fixed_link_names: if fixed_link_name in cfg_hydra.domain_rand.randomize_link_body_names: cfg_hydra.domain_rand.randomize_link_body_names.remove(fixed_link_name) not_randomize_link_body_names_count += 1 if not cfg_hydra.train.distill: cfg_hydra.motion.teleop_selected_keypoints_names.remove(fixed_link_name) else: cfg_hydra.train.distill_model_config.teleop_selected_keypoints_names.remove(fixed_link_name) cfg_hydra.env.num_actions -= num_fixed_joints if not cfg_hydra.train.distill: cfg_hydra.env.num_observations -= num_fixed_joints * 40 cfg_hydra.env.num_privileged_obs = cfg_hydra.env.num_privileged_obs - num_fixed_joints * 40 - 3* num_fixed_joints - not_randomize_link_body_names_count #todo xmh self._ground_friction_values = num_bodies cfg_hydra.motion.num_dof_pos_reference -= num_fixed_joints cfg_hydra.motion.num_dof_vel_reference -= num_fixed_joints joint_list = list(cfg_hydra.init_state.default_joint_angles.keys()) fix_dof_index = [] fix_pos_index = [] for fixed_joint in fixed_joints: fix_pos_index.append(joint_list.index(fixed_joint) + 1) fix_dof_index.append(joint_list.index(fixed_joint)) cfg_hydra.rewards.teleop_joint_pos_selection.pop(fixed_joint) cfg_hydra.motion.visualize_config.marker_joint_colors = [item for idx, item in enumerate(cfg_hydra.motion.visualize_config.marker_joint_colors) if idx not in fix_pos_index] cfg_hydra.fix_dof_indices = fix_dof_index cfg_hydra.fix_pos_indices = fix_pos_index cfg_hydra.asset.clip_motion_goal = False #xmh cfg_hydra.leg_joint1_indice = 18 - num_fixed_joints #11 default_selected_keypoints = [9, 11, 18] filtered_list1 = [x for x in default_selected_keypoints if x not in fix_pos_index] cfg_hydra.selected_keypoints = [x - sum(1 for y in fix_pos_index if y < x) for x in filtered_list1] #todo 去掉domainrand和noise num_selected_keypoints = len(cfg_hydra.selected_keypoints) if cfg_hydra.train.distill_model_config: cfg_hydra.train.distill_model_config.num_observations = 1236 - num_fixed_joints * 40 cfg_hydra.train.distill_model_config.num_privileged_obs = 1353 - num_fixed_joints * 40 - 3* num_fixed_joints - not_randomize_link_body_names_count cfg_hydra.env.num_observations = num_selected_keypoints * (3 + 3 + 3) + (30 - num_fixed_joints) * 3 + 3 +3 + cfg_hydra.env.short_history_length* (96 - 3*cfg_hydra.num_fixed) cfg_hydra.env.num_privileged_obs = cfg_hydra.env.num_observations + 3+ 2 +6 +1 + len(cfg_hydra.domain_rand.randomize_link_body_names) + 3*(30-num_fixed_joints) # env_cfg, train_cfg = task_registry.get_cfgs(name=cfg_hydra.task) # import ipdb; ipdb.set_trace() env_cfg, train_cfg = cfg_hydra, cfg_hydra.train train_cfg.seed = 123145 ##### Compare two configs. # env_cfg_, train_cfg_prev = task_registry.get_cfgs(name=cfg_hydra.task) # env_cfg_, train_cfg_prev = class_to_dict(env_cfg_), class_to_dict(train_cfg_prev) # for k, v in env_cfg_.items(): # if isinstance(v, dict): # for kk, vv in v.items(): # if not vv == env_cfg[k][kk]: # print(k, kk) # import ipdb; ipdb.set_trace() # print('...') # elif not v == env_cfg[k]: # import ipdb; ipdb.set_trace() # print('...') # override some parameters for testing # env_cfg.env.num_envs = min(env_cfg.env.num_envs, 100) # if not env_cfg.train_velocity_estimation: env_cfg.env.num_envs = 1 env_cfg.viewer.debug_viz = True env_cfg.motion.visualize = False # env_cfg.terrain.num_rows = 5 # env_cfg.terrain.num_cols = 5 env_cfg.terrain.curriculum = False env_cfg.terrain.mesh_type = 'trimesh' # env_cfg.terrain.mesh_type = 'plane' # if env_cfg.terrain.mesh_type == 'trimesh': # env_cfg.terrain.terrain_types = ['flat', 'rough', 'low_obst'] # do not duplicate! # env_cfg.terrain.terrain_proportions = [1.0, 0.0, 0.0] env_cfg.noise.add_noise = False env_cfg.domain_rand.randomize_friction = False env_cfg.domain_rand.push_robots = False env_cfg.domain_rand.randomize_base_mass = False env_cfg.env.episode_length_s = 20 env_cfg.domain_rand.randomize_rfi_lim = False env_cfg.domain_rand.randomize_pd_gain = False env_cfg.domain_rand.randomize_link_mass = False env_cfg.domain_rand.randomize_base_com = False env_cfg.domain_rand.randomize_ctrl_delay = False env_cfg.domain_rand.ctrl_delay_step_range = [1, 3] # env_cfg.asset.termination_scales.max_ref_motion_distance = 1 env_cfg.env.test = True if env_cfg.motion.realtime_vr_keypoints: env_cfg.asset.terminate_by_1time_motion = False env_cfg.asset.terminate_by_ref_motion_distance = False rospy.init_node("avppose_subscriber") from avp_pose_subscriber import AVPPoseInfo avpposeinfo = AVPPoseInfo() rospy.Subscriber("avp_pose", Float64MultiArray, avpposeinfo.avp_callback, queue_size=1) if cfg_hydra.joystick: env_cfg.commands.ranges.lin_vel_x = [0.0, 0.0] env_cfg.commands.ranges.lin_vel_y = [0.0, 0.0] env_cfg.commands.ranges.ang_vel_yaw = [0.0, 0.0] from pynput import keyboard from legged_gym.utils import key_response_fn # prepare environment env, _ = task_registry.make_env_hydra(name=cfg_hydra.task, hydra_cfg=cfg_hydra, env_cfg=env_cfg) logger = Logger(env.dt) robot_index = 0 # which robot is used for logging joint_index = 3 # which joint is used for logging stop_state_log = 200 # number of steps before plotting states stop_rew_log = env.max_episode_length + 1 # number of steps before print average episode rewards obs = env.get_observations() if env_cfg.motion.realtime_vr_keypoints: init_root_pos = env._rigid_body_pos[..., 0, :].clone() init_avp_pos = avpposeinfo.avp_pose.copy() init_root_offset = init_root_pos[0, :2] - init_avp_pos[2, :2] # import ipdb; ipdb.set_trace() # obs[:, 9:12] = torch.Tensor([0.5, 0, 0]) # load policy train_cfg.runner.resume = True ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=cfg_hydra.task, args=cfg_hydra, train_cfg=train_cfg) policy = ppo_runner.get_inference_policy(device=env.device) exported_policy_name = str(task_registry.loaded_policy_path.split('/')[-2]) + str(task_registry.loaded_policy_path.split('/')[-1]) print('Loaded policy from: ', task_registry.loaded_policy_path) # export policy as a jit module (used to run it from C++) if EXPORT_POLICY: path = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name, 'exported', 'policies') export_policy_as_jit(ppo_runner.alg.actor_critic, path, exported_policy_name) print('Exported policy as jit script to: ', os.path.join(path, exported_policy_name)) if EXPORT_ONNX: exported_onnx_name = exported_policy_name.replace('.pt', '.onnx') path = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name, 'exported', 'policies') export_policy_as_onnx(ppo_runner.alg.actor_critic, path, exported_onnx_name, onnx_num_observations=env_cfg.env.num_observations) print('Exported policy as onnx to: ', os.path.join(path, exported_onnx_name)) if cfg_hydra.joystick: print(colored("joystick on", "green")) key_response = key_response_fn(mode='vel') def on_press(key): global command_state try: # print(key.char) key_response(key, command_state, env) except AttributeError: pass listener = keyboard.Listener(on_press=on_press) listener.start() i = 0 while (not NOROSPY and not rospy.is_shutdown()) or (NOROSPY): # for i in range(1000*int(env.max_episode_length)): # obs[:, -19:] = 0 # will destroy the performance actions = policy(obs.detach()) # print(torch.sum(torch.square(env.projected_gravity[:, :2]), dim=1)) obs, _, rews, dones, infos = env.step(actions.detach()) if env_cfg.motion.realtime_vr_keypoints: avpposeinfo.check() keypoints_pos = avpposeinfo.avp_pose.copy() keypoints_pos[:, 0] += init_root_offset[0].item() keypoints_pos[:, 1] += init_root_offset[1].item() # import ipdb; ipdb.set_trace() keypoints_vel = avpposeinfo.avp_vel.copy() print(keypoints_pos) env._update_realtime_vr_keypoints(keypoints_pos, keypoints_vel) # print("obs = ", obs) # print("actions = ", actions) # print() # exit() if override: obs[:,9] = 0.5 obs[:,10] = 0.0 obs[:,11] = 0.0 # overwrite linear velocity - z and angular velocity - xy # obs[:, 40] = 0. # obs[: 41:43] = 0. if i < stop_state_log: logger.log_states( { 'dof_pos_target': actions[robot_index, joint_index].item() * env.cfg.control.action_scale + env.default_dof_pos[robot_index, joint_index].item(), # 'dof_pos_target': env.actions[robot_index, joint_index].item() * env.cfg.control.action_scale + env.default_dof_pos[robot_index, joint_index].item(), 'dof_pos': env.dof_pos[robot_index, joint_index].item(), 'dof_vel': env.dof_vel[robot_index, joint_index].item(), 'dof_torque': env.torques[robot_index, joint_index].item(), 'base_vel_x': env.base_lin_vel[robot_index, 0].item(), 'base_vel_y': env.base_lin_vel[robot_index, 1].item(), 'base_vel_z': env.base_lin_vel[robot_index, 2].item(), 'base_vel_yaw': env.base_ang_vel[robot_index, 2].item(), 'contact_forces_z': env.contact_forces[robot_index, env.feet_indices, 2].cpu().numpy() } ) elif i==stop_state_log: logger.plot_states() if 0 < i < stop_rew_log: if infos["episode"]: num_episodes = torch.sum(env.reset_buf).item() if num_episodes>0: logger.log_rewards(infos["episode"], num_episodes) elif i==stop_rew_log: pass # logger.print_rewards() i += 1
if name == 'main':
EXPORT_POLICY = False
RECORD_FRAMES = False
MOVE_CAMERA = False
# args = get_args()
play()我想要实现遍历pkl的所有motion 如果有出现 reference motion distance > 0.5m的情况就把这个motionid记录下来怎么做
下面是一种最小可用的实现思路,假设 .pkl 里保存的是 dict
/ EasyDict
格式的运动数据,并且根节点(或躯干)在各时间步的世界坐标用键 root_pos
(常见写法)存着 (T, 3)
的 numpy 数组。如果你的字段名不同,只要把注释里标出的地方换成正确的键即可。
pythonimport os import glob import pickle import numpy as np MOTION_DIR = "/path/to/your/motions" # 1️⃣ motion 文件夹 THRESHOLD = 0.5 # 2️⃣ 判定阈值 (m) bad_motion_ids = [] # 3️⃣ 存在问题的 motion 列表 def reference_motion_distance(motion_dict): """ 计算一段 motion 的参考位移: 这里用『根节点在第一帧到最后一帧的平移距离』做示例。 如果你想用别的指标,只要改这一小段就行。 """ root_pos = motion_dict["root_pos"] # <--- 根据你的字段名修改 start, end = root_pos[0, :3], root_pos[-1, :3] return np.linalg.norm(end - start) # 欧氏距离 # 遍历目录下所有 pkl for pkl_path in glob.glob(os.path.join(MOTION_DIR, "*.pkl")): with open(pkl_path, "rb") as f: motion = pickle.load(f) dist = reference_motion_distance(motion) if dist > THRESHOLD: motion_id = os.path.basename(pkl_path) # 4️⃣ 这里把文件名当作 ID bad_motion_ids.append((motion_id, dist)) # 记录距离,方便后续排查 # 打印 / 保存结果 if bad_motion_ids: print("以下 motion 的 reference motion distance 超过 {:.2f} m:".format(THRESHOLD)) for mid, d in bad_motion_ids: print(f" {mid:30s} -> {d:.3f} m") else: print("所有 motion 的 reference motion distance 都在阈值以内 ✅")
play()
脚本check_motion_distance.py
运行即可。task_registry.make_env_hydra(...)
之前插一段类似代码,拿到 cfg_hydra.motion_files
(或你的自定义字段);good_motion_files
写回 cfg_hydra
再创建环境。需求 | 需要改动的地方 |
---|---|
距离用三维而非平面 | 改 np.linalg.norm(end - start) 不要截到 :3 |
想看 最大瞬时误差 | 在 reference_motion_distance 里遍历每一帧比较 |
记录到文件而非终端输出 | 最后把 bad_motion_ids json.dump /写 csv |
pkl 里键名不同 | 把 "root_pos" 改成你的键;或补充函数参数 |
只要保证 ① 能够读取 motion,② 明确什么是 “reference motion distance”,③ 定好阈值并遍历,就能一步定位所有异常 motion。祝调试顺利!
def _resample_motion_times(self, env_ids):
if len(env_ids) == 0:
return
# self.motion_ids[env_ids] = self._motion_lib.sample_motions(len(env_ids))
# self.motion_ids[env_ids] = torch.randint(0, self._motion_lib._num_unique_motions, (len(env_ids),), device=self.device)
# print(self.motion_ids[:10])
self.motion_len[env_ids] = self._motion_lib.get_motion_length(self.motion_ids[env_ids])
# self.env_origins_init_3Doffset[env_ids, :2] = torch_rand_float(-1., 1., (len(env_ids), 2), device=self.device) # xy position within 1m of the center
if self.cfg.env.test:
self.motion_start_times[env_ids] = 0
else:
self.motion_start_times[env_ids] = self._motion_lib.sample_time(self.motion_ids[env_ids])
# self.motion_start_times[env_ids] = self._motion_lib.sample_time(self.motion_ids[env_ids])
offset=(self.env_origins + self.env_origins_init_3Doffset)
motion_times = (self.episode_length_buf ) * self.dt + self.motion_start_times # next frames so +1
# motion_res = self._get_state_from_motionlib_cache(self.motion_ids, motion_times, offset= offset)
motion_res = self._get_state_from_motionlib_cache_trimesh(self.motion_ids, motion_times, offset= offset)
textself.ref_base_pos_init[env_ids] = motion_res["root_pos"][env_ids] self.ref_base_rot_init[env_ids] = motion_res["root_rot"][env_ids] self.ref_base_vel_init[env_ids] = motion_res["root_vel"][env_ids] self.ref_base_ang_vel_init[env_ids] = motion_res["root_ang_vel"][env_ids] 解释这个东西
下面给你逐行拆解_resample_motion_times()
的作用与关键变量,帮助你读懂这段逻辑。为了和源码对照,我保持了原行序,必要时在同一条解释里把相关几行合并讲。
pythonif len(env_ids) == 0: return
tensor/array
,保存了需要重采样的环境(sub-environment)索引。pythonself.motion_len[env_ids] = self._motion_lib.get_motion_length(self.motion_ids[env_ids])
get_motion_length()
:从 motion library 里查询该动作片段的总时长(单位:秒或帧,取决于库实现),写进 self.motion_len
,只更新传进来的 env_ids
那些。之前被注释掉的两行
python# self.motion_ids[env_ids] = self._motion_lib.sample_motions(...) # self.motion_ids[env_ids] = torch.randint(...)
说明作者一度想在这里重新抽 motion,但最终选择把 “抽哪个 motion” 放到别处做,这里只负责时间相关的采样。
python# self.env_origins_init_3Doffset[env_ids, :2] = torch_rand_float(-1., 1., (len(env_ids), 2), device=self.device)
pythonif self.cfg.env.test: self.motion_start_times[env_ids] = 0 else: self.motion_start_times[env_ids] = self._motion_lib.sample_time(self.motion_ids[env_ids])
test=True
) 时,所有 env 都从 动作第 0 帧 开始,保证复现性。sample_time()
会在 [0, motion_len) 内随机采一个实数或帧索引,把它作为“这一段动作在当前 episode 的播放起点”。pythonoffset = (self.env_origins + self.env_origins_init_3Doffset) motion_times = (self.episode_length_buf) * self.dt + self.motion_start_times # next frames so +1 # motion_res = self._get_state_from_motionlib_cache(self.motion_ids, motion_times, offset= offset) motion_res = self._get_state_from_motionlib_cache_trimesh(self.motion_ids, motion_times, offset= offset)
env_origins
与上一步的随机平移 env_origins_init_3Doffset
相加,用于把 mocap 数据从局部坐标系搬到世界坐标。episode_length_buf
记录了当前环境已经运行了多少 仿真步。dt
变成“已经过了多少秒”。motion_start_times
得到:“在动作库里对应的绝对时间点”。_get_state_from_motionlib_cache_trimesh()
_get_state_from_motionlib_cache()
类似,但多半针对 三角网格碰撞(trimesh terrain)做了优化。motion_res
字典里。pythonself.ref_base_pos_init[env_ids] = motion_res["root_pos"][env_ids] self.ref_base_rot_init[env_ids] = motion_res["root_rot"][env_ids] self.ref_base_vel_init[env_ids] = motion_res["root_vel"][env_ids] self.ref_base_ang_vel_init[env_ids] = motion_res["root_ang_vel"][env_ids]
ref_*_init
张量会在 一次 episode 的开头 保存“目标动作”在起始帧的根节点状态。_resample_motion_times()
的工作可以概括为:
这样做的好处:
offset
把动作撒到不同世界坐标,防止所有机器人互相干涉,也增大场景覆盖面。*_trimesh
版本能在崎岖地形上快速查询、避免 CPU-GPU 复制瓶颈。希望这能帮你彻底搞清楚这段代码的意图和数据流!如果还有细节想了解,随时告诉我。
legged gym training的时候是一个env就是一个motionid吗
在 Legged Gym 的模仿学习任务里,“一个 env 对应一个 motion_id” 这个说法大体成立,但要区分两件事:
概念 | 什么时候确定 | 持续多久 | 是否唯一 |
---|---|---|---|
motion_id | 在 reset() 阶段或你看到的 _resample_motion_times() 里,由 _motion_lib.sample_motions() (或同类函数)为 每个 env 随机/顺序抽取 | 整个 episode(直到下一次 reset) | 对当前 env 当下这段 episode 来说是唯一的;不同 env 可以抽到同一个 ID |
motion_start_time | 同一处代码里用 sample_time() 生成 | 同一 episode 内会一直递增(t += dt ),但不会再重新抽 | 对当前 episode 唯一 |
motion_id
),并用当前的 motion_time
去 motion lib 查询“理想姿态/速度”。motion_id
,可以重复也可以不同——完全取决于采样策略。motion_id
(或者沿用原来的,视实现而定)。因此同一个 env 在训练过程中会接触到多个 motion clip。python# 举例:reset() 里常见的逻辑 self.motion_ids[env_ids] = self._motion_lib.sample_motions(len(env_ids)) self.motion_start_times[env_ids] = self._motion_lib.sample_time(self.motion_ids[env_ids]) # 然后每个 step: motion_times = episode_length_buf * dt + motion_start_times motion_res = _get_state_from_motionlib_cache(... motion_ids, motion_times ...)
self.motion_ids
长度 = num_envs
,一一对应。_resample_motion_times()
只是更新 哪些 env 需要重抽(通常是刚 reset 的 env)。sample_motions()
里写死返回(例如 torch.arange(num_envs)
)。总结:在单次 episode 内,一个 env 只跟踪一个 motion clip(一个 motion_id);但训练过程会不断 reset,每次都有机会换新的 motion_id。所以答案是“当前 episode 来说是的,整个训练过程来说不是”。
在整个训练过程中怎么看envid对应的motionid是多少
在 Legged Gym 里,self.motion_ids
这个 (num_envs,)
张量就是 env ID → motion ID 的映射。
它会在 reset(或者你看到的 _resample_motion_times()
)里,对 刚刚重置的 env 重抽并写回,对其他 env 不变。因此,只要在「重置时」把对应值打印/记录下来,就能完整追踪整条映射随训练的演变。下面给出三种常用做法——从最轻量的 print
到把日志写入文件或 TensorBoard。
在 _resample_motion_times()
或 reset_idx()
里插一段:
python# 例如放在 self.motion_ids[...] = ... 之后 print( f"[step {int(self.progress_buf.max())}] " f"reset envs {env_ids.cpu().tolist()} -> " f"motion_ids {self.motion_ids[env_ids].cpu().tolist()}" )
效果示例:
[step 12345] reset envs [0, 17, 92] -> motion_ids [231, 77, 12]
pythonLOG_PATH = "motion_mapping_log.csv" def _log_mapping(self, env_ids): with open(LOG_PATH, "a") as f: step = int(self.progress_buf.max()) for eid in env_ids: mid = int(self.motion_ids[eid]) f.write(f"{step},{eid},{mid}\n")
然后在 reset 的地方调用 _log_mapping(self, env_ids)
。
文件格式:global_step,env_id,motion_id
——随便用 Pandas/Numpy 再分析。
pythonfrom torch.utils.tensorboard import SummaryWriter writer = SummaryWriter("runs/motion_map") def _log_mapping_tb(self, env_ids): step = int(self.progress_buf.max()) mids = self.motion_ids[env_ids].float() # TensorBoard 需要浮点 writer.add_histogram("motion_id_all_envs", self.motion_ids.float(), step) # 也可以逐个 env 作为 scalar: for eid in env_ids: writer.add_scalar(f"motion_id/env_{eid}", float(self.motion_ids[eid]), step)
这样你可以在 TensorBoard 的 “HISTOGRAM” 或 “SCALARS” 面板里实时看 motion ID 的分布/变化。
训练代码自带的 reset 通常在 env.reset_idx(env_ids)
→ _resample_motion_times(env_ids)
这一链条里,只要在最底层(确定了新 ID 后)插钩子即可。
如果你想一目了然,也可以在主循环里检测 dones
,当某些 env 完成一集后打印:
pythonjust_done = torch.nonzero(dones).squeeze(-1) if len(just_done): print(just_done, motion_ids[just_done])
env_ids
来确定是哪几个。print
对 100+ env、1k Hz 训练会刷屏,推荐只在 global_step % N == 0
或 len(env_ids)>0
时打印。rank==0
打印或写文件,避免重复日志。按照上面方法,你就能在 整个训练过程中 随时知道「某个 env 此刻对应的 motion ID」以及历史变动情况。需要更高级可视化时,把日志喂给 Matplotlib / Pandas 或用 TensorBoard 即可。祝调试顺利!
self.motion_ids[env_ids]是不是envid当前时刻的motionid
是的,self.motion_ids[env_ids]
就是 在当前这一帧(或说这一仿真 step)里,env_ids
所代表那些环境实例各自正在追踪的 motion ID。
self.motion_ids
是一个长度等于 num_envs
的张量(或数组),一直缓存着“这一集 episode 期间,每个 env 对应哪条 motion clip”。_resample_motion_times()
阶段被重置时,框架会重新为它们抽取新的 motion_id
并写回 self.motion_ids
的相应位置;在同一 episode 剩余时间里,这些值不会再变化。self.motion_ids[env_ids]
取出来的,就是这些 env “此刻仍然有效的” motion ID。如果想查看全部环境的映射,直接打印 self.motion_ids
即可。只有当环境再次触发重置,框架才会为该 env 刷新新的 motion ID。
是不是每次leggedgym训练load的motionid都是固定的
不是固定的,取决于你什么时候说“load”:
层次 | 发生时机 | 是否固定 |
---|---|---|
加载 motion library<br>(把 .pkl 读进显存 / CPU RAM) | 训练脚本一启动就一次性把 N 条动作都读进来 | 固定——除非你改 motion 数据目录,否则每次 run 都会把同一组文件加载到 _motion_lib |
为每个 env 抽 motion_id | 每次 reset() 或 _resample_motion_times() 时,对“刚 reset 的 env”调用<br>_motion_lib.sample_motions(len(env_ids)) | 不固定——默认是随机抽;但如果:<br>• 你设置了全局随机种子 train_cfg.seed → 抽样可复现<br>• 或者你把 sample_motions() 改成返回固定序列,就能让它固定 |
在一集 episode 内部 | 从 reset 到下一次 reset 之间 | 固定——motion_id 不会再变,直到该 env 重新 reset |
motion_id
,默认是随机的。train_cfg.seed
设同一个值,随机数序列(包括 motion 抽样)会复现;没有种子或种子不同,就会得到另一套 motion_id
序列。或者在python# 把 sample_motions 改成按顺序返回 def sample_motions(self, k): ids = torch.arange(self._num_unique_motions, device=self.device) return ids[:k] # 或者任何你想要的固定列表
_resample_motion_times()
里干脆把那行随机采样注释掉,只在第一次初始化时写一次 self.motion_ids
。pythonprint("当前 step:", int(self.progress_buf.max())) print("motion_id snapshot:", self.motion_ids.cpu().tolist())
连续几帧打印你会看到:同一个 env ID 的 motion_id
保持不变,直到它 done → reset → 重新抽号。
所以答案是:> 加载到库里的数据是固定的一整包;但训练过程中每个 env 在每一集用哪一条 motion(motion_id)默认是随机抽的,可通过设随机种子或自定义抽样方式让它固定。
def _resample_motion_times(self, env_ids):
if len(env_ids) == 0:
return
self.motion_ids[env_ids] = self._motion_lib.sample_motions(len(env_ids))
self.motion_ids[env_ids] = torch.randint(0, self._motion_lib._num_unique_motions, (len(env_ids),), device=self.device)
# print(self.motion_ids[:10])
self.motion_len[env_ids] = self._motion_lib.get_motion_length(self.motion_ids[env_ids])解释self.motion_ids[env_ids] = self._motion_lib.sample_motions(len(env_ids))
self.motion_ids[env_ids] = torch.randint(0, self._motion_lib._num_unique_motions, (len(env_ids),), device=self.device)
# print(self.motion_ids[:10])是什么
这三行只是给 self.motion_ids[env_ids]
赋值、再赋值、最后顺手调试输出——所以 真正起作用的只有第二行,第一行的结果立刻被覆盖掉。
pythonself.motion_ids[env_ids] = self._motion_lib.sample_motions(len(env_ids))
sample_motions(k)
k = len(env_ids)
个 motion ID(整数张量),通常用自己的采样策略:
self.motion_ids
对应的 env 槽位pythonself.motion_ids[env_ids] = torch.randint( 0, self._motion_lib._num_unique_motions, (len(env_ids),), device=self.device )
[0, _num_unique_motions-1]
里随机抽同样数量的整数self.motion_ids[env_ids]
python# print(self.motion_ids[:10])
sample_motions
) 和 均匀随机 (torch.randint
) 的效果:
sample_motions
,却忘了把那一行注释掉需求 | 建议保留哪行 |
---|---|
想用 sample_motions() (可能含权重或排除短片段) | 只留第一行 |
只是想 均匀随机 抽 motion | 只留第二行 |
想比较两者区别 | 可以打印两行结果再分析,而不是直接覆盖 |
删掉不需要的那一行,逻辑就清晰了。
一个n, 3的tensor张量 返回值小于0.5的行的index
下面给出几种常见「行筛选」写法,你可以按自己实际的“返回值”定义挑一条用。假设张量叫 X
,形状是 (n, 3)
:
想保留的行 | 代码里的布尔掩码 mask | 得到行索引 idx ¹ |
---|---|---|
(1) 行向量的 L2 范数 < 0.5 | X.norm(dim=1) < 0.5 | idx = torch.where(mask)[0] |
(2) 三个分量都 < 0.5 | (X < 0.5).all(dim=1) | 同上 |
(3) 只要有一个分量 < 0.5 | (X < 0.5).any(dim=1) | 同上 |
pythonimport torch # 示例张量 X = torch.tensor([ [0.1, 0.2, 0.3], [0.6, 0.4, 0.7], [0.8, 0.9, 0.95], ]) # ① 行 L2 范数 < 0.5 mask = X.norm(dim=1) < 0.5 idx = torch.where(mask)[0] # tensor([0]) # ② 三个分量都 < 0.5 mask_all = (X < 0.5).all(dim=1) idx_all = torch.where(mask_all)[0] # tensor([0]) # ③ 任意一个分量 < 0.5 mask_any = (X < 0.5).any(dim=1) idx_any = torch.where(mask_any)[0] # tensor([0, 1])
torch.where(mask)[0]
或 torch.nonzero(mask, as_tuple=True)[0]
都能把布尔掩码变成索引。
如果你只想直接拿布尔掩码去做切片,也可以:
pythonrows = X[mask] # 把满足条件的行取出来
¹ 返回的是 1-D 索引张量,和 range(n)
中对应行一一对照。
一个1,n, 3的tensor张量 返回3那一维度值小于0.5的行的index
下面假设你的张量叫 X
,形状是 (1, n, 3)
。
你想找出 最后那个 3‑维向量中的第 3 个分量(Python 下标 2)小于 0.5 的行在 n 这一维上的索引。
pythonimport torch # X 的形状: (1, n, 3) # 例子: # X = torch.randn(1, n, 3) # 1️⃣ 先把 batch 维度 (dim=0) 去掉或直接索引到它 third_component = X[0, :, 2] # 形状: (n,) # 2️⃣ 构造布尔掩码 —— 哪些行的第 3 分量 < 0.5 mask = third_component < 0.5 # 形状: (n,) # 3️⃣ 把掩码转成行索引 row_idx = torch.where(mask)[0] # 1‑D 张量,记录满足条件的行号 print(row_idx) # e.g. tensor([0, 4, 7])
X[0, :, 2]
:
0
选出 batch 中的第一个样本;:
选出 n 行;2
取每行向量的第 3 个元素。torch.where(mask)[0]
或torch.nonzero(mask, as_tuple=True)[0]
都能把布尔掩码转成索引。- 得到的
row_idx
即 “在第 1 个 batch 里,那些第 3 分量 < 0.5 的行号**”。
如果你的张量其实是 (batch, n, 3)
而不只 1 个 batch,把 0
换成相应的 batch 索引,或写成批量操作:
pythonmask = X[..., 2] < 0.5 # 形状: (batch, n) row_idx_each_batch = [torch.where(m)[0] for m in mask]
这样就能拿到每个 batch 的行索引列表。
一个n,2, 3的tensor张量 返回3那一维度值小于0.5的n行的index
下面假设张量名为 X
,形状是 (n, 2, 3)
:
需求 | 布尔掩码 mask | 得到行索引 row_idx |
---|---|---|
行里任意一个 (2 × 3 里) 第 3 个分量 < 0.5 | (X[:, :, 2] < 0.5).any(dim=1) | torch.where(mask)[0] |
行里两个 (2 × 3 里) 第 3 个分量都 < 0.5 | (X[:, :, 2] < 0.5).all(dim=1) | 同上 |
pythonimport torch # X: (n, 2, 3) # ① 取出 “第 3 个分量”(下标 2) third_comp = X[:, :, 2] # 形状 (n, 2) # ②‑a 只要任意一个 < 0.5 mask_any = (third_comp < 0.5).any(dim=1) # (n,) # ②‑b 两个都 < 0.5(如果需要) mask_all = (third_comp < 0.5).all(dim=1) # (n,) # ③ 变成行索引 row_idx_any = torch.where(mask_any)[0] # 任意满足 row_idx_all = torch.where(mask_all)[0] # 全部满足
row_idx_any
/ row_idx_all
是一维张量,列出 第 n 维(行)上满足条件的索引。rows = X[mask_any]
。根据你要的是 “只要有一个 < 0.5” 还是 “两个都 < 0.5”,选对应的掩码即可。