Skip to content

Commit

Permalink
[update] 1. Data recording using multithreading (.json not yet implem…
Browse files Browse the repository at this point in the history
…ented) 2. Optimize rerun (not fully implemented yet).
  • Loading branch information
silencht committed Dec 12, 2024
1 parent f7bdf9c commit b56711f
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 123 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ __MACOSX/
!real_right_hand.jpg
!wrist_and_ring_mount.png
teleop/data/
teleop/utils/episode_0*
teleop/utils/data/episode_0*
episode_0*
4 changes: 2 additions & 2 deletions teleop/open_television/tv_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ def get_data(self):
head_rmat = head_mat[:3, :3]
# The origin of the coordinate for IK Solve is the WAIST joint motor. You can use teleop/robot_control/robot_arm_ik.py Unit_Test to check it.
# The origin of the coordinate of unitree_left_wrist is HEAD. So it is necessary to translate the origin of unitree_left_wrist from HEAD to WAIST.
unitree_left_wrist[0, 3] +=0.20
unitree_right_wrist[0,3] +=0.20
unitree_left_wrist[0, 3] +=0.15
unitree_right_wrist[0,3] +=0.15
unitree_left_wrist[2, 3] +=0.45
unitree_right_wrist[2,3] +=0.45

Expand Down
2 changes: 1 addition & 1 deletion teleop/robot_control/robot_arm.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self):
self.q_target = np.zeros(14)
self.tauff_target = np.zeros(14)

self.kp_high = 100.0
self.kp_high = 300.0
self.kd_high = 3.0
self.kp_low = 80.0
self.kd_low = 3.0
Expand Down
21 changes: 15 additions & 6 deletions teleop/robot_control/robot_hand_unitree.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
parent2_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(parent2_dir)
from teleop.robot_control.hand_retargeting import HandRetargeting, HandType
from teleop.utils.weighted_moving_filter import WeightedMovingFilter


unitree_tip_indices = [4, 9, 14] # [thumb, index, middle] in OpenXR
Expand Down Expand Up @@ -228,7 +229,7 @@ class Dex3_1_Right_JointIndex(IntEnum):

