Skip to content

Commit

Permalink
it works for dtmcs + changed how labels and features work in pgc
Browse files Browse the repository at this point in the history
  • Loading branch information
PimLeerkes committed Nov 30, 2024
1 parent 17a443c commit efd58f9
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 137 deletions.
7 changes: 4 additions & 3 deletions stormvogel/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,15 +825,16 @@ def new_state(
self,
labels: list[str] | str | None = None,
features: dict[str, int] | None = None,
name: str | None = None,
) -> State:
"""Creates a new state and returns it."""
state_id = self.__free_state_id()
if isinstance(labels, list):
state = State(labels, features or {}, state_id, self)
state = State(labels, features or {}, state_id, self, name=name)
elif isinstance(labels, str):
state = State([labels], features or {}, state_id, self)
state = State([labels], features or {}, state_id, self, name=name)
elif labels is None:
state = State([], features or {}, state_id, self)
state = State([], features or {}, state_id, self, name=name)

self.states[state_id] = state

Expand Down
189 changes: 63 additions & 126 deletions stormvogel/pgc.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import stormvogel.model
from dataclasses import dataclass
from typing import Callable
import math


@dataclass
class State:
x: int
f: dict[str, int]

def __hash__(self):
return hash(self.x)

def __eq__(self, other):
if isinstance(other, State):
return self.x == other.x
return self.f == other.f
return False


Expand All @@ -25,7 +24,7 @@ class Action:
def build_pgc(
delta, # Callable[[State, Action], list[tuple[float, State]]],
initial_state_pgc: State,
available_actions: Callable[[State], list[Action]],
available_actions: Callable[[State], list[Action]] | None = None,
modeltype: stormvogel.model.ModelType = stormvogel.model.ModelType.MDP,
) -> stormvogel.model.Model:
"""
Expand All @@ -38,149 +37,87 @@ def build_pgc(
(currently only works for mdps)
"""
if modeltype == stormvogel.model.ModelType.MDP and available_actions is None:
raise RuntimeError(
"You have to provide an available actions function for mdp models"
)

model = stormvogel.model.new_model(modeltype=modeltype, create_initial_state=False)

# we create the model with the given type and initial state
model.new_state(
labels=["init", str(initial_state_pgc.x)], features={"x": initial_state_pgc.x}
labels=["init"], features=initial_state_pgc.f, name=str(initial_state_pgc.f)
)

# we continue calling delta and adding new states until no states are
# left to be checked
states_discovered = []
states_to_be_discovered = [initial_state_pgc]
while len(states_to_be_discovered) > 0:
state = states_to_be_discovered[0]
states_to_be_discovered.remove(state)
states_discovered.append(state)
states_seen = []
states_to_be_visited = [initial_state_pgc]
while len(states_to_be_visited) > 0:
state = states_to_be_visited[0]
states_to_be_visited.remove(state)
# we loop over all available actions and call the delta function for each action
transition = {}
for action in available_actions(state):
try:
stormvogel_action = model.new_action(
str(action.labels),
frozenset(
{action.labels[0]}
), # right now we only look at one label
)
except RuntimeError:
stormvogel_action = model.get_action(str(action.labels))

tuples = delta(state, action)
# we add all the newly found transitions to the model (including the action)

if state not in states_seen:
states_seen.append(state)

if model.supports_actions():
assert available_actions is not None
for action in available_actions(state):
try:
stormvogel_action = model.new_action(
str(action.labels),
frozenset(
{action.labels[0]}
), # right now we only look at one label
)
except RuntimeError:
stormvogel_action = model.get_action(str(action.labels))

