-
Notifications
You must be signed in to change notification settings - Fork 0
/
insert_peg.py
207 lines (175 loc) · 8.37 KB
/
insert_peg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import collections
from dm_control import mujoco
from dm_control.rl import control
from dm_control.suite import base
from dm_control.suite import common
from dm_control.utils import containers
from dm_control.utils import rewards
from dm_control.utils import xml_tools
from dm_control import viewer
import common
from lxml import etree
import numpy as np
# Task constants (distances in meters, times in seconds).
_CLOSE = .01    # (Meters) Distance below which a thing is considered close.
_CONTROL_TIMESTEP = .01  # (Seconds)
_TIME_LIMIT = 20  # (Seconds)
# NOTE(review): the two probabilities below are currently unused — the
# object-initialisation randomisation in `Bring.initialize_episode` is
# commented out; confirm before re-enabling it.
_P_IN_HAND = .1  # Probability of object-in-hand initial state
_P_IN_TARGET = .1  # Probability of object-in-target initial state
# Joint names used for observations and episode initialisation.
_ARM_JOINTS = ['arm_root', 'arm_shoulder', 'arm_wrist', ]
class Physics(mujoco.Physics):
    """Physics with additional features for the Planar Manipulator domain."""

    def bounded_joint_pos(self, joint_names):
        """Returns the raw joint positions of `joint_names` as a new array."""
        return np.array(self.named.data.qpos[joint_names])

    def joint_vel(self, joint_names):
        """Returns the joint velocities of `joint_names`."""
        return self.named.data.qvel[joint_names]

    def body_2d_pose(self, body_names, orientation=True):
        """Returns x/z positions and, optionally, (qw, qy) orientations of bodies."""
        if not isinstance(body_names, str):
            # A column vector of names broadcasts against the axis-label lists.
            body_names = np.array(body_names).reshape(-1, 1)
        pos = self.named.data.xpos[body_names, ['x', 'z']]
        if not orientation:
            return pos
        ori = self.named.data.xquat[body_names, ['qw', 'qy']]
        return np.hstack([pos, ori])

    def force_torque(self):
        """Returns the reading of the 'peg_force' contact-force sensor."""
        return self.named.data.sensordata['peg_force']  # contact force

    def site_distance(self, site1, site2):
        """Returns the Euclidean distance between two named sites."""
        displacement = np.diff(self.named.data.site_xpos[[site2, site1]], axis=0)
        return np.linalg.norm(displacement)
