From 6f1322a1a3148323ff3a46dffb5b54d27c096003 Mon Sep 17 00:00:00 2001
From: Mehdi Bendriss
Date: Sat, 4 May 2024 14:03:36 +0200
Subject: [PATCH] [DPE-4281] - Fixed secrets updates + users/s3 handling on
 large-depl. relations (#289)

## Issue

This PR implements [DPE-4281](https://warthogs.atlassian.net/browse/DPE-4281).

Namely, it fixes multiple issues around secrets handling that led to endless triggers of the
secrets-changed hook, which became more obvious on large deployments:

- Ensure that the COSUser is NOT updated on every `_post_start_init`.
- Correct the setting / deletion of the `started` flag in the `_start_opensearch` workflow.
- Correctly set the S3 credentials in large deployment relations and prevent empty values
  from being set on:
  - both sides of the peer-cluster relation
  - the peer relation
- Propagate, in addition to the AdminUser, over large deployment relations:
  - COSUser
  - Kibanaserver
- Fix status clearing on the requirer side of the large deployment relation.
- Make continuous writes more robust.

[DPE-4281]: https://warthogs.atlassian.net/browse/DPE-4281?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
---
 lib/charms/opensearch/v0/models.py            |  3 +
 .../opensearch/v0/opensearch_base_charm.py    | 43 ++++++----
 .../v0/opensearch_relation_peer_cluster.py    | 81 ++++++++++++-------
 tests/integration/ha/continuous_writes.py     |  8 +-
 tests/integration/ha/helpers.py               | 20 +++--
 5 files changed, 104 insertions(+), 51 deletions(-)

diff --git a/lib/charms/opensearch/v0/models.py b/lib/charms/opensearch/v0/models.py
index f80c57964..38af6c9f0 100644
--- a/lib/charms/opensearch/v0/models.py
+++ b/lib/charms/opensearch/v0/models.py
@@ -215,6 +215,9 @@ class PeerClusterRelDataCredentials(Model):
     admin_username: str
     admin_password: str
     admin_password_hash: str
+    kibana_password: str
+    kibana_password_hash: str
+    monitor_password: str
     admin_tls: Dict[str, Optional[str]]
     s3: Optional[S3RelDataCredentials]
diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py
index 73a7c5809..c438a1378 100644
--- a/lib/charms/opensearch/v0/opensearch_base_charm.py
+++ b/lib/charms/opensearch/v0/opensearch_base_charm.py
@@ -832,16 +832,6 @@ def _handle_change_to_main_orchestrator_if_needed(
 
     def _start_opensearch(self, event: _StartOpenSearch) -> None:  # noqa: C901
         """Start OpenSearch, with a generated or passed conf, if all resources configured."""
-        if not self.node_lock.acquired:
-            # (Attempt to acquire lock even if `event.ignore_lock`)
-            if event.ignore_lock:
-                # Only used for force upgrades
-                logger.debug("Starting without lock")
-            else:
-                logger.debug("Lock to start opensearch not acquired. Will retry next event")
-                event.defer()
-                return
-
-        self.peers_data.delete(Scope.UNIT, "started")
         if self.opensearch.is_started():
             try:
                 self._post_start_init(event)
@@ -858,6 +848,18 @@ def _start_opensearch(self, event: _StartOpenSearch) -> None:  # noqa: C901
                 event.defer()
                 return
 
+        self.peers_data.delete(Scope.UNIT, "started")
+
+        if not self.node_lock.acquired:
+            # (Attempt to acquire lock even if `event.ignore_lock`)
+            if event.ignore_lock:
+                # Only used for force upgrades
+                logger.debug("Starting without lock")
+            else:
+                logger.debug(
+                    "Lock to start opensearch not acquired. Will retry next event"
+                )
+                event.defer()
+                return
+
         if not self._can_service_start():
             self.node_lock.release()
             event.defer()
             return
@@ -965,9 +967,12 @@ def _post_start_init(self, event: _StartOpenSearch):  # noqa: C901
         # apply cluster health
         self.health.apply(wait_for_green_first=True, app=self.unit.is_leader())
 
-        if self.unit.is_leader():
+        if (
+            self.unit.is_leader()
+            and self.opensearch_peer_cm.deployment_desc().typ == DeploymentType.MAIN_ORCHESTRATOR
+        ):
             # Creating the monitoring user
-            self._put_or_update_internal_user_leader(COSUser)
+            self._put_or_update_internal_user_leader(COSUser, update=False)
 
         self.unit.open_port("tcp", 9200)
@@ -1043,6 +1048,7 @@ def _stop_opensearch(self, *, restart=False) -> None:
 
         # 2. stop the service
         self.opensearch.stop()
+        self.peers_data.delete(Scope.UNIT, "started")
         self.status.set(WaitingStatus(ServiceStopped))
 
         # 3. Remove the exclusions
@@ -1214,18 +1220,25 @@ def _purge_users(self):
             if user != "_meta":
                 self.opensearch.config.delete("opensearch-security/internal_users.yml", user)
 
-    def _put_or_update_internal_user_leader(self, user: str, pwd: Optional[str] = None) -> None:
+    def _put_or_update_internal_user_leader(
+        self, user: str, pwd: Optional[str] = None, update: bool = True
+    ) -> None:
         """Create system user or update it with a new password."""
         # Leader is to set new password and hash, others populate existing hash locally
         if not self.unit.is_leader():
             logger.error("Credential change can be only performed by the leader unit.")
             return
 
+        secret = self.secrets.get(Scope.APP, self.secrets.password_key(user))
+        if secret and not update:
+            self._put_or_update_internal_user_unit(user)
+            return
+
         hashed_pwd, pwd = generate_hashed_password(pwd)
 
         # Updating security index
         # We need to do this for all credential changes
-        if secret := self.secrets.get(Scope.APP, self.secrets.password_key(user)):
+        if secret:
             self.user_manager.update_user_password(user, hashed_pwd)
 
         # In case it's a new user, OR it's a system user (that has an entry in internal_users.yml)
@@ -1244,7 +1257,7 @@ def _put_or_update_internal_user_leader(self, user: str, pwd: Optional[str] = No
         if user == AdminUser:
             self.peers_data.put(Scope.APP, "admin_user_initialized", True)
 
-    def _put_or_update_internal_user_unit(self, user: str, pwd: Optional[str] = None) -> None:
+    def _put_or_update_internal_user_unit(self, user: str) -> None:
         """Create system user or update it with a new password."""
         # Leader is to set new password and hash, others populate existing hash locally
         hashed_pwd = self.secrets.get(Scope.APP, self.secrets.hash_key(user))
diff --git a/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py b/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py
index efd222ac4..b8f2433ce 100644
--- a/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py
+++ b/lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py
@@ -7,6 +7,9 @@
 from typing import TYPE_CHECKING, Any, Dict, List, MutableMapping, Optional, Union
 
 from charms.opensearch.v0.constants_charm import (
+    AdminUser,
+    COSUser,
+    KibanaserverUser,
     PeerClusterOrchestratorRelationName,
     PeerClusterRelationName,
 )
@@ -26,6 +29,7 @@
     PeerClusterRelErrorData,
     S3RelDataCredentials,
 )
+from charms.opensearch.v0.opensearch_backups import S3_RELATION
 from charms.opensearch.v0.opensearch_exceptions import OpenSearchHttpError
 from charms.opensearch.v0.opensearch_internal_data import Scope
 from ops import (
@@ -300,25 +304,30 @@ def _put_planned_units(self, app: str, count: int, target_relation_ids: List[int
             Scope.APP,
             "cluster_fleet_planned_units",
             cluster_fleet_planned_units,
         )
 
-    def _s3_credentials(self, deployment_desc: DeploymentDescription) -> S3RelDataCredentials:
-        secrets = self.charm.secrets
+    def _s3_credentials(
+        self, deployment_desc: DeploymentDescription
+    ) -> Optional[S3RelDataCredentials]:
         if deployment_desc.typ == DeploymentType.MAIN_ORCHESTRATOR:
+            if not self.charm.model.get_relation(S3_RELATION):
+                return None
+
+            if not self.charm.backup.s3_client.get_s3_connection_info().get("access-key"):
+                return None
+
             # As the main orchestrator, this application must set the S3 information.
-            s3_credentials = S3RelDataCredentials(
-                access_key=self.charm.backup.s3_client.get_s3_connection_info().get(
-                    "access-key", ""
-                ),
-                secret_key=self.charm.backup.s3_client.get_s3_connection_info().get(
-                    "secret-key", ""
-                ),
-            )
-        else:
-            # Return what we have received from the peer relation
-            s3_credentials = S3RelDataCredentials(
-                access_key=secrets.get(Scope.APP, "access-key", default=""),
-                secret_key=secrets.get(Scope.APP, "secret-key", default=""),
+            return S3RelDataCredentials(
+                access_key=self.charm.backup.s3_client.get_s3_connection_info().get("access-key"),
+                secret_key=self.charm.backup.s3_client.get_s3_connection_info().get("secret-key"),
             )
-        return s3_credentials
+
+        if not self.charm.secrets.get(Scope.APP, "access-key"):
+            return None
+
+        # Return what we have received from the peer relation
+        return S3RelDataCredentials(
+            access_key=self.charm.secrets.get(Scope.APP, "access-key"),
+            secret_key=self.charm.secrets.get(Scope.APP, "secret-key"),
+        )
 
     def _rel_data(
         self, deployment_desc: DeploymentDescription, orchestrators: PeerClusterOrchestrators
@@ -332,17 +341,20 @@ def _rel_data(
             # ready (will receive a subsequent
             try:
                 secrets = self.charm.secrets
-
-                s3_credentials = self._s3_credentials(deployment_desc)
                 return PeerClusterRelData(
                     cluster_name=deployment_desc.config.cluster_name,
                     cm_nodes=self._fetch_local_cm_nodes(),
                     credentials=PeerClusterRelDataCredentials(
-                        admin_username="admin",
-                        admin_password=secrets.get(Scope.APP, secrets.password_key("admin")),
-                        admin_password_hash=secrets.get(Scope.APP, secrets.hash_key("admin")),
+                        admin_username=AdminUser,
+                        admin_password=secrets.get(Scope.APP, secrets.password_key(AdminUser)),
+                        admin_password_hash=secrets.get(Scope.APP, secrets.hash_key(AdminUser)),
+                        kibana_password=secrets.get(Scope.APP, secrets.password_key(KibanaserverUser)),
+                        kibana_password_hash=secrets.get(
+                            Scope.APP, secrets.hash_key(KibanaserverUser)
+                        ),
+                        monitor_password=secrets.get(Scope.APP, secrets.password_key(COSUser)),
                         admin_tls=secrets.get_object(Scope.APP, CertType.APP_ADMIN.val),
-                        s3=s3_credentials,
+                        s3=self._s3_credentials(deployment_desc),
                     ),
                     deployment_desc=deployment_desc,
                 )
@@ -384,6 +396,8 @@ def _rel_err_data(  # noqa: C901
             blocked_msg = f"Security index not initialized {message_suffix}."
         elif not self.charm.is_every_unit_marked_as_started():
             blocked_msg = f"Waiting for every unit {message_suffix} to start."
+        elif not self.charm.secrets.get(Scope.APP, self.charm.secrets.password_key(COSUser)):
+            blocked_msg = f"'{COSUser}' user not created yet."
         else:
             try:
                 if not self._fetch_local_cm_nodes():
@@ -501,8 +515,16 @@ def _set_security_conf(self, data: PeerClusterRelData) -> None:
         """Store security related config."""
         # set admin secrets
         secrets = self.charm.secrets
-        secrets.put(Scope.APP, secrets.password_key("admin"), data.credentials.admin_password)
-        secrets.put(Scope.APP, secrets.hash_key("admin"), data.credentials.admin_password_hash)
+        secrets.put(Scope.APP, secrets.password_key(AdminUser), data.credentials.admin_password)
+        secrets.put(Scope.APP, secrets.hash_key(AdminUser), data.credentials.admin_password_hash)
+        secrets.put(
+            Scope.APP, secrets.password_key(KibanaserverUser), data.credentials.kibana_password
+        )
+        secrets.put(
+            Scope.APP, secrets.hash_key(KibanaserverUser), data.credentials.kibana_password_hash
+        )
+        secrets.put(Scope.APP, secrets.password_key(COSUser), data.credentials.monitor_password)
+
         secrets.put_object(Scope.APP, CertType.APP_ADMIN.val, data.credentials.admin_tls)
 
         # store the app admin TLS resources if not stored
@@ -512,9 +534,9 @@ def _set_security_conf(self, data: PeerClusterRelData) -> None:
         self.charm.peers_data.put(Scope.APP, "admin_user_initialized", True)
         self.charm.peers_data.put(Scope.APP, "security_index_initialised", True)
 
-        s3_creds = data.credentials.s3
-        self.charm.secrets.put(Scope.APP, "access-key", s3_creds.access_key)
-        self.charm.secrets.put(Scope.APP, "secret-key", s3_creds.secret_key)
+        if s3_creds := data.credentials.s3:
+            self.charm.secrets.put(Scope.APP, "access-key", s3_creds.access_key)
+            self.charm.secrets.put(Scope.APP, "secret-key", s3_creds.secret_key)
 
     def _orchestrators(
         self,
@@ -747,6 +769,8 @@ def _error_set_from_requirer(
             blocked_msg = (
                 "A cluster can only be related to 1 main and 1 failover-clusters at most."
             )
+        elif peer_cluster_rel_data.cluster_name != deployment_desc.config.cluster_name:
+            blocked_msg = "Cannot relate 2 clusters with different 'cluster_name' values."
 
         if not blocked_msg:
             self._clear_errors(f"error_from_requirer-{event_rel_id}")
@@ -807,5 +831,6 @@ def _set_error(self, label: str, error: Optional[Dict[str, Any]]) -> None:
     def _clear_errors(self, *error_labels: str):
         """Clear previously set Peer clusters related statuses."""
         for error_label in error_labels:
-            self.charm.status.clear(error_label)
+            error = self.charm.peers_data.get(Scope.APP, error_label, "")
+            self.charm.status.clear(error, app=True)
             self.charm.peers_data.delete(Scope.APP, error_label)
diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py
index 5975cb23b..d061c0f6e 100644
--- a/tests/integration/ha/continuous_writes.py
+++ b/tests/integration/ha/continuous_writes.py
@@ -87,7 +87,7 @@ async def clear(self) -> None:
         client = await self._client()
 
         try:
-            client.indices.delete(index=ContinuousWrites.INDEX_NAME)
+            client.indices.delete(index=ContinuousWrites.INDEX_NAME, ignore_unavailable=True)
         finally:
            client.close()
@@ -181,6 +181,10 @@ def _create_process(self, is_bulk: bool = True):
         )
 
     def _stop_process(self):
+        if self._is_stopped or not self._process.is_alive():
+            self._is_stopped = True
+            return
+
         self._event.set()
         self._process.join()
         self._queue.close()
@@ -189,7 +193,7 @@ def _stop_process(self):
 
     async def _secrets(self) -> str:
         """Fetch secrets and return the password."""
-        secrets = await get_secrets(self._ops_test)
+        secrets = await get_secrets(self._ops_test, app=self._app)
 
         with open(ContinuousWrites.CERT_PATH, "w") as chain:
             chain.write(secrets["ca-chain"])
diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py
index 17fc88f57..55c82da2c 100644
--- a/tests/integration/ha/helpers.py
+++ b/tests/integration/ha/helpers.py
@@ -61,12 +61,20 @@ async def app_name(ops_test: OpsTest) -> Optional[str]:
     application name "opensearch".
 
     Note: if multiple clusters are running OpenSearch this will return the one first found.
     """
-    status = await ops_test.model.get_status()
-    for app in ops_test.model.applications:
-        if "opensearch" in status["applications"][app]["charm"]:
-            return app
+    apps = json.loads(
+        subprocess.check_output(
+            f"juju status --model {ops_test.model.info.name} --format=json".split()
+        )
+    )["applications"]
+
+    opensearch_apps = {
+        name: desc for name, desc in apps.items() if desc["charm-name"] == "opensearch"
+    }
+    for name, desc in opensearch_apps.items():
+        if name == "opensearch-main":
+            return name
 
-    return None
+    return list(opensearch_apps.keys())[0] if opensearch_apps else None
 
 
 @retry(
@@ -579,4 +587,4 @@ async def assert_restore_indices_and_compare_consistency(
     )
     # We expect that new_count has a loss of documents and the numbers are different.
     # Check if we have data but not all of it.
-    assert new_count > 0 and new_count < original_count
+    assert 0 < new_count < original_count
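
Review note: the S3 handling change above can be summarized outside of the charm context. The sketch below is illustrative only; it uses stand-in types (a plain dict for the secret store and a local `S3RelDataCredentials` dataclass) rather than the charm's real objects, and it only mirrors the guard logic this patch adds so that empty S3 values are never written on either side of the peer-cluster relation.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class S3RelDataCredentials:
    """Stand-in for the model class of the same name in lib/charms/opensearch/v0/models.py."""

    access_key: str
    secret_key: str


def s3_credentials_to_publish(
    s3_relation_exists: bool, s3_info: Dict[str, str]
) -> Optional[S3RelDataCredentials]:
    """Provider side: only publish S3 credentials when the s3 relation exists with real values."""
    if not s3_relation_exists:
        return None
    if not s3_info.get("access-key"):
        return None
    return S3RelDataCredentials(
        access_key=s3_info["access-key"], secret_key=s3_info["secret-key"]
    )


def store_s3_credentials(secret_store: Dict[str, str], s3: Optional[S3RelDataCredentials]) -> None:
    """Requirer side: skip the write entirely when nothing was shared (no empty secrets)."""
    if s3:  # mirrors `if s3_creds := data.credentials.s3:` in _set_security_conf
        secret_store["access-key"] = s3.access_key
        secret_store["secret-key"] = s3.secret_key


if __name__ == "__main__":
    store: Dict[str, str] = {}
    store_s3_credentials(store, s3_credentials_to_publish(False, {}))
    assert store == {}  # nothing written, so no spurious secret-changed events
    store_s3_credentials(
        store, s3_credentials_to_publish(True, {"access-key": "AKIA…", "secret-key": "s3cr3t"})
    )
    assert store["access-key"] == "AKIA…"
```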
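
Review note: the `update=False` flag added to `_put_or_update_internal_user_leader` is what stops the COSUser secret from being regenerated on every `_post_start_init`. A stripped-down, hypothetical version of that decision follows; the store and helper names are stand-ins, not the charm API.

```python
import secrets as token_gen
from typing import Dict, Optional


def put_or_update_user(
    store: Dict[str, str], user: str, pwd: Optional[str] = None, update: bool = True
) -> str:
    """Create the user's password once; regenerate it only when an update is requested."""
    existing = store.get(f"{user}-password")
    if existing and not update:
        # Same early return the patch adds: leave the stored secret untouched so that
        # repeated start events do not trigger a new round of secret-changed hooks.
        return existing

    new_pwd = pwd or token_gen.token_urlsafe(16)
    store[f"{user}-password"] = new_pwd
    return new_pwd


if __name__ == "__main__":
    store: Dict[str, str] = {}
    first = put_or_update_user(store, "monitor", update=False)   # created on first start
    second = put_or_update_user(store, "monitor", update=False)  # reused, not rotated
    assert first == second
    rotated = put_or_update_user(store, "monitor", update=True)  # explicit rotation
    assert rotated != first
```

With the default `update=True`, the existing password-rotation path is unchanged; only the repeated start-time call opts out of regenerating the secret.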