diff --git a/testing/src/context.py b/testing/src/context.py index 5ecf4a57f..44aea5056 100644 --- a/testing/src/context.py +++ b/testing/src/context.py @@ -2,6 +2,8 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +from __future__ import annotations + import functools import tempfile from contextlib import contextmanager @@ -10,12 +12,7 @@ TYPE_CHECKING, Any, Callable, - Dict, - List, Mapping, - Optional, - Type, - Union, cast, ) @@ -79,9 +76,9 @@ class Manager: def __init__( self, - ctx: "Context", + ctx: Context, arg: _Event, - state_in: "State", + state_in: State, ): self._ctx = ctx self._arg = arg @@ -89,7 +86,7 @@ def __init__( self._emitted: bool = False - self.ops: Optional["Ops"] = None + self.ops: Ops | None = None @property def charm(self) -> ops.CharmBase: @@ -113,7 +110,7 @@ def __enter__(self): self.ops = ops return self - def run(self) -> "State": + def run(self) -> State: """Emit the event and proceed with charm execution. This can only be done once. @@ -263,12 +260,12 @@ def collect_unit_status(): @staticmethod @_copy_doc(ops.RelationCreatedEvent) - def relation_created(relation: "RelationBase"): + def relation_created(relation: RelationBase): return _Event(f"{relation.endpoint}_relation_created", relation=relation) @staticmethod @_copy_doc(ops.RelationJoinedEvent) - def relation_joined(relation: "RelationBase", *, remote_unit: Optional[int] = None): + def relation_joined(relation: RelationBase, *, remote_unit: int | None = None): return _Event( f"{relation.endpoint}_relation_joined", relation=relation, @@ -278,9 +275,9 @@ def relation_joined(relation: "RelationBase", *, remote_unit: Optional[int] = No @staticmethod @_copy_doc(ops.RelationChangedEvent) def relation_changed( - relation: "RelationBase", + relation: RelationBase, *, - remote_unit: Optional[int] = None, + remote_unit: int | None = None, ): return _Event( f"{relation.endpoint}_relation_changed", @@ -291,10 +288,10 @@ def relation_changed( @staticmethod @_copy_doc(ops.RelationDepartedEvent) def relation_departed( - relation: "RelationBase", + relation: RelationBase, *, - remote_unit: Optional[int] = None, - departing_unit: Optional[int] = None, + remote_unit: int | None = None, + departing_unit: int | None = None, ): return _Event( f"{relation.endpoint}_relation_departed", @@ -305,7 +302,7 @@ def relation_departed( @staticmethod @_copy_doc(ops.RelationBrokenEvent) - def relation_broken(relation: "RelationBase"): + def relation_broken(relation: RelationBase): return _Event(f"{relation.endpoint}_relation_broken", relation=relation) @staticmethod @@ -354,10 +351,10 @@ def pebble_check_recovered(container: Container, info: CheckInfo): @_copy_doc(ops.ActionEvent) def action( name: str, - params: Optional[Mapping[str, "AnyJson"]] = None, - id: Optional[str] = None, + params: Mapping[str, AnyJson] | None = None, + id: str | None = None, ): - kwargs: Dict[str, Any] = {} + kwargs: dict[str, Any] = {} if params: kwargs["params"] = params if id: @@ -423,26 +420,26 @@ def test_foo(): manager.run() """ - juju_log: List["JujuLogLine"] + juju_log: list[JujuLogLine] """A record of what the charm has sent to juju-log""" - app_status_history: List["_EntityStatus"] + app_status_history: list[_EntityStatus] """A record of the app statuses the charm has set""" - unit_status_history: List["_EntityStatus"] + unit_status_history: list[_EntityStatus] """A record of the unit statuses the charm has set""" - workload_version_history: List[str] + workload_version_history: list[str] """A record of the workload versions the charm has set""" - removed_secret_revisions: List[int] + removed_secret_revisions: list[int] """A record of the secret revisions the charm has removed""" - emitted_events: List[ops.EventBase] + emitted_events: list[ops.EventBase] """A record of the events (including custom) that the charm has processed""" - requested_storages: Dict[str, int] + requested_storages: dict[str, int] """A record of the storages the charm has requested""" - action_logs: List[str] + action_logs: list[str] """The logs associated with the action output, set by the charm with :meth:`ops.ActionEvent.log` This will be empty when handling a non-action event. """ - action_results: Optional[Dict[str, Any]] + action_results: dict[str, Any] | None """A key-value mapping assigned by the charm as a result of the action. This will be ``None`` if the charm never calls :meth:`ops.ActionEvent.set_results` @@ -455,17 +452,17 @@ def test_foo(): def __init__( self, - charm_type: Type["CharmType"], - meta: Optional[Dict[str, Any]] = None, + charm_type: type[CharmType], + meta: dict[str, Any] | None = None, *, - actions: Optional[Dict[str, Any]] = None, - config: Optional[Dict[str, Any]] = None, - charm_root: Optional[Union[str, Path]] = None, + actions: dict[str, Any] | None = None, + config: dict[str, Any] | None = None, + charm_root: str | Path | None = None, juju_version: str = _DEFAULT_JUJU_VERSION, capture_deferred_events: bool = False, capture_framework_events: bool = False, - app_name: Optional[str] = None, - unit_id: Optional[int] = 0, + app_name: str | None = None, + unit_id: int | None = 0, app_trusted: bool = False, ): """Represents a simulated charm's execution context. @@ -534,26 +531,26 @@ def __init__( self.capture_framework_events = capture_framework_events # streaming side effects from running an event - self.juju_log: List["JujuLogLine"] = [] - self.app_status_history: List["_EntityStatus"] = [] - self.unit_status_history: List["_EntityStatus"] = [] - self.exec_history: Dict[str, List["ExecArgs"]] = {} - self.workload_version_history: List[str] = [] - self.removed_secret_revisions: List[int] = [] - self.emitted_events: List[ops.EventBase] = [] - self.requested_storages: Dict[str, int] = {} + self.juju_log: list[JujuLogLine] = [] + self.app_status_history: list[_EntityStatus] = [] + self.unit_status_history: list[_EntityStatus] = [] + self.exec_history: dict[str, list[ExecArgs]] = {} + self.workload_version_history: list[str] = [] + self.removed_secret_revisions: list[int] = [] + self.emitted_events: list[ops.EventBase] = [] + self.requested_storages: dict[str, int] = {} # set by Runtime.exec() in self._run() - self._output_state: Optional["State"] = None + self._output_state: State | None = None # operations (and embedded tasks) from running actions - self.action_logs: List[str] = [] - self.action_results: Optional[Dict[str, Any]] = None - self._action_failure_message: Optional[str] = None + self.action_logs: list[str] = [] + self.action_results: dict[str, Any] | None = None + self._action_failure_message: str | None = None self.on = CharmEvents() - def _set_output_state(self, output_state: "State"): + def _set_output_state(self, output_state: State): """Hook for Runtime to set the output state.""" self._output_state = output_state @@ -568,14 +565,14 @@ def _get_storage_root(self, name: str, index: int) -> Path: storage_root.mkdir(parents=True, exist_ok=True) return storage_root - def _record_status(self, state: "State", is_app: bool): + def _record_status(self, state: State, is_app: bool): """Record the previous status before a status change.""" if is_app: self.app_status_history.append(state.app_status) else: self.unit_status_history.append(state.unit_status) - def __call__(self, event: "_Event", state: "State"): + def __call__(self, event: _Event, state: State): """Context manager to introspect live charm object before and after the event is emitted. Usage:: @@ -592,7 +589,7 @@ def __call__(self, event: "_Event", state: "State"): """ return Manager(self, event, state) - def run_action(self, action: str, state: "State"): + def run_action(self, action: str, state: State): """Use `run()` instead. :private: @@ -602,7 +599,7 @@ def run_action(self, action: str, state: "State"): "and find the results in `ctx.action_results`", ) - def run(self, event: "_Event", state: "State") -> "State": + def run(self, event: _Event, state: State) -> State: """Trigger a charm execution with an event and a State. Calling this function will call ``ops.main`` and set up the context according to the @@ -680,7 +677,7 @@ def run(self, event: "_Event", state: "State") -> "State": return self._output_state @contextmanager - def _run(self, event: "_Event", state: "State"): + def _run(self, event: _Event, state: State): runtime = Runtime( charm_spec=self.charm_spec, juju_version=self.juju_version, diff --git a/testing/src/errors.py b/testing/src/errors.py index 56a01d126..81049eaa6 100644 --- a/testing/src/errors.py +++ b/testing/src/errors.py @@ -38,7 +38,7 @@ class StateValidationError(RuntimeError): class MetadataNotFoundError(RuntimeError): - """Raised when Scenario can't find a metadata file in the provided charm root.""" + """Raised when a metadata file can't be found in the provided charm root.""" class ActionMissingFromContextError(Exception): diff --git a/testing/src/state.py b/testing/src/state.py index 5ff4d71ed..8e46afe1f 100644 --- a/testing/src/state.py +++ b/testing/src/state.py @@ -2,7 +2,9 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -"""The core Scenario State object, and the components inside it.""" +"""The core State object, and the components inside it.""" + +from __future__ import annotations import dataclasses import datetime @@ -20,18 +22,13 @@ ClassVar, Dict, Final, - FrozenSet, Generic, Iterable, List, Literal, Mapping, NoReturn, - Optional, Sequence, - Set, - Tuple, - Type, TypeVar, Union, cast, @@ -201,14 +198,14 @@ class CloudCredential(_max_posargs(0)): auth_type: str """Authentication type.""" - attributes: Dict[str, str] = dataclasses.field(default_factory=dict) + attributes: dict[str, str] = dataclasses.field(default_factory=dict) """A dictionary containing cloud credentials. For example, for AWS, it contains `access-key` and `secret-key`; for Azure, `application-id`, `application-password` and `subscription-id` can be found here. """ - redacted: List[str] = dataclasses.field(default_factory=list) + redacted: list[str] = dataclasses.field(default_factory=list) """A list of redacted generic cloud API secrets.""" def _to_ops(self) -> CloudCredential_Ops: @@ -229,22 +226,22 @@ class CloudSpec(_max_posargs(1)): name: str = "localhost" """Juju cloud name.""" - region: Optional[str] = None + region: str | None = None """Region of the cloud.""" - endpoint: Optional[str] = None + endpoint: str | None = None """Endpoint of the cloud.""" - identity_endpoint: Optional[str] = None + identity_endpoint: str | None = None """Identity endpoint of the cloud.""" - storage_endpoint: Optional[str] = None + storage_endpoint: str | None = None """Storage endpoint of the cloud.""" - credential: Optional[CloudCredential] = None + credential: CloudCredential | None = None """Cloud credentials with key-value attributes.""" - ca_certificates: List[str] = dataclasses.field(default_factory=list) + ca_certificates: list[str] = dataclasses.field(default_factory=list) """A list of CA certificates.""" skip_tls_verify: bool = False @@ -284,12 +281,12 @@ class Secret(_max_posargs(1)): This class is used for both user and charm secrets. """ - tracked_content: "RawSecretRevisionContents" + tracked_content: RawSecretRevisionContents """The content of the secret that the charm is currently tracking. This is the content the charm will receive with a :meth:`ops.Secret.get_content` call.""" - latest_content: Optional["RawSecretRevisionContents"] = None + latest_content: RawSecretRevisionContents | None = None """The content of the latest revision of the secret. This is the content the charm will receive with a @@ -309,20 +306,20 @@ class Secret(_max_posargs(1)): to this unit. """ - remote_grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict) + remote_grants: dict[int, set[str]] = dataclasses.field(default_factory=dict) """Mapping from relation IDs to remote units and applications to which this secret has been granted.""" - label: Optional[str] = None + label: str | None = None """A human-readable label the charm can use to retrieve the secret. If this is set, it implies that the charm has previously set the label. """ - description: Optional[str] = None + description: str | None = None """A human-readable description of the secret.""" - expire: Optional[datetime.datetime] = None + expire: datetime.datetime | None = None """The time at which the secret will expire.""" - rotate: Optional[SecretRotate] = None + rotate: SecretRotate | None = None """The rotation policy for the secret.""" # what revision is currently tracked by this charm. Only meaningful if owner=False @@ -351,11 +348,11 @@ def _track_latest_revision(self): def _update_metadata( self, - content: Optional["RawSecretRevisionContents"] = None, - label: Optional[str] = None, - description: Optional[str] = None, - expire: Optional[datetime.datetime] = None, - rotate: Optional[SecretRotate] = None, + content: RawSecretRevisionContents | None = None, + label: str | None = None, + description: str | None = None, + expire: datetime.datetime | None = None, + rotate: SecretRotate | None = None, ): """Update the metadata.""" # bypass frozen dataclass @@ -407,11 +404,11 @@ def address(self, value: str): class BindAddress(_max_posargs(1)): """An address bound to a network interface in a Juju space.""" - addresses: List[Address] + addresses: list[Address] """The addresses in the space.""" interface_name: str = "" """The name of the network interface.""" - mac_address: Optional[str] = None + mac_address: str | None = None """The MAC address of the interface.""" def _hook_tool_output_fmt(self): @@ -432,15 +429,15 @@ class Network(_max_posargs(2)): binding_name: str """The name of the network space.""" - bind_addresses: List[BindAddress] = dataclasses.field( + bind_addresses: list[BindAddress] = dataclasses.field( default_factory=lambda: [BindAddress([Address("192.0.2.0")])], ) """Addresses that the charm's application should bind to.""" - ingress_addresses: List[str] = dataclasses.field( + ingress_addresses: list[str] = dataclasses.field( default_factory=lambda: ["192.0.2.0"], ) """Addresses other applications should use to connect to the unit.""" - egress_subnets: List[str] = dataclasses.field( + egress_subnets: list[str] = dataclasses.field( default_factory=lambda: ["192.0.2.0/24"], ) """Subnets that other units will see the charm connecting from.""" @@ -480,7 +477,7 @@ class RelationBase(_max_posargs(2)): endpoint: str """Relation endpoint name. Must match some endpoint name defined in the metadata.""" - interface: Optional[str] = None + interface: str | None = None """Interface name. Must match the interface name attached to this endpoint in the metadata. If left empty, it will be automatically derived from the metadata.""" @@ -488,10 +485,10 @@ class RelationBase(_max_posargs(2)): """Juju relation ID. Every new Relation instance gets a unique one, if there's trouble, override.""" - local_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + local_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """This application's databag for this relation.""" - local_unit_data: "RawDataBagContents" = dataclasses.field( + local_unit_data: RawDataBagContents = dataclasses.field( default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(), ) """This unit's databag for this relation.""" @@ -511,14 +508,14 @@ def _databags(self): yield self.local_unit_data @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" raise NotImplementedError() def _get_databag_for_remote( self, unit_id: int, # noqa: U100 - ) -> "RawDataBagContents": + ) -> RawDataBagContents: """Return the databag for some remote unit ID.""" raise NotImplementedError() @@ -535,7 +532,7 @@ def __post_init__(self): def __hash__(self) -> int: return hash(self.id) - def _validate_databag(self, databag: Dict[str, str]): + def _validate_databag(self, databag: dict[str, str]): if not isinstance(databag, dict): raise StateValidationError( f"all databags should be dicts, not {type(databag)}", @@ -567,9 +564,9 @@ class Relation(RelationBase): limit: int = 1 """The maximum number of integrations on this endpoint.""" - remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + remote_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """The current content of the application databag.""" - remote_units_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field( + remote_units_data: dict[UnitID, RawDataBagContents] = dataclasses.field( default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()}, # dedup ) """The current content of the databag for each unit in the relation.""" @@ -583,11 +580,11 @@ def _remote_app_name(self) -> str: return self.remote_app_name @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" return tuple(self.remote_units_data) - def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: UnitID) -> RawDataBagContents: """Return the databag for some remote unit ID.""" return self.remote_units_data[unit_id] @@ -604,9 +601,9 @@ def _databags(self): # type: ignore class SubordinateRelation(RelationBase): """A relation to share data between a subordinate and a principal charm.""" - remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + remote_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """The current content of the remote application databag.""" - remote_unit_data: "RawDataBagContents" = dataclasses.field( + remote_unit_data: RawDataBagContents = dataclasses.field( default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(), ) """The current content of the remote unit databag.""" @@ -620,11 +617,11 @@ def __hash__(self) -> int: return hash(self.id) @property - def _remote_unit_ids(self) -> Tuple[int]: + def _remote_unit_ids(self) -> tuple[int]: """Ids of the units on the other end of this relation.""" return (self.remote_unit_id,) - def _get_databag_for_remote(self, unit_id: int) -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: int) -> RawDataBagContents: """Return the databag for some remote unit ID.""" if unit_id is not self.remote_unit_id: raise ValueError( @@ -651,7 +648,7 @@ def remote_unit_name(self) -> str: class PeerRelation(RelationBase): """A relation to share data between units of the charm.""" - peers_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field( + peers_data: dict[UnitID, RawDataBagContents] = dataclasses.field( default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()}, ) """Current contents of the peer databags.""" @@ -668,11 +665,11 @@ def _databags(self): # type: ignore yield from self.peers_data.values() @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" return tuple(self.peers_data) - def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: UnitID) -> RawDataBagContents: """Return the databag for some remote unit ID.""" return self.peers_data[unit_id] @@ -699,7 +696,7 @@ class Model(_max_posargs(1)): type: Literal["kubernetes", "lxd"] = "kubernetes" """The type of Juju model.""" - cloud_spec: Optional[CloudSpec] = None + cloud_spec: CloudSpec | None = None """Cloud specification information (metadata) including credentials.""" @@ -761,9 +758,9 @@ def _run(self) -> int: class Mount(_max_posargs(0)): """Maps local files to a :class:`Container` filesystem.""" - location: Union[str, PurePosixPath] + location: str | PurePosixPath """The location inside of the container.""" - source: Union[str, Path] + source: str | Path """The content to provide when the charm does :meth:`ops.Container.pull`.""" @@ -801,10 +798,10 @@ class Notice(_max_posargs(1)): id: str = dataclasses.field(default_factory=_next_notice_id) """Unique ID for this notice.""" - user_id: Optional[int] = None + user_id: int | None = None """UID of the user who may view this notice (None means notice is public).""" - type: Union[pebble.NoticeType, str] = pebble.NoticeType.CUSTOM + type: pebble.NoticeType | str = pebble.NoticeType.CUSTOM """Type of the notice.""" first_occurred: datetime.datetime = dataclasses.field(default_factory=_now_utc) @@ -823,13 +820,13 @@ class Notice(_max_posargs(1)): occurrences: int = 1 """The number of times one of these notices has occurred.""" - last_data: Dict[str, str] = dataclasses.field(default_factory=dict) + last_data: dict[str, str] = dataclasses.field(default_factory=dict) """Additional data captured from the last occurrence of one of these notices.""" - repeat_after: Optional[datetime.timedelta] = None + repeat_after: datetime.timedelta | None = None """Minimum time after one of these was last repeated before Pebble will repeat it again.""" - expire_after: Optional[datetime.timedelta] = None + expire_after: datetime.timedelta | None = None """How long since one of these last occurred until Pebble will drop the notice.""" def _to_ops(self) -> pebble.Notice: @@ -855,7 +852,7 @@ class CheckInfo(_max_posargs(1)): name: str """Name of the check.""" - level: Optional[pebble.CheckLevel] = None + level: pebble.CheckLevel | None = None """Level of the check.""" status: pebble.CheckStatus = pebble.CheckStatus.UP @@ -901,13 +898,13 @@ class Container(_max_posargs(1)): # pebble or derive them from the resulting plan (which one CAN get from pebble). # So if we are instantiating Container by fetching info from a 'live' charm, the 'layers' # will be unknown. all that we can know is the resulting plan (the 'computed plan'). - _base_plan: Dict[str, Any] = dataclasses.field(default_factory=dict) + _base_plan: dict[str, Any] = dataclasses.field(default_factory=dict) # We expect most of the user-facing testing to be covered by this 'layers' attribute, # as it is all that will be known when unit-testing. - layers: Dict[str, pebble.Layer] = dataclasses.field(default_factory=dict) + layers: dict[str, pebble.Layer] = dataclasses.field(default_factory=dict) """All :class:`ops.pebble.Layer` definitions that have already been added to the container.""" - service_statuses: Dict[str, pebble.ServiceStatus] = dataclasses.field( + service_statuses: dict[str, pebble.ServiceStatus] = dataclasses.field( default_factory=dict, ) """The current status of each Pebble service running in the container.""" @@ -926,7 +923,7 @@ class Container(_max_posargs(1)): # when the charm runs `pebble.pull`, it will return .open() from one of those paths. # when the charm pushes, it will either overwrite one of those paths (careful!) or it will # create a tempfile and insert its path in the mock filesystem tree - mounts: Dict[str, Mount] = dataclasses.field(default_factory=dict) + mounts: dict[str, Mount] = dataclasses.field(default_factory=dict) """Provides access to the contents of the simulated container filesystem. For example, suppose you want to express that your container has: @@ -954,8 +951,8 @@ class Container(_max_posargs(1)): container = Container( name='foo', execs={ - scenario.Exec(['whoami'], return_code=0, stdout='ubuntu'), - scenario.Exec( + Exec(['whoami'], return_code=0, stdout='ubuntu'), + Exec( ['dig', '+short', 'canonical.com'], return_code=0, stdout='185.125.190.20\\n185.125.190.21', @@ -964,10 +961,10 @@ class Container(_max_posargs(1)): ) """ - notices: List[Notice] = dataclasses.field(default_factory=list) + notices: list[Notice] = dataclasses.field(default_factory=list) """Any Pebble notices that already exist in the container.""" - check_infos: FrozenSet[CheckInfo] = frozenset() + check_infos: frozenset[CheckInfo] = frozenset() """All Pebble health checks that have been added to the container.""" def __hash__(self) -> int: @@ -980,7 +977,7 @@ def __post_init__(self): def _render_services(self): # copied over from ops.testing._TestingPebbleClient._render_services() - services: Dict[str, pebble.Service] = {} + services: dict[str, pebble.Service] = {} for key in sorted(self.layers.keys()): layer = self.layers[key] for name, service in layer.services.items(): @@ -1006,10 +1003,10 @@ def plan(self) -> pebble.Plan: return plan @property - def services(self) -> Dict[str, pebble.ServiceInfo]: + def services(self) -> dict[str, pebble.ServiceInfo]: """The Pebble services as rendered in the plan.""" services = self._render_services() - infos: Dict[str, pebble.ServiceInfo] = {} + infos: dict[str, pebble.ServiceInfo] = {} names = sorted(services.keys()) for name in names: try: @@ -1031,7 +1028,7 @@ def services(self) -> Dict[str, pebble.ServiceInfo]: infos[name] = info return infos - def get_filesystem(self, ctx: "Context") -> Path: + def get_filesystem(self, ctx: Context) -> Path: """Simulated Pebble filesystem in this context. Returns: @@ -1061,7 +1058,7 @@ class _EntityStatus: name: _RawStatusLiteral message: str = "" - _entity_statuses: ClassVar[Dict[str, Type["_EntityStatus"]]] = {} + _entity_statuses: ClassVar[dict[str, type[_EntityStatus]]] = {} def __eq__(self, other: Any): if isinstance(other, (StatusBase, _EntityStatus)): @@ -1079,13 +1076,13 @@ def from_status_name( cls, name: _RawStatusLiteral, message: str = "", - ) -> "_EntityStatus": + ) -> _EntityStatus: # Note that this won't work for UnknownStatus. # All subclasses have a default 'name' attribute, but the type checker can't tell that. return cls._entity_statuses[name](message=message) # type:ignore @classmethod - def from_ops(cls, obj: StatusBase) -> "_EntityStatus": + def from_ops(cls, obj: StatusBase) -> _EntityStatus: return cls.from_status_name(obj.name, obj.message) @@ -1173,7 +1170,7 @@ class MyCharm(ops.CharmBase): """ - owner_path: Optional[str] = None + owner_path: str | None = None """The path to the owner of this StoredState instance. If None, the owner is the Framework. Otherwise, /-separated object names, @@ -1184,7 +1181,7 @@ class MyCharm(ops.CharmBase): # However, it's complex to describe those types, since it's a recursive # definition - even in TypeShed the _Marshallable type includes containers # like list[Any], which seems to defeat the point. - content: Dict[str, Any] = dataclasses.field(default_factory=dict) + content: dict[str, Any] = dataclasses.field(default_factory=dict) """The content of the :class:`ops.StoredState` instance.""" _data_type_name: str = "StoredStateData" @@ -1208,7 +1205,7 @@ class Port(_max_posargs(1)): :class:`UDPPort`, or :class:`ICMPPort` instead. """ - port: Optional[int] = None + port: int | None = None """The port to open. Required for TCP and UDP; not allowed for ICMP.""" protocol: _RawPortProtocolLiteral = "tcp" @@ -1326,7 +1323,7 @@ def __eq__(self, other: object) -> bool: return (self.name, self.index) == (other.name, other.index) return False - def get_filesystem(self, ctx: "Context") -> Path: + def get_filesystem(self, ctx: Context) -> Path: """Simulated filesystem root in this context.""" return ctx._get_storage_root(self.name, self.index) @@ -1337,7 +1334,7 @@ class Resource(_max_posargs(0)): name: str """The name of the resource, as found in the charm metadata.""" - path: Union[str, Path] + path: str | Path """A local path that will be provided to the charm as the content of the resource.""" @@ -1350,11 +1347,11 @@ class State(_max_posargs(0)): return data from `State.leader`, and so on. """ - config: Dict[str, Union[str, int, float, bool]] = dataclasses.field( + config: dict[str, str | int | float | bool] = dataclasses.field( default_factory=dict, ) """The present configuration of this charm.""" - relations: Iterable["RelationBase"] = dataclasses.field(default_factory=frozenset) + relations: Iterable[RelationBase] = dataclasses.field(default_factory=frozenset) """All relations that currently exist for this charm.""" networks: Iterable[Network] = dataclasses.field(default_factory=frozenset) """Manual overrides for any relation and extra bindings currently provisioned for this charm. @@ -1396,9 +1393,9 @@ class State(_max_posargs(0)): # dispatched, and represent the events that had been deferred during the previous run. # If the charm defers any events during "this execution", they will be appended # to this list. - deferred: List["DeferredEvent"] = dataclasses.field(default_factory=list) + deferred: list[DeferredEvent] = dataclasses.field(default_factory=list) """Events that have been deferred on this charm by some previous execution.""" - stored_states: Iterable["StoredState"] = dataclasses.field( + stored_states: Iterable[StoredState] = dataclasses.field( default_factory=frozenset, ) """Contents of a charm's stored state.""" @@ -1479,12 +1476,12 @@ def _update_status( # bypass frozen dataclass object.__setattr__(self, name, new_status) - def _update_opened_ports(self, new_ports: FrozenSet[Port]): + def _update_opened_ports(self, new_ports: frozenset[Port]): """Update the current opened ports.""" # bypass frozen dataclass object.__setattr__(self, "opened_ports", new_ports) - def _update_secrets(self, new_secrets: FrozenSet[Secret]): + def _update_secrets(self, new_secrets: frozenset[Secret]): """Update the current secrets.""" # bypass frozen dataclass object.__setattr__(self, "secrets", new_secrets) @@ -1506,8 +1503,8 @@ def get_network(self, binding_name: str, /) -> Network: def get_secret( self, *, - id: Optional[str] = None, - label: Optional[str] = None, + id: str | None = None, + label: str | None = None, ) -> Secret: """Get secret from this State, based on the secret's id or label.""" if id is None and label is None: @@ -1527,7 +1524,7 @@ def get_stored_state( stored_state: str, /, *, - owner_path: Optional[str] = None, + owner_path: str | None = None, ) -> StoredState: """Get stored state from this State, based on the stored state's name and owner_path.""" for ss in self.stored_states: @@ -1540,7 +1537,7 @@ def get_storage( storage: str, /, *, - index: Optional[int] = 0, + index: int | None = 0, ) -> Storage: """Get storage from this State, based on the storage's name and index.""" for state_storage in self.storages: @@ -1550,14 +1547,14 @@ def get_storage( f"storage: name={storage}, index={index} not found in the State", ) - def get_relation(self, relation: int, /) -> "RelationBase": + def get_relation(self, relation: int, /) -> RelationBase: """Get relation from this State, based on the relation's id.""" for state_relation in self.relations: if state_relation.id == relation: return state_relation raise KeyError(f"relation: id={relation} not found in the State") - def get_relations(self, endpoint: str) -> Tuple["RelationBase", ...]: + def get_relations(self, endpoint: str) -> tuple[RelationBase, ...]: """Get all relations on this endpoint from the current state.""" # we rather normalize the endpoint than worry about cursed metadata situations such as: @@ -1573,7 +1570,7 @@ def get_relations(self, endpoint: str) -> Tuple["RelationBase", ...]: ) -def _is_valid_charmcraft_25_metadata(meta: Dict[str, Any]): +def _is_valid_charmcraft_25_metadata(meta: dict[str, Any]): # Check whether this dict has the expected mandatory metadata fields according to the # charmcraft >2.5 charmcraft.yaml schema if (config_type := meta.get("type")) != "charm": @@ -1591,10 +1588,10 @@ def _is_valid_charmcraft_25_metadata(meta: Dict[str, Any]): class _CharmSpec(Generic[CharmType]): """Charm spec.""" - charm_type: Type[CharmBase] - meta: Dict[str, Any] - actions: Optional[Dict[str, Any]] = None - config: Optional[Dict[str, Any]] = None + charm_type: type[CharmBase] + meta: dict[str, Any] + actions: dict[str, Any] | None = None + config: dict[str, Any] | None = None # autoloaded means: we are running a 'real' charm class, living in some # /src/charm.py, and the metadata files are 'real' metadata files. @@ -1606,7 +1603,7 @@ def _load_metadata_legacy(charm_root: Path): # back in the days, we used to have separate metadata.yaml, config.yaml and actions.yaml # files for charm metadata. metadata_path = charm_root / "metadata.yaml" - meta: Dict[str, Any] = ( + meta: dict[str, Any] = ( yaml.safe_load(metadata_path.open()) if metadata_path.exists() else {} ) @@ -1621,7 +1618,7 @@ def _load_metadata_legacy(charm_root: Path): def _load_metadata(charm_root: Path): """Load metadata from charm projects created with Charmcraft >= 2.5.""" metadata_path = charm_root / "charmcraft.yaml" - meta: Dict[str, Any] = ( + meta: dict[str, Any] = ( yaml.safe_load(metadata_path.open()) if metadata_path.exists() else {} ) if not _is_valid_charmcraft_25_metadata(meta): @@ -1631,7 +1628,7 @@ def _load_metadata(charm_root: Path): return meta, config, actions @staticmethod - def autoload(charm_type: Type[CharmBase]) -> "_CharmSpec[CharmType]": + def autoload(charm_type: type[CharmBase]) -> _CharmSpec[CharmType]: """Construct a ``_CharmSpec`` object by looking up the metadata from the charm's repo root. Will attempt to load the metadata off the ``charmcraft.yaml`` file @@ -1662,7 +1659,7 @@ def autoload(charm_type: Type[CharmBase]) -> "_CharmSpec[CharmType]": is_autoloaded=True, ) - def get_all_relations(self) -> List[Tuple[str, Dict[str, str]]]: + def get_all_relations(self) -> list[tuple[str, dict[str, str]]]: """A list of all relation endpoints defined in the metadata.""" return list( chain( @@ -1690,7 +1687,7 @@ class DeferredEvent: observer: str # needs to be marshal.dumps-able. - snapshot_data: Dict[Any, Any] = dataclasses.field(default_factory=dict) + snapshot_data: dict[Any, Any] = dataclasses.field(default_factory=dict) # It would be nicer if people could do something like: # `isinstance(state.deferred[0], ops.StartEvent)` @@ -1717,7 +1714,7 @@ class _EventType(str, Enum): class _EventPath(str): if TYPE_CHECKING: # pragma: no cover name: str - owner_path: List[str] + owner_path: list[str] suffix: str prefix: str is_custom: bool @@ -1742,7 +1739,7 @@ def __new__(cls, string: str): return instance @staticmethod - def _get_suffix_and_type(s: str) -> Tuple[str, _EventType]: + def _get_suffix_and_type(s: str) -> tuple[str, _EventType]: for suffix in _RELATION_EVENTS_SUFFIX: if s.endswith(suffix): return suffix, _EventType.relation @@ -1787,35 +1784,35 @@ class _Event: # type: ignore """ path: str - args: Tuple[Any, ...] = () - kwargs: Dict[str, Any] = dataclasses.field(default_factory=dict) + args: tuple[Any, ...] = () + kwargs: dict[str, Any] = dataclasses.field(default_factory=dict) - storage: Optional["Storage"] = None + storage: Storage | None = None """If this is a storage event, the storage it refers to.""" - relation: Optional["RelationBase"] = None + relation: RelationBase | None = None """If this is a relation event, the relation it refers to.""" - relation_remote_unit_id: Optional[int] = None - relation_departed_unit_id: Optional[int] = None + relation_remote_unit_id: int | None = None + relation_departed_unit_id: int | None = None - secret: Optional[Secret] = None + secret: Secret | None = None """If this is a secret event, the secret it refers to.""" # if this is a secret-removed or secret-expired event, the secret revision it refers to - secret_revision: Optional[int] = None + secret_revision: int | None = None - container: Optional[Container] = None + container: Container | None = None """If this is a workload (container) event, the container it refers to.""" - notice: Optional[Notice] = None + notice: Notice | None = None """If this is a Pebble notice event, the notice it refers to.""" - check_info: Optional[CheckInfo] = None + check_info: CheckInfo | None = None """If this is a Pebble check event, the check info it provides.""" - action: Optional["_Action"] = None + action: _Action | None = None """If this is an action event, the :class:`Action` it refers to.""" - _owner_path: List[str] = dataclasses.field(default_factory=list) + _owner_path: list[str] = dataclasses.field(default_factory=list) def __post_init__(self): path = _EventPath(self.path) @@ -1841,7 +1838,7 @@ def name(self) -> str: return self._path.name @property - def owner_path(self) -> List[str]: + def owner_path(self) -> list[str]: """Path to the ObjectEvents instance owning this event. If this event is defined on the toplevel charm class, it should be ['on']. @@ -1875,7 +1872,7 @@ def _is_workload_event(self) -> bool: # this method is private because _CharmSpec is not quite user-facing; also, # the user should know. - def _is_builtin_event(self, charm_spec: "_CharmSpec[CharmType]") -> bool: + def _is_builtin_event(self, charm_spec: _CharmSpec[CharmType]) -> bool: """Determine whether the event is a custom-defined one or a builtin one.""" event_name = self.name @@ -1909,7 +1906,7 @@ def deferred(self, handler: Callable[..., Any], event_id: int = 1) -> DeferredEv # Many events have no snapshot data: install, start, stop, remove, config-changed, # upgrade-charm, pre-series-upgrade, post-series-upgrade, leader-elected, # leader-settings-changed, collect-metrics - snapshot_data: Dict[str, Any] = {} + snapshot_data: dict[str, Any] = {} # fixme: at this stage we can't determine if the event is a builtin one or not; if it is # not, then the coming checks are meaningless: the custom event could be named like a @@ -2020,10 +2017,10 @@ class _Action(_max_posargs(1)): Used to simulate ``juju run``, passing in any parameters. For example:: def test_backup_action(): - ctx = scenario.Context(MyCharm) + ctx = Context(MyCharm) state = ctx.run( ctx.on.action('do_backup', params={'filename': 'foo'}), - scenario.State() + State(), ) assert ctx.action_results == ... """ @@ -2031,7 +2028,7 @@ def test_backup_action(): name: str """Juju action name, as found in the charm metadata.""" - params: Mapping[str, "AnyJson"] = dataclasses.field(default_factory=dict) + params: Mapping[str, AnyJson] = dataclasses.field(default_factory=dict) """Parameter values passed to the action.""" id: str = dataclasses.field(default_factory=_next_action_id)