Commit 6f1322a

[DPE-4281] - Fixed secrets updates + users/s3 handling on large-depl. relations (#289)

## Issue
This PR implements
[DPE-4281](https://warthogs.atlassian.net/browse/DPE-4281). Namely, it
fixes multiple issues around secrets that led to endless triggering of
the secrets-changed hook, which became more obvious on large
deployments.

- Ensure that COSUser is NOT updated constantly on post_start_init (a
  minimal sketch of this guard follows the description)
- Correct setting / deletion of the started flag in the
  start_opensearch workflow
- Correct setting of s3 credentials in large-deployment relations and
  prevent empty values from being set on:
     - both sides of the peer-cluster relation
     - the peer relation
- In addition to AdminUser, propagate in large-deployment relations:
     - COSUser
     - Kibanaserver
- Fix status clearing from the requirer side of the large-deployment
  relation
- Make continuous writes more robust

[DPE-4281]:
https://warthogs.atlassian.net/browse/DPE-4281?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
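
The core of the fix is that the leader no longer rewrites an internal user's credentials on every start: an existing secret is reused unless an update is explicitly requested. Below is a minimal, self-contained sketch of that guard; `put_or_update_user`, the plain `store` dict, and the SHA-256 based `generate_hashed_password` are simplified stand-ins for illustration, not the charm's actual secret store or hashing helpers.

```python
from typing import Optional, Tuple
import hashlib
import secrets as pysecrets


def generate_hashed_password(pwd: Optional[str] = None) -> Tuple[str, str]:
    """Stand-in for the charm helper: return a (hash, password) pair."""
    pwd = pwd or pysecrets.token_urlsafe(16)
    return hashlib.sha256(pwd.encode()).hexdigest(), pwd


def put_or_update_user(
    store: dict, user: str, pwd: Optional[str] = None, update: bool = True
) -> None:
    """Create the user's credential, or rotate it only when explicitly asked.

    Rewriting an unchanged password on every post-start hook is what kept
    re-triggering secret-changed events on large deployments.
    """
    if store.get(f"{user}-password") and not update:
        # A credential already exists and no rotation was requested: keep it.
        return

    hashed, pwd = generate_hashed_password(pwd)
    store[f"{user}-password"] = pwd
    store[f"{user}-password-hash"] = hashed


if __name__ == "__main__":
    store: dict = {}
    put_or_update_user(store, "monitor")                # first call creates the secret
    snapshot = dict(store)
    put_or_update_user(store, "monitor", update=False)  # later calls leave it untouched
    assert store == snapshot
```

The same idea appears in `_put_or_update_internal_user_leader` in the diff below, where `_post_start_init` now calls it with `update=False` so an existing secret is not regenerated.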
Mehdi-Bendriss authored May 4, 2024
1 parent fda46cb commit 6f1322a
Showing 5 changed files with 104 additions and 51 deletions.
3 changes: 3 additions & 0 deletions lib/charms/opensearch/v0/models.py
@@ -215,6 +215,9 @@ class PeerClusterRelDataCredentials(Model):
     admin_username: str
     admin_password: str
     admin_password_hash: str
+    kibana_password: str
+    kibana_password_hash: str
+    monitor_password: str
     admin_tls: Dict[str, Optional[str]]
     s3: Optional[S3RelDataCredentials]
43 changes: 28 additions & 15 deletions lib/charms/opensearch/v0/opensearch_base_charm.py
@@ -832,16 +832,6 @@ def _handle_change_to_main_orchestrator_if_needed(
 
     def _start_opensearch(self, event: _StartOpenSearch) -> None:  # noqa: C901
         """Start OpenSearch, with a generated or passed conf, if all resources configured."""
-        if not self.node_lock.acquired:
-            # (Attempt to acquire lock even if `event.ignore_lock`)
-            if event.ignore_lock:
-                # Only used for force upgrades
-                logger.debug("Starting without lock")
-            else:
-                logger.debug("Lock to start opensearch not acquired. Will retry next event")
-                event.defer()
-                return
-        self.peers_data.delete(Scope.UNIT, "started")
         if self.opensearch.is_started():
             try:
                 self._post_start_init(event)
@@ -858,6 +848,18 @@ def _start_opensearch(self, event: _StartOpenSearch) -> None: # noqa: C901
                 event.defer()
             return
 
+        self.peers_data.delete(Scope.UNIT, "started")
+
+        if not self.node_lock.acquired:
+            # (Attempt to acquire lock even if `event.ignore_lock`)
+            if event.ignore_lock:
+                # Only used for force upgrades
+                logger.debug("Starting without lock")
+            else:
+                logger.debug("Lock to start opensearch not acquired. Will retry next event")
+                event.defer()
+                return
+
         if not self._can_service_start():
             self.node_lock.release()
             event.defer()
@@ -965,9 +967,12 @@ def _post_start_init(self, event: _StartOpenSearch): # noqa: C901
         # apply cluster health
         self.health.apply(wait_for_green_first=True, app=self.unit.is_leader())
 
-        if self.unit.is_leader():
+        if (
+            self.unit.is_leader()
+            and self.opensearch_peer_cm.deployment_desc().typ == DeploymentType.MAIN_ORCHESTRATOR
+        ):
             # Creating the monitoring user
-            self._put_or_update_internal_user_leader(COSUser)
+            self._put_or_update_internal_user_leader(COSUser, update=False)
 
         self.unit.open_port("tcp", 9200)
 
@@ -1043,6 +1048,7 @@ def _stop_opensearch(self, *, restart=False) -> None:
 
         # 2. stop the service
         self.opensearch.stop()
+        self.peers_data.delete(Scope.UNIT, "started")
         self.status.set(WaitingStatus(ServiceStopped))
 
         # 3. Remove the exclusions
@@ -1214,18 +1220,25 @@ def _purge_users(self):
            if user != "_meta":
                 self.opensearch.config.delete("opensearch-security/internal_users.yml", user)
 
-    def _put_or_update_internal_user_leader(self, user: str, pwd: Optional[str] = None) -> None:
+    def _put_or_update_internal_user_leader(
+        self, user: str, pwd: Optional[str] = None, update: bool = True
+    ) -> None:
         """Create system user or update it with a new password."""
         # Leader is to set new password and hash, others populate existing hash locally
         if not self.unit.is_leader():
             logger.error("Credential change can be only performed by the leader unit.")
             return
 
+        secret = self.secrets.get(Scope.APP, self.secrets.password_key(user))
+        if secret and not update:
+            self._put_or_update_internal_user_unit(user)
+            return
+
         hashed_pwd, pwd = generate_hashed_password(pwd)
 
         # Updating security index
         # We need to do this for all credential changes
-        if secret := self.secrets.get(Scope.APP, self.secrets.password_key(user)):
+        if secret:
             self.user_manager.update_user_password(user, hashed_pwd)
 
         # In case it's a new user, OR it's a system user (that has an entry in internal_users.yml)
@@ -1244,7 +1257,7 @@ def _put_or_update_internal_user_leader(self, user: str, pwd: Optional[str] = No
         if user == AdminUser:
             self.peers_data.put(Scope.APP, "admin_user_initialized", True)
 
-    def _put_or_update_internal_user_unit(self, user: str, pwd: Optional[str] = None) -> None:
+    def _put_or_update_internal_user_unit(self, user: str) -> None:
         """Create system user or update it with a new password."""
         # Leader is to set new password and hash, others populate existing hash locally
         hashed_pwd = self.secrets.get(Scope.APP, self.secrets.hash_key(user))
81 changes: 53 additions & 28 deletions lib/charms/opensearch/v0/opensearch_relation_peer_cluster.py
@@ -7,6 +7,9 @@
 from typing import TYPE_CHECKING, Any, Dict, List, MutableMapping, Optional, Union
 
 from charms.opensearch.v0.constants_charm import (
+    AdminUser,
+    COSUser,
+    KibanaserverUser,
     PeerClusterOrchestratorRelationName,
     PeerClusterRelationName,
 )
@@ -26,6 +29,7 @@
     PeerClusterRelErrorData,
     S3RelDataCredentials,
 )
+from charms.opensearch.v0.opensearch_backups import S3_RELATION
 from charms.opensearch.v0.opensearch_exceptions import OpenSearchHttpError
 from charms.opensearch.v0.opensearch_internal_data import Scope
 from ops import (
@@ -300,25 +304,30 @@ def _put_planned_units(self, app: str, count: int, target_relation_ids: List[int
             Scope.APP, "cluster_fleet_planned_units", cluster_fleet_planned_units
         )
 
-    def _s3_credentials(self, deployment_desc: DeploymentDescription) -> S3RelDataCredentials:
-        secrets = self.charm.secrets
+    def _s3_credentials(
+        self, deployment_desc: DeploymentDescription
+    ) -> Optional[S3RelDataCredentials]:
         if deployment_desc.typ == DeploymentType.MAIN_ORCHESTRATOR:
+            if not self.charm.model.get_relation(S3_RELATION):
+                return None
+
+            if not self.charm.backup.s3_client.get_s3_connection_info().get("access-key"):
+                return None
+
+            # As the main orchestrator, this application must set the S3 information.
-            s3_credentials = S3RelDataCredentials(
-                access_key=self.charm.backup.s3_client.get_s3_connection_info().get(
-                    "access-key", ""
-                ),
-                secret_key=self.charm.backup.s3_client.get_s3_connection_info().get(
-                    "secret-key", ""
-                ),
-            )
-        else:
-            # Return what we have received from the peer relation
-            s3_credentials = S3RelDataCredentials(
-                access_key=secrets.get(Scope.APP, "access-key", default=""),
-                secret_key=secrets.get(Scope.APP, "secret-key", default=""),
+            return S3RelDataCredentials(
+                access_key=self.charm.backup.s3_client.get_s3_connection_info().get("access-key"),
+                secret_key=self.charm.backup.s3_client.get_s3_connection_info().get("secret-key"),
             )
-        return s3_credentials
 
+        if not self.charm.secrets.get(Scope.APP, "access-key"):
+            return None
+
+        # Return what we have received from the peer relation
+        return S3RelDataCredentials(
+            access_key=self.charm.secrets.get(Scope.APP, "access-key"),
+            secret_key=self.charm.secrets.get(Scope.APP, "secret-key"),
+        )
 
     def _rel_data(
         self, deployment_desc: DeploymentDescription, orchestrators: PeerClusterOrchestrators
@@ -332,17 +341,20 @@ def _rel_data(
         # ready (will receive a subsequent
         try:
             secrets = self.charm.secrets
-
-            s3_credentials = self._s3_credentials(deployment_desc)
             return PeerClusterRelData(
                 cluster_name=deployment_desc.config.cluster_name,
                 cm_nodes=self._fetch_local_cm_nodes(),
                 credentials=PeerClusterRelDataCredentials(
-                    admin_username="admin",
-                    admin_password=secrets.get(Scope.APP, secrets.password_key("admin")),
-                    admin_password_hash=secrets.get(Scope.APP, secrets.hash_key("admin")),
+                    admin_username=AdminUser,
+                    admin_password=secrets.get(Scope.APP, secrets.password_key(AdminUser)),
+                    admin_password_hash=secrets.get(Scope.APP, secrets.hash_key(AdminUser)),
+                    kibana_password=secrets.get(Scope.APP, secrets.password_key(KibanaserverUser)),
+                    kibana_password_hash=secrets.get(
+                        Scope.APP, secrets.hash_key(KibanaserverUser)
+                    ),
+                    monitor_password=secrets.get(Scope.APP, secrets.password_key(COSUser)),
                     admin_tls=secrets.get_object(Scope.APP, CertType.APP_ADMIN.val),
-                    s3=s3_credentials,
+                    s3=self._s3_credentials(deployment_desc),
                 ),
                 deployment_desc=deployment_desc,
             )
@@ -384,6 +396,8 @@ def _rel_err_data( # noqa: C901
             blocked_msg = f"Security index not initialized {message_suffix}."
         elif not self.charm.is_every_unit_marked_as_started():
             blocked_msg = f"Waiting for every unit {message_suffix} to start."
+        elif not self.charm.secrets.get(Scope.APP, self.charm.secrets.password_key(COSUser)):
+            blocked_msg = f"'{COSUser}' user not created yet."
         else:
             try:
                 if not self._fetch_local_cm_nodes():
@@ -501,8 +515,16 @@ def _set_security_conf(self, data: PeerClusterRelData) -> None:
         """Store security related config."""
         # set admin secrets
         secrets = self.charm.secrets
-        secrets.put(Scope.APP, secrets.password_key("admin"), data.credentials.admin_password)
-        secrets.put(Scope.APP, secrets.hash_key("admin"), data.credentials.admin_password_hash)
+        secrets.put(Scope.APP, secrets.password_key(AdminUser), data.credentials.admin_password)
+        secrets.put(Scope.APP, secrets.hash_key(AdminUser), data.credentials.admin_password_hash)
+        secrets.put(
+            Scope.APP, secrets.password_key(KibanaserverUser), data.credentials.kibana_password
+        )
+        secrets.put(
+            Scope.APP, secrets.hash_key(KibanaserverUser), data.credentials.kibana_password_hash
+        )
+        secrets.put(Scope.APP, secrets.password_key(COSUser), data.credentials.monitor_password)
+
         secrets.put_object(Scope.APP, CertType.APP_ADMIN.val, data.credentials.admin_tls)
 
         # store the app admin TLS resources if not stored
@@ -512,9 +534,9 @@ def _set_security_conf(self, data: PeerClusterRelData) -> None:
         self.charm.peers_data.put(Scope.APP, "admin_user_initialized", True)
         self.charm.peers_data.put(Scope.APP, "security_index_initialised", True)
 
-        s3_creds = data.credentials.s3
-        self.charm.secrets.put(Scope.APP, "access-key", s3_creds.access_key)
-        self.charm.secrets.put(Scope.APP, "secret-key", s3_creds.secret_key)
+        if s3_creds := data.credentials.s3:
+            self.charm.secrets.put(Scope.APP, "access-key", s3_creds.access_key)
+            self.charm.secrets.put(Scope.APP, "secret-key", s3_creds.secret_key)
 
     def _orchestrators(
         self,
@@ -747,6 +769,8 @@ def _error_set_from_requirer(
                 blocked_msg = (
                     "A cluster can only be related to 1 main and 1 failover-clusters at most."
                 )
+            elif peer_cluster_rel_data.cluster_name != deployment_desc.config.cluster_name:
+                blocked_msg = "Cannot relate 2 clusters with different 'cluster_name' values."
 
         if not blocked_msg:
             self._clear_errors(f"error_from_requirer-{event_rel_id}")
@@ -807,5 +831,6 @@ def _set_error(self, label: str, error: Optional[Dict[str, Any]]) -> None:
     def _clear_errors(self, *error_labels: str):
         """Clear previously set Peer clusters related statuses."""
         for error_label in error_labels:
-            self.charm.status.clear(error_label)
+            error = self.charm.peers_data.get(Scope.APP, error_label, "")
+            self.charm.status.clear(error, app=True)
             self.charm.peers_data.delete(Scope.APP, error_label)
8 changes: 6 additions & 2 deletions tests/integration/ha/continuous_writes.py
@@ -87,7 +87,7 @@ async def clear(self) -> None:
 
         client = await self._client()
         try:
-            client.indices.delete(index=ContinuousWrites.INDEX_NAME)
+            client.indices.delete(index=ContinuousWrites.INDEX_NAME, ignore_unavailable=True)
         finally:
             client.close()
 
@@ -181,6 +181,10 @@ def _create_process(self, is_bulk: bool = True):
         )
 
     def _stop_process(self):
+        if self._is_stopped or not self._process.is_alive():
+            self._is_stopped = True
+            return
+
         self._event.set()
         self._process.join()
         self._queue.close()
@@ -189,7 +193,7 @@ def _stop_process(self):
 
     async def _secrets(self) -> str:
         """Fetch secrets and return the password."""
-        secrets = await get_secrets(self._ops_test)
+        secrets = await get_secrets(self._ops_test, app=self._app)
         with open(ContinuousWrites.CERT_PATH, "w") as chain:
             chain.write(secrets["ca-chain"])
 
20 changes: 14 additions & 6 deletions tests/integration/ha/helpers.py
@@ -61,12 +61,20 @@ async def app_name(ops_test: OpsTest) -> Optional[str]:
     application name "opensearch".
     Note: if multiple clusters are running OpenSearch this will return the one first found.
     """
-    status = await ops_test.model.get_status()
-    for app in ops_test.model.applications:
-        if "opensearch" in status["applications"][app]["charm"]:
-            return app
+    apps = json.loads(
+        subprocess.check_output(
+            f"juju status --model {ops_test.model.info.name} --format=json".split()
+        )
+    )["applications"]
+
+    opensearch_apps = {
+        name: desc for name, desc in apps.items() if desc["charm-name"] == "opensearch"
+    }
+    for name, desc in opensearch_apps.items():
+        if name == "opensearch-main":
+            return name
 
-    return None
+    return list(opensearch_apps.keys())[0] if opensearch_apps else None
 
 
 @retry(
@@ -579,4 +587,4 @@ async def assert_restore_indices_and_compare_consistency(
     )
     # We expect that new_count has a loss of documents and the numbers are different.
     # Check if we have data but not all of it.
-    assert new_count > 0 and new_count < original_count
+    assert 0 < new_count < original_count
