-
Notifications
You must be signed in to change notification settings - Fork 0
/
product_automaton.py
150 lines (139 loc) · 6.36 KB
/
product_automaton.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from buchi_automata import BuchiAutomata
from ts import TS
from environment import Environment
from copy import deepcopy
import math
import time
class ProductAutomaton:
    """Product of a transition system (TS) and a Buchi automaton (BA).

    Product nodes are keyed ``"<ts_node_name>|<ba_state>"``.  ``self.graph``
    maps every product node to a dict ``{successor_key: weight}`` whose
    weights are copied from the TS edge being followed.
    ``self.accepting_nodes`` lists the product nodes whose BA state name
    starts with ``"accept"`` and that have at least one incoming edge.

    ``set_energies`` must be called before ``shortest_path``; it is not
    called by the constructor.
    """

    # Number of leading key characters compared by ``shortest_path`` to decide
    # that a successor is "the same node" as the current one.
    # NOTE(review): a fixed width only matches the TS node name when names
    # have a particular length -- confirm the intended comparison.
    _NODE_PREFIX_LEN = 10

    def __init__(self, ts: "TS", ba: "BuchiAutomata"):
        """Store the TS and BA and build the product graph.

        :param ts: transition system; ``ts.env`` supplies ``nodes`` and ``graph``
        :param ba: Buchi automaton supplying ``nodes`` and ``graph``
        """
        self.ts = ts
        self.env = self.ts.env
        self.ba = ba
        self.generate()

    def set_energies(self):
        """Populate ``self.energies`` with the cost to the nearest accepting node.

        Every key of ``self.graph`` gets the smallest shortest-path distance
        to any accepting node (``math.inf`` when none is reachable or when
        there are no accepting nodes).  Accepting nodes themselves are then
        forced to energy 0.
        """
        self.energies = dict()
        for key in self.graph:
            # One single-source pass per node instead of one per
            # (node, accepting node) pair.
            distances, _ = self._shortest_paths_from(key)
            self.energies[key] = min(
                (distances.get(goal, math.inf) for goal in self.accepting_nodes),
                default=math.inf,
            )
        for goal in self.accepting_nodes:
            self.energies[goal] = 0

    def sat(self, obs, transition):
        """Check whether an observation satisfies a BA transition label.

        The label is split on ``" && "`` into literals.  A literal ``"!x"``
        holds when ``obs != "x"``; any other literal holds only when it is
        equal to ``obs``.

        :param obs: observation string at a TS node
        :param transition: BA transition label
        :return: True when every literal holds, False otherwise
        """
        for literal in transition.split(" && "):
            if literal.startswith("!"):
                if literal[1:] == obs:
                    return False
            elif literal != obs:
                return False
        return True

    def potential_state_transitions(self, observation, current_state):
        """List the BA transitions enabled by ``observation``.

        :param observation: observation string at a TS node
        :param current_state: BA state whose outgoing edges are examined
        :return: list of ``(next_state, label)`` tuples whose label is
            satisfied by ``observation`` (see ``sat``)
        """
        edges = self.ba.graph[current_state]
        return [
            (next_state, label)
            for next_state, label in edges.items()
            if self.sat(observation, label)
        ]

    def generate(self):
        """Build ``self.graph`` and ``self.accepting_nodes``.

        For every (TS node, BA state) pair, each TS successor is combined
        with each BA state reachable under the TS node's observation.  The
        product edge carries the weight of the TS edge.
        """
        # dict used as an insertion-ordered set: the same accepting node is
        # usually reached over several edges and must be recorded only once.
        accepting = dict()
        graph = dict()
        for node in self.env.nodes:
            obs = node.observation
            ts_successors = self.env.graph[node.name]
            for state in self.ba.nodes:
                edges = self.potential_state_transitions(obs, state)
                value = dict()
                for next_node, weight in ts_successors.items():
                    for next_state, _label in edges:
                        successor = next_node + "|" + next_state
                        value[successor] = weight
                        if next_state[0:6] == "accept":
                            accepting[successor] = None
                graph[node.name + "|" + state] = value
        self.accepting_nodes = list(accepting)
        self.graph = graph

    def _shortest_paths_from(self, source):
        """Run Dijkstra from ``source`` over ``self.graph``.

        :param source: key of the start node
        :return: tuple ``(distances, predecessors)``; ``distances`` maps each
            node to its cost from ``source`` (``math.inf`` if unreachable),
            ``predecessors`` maps each reached node to the previous node on
            its shortest path
        """
        distances = dict.fromkeys(self.graph, math.inf)
        distances[source] = 0
        predecessors = {}
        # Only the keys are needed to track what is still unvisited; a dict
        # keeps the graph's iteration order so ties are broken as before.
        unvisited = dict.fromkeys(self.graph)
        while unvisited:
            current = min(unvisited, key=distances.__getitem__)
            current_distance = distances[current]
            if current_distance == math.inf:
                break  # everything left is unreachable from source
            for neighbour, weight in self.graph[current].items():
                candidate = current_distance + weight
                if candidate < distances.get(neighbour, math.inf):
                    distances[neighbour] = candidate
                    predecessors[neighbour] = current
            del unvisited[current]
        return distances, predecessors

    def Dijkstra(self, source, destination):
        """Find the cheapest route between two product nodes.

        :param source: string for first node
        :param destination: string for last node
        :return: tuple ``(route, distance, reachable)``.  ``route`` is the
            list of nodes from ``source`` to ``destination``; when the
            destination cannot be reached it is ``[source, destination]``,
            ``distance`` is ``math.inf`` and ``reachable`` is False
        """
        distances, predecessors = self._shortest_paths_from(source)
        reachable = True
        backwards = []
        node = destination
        while node != source:
            backwards.append(node)
            if node not in predecessors:
                reachable = False
                break
            node = predecessors[node]
        backwards.append(source)
        backwards.reverse()
        return backwards, distances.get(destination, math.inf), reachable

    def shortest_path(self, node):
        """Build a finite prefix and a repeating suffix starting at ``node``.

        The prefix greedily follows the successor minimising
        ``energy + edge weight`` until a node of energy 0 is reached.  The
        suffix is the Dijkstra route from that node's lowest-energy successor
        back to the node itself.

        :param node: key of the start node
        :return: tuple ``(finite_path, infinite_path)``; ``finite_path``
            holds the nodes visited before the energy-0 node (which is not
            included), ``infinite_path`` ends at that energy-0 node
        :raises Exception: when no admissible successor exists, the greedy
            walk would loop forever, or the cycle cannot be closed
        """
        unreachable = "Path is unreachable, sorry!"
        prefix_len = self._NODE_PREFIX_LEN

        finite_path = list()
        visited = set()
        while self.energies[node] != 0:
            if node in visited:
                # The choice below depends only on the current node, so a
                # revisit means the walk would never terminate.
                raise Exception(unreachable)
            visited.add(node)
            finite_path.append(node)

            best = None
            best_cost = math.inf
            for candidate, weight in self.graph[node].items():
                if candidate[:prefix_len] == node[:prefix_len]:
                    continue  # do not repeat node
                if not self.graph.get(candidate):
                    continue  # dead end: no outgoing edges
                cost = self.energies[candidate] + weight
                if math.isinf(cost):
                    continue  # cannot reach an accepting node from there
                # "<=" keeps the later candidate on ties.
                if cost <= best_cost:
                    best = candidate
                    best_cost = cost
            if best is None:
                raise Exception(unreachable)
            node = best

        # Close the loop: leave the energy-0 node through its lowest-energy
        # successor and come back to it.
        end_node = node
        successors = self.graph[node]
        if not successors:
            raise Exception(unreachable)
        next_node = min(successors, key=self.energies.__getitem__)
        infinite_path, _cost, reachable = self.Dijkstra(next_node, end_node)
        if not reachable:
            raise Exception(unreachable)
        return finite_path, infinite_path