From 2b7e0809b3cd4d4e0bdc9a5affd2673407ad9f60 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Sun, 31 Mar 2024 17:58:56 +0530 Subject: [PATCH 01/26] minor update to accept _venv --- syllabus/examples/task_wrappers/procgen_task_wrapper.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/syllabus/examples/task_wrappers/procgen_task_wrapper.py b/syllabus/examples/task_wrappers/procgen_task_wrapper.py index 2296fd58..6eeb1688 100644 --- a/syllabus/examples/task_wrappers/procgen_task_wrapper.py +++ b/syllabus/examples/task_wrappers/procgen_task_wrapper.py @@ -39,7 +39,11 @@ def __init__(self, env: gym.Env, env_id, seed=0): self.observation_space = self.env.observation_space def seed(self, seed): - self.env.gym_env.unwrapped._venv.seed(int(seed), 0) + if hasattr(self.env, 'gym_env') and hasattr(self.env.gym_env, 'unwrapped'): + if hasattr(self.env.gym_env.unwrapped, '_venv'): + self.env.gym_env.unwrapped._venv.seed(int(seed), 0) + else: + self.env.seed(int(seed)) def reset(self, new_task=None, **kwargs): """ From d098b409546a822cafc669b035d592b1e210c79d Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Mon, 8 Apr 2024 00:28:49 +0530 Subject: [PATCH 02/26] updated the task_sampler to accept evaluation task ,robust_plr and env_evals --- syllabus/curricula/plr/task_sampler.py | 150 ++++++++++++++++++++++++- 1 file changed, 145 insertions(+), 5 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 15ad4852..f4d8ddc4 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -1,8 +1,10 @@ # Code heavily based on the original Prioritized Level Replay implementation from https://github.com/facebookresearch/level-replay # If you use this code, please cite the above codebase and original PLR paper: https://arxiv.org/abs/2010.03934 + import gymnasium as gym import numpy as np import torch +from typing import List class TaskSampler: @@ -23,6 +25,7 @@ class TaskSampler: staleness_coef (float): Linear interpolation weight for task staleness vs. task score. 0.0 means only use task score, 1.0 means only use staleness. staleness_transform (str): Transform to apply to task staleness. One of "constant", "max", "eps_greedy", "rank", "power", "softmax". staleness_temperature (float): Temperature for staleness transform. Increasing temperature makes the sampling distribution more uniform. 
+ eval_envs (List[gym.Env]): List of evaluation environments """ def __init__( self, @@ -37,9 +40,13 @@ def __init__( rho: float = 1.0, nu: float = 0.5, alpha: float = 1.0, + gamma: float = 0.999, + gae_lambda: float = 0.95, staleness_coef: float = 0.1, staleness_transform: str = "power", staleness_temperature: float = 1.0, + robust_plr: bool = False, + eval_envs: List[gym.Env] = None, ): self.action_space = action_space self.tasks = tasks @@ -53,9 +60,13 @@ def __init__( self.rho = rho self.nu = nu self.alpha = float(alpha) + self.gamma = gamma + self.gae_lambda = gae_lambda self.staleness_coef = staleness_coef self.staleness_transform = staleness_transform self.staleness_temperature = staleness_temperature + self.robust_plr = robust_plr + self.eval_envs = eval_envs self.unseen_task_weights = np.array([1.0] * self.num_tasks) self.task_scores = np.array([0.0] * self.num_tasks, dtype=float) @@ -261,12 +272,93 @@ def _sample_unseen_level(self): return task + def compute_returns(self, gamma, gae_lambda, rewards, value_preds, masks): + assert self.requires_value_buffers, "Selected strategy does not use compute_rewards." + gae = 0 + returns = torch.zeros_like(rewards) + for step in reversed(range(rewards.size(0))): + delta = ( + rewards[step] + + gamma * value_preds[step + 1] * masks[step + 1] + - value_preds[step] + ) + gae = delta + gamma * gae_lambda * masks[step + 1] * gae + returns[step] = gae + value_preds[step] + return returns + + def evaluate_task(self, task, env, get_action_and_value_fn, gamma, gae_lambda): + + if env is None: + raise ValueError("Environment object is None. Please ensure it is properly initialized.") + obs = env.reset(next_task=task) + done = False + episode_data = { + 'tasks': [], + 'masks': [], + 'rewards': [], + 'returns': [], + 'value_preds': [], + 'policy_logits': [] + } + + while not done: + action, value = get_action_and_value_fn(obs) + obs, rew, done, info = env.step(action) + + episode_data['tasks'].append(task) + episode_data['masks'].append(not done) + episode_data['rewards'].append(rew) + episode_data['value_preds'].append(value) + episode_data['policy_logits'].append(info['policy_logits']) + + episode_data['returns'] = self.compute_returns(gamma, gae_lambda, episode_data['rewards'], + episode_data['value_preds']) + + return episode_data + + def _sample_unseen_level(self): + sample_weights = self.unseen_task_weights / self.unseen_task_weights.sum() + task_idx = np.random.choice(range(self.num_tasks), 1, p=sample_weights)[0] + task = self.tasks[task_idx] + + self._update_staleness(task_idx) + + return task + + def _evaluate_unseen_level(self): + task_idx = \ + np.random.choice(range(self.num_tasks), 1, p=self.unseen_task_weights / self.unseen_task_weights.sum())[0] + task = self.tasks[task_idx] + episode_data = self.evaluate_task(task, self.eval_envs, self.get_action_and_value_fn, self.gamma, self.gae_lambda) + self.update_with_episode_data(episode_data, self._average_gae) # Update task scores + return task + + def get_action_and_value_fn(agent_model, device): + def action_value_fn(obs): + # Convert observation to tensor if necessary + obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).to(device) + + # Forward pass through the agent's model to get action logits and state value + with torch.no_grad(): + action_logits, state_value = agent_model(obs_tensor) + + # Convert action logits to probabilities + action_probs = torch.softmax(action_logits, dim=-1) + + # Sample action from the action probabilities + action = torch.multinomial(action_probs, 
num_samples=1).squeeze().item() + + # Return the sampled action and the state value + return action, state_value.item() + + return action_value_fn + def sample(self, strategy=None): if not strategy: strategy = self.strategy if strategy == "random": - task_idx = np.random.choice(range((self.num_tasks))) + task_idx = np.random.choice(range(self.num_tasks)) task = self.tasks[task_idx] return task @@ -285,16 +377,64 @@ def sample(self, strategy=None): if np.random.rand() > self.nu or not proportion_seen < 1.0: return self._sample_replay_level() - # Otherwise, sample a new level - return self._sample_unseen_level() + # Otherwise, evaluate a new level + if self.robust_plr: + self.update_with_episode_data(self._evaluate_unseen_level()) + return self.sample(strategy=strategy) + else: + # Otherwise, sample a new level + return self._sample_unseen_level() + elif self.replay_schedule == "proportionate": if proportion_seen >= self.rho and np.random.rand() < proportion_seen: return self._sample_replay_level() else: - return self._sample_unseen_level() + if self.robust_plr: + return self._evaluate_unseen_level() + else: + return self._sample_unseen_level() + else: - raise NotImplementedError(f"Unsupported replay schedule: {self.replay_schedule}. Must be 'fixed' or 'proportionate'.") + raise NotImplementedError( + f"Unsupported replay schedule: {self.replay_schedule}. Must be 'fixed' or 'proportionate'.") + + def update_with_episode_data(self, episode_data, score_function): + tasks = episode_data['tasks'] + done = ~(episode_data['masks'] > 0) + total_steps, num_actors = episode_data['tasks'].shape[:2] + + for actor_index in range(num_actors): + done_steps = done[:, actor_index].nonzero()[:total_steps, 0] + start_t = 0 + + for t in done_steps: + if not start_t < total_steps: + break + + if (t == 0): # if t is 0, then this done step caused a full update of previous last cycle + continue + + task_idx_t = tasks[start_t, actor_index].item() + + score_function_kwargs = {} + score_function_kwargs["returns"] = episode_data['returns'][start_t:t, actor_index] + score_function_kwargs["value_preds"] = episode_data['value_preds'][start_t:t, actor_index] + score = score_function(**score_function_kwargs) + num_steps = len(episode_data['tasks'][start_t:t, actor_index]) + self.update_task_score(actor_index, task_idx_t, score, num_steps) + + start_t = t.item() + if start_t < total_steps: + task_idx_t = tasks[start_t, actor_index].item() + + score_function_kwargs = {} + score_function_kwargs["returns"] = episode_data['returns'][start_t:, actor_index] + score_function_kwargs["value_preds"] = episode_data['value_preds'][start_t:, actor_index] + score = score_function(**score_function_kwargs) + self._last_score = score + num_steps = len(episode_data['tasks'][start_t:, actor_index]) + self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) def sample_weights(self): weights = self._score_transform(self.score_transform, self.temperature, self.task_scores) From 3b3cea1b611f0aa385f4d7ce4c0741e95f46dca3 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Mon, 8 Apr 2024 00:29:25 +0530 Subject: [PATCH 03/26] revert the changes for seed function --- syllabus/examples/task_wrappers/procgen_task_wrapper.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/syllabus/examples/task_wrappers/procgen_task_wrapper.py b/syllabus/examples/task_wrappers/procgen_task_wrapper.py index 6eeb1688..2296fd58 100644 --- a/syllabus/examples/task_wrappers/procgen_task_wrapper.py +++ 
b/syllabus/examples/task_wrappers/procgen_task_wrapper.py @@ -39,11 +39,7 @@ def __init__(self, env: gym.Env, env_id, seed=0): self.observation_space = self.env.observation_space def seed(self, seed): - if hasattr(self.env, 'gym_env') and hasattr(self.env.gym_env, 'unwrapped'): - if hasattr(self.env.gym_env.unwrapped, '_venv'): - self.env.gym_env.unwrapped._venv.seed(int(seed), 0) - else: - self.env.seed(int(seed)) + self.env.gym_env.unwrapped._venv.seed(int(seed), 0) def reset(self, new_task=None, **kwargs): """ From 39aa1bc911e268e31023feb61544bfa3e9fb50f5 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Wed, 17 Apr 2024 21:36:55 +0530 Subject: [PATCH 04/26] fixs the issues and update the functions --- syllabus/curricula/plr/task_sampler.py | 62 ++++++++++---------------- 1 file changed, 23 insertions(+), 39 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index f4d8ddc4..f06bcb0d 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -287,34 +287,37 @@ def compute_returns(self, gamma, gae_lambda, rewards, value_preds, masks): return returns def evaluate_task(self, task, env, get_action_and_value_fn, gamma, gae_lambda): - if env is None: raise ValueError("Environment object is None. Please ensure it is properly initialized.") + obs = env.reset(next_task=task) done = False - episode_data = { - 'tasks': [], - 'masks': [], - 'rewards': [], - 'returns': [], - 'value_preds': [], - 'policy_logits': [] - } + rewards = [] + masks = [] + values = [] while not done: action, value = get_action_and_value_fn(obs) - obs, rew, done, info = env.step(action) + obs, rew, term, trunc, info = env.step(action) - episode_data['tasks'].append(task) - episode_data['masks'].append(not done) - episode_data['rewards'].append(rew) - episode_data['value_preds'].append(value) - episode_data['policy_logits'].append(info['policy_logits']) + rewards.append(rew) + masks.append(not (term or trunc)) + values.append(value) - episode_data['returns'] = self.compute_returns(gamma, gae_lambda, episode_data['rewards'], - episode_data['value_preds']) + # Check if the episode is done + if term or trunc: + done = True - return episode_data + # Compute returns after the episode is complete + returns = self.compute_returns(gamma, gae_lambda, rewards, values, masks) + + return { + "tasks": task, + "masks": masks, + "rewards": rewards, + "value_preds": values, + "returns": returns + } def _sample_unseen_level(self): sample_weights = self.unseen_task_weights / self.unseen_task_weights.sum() @@ -333,26 +336,6 @@ def _evaluate_unseen_level(self): self.update_with_episode_data(episode_data, self._average_gae) # Update task scores return task - def get_action_and_value_fn(agent_model, device): - def action_value_fn(obs): - # Convert observation to tensor if necessary - obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).to(device) - - # Forward pass through the agent's model to get action logits and state value - with torch.no_grad(): - action_logits, state_value = agent_model(obs_tensor) - - # Convert action logits to probabilities - action_probs = torch.softmax(action_logits, dim=-1) - - # Sample action from the action probabilities - action = torch.multinomial(action_probs, num_samples=1).squeeze().item() - - # Return the sampled action and the state value - return action, state_value.item() - - return action_value_fn - def sample(self, strategy=None): if not strategy: strategy = self.strategy @@ -391,7 +374,8 @@ def 
sample(self, strategy=None): return self._sample_replay_level() else: if self.robust_plr: - return self._evaluate_unseen_level() + self.update_with_episode_data(self._evaluate_unseen_level()) + return self.sample(strategy=strategy) else: return self._sample_unseen_level() From 992bdab9d3ab924cef22421ec3a2ac6f2b63184c Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Wed, 17 Apr 2024 21:37:21 +0530 Subject: [PATCH 05/26] update the plr_wrapper to accept the robust_plr option --- syllabus/curricula/plr/plr_wrapper.py | 40 +++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index 9515df4b..19983bb9 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -166,6 +166,26 @@ def compute_returns(self, gamma, gae_lambda): def null(x): return None +def get_action_and_value_fn(agent_model, device): + def action_value_fn(obs): + # Convert observation to tensor if necessary + obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).to(device) + + # Forward pass through the agent's model to get action logits and state value + with torch.no_grad(): + action_logits, state_value = agent_model(obs_tensor) + + # Convert action logits to probabilities + action_probs = torch.softmax(action_logits, dim=-1) + + # Sample action from the action probabilities + action = torch.multinomial(action_probs, num_samples=1).squeeze().item() + + # Return the sampled action and the state value + return action, state_value.item() + + return action_value_fn + class PrioritizedLevelReplay(Curriculum): """ Prioritized Level Replay (PLR) Curriculum. @@ -181,6 +201,8 @@ class PrioritizedLevelReplay(Curriculum): gamma (float): The discount factor used to compute returns gae_lambda (float): The GAE lambda value. suppress_usage_warnings (bool): Whether to suppress warnings about improper usage. + robust_plr (bool): Option to use RobustPLR. + eval_envs: Evaluation environments for RobustPLR. **curriculum_kwargs: Keyword arguments to pass to the curriculum. 
""" REQUIRES_STEP_UPDATES = True @@ -202,6 +224,8 @@ def __init__( suppress_usage_warnings=False, get_value=null, get_action_log_dist=null, + robust_plr: bool = False, # Option to use RobustPLR + eval_envs=None, **curriculum_kwargs, ): # Preprocess curriculum intialization args @@ -225,8 +249,15 @@ def __init__( self._supress_usage_warnings = suppress_usage_warnings self._get_action_log_dist = get_action_log_dist self._task2index = {task: i for i, task in enumerate(self.tasks)} + self._robust_plr = robust_plr + self._eval_envs = eval_envs + self._get_action_and_value_fn = get_action_and_value_fn + + if robust_plr: + self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, **task_sampler_kwargs_dict) + else: + self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict) - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict) self._rollouts = RolloutStorage( self._num_steps, self._num_processes, @@ -250,7 +281,12 @@ def sample(self, k: int = 1) -> Union[List, Any]: if self._should_use_startup_sampling(): return self._startup_sample() else: - return [self._task_sampler.sample() for _ in range(k)] + if self._robust_plr: + if self._eval_envs is None: + raise ValueError("When robust_plr is enabled, eval_envs must not be None.") + return [self._evaluate_task_and_update_score() for _ in range(k)] + else: + return [self._task_sampler.sample() for _ in range(k)] def update_on_step(self, obs, rew, term, trunc, info, env_id: int = None) -> None: """ From d92f64c05e5c5ffa449ff10acf39d09773f67942 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Thu, 18 Apr 2024 21:59:41 +0530 Subject: [PATCH 06/26] clean-up code and initialize the action_value_fn and callables --- syllabus/curricula/plr/plr_wrapper.py | 33 ++++----------------------- 1 file changed, 5 insertions(+), 28 deletions(-) diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index 19983bb9..06645e62 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -166,26 +166,6 @@ def compute_returns(self, gamma, gae_lambda): def null(x): return None -def get_action_and_value_fn(agent_model, device): - def action_value_fn(obs): - # Convert observation to tensor if necessary - obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).to(device) - - # Forward pass through the agent's model to get action logits and state value - with torch.no_grad(): - action_logits, state_value = agent_model(obs_tensor) - - # Convert action logits to probabilities - action_probs = torch.softmax(action_logits, dim=-1) - - # Sample action from the action probabilities - action = torch.multinomial(action_probs, num_samples=1).squeeze().item() - - # Return the sampled action and the state value - return action, state_value.item() - - return action_value_fn - class PrioritizedLevelReplay(Curriculum): """ Prioritized Level Replay (PLR) Curriculum. @@ -203,6 +183,7 @@ class PrioritizedLevelReplay(Curriculum): suppress_usage_warnings (bool): Whether to suppress warnings about improper usage. robust_plr (bool): Option to use RobustPLR. eval_envs: Evaluation environments for RobustPLR. + action_value_fn (callable): A function that takes an observation as input and returns an action and value. **curriculum_kwargs: Keyword arguments to pass to the curriculum. 
""" REQUIRES_STEP_UPDATES = True @@ -226,6 +207,7 @@ def __init__( get_action_log_dist=null, robust_plr: bool = False, # Option to use RobustPLR eval_envs=None, + action_value_fn = None, **curriculum_kwargs, ): # Preprocess curriculum intialization args @@ -251,10 +233,10 @@ def __init__( self._task2index = {task: i for i, task in enumerate(self.tasks)} self._robust_plr = robust_plr self._eval_envs = eval_envs - self._get_action_and_value_fn = get_action_and_value_fn + self.action_value_fn = action_value_fn if robust_plr: - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, **task_sampler_kwargs_dict) + self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn = action_value_fn, **task_sampler_kwargs_dict) else: self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict) @@ -281,12 +263,7 @@ def sample(self, k: int = 1) -> Union[List, Any]: if self._should_use_startup_sampling(): return self._startup_sample() else: - if self._robust_plr: - if self._eval_envs is None: - raise ValueError("When robust_plr is enabled, eval_envs must not be None.") - return [self._evaluate_task_and_update_score() for _ in range(k)] - else: - return [self._task_sampler.sample() for _ in range(k)] + return [self._task_sampler.sample() for _ in range(k)] def update_on_step(self, obs, rew, term, trunc, info, env_id: int = None) -> None: """ From d24434bed04f98dcc358b72bce0b5664ef1416bf Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Thu, 18 Apr 2024 22:01:13 +0530 Subject: [PATCH 07/26] optimize the sample_fn and update tp accept the action_value_fn --- syllabus/curricula/plr/task_sampler.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index f06bcb0d..da9903c4 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -26,6 +26,7 @@ class TaskSampler: staleness_transform (str): Transform to apply to task staleness. One of "constant", "max", "eps_greedy", "rank", "power", "softmax". staleness_temperature (float): Temperature for staleness transform. Increasing temperature makes the sampling distribution more uniform. eval_envs (List[gym.Env]): List of evaluation environments + action_value_fn (callable): A function that takes an observation as input and returns an action and value. """ def __init__( self, @@ -47,6 +48,7 @@ def __init__( staleness_temperature: float = 1.0, robust_plr: bool = False, eval_envs: List[gym.Env] = None, + action_value_fn=None, ): self.action_space = action_space self.tasks = tasks @@ -67,6 +69,7 @@ def __init__( self.staleness_temperature = staleness_temperature self.robust_plr = robust_plr self.eval_envs = eval_envs + self.action_value_fn = action_value_fn self.unseen_task_weights = np.array([1.0] * self.num_tasks) self.task_scores = np.array([0.0] * self.num_tasks, dtype=float) @@ -286,7 +289,7 @@ def compute_returns(self, gamma, gae_lambda, rewards, value_preds, masks): returns[step] = gae + value_preds[step] return returns - def evaluate_task(self, task, env, get_action_and_value_fn, gamma, gae_lambda): + def evaluate_task(self, task, env, action_value_fn, gamma, gae_lambda): if env is None: raise ValueError("Environment object is None. 
Please ensure it is properly initialized.") @@ -297,7 +300,7 @@ def evaluate_task(self, task, env, get_action_and_value_fn, gamma, gae_lambda): values = [] while not done: - action, value = get_action_and_value_fn(obs) + action, value = action_value_fn(obs) obs, rew, term, trunc, info = env.step(action) rewards.append(rew) @@ -332,7 +335,7 @@ def _evaluate_unseen_level(self): task_idx = \ np.random.choice(range(self.num_tasks), 1, p=self.unseen_task_weights / self.unseen_task_weights.sum())[0] task = self.tasks[task_idx] - episode_data = self.evaluate_task(task, self.eval_envs, self.get_action_and_value_fn, self.gamma, self.gae_lambda) + episode_data = self.evaluate_task(task, self.eval_envs, self.action_value_fn, self.gamma, self.gae_lambda) self.update_with_episode_data(episode_data, self._average_gae) # Update task scores return task @@ -374,8 +377,19 @@ def sample(self, strategy=None): return self._sample_replay_level() else: if self.robust_plr: - self.update_with_episode_data(self._evaluate_unseen_level()) - return self.sample(strategy=strategy) + while True: + task = self._evaluate_unseen_level() + episode_data = self.evaluate_task(task, self.eval_envs, self.action_value_fn, self.gamma, + self.gae_lambda) + self.update_with_episode_data(episode_data, self._average_gae) # Update task scores + + # Check if we need to sample another unseen level + num_unseen = (self.unseen_task_weights > 0).sum() + proportion_seen = (self.num_tasks - num_unseen) / self.num_tasks + if proportion_seen < self.rho or np.random.rand() >= proportion_seen: + break + + return task else: return self._sample_unseen_level() From 03b6b7752c449cab5ed57172bc627c8f24db2ab3 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Sun, 21 Apr 2024 09:36:54 +0530 Subject: [PATCH 08/26] add the robust_plr option for existence plr --- tests/multiprocessing_smoke_tests.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/multiprocessing_smoke_tests.py b/tests/multiprocessing_smoke_tests.py index 9db9f471..364a7172 100644 --- a/tests/multiprocessing_smoke_tests.py +++ b/tests/multiprocessing_smoke_tests.py @@ -1,5 +1,6 @@ """ Test curriculum synchronization across multiple processes. 
""" import pytest +import gym from nle.env.tasks import NetHackScore, NetHackScout, NetHackStaircase from syllabus.core import make_multiprocessing_curriculum, make_ray_curriculum @@ -9,14 +10,13 @@ LearningProgressCurriculum, NoopCurriculum, PrioritizedLevelReplay, SequentialCurriculum, SimpleBoxCurriculum) -from syllabus.tests import (create_cartpole_env, create_nethack_env, +from syllabus.tests import (create_cartpole_env, create_nethack_env, get_action_value, get_test_values, run_native_multiprocess, run_ray_multiprocess, run_single_process) N_ENVS = 2 N_EPISODES = 2 - nethack_env = create_nethack_env() cartpole_env = create_cartpole_env() @@ -29,7 +29,10 @@ "get_value": get_test_values, "device": "cpu", "num_processes": N_ENVS, - "num_steps": 2048 + "num_steps": 2048, + "robust_plr": True, + "eval_envs": gym.vector.AsyncVectorEnv([make_env(args.env_id, args.seed + i) for i in range(args.num_envs)]), + "action_value_fn": get_action_value }), (SimpleBoxCurriculum, create_cartpole_env, (cartpole_env.task_space,), {}), (AnnealingBoxCurriculum, create_cartpole_env, (cartpole_env.task_space,), { From cda5ddbfd82fa21b5f16a07e3b2dc34d4a3d0a8f Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Sun, 21 Apr 2024 09:37:40 +0530 Subject: [PATCH 09/26] clean-up code for plr_wrapper and fixes minor issues --- syllabus/curricula/plr/plr_wrapper.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index 06645e62..446bc06e 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -206,7 +206,7 @@ def __init__( get_value=null, get_action_log_dist=null, robust_plr: bool = False, # Option to use RobustPLR - eval_envs=None, + eval_envs: List[gym.Env] = None, action_value_fn = None, **curriculum_kwargs, ): @@ -235,10 +235,7 @@ def __init__( self._eval_envs = eval_envs self.action_value_fn = action_value_fn - if robust_plr: - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn = action_value_fn, **task_sampler_kwargs_dict) - else: - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict) + self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn = action_value_fn, **task_sampler_kwargs_dict) self._rollouts = RolloutStorage( self._num_steps, From 908688d61e47fb8b3cb2aa1dee9ebb96b1ad89fc Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Sun, 21 Apr 2024 09:38:50 +0530 Subject: [PATCH 10/26] optimize code based on the suggestions --- syllabus/curricula/plr/task_sampler.py | 107 +++++-------------------- 1 file changed, 20 insertions(+), 87 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index da9903c4..7239cc5f 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -179,67 +179,6 @@ def _one_step_td_error(self, **kwargs): def requires_value_buffers(self): return self.strategy in ["gae", "value_l1", "one_step_td_error"] - def _update_with_rollouts(self, rollouts, score_function): - tasks = rollouts.tasks - if not self.requires_value_buffers: - policy_logits = rollouts.action_log_dist - done = ~(rollouts.masks > 0) - total_steps, num_actors = rollouts.tasks.shape[:2] - - for actor_index in range(num_actors): - done_steps = done[:, actor_index].nonzero()[:total_steps, 0] - 
start_t = 0 - - for t in done_steps: - if not start_t < total_steps: - break - - if (t == 0): # if t is 0, then this done step caused a full update of previous last cycle - continue - - # If there is only 1 step, we can't calculate the one-step td error - if self.strategy == "one_step_td_error" and t - start_t <= 1: - continue - - task_idx_t = tasks[start_t, actor_index].item() - - # Store kwargs for score function - score_function_kwargs = {} - if self.requires_value_buffers: - score_function_kwargs["returns"] = rollouts.returns[start_t:t, actor_index] - score_function_kwargs["rewards"] = rollouts.rewards[start_t:t, actor_index] - score_function_kwargs["value_preds"] = rollouts.value_preds[start_t:t, actor_index] - else: - episode_logits = policy_logits[start_t:t, actor_index] - score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) - score = score_function(**score_function_kwargs) - num_steps = len(rollouts.tasks[start_t:t, actor_index]) - # TODO: Check that task_idx_t is correct - self.update_task_score(actor_index, task_idx_t, score, num_steps) - - start_t = t.item() - if start_t < total_steps: - # If there is only 1 step, we can't calculate the one-step td error - if self.strategy == "one_step_td_error" and start_t == total_steps - 1: - continue - # TODO: Check this too - task_idx_t = tasks[start_t, actor_index].item() - - # Store kwargs for score function - score_function_kwargs = {} - if self.requires_value_buffers: - score_function_kwargs["returns"] = rollouts.returns[start_t:, actor_index] - score_function_kwargs["rewards"] = rollouts.rewards[start_t:, actor_index] - score_function_kwargs["value_preds"] = rollouts.value_preds[start_t:, actor_index] - else: - episode_logits = policy_logits[start_t:, actor_index] - score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) - - score = score_function(**score_function_kwargs) - self._last_score = score - num_steps = len(rollouts.tasks[start_t:, actor_index]) - self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) - def after_update(self): # Reset partial updates, since weights have changed, and thus logits are now stale for actor_index in range(self.partial_task_scores.shape[0]): @@ -322,15 +261,6 @@ def evaluate_task(self, task, env, action_value_fn, gamma, gae_lambda): "returns": returns } - def _sample_unseen_level(self): - sample_weights = self.unseen_task_weights / self.unseen_task_weights.sum() - task_idx = np.random.choice(range(self.num_tasks), 1, p=sample_weights)[0] - task = self.tasks[task_idx] - - self._update_staleness(task_idx) - - return task - def _evaluate_unseen_level(self): task_idx = \ np.random.choice(range(self.num_tasks), 1, p=self.unseen_task_weights / self.unseen_task_weights.sum())[0] @@ -377,19 +307,8 @@ def sample(self, strategy=None): return self._sample_replay_level() else: if self.robust_plr: - while True: - task = self._evaluate_unseen_level() - episode_data = self.evaluate_task(task, self.eval_envs, self.action_value_fn, self.gamma, - self.gae_lambda) - self.update_with_episode_data(episode_data, self._average_gae) # Update task scores - - # Check if we need to sample another unseen level - num_unseen = (self.unseen_task_weights > 0).sum() - proportion_seen = (self.num_tasks - num_unseen) / self.num_tasks - if proportion_seen < self.rho or np.random.rand() >= proportion_seen: - break - - return task + self.update_with_episode_data(self._evaluate_unseen_level()) + return self.sample(strategy=strategy) else: return 
self._sample_unseen_level() @@ -398,9 +317,13 @@ def sample(self, strategy=None): f"Unsupported replay schedule: {self.replay_schedule}. Must be 'fixed' or 'proportionate'.") def update_with_episode_data(self, episode_data, score_function): - tasks = episode_data['tasks'] - done = ~(episode_data['masks'] > 0) - total_steps, num_actors = episode_data['tasks'].shape[:2] + def compute_episode_variables(episode_data): + tasks = episode_data['tasks'] + done = ~(episode_data['masks'] > 0) + total_steps, num_actors = episode_data['tasks'].shape[:2] + return tasks, done, total_steps, num_actors + + tasks, done, total_steps, num_actors = compute_episode_variables(episode_data) for actor_index in range(num_actors): done_steps = done[:, actor_index].nonzero()[:total_steps, 0] @@ -410,7 +333,7 @@ def update_with_episode_data(self, episode_data, score_function): if not start_t < total_steps: break - if (t == 0): # if t is 0, then this done step caused a full update of previous last cycle + if t == 0: # if t is 0, then this done step caused a full update of previous last cycle continue task_idx_t = tasks[start_t, actor_index].item() @@ -434,6 +357,16 @@ def update_with_episode_data(self, episode_data, score_function): num_steps = len(episode_data['tasks'][start_t:, actor_index]) self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) + def rollouts_to_episode_data(self, rollouts): + episode_data = { + "tasks": rollouts.tasks, + "returns": rollouts.returns, + "value_preds": rollouts.value_preds, + "masks": rollouts.masks + } + + return episode_data + def sample_weights(self): weights = self._score_transform(self.score_transform, self.temperature, self.task_scores) weights = weights * (1 - self.unseen_task_weights) # zero out unseen levels From 8364ff9dd99756df5e05afd75c7a277629952cbc Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Sun, 21 Apr 2024 09:39:31 +0530 Subject: [PATCH 11/26] add user defined get_action_value func --- syllabus/tests/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/syllabus/tests/utils.py b/syllabus/tests/utils.py index 314a29c8..4b12c20d 100644 --- a/syllabus/tests/utils.py +++ b/syllabus/tests/utils.py @@ -189,6 +189,10 @@ def run_ray_multiprocess(env_fn, env_args=(), env_kwargs={}, curriculum=None, nu def get_test_values(x): return torch.unsqueeze(torch.Tensor(np.array([0] * len(x))), -1) +def get_action_value(obs): + actions = np.zeros(len(obs)) + values = np.zeros(len(obs)) + return actions, values # Sync Test Environment def create_synctest_env(*args, type=None, env_args=(), env_kwargs={}, **kwargs): From cb71cb832040bcc73ac8bd80a760f0aee13d32ac Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Sun, 21 Apr 2024 22:36:52 +0530 Subject: [PATCH 12/26] fixes the eval_envs parameters --- tests/multiprocessing_smoke_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/multiprocessing_smoke_tests.py b/tests/multiprocessing_smoke_tests.py index 364a7172..ddd57b0b 100644 --- a/tests/multiprocessing_smoke_tests.py +++ b/tests/multiprocessing_smoke_tests.py @@ -31,7 +31,7 @@ "num_processes": N_ENVS, "num_steps": 2048, "robust_plr": True, - "eval_envs": gym.vector.AsyncVectorEnv([make_env(args.env_id, args.seed + i) for i in range(args.num_envs)]), + "eval_envs": [gym.vector.AsyncVectorEnv([create_nethack_env for i in range(N_ENVS)])], "action_value_fn": get_action_value }), (SimpleBoxCurriculum, create_cartpole_env, (cartpole_env.task_space,), {}), From 75697e53c1eb80152efe72e01f9473e45e120eb4 Mon Sep 17 00:00:00 
2001 From: Ameen Ur Rehman Date: Mon, 22 Apr 2024 22:08:51 +0530 Subject: [PATCH 13/26] updating the task_sampler to fix issues --- syllabus/curricula/plr/task_sampler.py | 59 ++++++++++++-------------- tests/multiprocessing_smoke_tests.py | 2 +- 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 7239cc5f..f5e85473 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -217,22 +217,27 @@ def _sample_unseen_level(self): def compute_returns(self, gamma, gae_lambda, rewards, value_preds, masks): assert self.requires_value_buffers, "Selected strategy does not use compute_rewards." gae = 0 - returns = torch.zeros_like(rewards) - for step in reversed(range(rewards.size(0))): - delta = ( - rewards[step] - + gamma * value_preds[step + 1] * masks[step + 1] - - value_preds[step] - ) - gae = delta + gamma * gae_lambda * masks[step + 1] * gae - returns[step] = gae + value_preds[step] + value_preds_tensor = torch.tensor(value_preds, dtype=torch.float64) + returns = torch.zeros_like(value_preds_tensor, dtype=torch.float64) # Initialize with the correct size + for step in reversed(range(len(rewards))): + if step + 1 < len(value_preds) and step + 1 < len(masks): + delta = ( + rewards[step] + + gamma * value_preds_tensor[step + 1] * masks[step + 1] + - value_preds_tensor[step] + ) + gae = delta + gamma * gae_lambda * masks[step + 1] * gae + else: + delta = rewards[step] - value_preds_tensor[step] + gae = delta + returns[step] = gae + value_preds_tensor[step] return returns def evaluate_task(self, task, env, action_value_fn, gamma, gae_lambda): if env is None: raise ValueError("Environment object is None. Please ensure it is properly initialized.") - obs = env.reset(next_task=task) + obs = env.reset(new_task=task) done = False rewards = [] masks = [] @@ -240,6 +245,12 @@ def evaluate_task(self, task, env, action_value_fn, gamma, gae_lambda): while not done: action, value = action_value_fn(obs) + + if isinstance(action, np.ndarray): + action = int(action[0]) + else: + action = int(action) + obs, rew, term, trunc, info = env.step(action) rewards.append(rew) @@ -269,7 +280,7 @@ def _evaluate_unseen_level(self): self.update_with_episode_data(episode_data, self._average_gae) # Update task scores return task - def sample(self, strategy=None): + def sample(self, strategy=None, score_function=None): if not strategy: strategy = self.strategy @@ -295,7 +306,7 @@ def sample(self, strategy=None): # Otherwise, evaluate a new level if self.robust_plr: - self.update_with_episode_data(self._evaluate_unseen_level()) + self.update_with_episode_data(self._evaluate_unseen_level(), score_function) return self.sample(strategy=strategy) else: # Otherwise, sample a new level @@ -307,7 +318,7 @@ def sample(self, strategy=None): return self._sample_replay_level() else: if self.robust_plr: - self.update_with_episode_data(self._evaluate_unseen_level()) + return self.update_with_episode_data(self._evaluate_unseen_level(), score_function) return self.sample(strategy=strategy) else: return self._sample_unseen_level() @@ -317,13 +328,9 @@ def sample(self, strategy=None): f"Unsupported replay schedule: {self.replay_schedule}. 
Must be 'fixed' or 'proportionate'.") def update_with_episode_data(self, episode_data, score_function): - def compute_episode_variables(episode_data): - tasks = episode_data['tasks'] - done = ~(episode_data['masks'] > 0) - total_steps, num_actors = episode_data['tasks'].shape[:2] - return tasks, done, total_steps, num_actors - - tasks, done, total_steps, num_actors = compute_episode_variables(episode_data) + tasks = np.array(episode_data['tasks']) # Convert to numpy array + done = ~(np.array(episode_data['masks']) > 0) + total_steps, num_actors = tasks.shape[:2] if tasks.ndim >= 2 else (0, 0) for actor_index in range(num_actors): done_steps = done[:, actor_index].nonzero()[:total_steps, 0] @@ -333,7 +340,7 @@ def compute_episode_variables(episode_data): if not start_t < total_steps: break - if t == 0: # if t is 0, then this done step caused a full update of previous last cycle + if (t == 0): # if t is 0, then this done step caused a full update of previous last cycle continue task_idx_t = tasks[start_t, actor_index].item() @@ -357,16 +364,6 @@ def compute_episode_variables(episode_data): num_steps = len(episode_data['tasks'][start_t:, actor_index]) self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) - def rollouts_to_episode_data(self, rollouts): - episode_data = { - "tasks": rollouts.tasks, - "returns": rollouts.returns, - "value_preds": rollouts.value_preds, - "masks": rollouts.masks - } - - return episode_data - def sample_weights(self): weights = self._score_transform(self.score_transform, self.temperature, self.task_scores) weights = weights * (1 - self.unseen_task_weights) # zero out unseen levels diff --git a/tests/multiprocessing_smoke_tests.py b/tests/multiprocessing_smoke_tests.py index ddd57b0b..00872fdf 100644 --- a/tests/multiprocessing_smoke_tests.py +++ b/tests/multiprocessing_smoke_tests.py @@ -31,7 +31,7 @@ "num_processes": N_ENVS, "num_steps": 2048, "robust_plr": True, - "eval_envs": [gym.vector.AsyncVectorEnv([create_nethack_env for i in range(N_ENVS)])], + "eval_envs": create_nethack_env(), "action_value_fn": get_action_value }), (SimpleBoxCurriculum, create_cartpole_env, (cartpole_env.task_space,), {}), From a6e2b313dfafca0c41fea09891473875f4a08abe Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Wed, 24 Apr 2024 20:57:42 +0530 Subject: [PATCH 14/26] fix errors and optimize codes --- syllabus/curricula/plr/plr_wrapper.py | 2 +- syllabus/curricula/plr/task_sampler.py | 206 +++++++++++++++++++------ 2 files changed, 163 insertions(+), 45 deletions(-) diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index 446bc06e..32000e3d 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -206,7 +206,7 @@ def __init__( get_value=null, get_action_log_dist=null, robust_plr: bool = False, # Option to use RobustPLR - eval_envs: List[gym.Env] = None, + eval_envs = None, action_value_fn = None, **curriculum_kwargs, ): diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index f5e85473..46236a76 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -47,7 +47,7 @@ def __init__( staleness_transform: str = "power", staleness_temperature: float = 1.0, robust_plr: bool = False, - eval_envs: List[gym.Env] = None, + eval_envs = None, action_value_fn=None, ): self.action_space = action_space @@ -109,6 +109,28 @@ def update_with_rollouts(self, rollouts): self._update_with_rollouts(rollouts, 
score_function) + def update_with_episode_data(self, episode_data): + if self.strategy == "random": + return + + # Update with a EpisodeRolloutStorage object + if self.strategy == "policy_entropy": + score_function = self._average_entropy + elif self.strategy == "least_confidence": + score_function = self._average_least_confidence + elif self.strategy == "min_margin": + score_function = self._average_min_margin + elif self.strategy == "gae": + score_function = self._average_gae + elif self.strategy == "value_l1": + score_function = self._average_value_l1 + elif self.strategy == "one_step_td_error": + score_function = self._one_step_td_error + else: + raise ValueError(f"Unsupported strategy, {self.strategy}") + + self._update_with_episode_data(episode_data, score_function) + def update_task_score(self, actor_index, task_idx, score, num_steps): score = self._partial_update_task_score(actor_index, task_idx, score, num_steps, done=True) @@ -179,6 +201,67 @@ def _one_step_td_error(self, **kwargs): def requires_value_buffers(self): return self.strategy in ["gae", "value_l1", "one_step_td_error"] + def _update_with_rollouts(self, rollouts, score_function): + tasks = rollouts.tasks + if not self.requires_value_buffers: + policy_logits = rollouts.action_log_dist + done = ~(rollouts.masks > 0) + total_steps, num_actors = rollouts.tasks.shape[:2] + + for actor_index in range(num_actors): + done_steps = done[:, actor_index].nonzero()[:total_steps, 0] + start_t = 0 + + for t in done_steps: + if not start_t < total_steps: + break + + if (t == 0): # if t is 0, then this done step caused a full update of previous last cycle + continue + + # If there is only 1 step, we can't calculate the one-step td error + if self.strategy == "one_step_td_error" and t - start_t <= 1: + continue + + task_idx_t = tasks[start_t, actor_index].item() + + # Store kwargs for score function + score_function_kwargs = {} + if self.requires_value_buffers: + score_function_kwargs["returns"] = rollouts.returns[start_t:t, actor_index] + score_function_kwargs["rewards"] = rollouts.rewards[start_t:t, actor_index] + score_function_kwargs["value_preds"] = rollouts.value_preds[start_t:t, actor_index] + else: + episode_logits = policy_logits[start_t:t, actor_index] + score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) + score = score_function(**score_function_kwargs) + num_steps = len(rollouts.tasks[start_t:t, actor_index]) + # TODO: Check that task_idx_t is correct + self.update_task_score(actor_index, task_idx_t, score, num_steps) + + start_t = t.item() + if start_t < total_steps: + # If there is only 1 step, we can't calculate the one-step td error + if self.strategy == "one_step_td_error" and start_t == total_steps - 1: + continue + # TODO: Check this too + task_idx_t = tasks[start_t, actor_index].item() + + # Store kwargs for score function + score_function_kwargs = {} + if self.requires_value_buffers: + score_function_kwargs["returns"] = rollouts.returns[start_t:, actor_index] + score_function_kwargs["rewards"] = rollouts.rewards[start_t:, actor_index] + score_function_kwargs["value_preds"] = rollouts.value_preds[start_t:, actor_index] + else: + episode_logits = policy_logits[start_t:, actor_index] + score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) + + score = score_function(**score_function_kwargs) + self._last_score = score + num_steps = len(rollouts.tasks[start_t:, actor_index]) + self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) + def 
after_update(self): # Reset partial updates, since weights have changed, and thus logits are now stale for actor_index in range(self.partial_task_scores.shape[0]): @@ -214,26 +297,49 @@ def _sample_unseen_level(self): return task - def compute_returns(self, gamma, gae_lambda, rewards, value_preds, masks): + def _evaluate_unseen_level(self): + sample_weights = self.unseen_task_weights / self.unseen_task_weights.sum() + task_idx = np.random.choice(range(self.num_tasks), 1, p=sample_weights)[0] + task = self.tasks[task_idx] + + episode_data = self.evaluate_task(task, self.eval_envs, self.action_value_fn) + self.update_with_episode_data(episode_data) + + self._update_staleness(task_idx) + + return task + + def compute_returns(self, rewards, values, masks, gamma, gae_lambda): assert self.requires_value_buffers, "Selected strategy does not use compute_rewards." + + if isinstance(rewards, float): + rewards = [np.array(rewards)] + + num_steps = len(rewards) gae = 0 - value_preds_tensor = torch.tensor(value_preds, dtype=torch.float64) - returns = torch.zeros_like(value_preds_tensor, dtype=torch.float64) # Initialize with the correct size - for step in reversed(range(len(rewards))): - if step + 1 < len(value_preds) and step + 1 < len(masks): - delta = ( - rewards[step] - + gamma * value_preds_tensor[step + 1] * masks[step + 1] - - value_preds_tensor[step] - ) - gae = delta + gamma * gae_lambda * masks[step + 1] * gae - else: - delta = rewards[step] - value_preds_tensor[step] + returns = np.zeros_like(rewards) + for step in reversed(range(num_steps)): + # Check if we are at the last step + if step == num_steps - 1: + delta = (rewards[step] - values[step]) gae = delta - returns[step] = gae + value_preds_tensor[step] + else: + next_value = values[step + 1] if step + 1 < num_steps else 0 + next_mask = masks[step + 1] if step + 1 < num_steps else 0 + delta = (rewards[step] + + gamma * next_value * next_mask + - values[step] + ) + gae = delta + gamma * gae_lambda * next_mask * gae + + gae_scal = gae[0] if isinstance(gae, np.ndarray) else gae + value_scal = values[step][0] if isinstance(values[step], np.ndarray) else values[step] + + returns[step] = gae_scal + value_scal + return returns - def evaluate_task(self, task, env, action_value_fn, gamma, gae_lambda): + def evaluate_task(self, task, env, action_value_fn): if env is None: raise ValueError("Environment object is None. 
Please ensure it is properly initialized.") @@ -261,26 +367,17 @@ def evaluate_task(self, task, env, action_value_fn, gamma, gae_lambda): if term or trunc: done = True - # Compute returns after the episode is complete - returns = self.compute_returns(gamma, gae_lambda, rewards, values, masks) + returns = self.compute_returns(rewards, values, masks, self.gamma, self.gae_lambda) return { "tasks": task, "masks": masks, "rewards": rewards, - "value_preds": values, + "values": values, "returns": returns } - def _evaluate_unseen_level(self): - task_idx = \ - np.random.choice(range(self.num_tasks), 1, p=self.unseen_task_weights / self.unseen_task_weights.sum())[0] - task = self.tasks[task_idx] - episode_data = self.evaluate_task(task, self.eval_envs, self.action_value_fn, self.gamma, self.gae_lambda) - self.update_with_episode_data(episode_data, self._average_gae) # Update task scores - return task - - def sample(self, strategy=None, score_function=None): + def sample(self, strategy=None): if not strategy: strategy = self.strategy @@ -304,33 +401,32 @@ def sample(self, strategy=None, score_function=None): if np.random.rand() > self.nu or not proportion_seen < 1.0: return self._sample_replay_level() - # Otherwise, evaluate a new level + # Otherwise, sample a new level if self.robust_plr: - self.update_with_episode_data(self._evaluate_unseen_level(), score_function) + self.update_with_episode_data(self._evaluate_unseen_level()) return self.sample(strategy=strategy) else: - # Otherwise, sample a new level return self._sample_unseen_level() - elif self.replay_schedule == "proportionate": if proportion_seen >= self.rho and np.random.rand() < proportion_seen: return self._sample_replay_level() else: if self.robust_plr: - return self.update_with_episode_data(self._evaluate_unseen_level(), score_function) + self.update_with_episode_data(self._evaluate_unseen_level()) return self.sample(strategy=strategy) else: return self._sample_unseen_level() - else: raise NotImplementedError( f"Unsupported replay schedule: {self.replay_schedule}. 
Must be 'fixed' or 'proportionate'.") - def update_with_episode_data(self, episode_data, score_function): - tasks = np.array(episode_data['tasks']) # Convert to numpy array - done = ~(np.array(episode_data['masks']) > 0) - total_steps, num_actors = tasks.shape[:2] if tasks.ndim >= 2 else (0, 0) + def _update_with_episode_data(self, episode_data, score_function): + tasks = np.array(episode_data["tasks"]) + if not self.requires_value_buffers: + policy_logits = episode_data.action_log_dist + done = np.array([not mask > 0 for mask in episode_data["masks"]]) + total_steps, num_actors = tasks.shape[:2] for actor_index in range(num_actors): done_steps = done[:, actor_index].nonzero()[:total_steps, 0] @@ -343,25 +439,47 @@ def update_with_episode_data(self, episode_data, score_function): if (t == 0): # if t is 0, then this done step caused a full update of previous last cycle continue + # If there is only 1 step, we can't calculate the one-step td error + if self.strategy == "one_step_td_error" and t - start_t <= 1: + continue + task_idx_t = tasks[start_t, actor_index].item() + # Store kwargs for score function score_function_kwargs = {} - score_function_kwargs["returns"] = episode_data['returns'][start_t:t, actor_index] - score_function_kwargs["value_preds"] = episode_data['value_preds'][start_t:t, actor_index] + if self.requires_value_buffers: + score_function_kwargs["returns"] = episode_data.returns[start_t:t, actor_index] + score_function_kwargs["rewards"] = episode_data.rewards[start_t:t, actor_index] + score_function_kwargs["values"] = episode_data.values[start_t:t, actor_index] + else: + episode_logits = policy_logits[start_t:t, actor_index] + score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) score = score_function(**score_function_kwargs) - num_steps = len(episode_data['tasks'][start_t:t, actor_index]) + num_steps = len(episode_data.tasks[start_t:t, actor_index]) + # TODO: Check that task_idx_t is correct self.update_task_score(actor_index, task_idx_t, score, num_steps) start_t = t.item() if start_t < total_steps: + # If there is only 1 step, we can't calculate the one-step td error + if self.strategy == "one_step_td_error" and start_t == total_steps - 1: + continue + # TODO: Check this too task_idx_t = tasks[start_t, actor_index].item() + # Store kwargs for score function score_function_kwargs = {} - score_function_kwargs["returns"] = episode_data['returns'][start_t:, actor_index] - score_function_kwargs["value_preds"] = episode_data['value_preds'][start_t:, actor_index] + if self.requires_value_buffers: + score_function_kwargs["returns"] = episode_data.returns[start_t:, actor_index] + score_function_kwargs["rewards"] = episode_data.rewards[start_t:, actor_index] + score_function_kwargs["values"] = episode_data.values[start_t:, actor_index] + else: + episode_logits = policy_logits[start_t:, actor_index] + score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) + score = score_function(**score_function_kwargs) self._last_score = score - num_steps = len(episode_data['tasks'][start_t:, actor_index]) + num_steps = len(episode_data.tasks[start_t:, actor_index]) self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) def sample_weights(self): From c6b7ade4a716d4b6bb1709d82e93aef937be0434 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Fri, 26 Apr 2024 01:39:23 +0530 Subject: [PATCH 15/26] added other parameters for Task Sampler initialization --- syllabus/curricula/plr/plr_wrapper.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index 32000e3d..ff6fbb6f 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -235,7 +235,7 @@ def __init__( self._eval_envs = eval_envs self.action_value_fn = action_value_fn - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn = action_value_fn, **task_sampler_kwargs_dict) + self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn = action_value_fn, get_value = get_value, observation_space = observation_space, **task_sampler_kwargs_dict) self._rollouts = RolloutStorage( self._num_steps, From 16fd2dce37e9d8ccdb29f3c12a3881e360fd9157 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Fri, 26 Apr 2024 01:42:46 +0530 Subject: [PATCH 16/26] initialisation for get_value_fn --- syllabus/tests/utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/syllabus/tests/utils.py b/syllabus/tests/utils.py index 4b12c20d..92bad0df 100644 --- a/syllabus/tests/utils.py +++ b/syllabus/tests/utils.py @@ -186,13 +186,14 @@ def run_ray_multiprocess(env_fn, env_args=(), env_kwargs={}, curriculum=None, nu ray.kill(curriculum.curriculum) return ray_speed + def get_test_values(x): return torch.unsqueeze(torch.Tensor(np.array([0] * len(x))), -1) + def get_action_value(obs): - actions = np.zeros(len(obs)) - values = np.zeros(len(obs)) - return actions, values + return 0,0 + # Sync Test Environment def create_synctest_env(*args, type=None, env_args=(), env_kwargs={}, **kwargs): From 91d7060f54fd8a0261544acf3b9f67207d0b153c Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Fri, 26 Apr 2024 01:43:44 +0530 Subject: [PATCH 17/26] added rollout class for task_sampler and fixed the GAE. --- syllabus/curricula/plr/task_sampler.py | 219 ++++++++++++++++++++----- 1 file changed, 179 insertions(+), 40 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 46236a76..94983038 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -4,8 +4,167 @@ import gymnasium as gym import numpy as np import torch +import warnings from typing import List +from syllabus.core import Curriculum, UsageError, enumerate_axes + + +class RolloutStorage(object): + def __init__( + self, + num_steps: int, + num_processes: int, + requires_value_buffers: bool, + observation_space: gym.Space, + action_space: gym.Space = None, + get_value=None, + ): + self.num_steps = num_steps + self.buffer_steps = num_steps * 2 # Hack to prevent overflow from lagging updates. 
+ self.num_processes = num_processes + self._requires_value_buffers = requires_value_buffers + self._get_value = get_value + self.tasks = torch.zeros(self.buffer_steps, num_processes, 1, dtype=torch.int) + self.masks = torch.ones(self.buffer_steps + 1, num_processes, 1) + self.obs = [[[0] for _ in range(self.num_processes)]] * self.buffer_steps + self._fill = torch.zeros(self.buffer_steps, num_processes, 1) + self.env_steps = [0] * num_processes + self.should_update = False + + if requires_value_buffers: + self.returns = torch.zeros(self.buffer_steps + 1, num_processes, 1) + self.rewards = torch.zeros(self.buffer_steps, num_processes, 1) + self.value_preds = torch.zeros(self.buffer_steps + 1, num_processes, 1) + else: + if action_space is None: + raise ValueError( + "Action space must be provided to PLR for strategies 'policy_entropy', 'least_confidence', 'min_margin'" + ) + self.action_log_dist = torch.zeros(self.buffer_steps, num_processes, action_space.n) + + self.num_steps = num_steps + self.step = 0 + + def to(self, device): + self.masks = self.masks.to(device) + self.tasks = self.tasks.to(device) + self._fill = self._fill.to(device) + if self._requires_value_buffers: + self.rewards = self.rewards.to(device) + self.value_preds = self.value_preds.to(device) + self.returns = self.returns.to(device) + else: + self.action_log_dist = self.action_log_dist.to(device) + + def insert(self, masks, action_log_dist=None, value_preds=None, rewards=None, tasks=None): + if self._requires_value_buffers: + assert (value_preds is not None and rewards is not None), "Selected strategy requires value_preds and rewards" + if len(rewards.shape) == 3: + rewards = rewards.squeeze(2) + self.value_preds[self.step].copy_(torch.as_tensor(value_preds)) + self.rewards[self.step].copy_(torch.as_tensor(rewards)[:, None]) + self.masks[self.step + 1].copy_(torch.as_tensor(masks)[:, None]) + else: + self.action_log_dist[self.step].copy_(action_log_dist) + if tasks is not None: + assert isinstance(tasks[0], int), "Provided task must be an integer" + self.tasks[self.step].copy_(torch.as_tensor(tasks)[:, None]) + self.step = (self.step + 1) % self.num_steps + + def insert_at_index(self, env_index, mask=None, action_log_dist=None, obs=None, reward=None, task=None, steps=1): + if env_index >= self.num_processes: + warnings.warn(f"Env index {env_index} is greater than the number of processes {self.num_processes}. Using index {env_index % self.num_processes} instead.") + env_index = env_index % self.num_processes + + step = self.env_steps[env_index] + end_step = step + steps + # Update buffer fill traacker, and check for common usage errors. + try: + if end_step > len(self._fill): + raise IndexError + self._fill[step:end_step, env_index] = 1 + except IndexError as e: + if any(self._fill[:][env_index] == 0): + raise UsageError(f"Step {step} + {steps} = {end_step} is out of range for env index {env_index}. Your value for PLR's num_processes may be too high.") from e + else: + raise UsageError(f"Step {step} + {steps} = {end_step} is out of range for env index {env_index}. 
Your value for PLR's num_processes may be too low.") from e + + if mask is not None: + self.masks[step + 1:end_step + 1, env_index].copy_(torch.as_tensor(mask[:, None])) + if obs is not None: + for s in range(step, end_step): + self.obs[s][env_index] = obs[s - step] + if reward is not None: + self.rewards[step:end_step, env_index].copy_(torch.as_tensor(reward[:, None])) + if action_log_dist is not None: + self.action_log_dist[step:end_step, env_index].copy_(torch.as_tensor(action_log_dist[:, None])) + if task is not None: + try: + task = int(task) + except TypeError: + assert isinstance(task, int), f"Provided task must be an integer, got {task} with type {type(task)} instead." + self.tasks[step:end_step, env_index].copy_(torch.as_tensor(task)) + else: + self.env_steps[env_index] += steps + # Hack for now, we call insert_at_index twice + while all(self._fill[self.step] == 1): + self.step = (self.step + 1) % self.buffer_steps + # Check if we have enough steps to compute a task sampler update + if self.step == self.num_steps + 1: + self.should_update = True + + def _get_values(self): + if self._get_value is None: + raise UsageError("Selected strategy requires value predictions. Please provide get_value function.") + for step in range(self.num_steps): + values = self._get_value(self.obs[step]) + if len(values.shape) == 3: + warnings.warn(f"Value function returned a 3D tensor of shape {values.shape}. Attempting to squeeze last dimension.") + values = torch.squeeze(values, -1) + if len(values.shape) == 1: + warnings.warn(f"Value function returned a 1D tensor of shape {values.shape}. Attempting to unsqueeze last dimension.") + values = torch.unsqueeze(values, -1) + self.value_preds[step].copy_(values) + + def after_update(self): + # After consuming the first num_steps of data, remove them and shift the remaining data in the buffer + self.tasks[0: self.num_steps].copy_(self.tasks[self.num_steps: self.buffer_steps]) + self.masks[0: self.num_steps].copy_(self.masks[self.num_steps: self.buffer_steps]) + self.obs[0: self.num_steps][:] = self.obs[self.num_steps: self.buffer_steps][:] + + if self._requires_value_buffers: + self.returns[0: self.num_steps].copy_(self.returns[self.num_steps: self.buffer_steps]) + self.rewards[0: self.num_steps].copy_(self.rewards[self.num_steps: self.buffer_steps]) + self.value_preds[0: self.num_steps].copy_(self.value_preds[self.num_steps: self.buffer_steps]) + else: + self.action_log_dist[0: self.num_steps].copy_(self.action_log_dist[self.num_steps: self.buffer_steps]) + + self._fill[0: self.num_steps].copy_(self._fill[self.num_steps: self.buffer_steps]) + self._fill[self.num_steps: self.buffer_steps].copy_(0) + + self.env_steps = [steps - self.num_steps for steps in self.env_steps] + self.should_update = False + self.step = self.step - self.num_steps + + def compute_returns(self, rewards, values, masks, gamma, gae_lambda): + assert self._requires_value_buffers, "Selected strategy does not use compute_rewards." 
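+ # Backward GAE pass over the buffer; masks[step + 1] is 0/False when the step ended
+ # an episode, which drops the bootstrap term and stops the recursion from crossing
+ # the episode boundary:
+ #   delta_t  = r_t + gamma * V(s_t+1) * mask_t+1 - V(s_t)
+ #   gae_t    = delta_t + gamma * gae_lambda * mask_t+1 * gae_t+1
+ #   return_t = gae_t + V(s_t)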
+ self._get_values() + returns = np.zeros_like(rewards) + gae = 0 + for step in reversed(range(self.num_steps)): + delta = ( + rewards[step] + + gamma * values[step + 1] * masks[step + 1] + - values[step] + ) + gae = delta + gamma * gae_lambda * masks[step + 1] * gae + returns[step] = gae + values[step] + + +def null(x): + return None + class TaskSampler: """ Task sampler for Prioritized Level Replay (PLR) @@ -41,6 +200,8 @@ def __init__( rho: float = 1.0, nu: float = 0.5, alpha: float = 1.0, + num_steps: int = 256, + num_processes: int = 1, gamma: float = 0.999, gae_lambda: float = 0.95, staleness_coef: float = 0.1, @@ -49,6 +210,8 @@ def __init__( robust_plr: bool = False, eval_envs = None, action_value_fn=None, + get_value=None, + observation_space = None ): self.action_space = action_space self.tasks = tasks @@ -70,6 +233,10 @@ def __init__( self.robust_plr = robust_plr self.eval_envs = eval_envs self.action_value_fn = action_value_fn + self.num_steps = num_steps + self.num_processes = num_processes + self._get_values = get_value + self.observation_space = observation_space self.unseen_task_weights = np.array([1.0] * self.num_tasks) self.task_scores = np.array([0.0] * self.num_tasks, dtype=float) @@ -77,6 +244,15 @@ def __init__( self.partial_task_steps = np.zeros((num_actors, self.num_tasks), dtype=np.int64) self.task_staleness = np.array([0.0] * self.num_tasks, dtype=float) + self._robust_rollouts = RolloutStorage( + self.num_steps, + self.num_processes, + self.requires_value_buffers, + self.observation_space, + action_space=action_space, + get_value=get_value if get_value is not None else null, + ) + self.next_task_index = 0 # Only used for sequential strategy # Logging metrics @@ -307,38 +483,6 @@ def _evaluate_unseen_level(self): self._update_staleness(task_idx) - return task - - def compute_returns(self, rewards, values, masks, gamma, gae_lambda): - assert self.requires_value_buffers, "Selected strategy does not use compute_rewards." - - if isinstance(rewards, float): - rewards = [np.array(rewards)] - - num_steps = len(rewards) - gae = 0 - returns = np.zeros_like(rewards) - for step in reversed(range(num_steps)): - # Check if we are at the last step - if step == num_steps - 1: - delta = (rewards[step] - values[step]) - gae = delta - else: - next_value = values[step + 1] if step + 1 < num_steps else 0 - next_mask = masks[step + 1] if step + 1 < num_steps else 0 - delta = (rewards[step] - + gamma * next_value * next_mask - - values[step] - ) - gae = delta + gamma * gae_lambda * next_mask * gae - - gae_scal = gae[0] if isinstance(gae, np.ndarray) else gae - value_scal = values[step][0] if isinstance(values[step], np.ndarray) else values[step] - - returns[step] = gae_scal + value_scal - - return returns - def evaluate_task(self, task, env, action_value_fn): if env is None: raise ValueError("Environment object is None. 
Please ensure it is properly initialized.") @@ -352,11 +496,6 @@ def evaluate_task(self, task, env, action_value_fn): while not done: action, value = action_value_fn(obs) - if isinstance(action, np.ndarray): - action = int(action[0]) - else: - action = int(action) - obs, rew, term, trunc, info = env.step(action) rewards.append(rew) @@ -367,7 +506,7 @@ def evaluate_task(self, task, env, action_value_fn): if term or trunc: done = True - returns = self.compute_returns(rewards, values, masks, self.gamma, self.gae_lambda) + returns = self._robust_rollouts.compute_returns(rewards, values, masks, self.gamma, self.gae_lambda) return { "tasks": task, @@ -403,7 +542,7 @@ def sample(self, strategy=None): # Otherwise, sample a new level if self.robust_plr: - self.update_with_episode_data(self._evaluate_unseen_level()) + self._evaluate_unseen_level() return self.sample(strategy=strategy) else: return self._sample_unseen_level() @@ -413,7 +552,7 @@ def sample(self, strategy=None): return self._sample_replay_level() else: if self.robust_plr: - self.update_with_episode_data(self._evaluate_unseen_level()) + self._evaluate_unseen_level() return self.sample(strategy=strategy) else: return self._sample_unseen_level() From e40c3ad3de9c169c8eb7bf14ac4c66940aa7eaaf Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Fri, 26 Apr 2024 23:25:46 +0530 Subject: [PATCH 18/26] updated the task_Sampler to fix errors. --- syllabus/curricula/plr/task_sampler.py | 38 ++++++++++++++------------ 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 94983038..da81ae5b 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -161,6 +161,8 @@ def compute_returns(self, rewards, values, masks, gamma, gae_lambda): gae = delta + gamma * gae_lambda * masks[step + 1] * gae returns[step] = gae + values[step] + return returns + def null(x): return None @@ -350,17 +352,17 @@ def _average_gae(self, **kwargs): returns = kwargs["returns"] value_preds = kwargs["value_preds"] - advantages = returns - value_preds + advantages = np.abs(returns - value_preds) - return advantages.mean().item() + return np.mean(advantages).item() def _average_value_l1(self, **kwargs): returns = kwargs["returns"] value_preds = kwargs["value_preds"] - advantages = returns - value_preds + advantages = np.abs(returns - value_preds) - return advantages.abs().mean().item() + return np.mean(advantages).item() def _one_step_td_error(self, **kwargs): rewards = kwargs["rewards"] @@ -476,7 +478,8 @@ def _sample_unseen_level(self): def _evaluate_unseen_level(self): sample_weights = self.unseen_task_weights / self.unseen_task_weights.sum() task_idx = np.random.choice(range(self.num_tasks), 1, p=sample_weights)[0] - task = self.tasks[task_idx] + tasks = np.array(self.tasks)[:, None, None] # Reshape tasks before indexing + task = tasks[task_idx] episode_data = self.evaluate_task(task, self.eval_envs, self.action_value_fn) self.update_with_episode_data(episode_data) @@ -512,7 +515,7 @@ def evaluate_task(self, task, env, action_value_fn): "tasks": task, "masks": masks, "rewards": rewards, - "values": values, + "value_preds": values, "returns": returns } @@ -561,14 +564,15 @@ def sample(self, strategy=None): f"Unsupported replay schedule: {self.replay_schedule}. 
Must be 'fixed' or 'proportionate'.") def _update_with_episode_data(self, episode_data, score_function): - tasks = np.array(episode_data["tasks"]) + tasks = episode_data["tasks"] if not self.requires_value_buffers: policy_logits = episode_data.action_log_dist done = np.array([not mask > 0 for mask in episode_data["masks"]]) + total_steps, num_actors = tasks.shape[:2] for actor_index in range(num_actors): - done_steps = done[:, actor_index].nonzero()[:total_steps, 0] + done_steps = done.nonzero()[0][:total_steps] start_t = 0 for t in done_steps: @@ -582,19 +586,19 @@ def _update_with_episode_data(self, episode_data, score_function): if self.strategy == "one_step_td_error" and t - start_t <= 1: continue - task_idx_t = tasks[start_t, actor_index].item() + task_idx_t = np.array(tasks[start_t, actor_index]).item() # Store kwargs for score function score_function_kwargs = {} if self.requires_value_buffers: - score_function_kwargs["returns"] = episode_data.returns[start_t:t, actor_index] - score_function_kwargs["rewards"] = episode_data.rewards[start_t:t, actor_index] - score_function_kwargs["values"] = episode_data.values[start_t:t, actor_index] + score_function_kwargs["returns"] = episode_data["returns"][start_t:t] + score_function_kwargs["rewards"] = episode_data["rewards"][start_t:t] + score_function_kwargs["value_preds"] = episode_data["value_preds"][start_t:t] else: episode_logits = policy_logits[start_t:t, actor_index] score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) score = score_function(**score_function_kwargs) - num_steps = len(episode_data.tasks[start_t:t, actor_index]) + num_steps = len(episode_data["tasks"][start_t:t, actor_index]) # TODO: Check that task_idx_t is correct self.update_task_score(actor_index, task_idx_t, score, num_steps) @@ -609,16 +613,16 @@ def _update_with_episode_data(self, episode_data, score_function): # Store kwargs for score function score_function_kwargs = {} if self.requires_value_buffers: - score_function_kwargs["returns"] = episode_data.returns[start_t:, actor_index] - score_function_kwargs["rewards"] = episode_data.rewards[start_t:, actor_index] - score_function_kwargs["values"] = episode_data.values[start_t:, actor_index] + score_function_kwargs["returns"] = episode_data["returns"][start_t:, actor_index] + score_function_kwargs["rewards"] = episode_data["rewards"][start_t:, actor_index] + score_function_kwargs["value_preds"] = episode_data["value_preds"][start_t:, actor_index] else: episode_logits = policy_logits[start_t:, actor_index] score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) score = score_function(**score_function_kwargs) self._last_score = score - num_steps = len(episode_data.tasks[start_t:, actor_index]) + num_steps = len(episode_data["tasks"][start_t:, actor_index]) self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) def sample_weights(self): From 4a1d9dc0bf594e986548f881fda3b8eb70d1fa63 Mon Sep 17 00:00:00 2001 From: Ryan Sullivan Date: Sun, 28 Apr 2024 01:06:26 -0400 Subject: [PATCH 19/26] Fix Robust PLR --- syllabus/curricula/plr/plr_wrapper.py | 3 + syllabus/curricula/plr/task_sampler.py | 196 ++---------------- .../training_scripts/cleanrl_procgen_plr.py | 39 ++-- 3 files changed, 52 insertions(+), 186 deletions(-) diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index ff6fbb6f..fa347457 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -224,6 +224,9 @@ def 
__init__( task_sampler_kwargs_dict["num_actors"] = num_processes super().__init__(task_space, *curriculum_args, **curriculum_kwargs) + if robust_plr and eval_envs is None: + raise UsageError("RobustPLR requires evaluation environments to be provided.") + self._num_steps = num_steps # Number of steps stored in rollouts and used to update task sampler self._num_processes = num_processes # Number of parallel environments self._gamma = gamma diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index da81ae5b..ed70c9e8 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -8,160 +8,7 @@ from typing import List from syllabus.core import Curriculum, UsageError, enumerate_axes - - -class RolloutStorage(object): - def __init__( - self, - num_steps: int, - num_processes: int, - requires_value_buffers: bool, - observation_space: gym.Space, - action_space: gym.Space = None, - get_value=None, - ): - self.num_steps = num_steps - self.buffer_steps = num_steps * 2 # Hack to prevent overflow from lagging updates. - self.num_processes = num_processes - self._requires_value_buffers = requires_value_buffers - self._get_value = get_value - self.tasks = torch.zeros(self.buffer_steps, num_processes, 1, dtype=torch.int) - self.masks = torch.ones(self.buffer_steps + 1, num_processes, 1) - self.obs = [[[0] for _ in range(self.num_processes)]] * self.buffer_steps - self._fill = torch.zeros(self.buffer_steps, num_processes, 1) - self.env_steps = [0] * num_processes - self.should_update = False - - if requires_value_buffers: - self.returns = torch.zeros(self.buffer_steps + 1, num_processes, 1) - self.rewards = torch.zeros(self.buffer_steps, num_processes, 1) - self.value_preds = torch.zeros(self.buffer_steps + 1, num_processes, 1) - else: - if action_space is None: - raise ValueError( - "Action space must be provided to PLR for strategies 'policy_entropy', 'least_confidence', 'min_margin'" - ) - self.action_log_dist = torch.zeros(self.buffer_steps, num_processes, action_space.n) - - self.num_steps = num_steps - self.step = 0 - - def to(self, device): - self.masks = self.masks.to(device) - self.tasks = self.tasks.to(device) - self._fill = self._fill.to(device) - if self._requires_value_buffers: - self.rewards = self.rewards.to(device) - self.value_preds = self.value_preds.to(device) - self.returns = self.returns.to(device) - else: - self.action_log_dist = self.action_log_dist.to(device) - - def insert(self, masks, action_log_dist=None, value_preds=None, rewards=None, tasks=None): - if self._requires_value_buffers: - assert (value_preds is not None and rewards is not None), "Selected strategy requires value_preds and rewards" - if len(rewards.shape) == 3: - rewards = rewards.squeeze(2) - self.value_preds[self.step].copy_(torch.as_tensor(value_preds)) - self.rewards[self.step].copy_(torch.as_tensor(rewards)[:, None]) - self.masks[self.step + 1].copy_(torch.as_tensor(masks)[:, None]) - else: - self.action_log_dist[self.step].copy_(action_log_dist) - if tasks is not None: - assert isinstance(tasks[0], int), "Provided task must be an integer" - self.tasks[self.step].copy_(torch.as_tensor(tasks)[:, None]) - self.step = (self.step + 1) % self.num_steps - - def insert_at_index(self, env_index, mask=None, action_log_dist=None, obs=None, reward=None, task=None, steps=1): - if env_index >= self.num_processes: - warnings.warn(f"Env index {env_index} is greater than the number of processes {self.num_processes}. 
Using index {env_index % self.num_processes} instead.") - env_index = env_index % self.num_processes - - step = self.env_steps[env_index] - end_step = step + steps - # Update buffer fill traacker, and check for common usage errors. - try: - if end_step > len(self._fill): - raise IndexError - self._fill[step:end_step, env_index] = 1 - except IndexError as e: - if any(self._fill[:][env_index] == 0): - raise UsageError(f"Step {step} + {steps} = {end_step} is out of range for env index {env_index}. Your value for PLR's num_processes may be too high.") from e - else: - raise UsageError(f"Step {step} + {steps} = {end_step} is out of range for env index {env_index}. Your value for PLR's num_processes may be too low.") from e - - if mask is not None: - self.masks[step + 1:end_step + 1, env_index].copy_(torch.as_tensor(mask[:, None])) - if obs is not None: - for s in range(step, end_step): - self.obs[s][env_index] = obs[s - step] - if reward is not None: - self.rewards[step:end_step, env_index].copy_(torch.as_tensor(reward[:, None])) - if action_log_dist is not None: - self.action_log_dist[step:end_step, env_index].copy_(torch.as_tensor(action_log_dist[:, None])) - if task is not None: - try: - task = int(task) - except TypeError: - assert isinstance(task, int), f"Provided task must be an integer, got {task} with type {type(task)} instead." - self.tasks[step:end_step, env_index].copy_(torch.as_tensor(task)) - else: - self.env_steps[env_index] += steps - # Hack for now, we call insert_at_index twice - while all(self._fill[self.step] == 1): - self.step = (self.step + 1) % self.buffer_steps - # Check if we have enough steps to compute a task sampler update - if self.step == self.num_steps + 1: - self.should_update = True - - def _get_values(self): - if self._get_value is None: - raise UsageError("Selected strategy requires value predictions. Please provide get_value function.") - for step in range(self.num_steps): - values = self._get_value(self.obs[step]) - if len(values.shape) == 3: - warnings.warn(f"Value function returned a 3D tensor of shape {values.shape}. Attempting to squeeze last dimension.") - values = torch.squeeze(values, -1) - if len(values.shape) == 1: - warnings.warn(f"Value function returned a 1D tensor of shape {values.shape}. 
Attempting to unsqueeze last dimension.") - values = torch.unsqueeze(values, -1) - self.value_preds[step].copy_(values) - - def after_update(self): - # After consuming the first num_steps of data, remove them and shift the remaining data in the buffer - self.tasks[0: self.num_steps].copy_(self.tasks[self.num_steps: self.buffer_steps]) - self.masks[0: self.num_steps].copy_(self.masks[self.num_steps: self.buffer_steps]) - self.obs[0: self.num_steps][:] = self.obs[self.num_steps: self.buffer_steps][:] - - if self._requires_value_buffers: - self.returns[0: self.num_steps].copy_(self.returns[self.num_steps: self.buffer_steps]) - self.rewards[0: self.num_steps].copy_(self.rewards[self.num_steps: self.buffer_steps]) - self.value_preds[0: self.num_steps].copy_(self.value_preds[self.num_steps: self.buffer_steps]) - else: - self.action_log_dist[0: self.num_steps].copy_(self.action_log_dist[self.num_steps: self.buffer_steps]) - - self._fill[0: self.num_steps].copy_(self._fill[self.num_steps: self.buffer_steps]) - self._fill[self.num_steps: self.buffer_steps].copy_(0) - - self.env_steps = [steps - self.num_steps for steps in self.env_steps] - self.should_update = False - self.step = self.step - self.num_steps - - def compute_returns(self, rewards, values, masks, gamma, gae_lambda): - assert self._requires_value_buffers, "Selected strategy does not use compute_rewards." - self._get_values() - returns = np.zeros_like(rewards) - gae = 0 - for step in reversed(range(self.num_steps)): - delta = ( - rewards[step] - + gamma * values[step + 1] * masks[step + 1] - - values[step] - ) - gae = delta + gamma * gae_lambda * masks[step + 1] * gae - returns[step] = gae + values[step] - - return returns +from syllabus.curricula.plr.storage import RolloutStorage def null(x): @@ -250,9 +97,7 @@ def __init__( self.num_steps, self.num_processes, self.requires_value_buffers, - self.observation_space, action_space=action_space, - get_value=get_value if get_value is not None else null, ) self.next_task_index = 0 # Only used for sequential strategy @@ -359,8 +204,7 @@ def _average_gae(self, **kwargs): def _average_value_l1(self, **kwargs): returns = kwargs["returns"] value_preds = kwargs["value_preds"] - - advantages = np.abs(returns - value_preds) + advantages = np.asarray(np.abs(returns - value_preds)) return np.mean(advantages).item() @@ -478,7 +322,7 @@ def _sample_unseen_level(self): def _evaluate_unseen_level(self): sample_weights = self.unseen_task_weights / self.unseen_task_weights.sum() task_idx = np.random.choice(range(self.num_tasks), 1, p=sample_weights)[0] - tasks = np.array(self.tasks)[:, None, None] # Reshape tasks before indexing + tasks = np.array(self.tasks) task = tasks[task_idx] episode_data = self.evaluate_task(task, self.eval_envs, self.action_value_fn) @@ -489,34 +333,33 @@ def _evaluate_unseen_level(self): def evaluate_task(self, task, env, action_value_fn): if env is None: raise ValueError("Environment object is None. 
Please ensure it is properly initialized.") + print("Evaluating") - obs = env.reset(new_task=task) + obs, _ = env.reset(new_task=task) done = False - rewards = [] - masks = [] - values = [] while not done: action, value = action_value_fn(obs) - obs, rew, term, trunc, info = env.step(action) + obs, rew, term, trunc, _ = env.step(action) + + mask = torch.FloatTensor([0.0] if term or trunc else [1.0]) + self._robust_rollouts.insert(mask, value_preds=value[0], rewards=torch.Tensor([rew]), tasks=torch.Tensor([task])) - rewards.append(rew) - masks.append(not (term or trunc)) - values.append(value) # Check if the episode is done if term or trunc: done = True - returns = self._robust_rollouts.compute_returns(rewards, values, masks, self.gamma, self.gae_lambda) - + _, next_value = action_value_fn(obs) + self._robust_rollouts.compute_returns(next_value, self.gamma, self.gae_lambda) + print("Evaluated") return { - "tasks": task, - "masks": masks, - "rewards": rewards, - "value_preds": values, - "returns": returns + "tasks": self._robust_rollouts.tasks, + "masks": self._robust_rollouts.masks, + "rewards": self._robust_rollouts.rewards, + "value_preds": self._robust_rollouts.value_preds, + "returns": self._robust_rollouts.returns, } def sample(self, strategy=None): @@ -546,6 +389,8 @@ def sample(self, strategy=None): # Otherwise, sample a new level if self.robust_plr: self._evaluate_unseen_level() + self._robust_rollouts.after_update() + self.after_update() return self.sample(strategy=strategy) else: return self._sample_unseen_level() @@ -564,6 +409,7 @@ def sample(self, strategy=None): f"Unsupported replay schedule: {self.replay_schedule}. Must be 'fixed' or 'proportionate'.") def _update_with_episode_data(self, episode_data, score_function): + print("Updating") tasks = episode_data["tasks"] if not self.requires_value_buffers: policy_logits = episode_data.action_log_dist @@ -624,6 +470,8 @@ def _update_with_episode_data(self, episode_data, score_function): self._last_score = score num_steps = len(episode_data["tasks"][start_t:, actor_index]) self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) + print("Updated") + def sample_weights(self): weights = self._score_transform(self.score_transform, self.temperature, self.task_scores) diff --git a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py index e13c22ed..2dd496b8 100644 --- a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py +++ b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py @@ -227,14 +227,24 @@ def fast_level_replay_evaluate( return mean_returns, stddev_returns, normalized_mean_returns -def make_value_fn(): +def make_value_fn(agent): def get_value(obs): obs = np.array(obs) + print(obs.shape) with torch.no_grad(): return agent.get_value(torch.Tensor(obs).to(device)) return get_value +def make_action_value_fn(agent): + def get_action_value(obs): + obs = np.array(obs[None,:]) + with torch.no_grad(): + action, logprob, _, value = agent.get_action_and_value(torch.Tensor(obs).to(device)) + return action.cpu().numpy(), value + return get_action_value + + if __name__ == "__main__": args = parse_args() run_name = f"{args.env_id}__{args.exp_name}__{args.seed}__{int(time.time())}" @@ -268,6 +278,15 @@ def get_value(obs): device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu") print("Device:", device) + print("Creating agent") + agent = ProcgenAgent( + (64, 64, 3), + 15, + arch="large", + base_kwargs={'recurrent': False, 
'hidden_size': 256} + ).to(device) + optimizer = optim.Adam(agent.parameters(), lr=args.learning_rate, eps=1e-5) + # Curriculum setup curriculum = None if args.curriculum: @@ -278,6 +297,8 @@ def get_value(obs): # Intialize Curriculum Method if args.curriculum_method == "plr": print("Using prioritized level replay.") + + plr_eval_env = make_env(args.env_id, args.seed, num_levels=200)() curriculum = PrioritizedLevelReplay( sample_env.task_space, sample_env.observation_space, @@ -286,7 +307,10 @@ def get_value(obs): gamma=args.gamma, gae_lambda=args.gae_lambda, task_sampler_kwargs_dict={"strategy": "value_l1"}, - get_value=make_value_fn(), + get_value=make_value_fn(agent), + robust_plr=True, + eval_envs=plr_eval_env, + action_value_fn=make_action_value_fn(agent), ) elif args.curriculum_method == "dr": print("Using domain randomization.") @@ -340,16 +364,6 @@ def get_value(obs): ) train_eval_envs = wrap_vecenv(train_eval_envs) - assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported" - print("Creating agent") - agent = ProcgenAgent( - envs.single_observation_space.shape, - envs.single_action_space.n, - arch="large", - base_kwargs={'recurrent': False, 'hidden_size': 256} - ).to(device) - optimizer = optim.Adam(agent.parameters(), lr=args.learning_rate, eps=1e-5) - # ALGO Logic: Storage setup obs = torch.zeros((args.num_steps, args.num_envs) + envs.single_observation_space.shape).to(device) actions = torch.zeros((args.num_steps, args.num_envs) + envs.single_action_space.shape).to(device) @@ -369,6 +383,7 @@ def get_value(obs): completed_episodes = 0 for update in range(1, num_updates + 1): + print("Update", update) # Annealing the rate if instructed to do so. if args.anneal_lr: frac = 1.0 - (update - 1.0) / num_updates From 85c750416b40cb346018c7d026f201d8aca0999c Mon Sep 17 00:00:00 2001 From: Ryan Sullivan Date: Sun, 28 Apr 2024 02:20:37 -0400 Subject: [PATCH 20/26] Fix procgen script for robust plr --- syllabus/examples/training_scripts/cleanrl_procgen_plr.py | 1 + 1 file changed, 1 insertion(+) diff --git a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py index d59e7c24..d805bab0 100644 --- a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py +++ b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py @@ -257,6 +257,7 @@ def get_action_value(obs): print("Using prioritized level replay.") plr_eval_env = make_env(args.env_id, args.seed, num_levels=200)() + plr_eval_env = ProcgenTaskWrapper(plr_eval_env, args.env_id, seed=args.seed) curriculum = PrioritizedLevelReplay( sample_env.task_space, sample_env.observation_space, From 2b281e69f15c24182fa7dfeaf088ef66fb16da16 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Tue, 30 Apr 2024 15:22:57 +0530 Subject: [PATCH 21/26] pulled everything and continue on robust_plr --- tests/multiprocessing_smoke_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/multiprocessing_smoke_tests.py b/tests/multiprocessing_smoke_tests.py index 43c532fb..9b26fb8d 100644 --- a/tests/multiprocessing_smoke_tests.py +++ b/tests/multiprocessing_smoke_tests.py @@ -29,7 +29,7 @@ "get_value": get_test_values, "device": "cpu", "num_processes": N_ENVS, - "num_steps": 2048 + "num_steps": 2048, "robust_plr": True, "eval_envs": create_nethack_env(), "action_value_fn": get_action_value From 1cc51c24281c8cb9e811e60671824ab8165f35d8 Mon Sep 17 00:00:00 2001 From: Ryan Sullivan Date: Thu, 2 May 2024 14:45:26 -0400 Subject: 
[PATCH 22/26] Add Storage file --- syllabus/curricula/plr/storage.py | 69 +++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 syllabus/curricula/plr/storage.py diff --git a/syllabus/curricula/plr/storage.py b/syllabus/curricula/plr/storage.py new file mode 100644 index 00000000..70ab76ae --- /dev/null +++ b/syllabus/curricula/plr/storage.py @@ -0,0 +1,69 @@ +import gymnasium as gym +import torch + +class RolloutStorage(object): + def __init__( + self, + num_steps: int, + num_processes: int, + requires_value_buffers: bool, + action_space: gym.Space = None, + ): + self._requires_value_buffers = requires_value_buffers + self.tasks = torch.zeros(num_steps, num_processes, 1, dtype=torch.int) + self.masks = torch.ones(num_steps + 1, num_processes, 1) + + if requires_value_buffers: + self.returns = torch.zeros(num_steps + 1, num_processes, 1) + self.rewards = torch.zeros(num_steps, num_processes, 1) + self.value_preds = torch.zeros(num_steps + 1, num_processes, 1) + else: + if action_space is None: + raise ValueError( + "Action space must be provided to PLR for strategies 'policy_entropy', 'least_confidence', 'min_margin'" + ) + self.action_log_dist = torch.zeros(num_steps, num_processes, action_space.n) + + self.num_steps = num_steps + self.step = 0 + + def to(self, device): + self.masks = self.masks.to(device) + self.tasks = self.tasks.to(device) + if self._requires_value_buffers: + self.rewards = self.rewards.to(device) + self.value_preds = self.value_preds.to(device) + self.returns = self.returns.to(device) + else: + self.action_log_dist = self.action_log_dist.to(device) + + def insert(self, masks, action_log_dist=None, value_preds=None, rewards=None, tasks=None): + if self._requires_value_buffers: + assert (value_preds is not None and rewards is not None), "Selected strategy requires value_preds and rewards" + if len(rewards.shape) == 3: + rewards = rewards.squeeze(2) + self.value_preds[self.step].copy_(torch.as_tensor(value_preds)) + self.rewards[self.step].copy_(torch.as_tensor(rewards)) + self.masks[self.step + 1].copy_(torch.as_tensor(masks)) + else: + self.action_log_dist[self.step].copy_(action_log_dist) + if tasks is not None: + # assert isinstance(tasks[0], (int, torch.int32)), "Provided task must be an integer" + self.tasks[self.step].copy_(torch.as_tensor(tasks)) + self.step = (self.step + 1) % self.num_steps + + def after_update(self): + self.masks[0].copy_(self.masks[-1]) + + def compute_returns(self, next_value, gamma, gae_lambda): + assert self._requires_value_buffers, "Selected strategy does not use compute_rewards." 
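+ # next_value is the value estimate for the state after the final stored step; it is
+ # written into value_preds[-1] so that value_preds[step + 1] bootstraps the backward
+ # GAE recursion below on its first (last-step) iteration.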
+ self.value_preds[-1] = next_value + gae = 0 + for step in reversed(range(self.rewards.size(0))): + delta = ( + self.rewards[step] + + gamma * self.value_preds[step + 1] * self.masks[step + 1] + - self.value_preds[step] + ) + gae = delta + gamma * gae_lambda * self.masks[step + 1] * gae + self.returns[step] = gae + self.value_preds[step] \ No newline at end of file From 81fa16ece9873ebfb1cf3b0a637a9828036849c8 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Fri, 3 May 2024 23:45:46 +0530 Subject: [PATCH 23/26] updated the curricula to accept robust_plr --- syllabus/curricula/plr/central_plr_wrapper.py | 2 +- syllabus/curricula/plr/plr_wrapper.py | 2 +- syllabus/curricula/plr/task_sampler.py | 15 ++++++++++----- syllabus/tests/utils.py | 4 +++- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/syllabus/curricula/plr/central_plr_wrapper.py b/syllabus/curricula/plr/central_plr_wrapper.py index 7f69ea85..d9b389bf 100644 --- a/syllabus/curricula/plr/central_plr_wrapper.py +++ b/syllabus/curricula/plr/central_plr_wrapper.py @@ -133,7 +133,7 @@ def __init__( self._gae_lambda = gae_lambda self._supress_usage_warnings = suppress_usage_warnings self._task2index = {task: i for i, task in enumerate(self.tasks)} - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict) + self._task_sampler = TaskSampler(self.tasks, task_space=task_space, action_space=action_space, **task_sampler_kwargs_dict) self._rollouts = RolloutStorage( self._num_steps, self._num_processes, diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index 375a70eb..034aa3dd 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -206,7 +206,7 @@ def __init__( self._eval_envs = eval_envs self.action_value_fn = action_value_fn - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn = action_value_fn, get_value = get_value, observation_space = observation_space, **task_sampler_kwargs_dict) + self._task_sampler = TaskSampler(self.tasks, task_space=task_space, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn = action_value_fn, **task_sampler_kwargs_dict) self._rollouts = RolloutStorage( self._num_steps, diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 1e521224..2901a89b 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -4,11 +4,10 @@ import gymnasium as gym import numpy as np import torch -import warnings from typing import List -from syllabus.core import Curriculum, UsageError, enumerate_axes from syllabus.curricula.plr.storage import RolloutStorage +from syllabus.task_space.task_space import TaskSpace def null(x): @@ -39,6 +38,7 @@ class TaskSampler: def __init__( self, tasks: list, + task_space: TaskSpace, action_space: gym.spaces.Space = None, num_actors: int = 1, strategy: str = "value_l1", @@ -56,12 +56,15 @@ def __init__( staleness_coef: float = 0.1, staleness_transform: str = "power", staleness_temperature: float = 1.0, + robust_plr: bool = False, eval_envs = None, action_value_fn=None, get_value=None, - observation_space = None + observation_space = None, + ): + self.task_space = task_space self.action_space = action_space self.tasks = tasks self.num_tasks = len(self.tasks) @@ -336,7 +339,7 @@ def evaluate_task(self, task, env, action_value_fn): raise ValueError("Environment object 
is None. Please ensure it is properly initialized.") print("Evaluating") - obs, _ = env.reset(new_task=task) + obs = env.reset(new_task=task) done = False while not done: @@ -344,8 +347,10 @@ def evaluate_task(self, task, env, action_value_fn): obs, rew, term, trunc, _ = env.step(action) + task_encoded = self.task_space.encode(task) + mask = torch.FloatTensor([0.0] if term or trunc else [1.0]) - self._robust_rollouts.insert(mask, value_preds=value[0], rewards=torch.Tensor([rew]), tasks=torch.Tensor([task])) + self._robust_rollouts.insert(mask, value_preds=value, rewards=torch.Tensor([rew]), tasks=torch.Tensor([task_encoded])) # Check if the episode is done diff --git a/syllabus/tests/utils.py b/syllabus/tests/utils.py index c47640b1..4a644890 100644 --- a/syllabus/tests/utils.py +++ b/syllabus/tests/utils.py @@ -192,7 +192,9 @@ def get_test_values(x): def get_action_value(obs): - return 0,0 + action = 0 + value = 0 + return action, value # Sync Test Environment From 2ea5979fca39c8fc869821e2fe8a69ebd1396ec2 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Sat, 4 May 2024 12:09:37 +0530 Subject: [PATCH 24/26] fixes errors, minor changes and updated the task_sampler --- syllabus/curricula/plr/task_sampler.py | 15 +++++++-------- syllabus/tests/utils.py | 2 ++ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 2901a89b..2610f636 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -339,7 +339,7 @@ def evaluate_task(self, task, env, action_value_fn): raise ValueError("Environment object is None. Please ensure it is properly initialized.") print("Evaluating") - obs = env.reset(new_task=task) + obs, info = env.reset(new_task=task) done = False while not done: @@ -419,12 +419,12 @@ def _update_with_episode_data(self, episode_data, score_function): tasks = episode_data["tasks"] if not self.requires_value_buffers: policy_logits = episode_data.action_log_dist - done = np.array([not mask > 0 for mask in episode_data["masks"]]) + done = ~(episode_data["masks"] > 0) total_steps, num_actors = tasks.shape[:2] for actor_index in range(num_actors): - done_steps = done.nonzero()[0][:total_steps] + done_steps = done[:, actor_index].nonzero()[:total_steps, 0] start_t = 0 for t in done_steps: @@ -438,14 +438,14 @@ def _update_with_episode_data(self, episode_data, score_function): if self.strategy == "one_step_td_error" and t - start_t <= 1: continue - task_idx_t = np.array(tasks[start_t, actor_index]).item() + task_idx_t = tasks[start_t, actor_index].item() # Store kwargs for score function score_function_kwargs = {} if self.requires_value_buffers: - score_function_kwargs["returns"] = episode_data["returns"][start_t:t] - score_function_kwargs["rewards"] = episode_data["rewards"][start_t:t] - score_function_kwargs["value_preds"] = episode_data["value_preds"][start_t:t] + score_function_kwargs["returns"] = episode_data["returns"][start_t:t, actor_index] + score_function_kwargs["rewards"] = episode_data["rewards"][start_t:t, actor_index] + score_function_kwargs["value_preds"] = episode_data["value_preds"][start_t:t, actor_index] else: episode_logits = policy_logits[start_t:t, actor_index] score_function_kwargs["episode_logits"] = torch.log_softmax(episode_logits, -1) @@ -478,7 +478,6 @@ def _update_with_episode_data(self, episode_data, score_function): self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) print("Updated") - def 
sample_weights(self): weights = self._score_transform(self.score_transform, self.temperature, self.task_scores) weights = weights * (1 - self.unseen_task_weights) # zero out unseen levels diff --git a/syllabus/tests/utils.py b/syllabus/tests/utils.py index 4a644890..edad012e 100644 --- a/syllabus/tests/utils.py +++ b/syllabus/tests/utils.py @@ -230,8 +230,10 @@ def create_nethack_env(*args, type=None, env_args=(), env_kwargs={}, **kwargs): warnings.warn("Unable to import nle.") env = NetHackScore(*env_args, **env_kwargs) + env = GymV21CompatibilityV0(env=env) env = NethackTaskWrapper(env) + if type == "queue": env = MultiProcessingSyncWrapper( env, *args, task_space=env.task_space, **kwargs From 31c63fe834945a5d4481136a4d8cf6c0d8ea7d17 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Wed, 8 May 2024 21:51:01 +0530 Subject: [PATCH 25/26] minor cleanup --- syllabus/curricula/plr/task_sampler.py | 54 ++++++------------- .../training_scripts/cleanrl_procgen_plr.py | 14 ----- syllabus/tests/utils.py | 1 - 3 files changed, 17 insertions(+), 52 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 2610f636..225ea637 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -113,48 +113,32 @@ def __init__( 'Must provide action space to PLR if using "policy_entropy", "least_confidence", or "min_margin" strategies' ) - def update_with_rollouts(self, rollouts, actor_id=None): - if self.strategy == "random": - return - - # Update with a RolloutStorage object + def _get_score_function(self): if self.strategy == "policy_entropy": - score_function = self._average_entropy + return self._average_entropy elif self.strategy == "least_confidence": - score_function = self._average_least_confidence + return self._average_least_confidence elif self.strategy == "min_margin": - score_function = self._average_min_margin + return self._average_min_margin elif self.strategy == "gae": - score_function = self._average_gae + return self._average_gae elif self.strategy == "value_l1": - score_function = self._average_value_l1 + return self._average_value_l1 elif self.strategy == "one_step_td_error": - score_function = self._one_step_td_error + return self._one_step_td_error else: raise ValueError(f"Unsupported strategy, {self.strategy}") + def update_with_rollouts(self, rollouts, actor_id=None): + if self.strategy == "random": + return + score_function = self._get_score_function() self._update_with_rollouts(rollouts, score_function, actor_index=actor_id) def update_with_episode_data(self, episode_data): if self.strategy == "random": return - - # Update with a EpisodeRolloutStorage object - if self.strategy == "policy_entropy": - score_function = self._average_entropy - elif self.strategy == "least_confidence": - score_function = self._average_least_confidence - elif self.strategy == "min_margin": - score_function = self._average_min_margin - elif self.strategy == "gae": - score_function = self._average_gae - elif self.strategy == "value_l1": - score_function = self._average_value_l1 - elif self.strategy == "one_step_td_error": - score_function = self._one_step_td_error - else: - raise ValueError(f"Unsupported strategy, {self.strategy}") - + score_function = self._get_score_function() self._update_with_episode_data(episode_data, score_function) def update_task_score(self, actor_index, task_idx, score, num_steps): @@ -199,17 +183,16 @@ def _average_min_margin(self, **kwargs): def _average_gae(self, **kwargs): returns = 
kwargs["returns"] value_preds = kwargs["value_preds"] + advantages = returns - value_preds - advantages = np.abs(returns - value_preds) - - return np.mean(advantages).item() + return advantages.mean().item() def _average_value_l1(self, **kwargs): returns = kwargs["returns"] value_preds = kwargs["value_preds"] advantages = np.asarray(np.abs(returns - value_preds)) - return np.mean(advantages).item() + return advantages.mean().item() def _one_step_td_error(self, **kwargs): rewards = kwargs["rewards"] @@ -337,7 +320,6 @@ def _evaluate_unseen_level(self): def evaluate_task(self, task, env, action_value_fn): if env is None: raise ValueError("Environment object is None. Please ensure it is properly initialized.") - print("Evaluating") obs, info = env.reset(new_task=task) done = False @@ -352,14 +334,12 @@ def evaluate_task(self, task, env, action_value_fn): mask = torch.FloatTensor([0.0] if term or trunc else [1.0]) self._robust_rollouts.insert(mask, value_preds=value, rewards=torch.Tensor([rew]), tasks=torch.Tensor([task_encoded])) - # Check if the episode is done if term or trunc: done = True _, next_value = action_value_fn(obs) self._robust_rollouts.compute_returns(next_value, self.gamma, self.gae_lambda) - print("Evaluated") return { "tasks": self._robust_rollouts.tasks, "masks": self._robust_rollouts.masks, @@ -407,6 +387,8 @@ def sample(self, strategy=None): else: if self.robust_plr: self._evaluate_unseen_level() + self._robust_rollouts.after_update() + self.after_update() return self.sample(strategy=strategy) else: return self._sample_unseen_level() @@ -415,7 +397,6 @@ def sample(self, strategy=None): f"Unsupported replay schedule: {self.replay_schedule}. Must be 'fixed' or 'proportionate'.") def _update_with_episode_data(self, episode_data, score_function): - print("Updating") tasks = episode_data["tasks"] if not self.requires_value_buffers: policy_logits = episode_data.action_log_dist @@ -476,7 +457,6 @@ def _update_with_episode_data(self, episode_data, score_function): self._last_score = score num_steps = len(episode_data["tasks"][start_t:, actor_index]) self._partial_update_task_score(actor_index, task_idx_t, score, num_steps) - print("Updated") def sample_weights(self): weights = self._score_transform(self.score_transform, self.temperature, self.task_scores) diff --git a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py index d805bab0..49464bb8 100644 --- a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py +++ b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py @@ -188,7 +188,6 @@ def level_replay_evaluate( def make_value_fn(agent): def get_value(obs): obs = np.array(obs) - print(obs.shape) with torch.no_grad(): return agent.get_value(torch.Tensor(obs).to(device)) return get_value @@ -234,7 +233,6 @@ def get_action_value(obs): torch.backends.cudnn.deterministic = args.torch_deterministic device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu") - print("Device:", device) print("Creating agent") agent = ProcgenAgent( @@ -307,17 +305,6 @@ def get_action_value(obs): ) envs = wrap_vecenv(envs) - - assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported" - print("Creating agent") - agent = ProcgenAgent( - envs.single_observation_space.shape, - envs.single_action_space.n, - arch="large", - base_kwargs={'recurrent': False, 'hidden_size': 256} - ).to(device) - optimizer = optim.Adam(agent.parameters(), lr=args.learning_rate, 
eps=1e-5) - # ALGO Logic: Storage setup obs = torch.zeros((args.num_steps, args.num_envs) + envs.single_observation_space.shape).to(device) actions = torch.zeros((args.num_steps, args.num_envs) + envs.single_action_space.shape).to(device) @@ -337,7 +324,6 @@ def get_action_value(obs): completed_episodes = 0 for update in range(1, num_updates + 1): - print("Update", update) # Annealing the rate if instructed to do so. if args.anneal_lr: frac = 1.0 - (update - 1.0) / num_updates diff --git a/syllabus/tests/utils.py b/syllabus/tests/utils.py index edad012e..fe49f773 100644 --- a/syllabus/tests/utils.py +++ b/syllabus/tests/utils.py @@ -233,7 +233,6 @@ def create_nethack_env(*args, type=None, env_args=(), env_kwargs={}, **kwargs): env = GymV21CompatibilityV0(env=env) env = NethackTaskWrapper(env) - if type == "queue": env = MultiProcessingSyncWrapper( env, *args, task_space=env.task_space, **kwargs From b96686ddeed3c80086cf87ea5ad8c6225ae20473 Mon Sep 17 00:00:00 2001 From: Ameen Ur Rehman Date: Thu, 9 May 2024 23:23:19 +0530 Subject: [PATCH 26/26] minor cleanup final --- syllabus/curricula/plr/task_sampler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 225ea637..0907bd2a 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -183,6 +183,7 @@ def _average_min_margin(self, **kwargs): def _average_gae(self, **kwargs): returns = kwargs["returns"] value_preds = kwargs["value_preds"] + advantages = returns - value_preds return advantages.mean().item() @@ -190,9 +191,10 @@ def _average_gae(self, **kwargs): def _average_value_l1(self, **kwargs): returns = kwargs["returns"] value_preds = kwargs["value_preds"] - advantages = np.asarray(np.abs(returns - value_preds)) - return advantages.mean().item() + advantages = returns - value_preds + + return advantages.abs().mean().item() def _one_step_td_error(self, **kwargs): rewards = kwargs["rewards"]
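For reference, the robust-PLR pieces introduced above are wired together in cleanrl_procgen_plr.py roughly as sketched below (see patches 19 and 20). The sketch is illustrative rather than part of any patch: it assumes agent, device, sample_env, make_env, make_value_fn and make_action_value_fn are defined as in that script, and it omits any other constructor arguments the script already passes.

    # Single evaluation env, wrapped so PLR can set the level on reset.
    plr_eval_env = make_env(args.env_id, args.seed, num_levels=200)()
    plr_eval_env = ProcgenTaskWrapper(plr_eval_env, args.env_id, seed=args.seed)

    curriculum = PrioritizedLevelReplay(
        sample_env.task_space,
        sample_env.observation_space,
        gamma=args.gamma,
        gae_lambda=args.gae_lambda,
        task_sampler_kwargs_dict={"strategy": "value_l1"},
        get_value=make_value_fn(agent),               # batched obs -> value predictions
        robust_plr=True,                              # score unseen levels by evaluation instead of training on them
        eval_envs=plr_eval_env,                       # required when robust_plr=True (see the UsageError added in patch 19)
        action_value_fn=make_action_value_fn(agent),  # single obs -> (action, value)
    )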