From a4b1d2ee0a969a8fe38b66b3078970e9c2b794a4 Mon Sep 17 00:00:00 2001 From: Tony Meyer Date: Fri, 11 Oct 2024 16:16:41 +1300 Subject: [PATCH] chore: use annotations from the __future__ (#1433) The goal here is to reduce the use of the name "Scenario" in the docs, to control where it appears when the docs are included at ops.readthedocs.io. Using `from __future__ import annotations` seems to make Sphinx much happier when doing the class signatures, avoiding odd text like `~scenario.state.CloudCredential` instead of the expected link with text "CloudCredential" and destination that class. When adding those imports, pyupgrade transformed all the type annotations. I've run all the tests with 3.8, 3.9, 3.10, 3.11, and 3.12 and they all pass, and I've manually tested in 3.8 and everything seems to work without any problems. I like this much more, so it seems like a win-win. You may find it easier to review commit-by-commit, as [one commit is completely automated](https://github.com/canonical/operator/commit/7f250a1161c49c193018b90bd769fc88060fa4c3), running `pyupgrade`, and it is responsible for the majority of lines changed. Also removes a couple of references to the name "Scenario", as the original intent of this change was to clean up references in the docs. Migrated from https://github.com/canonical/ops-scenario/pull/203 --- testing/src/context.py | 105 +++++++++--------- testing/src/errors.py | 2 +- testing/src/state.py | 237 ++++++++++++++++++++--------------------- 3 files changed, 169 insertions(+), 175 deletions(-) diff --git a/testing/src/context.py b/testing/src/context.py index 5ecf4a57f..44aea5056 100644 --- a/testing/src/context.py +++ b/testing/src/context.py @@ -2,6 +2,8 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +from __future__ import annotations + import functools import tempfile from contextlib import contextmanager @@ -10,12 +12,7 @@ TYPE_CHECKING, Any, Callable, - Dict, - List, Mapping, - Optional, - Type, - Union, cast, ) @@ -79,9 +76,9 @@ class Manager: def __init__( self, - ctx: "Context", + ctx: Context, arg: _Event, - state_in: "State", + state_in: State, ): self._ctx = ctx self._arg = arg @@ -89,7 +86,7 @@ def __init__( self._emitted: bool = False - self.ops: Optional["Ops"] = None + self.ops: Ops | None = None @property def charm(self) -> ops.CharmBase: @@ -113,7 +110,7 @@ def __enter__(self): self.ops = ops return self - def run(self) -> "State": + def run(self) -> State: """Emit the event and proceed with charm execution. This can only be done once. @@ -263,12 +260,12 @@ def collect_unit_status(): @staticmethod @_copy_doc(ops.RelationCreatedEvent) - def relation_created(relation: "RelationBase"): + def relation_created(relation: RelationBase): return _Event(f"{relation.endpoint}_relation_created", relation=relation) @staticmethod @_copy_doc(ops.RelationJoinedEvent) - def relation_joined(relation: "RelationBase", *, remote_unit: Optional[int] = None): + def relation_joined(relation: RelationBase, *, remote_unit: int | None = None): return _Event( f"{relation.endpoint}_relation_joined", relation=relation, @@ -278,9 +275,9 @@ def relation_joined(relation: "RelationBase", *, remote_unit: Optional[int] = No @staticmethod @_copy_doc(ops.RelationChangedEvent) def relation_changed( - relation: "RelationBase", + relation: RelationBase, *, - remote_unit: Optional[int] = None, + remote_unit: int | None = None, ): return _Event( f"{relation.endpoint}_relation_changed", @@ -291,10 +288,10 @@ def relation_changed( @staticmethod @_copy_doc(ops.RelationDepartedEvent) def relation_departed( - relation: "RelationBase", + relation: RelationBase, *, - remote_unit: Optional[int] = None, - departing_unit: Optional[int] = None, + remote_unit: int | None = None, + departing_unit: int | None = None, ): return _Event( f"{relation.endpoint}_relation_departed", @@ -305,7 +302,7 @@ def relation_departed( @staticmethod @_copy_doc(ops.RelationBrokenEvent) - def relation_broken(relation: "RelationBase"): + def relation_broken(relation: RelationBase): return _Event(f"{relation.endpoint}_relation_broken", relation=relation) @staticmethod @@ -354,10 +351,10 @@ def pebble_check_recovered(container: Container, info: CheckInfo): @_copy_doc(ops.ActionEvent) def action( name: str, - params: Optional[Mapping[str, "AnyJson"]] = None, - id: Optional[str] = None, + params: Mapping[str, AnyJson] | None = None, + id: str | None = None, ): - kwargs: Dict[str, Any] = {} + kwargs: dict[str, Any] = {} if params: kwargs["params"] = params if id: @@ -423,26 +420,26 @@ def test_foo(): manager.run() """ - juju_log: List["JujuLogLine"] + juju_log: list[JujuLogLine] """A record of what the charm has sent to juju-log""" - app_status_history: List["_EntityStatus"] + app_status_history: list[_EntityStatus] """A record of the app statuses the charm has set""" - unit_status_history: List["_EntityStatus"] + unit_status_history: list[_EntityStatus] """A record of the unit statuses the charm has set""" - workload_version_history: List[str] + workload_version_history: list[str] """A record of the workload versions the charm has set""" - removed_secret_revisions: List[int] + removed_secret_revisions: list[int] """A record of the secret revisions the charm has removed""" - emitted_events: List[ops.EventBase] + emitted_events: list[ops.EventBase] """A record of the events (including custom) that the charm has processed""" - requested_storages: Dict[str, int] + requested_storages: dict[str, int] """A record of the storages the charm has requested""" - action_logs: List[str] + action_logs: list[str] """The logs associated with the action output, set by the charm with :meth:`ops.ActionEvent.log` This will be empty when handling a non-action event. """ - action_results: Optional[Dict[str, Any]] + action_results: dict[str, Any] | None """A key-value mapping assigned by the charm as a result of the action. This will be ``None`` if the charm never calls :meth:`ops.ActionEvent.set_results` @@ -455,17 +452,17 @@ def test_foo(): def __init__( self, - charm_type: Type["CharmType"], - meta: Optional[Dict[str, Any]] = None, + charm_type: type[CharmType], + meta: dict[str, Any] | None = None, *, - actions: Optional[Dict[str, Any]] = None, - config: Optional[Dict[str, Any]] = None, - charm_root: Optional[Union[str, Path]] = None, + actions: dict[str, Any] | None = None, + config: dict[str, Any] | None = None, + charm_root: str | Path | None = None, juju_version: str = _DEFAULT_JUJU_VERSION, capture_deferred_events: bool = False, capture_framework_events: bool = False, - app_name: Optional[str] = None, - unit_id: Optional[int] = 0, + app_name: str | None = None, + unit_id: int | None = 0, app_trusted: bool = False, ): """Represents a simulated charm's execution context. @@ -534,26 +531,26 @@ def __init__( self.capture_framework_events = capture_framework_events # streaming side effects from running an event - self.juju_log: List["JujuLogLine"] = [] - self.app_status_history: List["_EntityStatus"] = [] - self.unit_status_history: List["_EntityStatus"] = [] - self.exec_history: Dict[str, List["ExecArgs"]] = {} - self.workload_version_history: List[str] = [] - self.removed_secret_revisions: List[int] = [] - self.emitted_events: List[ops.EventBase] = [] - self.requested_storages: Dict[str, int] = {} + self.juju_log: list[JujuLogLine] = [] + self.app_status_history: list[_EntityStatus] = [] + self.unit_status_history: list[_EntityStatus] = [] + self.exec_history: dict[str, list[ExecArgs]] = {} + self.workload_version_history: list[str] = [] + self.removed_secret_revisions: list[int] = [] + self.emitted_events: list[ops.EventBase] = [] + self.requested_storages: dict[str, int] = {} # set by Runtime.exec() in self._run() - self._output_state: Optional["State"] = None + self._output_state: State | None = None # operations (and embedded tasks) from running actions - self.action_logs: List[str] = [] - self.action_results: Optional[Dict[str, Any]] = None - self._action_failure_message: Optional[str] = None + self.action_logs: list[str] = [] + self.action_results: dict[str, Any] | None = None + self._action_failure_message: str | None = None self.on = CharmEvents() - def _set_output_state(self, output_state: "State"): + def _set_output_state(self, output_state: State): """Hook for Runtime to set the output state.""" self._output_state = output_state @@ -568,14 +565,14 @@ def _get_storage_root(self, name: str, index: int) -> Path: storage_root.mkdir(parents=True, exist_ok=True) return storage_root - def _record_status(self, state: "State", is_app: bool): + def _record_status(self, state: State, is_app: bool): """Record the previous status before a status change.""" if is_app: self.app_status_history.append(state.app_status) else: self.unit_status_history.append(state.unit_status) - def __call__(self, event: "_Event", state: "State"): + def __call__(self, event: _Event, state: State): """Context manager to introspect live charm object before and after the event is emitted. Usage:: @@ -592,7 +589,7 @@ def __call__(self, event: "_Event", state: "State"): """ return Manager(self, event, state) - def run_action(self, action: str, state: "State"): + def run_action(self, action: str, state: State): """Use `run()` instead. :private: @@ -602,7 +599,7 @@ def run_action(self, action: str, state: "State"): "and find the results in `ctx.action_results`", ) - def run(self, event: "_Event", state: "State") -> "State": + def run(self, event: _Event, state: State) -> State: """Trigger a charm execution with an event and a State. Calling this function will call ``ops.main`` and set up the context according to the @@ -680,7 +677,7 @@ def run(self, event: "_Event", state: "State") -> "State": return self._output_state @contextmanager - def _run(self, event: "_Event", state: "State"): + def _run(self, event: _Event, state: State): runtime = Runtime( charm_spec=self.charm_spec, juju_version=self.juju_version, diff --git a/testing/src/errors.py b/testing/src/errors.py index 56a01d126..81049eaa6 100644 --- a/testing/src/errors.py +++ b/testing/src/errors.py @@ -38,7 +38,7 @@ class StateValidationError(RuntimeError): class MetadataNotFoundError(RuntimeError): - """Raised when Scenario can't find a metadata file in the provided charm root.""" + """Raised when a metadata file can't be found in the provided charm root.""" class ActionMissingFromContextError(Exception): diff --git a/testing/src/state.py b/testing/src/state.py index 5ff4d71ed..8e46afe1f 100644 --- a/testing/src/state.py +++ b/testing/src/state.py @@ -2,7 +2,9 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -"""The core Scenario State object, and the components inside it.""" +"""The core State object, and the components inside it.""" + +from __future__ import annotations import dataclasses import datetime @@ -20,18 +22,13 @@ ClassVar, Dict, Final, - FrozenSet, Generic, Iterable, List, Literal, Mapping, NoReturn, - Optional, Sequence, - Set, - Tuple, - Type, TypeVar, Union, cast, @@ -201,14 +198,14 @@ class CloudCredential(_max_posargs(0)): auth_type: str """Authentication type.""" - attributes: Dict[str, str] = dataclasses.field(default_factory=dict) + attributes: dict[str, str] = dataclasses.field(default_factory=dict) """A dictionary containing cloud credentials. For example, for AWS, it contains `access-key` and `secret-key`; for Azure, `application-id`, `application-password` and `subscription-id` can be found here. """ - redacted: List[str] = dataclasses.field(default_factory=list) + redacted: list[str] = dataclasses.field(default_factory=list) """A list of redacted generic cloud API secrets.""" def _to_ops(self) -> CloudCredential_Ops: @@ -229,22 +226,22 @@ class CloudSpec(_max_posargs(1)): name: str = "localhost" """Juju cloud name.""" - region: Optional[str] = None + region: str | None = None """Region of the cloud.""" - endpoint: Optional[str] = None + endpoint: str | None = None """Endpoint of the cloud.""" - identity_endpoint: Optional[str] = None + identity_endpoint: str | None = None """Identity endpoint of the cloud.""" - storage_endpoint: Optional[str] = None + storage_endpoint: str | None = None """Storage endpoint of the cloud.""" - credential: Optional[CloudCredential] = None + credential: CloudCredential | None = None """Cloud credentials with key-value attributes.""" - ca_certificates: List[str] = dataclasses.field(default_factory=list) + ca_certificates: list[str] = dataclasses.field(default_factory=list) """A list of CA certificates.""" skip_tls_verify: bool = False @@ -284,12 +281,12 @@ class Secret(_max_posargs(1)): This class is used for both user and charm secrets. """ - tracked_content: "RawSecretRevisionContents" + tracked_content: RawSecretRevisionContents """The content of the secret that the charm is currently tracking. This is the content the charm will receive with a :meth:`ops.Secret.get_content` call.""" - latest_content: Optional["RawSecretRevisionContents"] = None + latest_content: RawSecretRevisionContents | None = None """The content of the latest revision of the secret. This is the content the charm will receive with a @@ -309,20 +306,20 @@ class Secret(_max_posargs(1)): to this unit. """ - remote_grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict) + remote_grants: dict[int, set[str]] = dataclasses.field(default_factory=dict) """Mapping from relation IDs to remote units and applications to which this secret has been granted.""" - label: Optional[str] = None + label: str | None = None """A human-readable label the charm can use to retrieve the secret. If this is set, it implies that the charm has previously set the label. """ - description: Optional[str] = None + description: str | None = None """A human-readable description of the secret.""" - expire: Optional[datetime.datetime] = None + expire: datetime.datetime | None = None """The time at which the secret will expire.""" - rotate: Optional[SecretRotate] = None + rotate: SecretRotate | None = None """The rotation policy for the secret.""" # what revision is currently tracked by this charm. Only meaningful if owner=False @@ -351,11 +348,11 @@ def _track_latest_revision(self): def _update_metadata( self, - content: Optional["RawSecretRevisionContents"] = None, - label: Optional[str] = None, - description: Optional[str] = None, - expire: Optional[datetime.datetime] = None, - rotate: Optional[SecretRotate] = None, + content: RawSecretRevisionContents | None = None, + label: str | None = None, + description: str | None = None, + expire: datetime.datetime | None = None, + rotate: SecretRotate | None = None, ): """Update the metadata.""" # bypass frozen dataclass @@ -407,11 +404,11 @@ def address(self, value: str): class BindAddress(_max_posargs(1)): """An address bound to a network interface in a Juju space.""" - addresses: List[Address] + addresses: list[Address] """The addresses in the space.""" interface_name: str = "" """The name of the network interface.""" - mac_address: Optional[str] = None + mac_address: str | None = None """The MAC address of the interface.""" def _hook_tool_output_fmt(self): @@ -432,15 +429,15 @@ class Network(_max_posargs(2)): binding_name: str """The name of the network space.""" - bind_addresses: List[BindAddress] = dataclasses.field( + bind_addresses: list[BindAddress] = dataclasses.field( default_factory=lambda: [BindAddress([Address("192.0.2.0")])], ) """Addresses that the charm's application should bind to.""" - ingress_addresses: List[str] = dataclasses.field( + ingress_addresses: list[str] = dataclasses.field( default_factory=lambda: ["192.0.2.0"], ) """Addresses other applications should use to connect to the unit.""" - egress_subnets: List[str] = dataclasses.field( + egress_subnets: list[str] = dataclasses.field( default_factory=lambda: ["192.0.2.0/24"], ) """Subnets that other units will see the charm connecting from.""" @@ -480,7 +477,7 @@ class RelationBase(_max_posargs(2)): endpoint: str """Relation endpoint name. Must match some endpoint name defined in the metadata.""" - interface: Optional[str] = None + interface: str | None = None """Interface name. Must match the interface name attached to this endpoint in the metadata. If left empty, it will be automatically derived from the metadata.""" @@ -488,10 +485,10 @@ class RelationBase(_max_posargs(2)): """Juju relation ID. Every new Relation instance gets a unique one, if there's trouble, override.""" - local_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + local_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """This application's databag for this relation.""" - local_unit_data: "RawDataBagContents" = dataclasses.field( + local_unit_data: RawDataBagContents = dataclasses.field( default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(), ) """This unit's databag for this relation.""" @@ -511,14 +508,14 @@ def _databags(self): yield self.local_unit_data @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" raise NotImplementedError() def _get_databag_for_remote( self, unit_id: int, # noqa: U100 - ) -> "RawDataBagContents": + ) -> RawDataBagContents: """Return the databag for some remote unit ID.""" raise NotImplementedError() @@ -535,7 +532,7 @@ def __post_init__(self): def __hash__(self) -> int: return hash(self.id) - def _validate_databag(self, databag: Dict[str, str]): + def _validate_databag(self, databag: dict[str, str]): if not isinstance(databag, dict): raise StateValidationError( f"all databags should be dicts, not {type(databag)}", @@ -567,9 +564,9 @@ class Relation(RelationBase): limit: int = 1 """The maximum number of integrations on this endpoint.""" - remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + remote_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """The current content of the application databag.""" - remote_units_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field( + remote_units_data: dict[UnitID, RawDataBagContents] = dataclasses.field( default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()}, # dedup ) """The current content of the databag for each unit in the relation.""" @@ -583,11 +580,11 @@ def _remote_app_name(self) -> str: return self.remote_app_name @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" return tuple(self.remote_units_data) - def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: UnitID) -> RawDataBagContents: """Return the databag for some remote unit ID.""" return self.remote_units_data[unit_id] @@ -604,9 +601,9 @@ def _databags(self): # type: ignore class SubordinateRelation(RelationBase): """A relation to share data between a subordinate and a principal charm.""" - remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + remote_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """The current content of the remote application databag.""" - remote_unit_data: "RawDataBagContents" = dataclasses.field( + remote_unit_data: RawDataBagContents = dataclasses.field( default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(), ) """The current content of the remote unit databag.""" @@ -620,11 +617,11 @@ def __hash__(self) -> int: return hash(self.id) @property - def _remote_unit_ids(self) -> Tuple[int]: + def _remote_unit_ids(self) -> tuple[int]: """Ids of the units on the other end of this relation.""" return (self.remote_unit_id,) - def _get_databag_for_remote(self, unit_id: int) -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: int) -> RawDataBagContents: """Return the databag for some remote unit ID.""" if unit_id is not self.remote_unit_id: raise ValueError( @@ -651,7 +648,7 @@ def remote_unit_name(self) -> str: class PeerRelation(RelationBase): """A relation to share data between units of the charm.""" - peers_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field( + peers_data: dict[UnitID, RawDataBagContents] = dataclasses.field( default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()}, ) """Current contents of the peer databags.""" @@ -668,11 +665,11 @@ def _databags(self): # type: ignore yield from self.peers_data.values() @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" return tuple(self.peers_data) - def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: UnitID) -> RawDataBagContents: """Return the databag for some remote unit ID.""" return self.peers_data[unit_id] @@ -699,7 +696,7 @@ class Model(_max_posargs(1)): type: Literal["kubernetes", "lxd"] = "kubernetes" """The type of Juju model.""" - cloud_spec: Optional[CloudSpec] = None + cloud_spec: CloudSpec | None = None """Cloud specification information (metadata) including credentials.""" @@ -761,9 +758,9 @@ def _run(self) -> int: class Mount(_max_posargs(0)): """Maps local files to a :class:`Container` filesystem.""" - location: Union[str, PurePosixPath] + location: str | PurePosixPath """The location inside of the container.""" - source: Union[str, Path] + source: str | Path """The content to provide when the charm does :meth:`ops.Container.pull`.""" @@ -801,10 +798,10 @@ class Notice(_max_posargs(1)): id: str = dataclasses.field(default_factory=_next_notice_id) """Unique ID for this notice.""" - user_id: Optional[int] = None + user_id: int | None = None """UID of the user who may view this notice (None means notice is public).""" - type: Union[pebble.NoticeType, str] = pebble.NoticeType.CUSTOM + type: pebble.NoticeType | str = pebble.NoticeType.CUSTOM """Type of the notice.""" first_occurred: datetime.datetime = dataclasses.field(default_factory=_now_utc) @@ -823,13 +820,13 @@ class Notice(_max_posargs(1)): occurrences: int = 1 """The number of times one of these notices has occurred.""" - last_data: Dict[str, str] = dataclasses.field(default_factory=dict) + last_data: dict[str, str] = dataclasses.field(default_factory=dict) """Additional data captured from the last occurrence of one of these notices.""" - repeat_after: Optional[datetime.timedelta] = None + repeat_after: datetime.timedelta | None = None """Minimum time after one of these was last repeated before Pebble will repeat it again.""" - expire_after: Optional[datetime.timedelta] = None + expire_after: datetime.timedelta | None = None """How long since one of these last occurred until Pebble will drop the notice.""" def _to_ops(self) -> pebble.Notice: @@ -855,7 +852,7 @@ class CheckInfo(_max_posargs(1)): name: str """Name of the check.""" - level: Optional[pebble.CheckLevel] = None + level: pebble.CheckLevel | None = None """Level of the check.""" status: pebble.CheckStatus = pebble.CheckStatus.UP @@ -901,13 +898,13 @@ class Container(_max_posargs(1)): # pebble or derive them from the resulting plan (which one CAN get from pebble). # So if we are instantiating Container by fetching info from a 'live' charm, the 'layers' # will be unknown. all that we can know is the resulting plan (the 'computed plan'). - _base_plan: Dict[str, Any] = dataclasses.field(default_factory=dict) + _base_plan: dict[str, Any] = dataclasses.field(default_factory=dict) # We expect most of the user-facing testing to be covered by this 'layers' attribute, # as it is all that will be known when unit-testing. - layers: Dict[str, pebble.Layer] = dataclasses.field(default_factory=dict) + layers: dict[str, pebble.Layer] = dataclasses.field(default_factory=dict) """All :class:`ops.pebble.Layer` definitions that have already been added to the container.""" - service_statuses: Dict[str, pebble.ServiceStatus] = dataclasses.field( + service_statuses: dict[str, pebble.ServiceStatus] = dataclasses.field( default_factory=dict, ) """The current status of each Pebble service running in the container.""" @@ -926,7 +923,7 @@ class Container(_max_posargs(1)): # when the charm runs `pebble.pull`, it will return .open() from one of those paths. # when the charm pushes, it will either overwrite one of those paths (careful!) or it will # create a tempfile and insert its path in the mock filesystem tree - mounts: Dict[str, Mount] = dataclasses.field(default_factory=dict) + mounts: dict[str, Mount] = dataclasses.field(default_factory=dict) """Provides access to the contents of the simulated container filesystem. For example, suppose you want to express that your container has: @@ -954,8 +951,8 @@ class Container(_max_posargs(1)): container = Container( name='foo', execs={ - scenario.Exec(['whoami'], return_code=0, stdout='ubuntu'), - scenario.Exec( + Exec(['whoami'], return_code=0, stdout='ubuntu'), + Exec( ['dig', '+short', 'canonical.com'], return_code=0, stdout='185.125.190.20\\n185.125.190.21', @@ -964,10 +961,10 @@ class Container(_max_posargs(1)): ) """ - notices: List[Notice] = dataclasses.field(default_factory=list) + notices: list[Notice] = dataclasses.field(default_factory=list) """Any Pebble notices that already exist in the container.""" - check_infos: FrozenSet[CheckInfo] = frozenset() + check_infos: frozenset[CheckInfo] = frozenset() """All Pebble health checks that have been added to the container.""" def __hash__(self) -> int: @@ -980,7 +977,7 @@ def __post_init__(self): def _render_services(self): # copied over from ops.testing._TestingPebbleClient._render_services() - services: Dict[str, pebble.Service] = {} + services: dict[str, pebble.Service] = {} for key in sorted(self.layers.keys()): layer = self.layers[key] for name, service in layer.services.items(): @@ -1006,10 +1003,10 @@ def plan(self) -> pebble.Plan: return plan @property - def services(self) -> Dict[str, pebble.ServiceInfo]: + def services(self) -> dict[str, pebble.ServiceInfo]: """The Pebble services as rendered in the plan.""" services = self._render_services() - infos: Dict[str, pebble.ServiceInfo] = {} + infos: dict[str, pebble.ServiceInfo] = {} names = sorted(services.keys()) for name in names: try: @@ -1031,7 +1028,7 @@ def services(self) -> Dict[str, pebble.ServiceInfo]: infos[name] = info return infos - def get_filesystem(self, ctx: "Context") -> Path: + def get_filesystem(self, ctx: Context) -> Path: """Simulated Pebble filesystem in this context. Returns: @@ -1061,7 +1058,7 @@ class _EntityStatus: name: _RawStatusLiteral message: str = "" - _entity_statuses: ClassVar[Dict[str, Type["_EntityStatus"]]] = {} + _entity_statuses: ClassVar[dict[str, type[_EntityStatus]]] = {} def __eq__(self, other: Any): if isinstance(other, (StatusBase, _EntityStatus)): @@ -1079,13 +1076,13 @@ def from_status_name( cls, name: _RawStatusLiteral, message: str = "", - ) -> "_EntityStatus": + ) -> _EntityStatus: # Note that this won't work for UnknownStatus. # All subclasses have a default 'name' attribute, but the type checker can't tell that. return cls._entity_statuses[name](message=message) # type:ignore @classmethod - def from_ops(cls, obj: StatusBase) -> "_EntityStatus": + def from_ops(cls, obj: StatusBase) -> _EntityStatus: return cls.from_status_name(obj.name, obj.message) @@ -1173,7 +1170,7 @@ class MyCharm(ops.CharmBase): """ - owner_path: Optional[str] = None + owner_path: str | None = None """The path to the owner of this StoredState instance. If None, the owner is the Framework. Otherwise, /-separated object names, @@ -1184,7 +1181,7 @@ class MyCharm(ops.CharmBase): # However, it's complex to describe those types, since it's a recursive # definition - even in TypeShed the _Marshallable type includes containers # like list[Any], which seems to defeat the point. - content: Dict[str, Any] = dataclasses.field(default_factory=dict) + content: dict[str, Any] = dataclasses.field(default_factory=dict) """The content of the :class:`ops.StoredState` instance.""" _data_type_name: str = "StoredStateData" @@ -1208,7 +1205,7 @@ class Port(_max_posargs(1)): :class:`UDPPort`, or :class:`ICMPPort` instead. """ - port: Optional[int] = None + port: int | None = None """The port to open. Required for TCP and UDP; not allowed for ICMP.""" protocol: _RawPortProtocolLiteral = "tcp" @@ -1326,7 +1323,7 @@ def __eq__(self, other: object) -> bool: return (self.name, self.index) == (other.name, other.index) return False - def get_filesystem(self, ctx: "Context") -> Path: + def get_filesystem(self, ctx: Context) -> Path: """Simulated filesystem root in this context.""" return ctx._get_storage_root(self.name, self.index) @@ -1337,7 +1334,7 @@ class Resource(_max_posargs(0)): name: str """The name of the resource, as found in the charm metadata.""" - path: Union[str, Path] + path: str | Path """A local path that will be provided to the charm as the content of the resource.""" @@ -1350,11 +1347,11 @@ class State(_max_posargs(0)): return data from `State.leader`, and so on. """ - config: Dict[str, Union[str, int, float, bool]] = dataclasses.field( + config: dict[str, str | int | float | bool] = dataclasses.field( default_factory=dict, ) """The present configuration of this charm.""" - relations: Iterable["RelationBase"] = dataclasses.field(default_factory=frozenset) + relations: Iterable[RelationBase] = dataclasses.field(default_factory=frozenset) """All relations that currently exist for this charm.""" networks: Iterable[Network] = dataclasses.field(default_factory=frozenset) """Manual overrides for any relation and extra bindings currently provisioned for this charm. @@ -1396,9 +1393,9 @@ class State(_max_posargs(0)): # dispatched, and represent the events that had been deferred during the previous run. # If the charm defers any events during "this execution", they will be appended # to this list. - deferred: List["DeferredEvent"] = dataclasses.field(default_factory=list) + deferred: list[DeferredEvent] = dataclasses.field(default_factory=list) """Events that have been deferred on this charm by some previous execution.""" - stored_states: Iterable["StoredState"] = dataclasses.field( + stored_states: Iterable[StoredState] = dataclasses.field( default_factory=frozenset, ) """Contents of a charm's stored state.""" @@ -1479,12 +1476,12 @@ def _update_status( # bypass frozen dataclass object.__setattr__(self, name, new_status) - def _update_opened_ports(self, new_ports: FrozenSet[Port]): + def _update_opened_ports(self, new_ports: frozenset[Port]): """Update the current opened ports.""" # bypass frozen dataclass object.__setattr__(self, "opened_ports", new_ports) - def _update_secrets(self, new_secrets: FrozenSet[Secret]): + def _update_secrets(self, new_secrets: frozenset[Secret]): """Update the current secrets.""" # bypass frozen dataclass object.__setattr__(self, "secrets", new_secrets) @@ -1506,8 +1503,8 @@ def get_network(self, binding_name: str, /) -> Network: def get_secret( self, *, - id: Optional[str] = None, - label: Optional[str] = None, + id: str | None = None, + label: str | None = None, ) -> Secret: """Get secret from this State, based on the secret's id or label.""" if id is None and label is None: @@ -1527,7 +1524,7 @@ def get_stored_state( stored_state: str, /, *, - owner_path: Optional[str] = None, + owner_path: str | None = None, ) -> StoredState: """Get stored state from this State, based on the stored state's name and owner_path.""" for ss in self.stored_states: @@ -1540,7 +1537,7 @@ def get_storage( storage: str, /, *, - index: Optional[int] = 0, + index: int | None = 0, ) -> Storage: """Get storage from this State, based on the storage's name and index.""" for state_storage in self.storages: @@ -1550,14 +1547,14 @@ def get_storage( f"storage: name={storage}, index={index} not found in the State", ) - def get_relation(self, relation: int, /) -> "RelationBase": + def get_relation(self, relation: int, /) -> RelationBase: """Get relation from this State, based on the relation's id.""" for state_relation in self.relations: if state_relation.id == relation: return state_relation raise KeyError(f"relation: id={relation} not found in the State") - def get_relations(self, endpoint: str) -> Tuple["RelationBase", ...]: + def get_relations(self, endpoint: str) -> tuple[RelationBase, ...]: """Get all relations on this endpoint from the current state.""" # we rather normalize the endpoint than worry about cursed metadata situations such as: @@ -1573,7 +1570,7 @@ def get_relations(self, endpoint: str) -> Tuple["RelationBase", ...]: ) -def _is_valid_charmcraft_25_metadata(meta: Dict[str, Any]): +def _is_valid_charmcraft_25_metadata(meta: dict[str, Any]): # Check whether this dict has the expected mandatory metadata fields according to the # charmcraft >2.5 charmcraft.yaml schema if (config_type := meta.get("type")) != "charm": @@ -1591,10 +1588,10 @@ def _is_valid_charmcraft_25_metadata(meta: Dict[str, Any]): class _CharmSpec(Generic[CharmType]): """Charm spec.""" - charm_type: Type[CharmBase] - meta: Dict[str, Any] - actions: Optional[Dict[str, Any]] = None - config: Optional[Dict[str, Any]] = None + charm_type: type[CharmBase] + meta: dict[str, Any] + actions: dict[str, Any] | None = None + config: dict[str, Any] | None = None # autoloaded means: we are running a 'real' charm class, living in some # /src/charm.py, and the metadata files are 'real' metadata files. @@ -1606,7 +1603,7 @@ def _load_metadata_legacy(charm_root: Path): # back in the days, we used to have separate metadata.yaml, config.yaml and actions.yaml # files for charm metadata. metadata_path = charm_root / "metadata.yaml" - meta: Dict[str, Any] = ( + meta: dict[str, Any] = ( yaml.safe_load(metadata_path.open()) if metadata_path.exists() else {} ) @@ -1621,7 +1618,7 @@ def _load_metadata_legacy(charm_root: Path): def _load_metadata(charm_root: Path): """Load metadata from charm projects created with Charmcraft >= 2.5.""" metadata_path = charm_root / "charmcraft.yaml" - meta: Dict[str, Any] = ( + meta: dict[str, Any] = ( yaml.safe_load(metadata_path.open()) if metadata_path.exists() else {} ) if not _is_valid_charmcraft_25_metadata(meta): @@ -1631,7 +1628,7 @@ def _load_metadata(charm_root: Path): return meta, config, actions @staticmethod - def autoload(charm_type: Type[CharmBase]) -> "_CharmSpec[CharmType]": + def autoload(charm_type: type[CharmBase]) -> _CharmSpec[CharmType]: """Construct a ``_CharmSpec`` object by looking up the metadata from the charm's repo root. Will attempt to load the metadata off the ``charmcraft.yaml`` file @@ -1662,7 +1659,7 @@ def autoload(charm_type: Type[CharmBase]) -> "_CharmSpec[CharmType]": is_autoloaded=True, ) - def get_all_relations(self) -> List[Tuple[str, Dict[str, str]]]: + def get_all_relations(self) -> list[tuple[str, dict[str, str]]]: """A list of all relation endpoints defined in the metadata.""" return list( chain( @@ -1690,7 +1687,7 @@ class DeferredEvent: observer: str # needs to be marshal.dumps-able. - snapshot_data: Dict[Any, Any] = dataclasses.field(default_factory=dict) + snapshot_data: dict[Any, Any] = dataclasses.field(default_factory=dict) # It would be nicer if people could do something like: # `isinstance(state.deferred[0], ops.StartEvent)` @@ -1717,7 +1714,7 @@ class _EventType(str, Enum): class _EventPath(str): if TYPE_CHECKING: # pragma: no cover name: str - owner_path: List[str] + owner_path: list[str] suffix: str prefix: str is_custom: bool @@ -1742,7 +1739,7 @@ def __new__(cls, string: str): return instance @staticmethod - def _get_suffix_and_type(s: str) -> Tuple[str, _EventType]: + def _get_suffix_and_type(s: str) -> tuple[str, _EventType]: for suffix in _RELATION_EVENTS_SUFFIX: if s.endswith(suffix): return suffix, _EventType.relation @@ -1787,35 +1784,35 @@ class _Event: # type: ignore """ path: str - args: Tuple[Any, ...] = () - kwargs: Dict[str, Any] = dataclasses.field(default_factory=dict) + args: tuple[Any, ...] = () + kwargs: dict[str, Any] = dataclasses.field(default_factory=dict) - storage: Optional["Storage"] = None + storage: Storage | None = None """If this is a storage event, the storage it refers to.""" - relation: Optional["RelationBase"] = None + relation: RelationBase | None = None """If this is a relation event, the relation it refers to.""" - relation_remote_unit_id: Optional[int] = None - relation_departed_unit_id: Optional[int] = None + relation_remote_unit_id: int | None = None + relation_departed_unit_id: int | None = None - secret: Optional[Secret] = None + secret: Secret | None = None """If this is a secret event, the secret it refers to.""" # if this is a secret-removed or secret-expired event, the secret revision it refers to - secret_revision: Optional[int] = None + secret_revision: int | None = None - container: Optional[Container] = None + container: Container | None = None """If this is a workload (container) event, the container it refers to.""" - notice: Optional[Notice] = None + notice: Notice | None = None """If this is a Pebble notice event, the notice it refers to.""" - check_info: Optional[CheckInfo] = None + check_info: CheckInfo | None = None """If this is a Pebble check event, the check info it provides.""" - action: Optional["_Action"] = None + action: _Action | None = None """If this is an action event, the :class:`Action` it refers to.""" - _owner_path: List[str] = dataclasses.field(default_factory=list) + _owner_path: list[str] = dataclasses.field(default_factory=list) def __post_init__(self): path = _EventPath(self.path) @@ -1841,7 +1838,7 @@ def name(self) -> str: return self._path.name @property - def owner_path(self) -> List[str]: + def owner_path(self) -> list[str]: """Path to the ObjectEvents instance owning this event. If this event is defined on the toplevel charm class, it should be ['on']. @@ -1875,7 +1872,7 @@ def _is_workload_event(self) -> bool: # this method is private because _CharmSpec is not quite user-facing; also, # the user should know. - def _is_builtin_event(self, charm_spec: "_CharmSpec[CharmType]") -> bool: + def _is_builtin_event(self, charm_spec: _CharmSpec[CharmType]) -> bool: """Determine whether the event is a custom-defined one or a builtin one.""" event_name = self.name @@ -1909,7 +1906,7 @@ def deferred(self, handler: Callable[..., Any], event_id: int = 1) -> DeferredEv # Many events have no snapshot data: install, start, stop, remove, config-changed, # upgrade-charm, pre-series-upgrade, post-series-upgrade, leader-elected, # leader-settings-changed, collect-metrics - snapshot_data: Dict[str, Any] = {} + snapshot_data: dict[str, Any] = {} # fixme: at this stage we can't determine if the event is a builtin one or not; if it is # not, then the coming checks are meaningless: the custom event could be named like a @@ -2020,10 +2017,10 @@ class _Action(_max_posargs(1)): Used to simulate ``juju run``, passing in any parameters. For example:: def test_backup_action(): - ctx = scenario.Context(MyCharm) + ctx = Context(MyCharm) state = ctx.run( ctx.on.action('do_backup', params={'filename': 'foo'}), - scenario.State() + State(), ) assert ctx.action_results == ... """ @@ -2031,7 +2028,7 @@ def test_backup_action(): name: str """Juju action name, as found in the charm metadata.""" - params: Mapping[str, "AnyJson"] = dataclasses.field(default_factory=dict) + params: Mapping[str, AnyJson] = dataclasses.field(default_factory=dict) """Parameter values passed to the action.""" id: str = dataclasses.field(default_factory=_next_action_id)