diff --git a/case_studies/uav_topologies_generation/src/contracts_utils/misc.py b/case_studies/uav_topologies_generation/src/contracts_utils/misc.py new file mode 100644 index 00000000..d4f11aac --- /dev/null +++ b/case_studies/uav_topologies_generation/src/contracts_utils/misc.py @@ -0,0 +1,34 @@ +from pacti.iocontract import IoContract + + +def get_equalized_alphabets(c1: IoContract, c2: IoContract) -> tuple[IoContract, IoContract]: + input_int = set(c1.inputvars) | set(c2.inputvars) + output_uni = set(c1.outputvars) | set(c2.outputvars) + + c1_mod = IoContract(input_vars=list(input_int), assumptions=c1.a, output_vars=list(output_uni), guarantees=c1.g) + + c2_mod = IoContract(input_vars=list(input_int), assumptions=c2.a, output_vars=list(output_uni), guarantees=c2.g) + + return c1_mod, c2_mod + + +def get_equalized_abstracted_contracts(c1: IoContract, c2: IoContract) -> tuple[IoContract, IoContract]: + """ + It takes computes the intersection input variables of c1 and c2 (i.e. 'input_int') and + the union of the output variables 'output_uni'. It then modifies c1 and c2 by: + * Keeping only the assumptions that refer to variables in 'input_int; + * Extending the guarantees to all the variables in 'output_uni' + + """ + + input_int = set(c1.inputvars) & set(c2.inputvars) + output_uni = set(c1.outputvars) | set(c2.outputvars) + + new_a1 = c1.a.get_terms_with_vars(list(input_int)) + new_a2 = c2.a.get_terms_with_vars(list(input_int)) + + c1_mod = IoContract(input_vars=list(input_int), assumptions=new_a1, output_vars=list(output_uni), guarantees=c1.g) + + c2_mod = IoContract(input_vars=list(input_int), assumptions=new_a2, output_vars=list(output_uni), guarantees=c2.g) + + return c1_mod, c2_mod diff --git a/case_studies/uav_topologies_generation/src/contracts_utils/union.py b/case_studies/uav_topologies_generation/src/contracts_utils/union.py new file mode 100644 index 00000000..e32a914a --- /dev/null +++ b/case_studies/uav_topologies_generation/src/contracts_utils/union.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from case_studies.uav_topologies_generation.src.contracts_utils.misc import get_equalized_alphabets +from pacti.iocontract import IoContract, Var +from pacti.utils.lists import list_intersection, list_union + + +def check_variables_containment(contract_1: IoContract, contract_2: IoContract): + """Returns True if all variables in contract_1 are also in contract_2""" + for v in contract_1.vars: + if v not in contract_2.vars: + return False + return True + + +@dataclass +class ContractsUnions: + """Store a set of IoContracts""" + + contracts: set[IoContract] = field(default_factory=set) + name: str = "" + + def get_inverted(self) -> ContractsUnions: + + inverted_contracts = ContractsUnions(name=self.name) + + for contract in self.contracts: + from pacti.terms.polyhedra import PolyhedralTermList + new_contract = IoContract(assumptions=contract.g, + guarantees=PolyhedralTermList(), output_vars=[], + input_vars=contract.outputvars) + inverted_contracts.contracts.add(new_contract) + + return inverted_contracts + + def __le__(self, other: IoContract | ContractsUnions) -> bool: + """ + A ContractsUnions Cu refines a IoContract C' if any of the IoContracts in Cu refine C' + A ContractsUnions Cu refines a ContractsUnions Cu' if any of the IoContracts in Cu refine any of the IoContracts in Cu' + """ + + for contract in self.contracts: + if isinstance(other, IoContract): + contract_eq, other_eq = get_equalized_alphabets(contract, other) + if not contract_eq <= other_eq: + print("REFINEMENT NOT VERIFIED [1]") + print(self) + print("<=") + print(other) + return False + else: + """Each contract must refine at least one contract in other.contracts""" + found = False + for other_c in other.contracts: + contract_eq, other_c_eq = get_equalized_alphabets(contract, other_c) + print(contract_eq) + print(other_c_eq) + if contract_eq <= other_c_eq: + found = True + break + if not found: + print("REFINEMENT NOT VERIFIED [2]") + print(self) + print("<=") + print(other) + return False + print("REFINEMENT VERIFIED") + print(self) + print("<=") + print(other) + return True + + def __str__(self): + + assumptions = [str(c.a).replace('in', '').replace('1*', '') for c in self.contracts] + guarantees = [str(c.g).replace('in', '').replace('1*', '') for c in self.contracts] + a = "\t | \t".join(assumptions) + g = "\t | \t".join(guarantees) + return f"A: {a}\nG: {g}" + + def add(self, other: IoContract): + self.contracts.add(other) + + # def _refines_at_least_one_io_contract(self, other: IoContract): + # for c in self.contracts: + # """Equalizing inputs/outputs symbols""" + # + # vars_not_preset = False + # for v in c.vars: + # if v not in other.vars: + # vars_not_preset = True + # break + # if vars_not_preset: + # continue + # + # # c_mod, other_mod = get_equalized_abstracted_contracts(c, other) + # c_mod, other_mod = get_equalized_alphabets(c, other) + # + # if not isinstance(c, IoContract): + # raise AttributeError + # if not isinstance(other, IoContract): + # raise AttributeError + # try: + # refine = c_mod <= other_mod + # except Exception as e: + # writeContract(c, f"C_exc_lhs") + # writeContract(other, f"C_exc_rhs") + # raise e + # if not refine: + # return False + # # else: + # # print("\t" + str(c_mod.g)) + # # print("\t" + str(other_mod.g) + "\n") + # print("All refine") + # return True + + def can_be_composed_with_old(self, other: ContractsUnions): + """Returns true if any element of self can be composed with all the elements of other""" + for c_self in self.contracts: + next = False + for c_other in other.contracts: + try: + c = c_self.compose(c_other) + print("\n") + print(str(c_self.a).replace('in', '').replace('1*', '')) + print(str(c_other.g).replace('in', '').replace('1*', '')) + print("TRUE") + except Exception as e: + print("\n") + print(str(c_self.a).replace('in', '').replace('1*', '')) + print(str(c_other.g).replace('in', '').replace('1*', '')) + print("FALSE") + next = True + break + if next: + continue + else: + return True + return False + + def can_be_composed_with(self, other: ContractsUnions): + """Returns true if every element of other can be composed with at least one the elements of self""" + for c_other in other.contracts: + next = False + for c_self in self.contracts: + try: + c_other.compose(c_self) + # print("\n") + # print(str(c_self.a).replace('in', '').replace('1*', '')) + # print(str(c_other.g).replace('in', '').replace('1*', '')) + # print("TRUE") + next = True + break + except Exception as e: + pass + # print("\n") + # print(str(c_self.a).replace('in', '').replace('1*', '')) + # print(str(c_other.g).replace('in', '').replace('1*', '')) + # print("FALSE") + if next: + continue + else: + return False + return True + + def get_relaxed(self, other: ContractsUnions) -> ContractsUnions: + """Returns a new ContractsUnions that only contains variables also in 'other'""" + new_contracts = ContractsUnions(name=self.name) + + allowed_ins = other.inputs + allowed_out = other.outputs + allowed_vars = list_union(other.inputs, other.outputs) + + for contract in self.contracts: + new_ins = list_intersection(contract.inputvars, allowed_vars) + new_out = list_intersection(contract.outputvars, allowed_vars) + + new_a = contract.a.get_terms_with_vars(new_ins) + new_g = contract.g.get_terms_with_vars(new_out) + + new_io_contract = IoContract(assumptions=new_a, guarantees=new_g, input_vars=new_ins, output_vars=new_out) + new_contracts.add(new_io_contract) + + return new_contracts + + @property + def vars(self) -> list[Var]: + c_vars = [] + for c in self.contracts: + c_vars.extend(c.vars) + return list(set(c_vars)) + + @property + def inputs(self) -> list[Var]: + c_vars = [] + for c in self.contracts: + c_vars.extend(c.inputvars) + return c_vars + + @property + def outputs(self) -> list[Var]: + c_vars = [] + for c in self.contracts: + c_vars.extend(c.outputvars) + return c_vars + + def __hash__(self): + all_hashes = [] + for c in self.contracts: + all_hashes = c.__hash__() + return hash(all_hashes) diff --git a/case_studies/uav_topologies_generation/src/grammar/grammar.py b/case_studies/uav_topologies_generation/src/grammar/grammar.py index a7f145bd..806cefe4 100644 --- a/case_studies/uav_topologies_generation/src/grammar/grammar.py +++ b/case_studies/uav_topologies_generation/src/grammar/grammar.py @@ -6,8 +6,8 @@ from pathlib import Path from .rule import Rule +from ..contracts_utils.union import ContractsUnions from ..shared import SymbolType -from pacti.utils.contracts_union import ContractsUnions @dataclass diff --git a/case_studies/uav_topologies_generation/src/grammar/grid.py b/case_studies/uav_topologies_generation/src/grammar/grid.py index 3658cc4b..9fa46e1e 100644 --- a/case_studies/uav_topologies_generation/src/grammar/grid.py +++ b/case_studies/uav_topologies_generation/src/grammar/grid.py @@ -5,15 +5,14 @@ from matplotlib.figure import Figure +from case_studies.uav_topologies_generation.src.contracts_utils.union import ContractsUnions +from pacti.terms.polyhedra import PolyhedralContract from .figures import DirectionsGrid from ..tools.plotting import plot_3d_grid from .rule import Rule from .symbols import Symbol, Unoccupied, Fuselage, Empty, Rotor, Wing, Connector from ..shared import Direction, SymbolType, symbols_colors from ..tools.constraints import from_symbol_directions_to_constraints -from pacti.terms.polyhedra import string_to_polyhedra_contract -from pacti.utils.contracts_union import ContractsUnions -from pacti.utils.string_contract import StrContract @dataclass @@ -136,9 +135,17 @@ def contract(self) -> ContractsUnions: contract_union = ContractsUnions() for constraint in constraints: # new_c = StrContract(assumptions=constraint, inputs=list(symbols)) - new_c = StrContract(guarantees=constraint, outputs=list(symbols)) + # new_c = StrContract(guarantees=constraint, outputs=list(symbols)) # print(constraint) - io_contract = string_to_polyhedra_contract(new_c) + # io_contract = string_to_polyhedra_contract(new_c) + + io_contract = PolyhedralContract.from_string( + OutputVars=list(symbols), + InputVars=[], + assumptions=[], + guarantees=constraint) + + contract_union.contracts.add(io_contract) return contract_union diff --git a/case_studies/uav_topologies_generation/src/grammar/rule.py b/case_studies/uav_topologies_generation/src/grammar/rule.py index 2d16f496..ee4dd56f 100644 --- a/case_studies/uav_topologies_generation/src/grammar/rule.py +++ b/case_studies/uav_topologies_generation/src/grammar/rule.py @@ -6,12 +6,11 @@ from matplotlib.figure import Figure +from pacti.terms.polyhedra import PolyhedralContract from .figures import DirectionsGrid +from ..contracts_utils.union import ContractsUnions from ..shared import Direction, SymbolType, symbols_colors from ..tools.constraints import from_symbol_directions_to_constraints -from pacti.terms.polyhedra import string_to_polyhedra_contract -from pacti.utils.contracts_union import ContractsUnions -from pacti.utils.string_contract import StrContract @dataclass @@ -45,9 +44,15 @@ def contract(self) -> ContractsUnions: contract_union = ContractsUnions(name=self.name) for constraint in constraints: # new_c = StrContract(assumptions=constraint, inputs=list(symbols)) - new_c = StrContract(assumptions=constraint, inputs=list(symbols)) # print(new_c) - io_contract = string_to_polyhedra_contract(new_c) + print(constraint) + print(list(symbols)) + io_contract = PolyhedralContract.from_string( + InputVars=list(symbols), + OutputVars=[], + assumptions=constraint, + guarantees=[]) + contract_union.contracts.add(io_contract) return contract_union diff --git a/case_studies/uav_topologies_generation/src/main.py b/case_studies/uav_topologies_generation/src/main.py index b13b61b2..f2a10ccc 100644 --- a/case_studies/uav_topologies_generation/src/main.py +++ b/case_studies/uav_topologies_generation/src/main.py @@ -1,13 +1,13 @@ import random from datetime import datetime +from case_studies.uav_topologies_generation.src.contracts_utils.union import ContractsUnions from case_studies.uav_topologies_generation.src.grammar.grammar import Grammar from case_studies.uav_topologies_generation.src.grammar.grid import GridBuilder from case_studies.uav_topologies_generation.src.shared import DirectionsAssignment, SymbolType from case_studies.uav_topologies_generation.src.shared.paths import rules_path from case_studies.uav_topologies_generation.src.tools.refinement_checking import rule_matching from case_studies.uav_topologies_generation.src.tools.simple_rule_search import set_rules_search -from pacti.utils.contracts_union import ContractsUnions grid_half_size = 2 max_num_wings = 2 diff --git a/case_studies/uav_topologies_generation/src/tools/constraints.py b/case_studies/uav_topologies_generation/src/tools/constraints.py index 9e31c6c8..561dfae1 100644 --- a/case_studies/uav_topologies_generation/src/tools/constraints.py +++ b/case_studies/uav_topologies_generation/src/tools/constraints.py @@ -16,7 +16,7 @@ def constraint_str_between_integers(symbol: str, cluster: tuple[int, int]) -> li def constraint_str_greater_eq_than(symbol: str, n: int) -> str: - return f"-1 * {symbol} <= -1 * {n}" + return f"-1 * {symbol} <= -{n}" def constraint_str_less_eq_than(symbol: str, n: int) -> str: diff --git a/case_studies/uav_topologies_generation/src/tools/refinement_checking.py b/case_studies/uav_topologies_generation/src/tools/refinement_checking.py index 2edb501b..fd8524cb 100644 --- a/case_studies/uav_topologies_generation/src/tools/refinement_checking.py +++ b/case_studies/uav_topologies_generation/src/tools/refinement_checking.py @@ -1,4 +1,4 @@ -from pacti.utils.contracts_union import ContractsUnions +from case_studies.uav_topologies_generation.src.contracts_utils.union import ContractsUnions def rule_matching(state_contracts: ContractsUnions, rules_contracts: list[ContractsUnions]) -> set[ContractsUnions]: