-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPopulation.py
157 lines (139 loc) · 6.14 KB
/
Population.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os, progressbar, threading, random, time, bisect
import numpy as np
import statistics as stats
from thread import ThreadPool
from DNA import DNA
# NOTE(review): no active code in this module reads BAR_LENGTH; it is
# presumably a throttle for progress-bar updates — confirm before removing.
BAR_LENGTH = 5
def worker(data, encoded, wordsDict, *fitnessArgs):
    """Score every DNA in ``data`` by calling its ``CalcFitness`` method.

    Intended as a thread target that processes one slice of a population.

    Args:
        data: Iterable of objects exposing ``CalcFitness``.
        encoded: Forwarded as the first argument of ``CalcFitness``.
        wordsDict: Forwarded as the second argument of ``CalcFitness``.
        *fitnessArgs: Optional extra positional arguments appended to every
            ``CalcFitness`` call, so callers can pass the same additional
            arguments that ``Population.CalcFitness`` supplies. Omitting
            them keeps the original two-argument call.
    """
    for dna in data:
        dna.CalcFitness(encoded, wordsDict, *fitnessArgs)
class Population:
    """One generation of DNA candidates plus the state needed to evolve it.

    The class scores every candidate (``CalcFitness``), reports and optionally
    saves the generation (``Print``), and breeds the next generation
    (``NaturalSelection``). ``CalcFitness`` must run before ``Print`` or
    ``NaturalSelection`` because both read the scores it caches.
    """

    def __init__(self, threadsCount, data, mutationRate, encoded, wordsDict, hints):
        """Store the candidates and the parameters used to score and breed them.

        Args:
            threadsCount: Size handed to ``ThreadPool``.
            data: List of DNA objects forming the initial generation.
            mutationRate: Rate forwarded to ``DNA.CrossOver`` when breeding.
            encoded: Sized object forwarded to ``DNA.CalcFitness`` and
                ``DNA.decode``; one weight is kept per element.
            wordsDict: Forwarded unchanged to ``DNA.CalcFitness``.
            hints: Forwarded unchanged to DNA creation, mutation and cross-over.
        """
        self.__data = data
        self.__matingPool = []
        self.__generation = 0
        self.__bestScore = 0
        self.__mutationRate = mutationRate
        self.__encoded = encoded
        self.__wordsDict = wordsDict
        self.__hints = hints
        self.__threadPool = ThreadPool(threadsCount)
        self.__threadsCount = threadsCount
        # Bookkeeping for Stuck(): how many calls in a row saw the same max score.
        self.__consecutiveScores = 0
        self.__lastScore = -1
        # One weight per element of ``encoded``; all start equal.
        self.__weights = [1] * len(encoded)

    @classmethod
    def Random(cls, threadsCount, count, length, mutationRate, encoded, wordsDict, hints):
        """Build a population of ``count`` candidates from ``DNA.Random``."""
        data = [DNA.Random(length, hints) for _ in range(count)]
        return cls(threadsCount, data, mutationRate, encoded, wordsDict, hints)

    @classmethod
    def FromFolder(cls, threadsCount, path, count, length, mutationRate, encoded, wordsDict, hints):
        """Build a population from the ``.json`` files found in ``path``.

        Every ``.json`` entry in ``path`` is loaded with ``DNA.ReadFromJson``.
        If fewer than ``count`` samples were loaded, the population is topped
        up with ``DNA.Random`` candidates; if more were loaded, all are kept.
        """
        data = []
        for fileName in os.listdir(path):
            if fileName.endswith('.json'):
                # os.path.join works whether or not ``path`` ends with a separator.
                data.append(DNA.ReadFromJson(os.path.join(path, fileName)))
        print('Loaded ', len(data), 'samples')
        if len(data) < count:
            missing = count - len(data)
            print ('Adding ', missing, 'random samples...')
            data = data + [DNA.Random(length, hints) for _ in range(missing)]
        return cls(threadsCount, data, mutationRate, encoded, wordsDict, hints)

    def Print(self, printAll, saveBest):
        """Report the current generation and optionally persist it to disk.

        Increments the generation counter, always ensures ``./generation/best/``
        exists, and prints every top-scoring candidate's decoded text followed
        by a one-line summary.

        Args:
            printAll: When true, write ``scores.txt``, one ``<i>.json`` per
                candidate, ``average.txt`` and ``best.txt`` under
                ``./generation/<generation>/``.
            saveBest: When true and this generation's average beats the best
                average seen so far, write every candidate to
                ``./generation/best/<i>.json``.

        Raises:
            AttributeError: If ``CalcFitness`` has not been called yet.
        """
        average = stats.mean(self.__scores)
        maxScore = max(self.__scores)
        self.__generation += 1
        os.makedirs('./generation/best/', exist_ok=True)

        saveFolder = None
        if printAll:
            saveFolder = './generation/' + str(self.__generation)
            os.makedirs(saveFolder, exist_ok=True)
            # Context managers guarantee the files are flushed and closed.
            with open(saveFolder + '/scores.txt', 'w') as scoresFile:
                for i, dna in enumerate(self.__data):
                    print(i, dna.GetScore(), file=scoresFile)
            for i, dna in enumerate(self.__data):
                dna.WriteJson(saveFolder + '/' + str(i) + '.json')
            with open(saveFolder + '/average.txt', 'w') as averageFile:
                print(average, file=averageFile)

        if average > self.__bestScore:
            self.__bestScore = average
            if saveBest:
                for i, dna in enumerate(self.__data):
                    dna.WriteJson('./generation/best/' + str(i) + '.json')

        # Every candidate tied for the max score is printed; when several tie,
        # best.txt is rewritten each time and ends up holding the last one.
        for dna in self.__data:
            if dna.GetScore() != maxScore:
                continue
            decoded = dna.decode(self.__encoded)
            print('best match: ', decoded)
            if printAll:
                with open(saveFolder + '/best.txt', 'w') as bestFile:
                    print(decoded, file=bestFile)

        print('generation: ', self.__generation, ' average score : ', average, ' max score: ', maxScore, '\n')

    def CalcFitness(self):
        """Score every candidate and cache the scores for later steps.

        Candidates are scored sequentially on the calling thread; the thread
        pool created in ``__init__`` is not used here. The elapsed wall-clock
        time is printed.
        """
        startTime = time.time()
        # Forwarded as the third argument of DNA.CalcFitness; its meaning is
        # defined there.
        bad = 0.4
        for dna in self.__data:
            dna.CalcFitness(self.__encoded, self.__wordsDict, bad, self.__weights)
        print("%s seconds elpased" % (time.time() - startTime))
        self.__scores = [dna.GetScore() for dna in self.__data]

    def __PickOne(self, cumulativeSums, maxSum):
        """Return a random index chosen proportionally to the scores.

        Args:
            cumulativeSums: Running totals of the scores. The binary search
                assumes they are non-decreasing, i.e. scores are not negative
                (TODO confirm against DNA.GetScore).
            maxSum: The last running total.
        """
        threshold = np.random.rand() * maxSum
        return bisect.bisect_left(cumulativeSums, threshold)

    def Stuck(self, maxScore):
        """Track repeated max scores and report when evolution has stalled.

        Returns:
            True only on the call where ``maxScore`` has been seen exactly 10
            times in a row; False on every other call, including later
            repeats of the same score.
        """
        if maxScore == self.__lastScore:
            self.__consecutiveScores += 1
        else:
            self.__consecutiveScores = 1
        self.__lastScore = maxScore
        return self.__consecutiveScores == 10

    def NaturalSelection(self):
        """Produce the next generation, or shake up the current one if stalled.

        When ``Stuck`` reports a stall, every candidate is mutated in place
        (rate 0.7 for candidates holding the max score, 0.5 for the rest) and
        the weights are re-drawn at random, scaled so they sum to
        ``len(encoded)``; the population itself is not replaced. Otherwise
        the population is replaced by the same number of children, each
        crossed over from two parents picked proportionally to their scores.

        Raises:
            ValueError: If there are no scores (empty population).
        """
        maxScore = max(self.__scores)

        if self.Stuck(maxScore):
            for dna in self.__data:
                rate = 0.7 if dna.GetScore() == maxScore else 0.5
                dna.Mutate(rate, self.__hints)
            print ('Forced mutations was did...')
            weightCount = len(self.__encoded)
            draws = [np.random.random() for _ in range(weightCount)]
            total = sum(draws)
            # Slice assignment updates the existing list object in place.
            self.__weights[:weightCount] = [
                draw / total * weightCount for draw in draws
            ]
            return None

        cumulativeSums = np.array(self.__scores).cumsum().tolist()
        maxSum = cumulativeSums[-1]
        currentMutation = self.__mutationRate
        print ('mutation:', currentMutation*100, '%')

        newGeneration = []
        for _ in range(len(self.__data)):
            parent1 = self.__data[self.__PickOne(cumulativeSums, maxSum)]
            parent2 = self.__data[self.__PickOne(cumulativeSums, maxSum)]
            newGeneration.append(
                parent1.CrossOver(parent2, currentMutation, self.__hints)
            )
        self.__data = newGeneration