From 592d9ee996bbdbd63c8ff34e3850405ee3e5bb72 Mon Sep 17 00:00:00 2001 From: Benjamin Bolte Date: Sun, 31 Mar 2024 21:18:00 -0700 Subject: [PATCH] stuff --- Makefile | 5 + pyproject.toml | 12 +- sim/env.py | 8 ++ sim/logging.py | 315 +++++++++++++++++++++++++++++++++++++++++++++++++ sim/ref.py | 191 ++++++++++++++++++++++++++++++ sim/stompy.py | 85 +++++++++++++ 6 files changed, 612 insertions(+), 4 deletions(-) create mode 100644 sim/env.py create mode 100644 sim/logging.py create mode 100644 sim/ref.py create mode 100644 sim/stompy.py diff --git a/Makefile b/Makefile index c5d711da..023cb857 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,11 @@ install-dev: @pip install --verbose -e '.[dev]' .PHONY: install +install-third-party: + @git submodule update --init --recursive + @cd third_party/isaacgym/python/ && pip install --verbose -e . + @cd third_party/humanoid-gym && pip install --verbose -e . + build-ext: @python setup.py build_ext --inplace .PHONY: build-ext diff --git a/pyproject.toml b/pyproject.toml index fa525f9e..f3aa14d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,10 +30,14 @@ warn_redundant_casts = true incremental = true namespace_packages = false -# Uncomment to exclude modules from Mypy. -# [[tool.mypy.overrides]] -# module = [] -# ignore_missing_imports = true +[[tool.mypy.overrides]] + +module = [ + "isaacgym.*", + "humanoid.*", +] + +ignore_missing_imports = true [tool.isort] diff --git a/sim/env.py b/sim/env.py new file mode 100644 index 00000000..2585f6fe --- /dev/null +++ b/sim/env.py @@ -0,0 +1,8 @@ +"""Defines common environment parameters.""" + +import os +from pathlib import Path + + +def model_dir() -> Path: + return Path(os.environ.get("MODEL_DIR", "models")) diff --git a/sim/logging.py b/sim/logging.py new file mode 100644 index 00000000..441e8c75 --- /dev/null +++ b/sim/logging.py @@ -0,0 +1,315 @@ +"""Defines utility functions for enabling logging.""" + +import datetime +import logging +import re +import sys +from typing import Literal + +# Show as a transient message. +LOG_PING: int = logging.INFO + 2 + +# Show as a persistent status message. +LOG_STATUS: int = logging.INFO + 3 + +RESET_SEQ = "\033[0m" +REG_COLOR_SEQ = "\033[%dm" +BOLD_COLOR_SEQ = "\033[1;%dm" +BOLD_SEQ = "\033[1m" + +Color = Literal[ + "black", + "red", + "green", + "yellow", + "blue", + "magenta", + "cyan", + "white", + "grey", + "light-red", + "light-green", + "light-yellow", + "light-blue", + "light-magenta", + "light-cyan", +] + +COLOR_INDEX: dict[Color, int] = { + "black": 30, + "red": 31, + "green": 32, + "yellow": 33, + "blue": 34, + "magenta": 35, + "cyan": 36, + "white": 37, + "grey": 90, + "light-red": 91, + "light-green": 92, + "light-yellow": 93, + "light-blue": 94, + "light-magenta": 95, + "light-cyan": 96, +} + + +def color_parts(color: Color, bold: bool = False) -> tuple[str, str]: + if bold: + return BOLD_COLOR_SEQ % COLOR_INDEX[color], RESET_SEQ + return REG_COLOR_SEQ % COLOR_INDEX[color], RESET_SEQ + + +def uncolored(s: str) -> str: + return re.sub(r"\033\[[\d;]+m", "", s) + + +def colored(s: str, color: Color | None = None, bold: bool = False) -> str: + if color is None: + return s + start, end = color_parts(color, bold=bold) + return start + s + end + + +def wrapped( + s: str, + length: int | None = None, + space: str = " ", + spaces: str | re.Pattern = r" ", + newlines: str | re.Pattern = r"[\n\r]", + too_long_suffix: str = "...", +) -> list[str]: + strings = [] + lines = re.split(newlines, s.strip(), flags=re.MULTILINE | re.UNICODE) + for line in lines: + cur_string = [] + cur_length = 0 + for part in re.split(spaces, line.strip(), flags=re.MULTILINE | re.UNICODE): + if length is None: + cur_string.append(part) + cur_length += len(space) + len(part) + else: + if len(part) > length: + part = part[: length - len(too_long_suffix)] + too_long_suffix + if cur_length + len(part) > length: + strings.append(space.join(cur_string)) + cur_string = [part] + cur_length = len(part) + else: + cur_string.append(part) + cur_length += len(space) + len(part) + if cur_length > 0: + strings.append(space.join(cur_string)) + return strings + + +def outlined( + s: str, + inner: Color | None = None, + side: Color | None = None, + bold: bool = False, + max_length: int | None = None, + space: str = " ", + spaces: str | re.Pattern = r" ", + newlines: str | re.Pattern = r"[\n\r]", +) -> str: + strs = wrapped(uncolored(s), max_length, space, spaces, newlines) + max_len = max(len(s) for s in strs) + strs = [f"{s}{' ' * (max_len - len(s))}" for s in strs] + strs = [colored(s, inner, bold=bold) for s in strs] + strs_with_sides = [f"{colored('│', side)} {s} {colored('│', side)}" for s in strs] + top = colored("┌─" + "─" * max_len + "─┐", side) + bottom = colored("└─" + "─" * max_len + "─┘", side) + return "\n".join([top] + strs_with_sides + [bottom]) + + +def show_info(s: str, important: bool = False) -> None: + if important: + s = outlined(s, inner="light-cyan", side="cyan", bold=True) + else: + s = colored(s, "light-cyan", bold=False) + sys.stdout.write(s) + sys.stdout.write("\n") + sys.stdout.flush() + + +def show_error(s: str, important: bool = False) -> None: + if important: + s = outlined(s, inner="light-red", side="red", bold=True) + else: + s = colored(s, "light-red", bold=False) + sys.stdout.write(s) + sys.stdout.write("\n") + sys.stdout.flush() + + +def show_warning(s: str, important: bool = False) -> None: + if important: + s = outlined(s, inner="light-yellow", side="yellow", bold=True) + else: + s = colored(s, "light-yellow", bold=False) + sys.stdout.write(s) + sys.stdout.write("\n") + sys.stdout.flush() + + +def format_timedelta(timedelta: datetime.timedelta, short: bool = False) -> str: + """Formats a delta time to human-readable format. + + Args: + timedelta: The delta to format + short: If set, uses a shorter format + + Returns: + The human-readable time delta + """ + parts = [] + if timedelta.days > 0: + if short: + parts += [f"{timedelta.days}d"] + else: + parts += [f"{timedelta.days} day" if timedelta.days == 1 else f"{timedelta.days} days"] + + seconds = timedelta.seconds + + if seconds > 60 * 60: + hours, seconds = seconds // (60 * 60), seconds % (60 * 60) + if short: + parts += [f"{hours}h"] + else: + parts += [f"{hours} hour" if hours == 1 else f"{hours} hours"] + + if seconds > 60: + minutes, seconds = seconds // 60, seconds % 60 + if short: + parts += [f"{minutes}m"] + else: + parts += [f"{minutes} minute" if minutes == 1 else f"{minutes} minutes"] + + if short: + parts += [f"{seconds}s"] + else: + parts += [f"{seconds} second" if seconds == 1 else f"{seconds} seconds"] + + return ", ".join(parts) + + +def format_datetime(dt: datetime.datetime) -> str: + """Formats a datetime to human-readable format. + + Args: + dt: The datetime to format + + Returns: + The human-readable datetime + """ + return dt.strftime("%Y-%m-%d %H:%M:%S") + + +def camelcase_to_snakecase(s: str) -> str: + return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s).lower() + + +def snakecase_to_camelcase(s: str) -> str: + return "".join(word.title() for word in s.split("_")) + + +def highlight_exception_message(s: str) -> str: + s = re.sub(r"(\w+Error)", r"\033[1;31m\1\033[0m", s) + s = re.sub(r"(\w+Exception)", r"\033[1;31m\1\033[0m", s) + s = re.sub(r"(\w+Warning)", r"\033[1;33m\1\033[0m", s) + s = re.sub(r"\^+", r"\033[1;35m\g<0>\033[0m", s) + s = re.sub(r"File \"(.+?)\"", r'File "\033[36m\1\033[0m"', s) + return s + + +def is_interactive_session() -> bool: + return hasattr(sys, "ps1") or hasattr(sys, "ps2") or sys.stdout.isatty() or sys.stderr.isatty() + + +class ColoredFormatter(logging.Formatter): + """Defines a custom formatter for displaying logs.""" + + RESET_SEQ = "\033[0m" + COLOR_SEQ = "\033[1;%dm" + BOLD_SEQ = "\033[1m" + + COLORS: dict[str, Color] = { + "WARNING": "yellow", + "INFO": "cyan", + "DEBUG": "grey", + "CRITICAL": "yellow", + "FATAL": "red", + "ERROR": "red", + "STATUS": "green", + "PING": "magenta", + } + + def __init__( + self, + *, + prefix: str | None = None, + use_color: bool = True, + ) -> None: + asc_start, asc_end = color_parts("grey") + name_start, name_end = color_parts("blue", bold=True) + + message_pre = [ + "{levelname:^19s}", + asc_start, + "{asctime}", + asc_end, + " [", + name_start, + "{name}", + name_end, + "]", + ] + message_post = [" {message}"] + + if prefix is not None: + message_pre += [" ", colored(prefix, "magenta", bold=True)] + + message = "".join(message_pre + message_post) + + super().__init__(message, style="{", datefmt="%Y-%m-%d %H:%M:%S") + + self.use_color = use_color + + def format(self, record: logging.LogRecord) -> str: + levelname = record.levelname + if record.levelname and self.use_color and levelname in self.COLORS: + record.levelname = colored(record.levelname, self.COLORS[levelname], bold=True) + return logging.Formatter.format(self, record) + + +def configure_logging(prefix: str | None = None, level: int = logging.INFO) -> None: + """Instantiates logging. + + This captures logs and reroutes them to the Toasts module, which is + pretty similar to Python logging except that the API is a lot easier to + interact with. + + Args: + prefix: An optional prefix to add to the logger + level: The logging level + """ + root_logger = logging.getLogger() + + # Adds new level names. + logging.addLevelName(LOG_PING, "PING") + logging.addLevelName(LOG_STATUS, "STATUS") + + # Captures warnings from the warnings module. + logging.captureWarnings(True) + + # Clears all existing handlers. + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Adds new handler. + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setFormatter(ColoredFormatter(prefix=prefix)) + root_logger.addHandler(stream_handler) + + root_logger.setLevel(level) diff --git a/sim/ref.py b/sim/ref.py new file mode 100644 index 00000000..b87578f2 --- /dev/null +++ b/sim/ref.py @@ -0,0 +1,191 @@ +"""DOF control methods example +--------------------------- +An example that demonstrates various DOF control methods: +- Load cartpole asset from an urdf +- Get/set DOF properties +- Set DOF position and velocity targets +- Get DOF positions +- Apply DOF efforts + +Copyright KScale Labs. +""" + +import torch +from isaacgym import gymapi, gymutil + +# initialize gym +gym = gymapi.acquire_gym() + +# parse arguments +args = gymutil.parse_arguments(description="Joint control Methods Example") + +# create a simulator +sim_params = gymapi.SimParams() +sim_params.substeps = 2 +sim_params.dt = 1.0 / 60.0 + +sim_params.physx.solver_type = 1 +sim_params.physx.num_position_iterations = 4 +sim_params.physx.num_velocity_iterations = 1 + +sim_params.physx.num_threads = args.num_threads +sim_params.physx.use_gpu = args.use_gpu + +sim_params.use_gpu_pipeline = False +if args.use_gpu_pipeline: + print("WARNING: Forcing CPU pipeline.") + +# pfb30 +# sim_params.gravity = gymapi.Vec3(0.0, -10.0, 0.0) + +sim = gym.create_sim(args.compute_device_id, args.graphics_device_id, args.physics_engine, sim_params) + +if sim is None: + print("*** Failed to create sim") + quit() + +# create viewer using the default camera properties +viewer = gym.create_viewer(sim, gymapi.CameraProperties()) +if viewer is None: + raise ValueError("*** Failed to create viewer") + +# add ground plane +plane_params = gymapi.PlaneParams() +gym.add_ground(sim, gymapi.PlaneParams()) + +# set up the env grid +num_envs = 1 +spacing = 1.5 +env_lower = gymapi.Vec3(-spacing, 0.0, -spacing) +env_upper = gymapi.Vec3(spacing, 0.0, spacing) + + +# add cartpole urdf asset +asset_root = "../sim-integration/humanoid-gym/resources/robots/XBot/" +asset_file = "urdf_whole_body/XBot-L_example.urdf" +# asset_file = "urdf/XBot-L.urdf" + +asset_root = "../sim-integration/humanoid-gym/resources/robot_new4/" +asset_file = "legs.urdf" + +asset_root = "../sim-integration/humanoid-gym/resources/test_onshape/" +asset_file = "test.urdf" +print(asset_file) + +# Load asset with default control type of position for all joints +asset_options = gymapi.AssetOptions() +# pfb30 - set mode pose +asset_options.default_dof_drive_mode = gymapi.DOF_MODE_POS +print("Loading asset '%s' from '%s'" % (asset_file, asset_root)) +robot_asset = gym.load_asset(sim, asset_root, asset_file, asset_options) + + +# initial root pose for cartpole actors +initial_pose = gymapi.Transform() +initial_pose.p = gymapi.Vec3(0.0, 9.8, 0.0) +# initial_pose.r = gymapi.Quat(-0.717107, 0.0, 0.0, 0.717107) +# initial_pose.r = gymapi.Quat(0.7109, 0.7033, 0,0 ) +# [0.0000, 0.7033, 0.0000, 0.7109] +# Create environment 1 +env1 = gym.create_env(sim, env_lower, env_upper, 1) + +robot1 = gym.create_actor(env1, robot_asset, initial_pose, "robot1") + + +# gym.set_asset_rigid_shape_properties(robot_asset, rigid_shape_props) +body_props = gym.get_actor_rigid_body_properties(env1, robot1) + +# print(f"First", body_props[0].mass) +# mass = 0 +# for ii, body in enumerate(body_props): +# body.mass = abs(body.mass)*0.01 +# mass += body.mass +# print(ii, body.mass) +# print(f"First again", body_props[0].mass) +# print("total mass", mass) +# breakpoint() + +# Configure DOF properties +props = gym.get_actor_dof_properties(env1, robot1) + +# props["driveMode"] = (gymapi.DOF_MODE_EFFORT) +# # Required to work with dof effort +# props["damping"].fill(0.0) +# props["stiffness"].fill(0.0) + +# for pos control stiffness high low damping +props["driveMode"] = gymapi.DOF_MODE_POS +props["stiffness"].fill(200.0) +props["damping"].fill(10.0) +props["armature"].fill(0.0001) +gym.set_actor_dof_properties(env1, robot1, props) + + +# # I. test the default position +# for ii in range(props.shape[0]): +# gym.set_dof_target_position(env1, ii, 0) + +# # II. test setting the position +left_knee_joint = gym.find_actor_dof_handle(env1, robot1, "left_knee_joint") +right_knee_joint = gym.find_actor_dof_handle(env1, robot1, "right_knee_joint") +right_shoulder_pitch_joint = gym.find_actor_dof_handle(env1, robot1, "right_shoulder_pitch_joint") +left_shoulder_pitch_joint = gym.find_actor_dof_handle(env1, robot1, "left_shoulder_pitch_joint") +right_elbow_pitch_joint = gym.find_actor_dof_handle(env1, robot1, "right_elbow_pitch_joint") +left_elbow_pitch_joint = gym.find_actor_dof_handle(env1, robot1, "left_elbow_pitch_joint") + +# gym.set_dof_target_position(env1, left_knee_joint, 1.1) +# gym.set_dof_target_position(env1, right_knee_joint, -1.1) +# gym.set_dof_target_position(env1, right_shoulder_pitch_joint, -1.4) +# gym.set_dof_target_position(env1, left_shoulder_pitch_joint, 1.4) +# gym.set_dof_target_position(env1, right_elbow_pitch_joint, -2.) +# gym.set_dof_target_position(env1, left_elbow_pitch_joint, 2.) + +# Look at the first env +cam_pos = gymapi.Vec3(8, 4, 1.5) +cam_target = gymapi.Vec3(0, 2, 1.5) +gym.viewer_camera_look_at(viewer, None, cam_pos, cam_target) + + +# Simulate +while not gym.query_viewer_has_closed(viewer): + + # step the physics + gym.simulate(sim) + gym.fetch_results(sim, True) + + # update the viewer + gym.step_graphics(sim) + gym.draw_viewer(viewer, sim, True) + + # # Update env 1: make the robot always bend the knees + # Set DOF drive targets + # breakpoint() + # torques = torch.zeros(props["damping"].shape) + # gym.set_dof_actuation_force_tensor(env1, torques) + + # Apply dof effort + # gym.apply_dof_effort(env1, right_knee_joint, 20) + # gym.apply_dof_effort(env1, left_knee_joint, -20) + + # Set position + + # Position 1 elbows: + states = torch.zeros(props.shape[0]) + + # gym.set_dof_state_tensor(env1, states) + + # gym.set_actor_dof_position_targets(env1, right_elbow_pitch_joint, 2.1) + # gym.set_actor_dof_position_targets(env1, left_elbow_pitch_joint, -2.1) + # Get positions + pos = gym.get_dof_position(env1, right_shoulder_pitch_joint) + pos2 = gym.get_dof_position(env1, left_shoulder_pitch_joint) + # print(pos, pos2) + + # Wait for dt to elapse in real time. + # This synchronizes the physics simulation with the rendering rate. + gym.sync_frame_time(sim) + +print("Done") + +gym.destroy_viewer(viewer) +gym.destroy_sim(sim) diff --git a/sim/stompy.py b/sim/stompy.py new file mode 100644 index 00000000..6695adca --- /dev/null +++ b/sim/stompy.py @@ -0,0 +1,85 @@ +"""Defines training code for Stompy.""" + +import logging +import warnings +from pathlib import Path +from typing import Any + +from isaacgym import gymapi, gymutil + +from sim.env import model_dir + +logger = logging.getLogger(__name__) + +Gym = Any + + +def get_stompy_path() -> Path: + urdf_path = model_dir() / "robots" / "stompy" / "robot.urdf" + if not urdf_path.exists(): + raise FileNotFoundError(f"URDF file not found: {urdf_path}") + return urdf_path + + +def load_gym() -> Gym: + # Initialize gym. + gym = gymapi.acquire_gym() + + # Parse arguments. + args = gymutil.parse_arguments(description="Joint control methods") + + # Sets the simulation parameters. + sim_params = gymapi.SimParams() + sim_params.substeps = 2 + sim_params.dt = 1.0 / 60.0 + + sim_params.physx.solver_type = 1 + sim_params.physx.num_position_iterations = 4 + sim_params.physx.num_velocity_iterations = 1 + + sim_params.physx.num_threads = args.num_threads + sim_params.physx.use_gpu = args.use_gpu + + sim_params.use_gpu_pipeline = False + if args.use_gpu_pipeline: + warnings.warn("Forcing CPU pipeline.") + + # Creates the simulation. + sim = gym.create_sim( + args.compute_device_id, + args.graphics_device_id, + args.physics_engine, + sim_params, + ) + if sim is None: + raise RuntimeError("Failed to create sim") + + # Creates a viewer. + viewer = gym.create_viewer(sim, gymapi.CameraProperties()) + if viewer is None: + raise RuntimeError("Failed to create viewer") + + # Add ground plane. + plane_params = gymapi.PlaneParams() + gym.add_ground(sim, plane_params) + + # Set up the environment grid. + num_envs = 1 + spacing = 1.5 + env_lower = gymapi.Vec3(-spacing, 0.0, -spacing) + env_upper = gymapi.Vec3(spacing, 0.0, spacing) + + env = gym.create_env(sim, env_lower, env_upper, num_envs) + + return gym + + +def main() -> None: + args = gymutil.parse_arguments(description="Joint control methods") + + stompy_path = get_stompy_path() + + +if __name__ == "__main__": + # python -m sim.stompy + main()