diff --git a/interface_tester/__init__.py b/interface_tester/__init__.py index 6e78801..599293e 100644 --- a/interface_tester/__init__.py +++ b/interface_tester/__init__.py @@ -2,9 +2,8 @@ # See LICENSE file for licensing details. import pytest -from .interface_test import interface_test_case # noqa: F401 -from .plugin import InterfaceTester -from .schema_base import DataBagSchema # noqa: F401 +from interface_tester.plugin import InterfaceTester +from interface_tester.schema_base import DataBagSchema # noqa: F401 @pytest.fixture(scope="function") diff --git a/interface_tester/cli/discover.py b/interface_tester/cli/discover.py index d4685b2..4ea0dc2 100644 --- a/interface_tester/cli/discover.py +++ b/interface_tester/cli/discover.py @@ -1,9 +1,9 @@ from pathlib import Path +from typing import Callable import typer from interface_tester.collector import _CharmTestConfig, collect_tests -from interface_tester.interface_test import SchemaConfig, _InterfaceTestCase def pprint_tests( @@ -22,12 +22,11 @@ def _pprint_tests(path: Path = Path(), include="*"): tests = collect_tests(path=path, include=include) print("Discovered:") - def pprint_case(case: "_InterfaceTestCase"): - state = "yes" if case.input_state else "no" - schema_config = case.schema if isinstance(case.schema, SchemaConfig) else "custom" - print(f" - {case.name}:: {case.event} (state={state}, schema={schema_config})") + def pprint_case(case: Callable): + print(f" - {case.__name__}") - for interface, versions in tests.items(): + # sorted by interface first, version then + for interface, versions in sorted(tests.items()): if not versions: print(f"{interface}: ") print() @@ -35,19 +34,20 @@ def pprint_case(case: "_InterfaceTestCase"): print(f"{interface}:") - for version, roles in versions.items(): + for version, roles in sorted(versions.items()): print(f" - {version}:") by_role = {role: roles[role] for role in {"requirer", "provider"}} - for role, test_spec in by_role.items(): + for role, test_spec in sorted(by_role.items()): print(f" - {role}:") tests = test_spec["tests"] schema = test_spec["schema"] - for test_cls in tests: + for test_cls in sorted(tests, key=lambda fn: fn.__name__): pprint_case(test_cls) + if not tests: print(" - ") @@ -61,7 +61,7 @@ def pprint_case(case: "_InterfaceTestCase"): if charms: print(" - charms:") charm: _CharmTestConfig - for charm in charms: + for charm in sorted(charms): if isinstance(charm, str): print(" - ") continue diff --git a/interface_tester/collector.py b/interface_tester/collector.py index 70fa626..71089e0 100644 --- a/interface_tester/collector.py +++ b/interface_tester/collector.py @@ -11,20 +11,19 @@ """ import dataclasses import importlib +import inspect import json import logging import sys import types from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Type, TypedDict +from typing import Callable, Dict, List, Literal, Optional, Type, TypedDict import pydantic import yaml -from .interface_test import DataBagSchema, Role, get_registered_test_cases - -if TYPE_CHECKING: - from .interface_test import _InterfaceTestCase +from interface_tester.interface_test import Role +from interface_tester.schema_base import DataBagSchema logger = logging.getLogger("interface_tests_checker") @@ -71,7 +70,7 @@ class _CharmsDotYamlSpec(TypedDict): class _RoleTestSpec(TypedDict): """The tests, schema, and charms for a single role of a given relation interface version.""" - tests: List["_InterfaceTestCase"] + tests: List[Callable[[None], None]] schema: Optional[Type[DataBagSchema]] charms: List[_CharmTestConfig] @@ -125,13 +124,13 @@ def load_schema_module(schema_path: Path) -> types.ModuleType: def get_schemas(file: Path) -> Dict[Literal["requirer", "provider"], Type[DataBagSchema]]: """Load databag schemas from schema.py file.""" if not file.exists(): - logger.warning(f"File does not exist: {file}") + logger.warning("File does not exist: %s" % file) return {} try: module = load_schema_module(file) except ImportError as e: - logger.error(f"Failed to load module {file}: {e}") + logger.error("Failed to load module %s: %s" % (file, e)) return {} out = {} @@ -140,12 +139,12 @@ def get_schemas(file: Path) -> Dict[Literal["requirer", "provider"], Type[DataBa out[role] = get_schema_from_module(module, name) except NameError: logger.warning( - f"Failed to load {name} from {file}: " f"schema not defined for role: {role}." + "Failed to load %s from %s: schema not defined for role: %s." % (name, file, role) ) except TypeError as e: logger.error( - f"Found object called {name!r} in {file}; " - f"expecting a DataBagSchema subclass, not {e.args[0]!r}." + "Found object called %s in %s; expecting a DataBagSchema subclass, not %s." + % (name, file, e.args[0]) ) return out @@ -163,9 +162,9 @@ def _gather_charms_for_version(version_dir: Path) -> Optional[_CharmsDotYamlSpec try: charms = yaml.safe_load(charms_yaml.read_text()) except (json.JSONDecodeError, yaml.YAMLError) as e: - logger.error(f"failed to decode {charms_yaml}: " f"verify that it is valid: {e}") + logger.error("failed to decode %s: verify that it is valid yaml: %s" % (charms_yaml, e)) except FileNotFoundError as e: - logger.error(f"not found: {e}") + logger.error("not found: %s" % e) if not charms: return None @@ -188,8 +187,8 @@ def _gather_charms_for_version(version_dir: Path) -> Optional[_CharmsDotYamlSpec cfg = _CharmTestConfig(**item) except TypeError: logger.error( - f"failure parsing {item} to _CharmTestConfig; invalid charm test " - f"configuration in {version_dir}/charms.yaml:providers" + "failure parsing %s to _CharmTestConfig; invalid charm test " + "configuration in %s/charms.yaml:providers" % (item, version_dir) ) continue destination.append(cfg) @@ -198,6 +197,14 @@ def _gather_charms_for_version(version_dir: Path) -> Optional[_CharmsDotYamlSpec return spec +def _scrape_module_for_tests(module: types.ModuleType) -> List[Callable[[None], None]]: + tests = [] + for name, obj in inspect.getmembers(module): + if inspect.isfunction(obj): + tests.append(obj) + return tests + + def _gather_test_cases_for_version(version_dir: Path, interface_name: str, version: int): """Collect interface test cases from a directory containing an interface version spec.""" @@ -210,24 +217,23 @@ def _gather_test_cases_for_version(version_dir: Path, interface_name: str, versi # so we can import without tricks sys.path.append(str(interface_tests_dir)) - for possible_test_file in interface_tests_dir.glob("*.py"): - # strip .py - module_name = str(possible_test_file.with_suffix("").name) + for role in Role: + module_name = "test_requirer" if role is Role.requirer else "test_provider" try: - importlib.import_module(module_name) + module = importlib.import_module(module_name) except ImportError as e: - logger.error(f"Failed to load module {possible_test_file}: {e}") + logger.error("Failed to load module %s: %s" % (module_name, e)) continue - cases = get_registered_test_cases() + tests = _scrape_module_for_tests(module) + del sys.modules[module_name] - # print(cases) - provider_test_cases.extend(cases[(interface_name, version, Role.provider)]) - requirer_test_cases.extend(cases[(interface_name, version, Role.requirer)]) + tgt = provider_test_cases if role is Role.provider else requirer_test_cases + tgt.extend(tests) if not (requirer_test_cases or provider_test_cases): - logger.error(f"no valid test case files found in {interface_tests_dir}") + logger.error("no valid test case files found in %s" % interface_tests_dir) # remove from import search path sys.path.pop(-1) @@ -277,7 +283,9 @@ def _gather_tests_for_interface( try: version_n = int(version_dir.name[1:]) except TypeError: - logger.error(f"Unable to parse version {version_dir.name} as an integer. Skipping...") + logger.error( + "Unable to parse version %s as an integer. Skipping..." % version_dir.name + ) continue tests[version_dir.name] = gather_test_spec_for_version( version_dir, interface_name, version_n @@ -298,14 +306,14 @@ def collect_tests(path: Path, include: str = "*") -> Dict[str, Dict[str, Interfa - name: foo url: www.github.com/canonical/foo """ - logger.info(f"collecting tests from {path}:{include}") + logger.info("collecting tests from %s: %s" % (path, include)) tests = {} for interface_dir in (path / "interfaces").glob(include): interface_dir_name = interface_dir.name if interface_dir_name.startswith("__"): # ignore __template__ and python-dirs continue # skip - logger.info(f"collecting tests for interface {interface_dir_name}") + logger.info("collecting tests for interface %s" % interface_dir_name) interface_name = interface_dir_name.replace("-", "_") tests[interface_name] = _gather_tests_for_interface(interface_dir, interface_name) diff --git a/interface_tester/errors.py b/interface_tester/errors.py index b199340..b23d648 100644 --- a/interface_tester/errors.py +++ b/interface_tester/errors.py @@ -14,3 +14,7 @@ class InterfaceTestsFailed(RuntimeError): class NoTestsRun(RuntimeError): """Raised if no interface test was collected during a run() call.""" + + +class SchemaValidationError(RuntimeError): + """Raised when schema validation fails on one or more relations.""" diff --git a/interface_tester/interface_test.py b/interface_tester/interface_test.py index a5b56f6..e4ebd77 100644 --- a/interface_tester/interface_test.py +++ b/interface_tester/interface_test.py @@ -3,21 +3,26 @@ import dataclasses import inspect import logging +import operator import re import typing -from collections import defaultdict +from contextlib import contextmanager from enum import Enum -from typing import Callable, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, Callable, List, Literal, Optional, Union -from scenario import Event, State +from ops.testing import CharmType +from pydantic import ValidationError +from scenario import Context, Event, Relation, State -from .schema_base import DataBagSchema +from interface_tester.errors import InvalidTestCaseError, SchemaValidationError + +RoleLiteral = Literal["requirer", "provider"] if typing.TYPE_CHECKING: InterfaceNameStr = str VersionInt = int - RoleLiteral = Literal["requirer", "provider"] _SchemaConfigLiteral = Literal["default", "skip", "empty"] + from interface_tester import DataBagSchema INTF_NAME_AND_VERSION_REGEX = re.compile(r"/interfaces/(\w+)/v(\d+)/") @@ -28,94 +33,40 @@ class InvalidTestCase(RuntimeError): """Raised if a function decorated with interface_test_case is invalid.""" -class SchemaConfig(str, Enum): - """Class used to program the schema validator run that is paired with each test case.""" - - default = "default" - """Use this value if you want the test case to validate the output state's databags with the - default schema.""" - skip = "skip" - """Use this value if you want the test case skip schema validation altogether.""" - empty = "empty" - """Use this value if you want the databag validator to assert that the databags are empty.""" - - class Role(str, Enum): provider = "provider" requirer = "requirer" @dataclasses.dataclass -class _InterfaceTestCase: +class _InterfaceTestContext: """Data associated with a single interface test case.""" interface_name: str """The name of the interface that this test is about.""" version: int """The version of the interface that this test is about.""" - event: Union[Event, str] - """The event that this test is about.""" role: Role - """The role (provider|requirer) that this test is about.""" - name: str - """Human-readable name of what this test does.""" - validator: Callable[[State], None] - """The function that will be called on the output state to validate it.""" + charm_type: CharmType + """Charm class being tested""" + supported_endpoints: dict + """Supported relation endpoints.""" + meta: Any + """Charm metadata.yaml""" + config: Any + """Charm config.yaml""" + actions: Any + """Charm actions.yaml""" + test_fn: Callable + """Test function.""" - schema: Union[DataBagSchema, SchemaConfig] = SchemaConfig.default - """Either a Pydantic schema for the unit/app databags of 'this side' of the relation, which - will be used to validate the relation databags in the output state, or: - - 'skip' to skip schema validation altogether - - 'empty' to have the schema validator assert that the databags should be empty. - """ + """The role (provider|requirer) that this test is about.""" + schema: Optional["DataBagSchema"] = None + """Databag schema to validate the output relation with.""" input_state: Optional[State] = None """Initial state that this test should be run with.""" - def run(self, output_state: State): - """Execute the test: that is, run the decorated validator against the output state.""" - return self.validator(output_state) - - -_TestCaseCacheType = Dict[Tuple["InterfaceNameStr", "VersionInt", Role], List[_InterfaceTestCase]] -# for each (interface_name, version, role) triplet: the list of all collected interface test cases. -REGISTERED_TEST_CASES: _TestCaseCacheType = defaultdict(list) - - -def get_registered_test_cases(clear: bool=False) -> _TestCaseCacheType: - """The test cases that have been registered so far.""" - tc = REGISTERED_TEST_CASES.copy() - if clear: - REGISTERED_TEST_CASES.clear() - return tc - - -def get_interface_name_and_version(fn: Callable) -> Tuple[str, int]: - f"""Return the interface name and version of a test case validator function. - - It assumes that the function is in a module whose path matches the following regex: - {INTF_NAME_AND_VERSION_REGEX} - - If that can't be matched, it will raise a InvalidTestCase. - """ - - file = inspect.getfile(fn) - match = INTF_NAME_AND_VERSION_REGEX.findall(file) - if len(match) != 1: - raise InvalidTestCase( - f"Can't determine interface name and version from test case location: {file}." - rf"expecting a file path matching '/interfaces/(\w+)/v(\d+)/' " - ) - interface_name, version_str = match[0] - try: - version_int = int(version_str) - except TypeError: - # overly cautious: the regex should already be only matching digits. - raise InvalidTestCase( - f"Unable to cast version {version_str} to integer. " f"Check file location: {file}." - ) - return interface_name, version_int - def check_test_case_validator_signature(fn: Callable): """Verify the signature of a test case validator function. @@ -148,48 +99,387 @@ def check_test_case_validator_signature(fn: Callable): ) -def interface_test_case( - role: Union[Role, "RoleLiteral"], - event: Union[str, Event], - input_state: Optional[State] = None, - name: str = None, - schema: Union[DataBagSchema, SchemaConfig, "_SchemaConfigLiteral"] = SchemaConfig.default, -): - """Decorator to register a function as an interface test case. - The decorated function must take exactly one positional argument of type State. - - Arguments: - :param name: the name of the test. Will default to the decorated function's identifier. - :param event: the event that this test is about. - :param role: the interface role this test is about. - :param input_state: the input state for this scenario test. Will default to the empty State(). - :param schema: the schema that the relation databags for the endpoint being tested should - satisfy **after** the event has been processed. - """ - if not isinstance(schema, DataBagSchema): - schema = SchemaConfig(schema) - - def wrapper(fn: Callable[[State], None]): - # validate that the function is a valid validator - check_test_case_validator_signature(fn) - - # derive from the module the function is defined in what the - # interface name and version are - interface_name, version = get_interface_name_and_version(fn) - - role_ = Role(role) - - REGISTERED_TEST_CASES[(interface_name, version, role_)].append( - _InterfaceTestCase( - interface_name=interface_name, - version=version, - event=event, - role=role_, - validator=fn, - name=name or fn.__name__, - input_state=input_state, - schema=schema, +_TESTER_CTX: Optional[_InterfaceTestContext] = None + + +@contextmanager +def tester_context(ctx: _InterfaceTestContext): + global _TESTER_CTX + _TESTER_CTX = ctx + + try: + yield + except Exception: + tester = Tester.__instance__ + + if tester: + tester._detach() + + _TESTER_CTX = None + raise + + tester = Tester.__instance__ + + if not tester: + raise NoTesterInstanceError(f"Invalid test: {ctx.test_fn} did not instantiate Tester.") + + try: + tester._finalize() + finally: + tester._detach() + _TESTER_CTX = None + + if Tester.__instance__: + raise RuntimeError("cleanup failed, tester instance still bound") + + +class InvalidTesterRunError(RuntimeError): + """Raised if Tester is being used incorrectly.""" + + def __init__(self, test_name: str, msg: str): + _msg = f"failed running {test_name}: invalid test. {msg}" + super().__init__(_msg) + + +class NoTesterInstanceError(RuntimeError): + """Raised if no Tester is created within a tester_context scope.""" + + +class NoSchemaError(InvalidTesterRunError): + """Raised when schemas cannot be validated because there is no schema.""" + + +class Tester: + __instance__ = None + + def __init__(self, state_in: Optional[State] = None, name: Optional[str] = None): + """Core interface test specification tool. + + This class is essential to defining an interface test to be used in the + ``charm-relation-interfaces`` repository. In order to define a valid interface + test you will need to: + + a) Initialize this class in the scope of an interface test to specify the scenario's + initial state. Then b) call its ``run`` method to execute scenario, and finally + c) validate the schema. + + Failing to take any of these three steps will result in an invalid test. + If an error is raised during execution of these steps, or manually from elsewhere in the + test scope, the test will fail. + + usage: + >>> def test_foo_relation_joined(): + >>> t = Tester(state_in=State()) # specify the initial state + >>> state_out = t.run('foo-relation-joined') # run Scenario and get the output state + >>> t.assert_schema_valid() # check that the schema is valid + + You can run assertions on ``state_out``, if the interface specification makes + claims on its contents. + + Alternatively to calling ``assert_schema_valid``, you can: + 1) define your own schema subclassing ``DataBagSchema`` + >>> from interface_tester.schema_base import DataBagSchema, BaseModel + >>> class CustomAppModel(BaseModel): + >>> foo: int + >>> bar: str + >>> + >>> class SomeCustomSchema(DataBagSchema): + >>> app: CustomAppModel + And then pass it to assert_schema_valid to override the default schema + (the one defined in ``schema.py``). + >>> t.assert_schema_valid(SomeCustomSchema) + 2) check that all local databags are empty (same as ``assert_schema_valid(DataBagSchema)``) + >>> t.assert_relation_data_empty() + 3) skip schema validation altogether if you know better + >>> t.skip_schema_validation() + + :param state_in: the input state for this scenario test. + Will default to the empty ``State()``. + :param name: the name of the test. Will default to the function's + identifier (``__name__``). + """ + # todo: pythonify + if Tester.__instance__: + raise RuntimeError("Tester is a singleton.") + Tester.__instance__ = self + + if not self.ctx: + raise RuntimeError("Tester can only be initialized inside an interface test context.") + + self._state_template = None + self._state_in = state_in or State() + self._test_name = name or self.ctx.test_fn.__name__ + + self._state_out = None # will be State when has_run is true + self._has_run = False + self._has_checked_schema = False + + @property + def _test_id(self) -> str: + """A name for this test, as descriptive and unique as possible.""" + return f"{self.ctx.interface_name}[{self.ctx.version}]/{self.ctx.role}:{self._test_name}" + + @property + def ctx(self) -> Optional[_InterfaceTestContext]: + """The test context, defined by the test caller. + + It exposes information about the charm that is using this test. + You probably won't need to call this from inside the test definition. + + When called from an interface test scope, is guaranteed(^tm) to return + ``_InterfaceTestContext``. + """ + return _TESTER_CTX + + def run(self, event: Union[str, Event]) -> State: + """Simulate the emission on an event in the initial state you passed to the initializer. + + Calling this method will run scenario and verify that the charm being tested can handle + the ``event`` without raising exceptions. + + It returns the output state resulting from this execution, should you want to + write assertions against it. + """ + if not self.ctx: + raise InvalidTesterRunError("tester cannot run: no _TESTER_CTX set") + + state_out = self._run(event) + self._state_out = state_out + return state_out + + @property + def _relations(self) -> List[Relation]: + """The relations that this test is about.""" + return [r for r in self._state_out.relations if r.interface == self.ctx.interface_name] + + def assert_schema_valid(self, schema: Optional["DataBagSchema"] = None): + """Check that the local databags of the relations being tested satisfy the default schema. + + Default schema is defined in this-interface/vX/schema.py. + Override the schema being checked against by passing your own DataBagSchema subclass. + """ + + self._has_checked_schema = True + if not self._has_run: + raise InvalidTesterRunError(self._test_id, "call Tester.run() first") + + if schema: + logger.info("running test with custom schema") + databag_schema = schema + else: + logger.info("running test with built-in schema") + databag_schema = self.ctx.schema + if not databag_schema: + raise NoSchemaError( + self._test_id, + "No schema found. If this is expected, " + "call Tester.skip_schema_validation() instead.", + ) + + errors = [] + for relation in self._relations: + try: + databag_schema.validate( + { + "unit": relation.local_unit_data, + "app": relation.local_app_data, + } + ) + except ValidationError as e: + errors.append(e.args[0]) + if errors: + raise SchemaValidationError(errors) + + def _check_has_run(self): + if not self._has_run: + raise InvalidTesterRunError(self._test_id, "Call Tester.run() first.") + + def assert_relation_data_empty(self): + """Assert that all local databags are empty for the relations being tested.""" + self._check_has_run() + for relation in self._relations: + if relation.local_app_data: + raise SchemaValidationError( + f"test {self._test_id}: local app databag not empty for {relation}" + ) + if relation.local_unit_data: + raise SchemaValidationError( + f"test {self._test_id}: local unit databag not empty for {relation}" + ) + self._has_checked_schema = True + + def skip_schema_validation(self): + """Skip schema validation for this test run. + + Only use if you really have to. + """ + self._check_has_run() + logger.debug("skipping schema validation") + self._has_checked_schema = True + + def _finalize(self): + """Verify that .run() has been called, as well as some schema validation method.""" + if not self._has_run: + raise InvalidTesterRunError( + self._test_id, "Test function must call Tester.run() before returning." + ) + if not self._has_checked_schema: + raise InvalidTesterRunError( + self._test_id, + "Test function must call " + "Tester.skip_schema_validation(), or " + "Tester.assert_schema_valid(), or " + "Tester.assert_relation_data_empty() before returning.", ) + self._detach() + + def _detach(self): + # release singleton + Tester.__instance__ = None + + def _run(self, event: Union[str, Event]): + logger.debug("running %s" % event) + self._has_run = True + + # this is the input state as specified by the interface tests writer. It can + # contain elements that are required for the relation interface test to work, + # typically relation data pertaining to the relation interface being tested. + input_state = self._state_in + + # state_template is state as specified by the charm being tested, which the charm + # requires to function properly. Consider it part of the mocking. For example: + # some required config, a "happy" status, network information, OTHER relations. + # Typically, should NOT touch the relation that this interface test is about + # -> so we overwrite and warn on conflict: state_template is the baseline, + state = (self._state_template or State()).copy() + + relations = self._generate_relations_state( + state, input_state, self.ctx.supported_endpoints, self.ctx.role ) + # State is frozen; replace + modified_state = state.replace(relations=relations) + + # the Relation instance this test is about: + relation = next(filter(lambda r: r.interface == self.ctx.interface_name, relations)) + # test.EVENT might be a string or an Event. Cast to Event. + evt: Event = self._coerce_event(event, relation) + + logger.info("collected test for %s with %s" % (self.ctx.interface_name, evt.name)) + return self._run_scenario(evt, modified_state) + + def _run_scenario(self, event: Event, state: State): + logger.debug("running scenario with state=%s, event=%s" % (state, event)) - return wrapper + ctx = Context( + self.ctx.charm_type, + meta=self.ctx.meta, + actions=self.ctx.actions, + config=self.ctx.config, + ) + return ctx.run(event, state) + + def _coerce_event(self, raw_event: Union[str, Event], relation: Relation) -> Event: + # if the event being tested is a relation event, we need to inject some metadata + # or scenario.Runtime won't be able to guess what envvars need setting before ops.main + # takes over + if isinstance(raw_event, str): + ep_name, _, evt_kind = raw_event.rpartition("-relation-") + if ep_name and evt_kind: + # this is a relation event. + # we inject the relation metadata + # todo: if the user passes a relation event that is NOT about the relation + # interface that this test is about, at this point we are injecting the wrong + # Relation instance. + # e.g. if in interfaces/foo one wants to test that if 'bar-relation-joined' is + # fired... then one would have to pass an Event instance already with its + # own Relation. + return Event( + raw_event, + relation=relation.replace(endpoint=ep_name), + ) + + else: + return Event(raw_event) + + elif isinstance(raw_event, Event): + if raw_event._is_relation_event and not raw_event.relation: + raise InvalidTestCaseError( + "This test case was passed an Event representing a relation event." + "However it does not have a Relation. Please pass it to the Event like so: " + "evt = Event('my_relation_changed', relation=Relation(...))" + ) + + return raw_event + + else: + raise InvalidTestCaseError( + f"Expected Event or str, not {type(raw_event)}. " + f"Invalid test case: {self} cannot cast {raw_event} to Event." + ) + + def _generate_relations_state( + self, state_template: State, input_state: State, supported_endpoints, role: Role + ) -> List[Relation]: + """Merge the relations from the input state and the state template into one. + + The charm being tested possibly provided a state_template to define some setup mocking data + The interface tests also have an input_state. Here we merge them into one relation list to + be passed to the 'final' State the test will run with. + """ + interface_name = self.ctx.interface_name + + for rel in state_template.relations: + if rel.interface == interface_name: + logger.warning( + "relation with interface name =%s found in state template. " + "This will be overwritten by the relation spec provided by the relation " + "interface test case." % interface_name + ) + + def filter_relations(rels: List[Relation], op: Callable): + return [r for r in rels if op(r.interface, interface_name)] + + # the baseline is: all relations whose interface IS NOT the interface we're testing. + relations = filter_relations(state_template.relations, op=operator.ne) + + if input_state: + # if the charm we're testing specified some relations in its input state, we add those + # whose interface IS the same as the one we're testing. If other relation interfaces + # were specified, they will be ignored. + relations.extend(filter_relations(input_state.relations, op=operator.eq)) + + if ignored := filter_relations(input_state.relations, op=operator.eq): + logger.warning( + "irrelevant relations specified in input state for %s/%s." + "These will be ignored. details: %s" % (interface_name, role, ignored) + ) + + # if we still don't have any relation matching the interface we're testing, we generate + # one from scratch. + if not filter_relations(relations, op=operator.eq): + # if neither the charm nor the interface specified any custom relation spec for + # the interface we're testing, we will provide one. + endpoints_for_interface = supported_endpoints[role] + + if len(endpoints_for_interface) < 1: + raise ValueError(f"no endpoint found for {role}/{interface_name}.") + elif len(endpoints_for_interface) > 1: + raise ValueError( + f"Multiple endpoints found for {role}/{interface_name}: " + f"{endpoints_for_interface}: cannot guess which one it is " + f"we're supposed to be testing" + ) + else: + endpoint = endpoints_for_interface[0] + + relations.append( + Relation( + interface=interface_name, + endpoint=endpoint, + ) + ) + logger.debug( + "%s: merged %s and %s --> relations=%s" + % (self, input_state, state_template, relations) + ) + return relations diff --git a/interface_tester/plugin.py b/interface_tester/plugin.py index 6e60dbb..11c3d4a 100644 --- a/interface_tester/plugin.py +++ b/interface_tester/plugin.py @@ -1,48 +1,36 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. import logging -import operator import tempfile from pathlib import Path from subprocess import PIPE, Popen -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Generator, - List, - Literal, - Optional, - Tuple, - Type, - Union, -) +from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Type from ops.testing import CharmType -from scenario.state import Event, Relation, State, _CharmSpec +from scenario.state import Event, MetadataNotFoundError, State, _CharmSpec -from .collector import InterfaceTestSpec, gather_test_spec_for_version -from .errors import ( +from interface_tester.collector import InterfaceTestSpec, gather_test_spec_for_version +from interface_tester.errors import ( InterfaceTesterValidationError, InterfaceTestsFailed, - InvalidTestCaseError, NoTestsRun, ) -from .runner import run_test_case -from .schema_base import DataBagSchema - -if TYPE_CHECKING: - from .interface_test import _InterfaceTestCase +from interface_tester.interface_test import ( + RoleLiteral, + _InterfaceTestContext, + tester_context, +) +from interface_tester.schema_base import DataBagSchema Callback = Callable[[State, Event], None] +ROLE_TO_ROLE_META = {"provider": "provides", "requirer": "requires"} logger = logging.getLogger("pytest_interface_tester") -ROLE_TO_ROLE_META = {"provider": "provides", "requirer": "requires"} -Role = Literal["provider", "requirer"] class InterfaceTester: + _RAISE_IMMEDIATELY = False + def __init__( self, repo: str = "https://github.com/canonical/charm-relation-interfaces", @@ -125,7 +113,7 @@ def _validate_config(self): ) if not self._charm_type: errors.append("Tester misconfigured: needs a charm_type set.") - if not self.meta: + elif not self.meta: errors.append("no metadata: it was not provided, and it cannot be autoloaded") if not self._repo: errors.append("repo missing") @@ -153,8 +141,7 @@ def _charm_spec(self) -> _CharmSpec: spec = _CharmSpec.autoload(self._charm_type) # if no metadata.yaml can be found in the charm type module's parent directory, # this call will raise: - - except FileNotFoundError as e: + except MetadataNotFoundError as e: # if we have _meta set, we're good, otherwise we're misconfigured. if self._meta and self._charm_type: spec = _CharmSpec( @@ -223,13 +210,13 @@ def _collect_interface_test_specs(self) -> InterfaceTestSpec: return tests - def _gather_supported_endpoints(self) -> Dict[Literal[Role], List[str]]: + def _gather_supported_endpoints(self) -> Dict[RoleLiteral, List[str]]: """Given a relation interface name, return a list of supported endpoints as either role. These are collected from the charm's metadata.yaml. """ - supported_endpoints: Dict[Literal[Role], List[str]] = {} - role: Role + supported_endpoints: Dict["RoleLiteral", List[str]] = {} + role: RoleLiteral for role in ("provider", "requirer"): meta_role = ROLE_TO_ROLE_META[role] @@ -253,7 +240,7 @@ def _gather_supported_endpoints(self) -> Dict[Literal[Role], List[str]]: def _yield_tests( self, - ) -> Generator[Tuple["_InterfaceTestCase", "DataBagSchema", Event, State], None, None]: + ) -> Generator[Tuple[Callable, RoleLiteral, DataBagSchema], None, None]: """Yield all test cases applicable to this charm and interface. This means: @@ -280,41 +267,14 @@ def _yield_tests( if not supported_endpoints: raise RuntimeError(f"this charm does not declare any endpoint using {interface_name}.") - role: Role + role: RoleLiteral for role in supported_endpoints: logger.debug(f"collecting scenes for {role}") spec = tests[role] - test: "_InterfaceTestCase" + schema = spec["schema"] for test in spec["tests"]: - logger.debug(f"converting {test} to ") - - # this is the input state as specified by the interface tests writer. It can - # contain elements that are required for the relation interface test to work, - # typically relation data pertaining to the relation interface being tested. - input_state = test.input_state - - # state_template is state as specified by the charm being tested, which the charm - # requires to function properly. Consider it part of the mocking. For example: - # some required config, a "happy" status, network information, OTHER relations. - # Typically, should NOT touch the relation that this interface test is about - # -> so we overwrite and warn on conflict: state_template is the baseline, - state = (self._state_template or State()).copy() - - relations = self._generate_relations_state( - state, input_state, supported_endpoints, role - ) - # State is frozen; replace - modified_state = state.replace(relations=relations) - - # the Relation instance this test is about: - relation = next(filter(lambda r: r.interface == self._interface_name, relations)) - # test.EVENT might be a string or an Event. Cast to Event. - evt = self._coerce_event(test.event, relation) - - logger.info(f"collected test for {interface_name} with {evt.name}") - logger.debug(f"state={modified_state}, evt={evt}") - yield test, spec["schema"], evt, modified_state + yield test, role, schema def __repr__(self): return f""" bool: errors = [] ran_some = False - for test, schema, event, state in self._yield_tests(): - out = run_test_case( - test=test, + for test_fn, role, schema in self._yield_tests(): + ctx = _InterfaceTestContext( + role=role, schema=schema, - event=event, - state=state, interface_name=self._interface_name, + version=self._interface_version, charm_type=self._charm_type, meta=self.meta, config=self.config, actions=self.actions, + supported_endpoints=self._gather_supported_endpoints(), + test_fn=test_fn, ) - if out: - errors.extend(out) - + try: + with tester_context(ctx): + test_fn() + except Exception as e: + if self._RAISE_IMMEDIATELY: + raise e + errors.append(e) ran_some = True # todo: consider raising custom exceptions here. @@ -364,108 +329,3 @@ def run(self) -> bool: msg = f"no tests gathered for {self._interface_name}/v{self._interface_version}" logger.warning(msg) raise NoTestsRun(msg) - - def _coerce_event(self, raw_event: Union[str, Event], relation: Relation) -> Event: - # if the event being tested is a relation event, we need to inject some metadata - # or scenario.Runtime won't be able to guess what envvars need setting before ops.main - # takes over - if isinstance(raw_event, str): - ep_name, _, evt_kind = raw_event.rpartition("-relation-") - if ep_name and evt_kind: - # this is a relation event. - # we inject the relation metadata - # todo: if the user passes a relation event that is NOT about the relation - # interface that this test is about, at this point we are injecting the wrong - # Relation instance. - # e.g. if in interfaces/foo one wants to test that if 'bar-relation-joined' is - # fired... then one would have to pass an Event instance already with its - # own Relation. - return Event( - raw_event, - relation=relation.replace(endpoint=ep_name), - ) - - else: - return Event(raw_event) - - elif isinstance(raw_event, Event): - if raw_event._is_relation_event and not raw_event.relation: - raise InvalidTestCaseError( - "This test case was passed an Event representing a relation event." - "However it does not have a Relation. Please pass it to the Event like so: " - "evt = Event('my_relation_changed', relation=Relation(...))" - ) - - return raw_event - - else: - raise InvalidTestCaseError( - f"Expected Event or str, not {type(raw_event)}. " - f"Invalid test case: {self} cannot cast {raw_event} to Event." - ) - - def _generate_relations_state( - self, state_template: State, input_state: State, supported_endpoints, role: Role - ) -> List[Relation]: - """Merge the relations from the input state and the state template into one. - - The charm being tested possibly provided a state_template to define some setup mocking data - The interface tests also have an input_state. Here we merge them into one relation list to - be passed to the 'final' State the test will run with. - """ - interface_name = self._interface_name - - for rel in state_template.relations: - if rel.interface == interface_name: - logger.warning( - f"relation with interface name = {interface_name} found in state template. " - f"This will be overwritten by the relation spec provided by the relation " - f"interface test case." - ) - - def filter_relations(rels: List[Relation], op: Callable): - return [r for r in rels if op(r.interface, interface_name)] - - # the baseline is: all relations whose interface IS NOT the interface we're testing. - relations = filter_relations(state_template.relations, op=operator.ne) - - if input_state: - # if the charm we're testing specified some relations in its input state, we add those - # whose interface IS the same as the one we're testing. If other relation interfaces - # were specified, they will be ignored. - relations.extend(filter_relations(input_state.relations, op=operator.eq)) - - if ignored := filter_relations(input_state.relations, op=operator.eq): - logger.warning( - f"irrelevant relations specified in input state for {interface_name}/{role}." - f"These will be ignored. details: {ignored}" - ) - - # if we still don't have any relation matching the interface we're testing, we generate - # one from scratch. - if not filter_relations(relations, op=operator.eq): - # if neither the charm nor the interface specified any custom relation spec for - # the interface we're testing, we will provide one. - endpoints_for_interface = supported_endpoints[role] - - if len(endpoints_for_interface) < 1: - raise ValueError(f"no endpoint found for {role}/{interface_name}.") - elif len(endpoints_for_interface) > 1: - raise ValueError( - f"Multiple endpoints found for {role}/{interface_name}: " - f"{endpoints_for_interface}: cannot guess which one it is " - f"we're supposed to be testing" - ) - else: - endpoint = endpoints_for_interface[0] - - relations.append( - Relation( - interface=interface_name, - endpoint=endpoint, - ) - ) - logger.debug( - f"{self}: merged {input_state} and {state_template} --> relations={relations}" - ) - return relations diff --git a/interface_tester/runner.py b/interface_tester/runner.py deleted file mode 100644 index be9907b..0000000 --- a/interface_tester/runner.py +++ /dev/null @@ -1,146 +0,0 @@ -import typing -from typing import Dict, List, Optional, Type - -from ops.charm import CharmBase -from scenario import Context, Event, Relation, State - -from .errors import InvalidTestCaseError -from .interface_test import SchemaConfig, logger -from .schema_base import DataBagSchema - -if typing.TYPE_CHECKING: - from .interface_test import _InterfaceTestCase - - -def _assert_case_plays( - event: Event, state: State, charm_type: Type["CharmBase"], meta, actions, config -) -> State: - try: - ctx = Context(charm_type, meta=meta, actions=actions, config=config) - state_out = ctx.run(event, state) - except Exception as e: - msg = ( - f"Failed check 1: scenario errored out: ({type(e).__name__}){e}. Could not play scene." - ) - raise RuntimeError(msg) from e - return state_out - - -def _assert_state_out_valid(state_out: State, test: "_InterfaceTestCase"): - """Run the test's validator against the output state. - - Raise RuntimeError if any exception is raised by the validator. - """ - try: - test.run(state_out) - except Exception as e: - msg = f"Failed check 2: validating scene output: {e}" - raise RuntimeError(msg) from e - - -def _assert_schema_valid(schema: DataBagSchema, relation: Relation) -> None: - """Validate the relation databags against this schema. - - Raise RuntimeError if any exception is raised by the validator. - """ - try: - schema.validate( - { - "unit": relation.local_unit_data, - "app": relation.local_app_data, - } - ) - except Exception as e: - msg = f"Failed check 3: validating schema on scene output: {e}" - logger.error(msg) - raise RuntimeError(msg) from e - - -def _assert_schemas_valid( - test: "_InterfaceTestCase", state_out: State, schema: DataBagSchema, interface_name: str -) -> List[str]: - """Check that all relations using the interface comply with the provided schema.""" - test_schema = test.schema - if test_schema is SchemaConfig.skip: - logger.info("Schema validation skipped as per interface_test_case schema config.") - return [] - - if test_schema == SchemaConfig.default: - schema = schema - elif test_schema == SchemaConfig.empty: - schema = DataBagSchema() - elif isinstance(test_schema, DataBagSchema): - schema = test_schema - else: - raise InvalidTestCaseError( - "interface_test_case schema should be either a SchemaConfig instance or a " - f"DataBagSchema instance, not {type(test_schema)}." - ) - - errors = [] - for relation in [r for r in state_out.relations if r.interface == interface_name]: - try: - _assert_schema_valid(schema=schema, relation=relation) - except RuntimeError as e: - errors.append(e.args[0]) - return errors - - -def run_test_case( - test: "_InterfaceTestCase", - schema: Optional["DataBagSchema"], - event: Event, - state: State, - interface_name: str, - # the charm type we're testing - charm_type: Type["CharmBase"], - # charm metadata yamls - meta: Dict, - config: Dict, - actions: Dict, -) -> List[str]: - """Run an interface test case. - - This will run three checks in sequence: - - play the scenario (check that the charm runs without exceptions) and - obtain the output state - - validate the output state (by calling the test-case-provided validator with - the output state as argument) - - validate the schema against the relations in the output state. - - It will return a list of strings, representing any issues encountered in any of the checks. - """ - errors: List[str] = [] - logger.info(f"running test {test.name!r}") - logger.info("check 1/3: scenario play") - try: - state_out = _assert_case_plays( - event=event, - state=state, - charm_type=charm_type, - meta=meta, - config=config, - actions=actions, - ) - except RuntimeError as e: - errors.append(e.args[0]) - logger.error("scenario couldn't run: aborting test.", exc_info=True) - return errors - - logger.info("check 2/3: scenario output state validation") - # todo: consistency check? or should we rely on scenario's? - try: - _assert_state_out_valid(state_out=state_out, test=test) - except RuntimeError as e: - errors.append(e.args[0]) - - logger.info("check 3/3: databag schema validation") - if not schema: - logger.info("schema validation step skipped: no schema provided") - return errors - errors.extend( - _assert_schemas_valid( - test=test, state_out=state_out, schema=schema, interface_name=interface_name - ) - ) - return errors diff --git a/pyproject.toml b/pyproject.toml index 31ebe11..8da2688 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ build-backend = "setuptools.build_meta" [project] name = "pytest-interface-tester" -version = "0.3.3" +version = "1.0" authors = [ { name = "Pietro Pasotti", email = "pietro.pasotti@canonical.com" }, ] diff --git a/tests/resources/charm-like-path/tests/interface/conftest.py b/tests/resources/charm-like-path/tests/interface/conftest.py new file mode 100644 index 0000000..7026569 --- /dev/null +++ b/tests/resources/charm-like-path/tests/interface/conftest.py @@ -0,0 +1,37 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import pytest +from ops import CharmBase +from scenario.state import State + +from interface_tester import InterfaceTester +from interface_tester.collector import gather_test_spec_for_version +from tests.unit.utils import CRI_LIKE_PATH + + +class CRILikePathTester(InterfaceTester): + def _collect_interface_test_specs(self): + gather_test_spec_for_version( + CRI_LIKE_PATH, + interface_name=self._interface_name, + version=self._interface_version, + ) + + +class DummiCharm(CharmBase): + pass + + +@pytest.fixture +def interface_tester(interface_tester: CRILikePathTester): + interface_tester.configure( + charm_type=DummiCharm, + meta={ + "name": "dummi", + "provides": {"tracing": {"interface": "tracing"}}, + "requires": {"tracing": {"interface": "tracing"}}, + }, + state_template=State(leader=True), + ) + yield interface_tester diff --git a/tests/resources/cri-like-path/interfaces/database/v1/charms.yaml b/tests/resources/cri-like-path/interfaces/database/v1/charms.yaml new file mode 100644 index 0000000..335bd83 --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/database/v1/charms.yaml @@ -0,0 +1,5 @@ +providers: + - name: foo-k8s + url: https://github.com/canonical/foo-k8s-operator + +requirers: [] \ No newline at end of file diff --git a/tests/resources/cri-like-path/interfaces/database/v1/interface_tests/test_provider.py b/tests/resources/cri-like-path/interfaces/database/v1/interface_tests/test_provider.py new file mode 100644 index 0000000..2ef0a9d --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/database/v1/interface_tests/test_provider.py @@ -0,0 +1,35 @@ +# Copyright 2023 Canonical +# See LICENSE file for licensing details. + +from scenario import Relation, State + +from interface_tester.interface_test import Tester + + +def test_no_data_on_created(): + t = Tester(State()) + t.run(event="database-relation-created") + t.assert_relation_data_empty() + + +def test_no_data_on_joined(): + t = Tester() + t.run(event="database-relation-joined") + t.assert_relation_data_empty() + + +def test_data_on_changed(): + t = Tester( + State( + relations=[ + Relation( + endpoint="database", + interface="database", + remote_app_name="remote", + local_app_data={}, + ) + ] + ) + ) + t.run("database-relation-changed") + t.assert_relation_data_empty() diff --git a/tests/resources/cri-like-path/interfaces/database/v1/interface_tests/test_requirer.py b/tests/resources/cri-like-path/interfaces/database/v1/interface_tests/test_requirer.py new file mode 100644 index 0000000..2ef0a9d --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/database/v1/interface_tests/test_requirer.py @@ -0,0 +1,35 @@ +# Copyright 2023 Canonical +# See LICENSE file for licensing details. + +from scenario import Relation, State + +from interface_tester.interface_test import Tester + + +def test_no_data_on_created(): + t = Tester(State()) + t.run(event="database-relation-created") + t.assert_relation_data_empty() + + +def test_no_data_on_joined(): + t = Tester() + t.run(event="database-relation-joined") + t.assert_relation_data_empty() + + +def test_data_on_changed(): + t = Tester( + State( + relations=[ + Relation( + endpoint="database", + interface="database", + remote_app_name="remote", + local_app_data={}, + ) + ] + ) + ) + t.run("database-relation-changed") + t.assert_relation_data_empty() diff --git a/tests/resources/cri-like-path/interfaces/database/v1/schema.py b/tests/resources/cri-like-path/interfaces/database/v1/schema.py new file mode 100644 index 0000000..bf655c2 --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/database/v1/schema.py @@ -0,0 +1,14 @@ +from pydantic import BaseModel + +from interface_tester.schema_base import DataBagSchema + + +class DBRequirerData(BaseModel): + foo: str + bar: int + + +class RequirerSchema(DataBagSchema): + """Requirer schema for Tracing.""" + + app: DBRequirerData diff --git a/tests/resources/cri-like-path/interfaces/tracing/v42/charms.yaml b/tests/resources/cri-like-path/interfaces/tracing/v42/charms.yaml new file mode 100644 index 0000000..bcc7677 --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/tracing/v42/charms.yaml @@ -0,0 +1,5 @@ +providers: + - name: tempo-k8s + url: https://github.com/canonical/tempo-k8s-operator + +requirers: [] \ No newline at end of file diff --git a/tests/resources/cri-like-path/interfaces/tracing/v42/interface_tests/test_provider.py b/tests/resources/cri-like-path/interfaces/tracing/v42/interface_tests/test_provider.py new file mode 100644 index 0000000..61d32a2 --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/tracing/v42/interface_tests/test_provider.py @@ -0,0 +1,35 @@ +# Copyright 2023 Canonical +# See LICENSE file for licensing details. + +from scenario import Relation, State + +from interface_tester.interface_test import Tester + + +def test_no_data_on_created(): + t = Tester(State()) + t.run(event="tracing-relation-created") + t.assert_relation_data_empty() + + +def test_no_data_on_joined(): + t = Tester() + t.run(event="tracing-relation-joined") + t.assert_relation_data_empty() + + +def test_data_on_changed(): + t = Tester( + State( + relations=[ + Relation( + endpoint="tracing", + interface="tracing", + remote_app_name="remote", + local_app_data={}, + ) + ] + ) + ) + t.run("tracing-relation-changed") + t.assert_relation_data_empty() diff --git a/tests/resources/cri-like-path/interfaces/tracing/v42/interface_tests/test_requirer.py b/tests/resources/cri-like-path/interfaces/tracing/v42/interface_tests/test_requirer.py new file mode 100644 index 0000000..61d32a2 --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/tracing/v42/interface_tests/test_requirer.py @@ -0,0 +1,35 @@ +# Copyright 2023 Canonical +# See LICENSE file for licensing details. + +from scenario import Relation, State + +from interface_tester.interface_test import Tester + + +def test_no_data_on_created(): + t = Tester(State()) + t.run(event="tracing-relation-created") + t.assert_relation_data_empty() + + +def test_no_data_on_joined(): + t = Tester() + t.run(event="tracing-relation-joined") + t.assert_relation_data_empty() + + +def test_data_on_changed(): + t = Tester( + State( + relations=[ + Relation( + endpoint="tracing", + interface="tracing", + remote_app_name="remote", + local_app_data={}, + ) + ] + ) + ) + t.run("tracing-relation-changed") + t.assert_relation_data_empty() diff --git a/tests/resources/cri-like-path/interfaces/tracing/v42/schema.py b/tests/resources/cri-like-path/interfaces/tracing/v42/schema.py new file mode 100644 index 0000000..d172c93 --- /dev/null +++ b/tests/resources/cri-like-path/interfaces/tracing/v42/schema.py @@ -0,0 +1,29 @@ +from enum import Enum +from typing import List + +from pydantic import BaseModel, Json + +from interface_tester.schema_base import DataBagSchema + + +class IngesterProtocol(str, Enum): + otlp_grpc = "otlp_grpc" + otlp_http = "otlp_http" + zipkin = "zipkin" + tempo = "tempo" + + +class Ingester(BaseModel): + port: str + protocol: IngesterProtocol + + +class TracingRequirerData(BaseModel): + host: str + ingesters: Json[List[Ingester]] + + +class RequirerSchema(DataBagSchema): + """Requirer schema for Tracing.""" + + app: TracingRequirerData diff --git a/tests/unit/test_collect_interface_tests.py b/tests/unit/test_collect_interface_tests.py index 2df901b..34ac9b8 100644 --- a/tests/unit/test_collect_interface_tests.py +++ b/tests/unit/test_collect_interface_tests.py @@ -1,18 +1,10 @@ -import importlib -import random -import string -import sys -from textwrap import dedent - import pytest +from utils import CRI_LIKE_PATH from interface_tester.collector import collect_tests from interface_tester.interface_test import ( InvalidTestCase, - Role, check_test_case_validator_signature, - get_interface_name_and_version, - get_registered_test_cases, ) @@ -43,102 +35,14 @@ def _foo(a, b=2, c="a"): check_test_case_validator_signature(_foo) -@pytest.mark.parametrize("role", list(Role)) -@pytest.mark.parametrize("event", ("start", "update-status", "foo-relation-joined")) -@pytest.mark.parametrize("input_state", ("State()", "State(leader=True)")) -@pytest.mark.parametrize("intf_name", ("foo", "bar")) -@pytest.mark.parametrize("version", (0, 42)) -def test_registered_test_cases_cache(tmp_path, role, event, input_state, intf_name, version): - unique_name = "".join(random.choices(string.ascii_letters + string.digits, k=16)) - - # if the module name is not unique, importing it multiple times will result in royal confusion - pth = ( - tmp_path - / "interfaces" - / intf_name - / f"v{version}" - / "interface_tests" - / f"mytestcase_{unique_name}.py" - ) - pth.parent.mkdir(parents=True) - - pth.write_text( - dedent( - f""" -from interface_tester.interface_test import interface_test_case, Role -from scenario import State - -@interface_test_case( - "{role.value}", - "{event}", - input_state={input_state}, - name="{unique_name}", - schema='skip' -) -def foo(state_out: State): - pass - """ - ) - ) - - collect_tests(tmp_path) - registered = get_registered_test_cases()[(intf_name, version, role)] - - # exactly one test found with the unique name we just created - assert len([x for x in registered if x.name == unique_name]) == 1 - - -@pytest.mark.parametrize("intf_name", ("foo", "bar")) -@pytest.mark.parametrize("version", (0, 42)) -def test_get_interface_name_and_version(tmp_path, intf_name, version): - unique_name = "".join(random.choices(string.ascii_letters + string.digits, k=16)) +def test_load_from_mock_cri(): + tests = collect_tests(CRI_LIKE_PATH) + provider = tests["tracing"]["v42"]["provider"] + assert len(provider["tests"]) == 3 + assert not provider["schema"] + assert provider["charms"][0].name == "tempo-k8s" - pth = ( - tmp_path - / "interfaces" - / intf_name - / f"v{version}" - / "interface_tests" - / f"mytestcase_{unique_name}.py" - ) - pth.parent.mkdir(parents=True) - pth.write_text("def foo(): pass") - - # so we can import without tricks - sys.path.append(str(pth.parent)) - module_name = str(pth.with_suffix("").name) - module = importlib.import_module(module_name) - # cleanup - sys.path.pop(-1) - - foo_fn = getattr(module, "foo") - assert get_interface_name_and_version(foo_fn) == (intf_name, version) - - -@pytest.mark.parametrize("intf_name", ("foo", "bar")) -@pytest.mark.parametrize("version", (0, 42)) -def test_get_interface_name_and_version_raises(tmp_path, intf_name, version): - unique_name = "".join(random.choices(string.ascii_letters + string.digits, k=16)) - - pth = ( - tmp_path - / "gibber" - / "ish" - / intf_name - / f"v{version}" - / "boots" - / f"mytestcase_{unique_name}.py" - ) - pth.parent.mkdir(parents=True) - pth.write_text("def foo(): pass") - - # so we can import without tricks - sys.path.append(str(pth.parent)) - module_name = str(pth.with_suffix("").name) - module = importlib.import_module(module_name) - # cleanup - sys.path.pop(-1) - - foo_fn = getattr(module, "foo") - with pytest.raises(InvalidTestCase): - get_interface_name_and_version(foo_fn) + requirer = tests["tracing"]["v42"]["requirer"] + assert len(requirer["tests"]) == 3 + assert requirer["schema"] + assert not requirer["charms"] diff --git a/tests/unit/test_discover.py b/tests/unit/test_discover.py new file mode 100644 index 0000000..27788ef --- /dev/null +++ b/tests/unit/test_discover.py @@ -0,0 +1,46 @@ +from utils import CRI_LIKE_PATH + +from interface_tester.cli.discover import pprint_tests + + +def test_discover(capsys): + pprint_tests(CRI_LIKE_PATH, "*") + out = capsys.readouterr().out + assert ( + out.strip() + == f""" +collecting tests for * from root = {CRI_LIKE_PATH} +Discovered: +database: + - v1: + - provider: + - test_data_on_changed + - test_no_data_on_created + - test_no_data_on_joined + - schema NOT OK + - charms: + - foo-k8s (https://github.com/canonical/foo-k8s-operator) custom_test_setup=no + - requirer: + - test_data_on_changed + - test_no_data_on_created + - test_no_data_on_joined + - schema OK + - + +tracing: + - v42: + - provider: + - test_data_on_changed + - test_no_data_on_created + - test_no_data_on_joined + - schema NOT OK + - charms: + - tempo-k8s (https://github.com/canonical/tempo-k8s-operator) custom_test_setup=no + - requirer: + - test_data_on_changed + - test_no_data_on_created + - test_no_data_on_joined + - schema OK + - +""".strip() + ) diff --git a/tests/unit/test_e2e.py b/tests/unit/test_e2e.py new file mode 100644 index 0000000..5a021ed --- /dev/null +++ b/tests/unit/test_e2e.py @@ -0,0 +1,461 @@ +import tempfile +from pathlib import Path +from textwrap import dedent + +import pytest +from ops import CharmBase +from scenario import State +from utils import CRI_LIKE_PATH + +from interface_tester import InterfaceTester +from interface_tester.collector import gather_test_spec_for_version +from interface_tester.errors import SchemaValidationError +from interface_tester.interface_test import ( + InvalidTesterRunError, + NoSchemaError, + NoTesterInstanceError, + Tester, +) + + +class LocalTester(InterfaceTester): + _RAISE_IMMEDIATELY = True + + def _collect_interface_test_specs(self): + return gather_test_spec_for_version( + CRI_LIKE_PATH / "interfaces" / self._interface_name / f"v{self._interface_version}", + interface_name=self._interface_name, + version=self._interface_version, + ) + + +class DummiCharm(CharmBase): + pass + + +@pytest.fixture +def interface_tester(): + interface_tester = LocalTester() + interface_tester.configure( + charm_type=DummiCharm, + meta={ + "name": "dummi", + "provides": {"tracing": {"interface": "tracing"}}, + "requires": {"tracing-req": {"interface": "tracing"}}, + }, + state_template=State(leader=True), + ) + yield interface_tester + + +def test_local_run(interface_tester): + interface_tester.configure( + interface_name="tracing", + interface_version=42, + ) + interface_tester.run() + + +def _setup_with_test_file(test_file: str, schema_file: str = None): + td = tempfile.TemporaryDirectory() + temppath = Path(td.name) + + class TempDirTester(InterfaceTester): + _RAISE_IMMEDIATELY = True + + def _collect_interface_test_specs(self): + pth = temppath / "interfaces" / self._interface_name / f"v{self._interface_version}" + + test_dir = pth / "interface_tests" + test_dir.mkdir(parents=True) + test_provider = test_dir / "test_provider.py" + test_provider.write_text(test_file) + + if schema_file: + schema_path = pth / "schema.py" + schema_path.write_text(schema_file) + + return gather_test_spec_for_version( + pth, + interface_name=self._interface_name, + version=self._interface_version, + ) + + interface_tester = TempDirTester() + interface_tester.configure( + interface_name="tracing", + charm_type=DummiCharm, + meta={ + "name": "dummi", + "provides": {"tracing": {"interface": "tracing"}}, + "requires": {"tracing-req": {"interface": "tracing"}}, + }, + state_template=State(leader=True), + ) + + return interface_tester + + +def test_error_if_skip_schema_before_run(): + tester = _setup_with_test_file( + dedent( + """ +from scenario import State, Relation + +from interface_tester.interface_test import Tester + +def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={} + )] + )) + t.skip_schema_validation() +""" + ) + ) + + with pytest.raises(InvalidTesterRunError): + tester.run() + + +def test_error_if_assert_relation_data_empty_before_run(): + tester = _setup_with_test_file( + dedent( + """ +from scenario import State, Relation + +from interface_tester.interface_test import Tester + +def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={} + )] + )) + t.assert_relation_data_empty() +""" + ) + ) + + with pytest.raises(InvalidTesterRunError): + tester.run() + assert not Tester.__instance__ + + +def test_error_if_assert_schema_valid_before_run(): + tester = _setup_with_test_file( + dedent( + """ +from scenario import State, Relation + +from interface_tester.interface_test import Tester + +def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={} + )] + )) + t.assert_schema_valid() +""" + ) + ) + + with pytest.raises(InvalidTesterRunError): + tester.run() + + +def test_error_if_assert_schema_without_schema(): + tester = _setup_with_test_file( + dedent( + """ +from scenario import State, Relation + +from interface_tester.interface_test import Tester + +def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={} + )] + )) + state_out = t.run("tracing-relation-changed") + t.assert_schema_valid() +""" + ) + ) + + with pytest.raises(NoSchemaError): + tester.run() + + +def test_error_if_return_before_schema_call(): + tester = _setup_with_test_file( + dedent( + """ +from scenario import State, Relation + +from interface_tester.interface_test import Tester + +def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={} + )] + )) + state_out = t.run("tracing-relation-changed") +""" + ) + ) + + with pytest.raises(InvalidTesterRunError): + tester.run() + + +def test_error_if_return_without_run(): + tester = _setup_with_test_file( + dedent( + """ +from scenario import State, Relation + +from interface_tester.interface_test import Tester + +def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={} + )] + )) + +""" + ) + ) + + with pytest.raises(InvalidTesterRunError): + tester.run() + + +def test_error_if_return_without_tester_init(): + tester = _setup_with_test_file( + dedent( + """ +from scenario import State, Relation + +from interface_tester.interface_test import Tester + +def test_data_on_changed(): + pass + +""" + ) + ) + + with pytest.raises(NoTesterInstanceError): + tester.run() + + +def test_valid_run(): + tester = _setup_with_test_file( + dedent( + """ + from scenario import State, Relation + + from interface_tester.interface_test import Tester + from interface_tester.schema_base import DataBagSchema + + def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={} + )] + )) + state_out = t.run("tracing-relation-changed") + t.assert_schema_valid(schema=DataBagSchema()) + """ + ) + ) + + tester.run() + + +def test_valid_run_default_schema(): + tester = _setup_with_test_file( + dedent( + """ + from scenario import State, Relation + + from interface_tester.interface_test import Tester + from interface_tester.schema_base import DataBagSchema + + def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={"foo":"1"}, + local_unit_data={"bar": "smackbeef"} + )] + )) + state_out = t.run("tracing-relation-changed") + t.assert_schema_valid() + """ + ), + schema_file=dedent( + """ +from interface_tester.interface_test import Tester +from interface_tester.schema_base import DataBagSchema, BaseModel + +class Foo(BaseModel): + foo:int=1 +class Bar(BaseModel): + bar:str + +class ProviderSchema(DataBagSchema): + unit: Bar + app: Foo +""" + ), + ) + + tester.run() + + +def test_default_schema_validation_failure(): + tester = _setup_with_test_file( + dedent( + """ + from scenario import State, Relation + + from interface_tester.interface_test import Tester + from interface_tester.schema_base import DataBagSchema + + def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={"foo":"abc"}, + local_unit_data={"bar": "smackbeef"} + )] + )) + state_out = t.run("tracing-relation-changed") + t.assert_schema_valid() + """ + ), + schema_file=dedent( + """ + from interface_tester.interface_test import Tester + from interface_tester.schema_base import DataBagSchema, BaseModel + + class Foo(BaseModel): + foo:int=1 + class Bar(BaseModel): + bar:str + + class ProviderSchema(DataBagSchema): + unit: Bar + app: Foo + """ + ), + ) + + with pytest.raises(SchemaValidationError): + tester.run() + + +def test_valid_run_custom_schema(): + tester = _setup_with_test_file( + dedent( + """ + from scenario import State, Relation + + from interface_tester.interface_test import Tester + from interface_tester.schema_base import DataBagSchema, BaseModel + + class Foo(BaseModel): + foo:int=1 + class Bar(BaseModel): + bar:str + + class FooBarSchema(DataBagSchema): + unit: Bar + app: Foo + + def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={"foo":"1"}, + local_unit_data={"bar": "smackbeef"} + )] + )) + state_out = t.run("tracing-relation-changed") + t.assert_schema_valid(schema=FooBarSchema) + """ + ) + ) + + tester.run() + + +def test_invalid_custom_schema(): + tester = _setup_with_test_file( + dedent( + """ + from scenario import State, Relation + + from interface_tester.interface_test import Tester + from interface_tester.schema_base import DataBagSchema, BaseModel + + class Foo(BaseModel): + foo:int=1 + class Bar(BaseModel): + bar:str + + class FooBarSchema(DataBagSchema): + unit: Bar + app: Foo + + def test_data_on_changed(): + t = Tester(State( + relations=[Relation( + endpoint='tracing', + interface='tracing', + remote_app_name='remote', + local_app_data={"foo":"abc"}, + local_unit_data={"bar": "smackbeef"} + )] + )) + state_out = t.run("tracing-relation-changed") + t.assert_schema_valid(schema=FooBarSchema) + """ + ) + ) + with pytest.raises(SchemaValidationError): + tester.run() diff --git a/tests/unit/utils.py b/tests/unit/utils.py new file mode 100644 index 0000000..851deae --- /dev/null +++ b/tests/unit/utils.py @@ -0,0 +1,3 @@ +from pathlib import Path + +CRI_LIKE_PATH = Path(__file__).parent.parent / "resources" / "cri-like-path" diff --git a/tox.ini b/tox.ini index 63345f7..b256842 100644 --- a/tox.ini +++ b/tox.ini @@ -18,16 +18,16 @@ description = run unittests deps = .[unit_tests] commands = - pytest {[vars]tst_path}/unit/ + pytest -v --tb native {[vars]tst_path}/unit --log-cli-level=INFO -s {posargs} [testenv:lint] skip_install = True description = run linters (check only) deps = - black - isort - ruff + black==23.9.1 + isort==5.12.0 + ruff==0.0.289 commands = black --check {[vars]all_path} isort --profile black --check-only {[vars]all_path} @@ -38,8 +38,8 @@ commands = skip_install=True description = run formatters deps = - black - isort + black==23.9.1 + isort==5.12.0 commands = black {[vars]all_path} isort --profile black {[vars]all_path}