
Robust PLR #29

Open · wants to merge 30 commits into main

Conversation


@AmeenUrRehman AmeenUrRehman commented Mar 31, 2024

  • update the seed function

@AmeenUrRehman AmeenUrRehman changed the title from "minor update to accept _venv" to "Robust PLR" on Apr 7, 2024

@RyanNavillus RyanNavillus left a comment

This looks good so far. Make sure to test your code, because I noticed a lot of errors that the Python runtime would have caught.


        elif self.replay_schedule == "proportionate":
            if proportion_seen >= self.rho and np.random.rand() < proportion_seen:
                return self._sample_replay_level()
            else:
                return self._sample_unseen_level()

        if self.robust_plr:
            return self._evaluate_unseen_level()

Suggested change:
-            return self._evaluate_unseen_level()
+            self.update_with_episode_data(self._evaluate_unseen_level())
+            return self.sample(strategy=strategy)

I think this needs to be the same as above: evaluate instead of returning.

Comment on lines 289 to 317
    def evaluate_task(self, task, env, get_action_and_value_fn, gamma, gae_lambda):
        if env is None:
            raise ValueError("Environment object is None. Please ensure it is properly initialized.")

        obs = env.reset(next_task=task)
        done = False
        episode_data = {
            'tasks': [],
            'masks': [],
            'rewards': [],
            'returns': [],
            'value_preds': [],
            'policy_logits': []
        }

        while not done:
            action, value = get_action_and_value_fn(obs)
            obs, rew, done, info = env.step(action)

            episode_data['tasks'].append(task)
            episode_data['masks'].append(not done)
            episode_data['rewards'].append(rew)
            episode_data['value_preds'].append(value)
            episode_data['policy_logits'].append(info['policy_logits'])

        episode_data['returns'] = self.compute_returns(gamma, gae_lambda, episode_data['rewards'],
                                                       episode_data['value_preds'])

        return episode_data

Suggested change:
-    def evaluate_task(self, task, env, get_action_and_value_fn, gamma, gae_lambda):
-        if env is None:
-            raise ValueError("Environment object is None. Please ensure it is properly initialized.")
-        obs = env.reset(next_task=task)
-        done = False
-        episode_data = {
-            'tasks': [],
-            'masks': [],
-            'rewards': [],
-            'returns': [],
-            'value_preds': [],
-            'policy_logits': []
-        }
-        while not done:
-            action, value = get_action_and_value_fn(obs)
-            obs, rew, done, info = env.step(action)
-            episode_data['tasks'].append(task)
-            episode_data['masks'].append(not done)
-            episode_data['rewards'].append(rew)
-            episode_data['value_preds'].append(value)
-            episode_data['policy_logits'].append(info['policy_logits'])
-        episode_data['returns'] = self.compute_returns(gamma, gae_lambda, episode_data['rewards'],
-                                                       episode_data['value_preds'])
-        return episode_data
+    def evaluate_task(self, task, env, get_action_and_value_fn, gamma, gae_lambda):
+        if env is None:
+            raise ValueError("Environment object is None. Please ensure it is properly initialized.")
+        obs = env.reset(next_task=task)
+        done = False
+        rewards = []
+        masks = []
+        values = []
+        while not done:
+            action, value = get_action_and_value_fn(obs)
+            obs, rew, term, trunc, info = env.step(action)
+            rewards.append(rew)
+            masks.append(not (term or trunc))
+            values.append(value)
+            done = term or trunc
+        returns = self.compute_returns(gamma, gae_lambda, rewards, values, masks)
+        return {
+            "tasks": task,
+            "masks": masks,
+            "rewards": rewards,
+            "value_preds": values,
+            "returns": returns,
+        }

I think you can simplify the code here. Also, you'll need to be careful: the environments we're using are probably Gymnasium environments, not Gym environments, meaning they'll return obs, rew, term, trunc, info rather than obs, rew, done, info.
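
For reference, a minimal episode loop against the Gymnasium 5-tuple step API (just a sketch; the Syllabus task wrappers may use a different reset signature such as reset(next_task=...)):

    import gymnasium as gym

    def run_episode(env: gym.Env, policy_fn):
        # Gymnasium: reset returns (obs, info) and step returns a 5-tuple.
        obs, info = env.reset()
        done = False
        total_reward = 0.0
        while not done:
            action = policy_fn(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            # The episode ends when either flag is set.
            done = terminated or truncated
        return total_reward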

            raise NotImplementedError(
                f"Unsupported replay schedule: {self.replay_schedule}. Must be 'fixed' or 'proportionate'.")

    def update_with_episode_data(self, episode_data, score_function):

Make sure whenever you call this function, you're passing in the score_function. It looks like you're passing in different values each time you call it.
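
For example, roughly (hypothetical call sites; the point is that both paths forward the same score_function):

    score_fn = self._score_function  # e.g. the scorer configured at init

    # on-policy rollouts
    self.update_with_episode_data(rollout_episode_data, score_function=score_fn)
    # robust PLR evaluation episodes
    self.update_with_episode_data(self._evaluate_unseen_level(), score_function=score_fn)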

        else:
            # Otherwise, sample a new level
            return self._sample_unseen_level()

This is a good start, but it's going to be very inefficient to do these evaluations in the main process. We'll probably want to batch and multiprocess them in the future, but for now this is good as a proof of concept.
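
As a rough illustration of what a batched version could look like later (everything here is an assumption: the vectorized reset options, the batched action_value_fn, and the autoreset behaviour would all need to match the real wrappers):

    import numpy as np

    def evaluate_tasks_batched(eval_envs, tasks, action_value_fn):
        # One episode per task, stepped in lockstep across a vectorized env.
        obs, _ = eval_envs.reset(options={"tasks": tasks})
        returns = np.zeros(len(tasks))
        finished = np.zeros(len(tasks), dtype=bool)
        while not finished.all():
            actions, values = action_value_fn(obs)            # batched forward pass
            obs, rew, term, trunc, _ = eval_envs.step(actions)
            returns += np.asarray(rew) * ~finished            # ignore rewards after an env finishes
            finished |= np.asarray(term) | np.asarray(trunc)
        return returns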

        return action, state_value.item()

    return action_value_fn

Sorry for not noticing this earlier, but this doesn't make sense here. The user needs to provide an action_value_fn, and we call it with the observations. We should assume they return the action and value in a good format (maybe with some asserts to check that it's the correct shape). The user should pass it to the initializer of PrioritizedLevelReplay whenever they enable robust_plr.
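
In other words, something like this (a sketch of the intended contract; the constructor arguments mirror the ones discussed in this PR, everything else is illustrative):

    import numpy as np

    def action_value_fn(obs):
        # User-supplied: maps a single observation to (action, value) as plain scalars.
        action, value = my_policy(obs)  # hypothetical user policy
        assert np.isscalar(action) and np.isscalar(value), "action_value_fn must return scalars"
        return action, value

    curriculum = PrioritizedLevelReplay(
        task_space,
        robust_plr=True,
        eval_envs=eval_envs,
        action_value_fn=action_value_fn,  # stored at init, called with raw observations during evaluation
    )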

        return [self._task_sampler.sample() for _ in range(k)]
        if self._robust_plr:
            if self._eval_envs is None:
                raise ValueError("When robust_plr is enabled, eval_envs must not be None.")

Move this check to the initializer; we don't need to wait to check this.
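
i.e. validate once in __init__ (sketch; the parameter list is abbreviated and the attribute names follow the snippet above):

    def __init__(self, tasks, robust_plr: bool = False, eval_envs=None, action_value_fn=None):
        self._robust_plr = robust_plr
        self._eval_envs = eval_envs
        self._action_value_fn = action_value_fn
        # Fail fast instead of waiting for the first sample() call.
        if self._robust_plr and self._eval_envs is None:
            raise ValueError("When robust_plr is enabled, eval_envs must not be None.")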

        if self._robust_plr:
            if self._eval_envs is None:
                raise ValueError("When robust_plr is enabled, eval_envs must not be None.")
            return [self._evaluate_task_and_update_score() for _ in range(k)]

Where is this function?

Also, why are you defining the robust PLR behavior here when you already have it defined in the task sampler?

Comment on lines 238 to 241
        if robust_plr:
            self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn=action_value_fn, **task_sampler_kwargs_dict)
        else:
            self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict)

Suggested change:
-        if robust_plr:
-            self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn=action_value_fn, **task_sampler_kwargs_dict)
-        else:
-            self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict)
+        self._task_sampler = TaskSampler(self.tasks, action_space=action_space, robust_plr=robust_plr, eval_envs=eval_envs, action_value_fn=action_value_fn, **task_sampler_kwargs_dict)

You don't need to check for robust_plr here, just pass the None values through


        self._update_staleness(task_idx)

        return task

Isn't this function already defined above?

            return task
        else:
            return self._sample_unseen_level()

Why do you need to do this? If you just call sample again recursively, won't it recalculate num_seen and proportion_seen for you? Let me know if I'm missing something that prevents you from doing that.
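
Roughly what I have in mind (this mirrors the suggested changes elsewhere in this review):

    if self.robust_plr:
        # Evaluate and score the unseen level, then let the recursive call
        # recompute num_seen / proportion_seen and pick the next level.
        self._evaluate_unseen_level()
        return self.sample(strategy=strategy)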

        score = score_function(**score_function_kwargs)
        self._last_score = score
        num_steps = len(episode_data['tasks'][start_t:, actor_index])
        self._partial_update_task_score(actor_index, task_idx_t, score, num_steps)

Could you reduce the code duplication between update_with_rollouts and this? Maybe create an inner function that does most of the computation, and a helper function that converts rollouts to an episode_data dictionary.

So update_with_rollouts will first move data into the episode_data dictionary and then call update_with_episode_data
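
Sketch of the shape I mean (the rollout attribute names are assumptions based on the existing storage class):

    def update_with_rollouts(self, rollouts, score_function):
        # Convert the rollout storage into the shared episode_data layout ...
        episode_data = {
            "tasks": rollouts.tasks,
            "masks": rollouts.masks,
            "rewards": rollouts.rewards,
            "value_preds": rollouts.value_preds,
            "returns": rollouts.returns,
        }
        # ... then reuse the same scoring path as the robust PLR evaluation episodes.
        self._update_with_episode_data(episode_data, score_function)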

"rewards": rewards,
"value_preds": values,
"returns": returns
}

Once you clean up everything else, we should see how fast this is and come up with some ideas to optimize it

@@ -206,7 +206,7 @@ def __init__(
        get_value=null,
        get_action_log_dist=null,
        robust_plr: bool = False,  # Option to use RobustPLR
-       eval_envs=None,
+       eval_envs: List[gym.Env] = None,

This should probably be a gym.VectorEnv
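
i.e. something along these lines (assuming the Gymnasium vector API; the exact import depends on whether gym or gymnasium is used here, and the parameter list is abbreviated):

    import gymnasium as gym
    from typing import Optional

    def __init__(
        self,
        robust_plr: bool = False,
        eval_envs: Optional[gym.vector.VectorEnv] = None,  # one vectorized eval env, not a List[gym.Env]
        action_value_fn=None,
    ):
        ...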

                break

        return task
        self.update_with_episode_data(self._evaluate_unseen_level())

You call self.update_with_episode_data here and within self._evaluate_unseen_level. You should only keep one of them.

Suggested change:
-        self.update_with_episode_data(self._evaluate_unseen_level())
+        self._evaluate_unseen_level()

@@ -363,44 +401,32 @@ def sample(self, strategy=None):
        if np.random.rand() > self.nu or not proportion_seen < 1.0:
            return self._sample_replay_level()

-       # Otherwise, evaluate a new level
+       # Otherwise, sample a new level
        if self.robust_plr:
            self.update_with_episode_data(self._evaluate_unseen_level())

You call self.update_with_episode_data here and within self._evaluate_unseen_level. You should only keep one of them.

Suggested change:
-        self.update_with_episode_data(self._evaluate_unseen_level())
+        self._evaluate_unseen_level()

        done = False
        rewards = []
        masks = []
        values = []

        while not done:
            action, value = action_value_fn(obs)

            if isinstance(action, np.ndarray):
                action = int(action[0])

You shouldn't do this; instead, make sure that action_value_fn returns a single action rather than an np.ndarray.

            gae_scal = gae[0] if isinstance(gae, np.ndarray) else gae
            value_scal = values[step][0] if isinstance(values[step], np.ndarray) else values[step]

            returns[step] = gae_scal + value_scal

Why are you changing this function? You should not need to modify the GAE code; it's already correct.


        self._update_staleness(task_idx)

        return task

Suggested change:
-        return task

No need to return the task here; see my other comments.

"num_steps": 2048,
"robust_plr": True,
"eval_envs": create_nethack_env(),
"action_value_fn": get_action_value

If you need this to output a single action and value for now, just do something simple for testing:

def get_action_value(obs):
    return 0, 0

@RyanNavillus RyanNavillus left a comment


Just some cleanup and minor errors to correct

@@ -282,6 +307,7 @@ def get_value(obs):
    )
    envs = wrap_vecenv(envs)


    assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported"
    print("Creating agent")
    agent = ProcgenAgent(

We shouldn't be creating the agent twice; please remove this.

@@ -311,6 +337,7 @@ def get_value(obs):
    completed_episodes = 0

    for update in range(1, num_updates + 1):
        print("Update", update)

Remove print statements throughout code please

    def get_value(obs):
        obs = np.array(obs)
        print(obs.shape)

Remove

        self._last_score = score
        num_steps = len(episode_data["tasks"][start_t:, actor_index])
        self._partial_update_task_score(actor_index, task_idx_t, score, num_steps)
        print("Updated")

Remove

f"Unsupported replay schedule: {self.replay_schedule}. Must be 'fixed' or 'proportionate'.")

def _update_with_episode_data(self, episode_data, score_function):
print("Updating")

Remove print

    def evaluate_task(self, task, env, action_value_fn):
        if env is None:
            raise ValueError("Environment object is None. Please ensure it is properly initialized.")
        print("Evaluating")

Remove print

Comment on lines 346 to 359
            action, value = action_value_fn(obs)

            obs, rew, term, trunc, _ = env.step(action)

            task_encoded = self.task_space.encode(task)

            mask = torch.FloatTensor([0.0] if term or trunc else [1.0])
            self._robust_rollouts.insert(mask, value_preds=value, rewards=torch.Tensor([rew]), tasks=torch.Tensor([task_encoded]))


            # Check if the episode is done
            if term or trunc:
                done = True

Suggested change:
-            action, value = action_value_fn(obs)
-
-            obs, rew, term, trunc, _ = env.step(action)
-
-            task_encoded = self.task_space.encode(task)
-
-            mask = torch.FloatTensor([0.0] if term or trunc else [1.0])
-            self._robust_rollouts.insert(mask, value_preds=value, rewards=torch.Tensor([rew]), tasks=torch.Tensor([task_encoded]))
-
-
-            # Check if the episode is done
-            if term or trunc:
-                done = True
+            action, value = action_value_fn(obs)
+            obs, rew, term, trunc, _ = env.step(action)
+            task_encoded = self.task_space.encode(task)
+            mask = torch.FloatTensor([0.0] if term or trunc else [1.0])
+            self._robust_rollouts.insert(mask, value_preds=value, rewards=torch.Tensor([rew]), tasks=torch.Tensor([task_encoded]))
+            # Check if the episode is done
+            if term or trunc:
+                done = True

Remove spaces


        _, next_value = action_value_fn(obs)
        self._robust_rollouts.compute_returns(next_value, self.gamma, self.gae_lambda)
        print("Evaluated")

Remove print

Comment on lines 408 to 410
        if self.robust_plr:
            self._evaluate_unseen_level()
            return self.sample(strategy=strategy)

Suggested change:
-        if self.robust_plr:
-            self._evaluate_unseen_level()
-            return self.sample(strategy=strategy)
+        if self.robust_plr:
+            self._evaluate_unseen_level()
+            self._robust_rollouts.after_update()
+            self.after_update()
+            return self.sample(strategy=strategy)

    env = NethackTaskWrapper(env)


Remove space

Suggested change: delete the extra blank line.

@RyanNavillus RyanNavillus left a comment


Looks great. One minor change you missed and this should be good!

Comment on lines 193 to 195
-        advantages = np.asarray(np.abs(returns - value_preds))
+        advantages = returns - value_preds

-        return advantages.abs().mean().item()
+        return advantages.mean().item()

This code should be:

        advantages = returns - value_preds

        return advantages.abs().mean().item()
