From 6e4cbee8efcac41a948cd7ac12630a1a9f38dcc0 Mon Sep 17 00:00:00 2001
From: Mehdi Bendriss
Date: Wed, 3 Apr 2024 11:54:56 +0200
Subject: [PATCH] [DPE-2564] - Large deployments relations (#190)

## Issue
This PR addresses [DPE-2564](https://warthogs.atlassian.net/browse/DPE-2564), namely:
- Implementation of Peer Cluster Relations
- Orchestrators:
  - implementation of the `main` orchestrator logic
  - implementation of the `failover` orchestrator logic
  - implementation of the demotion / promotion of either
  - validation of relations
- Management of `Main / Failover`-orchestrators vs regular clusters
- Propagation of errors from orchestrators to related clusters
- Various changes to make it work with the charm
- Changed the previous terminology from `main/failover-cluster-manager` to `main/failover-orchestrator`
- Fixes to unit tests

## Implemented UX:
```
juju deploy tls-certificates-operator --channel stable --show-log --verbose
juju config tls-certificates-operator generate-self-signed-certificates=true ca-common-name="CN_CA"

# deploy main-orchestrator cluster
juju deploy -n 3 ./opensearch.charm \
    main \
    --config cluster_name="log-app" --config init_hold=false --config roles="cluster_manager"

# deploy failover-orchestrator cluster
juju deploy -n 2 ./opensearch.charm \
    failover \
    --config cluster_name="log-app" --config init_hold=true --config roles="cluster_manager"

# deploy data-hot cluster
juju deploy -n 2 ./opensearch.charm \
    data-hot \
    --config cluster_name="log-app" --config init_hold=true --config roles="data.hot"

# integrate TLS
juju integrate tls-certificates-operator main
juju integrate tls-certificates-operator failover
juju integrate tls-certificates-operator data-hot

# integrate the "main"-orchestrator with all clusters:
juju integrate main:peer-cluster-orchestrator failover:peer-cluster
juju integrate main:peer-cluster-orchestrator data-hot:peer-cluster

# integrate the "failover"-orchestrator with the rest of the clusters:
juju integrate failover:peer-cluster-orchestrator data-hot:peer-cluster

# trigger the promotion of the "failover" to "main" orchestrator
juju remove-relation main:peer-cluster-orchestrator failover:peer-cluster
juju remove-relation main:peer-cluster-orchestrator data-hot:peer-cluster

# have the "old" main orchestrator rejoin the relation and become a "failover"
juju integrate failover:peer-cluster-orchestrator main:peer-cluster
juju integrate main:peer-cluster-orchestrator data-hot:peer-cluster
```

### Next steps:
- Integ.
tests + unit tests in a follow-up PR
- Use secrets to pass credentials between the main orchestrator and the rest of the clusters
- Externalize error messages in `constants_charm.py`
- Delay the initialization of the security index until the first data node joins the cluster

[DPE-2564]: https://warthogs.atlassian.net/browse/DPE-2564?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
---
 lib/charms/opensearch/v0/constants_charm.py | 3 +
 lib/charms/opensearch/v0/helper_charm.py | 44 +
 lib/charms/opensearch/v0/models.py | 91 ++-
 .../opensearch/v0/opensearch_base_charm.py | 283 ++++++-
 lib/charms/opensearch/v0/opensearch_config.py | 10 +
 lib/charms/opensearch/v0/opensearch_health.py | 17 +
 .../opensearch/v0/opensearch_peer_clusters.py | 173 +++-
 .../v0/opensearch_relation_peer_cluster.py | 768 +++++++++++++++++-
 lib/charms/opensearch/v0/opensearch_tls.py | 33 +-
 metadata.yaml | 7 +
 tests/integration/test_charm.py | 21 +-
 tests/unit/lib/test_opensearch_base_charm.py | 45 +-
 .../unit/lib/test_opensearch_internal_data.py | 34 +-
 .../unit/lib/test_opensearch_peer_clusters.py | 22 +-
 tests/unit/lib/test_opensearch_tls.py | 56 +-
 15 files changed, 1454 insertions(+), 153 deletions(-)

diff --git a/lib/charms/opensearch/v0/constants_charm.py b/lib/charms/opensearch/v0/constants_charm.py
index c54088683..23f698051 100644
--- a/lib/charms/opensearch/v0/constants_charm.py
+++ b/lib/charms/opensearch/v0/constants_charm.py
@@ -27,7 +27,9 @@
 ServiceStopped = "The OpenSearch service stopped."
 ServiceStopFailed = "An error occurred while attempting to stop the OpenSearch service."
 ServiceIsStopping = "The OpenSearch service is stopping."
+AdminUserNotConfigured = "Waiting for the admin user to be fully configured..."
 TLSNotFullyConfigured = "Waiting for TLS to be fully configured..."
+TLSRelationMissing = "Missing TLS relation with this cluster."
 TLSRelationBrokenError = (
     "Relation broken with the TLS Operator while TLS not fully configured. Stopping OpenSearch."
) @@ -82,6 +84,7 @@ # Relation Interfaces ClientRelationName = "opensearch-client" PeerRelationName = "opensearch-peers" +PeerClusterOrchestratorRelationName = "peer-cluster-orchestrator" PeerClusterRelationName = "peer-cluster" COSUser = "monitor" COSRelationName = "cos-agent" diff --git a/lib/charms/opensearch/v0/helper_charm.py b/lib/charms/opensearch/v0/helper_charm.py index 78849545c..d8d3385e4 100644 --- a/lib/charms/opensearch/v0/helper_charm.py +++ b/lib/charms/opensearch/v0/helper_charm.py @@ -3,8 +3,12 @@ """Utility functions for charms related operations.""" import re +from datetime import datetime +from charms.data_platform_libs.v0.data_interfaces import Scope +from charms.opensearch.v0.constants_charm import PeerRelationName from charms.opensearch.v0.helper_enums import BaseStrEnum +from ops import CharmBase from ops.model import ActiveStatus, StatusBase # The unique Charmhub library identifier, never change it @@ -68,3 +72,43 @@ def set(self, status: StatusBase, app: bool = False): return context.status = status + + +class RelDepartureReason(BaseStrEnum): + """Enum depicting the 3 various causes of a Relation Departed event.""" + + APP_REMOVAL = "app-removal" + SCALE_DOWN = "scale-down" + REL_BROKEN = "rel-broken" + + +def relation_departure_reason(charm: CharmBase, relation_name: str) -> RelDepartureReason: + """Compute the reason behind a relation departed event.""" + # fetch relation info + goal_state = charm.model._backend._run("goal-state", return_output=True, use_json=True) + rel_info = goal_state["relations"][relation_name] + + # check dying units + dying_units = [ + unit_data["status"] == "dying" + for unit, unit_data in rel_info.items() + if unit != relation_name + ] + + # check if app removal + if all(dying_units): + return RelDepartureReason.APP_REMOVAL + + if any(dying_units): + return RelDepartureReason.SCALE_DOWN + + return RelDepartureReason.REL_BROKEN + + +def trigger_leader_peer_rel_changed(charm: CharmBase) -> None: + """Force trigger a peer rel changed event by leader.""" + if not charm.unit.is_leader(): + return + + charm.peers_data.put(Scope.APP, "triggered", datetime.now().timestamp()) + charm.on[PeerRelationName].relation_changed.emit(charm.model.get_relation(PeerRelationName)) diff --git a/lib/charms/opensearch/v0/models.py b/lib/charms/opensearch/v0/models.py index 62903f799..0b6bf61f0 100644 --- a/lib/charms/opensearch/v0/models.py +++ b/lib/charms/opensearch/v0/models.py @@ -3,7 +3,8 @@ """Cluster-related data structures / model classes.""" from abc import ABC -from typing import Any, Dict, List, Optional +from datetime import datetime +from typing import Any, Dict, List, Literal, Optional from charms.opensearch.v0.helper_enums import BaseStrEnum from pydantic import BaseModel, Field, root_validator, validator @@ -31,8 +32,10 @@ def to_dict(self) -> Dict[str, Any]: return self.dict() @classmethod - def from_dict(cls, input_dict: Dict[str, Any]): + def from_dict(cls, input_dict: Optional[Dict[str, Any]]): """Create a new instance of this class from a json/dict repr.""" + if not input_dict: # to handle when classes defined defaults + return cls() return cls(**input_dict) @classmethod @@ -91,8 +94,8 @@ def is_data(self): class DeploymentType(BaseStrEnum): """Nature of a sub cluster deployment.""" - MAIN_CLUSTER_MANAGER = "main-cluster-manager" - CLUSTER_MANAGER_FAILOVER = "cluster-manager-failover" + MAIN_ORCHESTRATOR = "main-orchestrator" + FAILOVER_ORCHESTRATOR = "failover-orchestrator" OTHER = "other" @@ -141,22 +144,6 @@ def prevent_none(cls, 
values): # noqa: N805 return values -class PeerClusterRelDataCredentials(Model): - """Model class for credentials passed on the PCluster relation.""" - - admin_username: str - admin_password: str - - -class PeerClusterRelData(Model): - """Model class for the PCluster relation data.""" - - cluster_name: Optional[str] - cm_nodes: List[str] - credentials: PeerClusterRelDataCredentials - tls_ca: str - - class PeerClusterConfig(Model): """Model class for the multi-clusters related config set by the user.""" @@ -201,4 +188,68 @@ class DeploymentDescription(Model): start: StartMode pending_directives: List[Directive] typ: DeploymentType + app: str state: DeploymentState = DeploymentState(value=State.ACTIVE) + promotion_time: Optional[float] + + @root_validator + def set_promotion_time(cls, values): # noqa: N805 + """Set promotion time of a failover to a main CM.""" + if values["typ"] == DeploymentType.MAIN_ORCHESTRATOR: + values["promotion_time"] = datetime.now().timestamp() + + return values + + +class PeerClusterRelDataCredentials(Model): + """Model class for credentials passed on the PCluster relation.""" + + admin_username: str + admin_password: str + admin_password_hash: str + admin_tls: Dict[str, Optional[str]] + + +class PeerClusterRelData(Model): + """Model class for the PCluster relation data.""" + + cluster_name: str + cm_nodes: List[Node] + credentials: PeerClusterRelDataCredentials + deployment_desc: Optional[DeploymentDescription] + + +class PeerClusterRelErrorData(Model): + """Model class for the PCluster relation data.""" + + cluster_name: Optional[str] + should_sever_relation: bool + should_wait: bool + blocked_message: str + deployment_desc: Optional[DeploymentDescription] + + +class PeerClusterOrchestrators(Model): + """Model class for the PClusters registered main/failover clusters.""" + + _TYPES = Literal["main", "failover"] + + main_rel_id: int = -1 + main_app: Optional[str] + failover_rel_id: int = -1 + failover_app: Optional[str] + + def delete(self, typ: _TYPES) -> None: + """Delete an orchestrator from the current pair.""" + if typ == "main": + self.main_rel_id = -1 + self.main_app = None + else: + self.failover_rel_id = -1 + self.failover_app = None + + def promote_failover(self) -> None: + """Delete previous main orchestrator and promote failover if any.""" + self.main_app = self.failover_app + self.main_rel_id = self.failover_rel_id + self.delete("failover") diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index d47f033e5..92635dcaf 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -11,6 +11,7 @@ from charms.grafana_agent.v0.cos_agent import COSAgentProvider from charms.opensearch.v0.constants_charm import ( AdminUserInitProgress, + AdminUserNotConfigured, CertsExpirationError, ClientRelationName, ClusterHealthRed, @@ -30,6 +31,7 @@ TLSNewCertsRequested, TLSNotFullyConfigured, TLSRelationBrokenError, + TLSRelationMissing, WaitingToStart, ) from charms.opensearch.v0.constants_secrets import ADMIN_PW, ADMIN_PW_HASH @@ -48,6 +50,7 @@ generate_hashed_password, generate_password, ) +from charms.opensearch.v0.models import DeploymentDescription, DeploymentType from charms.opensearch.v0.opensearch_backups import OpenSearchBackup from charms.opensearch.v0.opensearch_config import OpenSearchConfig from charms.opensearch.v0.opensearch_distro import OpenSearchDistribution @@ -76,6 +79,10 @@ ) from charms.opensearch.v0.opensearch_plugin_manager 
import OpenSearchPluginManager from charms.opensearch.v0.opensearch_plugins import OpenSearchPluginError +from charms.opensearch.v0.opensearch_relation_peer_cluster import ( + OpenSearchPeerClusterProvider, + OpenSearchPeerClusterRequirer, +) from charms.opensearch.v0.opensearch_relation_provider import OpenSearchProvider from charms.opensearch.v0.opensearch_secrets import OpenSearchSecrets from charms.opensearch.v0.opensearch_tls import OpenSearchTLS @@ -100,6 +107,7 @@ ) from ops.framework import EventBase from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed # The unique Charmhub library identifier, never change it LIBID = "cba015bae34642baa1b6bb27bb35a2f7" @@ -133,6 +141,7 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): self.opensearch_config = OpenSearchConfig(self.opensearch) self.opensearch_exclusions = OpenSearchExclusions(self) self.opensearch_fixes = OpenSearchFixes(self) + self.peers_data = RelationDataStore(self, PeerRelationName) self.secrets = OpenSearchSecrets(self, PeerRelationName) self.tls = OpenSearchTLS(self, TLS_RELATION) @@ -157,6 +166,8 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): ) self.user_manager = OpenSearchUserManager(self) self.opensearch_provider = OpenSearchProvider(self) + self.peer_cluster_provider = OpenSearchPeerClusterProvider(self) + self.peer_cluster_requirer = OpenSearchPeerClusterRequirer(self) self.framework.observe(self.on.leader_elected, self._on_leader_elected) self.framework.observe(self.on.start, self._on_start) @@ -198,13 +209,20 @@ def _on_leader_elected(self, event: LeaderElectedEvent): # TODO: check if cluster can start independently - if not self.peers_data.get(Scope.APP, "admin_user_initialized"): - self.status.set(MaintenanceStatus(AdminUserInitProgress)) - # User config is currently in a default state, which contains multiple insecure default # users. Purge the user list before initialising the users the charm requires. 
self._purge_users() + if not (deployment_desc := self.opensearch_peer_cm.deployment_desc()): + event.defer() + return + + if deployment_desc.typ != DeploymentType.MAIN_ORCHESTRATOR: + return + + if not self.peers_data.get(Scope.APP, "admin_user_initialized"): + self.status.set(MaintenanceStatus(AdminUserInitProgress)) + # this is in case we're coming from 0 to N units, we don't want to use the rest api self._put_admin_user() @@ -224,18 +242,30 @@ def _on_start(self, event: StartEvent): return - if not self._is_tls_fully_configured(): - self.status.set(BlockedStatus(TLSNotFullyConfigured)) + # apply the directives computed and emitted by the peer cluster manager + if not self._apply_peer_cm_directives_and_check_if_can_start(): event.defer() return - self.status.clear(TLSNotFullyConfigured) - - # apply the directives computed and emitted by the peer cluster manager - if not self._apply_peer_cm_directives_and_start(): + if not self.is_admin_user_configured() or not self.is_tls_fully_configured(): + if not self.model.get_relation("certificates"): + status = BlockedStatus(TLSRelationMissing) + else: + status = MaintenanceStatus( + TLSNotFullyConfigured + if self.is_admin_user_configured() + else AdminUserNotConfigured + ) + self.status.set(status) event.defer() return + self.status.clear(AdminUserNotConfigured) + self.status.clear(TLSNotFullyConfigured) + self.status.clear(TLSRelationMissing) + + self.peers_data.put(Scope.UNIT, "tls_configured", True) + # configure clients auth self.opensearch_config.set_client_auth() @@ -243,7 +273,7 @@ def _on_start(self, event: StartEvent): self.status.set(WaitingStatus(RequestUnitServiceOps.format("start"))) self.on[self.service_manager.name].acquire_lock.emit(callback_override="_start_opensearch") - def _apply_peer_cm_directives_and_start(self) -> bool: + def _apply_peer_cm_directives_and_check_if_can_start(self) -> bool: """Apply the directives computed by the opensearch peer cluster manager.""" if not (deployment_desc := self.opensearch_peer_cm.deployment_desc()): # the deployment description hasn't finished being computed by the leader @@ -260,11 +290,6 @@ def _apply_peer_cm_directives_and_start(self) -> bool: self.unit.status = BlockedStatus(str(e)) return False - # request the start of OpenSearch - self.status.set(WaitingStatus(RequestUnitServiceOps.format("start"))) - self.on[self.service_manager.name].acquire_lock.emit( - callback_override="_start_opensearch" - ) return True if self.unit.is_leader(): @@ -335,6 +360,12 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent): # register new cm addresses on every node self._add_cm_addresses_to_conf() + # TODO remove the data role of the first CM to start if applies needed + # we no longer need this once we delay the security index init to *after* the + # first data node joins + # if self._remove_data_role_from_dedicated_cm_if_needed(event): + # return + app_data = event.relation.data.get(event.app) if self.unit.is_leader(): # Recompute the node roles in case self-healing didn't trigger leader related event @@ -443,7 +474,7 @@ def _on_update_status(self, event: UpdateStatusEvent): self.opensearch_exclusions.cleanup() health = self.health.apply() - if health != HealthColors.GREEN: + if health not in [HealthColors.GREEN, HealthColors.IGNORE]: event.defer() if health == HealthColors.UNKNOWN: @@ -475,10 +506,14 @@ def _on_config_changed(self, event: ConfigChangedEvent): self.model.get_relation(PeerRelationName) ) + previous_deployment_desc = self.opensearch_peer_cm.deployment_desc() if 
self.unit.is_leader(): # run peer cluster manager processing self.opensearch_peer_cm.run() - elif not self.opensearch_peer_cm.deployment_desc(): + + # handle cluster change to main-orchestrator (i.e: init_hold: true -> false) + self._handle_change_to_main_orchestrator_if_needed(event, previous_deployment_desc) + elif not previous_deployment_desc: # deployment desc not initialized yet by leader event.defer() return @@ -506,6 +541,10 @@ def _on_config_changed(self, event: ConfigChangedEvent): def _on_set_password_action(self, event: ActionEvent): """Set new admin password from user input or generate if not passed.""" + if self.opensearch_peer_cm.deployment_desc().typ != DeploymentType.MAIN_ORCHESTRATOR: + event.fail("The action can be run only on the leader unit of the main cluster.") + return + if not self.unit.is_leader(): event.fail("The action can be run only on leader unit.") return @@ -531,8 +570,12 @@ def _on_get_password_action(self, event: ActionEvent): event.fail(f"Only the 'admin' and {COSUser} username is allowed for this action.") return - if not self._is_tls_fully_configured(): - event.fail(f"{user_name} user or TLS certificates not configured yet.") + if not self.is_admin_user_configured(): + event.fail(f"{user_name} user not configured yet.") + return + + if not self.is_tls_fully_configured(): + event.fail("TLS certificates not configured yet.") return password = self.secrets.get(Scope.APP, self.secrets.password_key(user_name)) @@ -571,25 +614,22 @@ def on_tls_conf_set( self.opensearch_config.set_admin_tls_conf(current_secrets) # In case of renewal of the unit transport layer cert - restart opensearch - if renewal and self._is_tls_fully_configured(): + if renewal and self.is_admin_user_configured() and self.is_tls_fully_configured(): self.on[self.service_manager.name].acquire_lock.emit( callback_override="_restart_opensearch" ) def on_tls_relation_broken(self, _: RelationBrokenEvent): """As long as all certificates are produced, we don't do anything.""" - if self._is_tls_fully_configured(): + if self.is_tls_fully_configured(): return # Otherwise, we block. 
self.status.set(BlockedStatus(TLSRelationBrokenError)) - def _is_tls_fully_configured(self) -> bool: - """Check if TLS fully configured meaning the admin user configured & 3 certs present.""" + def is_tls_fully_configured(self) -> bool: + """Check if TLS fully configured meaning the 3 certificates are present.""" # In case the initialisation of the admin user is not finished yet - if not self.peers_data.get(Scope.APP, "admin_user_initialized"): - return False - admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) if not admin_secrets or not admin_secrets.get("cert") or not admin_secrets.get("chain"): return False @@ -602,16 +642,70 @@ def _is_tls_fully_configured(self) -> bool: if not unit_http_secrets or not unit_http_secrets.get("cert"): return False - return self._are_all_tls_resources_stored() + stored = self._are_all_tls_resources_stored() + if stored: + self.peers_data.put(Scope.UNIT, "tls_configured", True) + + return stored + + def is_every_unit_marked_as_started(self) -> bool: + """Check if every unit in the cluster is marked as started.""" + rel = self.model.get_relation(PeerRelationName) + for unit in rel.units.union({self.unit}): + if rel.data[unit].get("started") != "True": + return False + return True + + def is_tls_full_configured_in_cluster(self) -> bool: + """Check if TLS is configured in all the units of the current cluster.""" + rel = self.model.get_relation(PeerRelationName) + for unit in rel.units.union({self.unit}): + if rel.data[unit].get("tls_configured") != "True": + return False + return True + + def is_admin_user_configured(self) -> bool: + """Check if admin user configured.""" + # In case the initialisation of the admin user is not finished yet + return self.peers_data.get(Scope.APP, "admin_user_initialized", False) + + def _handle_change_to_main_orchestrator_if_needed( + self, event: ConfigChangedEvent, previous_deployment_desc: Optional[DeploymentDescription] + ) -> None: + """Handle when the user changes the roles or init_hold config from True to False.""" + # if the current cluster wasn't already a "main-Orchestrator" and we're now updating + # the roles for it to become one. We need to: create the admin user if missing, and + # generate the admin certificate if missing and the TLS relation is established. 
+ cluster_changed_to_main_cm = ( + previous_deployment_desc is not None + and previous_deployment_desc.typ != DeploymentType.MAIN_ORCHESTRATOR + and self.opensearch_peer_cm.deployment_desc().typ == DeploymentType.MAIN_ORCHESTRATOR + ) + if not cluster_changed_to_main_cm: + return + + # we check if we need to create the admin user + if not self.is_admin_user_configured(): + self._put_admin_user() + + # we check if we need to generate the admin certificate if missing + if not self.is_tls_fully_configured(): + if not self.model.get_relation("certificates"): + event.defer() + return + + self.tls.request_new_admin_certificate() def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 """Start OpenSearch, with a generated or passed conf, if all resources configured.""" + self.peers_data.delete(Scope.UNIT, "started") if self.opensearch.is_started(): try: - self._post_start_init() + self._post_start_init(event) except (OpenSearchHttpError, OpenSearchNotFullyReadyError): event.defer() return + if not self._can_service_start(): self.peers_data.delete(Scope.UNIT, "starting") event.defer() @@ -621,7 +715,9 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 self.peers_data.delete(Scope.UNIT, "starting") event.defer() return + self.unit.status = WaitingStatus(WaitingToStart) + rel = self.model.get_relation(PeerRelationName) for unit in rel.units.union({self.unit}): if rel.data[unit].get("starting") == "True": @@ -633,6 +729,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 try: # Retrieve the nodes of the cluster, needed to configure this node nodes = self._get_nodes(False) + # validate the roles prior to starting self.opensearch_peer_cm.validate_roles(nodes, on_new_unit=True) @@ -641,7 +738,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 except OpenSearchHttpError: self.peers_data.delete(Scope.UNIT, "starting") event.defer() - self._post_start_init() + self._post_start_init(event) return except OpenSearchProvidedRolesException as e: logger.exception(e) @@ -657,7 +754,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 or self.peers_data.get(Scope.APP, "security_index_initialised", False) ) ) - self._post_start_init() + self._post_start_init(event) except (OpenSearchStartTimeoutError, OpenSearchNotFullyReadyError): event.defer() except OpenSearchStartError as e: @@ -666,7 +763,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 self.status.set(BlockedStatus(ServiceStartError)) event.defer() - def _post_start_init(self): + def _post_start_init(self, event: EventBase): """Initialization post OpenSearch start.""" # initialize the security index if needed (and certs written on disk etc.) 
if self.unit.is_leader() and not self.peers_data.get( @@ -691,6 +788,7 @@ def _post_start_init(self): # Remove the 'starting' flag on the unit self.peers_data.delete(Scope.UNIT, "starting") + self.peers_data.put(Scope.UNIT, "started", True) # apply post_start fixes to resolve start related upstream bugs self.opensearch_fixes.apply_on_start() @@ -704,12 +802,20 @@ def _post_start_init(self): # clear waiting to start status self.status.clear(WaitingToStart) + # update the peer cluster rel data with new IP in case of main cluster manager + if self.opensearch_peer_cm.deployment_desc().typ != DeploymentType.OTHER: + if self.opensearch_peer_cm.is_peer_cluster_orchestrator_relation_set(): + self.peer_cluster_provider.refresh_relation_data(event) + def _stop_opensearch(self) -> None: """Stop OpenSearch if possible.""" self.status.set(WaitingStatus(ServiceIsStopping)) - # 1. Add current node to the voting + alloc exclusions - self.opensearch_exclusions.add_current() + if self.opensearch.is_node_up(): + # 1. Add current node to the voting + alloc exclusions + self.opensearch_exclusions.add_current() + + # TODO: should block until all shards move addressed in PR DPE-2234 # 2. stop the service self.opensearch.stop() @@ -761,6 +867,58 @@ def _can_service_start(self) -> bool: return True + def _remove_data_role_from_dedicated_cm_if_needed( # noqa: C901 + self, event: EventBase + ) -> bool: + """Remove the data role from the first started CM node.""" + # TODO: this method should be deleted in favor of delaying the init of the sec. index + # until after a node with the "data" role joined the cluster. + deployment_desc = self.opensearch_peer_cm.deployment_desc() + if not deployment_desc or deployment_desc.typ != DeploymentType.MAIN_ORCHESTRATOR: + return False + + if not self.peers_data.get(Scope.UNIT, "remove-data-role", default=False): + return False + + try: + nodes = self._get_nodes(self.opensearch.is_node_up()) + except OpenSearchHttpError: + return False + + if len([node for node in nodes if node.is_data() and node.name != self.unit_name]) == 0: + event.defer() + return False + + if not self.is_every_unit_marked_as_started(): + return False + + self.peers_data.delete(Scope.UNIT, "remove-data-role") + self.opensearch_config.remove_temporary_data_role() + + # wait until data moves out completely + self.opensearch_exclusions.add_current() + + try: + for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(0.5)): + with attempt: + resp = self.opensearch.request( + "GET", endpoint=f"/_cat/allocation/{self.unit_name}?format=json" + ) + for entry in resp: + if entry.get("node") == self.unit_name and entry.get("shards") != 0: + raise Exception + return True + except RetryError: + self.opensearch_exclusions.delete_current() + event.defer() + return False + + self.status.set(WaitingStatus(WaitingToStart)) + self.on[self.service_manager.name].acquire_lock.emit( + callback_override="_restart_opensearch" + ) + return True + def _purge_users(self): """Removes all users from internal_users yaml config. 
@@ -840,7 +998,7 @@ def _initialize_security_index(self, admin_secrets: Dict[str, any]) -> None: """ args = [ f"-cd {self.opensearch.paths.conf}/opensearch-security/", - f"-cn {self.app.name}-{self.model.name}", + f"-cn {self.opensearch_peer_cm.deployment_desc().config.cluster_name}", f"-h {self.unit_ip}", f"-cacert {self.opensearch.paths.certs}/root-ca.cert", f"-cert {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.cert", @@ -865,7 +1023,15 @@ def _get_nodes(self, use_localhost: bool) -> List[Node]: ): return [] - return ClusterTopology.nodes(self.opensearch, use_localhost, self.alt_hosts) + # add CM nodes reported in the peer cluster relation if any + hosts = self.alt_hosts + if ( + self.opensearch_peer_cm.deployment_desc().typ != DeploymentType.MAIN_ORCHESTRATOR + and (peer_cm_rel_data := self.opensearch_peer_cm.rel_data()) is not None + ): + hosts.extend([node.ip for node in peer_cm_rel_data.cm_nodes]) + + return ClusterTopology.nodes(self.opensearch, use_localhost, hosts) def _set_node_conf(self, nodes: List[Node]) -> None: """Set the configuration of the current node / unit.""" @@ -881,6 +1047,18 @@ def _set_node_conf(self, nodes: List[Node]) -> None: deployment_desc := self.opensearch_peer_cm.deployment_desc() ).start == StartMode.WITH_PROVIDED_ROLES: computed_roles = deployment_desc.config.roles + + # This is the case where the 1st and main orchestrator to be deployed with no + # "data" role in the provided roles, we need to add the role to be able to create + # and store the security index + if ( + self.unit.is_leader() + and deployment_desc.typ == DeploymentType.MAIN_ORCHESTRATOR + and "data" not in computed_roles + and not self.peers_data.get(Scope.APP, "security_index_initialised", False) + ): + computed_roles.append("data") + self.peers_data.put(Scope.UNIT, "remove-data-role", True) else: computed_roles = ( update_conf.roles @@ -913,8 +1091,8 @@ def _set_node_conf(self, nodes: List[Node]) -> None: cluster_name=deployment_desc.config.cluster_name, unit_name=self.unit_name, roles=computed_roles, - cm_names=cm_names, - cm_ips=cm_ips, + cm_names=list(set(cm_names)), + cm_ips=list(set(cm_ips)), contribute_to_bootstrap=contribute_to_bootstrap, node_temperature=deployment_desc.config.data_temperature, ) @@ -1000,16 +1178,31 @@ def _compute_and_broadcast_updated_topology(self, current_nodes: List[Node]) -> app_name=self.app.name, nodes=current_nodes ) else: - updated_nodes = { - node.name: Node( + first_dedicated_cm_node = None + rel = self.model.get_relation(PeerRelationName) + for unit in rel.units.union({self.unit}): + if rel.data[unit].get("remove-data-role") == "True": + first_dedicated_cm_node = unit.name.replace("/", "-") + break + + updated_nodes = {} + for node in current_nodes: + roles = node.roles + temperature = node.temperature + + # only change the roles of the nodes of the current cluster + if node.app_name == self.app.name and node.name != first_dedicated_cm_node: + roles = deployment_desc.config.roles + temperature = deployment_desc.config.data_temperature + + updated_nodes[node.name] = Node( name=node.name, - roles=deployment_desc.config.roles, + roles=roles, ip=node.ip, - app_name=self.app.name, - temperature=deployment_desc.config.data_temperature, + app_name=node.app_name, + temperature=temperature, ) - for node in current_nodes - } + try: self.opensearch_peer_cm.validate_roles(current_nodes, on_new_unit=False) except OpenSearchProvidedRolesException as e: @@ -1074,7 +1267,7 @@ def _scrape_config(self) -> List[Dict]: "metrics_path": "/_prometheus/metrics", 
"static_configs": [{"targets": [f"{self.unit_ip}:{COSPort}"]}], "tls_config": {"ca": ca}, - "scheme": "https" if self._is_tls_fully_configured() else "http", + "scheme": "https" if self.is_tls_fully_configured() else "http", "basic_auth": {"username": f"{COSUser}", "password": f"{pwd}"}, } ] diff --git a/lib/charms/opensearch/v0/opensearch_config.py b/lib/charms/opensearch/v0/opensearch_config.py index 7dd2cc93a..eee1f1a12 100644 --- a/lib/charms/opensearch/v0/opensearch_config.py +++ b/lib/charms/opensearch/v0/opensearch_config.py @@ -176,6 +176,16 @@ def set_node( True, ) + def remove_temporary_data_role(self): + """Remove the data role that was added temporarily to the first dedicated CM node.""" + conf = self._opensearch.config.load(self.CONFIG_YML) + stored_roles = conf.get("node.roles", []) + + if "data" in stored_roles: + stored_roles.remove("data") + + self._opensearch.config.put(self.CONFIG_YML, "node.roles", stored_roles) + def add_seed_hosts(self, cm_ips: List[str]): """Add CM nodes ips / host names to the seed host list of this unit.""" cm_ips_hostnames = set(cm_ips) diff --git a/lib/charms/opensearch/v0/opensearch_health.py b/lib/charms/opensearch/v0/opensearch_health.py index 80ce01140..0513d9cb7 100644 --- a/lib/charms/opensearch/v0/opensearch_health.py +++ b/lib/charms/opensearch/v0/opensearch_health.py @@ -13,6 +13,7 @@ ) from charms.opensearch.v0.helper_charm import Status from charms.opensearch.v0.helper_cluster import ClusterState +from charms.opensearch.v0.models import StartMode from charms.opensearch.v0.opensearch_exceptions import OpenSearchHttpError from charms.opensearch.v0.opensearch_internal_data import Scope from ops.model import BlockedStatus, WaitingStatus @@ -39,6 +40,7 @@ class HealthColors: YELLOW_TEMP = "yellow-temp" RED = "red" UNKNOWN = "unknown" + IGNORE = "ignore" class OpenSearchHealth: @@ -61,6 +63,21 @@ def apply( if not status: return HealthColors.UNKNOWN + # the health depends on data nodes, for large deployments: an ML cluster + # may not be concerned about reporting or relying on the health of the + # data nodes in other clusters. 
We should therefore get this info from + # the deployment descriptor which has an overview of all the cluster + if not (deployment_desc := self._charm.opensearch_peer_cm.deployment_desc()): + return HealthColors.UNKNOWN + + # compute health only in clusters where data nodes exist + compute_health = ( + deployment_desc.start == StartMode.WITH_GENERATED_ROLES + or "data" in deployment_desc.config.roles + ) + if not compute_health: + return HealthColors.IGNORE + if app: self.apply_for_app(status) else: diff --git a/lib/charms/opensearch/v0/opensearch_peer_clusters.py b/lib/charms/opensearch/v0/opensearch_peer_clusters.py index eea07ac34..64741e5c3 100644 --- a/lib/charms/opensearch/v0/opensearch_peer_clusters.py +++ b/lib/charms/opensearch/v0/opensearch_peer_clusters.py @@ -3,6 +3,7 @@ """Class for Managing simple or large deployments and configuration related changes.""" import logging +from datetime import datetime from typing import TYPE_CHECKING, List, Optional import shortuuid @@ -14,7 +15,7 @@ PClusterWrongNodesCountForQuorum, PClusterWrongRelation, PClusterWrongRolesProvided, - PeerClusterRelationName, + PeerClusterOrchestratorRelationName, PeerRelationName, ) from charms.opensearch.v0.helper_cluster import ClusterTopology @@ -25,11 +26,14 @@ Directive, Node, PeerClusterConfig, + PeerClusterOrchestrators, + PeerClusterRelData, + PeerClusterRelErrorData, StartMode, State, ) from charms.opensearch.v0.opensearch_exceptions import OpenSearchError -from charms.opensearch.v0.opensearch_internal_data import RelationDataStore, Scope +from charms.opensearch.v0.opensearch_internal_data import Scope from ops import BlockedStatus # The unique Charmhub library identifier, never change it @@ -59,14 +63,11 @@ class OpenSearchPeerClustersManager: def __init__(self, charm: "OpenSearchBaseCharm"): self._charm = charm self._opensearch = charm.opensearch - self.peer_cluster_data = RelationDataStore(self._charm, PeerClusterRelationName) def run(self) -> None: """Init, or updates / recomputes current peer cluster related config if applies.""" user_config = self._user_config() - current_deployment_desc = self.deployment_desc() - - if not current_deployment_desc: + if not (current_deployment_desc := self.deployment_desc()): # new cluster deployment_desc = self._new_cluster_setup(user_config) self._charm.peers_data.put_object( @@ -98,6 +99,46 @@ def run(self) -> None: # TODO: once peer clusters relation implemented, we should apply all directives # + removing them from queue. We currently only apply the status. 
+ def run_with_relation_data( + self, data: PeerClusterRelData, error_data: Optional[PeerClusterRelErrorData] = None + ) -> None: + """Update current peer cluster related config based on peer_cluster rel_data.""" + current_deployment_desc = self.deployment_desc() + + config = current_deployment_desc.config + deployment_state = current_deployment_desc.state + pending_directives = current_deployment_desc.pending_directives + + if Directive.WAIT_FOR_PEER_CLUSTER_RELATION in pending_directives: + pending_directives.remove(Directive.WAIT_FOR_PEER_CLUSTER_RELATION) + + if Directive.VALIDATE_CLUSTER_NAME in pending_directives: + if config.cluster_name != data.cluster_name: + deployment_state = DeploymentState( + value=State.BLOCKED_WRONG_RELATED_CLUSTER, message=PClusterWrongRelation + ) + elif deployment_state.value in [ + State.BLOCKED_WRONG_RELATED_CLUSTER, + State.BLOCKED_WAITING_FOR_RELATION, + ]: + deployment_state = DeploymentState(value=State.ACTIVE) + pending_directives.remove(Directive.VALIDATE_CLUSTER_NAME) + elif Directive.INHERIT_CLUSTER_NAME in pending_directives: + config.cluster_name = data.cluster_name + pending_directives.remove(Directive.INHERIT_CLUSTER_NAME) + + new_deployment_desc = DeploymentDescription( + config=config, + pending_directives=pending_directives, + typ=current_deployment_desc.typ, + state=deployment_state, + app=self._charm.app.name, + start=current_deployment_desc.start, + ) + self._charm.peers_data.put_object( + Scope.APP, "deployment-description", new_deployment_desc.to_dict() + ) + def _user_config(self): """Build a user provided config object.""" return PeerClusterConfig( @@ -115,10 +156,7 @@ def _new_cluster_setup(self, config: PeerClusterConfig) -> DeploymentDescription deployment_state = DeploymentState(value=State.ACTIVE) if config.init_hold: # checks if peer cluster relation is set - if not ( - self.is_peer_cluster_relation_set() - and self.peer_cluster_data.get_object(Scope.APP, "data") - ): + if not self.is_peer_cluster_orchestrator_relation_set(): deployment_state = DeploymentState( value=State.BLOCKED_WAITING_FOR_RELATION, message=PClusterNoRelation ) @@ -139,6 +177,7 @@ def _new_cluster_setup(self, config: PeerClusterConfig) -> DeploymentDescription start=start_mode, pending_directives=directives, typ=self._deployment_type(config, start_mode), + app=self._charm.app.name, state=deployment_state, ) @@ -169,6 +208,7 @@ def _new_cluster_setup(self, config: PeerClusterConfig) -> DeploymentDescription start=start_mode, pending_directives=directives, typ=self._deployment_type(config, start_mode), + app=self._charm.app.name, state=deployment_state, ) @@ -182,6 +222,9 @@ def _existing_cluster_setup( self._pre_validate_roles_change( new_roles=config.roles, prev_roles=prev_deployment.config.roles ) + if prev_deployment.state.value == State.BLOCKED_CANNOT_APPLY_NEW_ROLES: + deployment_state = DeploymentState(value=State.ACTIVE, message="") + directives.append(Directive.SHOW_STATUS) # todo: should we further handle states here? 
except OpenSearchProvidedRolesException as e: logger.error(e) @@ -193,6 +236,15 @@ def _existing_cluster_setup( start_mode = ( StartMode.WITH_PROVIDED_ROLES if config.roles else StartMode.WITH_GENERATED_ROLES ) + if ( + not config.init_hold + and prev_deployment.state.value == State.BLOCKED_CANNOT_START_WITH_ROLES + and (start_mode == StartMode.WITH_GENERATED_ROLES or "cluster_manager" in config.roles) + ): + deployment_state = DeploymentState(value=State.ACTIVE, message="") + directives.append(Directive.SHOW_STATUS) + directives.remove(Directive.WAIT_FOR_PEER_CLUSTER_RELATION) + return DeploymentDescription( config=PeerClusterConfig( cluster_name=prev_deployment.config.cluster_name, @@ -203,13 +255,13 @@ def _existing_cluster_setup( start=start_mode, state=deployment_state, typ=self._deployment_type(config, start_mode), + app=self._charm.app.name, pending_directives=list(set(directives)), ) def can_start(self, deployment_desc: Optional[DeploymentDescription] = None) -> bool: """Return whether the service of a node can start.""" - deployment_desc = deployment_desc or self.deployment_desc() - if not deployment_desc: + if not (deployment_desc := deployment_desc or self.deployment_desc()): return False blocking_directives = [ @@ -228,8 +280,7 @@ def apply_status_if_needed( self, deployment_desc: Optional[DeploymentDescription] = None ) -> None: """Resolve and applies corresponding status from the deployment state.""" - deployment_desc = deployment_desc or self.deployment_desc() - if not deployment_desc: + if not (deployment_desc := deployment_desc or self.deployment_desc()): return if Directive.SHOW_STATUS not in deployment_desc.pending_directives: @@ -256,8 +307,7 @@ def apply_status_if_needed( def clear_directive(self, directive: Directive): """Remove directive after having applied it.""" - deployment_desc = self.deployment_desc() - if not deployment_desc: + if not (deployment_desc := self.deployment_desc()): return if directive not in deployment_desc.pending_directives: @@ -278,6 +328,34 @@ def deployment_desc(self) -> Optional[DeploymentDescription]: return DeploymentDescription.from_dict(current_deployment_desc) + def promote_to_main_orchestrator(self) -> None: + """Update the deployment type of the current deployment desc.""" + if not (deployment_desc := self.deployment_desc()): + return + + if deployment_desc.typ != DeploymentType.FAILOVER_ORCHESTRATOR: + return + + deployment_desc.typ = DeploymentType.MAIN_ORCHESTRATOR + deployment_desc.promotion_time = datetime.now().timestamp() + self._charm.peers_data.put_object( + Scope.APP, "deployment-description", deployment_desc.to_dict() + ) + + def demote_to_failover_orchestrator(self) -> None: + """Update the deployment type of the current deployment desc.""" + if not (deployment_desc := self.deployment_desc()): + return + + if deployment_desc.typ != DeploymentType.MAIN_ORCHESTRATOR: + return + + deployment_desc.typ = DeploymentType.FAILOVER_ORCHESTRATOR + deployment_desc.promotion_time = None + self._charm.peers_data.put_object( + Scope.APP, "deployment-description", deployment_desc.to_dict() + ) + def validate_roles(self, nodes: List[Node], on_new_unit: bool = False) -> None: """Validate full-cluster wide the quorum for CM/voting_only nodes on services start.""" deployment_desc = self.deployment_desc() @@ -290,7 +368,17 @@ def validate_roles(self, nodes: List[Node], on_new_unit: bool = False) -> None: return # validate the full-cluster wide count of cm+voting_only nodes to keep the quorum - current_cluster_planned_units = 
self._charm.app.planned_units() + full_cluster_planned_units = self._charm.app.planned_units() + if self.is_peer_cluster_orchestrator_relation_set(): + cluster_fleet_planned_units = self._charm.peers_data.get_object( + Scope.APP, "cluster_fleet_planned_units" + ) + if cluster_fleet_planned_units: + full_cluster_planned_units = sum( + [int(count) for count in cluster_fleet_planned_units.values()] + ) + + # current_cluster_planned_units = self._charm.app.planned_units() current_cluster_units = [ unit.name.replace("/", "-") for unit in self._charm.model.get_relation(PeerRelationName).units @@ -299,7 +387,7 @@ def validate_roles(self, nodes: List[Node], on_new_unit: bool = False) -> None: node for node in nodes if node.name in current_cluster_units ] - if len(current_cluster_online_nodes) < current_cluster_planned_units - 1: + if len(current_cluster_online_nodes) < full_cluster_planned_units - 1: # this is not the latest unit to be brought online, we can continue return @@ -312,9 +400,34 @@ def validate_roles(self, nodes: List[Node], on_new_unit: bool = False) -> None: raise OpenSearchProvidedRolesException(PClusterWrongNodesCountForQuorum) - def is_peer_cluster_relation_set(self): + def is_peer_cluster_orchestrator_relation_set(self) -> bool: """Return whether the peer cluster relation is established.""" - return PeerClusterRelationName in self._charm.model.relations + orchestrators = PeerClusterOrchestrators.from_dict( + self._charm.peers_data.get_object(Scope.APP, "orchestrators") or {} + ) + if orchestrators.main_rel_id == -1: + return False + + return ( + self._charm.model.get_relation( + PeerClusterOrchestratorRelationName, orchestrators.main_rel_id + ) + is not None + ) + + def rel_data(self) -> Optional[PeerClusterRelData]: + """Return the peer cluster rel data if any.""" + if not self.is_peer_cluster_orchestrator_relation_set(): + return None + + orchestrators = PeerClusterOrchestrators.from_dict( + self._charm.peers_data.get_object(Scope.APP, "orchestrators") + ) + + rel = self._charm.model.get_relation( + PeerClusterOrchestratorRelationName, orchestrators.main_rel_id + ) + return PeerClusterRelData.from_str(rel.data[rel.app].get("data")) def _pre_validate_roles_change(self, new_roles: List[str], prev_roles: List[str]): """Validate that the config changes of roles are allowed to happen.""" @@ -343,7 +456,7 @@ def _pre_validate_roles_change(self, new_roles: List[str], prev_roles: List[str] if "data" in prev_roles and "data" not in new_roles: # this is dangerous as this might induce downtime + error on start when data on disk # we need to check if there are other sub-clusters with the data roles - if not self.is_peer_cluster_relation_set(): + if not self.is_peer_cluster_orchestrator_relation_set(): raise OpenSearchProvidedRolesException(DataRoleRemovalForbidden) # todo guarantee unicity of unit names on peer_relation_joined @@ -365,14 +478,14 @@ def _pre_validate_roles_change(self, new_roles: List[str], prev_roles: List[str] @staticmethod def _deployment_type(config: PeerClusterConfig, start_mode: StartMode) -> DeploymentType: """Check if the current cluster is an independent cluster.""" - has_cm_roles = start_mode == StartMode.WITH_GENERATED_ROLES or ( - start_mode == StartMode.WITH_PROVIDED_ROLES and "cluster_manager" in config.roles + has_cm_roles = ( + start_mode == StartMode.WITH_GENERATED_ROLES or "cluster_manager" in config.roles ) + if not has_cm_roles: + return DeploymentType.OTHER - if has_cm_roles and not config.init_hold: - return DeploymentType.MAIN_CLUSTER_MANAGER - - if 
has_cm_roles and config.init_hold: - return DeploymentType.CLUSTER_MANAGER_FAILOVER - - return DeploymentType.OTHER + return ( + DeploymentType.MAIN_ORCHESTRATOR + if not config.init_hold + else DeploymentType.FAILOVER_ORCHESTRATOR + ) diff --git a/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py b/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py index 745457aba..b594d6a9c 100644 --- a/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py +++ b/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py @@ -2,7 +2,51 @@ # See LICENSE file for licensing details. """Peer clusters relation related classes for OpenSearch.""" -from ops import Object +import json +import logging +from typing import TYPE_CHECKING, Any, Dict, List, MutableMapping, Optional, Union + +from charms.opensearch.v0.constants_charm import ( + PeerClusterOrchestratorRelationName, + PeerClusterRelationName, +) +from charms.opensearch.v0.constants_secrets import ADMIN_PW, ADMIN_PW_HASH +from charms.opensearch.v0.constants_tls import CertType +from charms.opensearch.v0.helper_charm import ( + RelDepartureReason, + relation_departure_reason, +) +from charms.opensearch.v0.helper_cluster import ClusterTopology +from charms.opensearch.v0.models import ( + DeploymentDescription, + DeploymentType, + Node, + PeerClusterOrchestrators, + PeerClusterRelData, + PeerClusterRelDataCredentials, + PeerClusterRelErrorData, +) +from charms.opensearch.v0.opensearch_exceptions import OpenSearchHttpError +from charms.opensearch.v0.opensearch_internal_data import Scope +from ops import ( + BlockedStatus, + EventBase, + Object, + Relation, + RelationChangedEvent, + RelationDepartedEvent, + RelationEvent, + RelationJoinedEvent, + WaitingStatus, +) +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed + +if TYPE_CHECKING: + from charms.opensearch.v0.opensearch_base_charm import OpenSearchBaseCharm + + +logger = logging.getLogger(__name__) + # The unique Charmhub library identifier, never change it LIBID = "5f54c024d6a2405f9c625cf832c302db" @@ -15,13 +59,727 @@ LIBPATCH = 1 -class OpenSearchPeerClusterProvider(Object): +class OpenSearchPeerClusterRelation(Object): + """Base class for Peer cluster relations.""" + + def __init__(self, charm: "OpenSearchBaseCharm", relation_name: str): + super().__init__(charm, relation_name) + self.relation_name = relation_name + self.charm = charm + self.peer_cm = charm.opensearch_peer_cm + + def get_from_rel( + self, key: str, rel_id: int = None, remote_app: bool = False + ) -> Optional[str]: + """Fetch relation data by key from relation id (from an int or relation event).""" + if not rel_id: + raise ValueError("Relation id must be provided as arguments.") + + relation = self.get_rel(rel_id=rel_id) + if relation: + return relation.data[relation.app if remote_app else self.charm.app].get(key) + + return None + + def get_obj_from_rel( + self, key: str, rel_id: int = None, remote_app: bool = True + ) -> Dict[Any, Any]: + """Get object from peer cluster relation data.""" + data = self.get_from_rel(key, rel_id=rel_id, remote_app=remote_app) or "{}" + return json.loads(data) + + def put_in_rel(self, data: Dict[str, Any], rel_id: Optional[int] = None) -> None: + """Put object in peer cluster rel data.""" + if not rel_id: + raise ValueError("Relation id must be provided as arguments.") + + relation = self.get_rel(rel_id=rel_id) + if relation: + relation.data[self.charm.app].update(data) + + def delete_from_rel( + self, key: str, event: Optional[RelationEvent] = None, 
rel_id: Optional[int] = None + ) -> None: + """Delete from peer cluster relation data by key.""" + if not event and not rel_id: + raise ValueError("Relation Event or relation id must be provided as arguments.") + + relation = self.get_rel(rel_id=rel_id if rel_id else event.relation.id) + if relation: + relation.data[self.charm.app].pop(key, None) + + def get_rel(self, rel_id: Optional[int]) -> Optional[Relation]: + """Retrieve the relation object assigned to this id.""" + return self.charm.model.get_relation(self.relation_name, relation_id=rel_id) + + +class OpenSearchPeerClusterProvider(OpenSearchPeerClusterRelation): """Peer cluster relation provider class.""" - pass + def __init__(self, charm: "OpenSearchBaseCharm"): + super().__init__(charm, PeerClusterOrchestratorRelationName) + self._opensearch = charm.opensearch + + self.framework.observe( + charm.on[self.relation_name].relation_joined, self._on_peer_cluster_relation_joined + ) + self.framework.observe( + charm.on[self.relation_name].relation_changed, self._on_peer_cluster_relation_changed + ) + self.framework.observe( + charm.on[self.relation_name].relation_departed, + self._on_peer_cluster_relation_departed, + ) + + def _on_peer_cluster_relation_joined(self, event: RelationJoinedEvent): + """Received by all units in main/failover clusters when new sub-cluster joins the rel.""" + if not self.charm.unit.is_leader(): + return + + self.refresh_relation_data(event) + + # TODO: is the below still needed + # self.charm.trigger_leader_peer_rel_changed() + + def _on_peer_cluster_relation_changed(self, event: RelationChangedEvent): + """Event received by all units in sub-cluster when a new sub-cluster joins the relation.""" + if not self.charm.unit.is_leader(): + return + + # the current app is not ready + if not (deployment_desc := self.peer_cm.deployment_desc()): + event.defer() + return + + # only the main-orchestrator is able to designate a failover + if deployment_desc.typ != DeploymentType.MAIN_ORCHESTRATOR: + return + + if not (data := event.relation.data.get(event.app)): + return + + # get list of relations with this orchestrator + target_relation_ids = [rel.id for rel in self.charm.model.relations[self.relation_name]] + + # fetch emitting app planned units and broadcast + self._put_planned_units( + event.app.name, json.loads(data.get("planned_units")), target_relation_ids + ) + + if not (candidate_failover_app := data.get("candidate_failover_orchestrator_app")): + self.refresh_relation_data(event) + return + + orchestrators = PeerClusterOrchestrators.from_dict( + self.charm.peers_data.get_object(Scope.APP, "orchestrators") + ) + if orchestrators.failover_app and orchestrators.failover_rel_id in target_relation_ids: + logger.info("A failover cluster orchestrator is already registered.") + self.refresh_relation_data(event) + return + + # register the new failover in the current main peer relation data + orchestrators.failover_app = candidate_failover_app + orchestrators.failover_rel_id = event.relation.id + self.charm.peers_data.put_object(Scope.APP, "orchestrators", orchestrators.to_dict()) + + # broadcast the new failover in all the cluster fleet + for rel_id in target_relation_ids: + orchestrators = PeerClusterOrchestrators.from_dict( + self.get_obj_from_rel("orchestrators", rel_id, remote_app=False) + ) + orchestrators.failover_app = candidate_failover_app + self.put_in_rel( + data={"orchestrators": json.dumps(orchestrators.to_dict())}, rel_id=rel_id + ) + + def _on_peer_cluster_relation_departed(self, event: 
RelationDepartedEvent) -> None: + """Event received by all units in sub-cluster when a sub-cluster leaves the relation.""" + if not self.charm.unit.is_leader(): + return + + # we need to update the fleet planned units + target_relation_ids = [rel.id for rel in self.charm.model.relations[self.relation_name]] + self._put_planned_units(event.app.name, 0, target_relation_ids) + def refresh_relation_data(self, event: EventBase) -> None: + """Refresh the peer cluster rel data (new cm node, admin password change etc.).""" + if not self.charm.unit.is_leader(): + return -class OpenSearchPeerClusterRequirer(Object): + # all relations with the current orchestrator + all_relation_ids = [rel.id for rel in self.charm.model.relations[self.relation_name]] + + # get deployment descriptor of current app + deployment_desc = self.charm.opensearch_peer_cm.deployment_desc() + + # fetch stored orchestrators + orchestrators = PeerClusterOrchestrators.from_dict( + self.charm.peers_data.get_object(Scope.APP, "orchestrators") + ) + + # compute the data that needs to be broadcast to all related clusters (success or error) + rel_data = self._rel_data(deployment_desc, orchestrators) + + # exit if current cluster should not have been considered a provider + if self._notify_if_wrong_integration(rel_data, all_relation_ids): + return + + # store the main/failover-cm planned units count + self._put_planned_units( + self.charm.app.name, self.charm.app.planned_units(), all_relation_ids + ) + + cluster_type = ( + "main" if deployment_desc.typ == DeploymentType.MAIN_ORCHESTRATOR else "failover" + ) + + # update reported orchestrators on local orchestrator + orchestrators = orchestrators.to_dict() + orchestrators[f"{cluster_type}_app"] = self.charm.app.name + self.charm.peers_data.put_object(Scope.APP, "orchestrators", orchestrators) + + peer_rel_data_key, should_defer = "data", False + if isinstance(rel_data, PeerClusterRelErrorData): + peer_rel_data_key, should_defer = "error_data", rel_data.should_wait + + # save the orchestrators of this fleet + for rel_id in all_relation_ids: + orchestrators = self.get_obj_from_rel("orchestrators", rel_id=rel_id) + orchestrators.update( + { + f"{cluster_type}_app": self.charm.app.name, + f"{cluster_type}_rel_id": rel_id, + } + ) + self.put_in_rel(data={"orchestrators": json.dumps(orchestrators)}, rel_id=rel_id) + + # there is no error to broadcast - we clear any previously broadcasted error + if isinstance(rel_data, PeerClusterRelData): + self.delete_from_rel("error_data", rel_id=rel_id) + + # are we potentially overriding stuff here? 
+ self.put_in_rel( + data={peer_rel_data_key: json.dumps(rel_data.to_dict())}, rel_id=rel_id + ) + + if should_defer: + event.defer() + + def _notify_if_wrong_integration( + self, + rel_data: Union[PeerClusterRelData, PeerClusterRelErrorData], + target_relation_ids: List[int], + ) -> bool: + """Check if relation is invalid and notify related sub-clusters.""" + if not isinstance(rel_data, PeerClusterRelErrorData): + return False + + if not rel_data.should_sever_relation: + return False + + for rel_id in target_relation_ids: + self.put_in_rel(data={"error_data": json.dumps(rel_data.to_dict())}, rel_id=rel_id) + + return True + + def _put_planned_units(self, app: str, count: int, target_relation_ids: List[int]): + """Save in the peer cluster rel data the planned units count per app.""" + cluster_fleet_planned_units = ( + self.charm.peers_data.get_object(Scope.APP, "cluster_fleet_planned_units") or {} + ) + + # TODO: need to ensure unicity of app name for cross models + cluster_fleet_planned_units.update({app: count}) + cluster_fleet_planned_units.update({self.charm.app.name: self.charm.app.planned_units()}) + + for rel_id in target_relation_ids: + self.put_in_rel( + data={"cluster_fleet_planned_units": json.dumps(cluster_fleet_planned_units)}, + rel_id=rel_id, + ) + + self.charm.peers_data.put_object( + Scope.APP, "cluster_fleet_planned_units", cluster_fleet_planned_units + ) + + def _rel_data( + self, deployment_desc: DeploymentDescription, orchestrators: PeerClusterOrchestrators + ) -> Union[PeerClusterRelData, PeerClusterRelErrorData]: + """Build and return the peer cluster rel data to be shared with requirer sub-clusters.""" + if rel_err_data := self._rel_err_data(deployment_desc, orchestrators): + return rel_err_data + + # check that this cluster is fully ready, otherwise put "configuring" in + # peer rel data for requirers to show a blocked status until it's fully + # ready (will receive a subsequent + try: + secrets = self.charm.secrets + return PeerClusterRelData( + cluster_name=deployment_desc.config.cluster_name, + cm_nodes=self._fetch_local_cm_nodes(), + credentials=PeerClusterRelDataCredentials( + admin_username="admin", + admin_password=secrets.get(Scope.APP, ADMIN_PW), + admin_password_hash=secrets.get(Scope.APP, ADMIN_PW_HASH), + admin_tls=secrets.get_object(Scope.APP, CertType.APP_ADMIN.val), + ), + deployment_desc=deployment_desc, + ) + except OpenSearchHttpError: + return PeerClusterRelErrorData( + cluster_name=deployment_desc.config.cluster_name, + should_sever_relation=False, + should_wait=True, + blocked_message=f"Could not fetch nodes in related {deployment_desc.typ} sub-cluster.", + deployment_desc=deployment_desc, + ) + + def _rel_err_data( # noqa: C901 + self, deployment_desc: DeploymentDescription, orchestrators: PeerClusterOrchestrators + ) -> Optional[PeerClusterRelErrorData]: + """Build error peer relation data object.""" + should_sever_relation, blocked_msg = False, None + message_suffix = f"in related '{deployment_desc.typ}'" + + if not deployment_desc: + blocked_msg = "'main/failover'-orchestrators not configured yet." + elif deployment_desc.typ == DeploymentType.OTHER: + should_sever_relation = True + blocked_msg = "Related to non 'main/failover'-orchestrator cluster." + elif orchestrators.failover_app and orchestrators.failover_app != self.charm.app.name: + should_sever_relation = True + blocked_msg = ( + "Cannot have 2 'failover'-orchestrators. Relate to the existing failover." 
+ ) + elif not self.charm.is_admin_user_configured(): + blocked_msg = f"Admin user not fully configured {message_suffix}." + elif not self.charm.is_tls_full_configured_in_cluster(): + blocked_msg = f"TLS not fully configured {message_suffix}." + elif not self.charm.peers_data.get(Scope.APP, "security_index_initialised", False): + blocked_msg = f"Security index not initialized {message_suffix}." + elif not self.charm.is_every_unit_marked_as_started(): + blocked_msg = f"Waiting for every unit {message_suffix} to start." + else: + try: + if not self._fetch_local_cm_nodes(): + blocked_msg = f"No 'cluster_manager' eligible nodes found {message_suffix}" + except OpenSearchHttpError as e: + logger.error(e) + blocked_msg = f"Could not fetch nodes {message_suffix}" + + if not blocked_msg: + return None + + return PeerClusterRelErrorData( + cluster_name=deployment_desc.config.cluster_name if deployment_desc else None, + should_sever_relation=should_sever_relation, + should_wait=not should_sever_relation, + blocked_message=blocked_msg, + deployment_desc=deployment_desc, + ) + + def _fetch_local_cm_nodes(self) -> List[Node]: + """Fetch the cluster_manager eligible node IPs in the current cluster.""" + nodes = ClusterTopology.nodes( + self._opensearch, + use_localhost=self._opensearch.is_node_up(), + hosts=self.charm.alt_hosts, + ) + return [ + node + for node in nodes + if node.is_cm_eligible() and node.app_name == self.charm.app.name + ] + + +class OpenSearchPeerClusterRequirer(OpenSearchPeerClusterRelation): """Peer cluster relation requirer class.""" - pass + def __init__(self, charm: "OpenSearchBaseCharm"): + super().__init__(charm, PeerClusterRelationName) + + self.framework.observe( + charm.on[self.relation_name].relation_joined, self._on_peer_cluster_relation_joined + ) + self.framework.observe( + charm.on[self.relation_name].relation_changed, self._on_peer_cluster_relation_changed + ) + self.framework.observe( + charm.on[self.relation_name].relation_departed, + self._on_peer_cluster_relation_departed, + ) + + def _on_peer_cluster_relation_joined(self, event: RelationJoinedEvent): + """Event received when a new main-failover cluster unit joins the fleet.""" + # self.charm.trigger_leader_peer_rel_changed() + pass + + def _on_peer_cluster_relation_changed(self, event: RelationChangedEvent): + """Peer cluster relation change hook. 
Crucial to capture changes from the provider side.""" + if not self.charm.unit.is_leader(): + return + + # register in the 'main/failover'-CMs / save the number of planned units of the current app + self._put_planned_units(event) + + # check if current cluster ready + if not (deployment_desc := self.charm.opensearch_peer_cm.deployment_desc()): + event.defer() + return + + if not (data := event.relation.data.get(event.app)): + return + + # fetch main and failover clusters relations ids if any + orchestrators = self._orchestrators(event, data, deployment_desc) + + # check errors sent by providers + if self._error_set_from_providers(orchestrators, data, event.relation.id): + return + + # fetch the success data + data = PeerClusterRelData.from_str(data["data"]) + + # check errors that can only be figured out from the requirer side + if self._error_set_from_requirer(orchestrators, deployment_desc, data, event.relation.id): + return + + # this means it's a previous "main orchestrator" that was unrelated then re-related + if deployment_desc.typ == DeploymentType.MAIN_ORCHESTRATOR: + self.charm.opensearch_peer_cm.demote_to_failover_orchestrator() + deployment_desc = self.charm.opensearch_peer_cm.deployment_desc() + + # broadcast that this cluster is a failover candidate, and let the main CM elect it or not + if deployment_desc.typ == DeploymentType.FAILOVER_ORCHESTRATOR: + self.put_in_rel( + data={"candidate_failover_orchestrator_app": self.charm.app.name}, + rel_id=event.relation.id, + ) + + # register main and failover cm app names if any + self.charm.peers_data.put_object(Scope.APP, "orchestrators", orchestrators.to_dict()) + + # store the security related settings in secrets, peer_data, disk + self._set_security_conf(data) + + # check if there are any security misconfigurations / violations + if self._error_set_from_tls(data): + event.defer() + return + + # aggregate all CMs (main + failover if any) + data.cm_nodes = self._cm_nodes(orchestrators) + + # recompute the deployment desc + self.charm.opensearch_peer_cm.run_with_relation_data(data) + + def _set_security_conf(self, data: PeerClusterRelData) -> None: + """Store security related config.""" + # set admin secrets + self.charm.secrets.put(Scope.APP, ADMIN_PW, data.credentials.admin_password) + self.charm.secrets.put(Scope.APP, ADMIN_PW_HASH, data.credentials.admin_password_hash) + self.charm.secrets.put_object( + Scope.APP, CertType.APP_ADMIN.val, data.credentials.admin_tls + ) + + # store the app admin TLS resources if not stored + self.charm.store_tls_resources(CertType.APP_ADMIN, data.credentials.admin_tls) + + # set user and security_index initialized flags + self.charm.peers_data.put(Scope.APP, "admin_user_initialized", True) + self.charm.peers_data.put(Scope.APP, "security_index_initialised", True) + + def _orchestrators( + self, + event: RelationChangedEvent, + data: MutableMapping[str, str], + deployment_desc: DeploymentDescription, + ) -> PeerClusterOrchestrators: + """Fetch related orchestrator IDs and App names.""" + orchestrators = self.get_obj_from_rel(key="orchestrators", rel_id=event.relation.id) + + # fetch the (main/failover)-cluster-orchestrator relations + cm_relations = dict( + [(rel.id, rel.app.name) for rel in self.model.relations[self.relation_name]] + ) + for rel_id, rel_app_name in cm_relations.items(): + orchestrators.update(self.get_obj_from_rel(key="orchestrators", rel_id=rel_id)) + + if not orchestrators: + orchestrators = json.loads(data["orchestrators"]) + + # handle case where the current is a designated 
failover + if deployment_desc.typ == DeploymentType.FAILOVER_ORCHESTRATOR: + local_orchestrators = PeerClusterOrchestrators.from_dict( + self.charm.peers_data.get_object(Scope.APP, "orchestrators") or {} + ) + if local_orchestrators.failover_app == self.charm.app.name: + orchestrators["failover_app"] = local_orchestrators.failover_app + + return PeerClusterOrchestrators.from_dict(orchestrators) + + def _put_planned_units(self, event: RelationEvent): + """Report self planned units and store the fleet's on the peer data bag.""" + # register the number of planned units in the current app, to notify the orchestrators + self.put_in_rel( + data={"planned_units": json.dumps(self.charm.app.planned_units())}, + rel_id=event.relation.id, + ) + + # self in the current app's peer databag + cluster_fleet_planned_units = self.get_obj_from_rel( + "cluster_fleet_planned_units", rel_id=event.relation.id + ) + cluster_fleet_planned_units.update({self.charm.app.name: self.charm.app.planned_units()}) + + self.charm.peers_data.put_object( + Scope.APP, "cluster_fleet_planned_units", cluster_fleet_planned_units + ) + + def _on_peer_cluster_relation_departed(self, event: RelationDepartedEvent): + """Handle when 'main/failover'-CMs leave the relation (app or relation removal).""" + if not self.charm.unit.is_leader(): + return + + # fetch registered orchestrators + orchestrators = PeerClusterOrchestrators.from_dict( + self.charm.peers_data.get_object(Scope.APP, "orchestrators") + ) + + # a cluster of type "other" is departing (wrong relation), or, the current is a main + # orchestrator and a failover is departing, we can safely ignore. + if event.relation.id not in [orchestrators.main_rel_id, orchestrators.failover_rel_id]: + self._clear_errors(f"error_from_requirer-{event.relation.id}") + return + + # handle scale-down at the charm level storage detaching, or?? + if ( + relation_departure_reason(self.charm, self.relation_name) + == RelDepartureReason.SCALE_DOWN + ): + return + + # fetch cluster manager nodes from API + across all peer clusters + cms = self._cm_nodes(orchestrators) + + # check the departed cluster which triggered this hook + event_src_cluster_type = ( + "main" if event.relation.id == orchestrators.main_rel_id else "failover" + ) + + # delete the orchestrator that triggered this event + orchestrators.delete(event_src_cluster_type) + + # the 'main' cluster orchestrator is the one being removed + failover_promoted = False + if event_src_cluster_type == "main": + if not orchestrators.failover_app: + self.charm.status.set( + BlockedStatus( + "Main-cluster-orchestrator removed, and no failover cluster related." 
+ ) + ) + elif orchestrators.failover_app == self.charm.app.name: + self._promote_failover(orchestrators, cms) + failover_promoted = True + + self.charm.peers_data.put_object(Scope.APP, "orchestrators", orchestrators.to_dict()) + + # clear previously set errors due to this relation + self._clear_errors(f"error_from_provider-{event.relation.id}") + self._clear_errors(f"error_from_requirer-{event.relation.id}") + + # exit if the current cluster is not an orchestrator + if ( + self.charm.opensearch_peer_cm.deployment_desc().typ == DeploymentType.OTHER + or self.charm.app.name not in [orchestrators.main_app, orchestrators.failover_app] + ): + return + + # the current cluster is an orchestrator - broadcast the new conf to all related apps + for rel_id in [ + rel.id for rel in self.charm.model.relations[PeerClusterOrchestratorRelationName] + ]: + rel_orchestrators = PeerClusterOrchestrators.from_dict( + self.get_obj_from_rel("orchestrators", rel_id, remote_app=False) + ) + + rel_orchestrators.delete(event_src_cluster_type) + if failover_promoted: + rel_orchestrators.promote_failover() + + self.put_in_rel( + data={"orchestrators": json.dumps(rel_orchestrators.to_dict())}, rel_id=rel_id + ) + + def _promote_failover(self, orchestrators: PeerClusterOrchestrators, cms: List[Node]) -> None: + """Promote the current failover cluster to the main orchestrator role.""" + # current cluster is failover + self.charm.opensearch_peer_cm.promote_to_main_orchestrator() + + # ensuring quorum + main_cms = [cm for cm in cms if cm.app_name == orchestrators.main_app] + non_main_cms = [cm for cm in cms if cm not in main_cms] + if len(non_main_cms) % 2 == 0: + departure_reason = relation_departure_reason(self.charm, self.relation_name) + message = "Scale-up this application by an odd number of units{} to ensure quorum." + if len(main_cms) % 2 == 1 and departure_reason == RelDepartureReason.REL_BROKEN: + message = message.format( + f" and scale-'down/up' {orchestrators.main_app} by 1 unit" + ) + + self.charm.status.set(message) + + # remove old main and promote new failover + orchestrators.promote_failover() + + def _cm_nodes(self, orchestrators: PeerClusterOrchestrators) -> List[Node]: + """Fetch the CM nodes passed over the peer cluster relation (not from an API call).""" + cm_nodes = {} + for rel_id in [orchestrators.main_rel_id, orchestrators.failover_rel_id]: + if rel_id == -1: + continue + + data = self.get_obj_from_rel(key="data", rel_id=rel_id) + if not data: # not ready yet + continue + + data = PeerClusterRelData.from_dict(data) + cm_nodes = {**cm_nodes, **{node.name: node for node in data.cm_nodes}} + + # attempt to get an OpenSearch-reported list of CMs - the response + # may be smaller or larger than the previous list.
+ try: + for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(0.5)): + with attempt: + all_nodes = ClusterTopology.nodes( + self.charm.opensearch, + self.charm.opensearch.is_node_up(), + hosts=self.charm.alt_hosts + [node.ip for node in cm_nodes.values()], + ) + cm_nodes = { + **cm_nodes, + **{node.name: node for node in all_nodes if node.is_cm_eligible()}, + } + except RetryError: + pass + + return list(cm_nodes.values()) + + def _error_set_from_providers( + self, + orchestrators: PeerClusterOrchestrators, + event_data: Optional[MutableMapping[str, Any]], + event_rel_id: int, + ) -> bool: + """Check if the providers are ready and set error if not.""" + orchestrator_rel_ids = [ + rel_id + for rel_id in [orchestrators.main_rel_id, orchestrators.failover_rel_id] + if rel_id != -1 + ] + + error = None + for rel_id in orchestrator_rel_ids: + data = self.get_obj_from_rel("data", rel_id=rel_id) + error_data = self.get_obj_from_rel("error_data", rel_id=rel_id) + if not data and not error_data: # relation data still incomplete + return True + + if error_data: + error = error_data + break + + # we handle the case where the error came from the provider of a wrong relation + if not error and "error_data" in (event_data or {}): + error = json.loads(event_data["error_data"]) + + if error: + self._set_error(f"error_from_providers-{event_rel_id}", error) + return True + + self._clear_errors(f"error_from_providers-{event_rel_id}") + return False + + def _error_set_from_requirer( + self, + orchestrators: PeerClusterOrchestrators, + deployment_desc: DeploymentDescription, + peer_cluster_rel_data: PeerClusterRelData, + event_rel_id: int, + ) -> bool: + """Check for relation errors that can only be detected on the requirer side, and set them.""" + blocked_msg = None + if ( + deployment_desc.typ == DeploymentType.MAIN_ORCHESTRATOR + and deployment_desc.promotion_time + > peer_cluster_rel_data.deployment_desc.promotion_time + ): + blocked_msg = "Main cluster-orchestrator cannot be requirer of relation." + elif event_rel_id not in [orchestrators.main_rel_id, orchestrators.failover_rel_id]: + blocked_msg = ( + "A cluster can only be related to 1 main and 1 failover-clusters at most." + ) + + if not blocked_msg: + self._clear_errors(f"error_from_requirer-{event_rel_id}") + return False + + self._set_error( + label=f"error_from_requirer-{event_rel_id}", + error=PeerClusterRelErrorData( + cluster_name=peer_cluster_rel_data.cluster_name, + should_sever_relation=True, + should_wait=False, + blocked_message=blocked_msg, + deployment_desc=deployment_desc, + ).to_dict(), + ) + return True + + def _error_set_from_tls(self, peer_cluster_rel_data: PeerClusterRelData) -> bool: + """Compute TLS related errors.""" + blocked_msg, should_sever_relation = None, False + + if self.charm.is_tls_fully_configured(): # compare CAs + unit_transport_ca_cert = self.charm.secrets.get_object( + Scope.UNIT, CertType.UNIT_TRANSPORT.val + )["ca-cert"] + if unit_transport_ca_cert != peer_cluster_rel_data.credentials.admin_tls["ca-cert"]: + blocked_msg = "CA certificate mismatch between clusters."
+ should_sever_relation = True + + if not blocked_msg: + self._clear_errors("error_from_tls") + return False + + self._set_error( + label="error_from_tls", + error=PeerClusterRelErrorData( + cluster_name=peer_cluster_rel_data.cluster_name, + should_sever_relation=should_sever_relation, + should_wait=not should_sever_relation, + blocked_message=blocked_msg, + deployment_desc=self.peer_cm.deployment_desc(), + ).to_dict(), + ) + return True + + def _set_error(self, label: str, error: Optional[Dict[str, Any]]) -> None: + """Set error status from the passed errors and store for future deletion.""" + error = PeerClusterRelErrorData.from_dict(error) + err_message = error.blocked_message + self.charm.status.set( + WaitingStatus(err_message) if error.should_wait else BlockedStatus(err_message), + app=True, + ) + + # we should keep track of set messages for targeted deletion later + self.charm.peers_data.put(Scope.APP, label, err_message) + + def _clear_errors(self, *error_labels: str): + """Clear previously set Peer clusters related statuses.""" + for error_label in error_labels: + self.charm.status.clear(error_label) + self.charm.peers_data.delete(Scope.APP, error_label) diff --git a/lib/charms/opensearch/v0/opensearch_tls.py b/lib/charms/opensearch/v0/opensearch_tls.py index ac207369c..6e1bd5b28 100644 --- a/lib/charms/opensearch/v0/opensearch_tls.py +++ b/lib/charms/opensearch/v0/opensearch_tls.py @@ -21,6 +21,7 @@ from charms.opensearch.v0.constants_tls import TLS_RELATION, CertType from charms.opensearch.v0.helper_networking import get_host_public_ip +from charms.opensearch.v0.models import DeploymentType from charms.opensearch.v0.opensearch_exceptions import OpenSearchError from charms.opensearch.v0.opensearch_internal_data import Scope from charms.tls_certificates_interface.v3.tls_certificates import ( @@ -86,8 +87,17 @@ def _on_set_tls_private_key(self, event: ActionEvent) -> None: except ValueError as e: event.fail(str(e)) + def request_new_admin_certificate(self) -> None: + """Request the generation of a new admin certificate.""" + if not self.charm.unit.is_leader(): + return + + self._request_certificate(Scope.APP, CertType.APP_ADMIN) + def request_new_unit_certificates(self) -> None: """Requests a new certificate with the given scope and type from the tls operator.""" + self.charm.peers_data.delete(Scope.UNIT, "tls_configured") + for cert_type in [CertType.UNIT_HTTP, CertType.UNIT_TRANSPORT]: csr = self.charm.secrets.get_object(Scope.UNIT, cert_type.val)["csr"].encode("utf-8") self.certs.request_certificate_revocation(csr) @@ -98,10 +108,18 @@ def request_new_unit_certificates(self) -> None: secrets = self.charm.secrets.get_object(Scope.UNIT, cert_type.val) self._request_certificate_renewal(Scope.UNIT, cert_type, secrets) - def _on_tls_relation_joined(self, _: RelationJoinedEvent) -> None: + def _on_tls_relation_joined(self, event: RelationJoinedEvent) -> None: """Request certificate when TLS relation joined.""" + if not (deployment_desc := self.charm.opensearch_peer_cm.deployment_desc()): + event.defer() + return + admin_cert = self.charm.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - if self.charm.unit.is_leader() and admin_cert is None: + if ( + self.charm.unit.is_leader() + and admin_cert is None + and deployment_desc.typ == DeploymentType.MAIN_ORCHESTRATOR + ): self._request_certificate(Scope.APP, CertType.APP_ADMIN) self._request_certificate(Scope.UNIT, CertType.UNIT_TRANSPORT) @@ -149,11 +167,12 @@ def _on_certificate_available(self, event: CertificateAvailableEvent) -> 
None: try: self.charm.on_tls_conf_set(event, scope, cert_type, renewal) except OpenSearchError as e: - logger.error(e) + logger.exception(e) event.defer() def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: """Request the new certificate when old certificate is expiring.""" + self.charm.peers_data.delete(Scope.UNIT, "tls_configured") try: scope, cert_type, secrets = self._find_secret(event.certificate, "cert") logger.debug(f"{scope.val}.{cert_type.val} TLS certificate expiring.") @@ -180,12 +199,13 @@ def _request_certificate( password = password.encode("utf-8") subject = self._get_subject(cert_type) + organization = self.charm.opensearch_peer_cm.deployment_desc().config.cluster_name csr = generate_csr( add_unique_id_to_subject_name=False, private_key=key, private_key_password=password, subject=subject, - organization=self.charm.app.name, + organization=organization, **self._get_sans(cert_type), ) @@ -196,7 +216,7 @@ def _request_certificate( "key": key.decode("utf-8"), "key-password": password, "csr": csr.decode("utf-8"), - "subject": f"/O={self.charm.app.name}/CN={subject}", + "subject": f"/O={self.charm.opensearch_peer_cm.deployment_desc().config.cluster_name}/CN={subject}", }, merge=True, ) @@ -213,11 +233,12 @@ def _request_certificate_renewal( old_csr = secrets["csr"].encode("utf-8") subject = self._get_subject(cert_type) + organization = self.charm.opensearch_peer_cm.deployment_desc().config.cluster_name new_csr = generate_csr( private_key=key, private_key_password=(None if key_password is None else key_password.encode("utf-8")), subject=subject, - organization=self.charm.app.name, + organization=organization, **self._get_sans(cert_type), ) diff --git a/metadata.yaml b/metadata.yaml index d4d4f65a0..a163f66e1 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -17,6 +17,9 @@ peers: interface: rolling_op provides: + peer-cluster-orchestrator: + interface: peer_cluster + optional: true opensearch-client: interface: opensearch_client cos-agent: @@ -31,6 +34,10 @@ requires: certificates: interface: tls-certificates limit: 1 + peer-cluster: + interface: peer_cluster + limit: 2 # (main+failover)_cluster_orchestrator(s) + optional: true s3-credentials: interface: s3 limit: 1 diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 0c8984bba..3fcb99f24 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -7,7 +7,10 @@ import pytest import yaml -from charms.opensearch.v0.constants_charm import OPENSEARCH_SNAP_REVISION +from charms.opensearch.v0.constants_charm import ( + OPENSEARCH_SNAP_REVISION, + TLSRelationMissing, +) from pytest_operator.plugin import OpsTest from .helpers import ( @@ -43,18 +46,11 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: num_units=DEFAULT_NUM_UNITS, series=SERIES, ) - await ops_test.model.wait_for_idle(wait_for_exact_units=DEFAULT_NUM_UNITS, timeout=1800) - - -@pytest.mark.group(1) -@pytest.mark.abort_on_fail -async def test_status(ops_test: OpsTest) -> None: - """Verifies that the application and unit are active.""" await wait_until( ops_test, apps=[APP_NAME], wait_for_exact_units=DEFAULT_NUM_UNITS, - apps_statuses=["blocked"], + apps_full_statuses={APP_NAME: {"blocked": [TLSRelationMissing]}}, ) assert len(ops_test.model.applications[APP_NAME].units) == DEFAULT_NUM_UNITS @@ -72,10 +68,11 @@ async def test_actions_get_admin_password(ops_test: OpsTest) -> None: await ops_test.model.deploy(TLS_CERTIFICATES_APP_NAME, channel="stable", config=config) # Relate it to 
OpenSearch to set up TLS. await ops_test.model.integrate(APP_NAME, TLS_CERTIFICATES_APP_NAME) - await ops_test.model.wait_for_idle( + await wait_until( + ops_test, apps=[APP_NAME], - status="active", - timeout=1200, + apps_statuses=["active"], + units_statuses=["active"], wait_for_exact_units=DEFAULT_NUM_UNITS, ) diff --git a/tests/unit/lib/test_opensearch_base_charm.py b/tests/unit/lib/test_opensearch_base_charm.py index 90295e87f..78b1f1800 100644 --- a/tests/unit/lib/test_opensearch_base_charm.py +++ b/tests/unit/lib/test_opensearch_base_charm.py @@ -42,7 +42,8 @@ class TestOpenSearchBaseCharm(unittest.TestCase): config=PeerClusterConfig(cluster_name="", init_hold=False, roles=[]), start=StartMode.WITH_GENERATED_ROLES, pending_directives=[], - typ=DeploymentType.MAIN_CLUSTER_MANAGER, + typ=DeploymentType.MAIN_ORCHESTRATOR, + app="opensearch", state=DeploymentState(value=State.ACTIVE), ), "ko": DeploymentDescription( @@ -50,6 +51,7 @@ class TestOpenSearchBaseCharm(unittest.TestCase): start=StartMode.WITH_PROVIDED_ROLES, pending_directives=[Directive.WAIT_FOR_PEER_CLUSTER_RELATION], typ=DeploymentType.OTHER, + app="opensearch", state=DeploymentState(value=State.BLOCKED_CANNOT_START_WITH_ROLES, message="error"), ), } @@ -72,7 +74,6 @@ def setUp(self) -> None: self.opensearch.is_failed.return_value = False self.peers_data = self.charm.peers_data - self.peer_cm = self.charm.opensearch_peer_cm self.rel_id = self.harness.add_relation(PeerRelationName, self.charm.app.name) self.service_rel_id = self.harness.add_relation(SERVICE_MANAGER, self.charm.app.name) @@ -94,11 +95,16 @@ def test_on_install_error(self): self.charm.on.install.emit() self.assertTrue(isinstance(self.harness.model.unit.status, BlockedStatus)) + @patch( + f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.deployment_desc" + ) @patch(f"{BASE_CHARM_CLASS}._purge_users") @patch(f"{BASE_CHARM_CLASS}._put_admin_user") - def test_on_leader_elected(self, _put_admin_user, _purge_users): + def test_on_leader_elected(self, _put_admin_user, _purge_users, deployment_desc): """Test on leader elected event.""" + deployment_desc.return_value = self.deployment_descriptions["ok"] self.harness.set_leader(True) + _purge_users.assert_called_once() _put_admin_user.assert_called_once() self.assertTrue(isinstance(self.harness.model.unit.status, ActiveStatus)) @@ -111,12 +117,19 @@ def test_on_leader_elected(self, _put_admin_user, _purge_users): _purge_users.assert_called_once() _put_admin_user.assert_called_once() + @patch( + f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.deployment_desc" + ) @patch(f"{BASE_CHARM_CLASS}._purge_users") @patch(f"{BASE_CHARM_CLASS}._put_admin_user") - def test_on_leader_elected_index_initialised(self, _put_admin_user, _purge_users): + def test_on_leader_elected_index_initialised( + self, _put_admin_user, _purge_users, deployment_desc + ): # security_index_initialised self.peers_data.put(Scope.APP, "security_index_initialised", True) + deployment_desc.return_value = self.deployment_descriptions["ok"] self.harness.set_leader(True) + self.charm.on.leader_elected.emit() _put_admin_user.assert_not_called() _purge_users.assert_not_called() @@ -128,11 +141,15 @@ def test_on_leader_elected_index_initialised(self, _put_admin_user, _purge_users _put_admin_user.assert_called_once() _purge_users.assert_called_once() + @patch( + f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.validate_roles" + ) @patch( 
f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.deployment_desc" ) @patch(f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.can_start") - @patch(f"{BASE_CHARM_CLASS}._is_tls_fully_configured") + @patch(f"{BASE_CHARM_CLASS}.is_admin_user_configured") + @patch(f"{BASE_CHARM_CLASS}.is_tls_fully_configured") @patch(f"{BASE_LIB_PATH}.opensearch_config.OpenSearchConfig.set_client_auth") @patch(f"{BASE_CHARM_CLASS}._get_nodes") @patch(f"{BASE_CHARM_CLASS}._set_node_conf") @@ -151,9 +168,11 @@ def test_on_start( _set_node_conf, _get_nodes, set_client_auth, - _is_tls_fully_configured, + is_tls_fully_configured, + is_admin_user_configured, can_start, deployment_desc, + validate_roles, ): """Test on start event.""" with patch(f"{self.OPENSEARCH_DISTRO}.is_node_up") as is_node_up: @@ -161,12 +180,14 @@ def test_on_start( is_node_up.return_value = True self.peers_data.put(Scope.APP, "security_index_initialised", True) self.charm.on.start.emit() - _is_tls_fully_configured.assert_not_called() + is_tls_fully_configured.assert_not_called() + is_admin_user_configured.assert_not_called() # test when setup not complete is_node_up.return_value = False self.peers_data.delete(Scope.APP, "security_index_initialised") - _is_tls_fully_configured.return_value = False + is_tls_fully_configured.return_value = False + is_admin_user_configured.return_value = False self.charm.on.start.emit() set_client_auth.assert_not_called() @@ -175,8 +196,11 @@ def test_on_start( self.charm.on.start.emit() _set_node_conf.assert_not_called() + _get_nodes.reset_mock() + # _get_nodes succeeds - _is_tls_fully_configured.return_value = True + is_tls_fully_configured.return_value = True + is_admin_user_configured.return_value = True _get_nodes.side_effect = None _can_service_start.return_value = False self.charm.on.start.emit() @@ -196,8 +220,9 @@ def test_on_start( deployment_desc.return_value = self.deployment_descriptions["ok"] can_start.return_value = True _get_nodes.assert_called() - _set_node_conf.assert_called() + validate_roles.side_effect = None start.assert_called_once() + _set_node_conf.assert_called() _initialize_security_index.assert_called_once() self.assertTrue(self.peers_data.get(Scope.APP, "security_index_initialised")) diff --git a/tests/unit/lib/test_opensearch_internal_data.py b/tests/unit/lib/test_opensearch_internal_data.py index fafe1e787..5ebb6af0e 100644 --- a/tests/unit/lib/test_opensearch_internal_data.py +++ b/tests/unit/lib/test_opensearch_internal_data.py @@ -4,6 +4,7 @@ """Unit test for the helper_cluster library.""" import unittest +from unittest.mock import patch from charms.opensearch.v0.constants_charm import PeerRelationName from charms.opensearch.v0.models import ( @@ -22,6 +23,8 @@ class TestOpenSearchInternalData(unittest.TestCase): + BASE_LIB_PATH = "charms.opensearch.v0" + def setUp(self) -> None: self.harness = Harness(OpenSearchOperatorCharm) self.addCleanup(self.harness.cleanup) @@ -151,17 +154,20 @@ def test_nullify_obj(self, scope): @parameterized.expand([Scope.APP, Scope.UNIT]) def test_put_and_get_complex_obj(self, scope): """Test putting complex nested object.""" - deployment = DeploymentDescription( - config=PeerClusterConfig( - cluster_name="logs", init_hold=False, roles=["cluster_manager", "data"] - ), - start=StartMode.WITH_PROVIDED_ROLES, - pending_directives=[], - typ=DeploymentType.MAIN_CLUSTER_MANAGER, - state=DeploymentState(value=State.ACTIVE), - ) - self.store.put_object(scope, "deployment", deployment.to_dict()) - fetched_deployment 
= DeploymentDescription.from_dict( - self.store.get_object(scope, "deployment") - ) - self.assertEqual(deployment, fetched_deployment) + with patch(f"{self.BASE_LIB_PATH}.models.datetime") as datetime: + datetime.now.return_value.timestamp.return_value = 12345788.12 + deployment = DeploymentDescription( + config=PeerClusterConfig( + cluster_name="logs", init_hold=False, roles=["cluster_manager", "data"] + ), + start=StartMode.WITH_PROVIDED_ROLES, + pending_directives=[], + app=self.charm.app.name, + typ=DeploymentType.MAIN_ORCHESTRATOR, + state=DeploymentState(value=State.ACTIVE), + ) + self.store.put_object(scope, "deployment", deployment.to_dict()) + fetched_deployment = DeploymentDescription.from_dict( + self.store.get_object(scope, "deployment") + ) + self.assertEqual(deployment, fetched_deployment) diff --git a/tests/unit/lib/test_opensearch_peer_clusters.py b/tests/unit/lib/test_opensearch_peer_clusters.py index 31bbc212c..66aad09ec 100644 --- a/tests/unit/lib/test_opensearch_peer_clusters.py +++ b/tests/unit/lib/test_opensearch_peer_clusters.py @@ -89,23 +89,29 @@ def test_can_start(self, deployment_desc): ), start=StartMode.WITH_PROVIDED_ROLES, pending_directives=directives, - typ=DeploymentType.MAIN_CLUSTER_MANAGER, + app=self.charm.app.name, + typ=DeploymentType.MAIN_ORCHESTRATOR, state=DeploymentState(value=State.ACTIVE), ) can_start = self.peer_cm.can_start(deployment_desc) self.assertEqual(can_start, expected) + @patch(f"{PEER_CLUSTERS_MANAGER}.is_peer_cluster_orchestrator_relation_set") @patch("ops.model.Model.get_relation") @patch(f"{PEER_CLUSTERS_MANAGER}.deployment_desc") - def test_validate_roles(self, deployment_desc, get_relation): + def test_validate_roles( + self, deployment_desc, get_relation, is_peer_cluster_orchestrator_relation_set + ): """Test the roles' validation.""" + is_peer_cluster_orchestrator_relation_set.return_value = False get_relation.return_value.units = set(self.p_units) deployment_desc.return_value = DeploymentDescription( config=self.user_configs["roles_ok"], start=StartMode.WITH_PROVIDED_ROLES, pending_directives=[], - typ=DeploymentType.MAIN_CLUSTER_MANAGER, + app=self.charm.app.name, + typ=DeploymentType.MAIN_ORCHESTRATOR, state=DeploymentState(value=State.ACTIVE), ) with self.assertRaises(OpenSearchProvidedRolesException): @@ -139,9 +145,9 @@ def test_validate_roles(self, deployment_desc, get_relation): @patch("ops.model.Model.get_relation") @patch(f"{BASE_LIB_PATH}.helper_cluster.ClusterTopology.nodes") @patch(f"{BASE_CHARM_CLASS}.alt_hosts") - @patch(f"{PEER_CLUSTERS_MANAGER}.is_peer_cluster_relation_set") + @patch(f"{PEER_CLUSTERS_MANAGER}.is_peer_cluster_orchestrator_relation_set") def test_pre_validate_roles_change( - self, is_peer_cluster_relation_set, alt_hosts, nodes, get_relation + self, is_peer_cluster_orchestrator_relation_set, alt_hosts, nodes, get_relation ): """Test the pre_validation of roles change.""" get_relation.return_value.units = set(self.p_units) @@ -154,7 +160,7 @@ def test_pre_validate_roles_change( self.peer_cm._pre_validate_roles_change(new_roles=[], prev_roles=["data", "ml"]) # test on a multi clusters fleet - happy path - is_peer_cluster_relation_set.return_value = True + is_peer_cluster_orchestrator_relation_set.return_value = True nodes.return_value = [ Node( name=node.name.replace("/", "-"), @@ -177,11 +183,11 @@ def test_pre_validate_roles_change( new_roles=["data"], prev_roles=["cluster_manager", "data"] ) with self.assertRaises(OpenSearchProvidedRolesException): - is_peer_cluster_relation_set.return_value = 
False + is_peer_cluster_orchestrator_relation_set.return_value = False self.peer_cm._pre_validate_roles_change(new_roles=["ml"], prev_roles=["ml", "data"]) with self.assertRaises(OpenSearchProvidedRolesException): # no other data nodes in cluster fleet - is_peer_cluster_relation_set.return_value = True + is_peer_cluster_orchestrator_relation_set.return_value = True nodes.return_value = [ Node( name=node.name.replace("/", "-"), diff --git a/tests/unit/lib/test_opensearch_tls.py b/tests/unit/lib/test_opensearch_tls.py index 17675b36c..90bd32abb 100644 --- a/tests/unit/lib/test_opensearch_tls.py +++ b/tests/unit/lib/test_opensearch_tls.py @@ -9,6 +9,14 @@ from charms.opensearch.v0.constants_charm import PeerRelationName from charms.opensearch.v0.constants_tls import TLS_RELATION, CertType +from charms.opensearch.v0.models import ( + DeploymentDescription, + DeploymentState, + DeploymentType, + PeerClusterConfig, + StartMode, + State, +) from charms.opensearch.v0.opensearch_internal_data import Scope from ops.testing import Harness @@ -88,11 +96,25 @@ def test_find_secret(self): ) self.secret_store.put_object(Scope.APP, CertType.APP_ADMIN.val, {"csr": event_data_csr}) + @patch( + f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.deployment_desc" + ) @patch("charms.opensearch.v0.opensearch_tls.OpenSearchTLS._request_certificate") @patch("charm.OpenSearchOperatorCharm._put_admin_user") @patch("charm.OpenSearchOperatorCharm._purge_users") - def test_on_relation_joined_admin(self, _, _put_admin_user, _request_certificate): + def test_on_relation_joined_admin( + self, _, _put_admin_user, _request_certificate, deployment_desc + ): """Test on certificate relation joined event.""" + deployment_desc.return_value = DeploymentDescription( + config=PeerClusterConfig(cluster_name="", init_hold=False, roles=[]), + start=StartMode.WITH_GENERATED_ROLES, + pending_directives=[], + typ=DeploymentType.MAIN_ORCHESTRATOR, + app=self.charm.app.name, + state=DeploymentState(value=State.ACTIVE), + ) + event_mock = MagicMock() self.harness.set_leader(is_leader=True) @@ -106,11 +128,25 @@ def test_on_relation_joined_admin(self, _, _put_admin_user, _request_certificate ], ) + @patch( + f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.deployment_desc" + ) @patch("charms.opensearch.v0.opensearch_tls.OpenSearchTLS._request_certificate") @patch("charm.OpenSearchOperatorCharm._put_admin_user") @patch("charm.OpenSearchOperatorCharm._purge_users") - def test_on_relation_joined_non_admin(self, _, _put_admin_user, _request_certificate): + def test_on_relation_joined_non_admin( + self, _, _put_admin_user, _request_certificate, deployment_desc + ): """Test on certificate relation joined event.""" + deployment_desc.return_value = DeploymentDescription( + config=PeerClusterConfig(cluster_name="", init_hold=False, roles=[]), + start=StartMode.WITH_GENERATED_ROLES, + pending_directives=[], + typ=DeploymentType.MAIN_ORCHESTRATOR, + app=self.charm.app.name, + state=DeploymentState(value=State.ACTIVE), + ) + event_mock = MagicMock() self.harness.set_leader(is_leader=False) @@ -181,8 +217,13 @@ def test_on_certificate_available( @patch( "charms.tls_certificates_interface.v3.tls_certificates.TLSCertificatesRequiresV3.request_certificate_creation" ) + @patch( + f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.deployment_desc" + ) @patch("charm.OpenSearchOperatorCharm._put_admin_user") - def test_on_certificate_expiring(self, _put_admin_user, 
request_certificate_creation): + def test_on_certificate_expiring( + self, _put_admin_user, deployment_desc, request_certificate_creation + ): """Test _on_certificate_available event.""" csr = "csr_12345" cert = "cert_12345" @@ -195,6 +236,15 @@ def test_on_certificate_expiring(self, _put_admin_user, request_certificate_crea {"csr": csr, "cert": cert, "key": key}, ) + deployment_desc.return_value = DeploymentDescription( + config=PeerClusterConfig(cluster_name="", init_hold=False, roles=[]), + start=StartMode.WITH_GENERATED_ROLES, + pending_directives=[], + typ=DeploymentType.MAIN_ORCHESTRATOR, + app=self.charm.app.name, + state=DeploymentState(value=State.ACTIVE), + ) + event_mock = MagicMock(certificate=cert) self.charm.tls._on_certificate_expiring(event_mock)
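
Note for reviewers: the provider and requirer hunks above lean heavily on the `PeerClusterOrchestrators` model (`from_dict`/`to_dict`, `delete`, `promote_failover`), whose implementation lives elsewhere in the charm library and is not part of the hunks shown here. The snippet below is a minimal, illustrative sketch of the semantics those calls appear to rely on; it is not the actual model code, and anything beyond the `main_app`, `failover_app`, `main_rel_id` and `failover_rel_id` fields and the `-1` "unset" convention (which do appear above) is an assumption.

```
# Illustrative sketch only - mirrors how the relation code above uses the model,
# not the charm library's actual implementation.
from dataclasses import dataclass
from typing import Optional


@dataclass
class PeerClusterOrchestrators:
    """Main and failover orchestrator apps and their relation ids (-1 = unset)."""

    main_app: Optional[str] = None
    main_rel_id: int = -1
    failover_app: Optional[str] = None
    failover_rel_id: int = -1

    @classmethod
    def from_dict(cls, data: Optional[dict]) -> "PeerClusterOrchestrators":
        data = data or {}
        return cls(
            main_app=data.get("main_app"),
            main_rel_id=int(data.get("main_rel_id", -1)),
            failover_app=data.get("failover_app"),
            failover_rel_id=int(data.get("failover_rel_id", -1)),
        )

    def to_dict(self) -> dict:
        return {
            "main_app": self.main_app,
            "main_rel_id": self.main_rel_id,
            "failover_app": self.failover_app,
            "failover_rel_id": self.failover_rel_id,
        }

    def delete(self, typ: str) -> None:
        """Forget the 'main' or 'failover' orchestrator."""
        if typ == "main":
            self.main_app, self.main_rel_id = None, -1
        else:
            self.failover_app, self.failover_rel_id = None, -1

    def promote_failover(self) -> None:
        """Make the registered failover the new main orchestrator."""
        self.main_app, self.main_rel_id = self.failover_app, self.failover_rel_id
        self.delete("failover")


if __name__ == "__main__":
    orchestrators = PeerClusterOrchestrators.from_dict(
        {"main_app": "main", "main_rel_id": 7, "failover_app": "failover", "failover_rel_id": 9}
    )
    orchestrators.delete("main")      # the main orchestrator departed
    orchestrators.promote_failover()  # the failover takes over
    print(orchestrators.to_dict())    # {'main_app': 'failover', 'main_rel_id': 9, ...}
```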