diff --git a/lib/charms/opensearch/v0/constants_charm.py b/lib/charms/opensearch/v0/constants_charm.py index fc5d9c2cd..430e333d5 100644 --- a/lib/charms/opensearch/v0/constants_charm.py +++ b/lib/charms/opensearch/v0/constants_charm.py @@ -27,10 +27,9 @@ ServiceStopped = "The OpenSearch service stopped." ServiceStopFailed = "An error occurred while attempting to stop the OpenSearch service." ServiceIsStopping = "The OpenSearch service is stopping." +TLSRelationMissing = "The TLS operator is not related to OpenSearch. Cannot start this unit." TLSNotFullyConfigured = "Waiting for TLS to be fully configured..." -TLSRelationBrokenError = ( - "Relation broken with the TLS Operator while TLS not fully configured. Stopping OpenSearch." -) +TLSRelationBrokenError = "Relation broken with the TLS Operator while TLS not fully configured." NoNodeUpInCluster = "No node is up in this cluster." TooManyNodesRemoved = ( "Too many nodes being removed at the same time, please scale your application up." diff --git a/lib/charms/opensearch/v0/helper_commands.py b/lib/charms/opensearch/v0/helper_commands.py new file mode 100644 index 000000000..229b55fc3 --- /dev/null +++ b/lib/charms/opensearch/v0/helper_commands.py @@ -0,0 +1,59 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Utility functions for running commands.""" + +import logging +import os +import subprocess +from types import SimpleNamespace + +from charms.opensearch.v0.opensearch_exceptions import OpenSearchCmdError + +# The unique Charmhub library identifier, never change it +LIBID = "f7199a359074406db94294bef78e3f2a" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + + +logger = logging.getLogger(__name__) + + +def run_cmd(command: str, args: str = None) -> SimpleNamespace: + """Run command. 
+ + Arg: + command: can contain arguments + args: command line arguments + """ + if args is not None: + command = f"{command} {args}" + + command = " ".join(command.split()) + + logger.debug(f"Executing command: {command}") + + try: + output = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + text=True, + encoding="utf-8", + timeout=25, + env=os.environ, + ) + + if output.returncode != 0: + logger.error(f"err: {output.stderr} / out: {output.stdout}") + raise OpenSearchCmdError(cmd=command, out=output.stdout, err=output.stderr) + + return SimpleNamespace(cmd=command, out=output.stdout, err=output.stderr) + except (TimeoutError, subprocess.TimeoutExpired): + raise OpenSearchCmdError(cmd=command) diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index 146acc253..34f841ec6 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -4,9 +4,8 @@ """Base class for the OpenSearch Operators.""" import logging import random -from abc import abstractmethod from datetime import datetime -from typing import Dict, List, Optional, Type +from typing import List, Optional, Type from charms.opensearch.v0.constants_charm import ( AdminUserInitProgress, @@ -23,6 +22,7 @@ TLSNewCertsRequested, TLSNotFullyConfigured, TLSRelationBrokenError, + TLSRelationMissing, WaitingToStart, ) from charms.opensearch.v0.constants_tls import TLS_RELATION, CertType @@ -75,9 +75,7 @@ CharmBase, ConfigChangedEvent, LeaderElectedEvent, - RelationBrokenEvent, RelationChangedEvent, - RelationCreatedEvent, RelationDepartedEvent, RelationJoinedEvent, StartEvent, @@ -119,7 +117,9 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): self.opensearch_exclusions = OpenSearchExclusions(self) self.peers_data = RelationDataStore(self, PeerRelationName) self.secrets = SecretsDataStore(self, PeerRelationName) - self.tls = 
OpenSearchTLS(self, TLS_RELATION) + self.tls = OpenSearchTLS( + self, TLS_RELATION, self.opensearch.paths.jdk, self.opensearch.paths.certs + ) self.status = Status(self) self.health = OpenSearchHealth(self) self.ops_lock = OpenSearchOpsLock(self) @@ -135,9 +135,6 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): self.framework.observe(self.on.update_status, self._on_update_status) self.framework.observe(self.on.config_changed, self._on_config_changed) - self.framework.observe( - self.on[PeerRelationName].relation_created, self._on_peer_relation_created - ) self.framework.observe( self.on[PeerRelationName].relation_joined, self._on_peer_relation_joined ) @@ -162,7 +159,7 @@ def _on_leader_elected(self, event: LeaderElectedEvent): event.defer() return - if self.health.apply() == HealthColors.YELLOW_TEMP: + if self.health.apply() in [HealthColors.UNKNOWN, HealthColors.YELLOW_TEMP]: event.defer() self._compute_and_broadcast_updated_topology(self._get_nodes(True)) @@ -194,8 +191,13 @@ def _on_start(self, event: StartEvent): return - if not self._is_tls_fully_configured(): - self.unit.status = BlockedStatus(TLSNotFullyConfigured) + if not self.model.get_relation(TLS_RELATION): + self.unit.status = BlockedStatus(TLSRelationMissing) + event.defer() + return + + if not self._are_security_resources_configured(): + self.unit.status = WaitingStatus(TLSNotFullyConfigured) event.defer() return @@ -206,25 +208,6 @@ def _on_start(self, event: StartEvent): self.unit.status = WaitingStatus(RequestUnitServiceOps.format("start")) self.on[self.service_manager.name].acquire_lock.emit(callback_override="_start_opensearch") - def _on_peer_relation_created(self, event: RelationCreatedEvent): - """Event received by the new node joining the cluster.""" - current_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - - # In the case of the first units before TLS is initialized - if not current_secrets: - if not self.unit.is_leader(): - event.defer() - 
return - - # in the case the cluster was bootstrapped with multiple units at the same time - # and the certificates have not been generated yet - if not current_secrets.get("cert") or not current_secrets.get("chain"): - event.defer() - return - - # Store the "Admin" certificate, key and CA on the disk of the new unit - self._store_tls_resources(CertType.APP_ADMIN, current_secrets, override_admin=False) - def _on_peer_relation_joined(self, event: RelationJoinedEvent): """Event received by all units when a new node joins the cluster.""" if not self.unit.is_leader(): @@ -258,7 +241,7 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent): if ( self.unit.is_leader() and self.opensearch.is_node_up() - and self.health.apply() == HealthColors.YELLOW_TEMP + and self.health.apply() in [HealthColors.UNKNOWN, HealthColors.YELLOW_TEMP] ): # we defer because we want the temporary status to be updated event.defer() @@ -266,6 +249,10 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent): for relation in self.model.relations.get(ClientRelationName, []): self.opensearch_provider.update_endpoints(relation) + # remove old ca if all units have completely finished renewing it and are running + if self.opensearch.is_node_up() and self._is_tls_fully_configured_in_all_units(): + self.tls.remove_old_ca_if_any() + app_data = event.relation.data.get(event.app) unit_data = event.relation.data.get(event.unit) if not unit_data and not app_data: @@ -376,7 +363,7 @@ def _on_update_status(self, event: UpdateStatusEvent): if self.unit.is_leader(): self.opensearch_exclusions.cleanup() - if self.health.apply() == HealthColors.YELLOW_TEMP: + if self.health.apply() in [HealthColors.UNKNOWN, HealthColors.YELLOW_TEMP]: event.defer() return @@ -385,7 +372,7 @@ def _on_update_status(self, event: UpdateStatusEvent): self.user_manager.remove_users_and_roles() - # If relation broken - leave + # If relation not broken - leave if self.model.get_relation("certificates") is not None: 
return @@ -396,7 +383,6 @@ def _on_config_changed(self, _: ConfigChangedEvent): """On config changed event. Useful for IP changes or for user provided config changes.""" if self.opensearch_config.update_host_if_needed(): self.unit.status = MaintenanceStatus(TLSNewCertsRequested) - self._delete_stored_tls_resources() self.tls.request_new_unit_certificates() # since when an IP change happens, "_on_peer_relation_joined" won't be called, @@ -432,7 +418,7 @@ def _on_get_password_action(self, event: ActionEvent): event.fail("Only the 'admin' username is allowed for this action.") return - if not self._is_tls_fully_configured(): + if not self._are_security_resources_configured(): event.fail("admin user or TLS certificates not configured yet.") return @@ -440,7 +426,7 @@ def _on_get_password_action(self, event: ActionEvent): cert = self.secrets.get_object( Scope.APP, CertType.APP_ADMIN.val ) # replace later with new user certs - ca_chain = "\n".join(cert["chain"][::-1]) + ca_chain = "\n".join(cert["cert-chain"]) event.set_results( { @@ -450,8 +436,18 @@ def _on_get_password_action(self, event: ActionEvent): } ) + def on_tls_ca_renewal(self, _: CertificateAvailableEvent): + """Called when adding new CA to the trust store.""" + self.on[self.service_manager.name].acquire_lock.emit( + callback_override="_restart_opensearch" + ) + def on_tls_conf_set( - self, _: CertificateAvailableEvent, scope: Scope, cert_type: CertType, renewal: bool + self, + event: CertificateAvailableEvent, + scope: Scope, + cert_type: CertType, + cert_renewal: bool, ): """Called after certificate ready and stored on the corresponding scope databag. 
@@ -459,56 +455,71 @@ def on_tls_conf_set( - Update the corresponding yaml conf files - Run the security admin script """ - # Get the list of stored secrets for this cert - current_secrets = self.secrets.get_object(scope, cert_type.val) - - # Store cert/key on disk - must happen after opensearch stop for transport certs renewal - self._store_tls_resources(cert_type, current_secrets) - if scope == Scope.UNIT: # node http or transport cert - self.opensearch_config.set_node_tls_conf(cert_type, current_secrets) + self.opensearch_config.set_node_tls_conf( + cert_type, + self.secrets.get(Scope.APP, "keystore-password-ca"), + self.secrets.get(scope, f"keystore-password-{cert_type}"), + ) else: # write the admin cert conf on all units, in case there is a leader loss + cert renewal - self.opensearch_config.set_admin_tls_conf(current_secrets) + current_admin_secrets = self.secrets.get_object(scope, cert_type.val) + self.opensearch_config.set_admin_tls_conf(current_admin_secrets) + + if not self._are_security_resources_configured(): + return - # In case of renewal of the unit transport layer cert - restart opensearch - if renewal and self._is_tls_fully_configured(): + # update client relations + for relation in self.opensearch_provider.relations: + self.opensearch_provider.update_certs(relation.id, event.chain) + + if cert_renewal: + self.peers_data.delete(Scope.UNIT, "tls_rel_broken") self.on[self.service_manager.name].acquire_lock.emit( callback_override="_restart_opensearch" ) - def on_tls_relation_broken(self, _: RelationBrokenEvent): + self.status.clear(TLSNotFullyConfigured) + + def on_tls_relation_joined(self) -> None: + """We clean up any residue from the previous TLS relation if any.""" + self.unit.status = WaitingStatus(TLSNotFullyConfigured) + + def on_tls_relation_broken(self) -> None: """As long as all certificates are produced, we don't do anything.""" - if self._is_tls_fully_configured(): + if not self._are_security_resources_configured(): + self.unit.status = 
BlockedStatus(TLSRelationBrokenError) return - # Otherwise, we block. - self.unit.status = BlockedStatus(TLSRelationBrokenError) - - def _is_tls_fully_configured(self) -> bool: + def _are_security_resources_configured(self) -> bool: """Check if TLS fully configured meaning the admin user configured & 3 certs present.""" - # In case the initialisation of the admin user is not finished yet - if not self.peers_data.get(Scope.APP, "admin_user_initialized"): - return False - - admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - if not admin_secrets or not admin_secrets.get("cert") or not admin_secrets.get("chain"): - return False + return ( + self.peers_data.get(Scope.APP, "admin_user_initialized", False) # admin user init done + and self.tls.all_certificates_available() # all certificates well generated from same CA + and self.tls.all_tls_resources_stored() # all certificates stored on disk + ) - unit_transport_secrets = self.secrets.get_object(Scope.UNIT, CertType.UNIT_TRANSPORT.val) - if not unit_transport_secrets or not unit_transport_secrets.get("cert"): - return False + def _is_tls_fully_configured_in_all_units(self) -> bool: + """Checks whether TLS is well configured in all units for a rolling restart.""" + # check if all certificates have been well generated and opensearch started in all nodes. 
+ rel = self.model.get_relation(PeerRelationName) + for unit in rel.units.union({self.unit}): + rel_data = rel.data[unit] + if "tls_ca_renewing" in rel_data or "tls_ca_renewed" in rel_data: + return False - unit_http_secrets = self.secrets.get_object(Scope.UNIT, CertType.UNIT_HTTP.val) - if not unit_http_secrets or not unit_http_secrets.get("cert"): - return False + if ( + rel_data.get(f"tls_{CertType.UNIT_TRANSPORT}_configured") != "True" + or rel_data.get(f"tls_{CertType.UNIT_HTTP}_configured") != "True" + ): + return False - return self._are_all_tls_resources_stored() + return self.peers_data.get(Scope.APP, f"tls_{CertType.APP_ADMIN}_configured", False) def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 """Start OpenSearch, with a generated or passed conf, if all resources configured.""" - if self.opensearch.is_started(): + if self._is_opensearch_running(): try: self._post_start_init() self.status.clear(WaitingToStart) @@ -536,23 +547,29 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 self.peers_data.put(Scope.UNIT, "starting", True) - try: - # Retrieve the nodes of the cluster, needed to configure this node - nodes = self._get_nodes(False) + # The hot reloading of certificates only works when they're issued from the same CA + # if that's not the case, we skip the querying of nodes and simply reboot + if not self.peers_data.get(Scope.UNIT, "tls_ca_renewing", False): + try: + # Retrieve the nodes of the cluster, needed to configure this node + nodes = self._get_nodes(False) - # Set the configuration of the node - self._set_node_conf(nodes) - except OpenSearchHttpError: - self.peers_data.delete(Scope.UNIT, "starting") - event.defer() - self._post_start_init() - return + # Set the configuration of the node + self._set_node_conf(nodes) + except OpenSearchHttpError: + self.peers_data.delete(Scope.UNIT, "starting") + event.defer() + self._post_start_init() + return try: self.opensearch.start( wait_until_http_200=( - not 
self.unit.is_leader() - or self.peers_data.get(Scope.APP, "security_index_initialised", False) + not self.peers_data.get(Scope.UNIT, "tls_ca_renewing", False) + and ( + not self.unit.is_leader() + or self.peers_data.get(Scope.APP, "security_index_initialised", False) + ) ) ) self._post_start_init() @@ -571,14 +588,15 @@ def _post_start_init(self): if self.unit.is_leader() and not self.peers_data.get( Scope.APP, "security_index_initialised" ): - admin_secrets = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) - self._initialize_security_index(admin_secrets) - self.peers_data.put(Scope.APP, "security_index_initialised", True) + self._initialize_security_index() # it sometimes takes a few seconds before the node is fully "up" otherwise a 503 error # may be thrown when calling a node - we want to ensure this node is perfectly ready # before marking it as ready - if not self.opensearch.is_node_up(): + if ( + not self.peers_data.get(Scope.UNIT, "tls_ca_renewing", False) + and not self.opensearch.is_node_up() + ): raise OpenSearchNotFullyReadyError("Node started but not full ready yet.") # cleanup bootstrap conf in the node @@ -594,6 +612,9 @@ def _post_start_init(self): # apply cluster health self.health.apply() + # reset the TLS related peer rel data flags + self.tls.reset_internal_state() + def _stop_opensearch(self) -> None: """Stop OpenSearch if possible.""" self.unit.status = WaitingStatus(ServiceIsStopping) @@ -610,7 +631,7 @@ def _stop_opensearch(self) -> None: def _restart_opensearch(self, event: EventBase) -> None: """Restart OpenSearch if possible.""" - if not self.peers_data.get(Scope.UNIT, "starting", False): + if self.opensearch.is_started() and not self.peers_data.get(Scope.UNIT, "starting", False): try: self._stop_opensearch() except OpenSearchStopError as e: @@ -621,6 +642,21 @@ def _restart_opensearch(self, event: EventBase) -> None: self._start_opensearch(event) + def _is_opensearch_running(self) -> bool: + """Returns whether opensearch is 
started on a node depending on state of env.""" + # first leader unit to spin up, daemon may be started but node not up + # because security index not initialized + if self.unit.is_leader() and not self.peers_data.get( + Scope.APP, "security_index_initialised", False + ): + return self.opensearch.is_started() + + if self.peers_data.get(Scope.UNIT, "tls_ca_renewing", False): + return self.opensearch.is_started() + + # any other case + return self.opensearch.is_node_up() + def _can_service_start(self) -> bool: """Return if the opensearch service can start.""" # if there are any missing system requirements leave @@ -629,10 +665,13 @@ def _can_service_start(self) -> bool: self.unit.status = BlockedStatus(" - ".join(missing_sys_reqs)) return False - if self.unit.is_leader(): + security_index_initialised = self.peers_data.get( + Scope.APP, "security_index_initialised", False + ) + if self.unit.is_leader() and not security_index_initialised: return True - if not self.peers_data.get(Scope.APP, "security_index_initialised", False): + if not security_index_initialised: return False if not self.alt_hosts: @@ -641,14 +680,12 @@ def _can_service_start(self) -> bool: # When a new unit joins, replica shards are automatically added to it. In order to prevent # overloading the cluster, units must be started one at a time. So we defer starting # opensearch until all shards in other units are in a "started" or "unassigned" state. 
- try: - if self.health.apply(use_localhost=False, app=False) == HealthColors.YELLOW_TEMP: - return False - except OpenSearchHttpError: - # this means that the leader unit is not reachable (not started yet), - # meaning it's a new cluster, so we can safely start the OpenSearch service - pass + if self.health.apply(use_localhost=False, app=False) == HealthColors.YELLOW_TEMP: + return False + # if the health is UNKNOWN, this usually means that the leader unit is not + # reachable (not started yet), meaning it's a new cluster, or that the admin certificate + # is unable to auth a request, so we can safely start the OpenSearch service return True def _purge_users(self): @@ -707,7 +744,7 @@ def _put_admin_user(self, pwd: Optional[str] = None): self.secrets.put(Scope.APP, "admin_password_hash", hashed_pwd) self.peers_data.put(Scope.APP, "admin_user_initialized", True) - def _initialize_security_index(self, admin_secrets: Dict[str, any]) -> None: + def _initialize_security_index(self) -> None: """Run the security_admin script, it creates and initializes the opendistro_security index. 
IMPORTANT: must only run once per cluster, otherwise the index gets overrode @@ -716,19 +753,30 @@ def _initialize_security_index(self, admin_secrets: Dict[str, any]) -> None: f"-cd {self.opensearch.paths.conf}/opensearch-security/", f"-cn {self.app.name}-{self.model.name}", f"-h {self.unit_ip}", - f"-cacert {self.opensearch.paths.certs}/root-ca.cert", - f"-cert {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.cert", - f"-key {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.key", + f"-ts {self.opensearch.paths.certs}/ca.p12", + f"-tspass {self.secrets.get(Scope.APP, 'keystore-password-ca')}", + "-tsalias ca", + "-tst PKCS12", + f"-ks {self.opensearch.paths.certs}/{CertType.APP_ADMIN}.p12", + f"-kspass {self.secrets.get(Scope.APP, f'keystore-password-{CertType.APP_ADMIN}')}", + f"-ksalias {CertType.APP_ADMIN}", + "-kst PKCS12", ] - admin_key_pwd = admin_secrets.get("key-password", None) + admin_key_pwd = self.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val).get( + "key-password", None + ) if admin_key_pwd is not None: args.append(f"-keypass {admin_key_pwd}") self.unit.status = MaintenanceStatus(SecurityIndexInitProgress) + self.opensearch.run_script( "plugins/opensearch-security/tools/securityadmin.sh", " ".join(args) ) + + self.peers_data.put(Scope.APP, "security_index_initialised", True) + self.status.clear(SecurityIndexInitProgress) def _get_nodes(self, use_localhost: bool) -> List[Node]: @@ -739,6 +787,8 @@ def _get_nodes(self, use_localhost: bool) -> List[Node]: ): return [] + # todo handle case where get_nodes(False) and no node available?? 
+ return ClusterTopology.nodes(self.opensearch, use_localhost, self.alt_hosts) def _set_node_conf(self, nodes: List[Node]) -> None: @@ -765,7 +815,7 @@ def _set_node_conf(self, nodes: List[Node]) -> None: cm_names.append(self.unit_name) cm_ips.append(self.unit_ip) - cms_in_bootstrap = self.peers_data.get(Scope.APP, "bootstrap_contributors", 0) + cms_in_bootstrap = self.peers_data.get(Scope.APP, "bootstrap_contributors_count", 0) if cms_in_bootstrap < self.app.planned_units(): contribute_to_bootstrap = True @@ -869,23 +919,6 @@ def _check_certs_expiration(self, event: UpdateStatusEvent) -> None: Scope.UNIT, "certs_exp_checked_at", datetime.now().strftime(date_format) ) - @abstractmethod - def _store_tls_resources( - self, cert_type: CertType, secrets: Dict[str, any], override_admin: bool = True - ): - """Write certificates and keys on disk.""" - pass - - @abstractmethod - def _are_all_tls_resources_stored(self): - """Check if all TLS resources are stored on disk.""" - pass - - @abstractmethod - def _delete_stored_tls_resources(self): - """Delete the TLS resources of the unit that are stored on disk.""" - pass - @property def unit_ip(self) -> str: """IP address of the current unit.""" diff --git a/lib/charms/opensearch/v0/opensearch_config.py b/lib/charms/opensearch/v0/opensearch_config.py index b2eccf1ac..66b55bdd3 100644 --- a/lib/charms/opensearch/v0/opensearch_config.py +++ b/lib/charms/opensearch/v0/opensearch_config.py @@ -71,34 +71,35 @@ def set_admin_tls_conf(self, secrets: Dict[str, any]): f"{normalized_tls_subject(secrets['subject'])}", ) - def set_node_tls_conf(self, cert_type: CertType, secrets: Dict[str, any]): + def set_node_tls_conf(self, cert_type: CertType, truststore_pwd: str, keystore_pwd: str): """Configures TLS for nodes.""" target_conf_layer = "http" if cert_type == CertType.UNIT_HTTP else "transport" - self._opensearch.config.put( - self.CONFIG_YML, - f"plugins.security.ssl.{target_conf_layer}.pemcert_filepath", - 
f"{self._opensearch.paths.certs_relative}/{cert_type}.cert", - ) + for store_type, cert in [("keystore", target_conf_layer), ("truststore", "ca")]: + self._opensearch.config.put( + self.CONFIG_YML, + f"plugins.security.ssl.{target_conf_layer}.{store_type}_type", + "PKCS12", + ) - self._opensearch.config.put( - self.CONFIG_YML, - f"plugins.security.ssl.{target_conf_layer}.pemkey_filepath", - f"{self._opensearch.paths.certs_relative}/{cert_type}.key", - ) + self._opensearch.config.put( + self.CONFIG_YML, + f"plugins.security.ssl.{target_conf_layer}.{store_type}_filepath", + f"{self._opensearch.paths.certs_relative}/{cert if cert == 'ca' else cert_type}.p12", + ) - self._opensearch.config.put( - self.CONFIG_YML, - f"plugins.security.ssl.{target_conf_layer}.pemtrustedcas_filepath", - f"{self._opensearch.paths.certs_relative}/root-ca.cert", - ) + for store_type, certificate_type in [("keystore", cert_type.val), ("truststore", "ca")]: + self._opensearch.config.put( + self.CONFIG_YML, + f"plugins.security.ssl.{target_conf_layer}.{store_type}_alias", + certificate_type, + ) - key_pwd = secrets.get("key-password") - if key_pwd is not None: + for store_type, pwd in [("keystore", keystore_pwd), ("truststore", truststore_pwd)]: self._opensearch.config.put( self.CONFIG_YML, - f"plugins.security.ssl.{target_conf_layer}.pemkey_password", - key_pwd, + f"plugins.security.ssl.{target_conf_layer}.{store_type}_password", + pwd, ) def append_transport_node(self, ip_pattern_entries: List[str], append: bool = True): @@ -158,6 +159,9 @@ def set_node( self._opensearch.config.put( self.CONFIG_YML, "plugins.security.ssl.transport.enforce_hostname_verification", True ) + self._opensearch.config.put( + self.CONFIG_YML, "plugins.security.ssl_cert_reload_enabled", True + ) # security plugin rest API access self._opensearch.config.put( diff --git a/lib/charms/opensearch/v0/opensearch_distro.py b/lib/charms/opensearch/v0/opensearch_distro.py index ba8de8211..4b84f898c 100644 --- 
a/lib/charms/opensearch/v0/opensearch_distro.py +++ b/lib/charms/opensearch/v0/opensearch_distro.py @@ -18,6 +18,7 @@ import requests import urllib3.exceptions from charms.opensearch.v0.helper_cluster import Node +from charms.opensearch.v0.helper_commands import run_cmd from charms.opensearch.v0.helper_conf_setter import YamlConfigSetter from charms.opensearch.v0.helper_databag import Scope from charms.opensearch.v0.helper_networking import ( @@ -26,7 +27,6 @@ reachable_hosts, ) from charms.opensearch.v0.opensearch_exceptions import ( - OpenSearchCmdError, OpenSearchError, OpenSearchHttpError, OpenSearchStartTimeoutError, @@ -160,16 +160,16 @@ def is_node_up(self) -> bool: def run_bin(self, bin_script_name: str, args: str = None): """Run opensearch provided bin command, relative to OPENSEARCH_HOME/bin.""" script_path = f"{self.paths.home}/bin/{bin_script_name}" - self._run_cmd(f"chmod a+x {script_path}") + run_cmd(f"chmod a+x {script_path}") - self._run_cmd(script_path, args) + run_cmd(script_path, args) def run_script(self, script_name: str, args: str = None): """Run script provided by Opensearch in another directory, relative to OPENSEARCH_HOME.""" script_path = f"{self.paths.home}/{script_name}" - self._run_cmd(f"chmod a+x {script_path}") + run_cmd(f"chmod a+x {script_path}") - self._run_cmd(f"{script_path}", args) + run_cmd(f"{script_path}", args) def request( # noqa self, @@ -250,7 +250,7 @@ def call( request_kwargs = { "method": method.upper(), "url": urls[0], - "verify": f"{self.paths.certs}/chain.pem", + "verify": f"{self.paths.certs}/admin-cert-chain.pem", "headers": { "Accept": "application/json", "Content-Type": "application/json", @@ -288,7 +288,10 @@ def call( if resp_status_code: return resp.status_code - return resp.json() + try: + return resp.json() + except requests.JSONDecodeError: + raise OpenSearchHttpError(response_body=resp.text) def write_file(self, path: str, data: str, override: bool = True): """Persists data into file. 
Useful for files generated on the fly, such as certs etc.""" @@ -302,39 +305,6 @@ def write_file(self, path: str, data: str, override: bool = True): with open(path, mode="w") as f: f.write(data) - @staticmethod - def _run_cmd(command: str, args: str = None): - """Run command. - - Arg: - command: can contain arguments - args: command line arguments - """ - if args is not None: - command = f"{command} {args}" - - logger.debug(f"Executing command: {command}") - - try: - output = subprocess.run( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - text=True, - encoding="utf-8", - timeout=25, - env=os.environ, - ) - - logger.debug(f"{command}:\n{output.stdout}") - - if output.returncode != 0: - logger.error(f"{command}:\n Stderr: {output.stderr}\n Stdout: {output.stdout}") - raise OpenSearchCmdError() - except (TimeoutError, subprocess.TimeoutExpired): - raise OpenSearchCmdError() - @abstractmethod def _build_paths(self) -> Paths: """Build the Paths object.""" diff --git a/lib/charms/opensearch/v0/opensearch_exceptions.py b/lib/charms/opensearch/v0/opensearch_exceptions.py index 691e78486..ee532117a 100644 --- a/lib/charms/opensearch/v0/opensearch_exceptions.py +++ b/lib/charms/opensearch/v0/opensearch_exceptions.py @@ -70,6 +70,11 @@ class OpenSearchOpsLockAlreadyAcquiredError(OpenSearchError): class OpenSearchCmdError(OpenSearchError): """Exception thrown when an OpenSearch bin command fails.""" + def __init__(self, cmd: str, out: Optional[str] = None, err: Optional[str] = None): + self.cmd = cmd + self.out = out + self.err = err + class OpenSearchHttpError(OpenSearchError): """Exception thrown when an OpenSearch REST call fails.""" diff --git a/lib/charms/opensearch/v0/opensearch_health.py b/lib/charms/opensearch/v0/opensearch_health.py index 648a4dbe2..224c7c94c 100644 --- a/lib/charms/opensearch/v0/opensearch_health.py +++ b/lib/charms/opensearch/v0/opensearch_health.py @@ -38,6 +38,7 @@ class HealthColors: YELLOW = "yellow" YELLOW_TEMP = 
"yellow-temp" RED = "red" + UNKNOWN = "unknown" class OpenSearchHealth: @@ -54,15 +55,20 @@ def apply( app: bool = True, ) -> str: """Fetch cluster health and set it on the app status.""" - host = self._charm.unit_ip if use_localhost else None - status = self._fetch_status(host, wait_for_green_first) + try: + host = self._charm.unit_ip if use_localhost else None + status = self._fetch_status(host, wait_for_green_first) + if not status: + return HealthColors.UNKNOWN - if app: - self.apply_for_app(status) - else: - self.apply_for_unit(status) + if app: + self.apply_for_app(status) + else: + self.apply_for_unit(status) - return status + return status + except OpenSearchHttpError: + return HealthColors.UNKNOWN def apply_for_app(self, status: str) -> None: """Cluster wide / app status.""" @@ -132,6 +138,9 @@ def _fetch_status(self, host: Optional[str] = None, wait_for_green_first: bool = alt_hosts=self._charm.alt_hosts, ) + if not response: + return None + status = response["status"].lower() if status != HealthColors.YELLOW: return status diff --git a/lib/charms/opensearch/v0/opensearch_nodes_exclusions.py b/lib/charms/opensearch/v0/opensearch_nodes_exclusions.py index 1c62a2a67..21bd49029 100644 --- a/lib/charms/opensearch/v0/opensearch_nodes_exclusions.py +++ b/lib/charms/opensearch/v0/opensearch_nodes_exclusions.py @@ -87,6 +87,7 @@ def _add_voting(self) -> bool: self._opensearch.request( "POST", f"/_cluster/voting_config_exclusions?node_names={self._node.name}&timeout=1m", + host=self._charm.unit_ip, alt_hosts=self._charm.alt_hosts, resp_status_code=True, retries=3, @@ -123,7 +124,9 @@ def _add_allocations( "PUT", "/_cluster/settings", {"persistent": {"cluster.routing.allocation.exclude._name": ",".join(all_allocs)}}, + host=self._charm.unit_ip, alt_hosts=self._charm.alt_hosts, + retries=3, ) return "acknowledged" in response except OpenSearchHttpError: @@ -144,7 +147,10 @@ def _fetch_allocations(self) -> Set[str]: allocation_exclusions = set() try: resp = 
self._opensearch.request( - "GET", "/_cluster/settings", alt_hosts=self._charm.alt_hosts + "GET", + "/_cluster/settings", + host=self._charm.unit_ip, + alt_hosts=self._charm.alt_hosts, ) exclusions = resp["persistent"]["cluster"]["routing"]["allocation"]["exclude"]["_name"] if exclusions: diff --git a/lib/charms/opensearch/v0/opensearch_relation_provider.py b/lib/charms/opensearch/v0/opensearch_relation_provider.py index 955321c40..faaa8cb5c 100644 --- a/lib/charms/opensearch/v0/opensearch_relation_provider.py +++ b/lib/charms/opensearch/v0/opensearch_relation_provider.py @@ -331,7 +331,7 @@ def update_certs(self, relation_id, ca_chain=None): if not ca_chain: try: ca_chain = self.charm.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val).get( - "chain" + "cert-chain" ) except AttributeError: # cert doesn't exist - presumably we don't yet have a TLS relation. diff --git a/lib/charms/opensearch/v0/opensearch_tls.py b/lib/charms/opensearch/v0/opensearch_tls.py index 6448f11b5..78228705b 100644 --- a/lib/charms/opensearch/v0/opensearch_tls.py +++ b/lib/charms/opensearch/v0/opensearch_tls.py @@ -14,15 +14,22 @@ """ import base64 +import glob +import json import logging +import os import re import socket -from typing import Dict, List, Optional, Tuple +import tempfile +from os.path import exists +from typing import Any, Dict, List, Optional, Tuple from charms.opensearch.v0.constants_tls import TLS_RELATION, CertType +from charms.opensearch.v0.helper_commands import run_cmd from charms.opensearch.v0.helper_databag import Scope from charms.opensearch.v0.helper_networking import get_host_public_ip -from charms.opensearch.v0.opensearch_exceptions import OpenSearchError +from charms.opensearch.v0.helper_security import generate_password +from charms.opensearch.v0.opensearch_exceptions import OpenSearchCmdError from charms.tls_certificates_interface.v1.tls_certificates import ( CertificateAvailableEvent, CertificateExpiringEvent, @@ -30,7 +37,12 @@ generate_csr, 
generate_private_key, ) -from ops.charm import ActionEvent, RelationBrokenEvent, RelationJoinedEvent +from ops.charm import ( + ActionEvent, + RelationBrokenEvent, + RelationChangedEvent, + RelationJoinedEvent, +) from ops.framework import Object # The unique Charmhub library identifier, never change it @@ -49,11 +61,13 @@ class OpenSearchTLS(Object): """Class that Manages OpenSearch relation with TLS Certificates Operator.""" - def __init__(self, charm, peer_relation: str): + def __init__(self, charm, peer_relation: str, jdk_path: str, certs_path: str): super().__init__(charm, "client-relations") self.charm = charm self.peer_relation = peer_relation + self.jdk_path = jdk_path + self.certs_path = certs_path self.certs = TLSCertificatesRequiresV1(charm, TLS_RELATION) self.framework.observe( @@ -63,6 +77,11 @@ def __init__(self, charm, peer_relation: str): self.framework.observe( self.charm.on[TLS_RELATION].relation_joined, self._on_tls_relation_joined ) + + self.framework.observe( + self.charm.on[TLS_RELATION].relation_changed, self._on_tls_relation_changed + ) + self.framework.observe( self.charm.on[TLS_RELATION].relation_broken, self._on_tls_relation_broken ) @@ -88,6 +107,8 @@ def _on_set_tls_private_key(self, event: ActionEvent) -> None: def request_new_unit_certificates(self) -> None: """Requests a new certificate with the given scope and type from the tls operator.""" + self._delete_tls_resources() + for cert_type in [CertType.UNIT_HTTP, CertType.UNIT_TRANSPORT]: csr = self.charm.secrets.get_object(Scope.UNIT, cert_type.val)["csr"].encode("utf-8") self.certs.request_certificate_revocation(csr) @@ -98,24 +119,89 @@ def request_new_unit_certificates(self) -> None: secrets = self.charm.secrets.get_object(Scope.UNIT, cert_type.val) self._request_certificate_renewal(Scope.UNIT, cert_type, secrets) + def reset_internal_state(self) -> None: + """Removes tls_ca_renewing flag so new certificates can be generated.""" + if not self.charm.peers_data.get(Scope.UNIT, 
"tls_ca_renewing", False): + # this means simple certificate creation, with no CA renewal + return + + # this means that the unit certificates were generated, after a previous CA renewal. + if self.charm.peers_data.get(Scope.UNIT, "tls_ca_renewed", False): + self.charm.peers_data.delete(Scope.UNIT, "tls_ca_renewing") + self.charm.peers_data.delete(Scope.UNIT, "tls_ca_renewed") + else: + # this means only the CA renewal completed, still need to create certificates + self.charm.peers_data.put(Scope.UNIT, "tls_ca_renewed", True) + def _on_tls_relation_joined(self, _: RelationJoinedEvent) -> None: """Request certificate when TLS relation joined.""" - admin_cert = self.charm.secrets.get_object(Scope.APP, CertType.APP_ADMIN) - if self.charm.unit.is_leader() and admin_cert is None: + self.charm.on_tls_relation_joined() + + if self.charm.unit.is_leader(): + # create passwords for both ca trust_store/admin key_store + self._create_keystore_pwd_if_not_exists(Scope.APP, "ca") + self._create_keystore_pwd_if_not_exists(Scope.APP, CertType.APP_ADMIN.val) + + admin_cert = self.charm.secrets.get_object(Scope.APP, CertType.APP_ADMIN) + if admin_cert is None or self.charm.peers_data.get( + Scope.UNIT, "tls_rel_broken", False + ): + self._request_certificate(Scope.APP, CertType.APP_ADMIN) + + # create passwords for both unit-http/transport key_stores + self._create_keystore_pwd_if_not_exists(Scope.UNIT, CertType.UNIT_TRANSPORT.val) + self._create_keystore_pwd_if_not_exists(Scope.UNIT, CertType.UNIT_HTTP.val) + + self._request_certificate(Scope.UNIT, CertType.UNIT_TRANSPORT) + self._request_certificate(Scope.UNIT, CertType.UNIT_HTTP) + + def _on_tls_relation_changed(self, event: RelationChangedEvent) -> None: + tls_rel_data = event.relation.data[event.app] + if not tls_rel_data: + return + + certs = tls_rel_data.get("certificates") + if not certs: + return + + certs = json.loads(certs) + if not certs: + return + + for cert_entry in certs: + if not cert_entry.get("revoked", False): + 
return + + self.charm.on_tls_relation_joined() + + if self.charm.unit.is_leader(): self._request_certificate(Scope.APP, CertType.APP_ADMIN) self._request_certificate(Scope.UNIT, CertType.UNIT_TRANSPORT) self._request_certificate(Scope.UNIT, CertType.UNIT_HTTP) - def _on_tls_relation_broken(self, event: RelationBrokenEvent) -> None: + def _on_tls_relation_broken(self, _: RelationBrokenEvent) -> None: """Notify the charm that the relation is broken.""" - self.charm.on_tls_relation_broken(event) + self.charm.peers_data.put(Scope.UNIT, "tls_rel_broken", True) + self.charm.on_tls_relation_broken() - def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: + def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: # noqa """Enable TLS when TLS certificate available. CertificateAvailableEvents fire whenever a new certificate is created by the TLS charm. """ + # this means that the leader unit hasn't initialized the truststore password + if not self.charm.secrets.get(Scope.APP, "keystore-password-ca"): + event.defer() + return + + # CA renewal still in progress (rolling restart) + if self.charm.peers_data.get( + Scope.UNIT, "tls_ca_renewing", False + ) and not self.charm.peers_data.get(Scope.UNIT, "tls_ca_renewed", False): + event.defer() + return + try: scope, cert_type, secrets = self._find_secret(event.certificate_signing_request, "csr") logger.debug(f"{scope.val}.{cert_type.val} TLS certificate available.") @@ -123,32 +209,50 @@ def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: logger.debug("Unknown certificate available.") return + # check if this is a new "ca", if it is - store it in the trust store + current_stored_ca = self._read_stored_ca() + if current_stored_ca != event.ca: + self.store_new_ca(event.ca) + + # this means the CA is renewed - we need to restart all units with this new CA + # prior to updating the certificates with the new CA. 
+ if current_stored_ca: + self.charm.peers_data.put(Scope.UNIT, "tls_ca_renewing", True) + self.charm.on_tls_ca_renewal(event) + event.defer() + return + # seems like the admin certificate is also broadcast to non leader units on refresh request if not self.charm.unit.is_leader() and scope == Scope.APP: return - old_cert = secrets.get("cert", None) - renewal = old_cert is not None and old_cert != event.certificate + # check if there was a CA renewal on the cluster, if not complete defer + if not self._ca_renewal_complete_in_cluster(): + event.defer() + return - self.charm.secrets.put_object( - scope, - cert_type.val, - { - "chain": event.chain, - "cert": event.certificate, - "ca": event.ca, - }, - merge=True, + # check if certificate renewal + old_cert = secrets.get("cert", None) + cert_renewal = ( + self._read_stored_ca(alias="old-ca") is not None + or (old_cert is not None and old_cert != event.certificate) + or self.charm.peers_data.get(Scope.UNIT, "tls_rel_broken", False) ) - for relation in self.charm.opensearch_provider.relations: - self.charm.opensearch_provider.update_certs(relation.id, event.chain) + # persist the certificate on the secrets + disk + self._persist_certificate(event, scope, cert_type) - try: - self.charm.on_tls_conf_set(event, scope, cert_type, renewal) - except OpenSearchError as e: - logger.error(e) - event.defer() + # store the admin certificates in non-leader units + if not self.charm.unit.is_leader(): + if self.all_certificates_available(): + admin_secrets = self.charm.secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) + self._store_new_tls_resources(Scope.APP, CertType.APP_ADMIN, admin_secrets) + elif self._unit_certificates_available(): # admin certificate not ready yet + # we defer the last certificate available event + event.defer() + return + + self.charm.on_tls_conf_set(event, scope, cert_type, cert_renewal) def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: """Request the new certificate when old 
certificate is expiring.""" @@ -161,6 +265,32 @@ def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: self._request_certificate_renewal(scope, cert_type, secrets) + def _persist_certificate( + self, event: CertificateAvailableEvent, scope: Scope, cert_type: CertType + ) -> None: + """Persist the certificate on the secrets store and on disk.""" + self.charm.secrets.put_object( + scope, + cert_type.val, + { + "cert": event.certificate, + "cert-chain": event.chain[::-1], + "ca": event.ca, + }, + merge=True, + ) + + # store the certificates and keys in a key store + self._store_new_tls_resources( + scope, cert_type, self.charm.secrets.get_object(scope, cert_type.val) + ) + + # set flag to indicate cert type well configured + if self.charm.unit.is_leader() and cert_type == CertType.APP_ADMIN: + self.charm.peers_data.put(Scope.APP, f"tls_{cert_type}_configured", True) + else: + self.charm.peers_data.put(Scope.UNIT, f"tls_{cert_type}_configured", True) + def _request_certificate( self, scope: Scope, @@ -169,6 +299,14 @@ def _request_certificate( password: Optional[str] = None, ): """Request certificate and store the key/key-password/csr in the scope's data bag.""" + # self._request_certificate_revocation(cert_type) + self.charm.peers_data.put( + Scope.APP if cert_type == CertType.APP_ADMIN else Scope.UNIT, + f"tls_{cert_type}_configured", + False, + ) + # self._delete_tls_resources() + if key is None: key = generate_private_key() else: @@ -228,6 +366,12 @@ def _request_certificate_renewal( new_certificate_signing_request=new_csr, ) + def _request_certificate_revocation(self, cert_type: CertType): + """Requests the revocation of a certificate.""" + existing_secrets = self.charm.secrets.get_object(Scope.UNIT, cert_type.val) + if existing_secrets: + self.certs.request_certificate_revocation(existing_secrets["csr"].encode("utf-8")) + def _get_sans(self, cert_type: CertType) -> Dict[str, List[str]]: """Create a list of OID/IP/DNS names for an OpenSearch 
unit. @@ -313,3 +457,232 @@ def is_secret_found(secrets: Optional[Dict[str, str]]) -> bool: return Scope.UNIT, CertType.UNIT_HTTP, u_http_secrets return None + + def _unit_certificates_available(self) -> bool: + """Check that all unit certificates are in the secrets store and share one CA.""" + secrets = self.charm.secrets + + cert_cas = set() + for cert_type in [CertType.UNIT_TRANSPORT, CertType.UNIT_HTTP]: + unit_secrets = secrets.get_object(Scope.UNIT, cert_type.val) + if not unit_secrets or not unit_secrets.get("cert"): + return False + + cert_cas.add(unit_secrets.get("ca")) + + return len(cert_cas) == 1 + + def all_certificates_available(self) -> bool: + """Check that all certificates are available and issued by the same CA.""" + secrets = self.charm.secrets + + admin_secrets = secrets.get_object(Scope.APP, CertType.APP_ADMIN.val) + if not admin_secrets or not admin_secrets.get("cert"): + return False + + admin_ca = admin_secrets.get("ca") + + for cert_type in [CertType.UNIT_TRANSPORT, CertType.UNIT_HTTP]: + unit_secrets = secrets.get_object(Scope.UNIT, cert_type.val) + if ( + not unit_secrets + or not unit_secrets.get("cert") + or unit_secrets.get("ca") != admin_ca + ): + return False + + peers_data = self.charm.peers_data + return ( + peers_data.get(Scope.APP, f"tls_{CertType.APP_ADMIN}_configured", False) + and peers_data.get(Scope.UNIT, f"tls_{CertType.UNIT_TRANSPORT}_configured", False) + and peers_data.get(Scope.UNIT, f"tls_{CertType.UNIT_HTTP}_configured", False) + ) + + def all_tls_resources_stored(self, only_unit_resources: bool = False) -> bool: + """Check if all TLS resources are stored on disk.""" + cert_types = ["ca", CertType.UNIT_TRANSPORT, CertType.UNIT_HTTP] + if not only_unit_resources: + cert_types.append(CertType.APP_ADMIN) + + for cert_type in cert_types: + if not exists(f"{self.certs_path}/{cert_type}.p12"): + return False + + return True + + def _create_keystore_pwd_if_not_exists(self, scope: Scope, alias: str): + """Create passwords for the
key stores if not already created.""" + keystore_pwd = self.charm.secrets.get(scope, f"keystore-password-{alias}") + if not keystore_pwd: + self.charm.secrets.put(scope, f"keystore-password-{alias}", generate_password()) + + def store_new_ca(self, ca_cert: str): + """Add new CA cert to trust store.""" + store_pwd = self.charm.secrets.get(Scope.APP, "keystore-password-ca") + + keytool = f"sudo {self.jdk_path}/bin/keytool" + alias = "ca" + store_path = f"{self.certs_path}/{alias}.p12" + try: + run_cmd( + f"""{keytool} -changealias \ + -alias {alias} \ + -destalias old-{alias} \ + -keystore {store_path} \ + -storepass {store_pwd} \ + -storetype PKCS12 + """ + ) + except OpenSearchCmdError as e: + # This message means there was no "ca" alias or store before, if it happens ignore + if not ( + f"Alias <{alias}> does not exist" in e.out + or "Keystore file does not exist" in e.out + ): + raise + + with tempfile.NamedTemporaryFile(mode="w+t") as ca_tmp_file: + ca_tmp_file.write(ca_cert) + ca_tmp_file.flush() + + run_cmd( + f"""{keytool} -importcert \ + -trustcacerts \ + -noprompt \ + -alias {alias} \ + -keystore {store_path} \ + -file {ca_tmp_file.name} \ + -storepass {store_pwd} \ + -storetype PKCS12 + """ + ) + + run_cmd(f"sudo chown -R snap_daemon:root {self.certs_path}") + run_cmd(f"sudo chmod +r {store_path}") + run_cmd( + f"{keytool} -list -v -keystore {store_path} -storepass {store_pwd} -storetype PKCS12" + ) + + def remove_old_ca_if_any(self) -> None: + """Remove old CA cert from trust store.""" + keytool = f"sudo {self.jdk_path}/bin/keytool" + store_path = f"{self.certs_path}/ca.p12" + old_alias = "old-ca" + + store_pwd = self.charm.secrets.get(Scope.APP, "keystore-password-ca") + try: + run_cmd( + f"{keytool} -list -keystore {store_path} -storepass {store_pwd} -alias {old_alias} -storetype PKCS12" + ) + except OpenSearchCmdError as e: + # This message means there is no "old-ca" alias in the store, if it happens ignore + if f"Alias <{old_alias}> does not exist" in
e.out: + return + + self._remove_key_store_content_by_alias(store_pwd, alias=old_alias, key_store_name="ca") + + def _store_new_tls_resources(self, scope: Scope, cert_type: CertType, secrets: Dict[str, Any]): + """Add key and cert to keystore.""" + store_pwd = self.charm.secrets.get(scope, f"keystore-password-{cert_type.val}") + store_path = f"{self.certs_path}/{cert_type.val}.p12" + + # we store the pem format to make it easier for the python requests lib + if cert_type == CertType.APP_ADMIN: + with open(f"{self.certs_path}/admin-cert-chain.pem", "w+") as f: + f.write("\n".join(secrets.get("cert-chain"))) + + # self._remove_key_store_content_by_alias(store_pwd, alias=cert_type.val) + try: + os.remove(store_path) + except OSError: + pass + + tmp_key = tempfile.NamedTemporaryFile(mode="w+t", suffix=".pem") + tmp_key.write(secrets.get("key")) + tmp_key.flush() + tmp_key.seek(0) + + tmp_cert = tempfile.NamedTemporaryFile(mode="w+t", suffix=".cert") + tmp_cert.write(secrets.get("cert")) + tmp_cert.flush() + tmp_cert.seek(0) + + try: + cmd = f"""openssl pkcs12 -export \ + -in {tmp_cert.name} \ + -inkey {tmp_key.name} \ + -out {store_path} \ + -name {cert_type.val} \ + -passout pass:{store_pwd} + """ + if secrets.get("key-password"): + cmd = f"{cmd} -passin pass:{secrets.get('key-password')}" + + run_cmd(cmd) + run_cmd(f"sudo chown -R snap_daemon:root {self.certs_path}") + run_cmd(f"sudo chmod +r {store_path}") + finally: + tmp_key.close() + tmp_cert.close() + + def _remove_key_store_content_by_alias( + self, store_pass: str, alias: str, key_store_name: Optional[str] = None + ): + """Remove the resources matching an alias in a keystore.""" + if not key_store_name: + key_store_name = alias + + keytool = f"sudo {self.jdk_path}/bin/keytool" + try: + run_cmd( + f"""{keytool} -delete \ + -alias {alias} \ + -storepass {store_pass} \ + -keystore {self.certs_path}/{key_store_name}.p12 + """ + ) + except OpenSearchCmdError as e: + # This message means there was no such alias or store
before, if it happens ignore + if not ( + f"Alias <{alias}> does not exist" in e.out + or "Keystore file does not exist" in e.out + ): + raise + + def _read_stored_ca(self, alias: str = "ca") -> Optional[str]: + """Load stored CA cert.""" + store_pwd = self.charm.secrets.get(Scope.APP, "keystore-password-ca") + + ca_trust_store = f"{self.certs_path}/ca.p12" + if not exists(ca_trust_store): + return None + + stored_certs = run_cmd(f"openssl pkcs12 -in {ca_trust_store} -passin pass:{store_pwd}").out + + # parse output to retrieve the current CA (in case there are many) + start_cert_marker = "-----BEGIN CERTIFICATE-----" + end_cert_marker = "-----END CERTIFICATE-----" + certificates = stored_certs.split(end_cert_marker) + for cert in certificates: + if f"friendlyName: {alias}" in cert: + return f"{start_cert_marker}{cert.split(start_cert_marker)[1]}{end_cert_marker}" + + return None + + def _delete_tls_resources(self): + """Delete all TLS resources in the current unit.""" + key_stores = glob.glob(f"{self.certs_path}/*") + for key_store in key_stores: + os.remove(key_store) + + def _ca_renewal_complete_in_cluster(self) -> bool: + """Check whether the CA renewal completed in all units.""" + rel = self.charm.model.get_relation(self.peer_relation) + for unit in rel.units.union({self.charm.unit}): + rel_data = rel.data[unit] + ca_renewing = rel_data.get("tls_ca_renewing") + ca_renewed = rel_data.get("tls_ca_renewed") + if ca_renewing == "True" and ca_renewed != "True": + return False + + return True diff --git a/src/charm.py b/src/charm.py index f2c2c230f..93478ec28 100755 --- a/src/charm.py +++ b/src/charm.py @@ -5,19 +5,13 @@ """Charmed Machine Operator for OpenSearch.""" import logging -from os import remove -from os.path import exists -from typing import Dict from charms.opensearch.v0.constants_charm import InstallError, InstallProgress -from charms.opensearch.v0.constants_tls import CertType -from charms.opensearch.v0.helper_security import to_pkcs8 from 
charms.opensearch.v0.opensearch_base_charm import OpenSearchBaseCharm from charms.opensearch.v0.opensearch_exceptions import OpenSearchInstallError from ops.charm import InstallEvent from ops.main import main from ops.model import BlockedStatus, MaintenanceStatus -from overrides import override from opensearch import OpenSearchSnap @@ -41,50 +35,6 @@ def _on_install(self, _: InstallEvent) -> None: except OpenSearchInstallError: self.unit.status = BlockedStatus(InstallError) - @override - def _store_tls_resources( - self, cert_type: CertType, secrets: Dict[str, any], override_admin: bool = True - ): - """Write certificates and keys on disk.""" - certs_dir = self.opensearch.paths.certs - - self.opensearch.write_file( - f"{certs_dir}/{cert_type}.key", - to_pkcs8(secrets["key"], secrets.get("key-password")), - ) - self.opensearch.write_file(f"{certs_dir}/{cert_type}.cert", secrets["cert"]) - self.opensearch.write_file(f"{certs_dir}/root-ca.cert", secrets["ca"], override=False) - - if cert_type == CertType.APP_ADMIN: - self.opensearch.write_file( - f"{certs_dir}/chain.pem", - "\n".join(secrets["chain"][::-1]), - override=override_admin, - ) - - @override - def _are_all_tls_resources_stored(self): - """Check if all TLS resources are stored on disk.""" - certs_dir = self.opensearch.paths.certs - for cert_type in [CertType.APP_ADMIN, CertType.UNIT_TRANSPORT, CertType.UNIT_HTTP]: - for extension in ["key", "cert"]: - if not exists(f"{certs_dir}/{cert_type}.{extension}"): - return False - - return exists(f"{certs_dir}/chain.pem") and exists(f"{certs_dir}/root-ca.cert") - - @override - def _delete_stored_tls_resources(self): - """Delete the TLS resources of the unit that are stored on disk.""" - certs_dir = self.opensearch.paths.certs - for cert_type in [CertType.UNIT_TRANSPORT, CertType.UNIT_HTTP]: - for extension in ["key", "cert"]: - try: - remove(f"{certs_dir}/{cert_type}.{extension}") - except OSError: - # thrown if file not exists, ignore - pass - if __name__ == 
"__main__": main(OpenSearchOperatorCharm) diff --git a/src/opensearch.py b/src/opensearch.py index 71a7855ae..f7067e4f3 100644 --- a/src/opensearch.py +++ b/src/opensearch.py @@ -15,6 +15,7 @@ from pathlib import Path import requests +from charms.opensearch.v0.helper_commands import run_cmd from charms.opensearch.v0.opensearch_distro import OpenSearchDistribution, Paths from charms.opensearch.v0.opensearch_exceptions import ( OpenSearchCmdError, @@ -160,7 +161,7 @@ def _start_service(self): """Start opensearch.""" try: self._setup_linux_perms() - self._run_cmd( + run_cmd( "setpriv", "--clear-groups --reuid ubuntu --regid ubuntu -- sudo systemctl start opensearch.service", ) @@ -171,7 +172,7 @@ def _start_service(self): def _stop_service(self): """Stop opensearch.""" try: - self._run_cmd("systemctl stop opensearch.service") + run_cmd("systemctl stop opensearch.service") except OpenSearchCmdError: logger.debug("Failed stopping the opensearch service.") raise OpenSearchStopError() @@ -203,8 +204,8 @@ def _create_directories(self) -> None: def _setup_linux_perms(self): """Create ubuntu:ubuntu user:group.""" - self._run_cmd("chown", f"-R ubuntu:ubuntu {self.paths.home}") - self._run_cmd("chown", "-R ubuntu:ubuntu /mnt/opensearch") + run_cmd("chown", f"-R ubuntu:ubuntu {self.paths.home}") + run_cmd("chown", "-R ubuntu:ubuntu /mnt/opensearch") def _create_systemd_unit(self): """Create a systemd unit file to run OpenSearch as a service.""" @@ -232,4 +233,4 @@ def _create_systemd_unit(self): "\n".join([line.strip() for line in unit_content.split("\n")]), ) - self._run_cmd("systemctl daemon-reload") + run_cmd("systemctl daemon-reload")