hammy_nn.py
from keras.layers import Dense, Input, concatenate
from keras.models import Model
from keras.optimizers import Adam
import numpy as np

from hammy import Needs, Experience, Action


class Ramp(object):
    """Linearly interpolates a value from `start` to `end` over `steps` calls,
    optionally holding the start value for `delay` steps before ramping."""

    def __init__(self, start: float, end: float, steps: int, delay: int = 0):
        self.value = start
        self.start = start
        self.end = end
        self.steps = steps
        self.delay = delay
        if steps == 0:
            # A zero-length ramp jumps straight to its end value.
            self.value = end
        self._steps_processed = 0

    def step(self, steps: int) -> float:
        self._steps_processed += steps
        if self._steps_processed < self.delay:
            # Still inside the delay window: hold the current value.
            return self.value
        ramp_vertical = self.end - self.start
        ramp_horizontal = self.steps
        try:
            m = ramp_vertical / ramp_horizontal
        except ZeroDivisionError:
            self.value = self.end
            return self.end
        # Evaluate the line y = m * x + b, measuring x from the end of the delay.
        x = self._steps_processed - self.delay
        b = self.start
        y = m * x + b
        # Clamp at the end value, whichever direction the ramp runs.
        if self.start < self.end:
            self.value = min(self.end, y)
        elif self.start > self.end:
            self.value = max(self.end, y)
        return self.value
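
# Illustrative note (not from the original source): a ramp that decays a value
# from 1.0 to 0.0 over 100 calls to step(1) would be Ramp(start=1.0, end=0.0,
# steps=100). HammyDQN below uses two such ramps, one to decay epsilon and one
# to grow gamma over the course of training.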


class HammyDQN(object):
    """A small DQN agent: epsilon (exploration rate) and gamma (discount factor)
    are annealed with Ramp schedules, and Q-values are estimated by a two-input
    Keras model over the needs and health inputs."""

    def __init__(self, epsilon_start: float = 1., epsilon_end: float = 0., epsilon_steps: int = 10000,
                 gamma_start: float = 0., gamma_end: float = 0.9, gamma_steps: int = 0):
        self.gamma = Ramp(start=gamma_start, end=gamma_end, steps=gamma_steps)
        self.epsilon = Ramp(start=epsilon_start, end=epsilon_end, steps=epsilon_steps)
        # Two inputs (need levels and health) feed a shared dense trunk that
        # outputs one Q-value per action.
        input_needs = Input(shape=(len(Needs),))
        input_health = Input(shape=(1,))
        x = concatenate([input_needs, input_health])
        x = Dense(32, activation='relu')(x)
        x = Dense(32, activation='relu')(x)
        output_needs = Dense(len(Action), activation='linear')(x)
        model = Model(inputs=[input_needs, input_health], outputs=output_needs)
        self.optimizer = Adam()
        model.compile(optimizer=self.optimizer, loss='mse')
        self.model = model

    def train(self, experience: Experience):
        # Standard Q-learning target: reward plus discounted best future value,
        # unless the experience was terminal.
        if not experience.terminal:
            future_rewards = self.model.predict(experience.next_state)[0]
            target = experience.reward + self.gamma.value * np.amax(future_rewards)
        else:
            target = experience.reward
        # Clip the target into [-1, 1] to keep the regression stable.
        if target > 1.:
            target = 1.
        elif target < -1.:
            target = -1.
        # Only the taken action's Q-value is updated; the other outputs keep
        # the network's current predictions.
        next_rewards = self.model.predict(experience.original_state)
        next_rewards[0][experience.action.value] = target
        self.model.fit(experience.original_state, next_rewards, epochs=1, verbose=0)
        self.gamma.step(1)
        self.epsilon.step(1)

    def predict(self, state) -> Action:
        # Epsilon-greedy action selection: explore with probability epsilon,
        # otherwise take the action with the highest predicted Q-value.
        if np.random.rand() <= self.epsilon.value:
            return Action(np.random.choice(len(Action)))
        return Action(np.argmax(self.model.predict(state)[0]))

    def save(self, path="weights.h5"):
        self.model.save_weights(filepath=path)

    def load(self, path="weights.h5"):
        self.model.load_weights(filepath=path)
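
# Illustrative usage sketch (not part of the original module), assuming the
# model's two-input format [needs, health] with a batch dimension of 1; how
# Experience objects are built is left to the surrounding hammy simulation.
#
#   agent = HammyDQN()
#   needs = np.zeros((1, len(Needs)))     # current need levels
#   health = np.ones((1, 1))              # current health
#   action = agent.predict([needs, health])
#   # ...apply `action` in the simulation, construct an Experience, then:
#   # agent.train(experience)
#   agent.save("weights.h5")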