class Gripper_Controller:
def __init__(self, left_hand_array, right_hand_array, dual_gripper_data_lock = None, dual_gripper_state_out = None, dual_gripper_action_out = None,
fps = 250.0, Unit_Test = False):
filter = True, fps = 200.0, Unit_Test = False):
"""
[note] A *_array type parameter requires using a multiprocessing Array, because it needs to be passed to the internal child process
Expand All @@ -251,6 +252,10 @@ def __init__(self, left_hand_array, right_hand_array, dual_gripper_data_lock = N

self.fps = fps
self.Unit_Test = Unit_Test
if filter:
self.smooth_filter = WeightedMovingFilter(np.array([0.5, 0.3, 0.2]), Gripper_Num_Motors)
else:
self.smooth_filter = None

if self.Unit_Test:
ChannelFactoryInitialize(0)
Expand Down Expand Up @@ -303,8 +308,8 @@ def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_i
self.running = True

DELTA_GRIPPER_CMD = 0.18 # The motor rotates 5.4 radians, the clamping jaw slide open 9 cm, so 0.6 rad <==> 1 cm, 0.18 rad <==> 3 mm
THUMB_INDEX_DISTANCE_MIN = 0.030 # Assuming a minimum Euclidean distance is 3 cm between thumb and index.
THUMB_INDEX_DISTANCE_MAX = 0.120 # Assuming a maximum Euclidean distance is 12 cm between thumb and index.
THUMB_INDEX_DISTANCE_MIN = 0.05 # Assuming a minimum Euclidean distance is 5 cm between thumb and index.
THUMB_INDEX_DISTANCE_MAX = 0.07 # Assuming a maximum Euclidean distance is 9 cm between thumb and index.
LEFT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup.
RIGHT_MAPPED_MIN = 0.0 # The minimum initial motor position when the gripper closes at startup.
# The maximum initial motor position when the gripper closes before calibration (with the rail stroke calculated as 0.6 cm/rad * 9 rad = 5.4 cm).
Expand All @@ -315,7 +320,7 @@ def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_i

dq = 0.0
tau = 0.0
kp = 5.0
kp = 5.00
kd = 0.05
# initialize gripper cmd msg
self.gripper_msg = MotorCmds_()
Expand All @@ -335,11 +340,11 @@ def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_i

if not np.all(left_hand_mat == 0.0): # if hand data has been initialized.
left_euclidean_distance = np.linalg.norm(left_hand_mat[unitree_gripper_indices[1]] - left_hand_mat[unitree_gripper_indices[0]])
right_euclidean_distance = np.linalg.norm(right_hand_mat[unitree_gripper_indices[1]] - left_hand_mat[unitree_gripper_indices[0]])
right_euclidean_distance = np.linalg.norm(right_hand_mat[unitree_gripper_indices[1]] - right_hand_mat[unitree_gripper_indices[0]])
# Linear mapping from [0, THUMB_INDEX_DISTANCE_MAX] to gripper action range
left_target_action = np.interp(left_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [LEFT_MAPPED_MIN, LEFT_MAPPED_MAX])
right_target_action = np.interp(right_euclidean_distance, [THUMB_INDEX_DISTANCE_MIN, THUMB_INDEX_DISTANCE_MAX], [RIGHT_MAPPED_MIN, RIGHT_MAPPED_MAX])
# else: # TEST WITHOUT APPLE VISION PRO
# else: # TEST WITHOUT XR DEVICE
# current_time = time.time()
# period = 2.5
# import math
Expand All @@ -357,6 +362,10 @@ def control_thread(self, left_hand_array, right_hand_array, dual_gripper_state_i

dual_gripper_action = np.array([right_actual_action, left_actual_action])

if self.smooth_filter:
self.smooth_filter.add_data(dual_gripper_action)
dual_gripper_action = self.smooth_filter.filtered_data

if dual_gripper_state_out and dual_gripper_action_out:
with dual_hand_data_lock:
dual_gripper_state_out[:] = dual_gripper_state - np.array([RIGHT_MAPPED_MIN, LEFT_MAPPED_MIN])
Expand Down
11 changes: 6 additions & 5 deletions teleop/teleop_hand_and_arm.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
gripper_ctrl = Gripper_Controller(left_hand_array, right_hand_array, dual_gripper_data_lock, dual_gripper_state_array, dual_gripper_action_array)

if args.record:
recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency, log = True)
recorder = EpisodeWriter(task_dir = args.task_dir, frequency = args.frequency, rerun_log = True)

try:
user_input = input("Please enter the start signal (enter 'r' to start the subsequent program):\n")
Expand Down Expand Up @@ -156,10 +156,10 @@
right_hand_action = dual_hand_action_array[-7:]
else:
with dual_gripper_data_lock:
left_hand_state = dual_gripper_state_array[1]
right_hand_state = dual_gripper_state_array[0]
left_hand_action = dual_gripper_action_array[1]
right_hand_action = dual_gripper_action_array[0]
left_hand_state = [dual_gripper_state_array[1]]
right_hand_state = [dual_gripper_state_array[0]]
left_hand_action = [dual_gripper_action_array[1]]
right_hand_action = [dual_gripper_action_array[0]]
# head image
current_tv_image = tv_img_array.copy()
# wrist image
Expand Down Expand Up @@ -247,5 +247,6 @@
if WRIST:
wrist_img_shm.unlink()
wrist_img_shm.close()
recorder.close()
print("Finally, exiting program...")
exit(0)
113 changes: 77 additions & 36 deletions teleop/utils/episode_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,52 @@
import numpy as np
import time
from .rerun_visualizer import RerunLogger

from queue import Queue, Empty
from threading import Thread

class EpisodeWriter():
def __init__(self, task_dir, frequency=30, image_size=[640, 480], log = False):
def __init__(self, task_dir, frequency=30, image_size=[640, 480], rerun_log = True):
"""
image_size: [width, height]
"""
print("==> EpisodeWriter initializing...\n")
self.task_dir = task_dir
self.frequency = frequency
self.image_size = image_size

self.rerun_log = rerun_log
if self.rerun_log:
print("==> RerunLogger initializing...\n")
self.rerun_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60, memory_limit = "300MB")
print("==> RerunLogger initializing ok.\n")

self.data = {}
self.episode_data = []
self.item_id = -1
self.episode_id = -1
self.log = log
if os.path.exists(self.task_dir):
episode_dirs = [episode_dir for episode_dir in os.listdir(self.task_dir) if 'episode_' in episode_dir]
episode_last = sorted(episode_dirs)[-1] if len(episode_dirs) > 0 else None
self.episode_id = 0 if episode_last is None else int(episode_last.split('_')[-1])
print(f"==> task_dir directory already exist, now self.episode_id is{self.episode_id}\n")
print(f"==> task_dir directory already exist, now self.episode_id is:{self.episode_id}\n")
else:
os.makedirs(self.task_dir)
print(f"==> episode directory does not exist, now create one.\n")
self.data_info()
self.text_desc()

# Initialize the queue and worker thread
self.item_data_queue = Queue(maxsize=100)
self.stop_worker = False
self.worker_thread = Thread(target=self.process_queue)
self.worker_thread.start()

print("==> EpisodeWriter initialized successfully.\n")

def data_info(self, version='1.0.0', date=None, author=None):
self.info = {
"version": "1.0.0" if version is None else version,
"date": "" if date is None else datetime.date.today().strftime('%Y-%m-%d'),
"date": datetime.date.today().strftime('%Y-%m-%d') if date is None else date,
"author": "unitree" if author is None else author,
"image": {"width":self.image_size[0], "height":self.image_size[1], "fps":self.frequency},
"depth": {"width":self.image_size[0], "height":self.image_size[1], "fps":self.frequency},
Expand Down Expand Up @@ -85,63 +98,91 @@ def create_episode(self):
os.makedirs(self.color_dir, exist_ok=True)
os.makedirs(self.depth_dir, exist_ok=True)
os.makedirs(self.audio_dir, exist_ok=True)
if self.log:
self.online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60)
if self.rerun_log:
self.online_logger = RerunLogger(prefix="online/", IdxRangeBoundary = 60, memory_limit="300MB")

def add_item(self, colors, depths=None, states=None, actions=None, tactiles=None, audios=None):
# Increment the item ID
self.item_id += 1
# Create the item data dictionary
item_data = {
'idx': self.item_id,
'colors': {"color_0": None},
'depths': {"depth_0": None},
'states': {'left_arm':{}, 'right_arm':{}, 'left_hand':{}, 'right_hand':{}, 'body': {}},
'actions': {'left_arm':{}, 'right_arm':{}, 'left_hand':{}, 'right_hand':{}, 'body': {}},
'tactiles': {'left_hand':[], 'right_hand':[]},
'audios': { "mic0": None},
'colors': colors,
'depths': depths,
'states': states,
'actions': actions,
'tactiles': tactiles,
'audios': audios,
}
# Enqueue the item data
self.item_data_queue.put(item_data)

def process_queue(self):
while not self.stop_worker or not self.item_data_queue.empty():
try:
item_data = self.item_data_queue.get(timeout=1)
try:
self._process_item_data(item_data)
except Exception as e:
print(f"Error processing item_data (idx={item_data['idx']}): {e}")
self.item_data_queue.task_done()
except Empty:
continue

# save images
if colors is not None:
for idx, (_, color) in enumerate(colors.items()):
color_key = f'color_{idx}'
color_name = f'{str(self.item_id).zfill(6)}_{color_key}.jpg'
cv2.imwrite(os.path.join(self.color_dir, color_name), color)
def _process_item_data(self, item_data):
idx = item_data['idx']
colors = item_data.get('colors', {})
depths = item_data.get('depths', {})
audios = item_data.get('audios', {})

# Save images
if colors:
for idx_color, (color_key, color) in enumerate(colors.items()):
color_name = f'{str(idx).zfill(6)}_{color_key}.jpg'
if not cv2.imwrite(os.path.join(self.color_dir, color_name), color):
print(f"Failed to save color image.")
item_data['colors'][color_key] = os.path.join('colors', color_name)

# save depths
if depths is not None:
for idx, (_, depth) in enumerate(depths.items()):
depth_key = f'depth_{idx}'
depth_name = f'{str(self.item_id).zfill(6)}_{depth_key}.png'
cv2.imwrite(os.path.join(self.depth_dir, depth_name), depth)
# Save depths
if depths:
for idx_depth, (depth_key, depth) in enumerate(depths.items()):
depth_name = f'{str(idx).zfill(6)}_{depth_key}.jpg'
if not cv2.imwrite(os.path.join(self.depth_dir, depth_name), depth):
print(f"Failed to save depth image.")
item_data['depths'][depth_key] = os.path.join('depths', depth_name)

# save audios
if audios is not None:
# Save audios
if audios:
for mic, audio in audios.items():
audio_name = f'audio_{str(self.item_id).zfill(6)}_{mic}.npy'
audio_name = f'audio_{str(idx).zfill(6)}_{mic}.npy'
np.save(os.path.join(self.audio_dir, audio_name), audio.astype(np.int16))
item_data['audios'][mic] = os.path.join('audios', audio_name)

item_data['states'] = states
item_data['actions'] = actions
item_data['tactiles'] = tactiles

# Update episode data
self.episode_data.append(item_data)

if self.log:
# Log data if necessary
if self.rerun_log:
curent_record_time = time.time()
print(f"==> episode_id:{self.episode_id} item_id:{self.item_id} current_time:{curent_record_time}")
self.online_logger.log_item_data(item_data)

self.rerun_logger.log_item_data(item_data)

def save_episode(self):
"""
with open("./hmm.json",'r',encoding='utf-8') as json_file:
model=json.load(json_file)
"""
# Wait for the queue to be processed
self.item_data_queue.join()
# save
self.data['info'] = self.info
self.data['text'] = self.text
self.data['data'] = self.episode_data
with open(self.json_path,'w',encoding='utf-8') as jsonf:
jsonf.write(json.dumps(self.data, indent=4, ensure_ascii=False))
jsonf.write(json.dumps(self.data, indent=4, ensure_ascii=False))

def close(self):
self.item_data_queue.join()
# Signal the worker thread to stop and join the thread
self.stop_worker = True
self.worker_thread.join()
Loading

0 comments on commit b56711f

Please sign in to comment.