class Bring(base.Task):
    """A Bring `Task`: bring the prop to the target.

    NOTE(review): this is a heavily modified copy of dm_control's manipulator
    task — the arm pose is hard-coded, the target/object randomisation is
    commented out, and the reward is a constant. Confirm these debug edits are
    intentional before using this task for training.
    """

    def __init__(self, use_peg, insert, fully_observable, random=None):
        """Initialize an instance of the `Bring` task.
        Args:
          use_peg: A `bool`, whether to replace the ball prop with the peg prop.
          insert: A `bool`, whether to insert the prop in a receptacle.
          fully_observable: A `bool`, whether the observation should contain the
            position and velocity of the object being manipulated and the target
            location.
          random: Optional, either a `numpy.random.RandomState` instance, an
            integer seed for creating a new `RandomState`, or None to select a seed
            automatically (default).
        """
        self._use_peg = use_peg
        self._target = 'target_peg' if use_peg else 'target_ball'
        self._object = 'peg' if self._use_peg else 'ball'
        # Joint names follow the '<object>_<dim>' convention, e.g. 'peg_x'.
        self._object_joints = ['_'.join([self._object, dim]) for dim in 'xzy']
        self._receptacle = 'slot' if self._use_peg else 'cup'
        self._insert = insert
        self._fully_observable = fully_observable
        super().__init__(random=random)

    def initialize_episode(self, physics):
        """Sets the state of the environment at the start of each episode."""
        # Local aliases
        # NOTE(review): `choice` and `uniform` are unused while the
        # randomisation code below remains commented out.
        choice = self.random.choice
        uniform = self.random.uniform
        model = physics.named.model
        data = physics.named.data
        # Find a collision-free random initial configuration.
        penetrating = True
        while penetrating:
            # Randomise angles of arm joints.
            is_limited = model.jnt_limited[_ARM_JOINTS].astype(bool)
            joint_range = model.jnt_range[_ARM_JOINTS]
            lower_limits = np.where(is_limited, joint_range[:, 0], -np.pi)
            upper_limits = np.where(is_limited, joint_range[:, 1], np.pi)
            # angles = uniform(lower_limits, upper_limits)
            # NOTE(review): the arm pose is hard-coded instead of being sampled
            # from [lower_limits, upper_limits]; the limits computed above are
            # therefore unused. Confirm this is intentional.
            data.qpos[_ARM_JOINTS] = np.array([-1, -1, -1])
            # Symmetrize hand.
            # data.qpos['finger'] = data.qpos['thumb']
            # Randomise target location.
            # target_x = uniform(-.4, .4)
            # target_z = uniform(.1, .4)
            # if self._insert:
            #   target_angle = uniform(-np.pi / 3, np.pi / 3)
            #   model.body_pos[self._receptacle, ['x', 'z']] = target_x, target_z
            #   model.body_quat[self._receptacle, ['qw', 'qy']] = [
            #       np.cos(target_angle / 2), np.sin(target_angle / 2)]
            # else:
            #   target_angle = uniform(-np.pi, np.pi)
            # model.body_pos[self._target, ['x', 'z']] = target_x, target_z
            # model.body_quat[self._target, ['qw', 'qy']] = [np.cos(target_angle / 2), np.sin(target_angle / 2)]
            # Randomise object location.
            # object_init_probs = [0, 1, 0]
            # init_type = choice(['in_hand', 'in_target', 'uniform'],
            #                    p=object_init_probs)
            # if init_type == 'in_target':
            #   object_x = target_x
            #   object_z = target_z
            #   object_angle = target_angle
            # elif init_type == 'in_hand':
            #   physics.after_reset()
            #   object_x = data.site_xpos['grasp', 'x']
            #   object_z = data.site_xpos['grasp', 'z']
            #   grasp_direction = data.site_xmat['grasp', ['xx', 'zx']]
            #   object_angle = np.pi - np.arctan2(grasp_direction[1], grasp_direction[0])
            # else:
            #   object_x = uniform(-.5, .5)
            #   object_z = uniform(0, .7)
            #   object_angle = uniform(0, 2 * np.pi)
            #   data.qvel[self._object + '_x'] = uniform(-5, 5)
            #
            # data.qpos[self._object_joints] = object_x, object_z, object_angle
            # Check for collisions.
            # With the pose fixed, this loop retries forever if the fixed pose
            # penetrates; it only iterates once when the pose is collision-free.
            physics.after_reset()
            penetrating = physics.data.ncon > 0
        super().initialize_episode(physics)

    def get_observation(self, physics):
        """Returns an observation of arm joint positions, velocities and the
        peg force sensor (target/object state is not included here)."""
        obs = collections.OrderedDict()
        obs['arm_pos'] = physics.bounded_joint_pos(_ARM_JOINTS)
        obs['arm_vel'] = physics.joint_vel(_ARM_JOINTS)
        obs['force_torque'] = physics.force_torque()
        return obs

    def _is_close(self, distance):
        # 1 inside [0, _CLOSE], decaying to 0 with margin 2*_CLOSE.
        return rewards.tolerance(distance, (0, _CLOSE), _CLOSE * 2)

    def _peg_reward(self, physics):
        """Returns a reward for bringing the peg prop to the target."""
        # grasp = self._is_close(physics.site_distance('peg_grasp', 'grasp'))
        # pinch = self._is_close(physics.site_distance('peg_pinch', 'pinch'))
        # grasping = (grasp + pinch) / 2
        # NOTE(review): the grasp term is hard-coded to 1 (code above commented
        # out), so max(bringing, 1/3) floors the reward at 1/3. Confirm.
        grasping = 1
        bring = self._is_close(physics.site_distance('peg', 'target_peg'))
        bring_tip = self._is_close(physics.site_distance('target_peg_tip',
                                                         'peg_tip'))
        bringing = (bring + bring_tip) / 2
        return max(bringing, grasping / 3)

    def _ball_reward(self, physics):
        """Returns a reward for bringing the ball prop to the target."""
        return self._is_close(physics.site_distance('ball', 'target_ball'))

    def get_reward(self, physics):
        """Returns a reward to the agent."""
        # NOTE(review): reward is hard-coded to 1; the peg/ball reward logic
        # below is unreachable. Remove the constant return to restore it.
        return 1
        if self._use_peg:
            return self._peg_reward(physics)
        else:
            return self._ball_reward(physics)
def make_model():
    """Returns a tuple containing the model XML string and a dict of assets."""
    # Round-trip through lxml to normalise whitespace in the model XML.
    parser = etree.XMLParser(remove_blank_text=True)
    mjcf_root = etree.XML(common.read_model('insert_peg.xml'), parser)
    xml_string = etree.tostring(mjcf_root, pretty_print=True)
    return xml_string, common.ASSETS
def insert_peg(fully_observable=True, time_limit=_TIME_LIMIT, random=None,
               environment_kwargs=None):
    """Returns manipulator insert task with the peg prop."""
    physics = Physics.from_xml_string(*make_model())
    task = Bring(use_peg=True, insert=True,
                 fully_observable=fully_observable, random=random)
    extra_kwargs = environment_kwargs if environment_kwargs else {}
    return control.Environment(
        physics, task,
        control_timestep=_CONTROL_TIMESTEP, time_limit=time_limit,
        **extra_kwargs)
if __name__ == "__main__":
    env = insert_peg()
    spec = env.action_spec()  # 3-dimensional action space

    def zero_policy(time_step):
        """Always returns a zero action, ignoring the observation."""
        # return np.random.uniform(spec.minimum, spec.maximum, spec.shape)
        return np.array([0.0, 0, 0])

    viewer.launch(env, policy=zero_policy)