tuples = delta(state, action)
# we add all the newly found transitions to the model
branch = []
for tuple in tuples:
if tuple[1] not in states_seen:
states_seen.append(tuple[1])
new_state = model.new_state(
name=str(tuple[1].f), features=tuple[1].f
)
branch.append((tuple[0], new_state))
states_to_be_visited.append(tuple[1])
else:
# TODO what if there are multiple states with the same label? use names?
branch.append(
(tuple[0], model.get_state_by_name(str(tuple[1].f)))
)
if branch != []:
transition[stormvogel_action] = stormvogel.model.Branch(branch)
else:
tuples = delta(state)
# we add all the newly found transitions to the model
branch = []
for tuple in tuples:
if tuple[1] not in states_discovered:
if tuple[1] not in states_seen:
states_seen.append(tuple[1])
new_state = model.new_state(
labels=[str(tuple[1].x)], features={"x": tuple[1].x}
name=str(tuple[1].f), features=tuple[1].f
)

branch.append((tuple[0], new_state))
states_to_be_discovered.append(tuple[1])
states_to_be_visited.append(tuple[1])
else:
# TODO what if there are multiple states with the same label? use names?
branch.append(
(tuple[0], model.get_states_with_label(str(tuple[1].x))[0])
branch.append((tuple[0], model.get_state_by_name(str(tuple[1].f))))
if branch != []:
transition[stormvogel.model.EmptyAction] = stormvogel.model.Branch(
branch
)
if branch != []:
transition[stormvogel_action] = stormvogel.model.Branch(branch)
s = model.get_state_by_name(str(state.f))
assert s is not None
model.add_transitions(
model.get_states_with_label(str(state.x))[0],
s,
stormvogel.model.Transition(transition),
)

return model


"""
def build(
delta: Callable[
[stormvogel.model.State, stormvogel.model.Action],
list[tuple[float, stormvogel.model.State]],
],
available_actions: Callable[
[stormvogel.model.State], list[stormvogel.model.Action]
],
initial_value: str,
modeltype: stormvogel.model.ModelType | None = stormvogel.model.ModelType.MDP,
) -> stormvogel.model.Model:
#function that converts a delta function, an available_actions function an initial state and a model type
#to a stormvogel model
#this works analogous to a prism file, where the delta is the module in this case.
# we create the model with the given type and initial state
model = stormvogel.model.new_model(modeltype, create_initial_state=False)
model.new_state(features=[initial_state.features])
# we continue calling delta and adding new states until no states are
# left to be checked
states = [initial_state]
for state in states:
# we loop over all available actions and call the delta function for each action
transition = {}
for action in available_actions(state):
tuples = delta(state, action)
# we add all the newly found transitions to the model (including the action)
branch = []
for tuple in tuples:
branch.append(tuple)
if tuple[1] not in states:
states.add(tuple[1])
transition[action] = branch
model.add_transitions(state, stormvogel.model.Transition(transition))
return model
"""

if __name__ == "__main__":
N = 100
p = 0.5
initial_state = State(math.floor(N / 2))

left = Action(["left"])
right = Action(["right"])

def available_actions(s: State) -> list[Action]:
return [left, right]

def delta(s: State, action: Action):
if action == left:
return [(p, State(x=s.x + 1)), (1 - p, State(x=s.x))] if s.x < N else []
elif action == right:
return [(p, State(x=s.x - 1)), (1 - p, State(x=s.x))] if s.x > 0 else []

model = build_pgc(
delta=delta,
available_actions=available_actions,
initial_state_pgc=initial_state,
)

print(model)

# Do we also want to be able to do it with the stormvogel.model objects?
"""
N = 10
p = 0.5
initial_value = 5
left = stormvogel.model.Action("",["left"])
right = stormvogel.modle.Action("",["right"])
def available_actions(s: stormvogel.model.State) -> list[stormvogel.model.Action]:
return [left, right]
def delta(s: stormvogel.model.State, action: stormvogel.model.Action) -> stormvogel.model.State:
if action == left:
return [(p, stormvogel.model.State(features={x:s.x + 1})), (1 - p, stormvogel.model.State(features=["s.x"]))]
elif action == right:
return [(p, stormvogel.model.State(features=["s.x - 1"])), (1 - p, stormvogel.model.State(features=["s.x"]))]
model = build(delta, available_actions, initial_value, modeltype)
print(model)
"""
77 changes: 69 additions & 8 deletions tests/test_pgc.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from stormvogel import pgc
import stormvogel.model
import math
import stormpy
import stormvogel.mapping


def test_pgc_mdp():
# we build the model with pgc:
N = 2
p = 0.5
initial_state = pgc.State(math.floor(N / 2))
initial_state = pgc.State({"x": math.floor(N / 2)})

left = pgc.Action(["left"])
right = pgc.Action(["right"])
Expand All @@ -17,15 +19,23 @@ def available_actions(s: pgc.State):

def delta(s: pgc.State, action: pgc.Action):
if action == left:
print(s.f)
print(s.f["x"])
return (
[(p, pgc.State(x=s.x + 1)), (1 - p, pgc.State(x=s.x))]
if s.x < N
[
(p, pgc.State({"x": s.f["x"] + 1})),
(1 - p, pgc.State({"x": s.f["x"]})),
]
if s.f["x"] < N
else []
)
elif action == right:
return (
[(p, pgc.State(x=s.x - 1)), (1 - p, pgc.State(x=s.x))]
if s.x > 0
[
(p, pgc.State({"x": s.f["x"] - 1})),
(1 - p, pgc.State({"x": s.f["x"]})),
]
if s.f["x"] > 0
else []
)

Expand All @@ -37,9 +47,9 @@ def delta(s: pgc.State, action: pgc.Action):

# we build the model in the regular way:
model = stormvogel.model.new_mdp(create_initial_state=False)
state1 = model.new_state(labels=["init", "1"], features={"x": 1})
state2 = model.new_state(labels=["2"], features={"x": 2})
state0 = model.new_state(labels=["0"], features={"0": 0})
state1 = model.new_state(labels=["init"], features={"x": 1})
state2 = model.new_state(features={"x": 2})
state0 = model.new_state(features={"0": 0})
left = model.new_action("left", frozenset({"left"}))
right = model.new_action("right", frozenset({"right"}))
branch11 = stormvogel.model.Branch([(0.5, state1), (0.5, state2)])
Expand All @@ -53,4 +63,55 @@ def delta(s: pgc.State, action: pgc.Action):
model.add_transitions(state2, stormvogel.model.Transition({right: branch2}))
model.add_transitions(state0, stormvogel.model.Transition({left: branch0}))

print(model)
print(pgc_model)

assert model == pgc_model


def test_pgc_dtmc():
# we build the model with pgc:
p = 0.5
initial_state = pgc.State({"s": 0})

def delta(s: pgc.State):
match s.f["s"]:
case 0:
return [(p, pgc.State({"s": 1})), (1 - p, pgc.State({"s": 2}))]
case 1:
return [(p, pgc.State({"s": 3})), (1 - p, pgc.State({"s": 4}))]
case 2:
return [(p, pgc.State({"s": 5})), (1 - p, pgc.State({"s": 6}))]
case 3:
return [(p, pgc.State({"s": 1})), (1 - p, pgc.State({"s": 7, "d": 1}))]
case 4:
return [
(p, pgc.State({"s": 7, "d": 2})),
(1 - p, pgc.State({"s": 7, "d": 3})),
]
case 5:
return [
(p, pgc.State({"s": 7, "d": 4})),
(1 - p, pgc.State({"s": 7, "d": 5})),
]
case 6:
return [(p, pgc.State({"s": 2})), (1 - p, pgc.State({"s": 7, "d": 6}))]
case 7:
return [(1, pgc.State({"s": 7}))]

pgc_model = stormvogel.pgc.build_pgc(
delta=delta,
initial_state_pgc=initial_state,
modeltype=stormvogel.model.ModelType.DTMC,
)

# we build the model in the regular way:
path = stormpy.examples.files.prism_dtmc_die
prism_program = stormpy.parse_prism_program(path)
formula_str = "P=? [F s=7 & d=2]"
properties = stormpy.parse_properties(formula_str, prism_program)
model = stormpy.build_model(prism_program, properties)
print(dir(model.states[0]))
stormvogel_model = stormvogel.mapping.stormpy_to_stormvogel(model)
print(pgc_model)
print(stormvogel_model)

0 comments on commit efd58f9

Please sign in to comment.