From 54d47e2a098df6220eb18c0cb3e6a4f279a6fca1 Mon Sep 17 00:00:00 2001 From: Smail KOURTA Date: Fri, 16 Aug 2024 18:25:02 +0400 Subject: [PATCH] [DPE-3426] migrate from _cat to _cluster APIs (#390) ## Issue We are using some `_cat` API calls to fetch data about shard information and node statuses. The `_cat` API is designed as a human-readable interface and is not intended for consumption by applications. ## Solution Move the `_cat` API calls to other API endpoints such as `_cluster/state` and `_cluster/health` while keeping the same data model. --- lib/charms/opensearch/v0/helper_cluster.py | 56 ++++++++++++++++--- .../opensearch/v0/opensearch_base_charm.py | 17 ++++-- tests/unit/lib/test_helper_cluster.py | 22 ++++++++ 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/lib/charms/opensearch/v0/helper_cluster.py b/lib/charms/opensearch/v0/helper_cluster.py index e0de33540..1ef70921e 100644 --- a/lib/charms/opensearch/v0/helper_cluster.py +++ b/lib/charms/opensearch/v0/helper_cluster.py @@ -186,10 +186,38 @@ def shards( verbose: bool = False, ) -> List[Dict[str, str]]: """Get all shards of all indexes in the cluster.""" - params = "" - if verbose: - params = "?v=true&h=index,shard,prirep,state,unassigned.reason&s=state" - return opensearch.request("GET", f"/_cat/shards{params}", host=host, alt_hosts=alt_hosts) + cluster_state = opensearch.request( + "GET", "_cluster/state/routing_table,metadata,nodes", host=host, alt_hosts=alt_hosts + ) + + nodes = cluster_state["nodes"] + + shards_info = [] + for index_name, index_data in cluster_state["routing_table"]["indices"].items(): + for shard_num, shard_data in index_data["shards"].items(): + for shard in shard_data: + node_data = nodes.get(shard["node"], {}) + node_name = node_data.get("name", None) + node_ip = ( + node_data["transport_address"].split(":")[0] + if "transport_address" in node_data + else None + ) + + shard_info = { + "index": index_name, + "shard": shard_num, + "prirep": shard["primary"] and "p" or "r", + "state": shard["state"], + "ip": 
node_ip, + "node": node_name, + } + if verbose: + shard_info["unassigned.reason"] = shard.get("unassigned_info", {}).get( + "reason", None + ) + shards_info.append(shard_info) + return shards_info @staticmethod @retry( @@ -222,10 +250,24 @@ def indices( alt_hosts: Optional[List[str]] = None, ) -> Dict[str, Dict[str, str]]: """Get all shards of all indexes in the cluster.""" - endpoint = "/_cat/indices?expand_wildcards=all" + # Get cluster state + cluster_state = opensearch.request( + "GET", "/_cluster/state/metadata", host=host, alt_hosts=alt_hosts + ) + indices_state = cluster_state["metadata"]["indices"] + + # Get cluster health + cluster_health = opensearch.request( + "GET", "/_cluster/health?level=indices", host=host, alt_hosts=alt_hosts + ) + indices_health = cluster_health["indices"] + idx = {} - for index in opensearch.request("GET", endpoint, host=host, alt_hosts=alt_hosts): - idx[index["index"]] = {"health": index["health"], "status": index["status"]} + for index in indices_state.keys(): + idx[index] = { + "health": indices_health[index]["status"], + "status": indices_state[index]["state"], + } return idx @staticmethod diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index d94a0bb15..a9a86e88e 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -1183,11 +1183,20 @@ def _remove_data_role_from_dedicated_cm_if_needed( # noqa: C901 try: for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(0.5)): with attempt: - resp = self.opensearch.request( - "GET", endpoint=f"/_cat/allocation/{self.unit_name}?format=json" + search_shards_info = self.opensearch.request( + "GET", "/*/_search_shards?expand_wildcards=all" ) - for entry in resp: - if entry.get("node") == self.unit_name and entry.get("shards") != 0: + + # find the node id of the current unit + node_id = None + for node_id, node in 
search_shards_info["nodes"].items(): + if node["name"] == self.unit_name: + break + assert node_id is not None # should never happen + + # check if the node has any shards assigned to it + for shard_data in search_shards_info["shards"]: + if shard_data[0]["node"] == node_id: raise Exception return True except RetryError: diff --git a/tests/unit/lib/test_helper_cluster.py b/tests/unit/lib/test_helper_cluster.py index 41e30c70d..e2b2adcfd 100644 --- a/tests/unit/lib/test_helper_cluster.py +++ b/tests/unit/lib/test_helper_cluster.py @@ -314,3 +314,25 @@ def test_get_cluster_settings(self, request_mock): host=None, alt_hosts=None, ) + + @patch("charms.opensearch.v0.helper_cluster.ClusterState.shards") + def test_count_shards_by_state(self, shards): + """Test the busy shards filtering.""" + shards.return_value = [ + {"index": "index1", "state": "STARTED", "node": "opensearch-0"}, + {"index": "index1", "state": "INITIALIZING", "node": "opensearch-1"}, + {"index": "index2", "state": "STARTED", "node": "opensearch-0"}, + {"index": "index2", "state": "RELOCATING", "node": "opensearch-1"}, + {"index": "index3", "state": "STARTED", "node": "opensearch-0"}, + {"index": "index3", "state": "STARTED", "node": "opensearch-1"}, + {"index": "index4", "state": "STARTED", "node": "opensearch-2"}, + {"index": "index4", "state": "INITIALIZING", "node": "opensearch-2"}, + ] + self.assertDictEqual( + ClusterState.shards_by_state(self.opensearch), + { + "STARTED": 5, + "INITIALIZING": 2, + "RELOCATING": 1, + }, + )