From 7f250a1161c49c193018b90bd769fc88060fa4c3 Mon Sep 17 00:00:00 2001 From: Tony Meyer Date: Fri, 11 Oct 2024 15:10:24 +1300 Subject: [PATCH] Fully automated change, pyupgrade --py38-plus testing/src/\*.py --- testing/src/context.py | 98 +++++++++--------- testing/src/state.py | 220 ++++++++++++++++++++--------------------- 2 files changed, 159 insertions(+), 159 deletions(-) diff --git a/testing/src/context.py b/testing/src/context.py index e04d18be6..598088010 100644 --- a/testing/src/context.py +++ b/testing/src/context.py @@ -81,9 +81,9 @@ class Manager: def __init__( self, - ctx: "Context", + ctx: Context, arg: _Event, - state_in: "State", + state_in: State, ): self._ctx = ctx self._arg = arg @@ -91,7 +91,7 @@ def __init__( self._emitted: bool = False - self.ops: Optional["Ops"] = None + self.ops: Ops | None = None @property def charm(self) -> ops.CharmBase: @@ -115,7 +115,7 @@ def __enter__(self): self.ops = ops return self - def run(self) -> "State": + def run(self) -> State: """Emit the event and proceed with charm execution. This can only be done once. @@ -265,12 +265,12 @@ def collect_unit_status(): @staticmethod @_copy_doc(ops.RelationCreatedEvent) - def relation_created(relation: "RelationBase"): + def relation_created(relation: RelationBase): return _Event(f"{relation.endpoint}_relation_created", relation=relation) @staticmethod @_copy_doc(ops.RelationJoinedEvent) - def relation_joined(relation: "RelationBase", *, remote_unit: Optional[int] = None): + def relation_joined(relation: RelationBase, *, remote_unit: int | None = None): return _Event( f"{relation.endpoint}_relation_joined", relation=relation, @@ -280,9 +280,9 @@ def relation_joined(relation: "RelationBase", *, remote_unit: Optional[int] = No @staticmethod @_copy_doc(ops.RelationChangedEvent) def relation_changed( - relation: "RelationBase", + relation: RelationBase, *, - remote_unit: Optional[int] = None, + remote_unit: int | None = None, ): return _Event( f"{relation.endpoint}_relation_changed", @@ -293,10 +293,10 @@ def relation_changed( @staticmethod @_copy_doc(ops.RelationDepartedEvent) def relation_departed( - relation: "RelationBase", + relation: RelationBase, *, - remote_unit: Optional[int] = None, - departing_unit: Optional[int] = None, + remote_unit: int | None = None, + departing_unit: int | None = None, ): return _Event( f"{relation.endpoint}_relation_departed", @@ -307,7 +307,7 @@ def relation_departed( @staticmethod @_copy_doc(ops.RelationBrokenEvent) - def relation_broken(relation: "RelationBase"): + def relation_broken(relation: RelationBase): return _Event(f"{relation.endpoint}_relation_broken", relation=relation) @staticmethod @@ -356,10 +356,10 @@ def pebble_check_recovered(container: Container, info: CheckInfo): @_copy_doc(ops.ActionEvent) def action( name: str, - params: Optional[Mapping[str, "AnyJson"]] = None, - id: Optional[str] = None, + params: Mapping[str, AnyJson] | None = None, + id: str | None = None, ): - kwargs: Dict[str, Any] = {} + kwargs: dict[str, Any] = {} if params: kwargs["params"] = params if id: @@ -425,26 +425,26 @@ def test_foo(): manager.run() """ - juju_log: List["JujuLogLine"] + juju_log: list[JujuLogLine] """A record of what the charm has sent to juju-log""" - app_status_history: List["_EntityStatus"] + app_status_history: list[_EntityStatus] """A record of the app statuses the charm has set""" - unit_status_history: List["_EntityStatus"] + unit_status_history: list[_EntityStatus] """A record of the unit statuses the charm has set""" - workload_version_history: List[str] + workload_version_history: list[str] """A record of the workload versions the charm has set""" - removed_secret_revisions: List[int] + removed_secret_revisions: list[int] """A record of the secret revisions the charm has removed""" - emitted_events: List[ops.EventBase] + emitted_events: list[ops.EventBase] """A record of the events (including custom) that the charm has processed""" - requested_storages: Dict[str, int] + requested_storages: dict[str, int] """A record of the storages the charm has requested""" - action_logs: List[str] + action_logs: list[str] """The logs associated with the action output, set by the charm with :meth:`ops.ActionEvent.log` This will be empty when handling a non-action event. """ - action_results: Optional[Dict[str, Any]] + action_results: dict[str, Any] | None """A key-value mapping assigned by the charm as a result of the action. This will be ``None`` if the charm never calls :meth:`ops.ActionEvent.set_results` @@ -457,17 +457,17 @@ def test_foo(): def __init__( self, - charm_type: Type["CharmType"], - meta: Optional[Dict[str, Any]] = None, + charm_type: type[CharmType], + meta: dict[str, Any] | None = None, *, - actions: Optional[Dict[str, Any]] = None, - config: Optional[Dict[str, Any]] = None, - charm_root: Optional[Union[str, Path]] = None, + actions: dict[str, Any] | None = None, + config: dict[str, Any] | None = None, + charm_root: str | Path | None = None, juju_version: str = _DEFAULT_JUJU_VERSION, capture_deferred_events: bool = False, capture_framework_events: bool = False, - app_name: Optional[str] = None, - unit_id: Optional[int] = 0, + app_name: str | None = None, + unit_id: int | None = 0, app_trusted: bool = False, ): """Represents a simulated charm's execution context. @@ -536,26 +536,26 @@ def __init__( self.capture_framework_events = capture_framework_events # streaming side effects from running an event - self.juju_log: List["JujuLogLine"] = [] - self.app_status_history: List["_EntityStatus"] = [] - self.unit_status_history: List["_EntityStatus"] = [] - self.exec_history: Dict[str, List["ExecArgs"]] = {} - self.workload_version_history: List[str] = [] - self.removed_secret_revisions: List[int] = [] - self.emitted_events: List[ops.EventBase] = [] - self.requested_storages: Dict[str, int] = {} + self.juju_log: list[JujuLogLine] = [] + self.app_status_history: list[_EntityStatus] = [] + self.unit_status_history: list[_EntityStatus] = [] + self.exec_history: dict[str, list[ExecArgs]] = {} + self.workload_version_history: list[str] = [] + self.removed_secret_revisions: list[int] = [] + self.emitted_events: list[ops.EventBase] = [] + self.requested_storages: dict[str, int] = {} # set by Runtime.exec() in self._run() - self._output_state: Optional["State"] = None + self._output_state: State | None = None # operations (and embedded tasks) from running actions - self.action_logs: List[str] = [] - self.action_results: Optional[Dict[str, Any]] = None - self._action_failure_message: Optional[str] = None + self.action_logs: list[str] = [] + self.action_results: dict[str, Any] | None = None + self._action_failure_message: str | None = None self.on = CharmEvents() - def _set_output_state(self, output_state: "State"): + def _set_output_state(self, output_state: State): """Hook for Runtime to set the output state.""" self._output_state = output_state @@ -570,14 +570,14 @@ def _get_storage_root(self, name: str, index: int) -> Path: storage_root.mkdir(parents=True, exist_ok=True) return storage_root - def _record_status(self, state: "State", is_app: bool): + def _record_status(self, state: State, is_app: bool): """Record the previous status before a status change.""" if is_app: self.app_status_history.append(state.app_status) else: self.unit_status_history.append(state.unit_status) - def __call__(self, event: "_Event", state: "State"): + def __call__(self, event: _Event, state: State): """Context manager to introspect live charm object before and after the event is emitted. Usage:: @@ -594,7 +594,7 @@ def __call__(self, event: "_Event", state: "State"): """ return Manager(self, event, state) - def run_action(self, action: str, state: "State"): + def run_action(self, action: str, state: State): """Use `run()` instead. :private: @@ -604,7 +604,7 @@ def run_action(self, action: str, state: "State"): "and find the results in `ctx.action_results`", ) - def run(self, event: "_Event", state: "State") -> "State": + def run(self, event: _Event, state: State) -> State: """Trigger a charm execution with an event and a State. Calling this function will call ``ops.main`` and set up the context according to the @@ -682,7 +682,7 @@ def run(self, event: "_Event", state: "State") -> "State": return self._output_state @contextmanager - def _run(self, event: "_Event", state: "State"): + def _run(self, event: _Event, state: State): runtime = Runtime( charm_spec=self.charm_spec, juju_version=self.juju_version, diff --git a/testing/src/state.py b/testing/src/state.py index b8d2b97d7..689893025 100644 --- a/testing/src/state.py +++ b/testing/src/state.py @@ -203,14 +203,14 @@ class CloudCredential(_max_posargs(0)): auth_type: str """Authentication type.""" - attributes: Dict[str, str] = dataclasses.field(default_factory=dict) + attributes: dict[str, str] = dataclasses.field(default_factory=dict) """A dictionary containing cloud credentials. For example, for AWS, it contains `access-key` and `secret-key`; for Azure, `application-id`, `application-password` and `subscription-id` can be found here. """ - redacted: List[str] = dataclasses.field(default_factory=list) + redacted: list[str] = dataclasses.field(default_factory=list) """A list of redacted generic cloud API secrets.""" def _to_ops(self) -> CloudCredential_Ops: @@ -231,22 +231,22 @@ class CloudSpec(_max_posargs(1)): name: str = "localhost" """Juju cloud name.""" - region: Optional[str] = None + region: str | None = None """Region of the cloud.""" - endpoint: Optional[str] = None + endpoint: str | None = None """Endpoint of the cloud.""" - identity_endpoint: Optional[str] = None + identity_endpoint: str | None = None """Identity endpoint of the cloud.""" - storage_endpoint: Optional[str] = None + storage_endpoint: str | None = None """Storage endpoint of the cloud.""" - credential: Optional[CloudCredential] = None + credential: CloudCredential | None = None """Cloud credentials with key-value attributes.""" - ca_certificates: List[str] = dataclasses.field(default_factory=list) + ca_certificates: list[str] = dataclasses.field(default_factory=list) """A list of CA certificates.""" skip_tls_verify: bool = False @@ -286,12 +286,12 @@ class Secret(_max_posargs(1)): This class is used for both user and charm secrets. """ - tracked_content: "RawSecretRevisionContents" + tracked_content: RawSecretRevisionContents """The content of the secret that the charm is currently tracking. This is the content the charm will receive with a :meth:`ops.Secret.get_content` call.""" - latest_content: Optional["RawSecretRevisionContents"] = None + latest_content: RawSecretRevisionContents | None = None """The content of the latest revision of the secret. This is the content the charm will receive with a @@ -311,20 +311,20 @@ class Secret(_max_posargs(1)): to this unit. """ - remote_grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict) + remote_grants: dict[int, set[str]] = dataclasses.field(default_factory=dict) """Mapping from relation IDs to remote units and applications to which this secret has been granted.""" - label: Optional[str] = None + label: str | None = None """A human-readable label the charm can use to retrieve the secret. If this is set, it implies that the charm has previously set the label. """ - description: Optional[str] = None + description: str | None = None """A human-readable description of the secret.""" - expire: Optional[datetime.datetime] = None + expire: datetime.datetime | None = None """The time at which the secret will expire.""" - rotate: Optional[SecretRotate] = None + rotate: SecretRotate | None = None """The rotation policy for the secret.""" # what revision is currently tracked by this charm. Only meaningful if owner=False @@ -353,11 +353,11 @@ def _track_latest_revision(self): def _update_metadata( self, - content: Optional["RawSecretRevisionContents"] = None, - label: Optional[str] = None, - description: Optional[str] = None, - expire: Optional[datetime.datetime] = None, - rotate: Optional[SecretRotate] = None, + content: RawSecretRevisionContents | None = None, + label: str | None = None, + description: str | None = None, + expire: datetime.datetime | None = None, + rotate: SecretRotate | None = None, ): """Update the metadata.""" # bypass frozen dataclass @@ -409,11 +409,11 @@ def address(self, value: str): class BindAddress(_max_posargs(1)): """An address bound to a network interface in a Juju space.""" - addresses: List[Address] + addresses: list[Address] """The addresses in the space.""" interface_name: str = "" """The name of the network interface.""" - mac_address: Optional[str] = None + mac_address: str | None = None """The MAC address of the interface.""" def _hook_tool_output_fmt(self): @@ -434,15 +434,15 @@ class Network(_max_posargs(2)): binding_name: str """The name of the network space.""" - bind_addresses: List[BindAddress] = dataclasses.field( + bind_addresses: list[BindAddress] = dataclasses.field( default_factory=lambda: [BindAddress([Address("192.0.2.0")])], ) """Addresses that the charm's application should bind to.""" - ingress_addresses: List[str] = dataclasses.field( + ingress_addresses: list[str] = dataclasses.field( default_factory=lambda: ["192.0.2.0"], ) """Addresses other applications should use to connect to the unit.""" - egress_subnets: List[str] = dataclasses.field( + egress_subnets: list[str] = dataclasses.field( default_factory=lambda: ["192.0.2.0/24"], ) """Subnets that other units will see the charm connecting from.""" @@ -482,7 +482,7 @@ class RelationBase(_max_posargs(2)): endpoint: str """Relation endpoint name. Must match some endpoint name defined in the metadata.""" - interface: Optional[str] = None + interface: str | None = None """Interface name. Must match the interface name attached to this endpoint in the metadata. If left empty, it will be automatically derived from the metadata.""" @@ -490,10 +490,10 @@ class RelationBase(_max_posargs(2)): """Juju relation ID. Every new Relation instance gets a unique one, if there's trouble, override.""" - local_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + local_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """This application's databag for this relation.""" - local_unit_data: "RawDataBagContents" = dataclasses.field( + local_unit_data: RawDataBagContents = dataclasses.field( default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(), ) """This unit's databag for this relation.""" @@ -513,14 +513,14 @@ def _databags(self): yield self.local_unit_data @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" raise NotImplementedError() def _get_databag_for_remote( self, unit_id: int, # noqa: U100 - ) -> "RawDataBagContents": + ) -> RawDataBagContents: """Return the databag for some remote unit ID.""" raise NotImplementedError() @@ -537,7 +537,7 @@ def __post_init__(self): def __hash__(self) -> int: return hash(self.id) - def _validate_databag(self, databag: Dict[str, str]): + def _validate_databag(self, databag: dict[str, str]): if not isinstance(databag, dict): raise StateValidationError( f"all databags should be dicts, not {type(databag)}", @@ -569,9 +569,9 @@ class Relation(RelationBase): limit: int = 1 """The maximum number of integrations on this endpoint.""" - remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + remote_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """The current content of the application databag.""" - remote_units_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field( + remote_units_data: dict[UnitID, RawDataBagContents] = dataclasses.field( default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()}, # dedup ) """The current content of the databag for each unit in the relation.""" @@ -585,11 +585,11 @@ def _remote_app_name(self) -> str: return self.remote_app_name @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" return tuple(self.remote_units_data) - def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: UnitID) -> RawDataBagContents: """Return the databag for some remote unit ID.""" return self.remote_units_data[unit_id] @@ -606,9 +606,9 @@ def _databags(self): # type: ignore class SubordinateRelation(RelationBase): """A relation to share data between a subordinate and a principal charm.""" - remote_app_data: "RawDataBagContents" = dataclasses.field(default_factory=dict) + remote_app_data: RawDataBagContents = dataclasses.field(default_factory=dict) """The current content of the remote application databag.""" - remote_unit_data: "RawDataBagContents" = dataclasses.field( + remote_unit_data: RawDataBagContents = dataclasses.field( default_factory=lambda: _DEFAULT_JUJU_DATABAG.copy(), ) """The current content of the remote unit databag.""" @@ -622,11 +622,11 @@ def __hash__(self) -> int: return hash(self.id) @property - def _remote_unit_ids(self) -> Tuple[int]: + def _remote_unit_ids(self) -> tuple[int]: """Ids of the units on the other end of this relation.""" return (self.remote_unit_id,) - def _get_databag_for_remote(self, unit_id: int) -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: int) -> RawDataBagContents: """Return the databag for some remote unit ID.""" if unit_id is not self.remote_unit_id: raise ValueError( @@ -653,7 +653,7 @@ def remote_unit_name(self) -> str: class PeerRelation(RelationBase): """A relation to share data between units of the charm.""" - peers_data: Dict["UnitID", "RawDataBagContents"] = dataclasses.field( + peers_data: dict[UnitID, RawDataBagContents] = dataclasses.field( default_factory=lambda: {0: _DEFAULT_JUJU_DATABAG.copy()}, ) """Current contents of the peer databags.""" @@ -670,11 +670,11 @@ def _databags(self): # type: ignore yield from self.peers_data.values() @property - def _remote_unit_ids(self) -> Tuple["UnitID", ...]: + def _remote_unit_ids(self) -> tuple[UnitID, ...]: """Ids of the units on the other end of this relation.""" return tuple(self.peers_data) - def _get_databag_for_remote(self, unit_id: "UnitID") -> "RawDataBagContents": + def _get_databag_for_remote(self, unit_id: UnitID) -> RawDataBagContents: """Return the databag for some remote unit ID.""" return self.peers_data[unit_id] @@ -701,7 +701,7 @@ class Model(_max_posargs(1)): type: Literal["kubernetes", "lxd"] = "kubernetes" """The type of Juju model.""" - cloud_spec: Optional[CloudSpec] = None + cloud_spec: CloudSpec | None = None """Cloud specification information (metadata) including credentials.""" @@ -763,9 +763,9 @@ def _run(self) -> int: class Mount(_max_posargs(0)): """Maps local files to a :class:`Container` filesystem.""" - location: Union[str, PurePosixPath] + location: str | PurePosixPath """The location inside of the container.""" - source: Union[str, Path] + source: str | Path """The content to provide when the charm does :meth:`ops.Container.pull`.""" @@ -803,10 +803,10 @@ class Notice(_max_posargs(1)): id: str = dataclasses.field(default_factory=_next_notice_id) """Unique ID for this notice.""" - user_id: Optional[int] = None + user_id: int | None = None """UID of the user who may view this notice (None means notice is public).""" - type: Union[pebble.NoticeType, str] = pebble.NoticeType.CUSTOM + type: pebble.NoticeType | str = pebble.NoticeType.CUSTOM """Type of the notice.""" first_occurred: datetime.datetime = dataclasses.field(default_factory=_now_utc) @@ -825,13 +825,13 @@ class Notice(_max_posargs(1)): occurrences: int = 1 """The number of times one of these notices has occurred.""" - last_data: Dict[str, str] = dataclasses.field(default_factory=dict) + last_data: dict[str, str] = dataclasses.field(default_factory=dict) """Additional data captured from the last occurrence of one of these notices.""" - repeat_after: Optional[datetime.timedelta] = None + repeat_after: datetime.timedelta | None = None """Minimum time after one of these was last repeated before Pebble will repeat it again.""" - expire_after: Optional[datetime.timedelta] = None + expire_after: datetime.timedelta | None = None """How long since one of these last occurred until Pebble will drop the notice.""" def _to_ops(self) -> pebble.Notice: @@ -857,7 +857,7 @@ class CheckInfo(_max_posargs(1)): name: str """Name of the check.""" - level: Optional[pebble.CheckLevel] = None + level: pebble.CheckLevel | None = None """Level of the check.""" status: pebble.CheckStatus = pebble.CheckStatus.UP @@ -903,13 +903,13 @@ class Container(_max_posargs(1)): # pebble or derive them from the resulting plan (which one CAN get from pebble). # So if we are instantiating Container by fetching info from a 'live' charm, the 'layers' # will be unknown. all that we can know is the resulting plan (the 'computed plan'). - _base_plan: Dict[str, Any] = dataclasses.field(default_factory=dict) + _base_plan: dict[str, Any] = dataclasses.field(default_factory=dict) # We expect most of the user-facing testing to be covered by this 'layers' attribute, # as it is all that will be known when unit-testing. - layers: Dict[str, pebble.Layer] = dataclasses.field(default_factory=dict) + layers: dict[str, pebble.Layer] = dataclasses.field(default_factory=dict) """All :class:`ops.pebble.Layer` definitions that have already been added to the container.""" - service_statuses: Dict[str, pebble.ServiceStatus] = dataclasses.field( + service_statuses: dict[str, pebble.ServiceStatus] = dataclasses.field( default_factory=dict, ) """The current status of each Pebble service running in the container.""" @@ -928,7 +928,7 @@ class Container(_max_posargs(1)): # when the charm runs `pebble.pull`, it will return .open() from one of those paths. # when the charm pushes, it will either overwrite one of those paths (careful!) or it will # create a tempfile and insert its path in the mock filesystem tree - mounts: Dict[str, Mount] = dataclasses.field(default_factory=dict) + mounts: dict[str, Mount] = dataclasses.field(default_factory=dict) """Provides access to the contents of the simulated container filesystem. For example, suppose you want to express that your container has: @@ -966,10 +966,10 @@ class Container(_max_posargs(1)): ) """ - notices: List[Notice] = dataclasses.field(default_factory=list) + notices: list[Notice] = dataclasses.field(default_factory=list) """Any Pebble notices that already exist in the container.""" - check_infos: FrozenSet[CheckInfo] = frozenset() + check_infos: frozenset[CheckInfo] = frozenset() """All Pebble health checks that have been added to the container.""" def __hash__(self) -> int: @@ -982,7 +982,7 @@ def __post_init__(self): def _render_services(self): # copied over from ops.testing._TestingPebbleClient._render_services() - services: Dict[str, pebble.Service] = {} + services: dict[str, pebble.Service] = {} for key in sorted(self.layers.keys()): layer = self.layers[key] for name, service in layer.services.items(): @@ -1008,10 +1008,10 @@ def plan(self) -> pebble.Plan: return plan @property - def services(self) -> Dict[str, pebble.ServiceInfo]: + def services(self) -> dict[str, pebble.ServiceInfo]: """The Pebble services as rendered in the plan.""" services = self._render_services() - infos: Dict[str, pebble.ServiceInfo] = {} + infos: dict[str, pebble.ServiceInfo] = {} names = sorted(services.keys()) for name in names: try: @@ -1033,7 +1033,7 @@ def services(self) -> Dict[str, pebble.ServiceInfo]: infos[name] = info return infos - def get_filesystem(self, ctx: "Context") -> Path: + def get_filesystem(self, ctx: Context) -> Path: """Simulated Pebble filesystem in this context. Returns: @@ -1063,7 +1063,7 @@ class _EntityStatus: name: _RawStatusLiteral message: str = "" - _entity_statuses: ClassVar[Dict[str, Type["_EntityStatus"]]] = {} + _entity_statuses: ClassVar[dict[str, type[_EntityStatus]]] = {} def __eq__(self, other: Any): if isinstance(other, (StatusBase, _EntityStatus)): @@ -1081,13 +1081,13 @@ def from_status_name( cls, name: _RawStatusLiteral, message: str = "", - ) -> "_EntityStatus": + ) -> _EntityStatus: # Note that this won't work for UnknownStatus. # All subclasses have a default 'name' attribute, but the type checker can't tell that. return cls._entity_statuses[name](message=message) # type:ignore @classmethod - def from_ops(cls, obj: StatusBase) -> "_EntityStatus": + def from_ops(cls, obj: StatusBase) -> _EntityStatus: return cls.from_status_name(obj.name, obj.message) @@ -1175,7 +1175,7 @@ class MyCharm(ops.CharmBase): """ - owner_path: Optional[str] = None + owner_path: str | None = None """The path to the owner of this StoredState instance. If None, the owner is the Framework. Otherwise, /-separated object names, @@ -1186,7 +1186,7 @@ class MyCharm(ops.CharmBase): # However, it's complex to describe those types, since it's a recursive # definition - even in TypeShed the _Marshallable type includes containers # like list[Any], which seems to defeat the point. - content: Dict[str, Any] = dataclasses.field(default_factory=dict) + content: dict[str, Any] = dataclasses.field(default_factory=dict) """The content of the :class:`ops.StoredState` instance.""" _data_type_name: str = "StoredStateData" @@ -1210,7 +1210,7 @@ class Port(_max_posargs(1)): :class:`UDPPort`, or :class:`ICMPPort` instead. """ - port: Optional[int] = None + port: int | None = None """The port to open. Required for TCP and UDP; not allowed for ICMP.""" protocol: _RawPortProtocolLiteral = "tcp" @@ -1328,7 +1328,7 @@ def __eq__(self, other: object) -> bool: return (self.name, self.index) == (other.name, other.index) return False - def get_filesystem(self, ctx: "Context") -> Path: + def get_filesystem(self, ctx: Context) -> Path: """Simulated filesystem root in this context.""" return ctx._get_storage_root(self.name, self.index) @@ -1339,7 +1339,7 @@ class Resource(_max_posargs(0)): name: str """The name of the resource, as found in the charm metadata.""" - path: Union[str, Path] + path: str | Path """A local path that will be provided to the charm as the content of the resource.""" @@ -1352,11 +1352,11 @@ class State(_max_posargs(0)): return data from `State.leader`, and so on. """ - config: Dict[str, Union[str, int, float, bool]] = dataclasses.field( + config: dict[str, str | int | float | bool] = dataclasses.field( default_factory=dict, ) """The present configuration of this charm.""" - relations: Iterable["RelationBase"] = dataclasses.field(default_factory=frozenset) + relations: Iterable[RelationBase] = dataclasses.field(default_factory=frozenset) """All relations that currently exist for this charm.""" networks: Iterable[Network] = dataclasses.field(default_factory=frozenset) """Manual overrides for any relation and extra bindings currently provisioned for this charm. @@ -1398,9 +1398,9 @@ class State(_max_posargs(0)): # dispatched, and represent the events that had been deferred during the previous run. # If the charm defers any events during "this execution", they will be appended # to this list. - deferred: List["DeferredEvent"] = dataclasses.field(default_factory=list) + deferred: list[DeferredEvent] = dataclasses.field(default_factory=list) """Events that have been deferred on this charm by some previous execution.""" - stored_states: Iterable["StoredState"] = dataclasses.field( + stored_states: Iterable[StoredState] = dataclasses.field( default_factory=frozenset, ) """Contents of a charm's stored state.""" @@ -1481,12 +1481,12 @@ def _update_status( # bypass frozen dataclass object.__setattr__(self, name, new_status) - def _update_opened_ports(self, new_ports: FrozenSet[Port]): + def _update_opened_ports(self, new_ports: frozenset[Port]): """Update the current opened ports.""" # bypass frozen dataclass object.__setattr__(self, "opened_ports", new_ports) - def _update_secrets(self, new_secrets: FrozenSet[Secret]): + def _update_secrets(self, new_secrets: frozenset[Secret]): """Update the current secrets.""" # bypass frozen dataclass object.__setattr__(self, "secrets", new_secrets) @@ -1508,8 +1508,8 @@ def get_network(self, binding_name: str, /) -> Network: def get_secret( self, *, - id: Optional[str] = None, - label: Optional[str] = None, + id: str | None = None, + label: str | None = None, ) -> Secret: """Get secret from this State, based on the secret's id or label.""" if id is None and label is None: @@ -1529,7 +1529,7 @@ def get_stored_state( stored_state: str, /, *, - owner_path: Optional[str] = None, + owner_path: str | None = None, ) -> StoredState: """Get stored state from this State, based on the stored state's name and owner_path.""" for ss in self.stored_states: @@ -1542,7 +1542,7 @@ def get_storage( storage: str, /, *, - index: Optional[int] = 0, + index: int | None = 0, ) -> Storage: """Get storage from this State, based on the storage's name and index.""" for state_storage in self.storages: @@ -1552,14 +1552,14 @@ def get_storage( f"storage: name={storage}, index={index} not found in the State", ) - def get_relation(self, relation: int, /) -> "RelationBase": + def get_relation(self, relation: int, /) -> RelationBase: """Get relation from this State, based on the relation's id.""" for state_relation in self.relations: if state_relation.id == relation: return state_relation raise KeyError(f"relation: id={relation} not found in the State") - def get_relations(self, endpoint: str) -> Tuple["RelationBase", ...]: + def get_relations(self, endpoint: str) -> tuple[RelationBase, ...]: """Get all relations on this endpoint from the current state.""" # we rather normalize the endpoint than worry about cursed metadata situations such as: @@ -1575,7 +1575,7 @@ def get_relations(self, endpoint: str) -> Tuple["RelationBase", ...]: ) -def _is_valid_charmcraft_25_metadata(meta: Dict[str, Any]): +def _is_valid_charmcraft_25_metadata(meta: dict[str, Any]): # Check whether this dict has the expected mandatory metadata fields according to the # charmcraft >2.5 charmcraft.yaml schema if (config_type := meta.get("type")) != "charm": @@ -1593,10 +1593,10 @@ def _is_valid_charmcraft_25_metadata(meta: Dict[str, Any]): class _CharmSpec(Generic[CharmType]): """Charm spec.""" - charm_type: Type[CharmBase] - meta: Dict[str, Any] - actions: Optional[Dict[str, Any]] = None - config: Optional[Dict[str, Any]] = None + charm_type: type[CharmBase] + meta: dict[str, Any] + actions: dict[str, Any] | None = None + config: dict[str, Any] | None = None # autoloaded means: we are running a 'real' charm class, living in some # /src/charm.py, and the metadata files are 'real' metadata files. @@ -1608,7 +1608,7 @@ def _load_metadata_legacy(charm_root: Path): # back in the days, we used to have separate metadata.yaml, config.yaml and actions.yaml # files for charm metadata. metadata_path = charm_root / "metadata.yaml" - meta: Dict[str, Any] = ( + meta: dict[str, Any] = ( yaml.safe_load(metadata_path.open()) if metadata_path.exists() else {} ) @@ -1623,7 +1623,7 @@ def _load_metadata_legacy(charm_root: Path): def _load_metadata(charm_root: Path): """Load metadata from charm projects created with Charmcraft >= 2.5.""" metadata_path = charm_root / "charmcraft.yaml" - meta: Dict[str, Any] = ( + meta: dict[str, Any] = ( yaml.safe_load(metadata_path.open()) if metadata_path.exists() else {} ) if not _is_valid_charmcraft_25_metadata(meta): @@ -1633,7 +1633,7 @@ def _load_metadata(charm_root: Path): return meta, config, actions @staticmethod - def autoload(charm_type: Type[CharmBase]) -> "_CharmSpec[CharmType]": + def autoload(charm_type: type[CharmBase]) -> _CharmSpec[CharmType]: """Construct a ``_CharmSpec`` object by looking up the metadata from the charm's repo root. Will attempt to load the metadata off the ``charmcraft.yaml`` file @@ -1664,7 +1664,7 @@ def autoload(charm_type: Type[CharmBase]) -> "_CharmSpec[CharmType]": is_autoloaded=True, ) - def get_all_relations(self) -> List[Tuple[str, Dict[str, str]]]: + def get_all_relations(self) -> list[tuple[str, dict[str, str]]]: """A list of all relation endpoints defined in the metadata.""" return list( chain( @@ -1692,7 +1692,7 @@ class DeferredEvent: observer: str # needs to be marshal.dumps-able. - snapshot_data: Dict[Any, Any] = dataclasses.field(default_factory=dict) + snapshot_data: dict[Any, Any] = dataclasses.field(default_factory=dict) # It would be nicer if people could do something like: # `isinstance(state.deferred[0], ops.StartEvent)` @@ -1719,7 +1719,7 @@ class _EventType(str, Enum): class _EventPath(str): if TYPE_CHECKING: # pragma: no cover name: str - owner_path: List[str] + owner_path: list[str] suffix: str prefix: str is_custom: bool @@ -1744,7 +1744,7 @@ def __new__(cls, string: str): return instance @staticmethod - def _get_suffix_and_type(s: str) -> Tuple[str, _EventType]: + def _get_suffix_and_type(s: str) -> tuple[str, _EventType]: for suffix in _RELATION_EVENTS_SUFFIX: if s.endswith(suffix): return suffix, _EventType.relation @@ -1789,35 +1789,35 @@ class _Event: # type: ignore """ path: str - args: Tuple[Any, ...] = () - kwargs: Dict[str, Any] = dataclasses.field(default_factory=dict) + args: tuple[Any, ...] = () + kwargs: dict[str, Any] = dataclasses.field(default_factory=dict) - storage: Optional["Storage"] = None + storage: Storage | None = None """If this is a storage event, the storage it refers to.""" - relation: Optional["RelationBase"] = None + relation: RelationBase | None = None """If this is a relation event, the relation it refers to.""" - relation_remote_unit_id: Optional[int] = None - relation_departed_unit_id: Optional[int] = None + relation_remote_unit_id: int | None = None + relation_departed_unit_id: int | None = None - secret: Optional[Secret] = None + secret: Secret | None = None """If this is a secret event, the secret it refers to.""" # if this is a secret-removed or secret-expired event, the secret revision it refers to - secret_revision: Optional[int] = None + secret_revision: int | None = None - container: Optional[Container] = None + container: Container | None = None """If this is a workload (container) event, the container it refers to.""" - notice: Optional[Notice] = None + notice: Notice | None = None """If this is a Pebble notice event, the notice it refers to.""" - check_info: Optional[CheckInfo] = None + check_info: CheckInfo | None = None """If this is a Pebble check event, the check info it provides.""" - action: Optional["_Action"] = None + action: _Action | None = None """If this is an action event, the :class:`Action` it refers to.""" - _owner_path: List[str] = dataclasses.field(default_factory=list) + _owner_path: list[str] = dataclasses.field(default_factory=list) def __post_init__(self): path = _EventPath(self.path) @@ -1843,7 +1843,7 @@ def name(self) -> str: return self._path.name @property - def owner_path(self) -> List[str]: + def owner_path(self) -> list[str]: """Path to the ObjectEvents instance owning this event. If this event is defined on the toplevel charm class, it should be ['on']. @@ -1877,7 +1877,7 @@ def _is_workload_event(self) -> bool: # this method is private because _CharmSpec is not quite user-facing; also, # the user should know. - def _is_builtin_event(self, charm_spec: "_CharmSpec[CharmType]") -> bool: + def _is_builtin_event(self, charm_spec: _CharmSpec[CharmType]) -> bool: """Determine whether the event is a custom-defined one or a builtin one.""" event_name = self.name @@ -1911,7 +1911,7 @@ def deferred(self, handler: Callable[..., Any], event_id: int = 1) -> DeferredEv # Many events have no snapshot data: install, start, stop, remove, config-changed, # upgrade-charm, pre-series-upgrade, post-series-upgrade, leader-elected, # leader-settings-changed, collect-metrics - snapshot_data: Dict[str, Any] = {} + snapshot_data: dict[str, Any] = {} # fixme: at this stage we can't determine if the event is a builtin one or not; if it is # not, then the coming checks are meaningless: the custom event could be named like a @@ -2033,7 +2033,7 @@ def test_backup_action(): name: str """Juju action name, as found in the charm metadata.""" - params: Mapping[str, "AnyJson"] = dataclasses.field(default_factory=dict) + params: Mapping[str, AnyJson] = dataclasses.field(default_factory=dict) """Parameter values passed to the action.""" id: str = dataclasses.field(default_factory=_next_action_id)