episode.py
""" Contains the Episodes for Navigation. """
import random
import torch
import time
import sys
from constants import GOAL_SUCCESS_REWARD, STEP_PENALTY, BASIC_ACTIONS
from environment import Environment
from utils.net_util import gpuify


class Episode:
    """ Episode for Navigation. """

    def __init__(self, args, gpu_id, rank, strict_done=False):
        super(Episode, self).__init__()

        self._env = None
        self.gpu_id = gpu_id
        self.strict_done = strict_done
        self.task_data = None
        self.glove_embedding = None

        self.seed = args.seed + rank
        random.seed(self.seed)

        # Build the object vocabulary from the two object list files.
        with open('./datasets/objects/int_objects.txt') as f:
            int_objects = [s.strip() for s in f.readlines()]
        with open('./datasets/objects/rec_objects.txt') as f:
            rec_objects = [s.strip() for s in f.readlines()]
        self.objects = int_objects + rec_objects

        self.actions_list = [{'action': a} for a in BASIC_ACTIONS]
        self.actions_taken = []
    @property
    def environment(self):
        return self._env

    def state_for_agent(self):
        return self.environment.current_frame
    def step(self, action_as_int):
        action = self.actions_list[action_as_int]
        self.actions_taken.append(action)
        return self.action_step(action)

    def action_step(self, action):
        self.environment.step(action)
        reward, terminal, action_was_successful = self.judge(action)
        return reward, terminal, action_was_successful
    def slow_replay(self, delay=0.2):
        # Reset the episode
        self._env.reset(self.cur_scene, change_seed=False)
        for action in self.actions_taken:
            self.action_step(action)
            time.sleep(delay)
    def judge(self, action):
        """ Judge the last event. """
        # immediate reward
        reward = STEP_PENALTY
        done = False
        action_was_successful = self.environment.last_action_success

        if action['action'] == 'Done':
            done = True
            objects = self._env.last_event.metadata['objects']
            visible_objects = [o['objectType'] for o in objects if o['visible']]
            if self.target in visible_objects:
                reward += GOAL_SUCCESS_REWARD
                self.success = True

        return reward, done, action_was_successful
    def new_episode(self, args, scene):
        if self._env is None:
            if args.arch == 'osx':
                local_executable_path = './datasets/builds/thor-local-OSXIntel64.app/Contents/MacOS/thor-local-OSXIntel64'
            else:
                local_executable_path = './datasets/builds/thor-local-Linux64'
            self._env = Environment(
                grid_size=args.grid_size,
                fov=args.fov,
                local_executable_path=local_executable_path,
                randomize_objects=args.randomize_objects,
                seed=self.seed)
            self._env.start(scene, self.gpu_id)
        else:
            self._env.reset(scene)

        # For now, single target.
        self.target = 'Tomato'
        self.success = False
        self.cur_scene = scene
        self.actions_taken = []

        return True
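

# ---------------------------------------------------------------------------
# Hedged usage sketch (illustration only, not part of the original module).
# It shows how an Episode might be driven for a single rollout with a random
# policy. The SimpleNamespace `args`, the scene name 'FloorPlan28', and the
# random action selection are assumptions for illustration; the real training
# code presumably builds `args` via argparse and picks actions from a learned
# policy.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    from types import SimpleNamespace

    # Assumed argument values; only the fields Episode actually reads are set.
    args = SimpleNamespace(
        seed=1,
        arch='linux',          # any value other than 'osx' selects the Linux build
        grid_size=0.25,
        fov=90.0,
        randomize_objects=False,
    )

    episode = Episode(args, gpu_id=0, rank=0)
    episode.new_episode(args, scene='FloorPlan28')  # scene name is an assumption

    total_reward = 0.0
    for _ in range(50):                       # cap the rollout at 50 steps
        frame = episode.state_for_agent()     # current observation from the env
        action_as_int = random.randrange(len(episode.actions_list))
        reward, terminal, action_ok = episode.step(action_as_int)
        total_reward += reward
        if terminal:                          # 'Done' action ends the episode
            break

    print('episode finished, total reward:', total_reward)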