diff --git a/lib/charms/mongodb/v0/mongos.py b/lib/charms/mongodb/v0/mongos.py
index 50d9c8ceb..f51ee4097 100644
--- a/lib/charms/mongodb/v0/mongos.py
+++ b/lib/charms/mongodb/v0/mongos.py
@@ -4,11 +4,11 @@
 import logging
 from dataclasses import dataclass
-from typing import Optional, Set
+from typing import Dict, Optional, Set
 from urllib.parse import quote_plus
 
+from charms.mongodb.v0.mongodb import NotReadyError
 from pymongo import MongoClient
-from pymongo.errors import PyMongoError
 
 from config import Config
 
@@ -20,7 +20,7 @@
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 1
+LIBPATCH = 2
 
 # path to store mongodb ketFile
 logger = logging.getLogger(__name__)
@@ -65,8 +65,16 @@ def uri(self):
         )
 
 
-class NotReadyError(PyMongoError):
-    """Raised when not all replica set members healthy or finished initial sync."""
+class RemovePrimaryShardError(Exception):
+    """Raised when there is an attempt to remove the primary shard."""
+
+
+class ShardNotInClusterError(Exception):
+    """Raised when shard is not present in cluster, but it is expected to be."""
+
+
+class ShardNotPlannedForRemovalError(Exception):
+    """Raised when it is expected that a shard is planned for removal, but it is not."""
 
 
 class MongosConnection:
@@ -158,6 +166,97 @@ def add_shard(self, shard_name, shard_hosts, shard_port=Config.MONGODB_PORT):
         logger.info("Adding shard %s", shard_name)
         self.client.admin.command("addShard", shard_url)
 
+    def remove_shard(self, shard_name: str) -> None:
+        """Removes shard from the cluster.
+
+        Raises:
+            ConfigurationError, OperationFailure, NotReadyError,
+            RemovePrimaryShardError
+        """
+        sc_status = self.client.admin.command("listShards")
+        # It is necessary to call removeShard multiple times on a shard to guarantee removal.
+        # Allow re-removal of shards that are currently draining.
+        if self._is_any_draining(sc_status, ignore_shard=shard_name):
+            cannot_remove_shard = (
+                f"cannot remove shard {shard_name} from cluster, another shard is draining"
+            )
+            logger.error(cannot_remove_shard)
+            raise NotReadyError(cannot_remove_shard)
+
+        # TODO Follow up PR, there is no MongoDB command to retrieve the primary shard, this is
+        # possible with mongosh.
+        primary_shard = self.get_primary_shard()
+        if primary_shard:
+            # TODO Future PR, support removing the primary shard if there are no unsharded
+            # collections on it. All sharded collections should perform `movePrimary`.
+            cannot_remove_primary_shard = (
+                f"Shard {shard_name} is the primary shard, cannot remove."
+            )
+            logger.error(cannot_remove_primary_shard)
+            raise RemovePrimaryShardError(cannot_remove_primary_shard)
+
+        logger.info("Attempting to remove shard %s", shard_name)
+        removal_info = self.client.admin.command("removeShard", shard_name)
+
+        # process removal status
+        remaining_chunks = (
+            removal_info["remaining"]["chunks"] if "remaining" in removal_info else "None"
+        )
+        dbs_to_move = (
+            removal_info["dbsToMove"]
+            if "dbsToMove" in removal_info and removal_info["dbsToMove"] != []
+            else ["None"]
+        )
+        logger.info(
+            "Draining status of shard %s is: %s. Remaining chunks: %s. DBs to move: %s.",
+            shard_name,
+            removal_info["state"],
+            str(remaining_chunks),
+            ",".join(dbs_to_move),
+        )
+
+    def _is_shard_draining(self, shard_name: str) -> bool:
+        """Reports if a given shard is currently in the draining state.
+
+        Raises:
+            ConfigurationError, OperationFailure, ShardNotInClusterError,
+            ShardNotPlannedForRemovalError
+        """
+        sc_status = self.client.admin.command("listShards")
+        for shard in sc_status["shards"]:
+            if shard["_id"] == shard_name:
+                if "draining" not in shard:
+                    raise ShardNotPlannedForRemovalError(
+                        f"Shard {shard_name} has not been marked for removal",
+                    )
+                return shard["draining"]
+
+        raise ShardNotInClusterError(
+            f"Shard {shard_name} not in cluster, could not retrieve draining status"
+        )
+
+    def get_primary_shard(self) -> Optional[str]:
+        """Processes sc_status and identifies the primary shard."""
+        # TODO Follow up PR, implement this function; there is no MongoDB command to retrieve
+        # the primary shard, this is possible with mongosh.
+        return None
+
+    @staticmethod
+    def _is_any_draining(sc_status: Dict, ignore_shard: str = "") -> bool:
+        """Returns true if any shard member is draining.
+
+        Checks if any members in the sharded cluster are draining data.
+
+        Args:
+            sc_status: current state of shard cluster status as reported by mongos.
+            ignore_shard: shard to ignore
+        """
+        return any(
+            # check draining status of all shards except the one to be ignored.
+            shard.get("draining", False) if shard["_id"] != ignore_shard else False
+            for shard in sc_status["shards"]
+        )
+
     @staticmethod
     def _hostname_from_hostport(hostname: str) -> str:
         """Return hostname part from MongoDB returned.
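Note on the draining protocol used in `remove_shard` above: mongos only starts draining on the first `removeShard` call and reports progress on each subsequent call, which is why the charm tolerates re-removal of a draining shard and why callers must poll until the reported state reaches "completed". A minimal sketch of that protocol with bare pymongo, assuming a reachable mongos at a placeholder URI:

    import time

    from pymongo import MongoClient

    # placeholder URI; the charm builds the real one from its MongoDBConfiguration
    client = MongoClient("mongodb://operator:password@mongos-host:27017/admin")

    while True:
        # the same command both starts draining and reports its progress
        info = client.admin.command("removeShard", "shard-a")
        if info["state"] == "completed":
            break
        # "remaining" only appears once draining is in progress
        chunks = info.get("remaining", {}).get("chunks", "unknown")
        print(f"state={info['state']}, remaining chunks: {chunks}")
        time.sleep(10)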
diff --git a/lib/charms/mongodb/v0/shards_interface.py b/lib/charms/mongodb/v0/shards_interface.py
index 58e4a7a29..868c5b5de 100644
--- a/lib/charms/mongodb/v0/shards_interface.py
+++ b/lib/charms/mongodb/v0/shards_interface.py
@@ -6,14 +6,25 @@
 This class handles the sharing of secrets between sharded components, adding shards, and removing
 shards.
 """
+import json
 import logging
-from typing import List, Optional
+import time
+from typing import List, Optional, Set
 
 from charms.mongodb.v0.helpers import KEY_FILE
-from charms.mongodb.v0.mongodb import MongoDBConnection, NotReadyError, PyMongoError
-from charms.mongodb.v0.mongos import MongosConnection
+from charms.mongodb.v0.mongodb import (
+    MongoDBConnection,
+    NotReadyError,
+    OperationFailure,
+    PyMongoError,
+)
+from charms.mongodb.v0.mongos import (
+    MongosConnection,
+    ShardNotInClusterError,
+    ShardNotPlannedForRemovalError,
+)
 from charms.mongodb.v0.users import MongoDBUser, OperatorUser
-from ops.charm import CharmBase, RelationBrokenEvent
+from ops.charm import CharmBase, EventBase, RelationBrokenEvent
 from ops.framework import Object
 from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, WaitingStatus
 from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed
@@ -31,9 +42,19 @@
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 2
+LIBPATCH = 3
 KEYFILE_KEY = "key-file"
+HOSTS_KEY = "host"
 OPERATOR_PASSWORD_KEY = MongoDBUser.get_password_key_name_for_user(OperatorUser.get_username())
+FORBIDDEN_REMOVAL_ERR_CODE = 20
+
+
+class RemoveLastShardError(Exception):
+    """Raised when there is an attempt to remove the last shard in the cluster."""
+
+
+class NotDrainedError(Exception):
+    """Raised when a shard is still in the cluster after removal."""
 
 
 class ShardingProvider(Object):
@@ -42,7 +63,7 @@ class ShardingProvider(Object):
     def __init__(
         self, charm: CharmBase, relation_name: str = Config.Relations.CONFIG_SERVER_RELATIONS_NAME
     ) -> None:
-        """Constructor for ShardingRequirer object."""
+        """Constructor for ShardingProvider object."""
         self.relation_name = relation_name
         self.charm = charm
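Because Juju relation databags only store strings, the host list published under `HOSTS_KEY` is JSON-encoded by the config-server and decoded by the shard, as the hunks below show. A self-contained round-trip sketch, with a plain dict standing in for the relation databag:

    import json

    HOSTS_KEY = "host"

    # config-server side: serialize the unit IPs before writing them to relation data
    databag = {}  # stands in for relation.data[app]
    unit_ips = ["10.10.0.4", "10.10.0.7"]
    databag[HOSTS_KEY] = json.dumps(unit_ips)

    # shard side: decode the string back into a list of mongos hosts
    mongos_hosts = json.loads(databag[HOSTS_KEY])
    assert mongos_hosts == unit_ips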
@@ -53,27 +74,17 @@ def __init__(
         self.framework.observe(
             charm.on[self.relation_name].relation_changed, self._on_relation_event
         )
+        self.framework.observe(
+            charm.on[self.relation_name].relation_broken, self._on_relation_event
+        )
 
-        # TODO Follow up PR, handle rotating passwords
+        # TODO Future PR: handle self healing when all IP addresses of a shard change and we
+        # have to manually update mongos
 
     def _on_relation_joined(self, event):
         """Handles providing shards with secrets and adding shards to the config server."""
-        if self.charm.is_role(Config.Role.REPLICATION):
-            self.charm.unit.status = BlockedStatus("role replication does not support sharding")
-            logger.error("sharding interface not supported with config role=replication")
-            return
-
-        if not self.charm.is_role(Config.Role.CONFIG_SERVER):
-            logger.info(
-                "skipping relation joined event ShardingRequirer is only be executed by config-server"
-            )
-            return
-
-        if not self.charm.unit.is_leader():
-            return
-
-        if not self.charm.db_initialised:
-            event.defer()
+        if not self.pass_hook_checks(event):
+            logger.info("Skipping relation joined event: hook checks did not pass")
             return
 
         # TODO Future PR, sync tls secrets and PBM password
@@ -87,27 +98,50 @@ def _on_relation_joined(self, event):
                 KEYFILE_KEY: self.charm.get_secret(
                     Config.Relations.APP_SCOPE, Config.Secrets.SECRET_KEYFILE_NAME
                 ),
+                HOSTS_KEY: json.dumps(self.charm._unit_ips),
             },
         )
 
-    def _on_relation_event(self, event):
-        """Handles adding, removing, and updating of shards."""
+    def pass_hook_checks(self, event: EventBase) -> bool:
+        """Runs the pre-hook checks for ShardingProvider; returns True if all pass."""
         if self.charm.is_role(Config.Role.REPLICATION):
-            self.unit.status = BlockedStatus("role replication does not support sharding")
-            logger.error("sharding interface not supported with config role=replication")
-            return
+            self.charm.unit.status = BlockedStatus("role replication does not support sharding")
+            logger.error(
+                "Skipping %s. Sharding interface not supported with config role=replication.",
+                type(event),
+            )
+            return False
 
         if not self.charm.is_role(Config.Role.CONFIG_SERVER):
             logger.info(
-                "skipping relation joined event ShardingRequirer is only be executed by config-server"
+                "Skipping %s. ShardingProvider is only executed by the config-server.", type(event)
             )
-            return
+            return False
 
         if not self.charm.unit.is_leader():
-            return
+            return False
 
         if not self.charm.db_initialised:
+            logger.info("Deferring %s. db is not initialised.", type(event))
             event.defer()
+            return False
+
+        return True
+
+    def _on_relation_event(self, event):
+        """Handles adding and removing of shards.
+
+        Updating of shards is done automatically via MongoDB change-streams.
+        """
+        if not self.pass_hook_checks(event):
+            logger.info("Skipping relation event: hook checks did not pass")
+            return
+
+        # adding/removing shards while a backup/restore is in progress can be disastrous
+        pbm_status = self.charm.backups._get_pbm_status()
+        if isinstance(pbm_status, MaintenanceStatus):
+            logger.info("Cannot add/remove shards while a backup/restore is in progress.")
+            event.defer()
+            return
 
         departed_relation_id = None
         if type(event) is RelationBrokenEvent:
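The `pass_hook_checks` refactor above collapses the repeated role/leader/db-initialised guards into a single reusable predicate: hooks either skip (wrong role, non-leader) or defer (state not ready yet) before doing any work. A stripped-down sketch of the pattern, with illustrative class and attribute names rather than the charm's real ones:

    from ops.charm import CharmBase
    from ops.framework import EventBase, Object


    class ExampleRelationHandler(Object):
        """Illustrative handler showing the guard-then-act hook pattern."""

        def __init__(self, charm: CharmBase, relation_name: str):
            super().__init__(charm, relation_name)
            self.charm = charm

        def _pass_hook_checks(self, event: EventBase) -> bool:
            # non-leaders never act on this relation
            if not self.charm.unit.is_leader():
                return False
            # not ready yet: ask Juju to re-emit the event later
            if not getattr(self.charm, "db_initialised", False):
                event.defer()
                return False
            return True

        def _on_relation_event(self, event: EventBase) -> None:
            if not self._pass_hook_checks(event):
                return
            ...  # safe to reconcile shards here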
+ """ + if not self.pass_hook_checks(event): + logger.info("Skipping relation event: hook checks did not pass") + return + + # adding/removing shards while a backup/restore is in progress can be disastrous + pbm_status = self.charm.backups._get_pbm_status() + if isinstance(pbm_status, MaintenanceStatus): + event.defer("Cannot add/remove shards while a backup/restore is in progress.") + return departed_relation_id = None if type(event) is RelationBrokenEvent: @@ -116,9 +150,27 @@ def _on_relation_event(self, event): try: logger.info("Adding shards not present in cluster.") self.add_shards(departed_relation_id) - # TODO Future PR, enable updating shards by listening for relation changed events - # TODO Future PR, enable shard drainage by listening for relation departed events - except PyMongoError as e: + self.remove_shards(departed_relation_id) + except NotDrainedError: + # it is necessary to removeShard multiple times for the shard to be removed. + logger.info( + "Shard is still present in the cluster after removal, will defer and remove again." + ) + event.defer() + return + except OperationFailure as e: + if e.code == FORBIDDEN_REMOVAL_ERR_CODE: + # TODO Future PR, allow removal of last shards that have no data. This will be + # tricky since we are not allowed to update the mongos config in this way. + logger.error( + "Cannot not remove the last shard from cluster, this is forbidden by mongos." + ) + # we should not lose connection with the shard, prevent other hooks from executing. + raise RemoveLastShardError() + + logger.error("Deferring _on_relation_event for shards interface since: error=%r", e) + event.defer() + except (PyMongoError, NotReadyError) as e: logger.error("Deferring _on_relation_event for shards interface since: error=%r", e) event.defer() return @@ -152,11 +204,40 @@ def add_shards(self, departed_shard_id): self.charm.unit.status = ActiveStatus("") + def remove_shards(self, departed_shard_id): + """Removes shards from cluster. + + raises: PyMongoError, NotReadyError + """ + with MongosConnection(self.charm.mongos_config) as mongo: + cluster_shards = mongo.get_shard_members() + relation_shards = self._get_shards_from_relations(departed_shard_id) + + for shard in cluster_shards - relation_shards: + self.charm.unit.status = MaintenanceStatus(f"Draining shard {shard}") + logger.info("Attempting to removing shard: %s", shard) + mongo.remove_shard(shard) + logger.info("Shard: %s, is now draining", shard) + + if shard in mongo.get_shard_members(): + shard_draining_message = f"shard {shard} still exists in cluster after removal, it is still draining." + logger.info(shard_draining_message) + raise NotDrainedError(shard_draining_message) + def update_credentials(self, key: str, value: str) -> None: """Sends new credentials, for a key value pair across all shards.""" for relation in self.charm.model.relations[self.relation_name]: self._update_relation_data(relation.id, {key: value}) + def _update_mongos_hosts(self): + """Updates the hosts for mongos on the relation data.""" + if not self.charm.is_role(Config.Role.CONFIG_SERVER): + logger.info("Skipping, ShardingProvider is only be executed by config-server") + return + + for relation in self.charm.model.relations[self.relation_name]: + self._update_relation_data(relation.id, {HOSTS_KEY: json.dumps(self.charm._unit_ips)}) + def _update_relation_data(self, relation_id: int, data: dict) -> None: """Updates a set of key-value pairs in the relation. 
@@ -214,8 +295,9 @@ def __init__(
         self.framework.observe(
             charm.on[self.relation_name].relation_changed, self._on_relation_changed
         )
-
-        # TODO Future PR, enable shard drainage by observing relation departed events
+        self.framework.observe(
+            charm.on[self.relation_name].relation_broken, self._on_relation_broken
+        )
 
     def _on_relation_changed(self, event):
         """Retrieves secrets from config-server and updates them within the shard."""
@@ -264,6 +346,80 @@ def _on_relation_changed(self, event):
 
         # TODO future PR, leader unit verifies shard was added to cluster (update-status hook)
 
+    def _on_relation_broken(self, event) -> None:
+        """Waits for the shard to be fully drained from the cluster."""
+        if self.charm.is_role(Config.Role.REPLICATION):
+            self.charm.unit.status = BlockedStatus("role replication does not support sharding")
+            logger.error("sharding interface not supported with config role=replication")
+            return
+
+        if not self.charm.is_role(Config.Role.SHARD):
+            logger.info("skipping relation broken event; this hook is only executed by shards")
+            return
+
+        if not self.charm.db_initialised:
+            event.defer()
+            return
+
+        self.charm.unit.status = MaintenanceStatus("Draining shard from cluster")
+        # mongos hosts must be retrieved via relation data, as relation.units are not available
+        # in broken events
+        mongos_hosts = json.loads(event.relation.data[event.relation.app].get(HOSTS_KEY))
+        drained = False
+        while not drained:
+            try:
+                # no need to continuously check and abuse resources while the shard is draining
+                time.sleep(10)
+                drained = self.drained(mongos_hosts, self.charm.app.name)
+                draining_status = (
+                    "Shard is still draining" if not drained else "Shard is fully drained."
+                )
+                logger.debug(draining_status)
+            except PyMongoError as e:
+                logger.error("Error occurred while draining shard: %s", e)
+                self.charm.unit.status = BlockedStatus("Failed to drain shard from cluster")
+            except ShardNotPlannedForRemovalError:
+                logger.info(
+                    "Shard %s has not been identified for removal. Must wait for mongos "
+                    "cluster-admin to remove shard.",
+                    self.charm.app.name,
+                )
+            except ShardNotInClusterError:
+                logger.info(
+                    "Shard to remove is not in sharded cluster. It has been successfully removed."
+                )
+                if self.charm.unit.is_leader():
+                    self.charm.app_peer_data["drained"] = json.dumps(True)
+
+                break
+
+        self.charm.unit.status = ActiveStatus("Shard drained from cluster, ready for removal")
+        # TODO future PR, leader unit displays this message in update-status hook
+        # TODO future PR, check for shard drainage when removing application
+
+    def drained(self, mongos_hosts: Set[str], shard_name: str) -> bool:
+        """Returns whether a shard has been drained from the cluster.
+
+        Raises:
+            ConfigurationError, OperationFailure, ShardNotInClusterError,
+            ShardNotPlannedForRemovalError
+        """
+        if not self.charm.is_role(Config.Role.SHARD):
+            logger.info("Component %s is not a shard, has no draining status.", self.charm.role)
+            return False
+
+        if not self.charm.unit.is_leader():
+            # if "drained" hasn't been set by the leader, assume the shard hasn't been drained.
+            return json.loads(self.charm.app_peer_data.get("drained", "false"))
+
+        with MongosConnection(self.charm.remote_mongos_config(set(mongos_hosts))) as mongo:
+            # a shard is "drained" if it is NO LONGER draining.
+            draining = mongo._is_shard_draining(shard_name)
+            drained = not draining
+
+        self.charm.app_peer_data["drained"] = json.dumps(drained)
+        return drained
+
     def update_operator_password(self, new_password: str) -> None:
         """Updates the password for the operator user.
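One subtlety in the `drained` bookkeeping above: app peer data, like relation data, stores only strings, so the boolean flag has to round-trip through json.dumps on write and json.loads on read; returning the raw stored value would make the string "false" truthy. A short sketch of the round-trip, with a plain dict standing in for app_peer_data:

    import json

    peer_data = {}  # stands in for self.charm.app_peer_data

    # leader records the draining result
    peer_data["drained"] = json.dumps(False)

    # the stored value is the string "false", which is truthy as-is...
    assert peer_data["drained"] == "false"
    # ...so readers must decode it back into a real bool
    assert json.loads(peer_data.get("drained", "false")) is False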
diff --git a/src/charm.py b/src/charm.py
index 5e209d42e..6c8525ed8 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -164,6 +164,15 @@ def _primary(self) -> str:
 
         return None
 
+    @property
+    def drained(self) -> bool:
+        """Returns whether the shard has been drained."""
+        if not self.is_role(Config.Role.SHARD):
+            logger.info("Component %s is not a shard, cannot check draining status.", self.role)
+            return False
+
+        return json.loads(self.app_peer_data.get("drained", "false"))
+
     @property
     def _unit_ips(self) -> List[str]:
         """Retrieve IP addresses associated with MongoDB application.
@@ -198,6 +207,12 @@ def mongos_config(self) -> MongoDBConfiguration:
         """Generates a MongoDBConfiguration object for mongos in the deployment of MongoDB."""
         return self._get_mongos_config_for_user(OperatorUser, set(self._unit_ips))
 
+    def remote_mongos_config(self, hosts) -> MongoDBConfiguration:
+        """Generates a MongoDBConfiguration object for a remote mongos in this deployment."""
+        # mongos instances that are part of the cluster have the same username and password,
+        # but different hosts
+        return self._get_mongos_config_for_user(OperatorUser, hosts)
+
     @property
     def mongodb_config(self) -> MongoDBConfiguration:
         """Generates a MongoDBConfiguration object for this deployment of MongoDB."""
@@ -404,6 +419,7 @@ def _on_relation_joined(self, event: RelationJoinedEvent) -> None:
         # app relations should be made aware of the new set of hosts
         try:
             self.client_relations.update_app_relation_data()
+            self.shard_relations._update_mongos_hosts()
         except PyMongoError as e:
             logger.error("Deferring on updating app relation data since: error: %r", e)
             event.defer()
@@ -466,6 +482,7 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
         # app relations should be made aware of the new set of hosts
         try:
             self.client_relations.update_app_relation_data()
+            self.shard_relations._update_mongos_hosts()
         except PyMongoError as e:
             logger.error("Deferring on updating app relation data since: error: %r", e)
             event.defer()
@@ -486,6 +503,7 @@ def _on_relation_departed(self, event: RelationDepartedEvent) -> None:
         # app relations should be made aware of the new set of hosts
         try:
             self.client_relations.update_app_relation_data()
+            self.shard_relations._update_mongos_hosts()
         except PyMongoError as e:
             logger.error("Deferring on updating app relation data since: error: %r", e)
             event.defer()
@@ -882,6 +900,7 @@ def _handle_reconfigure(self, event: UpdateStatusEvent):
         # app relations should be made aware of the new set of hosts
         try:
             self.client_relations.update_app_relation_data()
+            self.shard_relations._update_mongos_hosts()
         except PyMongoError as e:
             logger.error("Deferring on updating app relation data since: error: %r", e)
             event.defer()
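End-to-end, a departing shard checks its own draining status by connecting to the config-server's mongos using the hosts published under `HOSTS_KEY`, with `remote_mongos_config` supplying the shared operator credentials; that is the flow `drained` implements in shards_interface.py. A condensed, illustrative sketch (the helper name and databag argument are hypothetical):

    import json

    from charms.mongodb.v0.mongos import MongosConnection

    HOSTS_KEY = "host"


    def shard_is_drained(charm, config_server_databag: dict) -> bool:
        """Hypothetical helper: charm is the shard's charm instance and
        config_server_databag is the config-server app databag from the
        broken relation."""
        mongos_hosts = set(json.loads(config_server_databag[HOSTS_KEY]))
        with MongosConnection(charm.remote_mongos_config(mongos_hosts)) as mongo:
            # drained means mongos no longer reports the shard as draining
            return not mongo._is_shard_draining(charm.app.name)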