diff --git a/opensearchpy/_async/plugins/knn.py b/opensearchpy/_async/plugins/knn.py new file mode 100644 index 00000000..e7643e40 --- /dev/null +++ b/opensearchpy/_async/plugins/knn.py @@ -0,0 +1,316 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +# ------------------------------------------------------------------------------------------ +# THIS CODE IS AUTOMATICALLY GENERATED AND MANUAL EDITS WILL BE LOST +# +# To contribute, kindly make modifications in the opensearch-py client generator +# or in the OpenSearch API specification, and run `nox -rs generate`. See DEVELOPER_GUIDE.md +# and https://github.com/opensearch-project/opensearch-api-specification for details. +# -----------------------------------------------------------------------------------------+ + + +from typing import Any + +from ..client.utils import SKIP_IN_PATH, NamespacedClient, _make_path, query_params + + +class KnnClient(NamespacedClient): + @query_params() + async def delete_model( + self, + model_id: Any, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Used to delete a particular model in the cluster. + + + :arg model_id: The id of the model. + """ + if model_id in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'model_id'.") + + return await self.transport.perform_request( + "DELETE", + _make_path("_plugins", "_knn", "models", model_id), + params=params, + headers=headers, + ) + + @query_params() + async def get_model( + self, + model_id: Any, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Used to retrieve information about models present in the cluster. + + + :arg model_id: The id of the model. + """ + if model_id in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'model_id'.") + + return await self.transport.perform_request( + "GET", + _make_path("_plugins", "_knn", "models", model_id), + params=params, + headers=headers, + ) + + @query_params( + "_source", + "_source_excludes", + "_source_includes", + "allow_no_indices", + "allow_partial_search_results", + "analyze_wildcard", + "analyzer", + "batched_reduce_size", + "ccs_minimize_roundtrips", + "default_operator", + "df", + "docvalue_fields", + "expand_wildcards", + "explain", + "from_", + "ignore_throttled", + "ignore_unavailable", + "lenient", + "max_concurrent_shard_requests", + "pre_filter_shard_size", + "preference", + "q", + "request_cache", + "rest_total_hits_as_int", + "routing", + "scroll", + "search_type", + "seq_no_primary_term", + "size", + "sort", + "stats", + "stored_fields", + "suggest_field", + "suggest_mode", + "suggest_size", + "suggest_text", + "terminate_after", + "timeout", + "track_scores", + "track_total_hits", + "typed_keys", + "version", + ) + async def search_models( + self, + body: Any = None, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Use an OpenSearch query to search for models in the index. + + + :arg _source: True or false to return the _source field or not, + or a list of fields to return. + :arg _source_excludes: List of fields to exclude from the + returned _source field. + :arg _source_includes: List of fields to extract and return from + the _source field. + :arg allow_no_indices: Whether to ignore if a wildcard indices + expression resolves into no concrete indices. (This includes `_all` + string or when no indices have been specified). + :arg allow_partial_search_results: Indicate if an error should + be returned if there is a partial search failure or timeout. Default is + True. + :arg analyze_wildcard: Specify whether wildcard and prefix + queries should be analyzed. Default is false. + :arg analyzer: The analyzer to use for the query string. + :arg batched_reduce_size: The number of shard results that + should be reduced at once on the coordinating node. This value should be + used as a protection mechanism to reduce the memory overhead per search + request if the potential number of shards in the request can be large. + Default is 512. + :arg ccs_minimize_roundtrips: Indicates whether network round- + trips should be minimized as part of cross-cluster search requests + execution. Default is True. + :arg default_operator: The default operator for query string + query (AND or OR). Valid choices are AND, OR. + :arg df: The field to use as default where no field prefix is + given in the query string. + :arg docvalue_fields: Comma-separated list of fields to return + as the docvalue representation of a field for each hit. + :arg expand_wildcards: Whether to expand wildcard expression to + concrete indices that are open, closed or both. Valid choices are all, + open, closed, hidden, none. + :arg explain: Specify whether to return detailed information + about score computation as part of a hit. + :arg from_: Starting offset. Default is 0. + :arg ignore_throttled: Whether specified concrete, expanded or + aliased indices should be ignored when throttled. + :arg ignore_unavailable: Whether specified concrete indices + should be ignored when unavailable (missing or closed). + :arg lenient: Specify whether format-based query failures (such + as providing text to a numeric field) should be ignored. + :arg max_concurrent_shard_requests: The number of concurrent + shard requests per node this search executes concurrently. This value + should be used to limit the impact of the search on the cluster in order + to limit the number of concurrent shard requests. Default is 5. + :arg pre_filter_shard_size: Threshold that enforces a pre-filter + round-trip to prefilter search shards based on query rewriting if the + number of shards the search request expands to exceeds the threshold. + This filter round-trip can limit the number of shards significantly if + for instance a shard can not match any documents based on its rewrite + method ie. if date filters are mandatory to match but the shard bounds + and the query are disjoint. + :arg preference: Specify the node or shard the operation should + be performed on. Default is random. + :arg q: Query in the Lucene query string syntax. + :arg request_cache: Specify if request cache should be used for + this request or not, defaults to index level setting. + :arg rest_total_hits_as_int: Indicates whether hits.total should + be rendered as an integer or an object in the rest search response. + Default is false. + :arg routing: Comma-separated list of specific routing values. + :arg scroll: Specify how long a consistent view of the index + should be maintained for scrolled search. + :arg search_type: Search operation type. Valid choices are + query_then_fetch, dfs_query_then_fetch. + :arg seq_no_primary_term: Specify whether to return sequence + number and primary term of the last modification of each hit. + :arg size: Number of hits to return. Default is 10. + :arg sort: Comma-separated list of : pairs. + :arg stats: Specific 'tag' of the request for logging and + statistical purposes. + :arg stored_fields: Comma-separated list of stored fields to + return. + :arg suggest_field: Specify which field to use for suggestions. + :arg suggest_mode: Specify suggest mode. Valid choices are + missing, popular, always. + :arg suggest_size: How many suggestions to return in response. + :arg suggest_text: The source text for which the suggestions + should be returned. + :arg terminate_after: The maximum number of documents to collect + for each shard, upon reaching which the query execution will terminate + early. + :arg timeout: Operation timeout. + :arg track_scores: Whether to calculate and return scores even + if they are not used for sorting. + :arg track_total_hits: Indicate if the number of documents that + match the query should be tracked. + :arg typed_keys: Specify whether aggregation and suggester names + should be prefixed by their respective types in the response. + :arg version: Whether to return document version as part of a + hit. + """ + # from is a reserved word so it cannot be used, use from_ instead + if "from_" in params: + params["from"] = params.pop("from_") + + return await self.transport.perform_request( + "POST", + "/_plugins/_knn/models/_search", + params=params, + headers=headers, + body=body, + ) + + @query_params("timeout") + async def stats( + self, + nodeId: Any = None, + stat: Any = None, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Provides information about the current status of the k-NN plugin. + + + :arg nodeId: Comma-separated list of node IDs or names to limit + the returned information; use `_local` to return information from the + node you're connecting to, leave empty to get information from all + nodes. + :arg stat: Comma-separated list of stats to retrieve; use `_all` + or empty string to retrieve all stats. Valid choices are + circuit_breaker_triggered, total_load_time, eviction_count, hit_count, + miss_count, graph_memory_usage, graph_memory_usage_percentage, + graph_index_requests, graph_index_errors, graph_query_requests, + graph_query_errors, knn_query_requests, cache_capacity_reached, + load_success_count, load_exception_count, indices_in_cache, + script_compilations, script_compilation_errors, script_query_requests, + script_query_errors, nmslib_initialized, faiss_initialized, + model_index_status, indexing_from_model_degraded, training_requests, + training_errors, training_memory_usage, + training_memory_usage_percentage. + :arg timeout: Operation timeout. + """ + return await self.transport.perform_request( + "GET", + _make_path("_plugins", "_knn", nodeId, "stats", stat), + params=params, + headers=headers, + ) + + @query_params("preference") + async def train_model( + self, + body: Any, + model_id: Any = None, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Create and train a model that can be used for initializing k-NN native library + indexes during indexing. + + + :arg model_id: The id of the model. + :arg preference: Preferred node to execute training. + """ + if body in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'body'.") + + return await self.transport.perform_request( + "POST", + _make_path("_plugins", "_knn", "models", model_id, "_train"), + params=params, + headers=headers, + body=body, + ) + + @query_params() + async def warmup( + self, + index: Any, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Preloads native library files into memory, reducing initial search latency for + specified indexes + + + :arg index: Comma-separated list of indices; use `_all` or empty + string to perform the operation on all indices. + """ + if index in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'index'.") + + return await self.transport.perform_request( + "GET", + _make_path("_plugins", "_knn", "warmup", index), + params=params, + headers=headers, + ) diff --git a/opensearchpy/plugins/__init__.py b/opensearchpy/plugins/__init__.py index 2f42da79..6c0097cd 100644 --- a/opensearchpy/plugins/__init__.py +++ b/opensearchpy/plugins/__init__.py @@ -6,4 +6,3 @@ # # Modifications Copyright OpenSearch Contributors. See # GitHub history for details. -# diff --git a/opensearchpy/plugins/knn.py b/opensearchpy/plugins/knn.py new file mode 100644 index 00000000..99c45f53 --- /dev/null +++ b/opensearchpy/plugins/knn.py @@ -0,0 +1,316 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +# ------------------------------------------------------------------------------------------ +# THIS CODE IS AUTOMATICALLY GENERATED AND MANUAL EDITS WILL BE LOST +# +# To contribute, kindly make modifications in the opensearch-py client generator +# or in the OpenSearch API specification, and run `nox -rs generate`. See DEVELOPER_GUIDE.md +# and https://github.com/opensearch-project/opensearch-api-specification for details. +# -----------------------------------------------------------------------------------------+ + + +from typing import Any + +from ..client.utils import SKIP_IN_PATH, NamespacedClient, _make_path, query_params + + +class KnnClient(NamespacedClient): + @query_params() + def delete_model( + self, + model_id: Any, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Used to delete a particular model in the cluster. + + + :arg model_id: The id of the model. + """ + if model_id in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'model_id'.") + + return self.transport.perform_request( + "DELETE", + _make_path("_plugins", "_knn", "models", model_id), + params=params, + headers=headers, + ) + + @query_params() + def get_model( + self, + model_id: Any, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Used to retrieve information about models present in the cluster. + + + :arg model_id: The id of the model. + """ + if model_id in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'model_id'.") + + return self.transport.perform_request( + "GET", + _make_path("_plugins", "_knn", "models", model_id), + params=params, + headers=headers, + ) + + @query_params( + "_source", + "_source_excludes", + "_source_includes", + "allow_no_indices", + "allow_partial_search_results", + "analyze_wildcard", + "analyzer", + "batched_reduce_size", + "ccs_minimize_roundtrips", + "default_operator", + "df", + "docvalue_fields", + "expand_wildcards", + "explain", + "from_", + "ignore_throttled", + "ignore_unavailable", + "lenient", + "max_concurrent_shard_requests", + "pre_filter_shard_size", + "preference", + "q", + "request_cache", + "rest_total_hits_as_int", + "routing", + "scroll", + "search_type", + "seq_no_primary_term", + "size", + "sort", + "stats", + "stored_fields", + "suggest_field", + "suggest_mode", + "suggest_size", + "suggest_text", + "terminate_after", + "timeout", + "track_scores", + "track_total_hits", + "typed_keys", + "version", + ) + def search_models( + self, + body: Any = None, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Use an OpenSearch query to search for models in the index. + + + :arg _source: True or false to return the _source field or not, + or a list of fields to return. + :arg _source_excludes: List of fields to exclude from the + returned _source field. + :arg _source_includes: List of fields to extract and return from + the _source field. + :arg allow_no_indices: Whether to ignore if a wildcard indices + expression resolves into no concrete indices. (This includes `_all` + string or when no indices have been specified). + :arg allow_partial_search_results: Indicate if an error should + be returned if there is a partial search failure or timeout. Default is + True. + :arg analyze_wildcard: Specify whether wildcard and prefix + queries should be analyzed. Default is false. + :arg analyzer: The analyzer to use for the query string. + :arg batched_reduce_size: The number of shard results that + should be reduced at once on the coordinating node. This value should be + used as a protection mechanism to reduce the memory overhead per search + request if the potential number of shards in the request can be large. + Default is 512. + :arg ccs_minimize_roundtrips: Indicates whether network round- + trips should be minimized as part of cross-cluster search requests + execution. Default is True. + :arg default_operator: The default operator for query string + query (AND or OR). Valid choices are AND, OR. + :arg df: The field to use as default where no field prefix is + given in the query string. + :arg docvalue_fields: Comma-separated list of fields to return + as the docvalue representation of a field for each hit. + :arg expand_wildcards: Whether to expand wildcard expression to + concrete indices that are open, closed or both. Valid choices are all, + open, closed, hidden, none. + :arg explain: Specify whether to return detailed information + about score computation as part of a hit. + :arg from_: Starting offset. Default is 0. + :arg ignore_throttled: Whether specified concrete, expanded or + aliased indices should be ignored when throttled. + :arg ignore_unavailable: Whether specified concrete indices + should be ignored when unavailable (missing or closed). + :arg lenient: Specify whether format-based query failures (such + as providing text to a numeric field) should be ignored. + :arg max_concurrent_shard_requests: The number of concurrent + shard requests per node this search executes concurrently. This value + should be used to limit the impact of the search on the cluster in order + to limit the number of concurrent shard requests. Default is 5. + :arg pre_filter_shard_size: Threshold that enforces a pre-filter + round-trip to prefilter search shards based on query rewriting if the + number of shards the search request expands to exceeds the threshold. + This filter round-trip can limit the number of shards significantly if + for instance a shard can not match any documents based on its rewrite + method ie. if date filters are mandatory to match but the shard bounds + and the query are disjoint. + :arg preference: Specify the node or shard the operation should + be performed on. Default is random. + :arg q: Query in the Lucene query string syntax. + :arg request_cache: Specify if request cache should be used for + this request or not, defaults to index level setting. + :arg rest_total_hits_as_int: Indicates whether hits.total should + be rendered as an integer or an object in the rest search response. + Default is false. + :arg routing: Comma-separated list of specific routing values. + :arg scroll: Specify how long a consistent view of the index + should be maintained for scrolled search. + :arg search_type: Search operation type. Valid choices are + query_then_fetch, dfs_query_then_fetch. + :arg seq_no_primary_term: Specify whether to return sequence + number and primary term of the last modification of each hit. + :arg size: Number of hits to return. Default is 10. + :arg sort: Comma-separated list of : pairs. + :arg stats: Specific 'tag' of the request for logging and + statistical purposes. + :arg stored_fields: Comma-separated list of stored fields to + return. + :arg suggest_field: Specify which field to use for suggestions. + :arg suggest_mode: Specify suggest mode. Valid choices are + missing, popular, always. + :arg suggest_size: How many suggestions to return in response. + :arg suggest_text: The source text for which the suggestions + should be returned. + :arg terminate_after: The maximum number of documents to collect + for each shard, upon reaching which the query execution will terminate + early. + :arg timeout: Operation timeout. + :arg track_scores: Whether to calculate and return scores even + if they are not used for sorting. + :arg track_total_hits: Indicate if the number of documents that + match the query should be tracked. + :arg typed_keys: Specify whether aggregation and suggester names + should be prefixed by their respective types in the response. + :arg version: Whether to return document version as part of a + hit. + """ + # from is a reserved word so it cannot be used, use from_ instead + if "from_" in params: + params["from"] = params.pop("from_") + + return self.transport.perform_request( + "POST", + "/_plugins/_knn/models/_search", + params=params, + headers=headers, + body=body, + ) + + @query_params("timeout") + def stats( + self, + nodeId: Any = None, + stat: Any = None, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Provides information about the current status of the k-NN plugin. + + + :arg nodeId: Comma-separated list of node IDs or names to limit + the returned information; use `_local` to return information from the + node you're connecting to, leave empty to get information from all + nodes. + :arg stat: Comma-separated list of stats to retrieve; use `_all` + or empty string to retrieve all stats. Valid choices are + circuit_breaker_triggered, total_load_time, eviction_count, hit_count, + miss_count, graph_memory_usage, graph_memory_usage_percentage, + graph_index_requests, graph_index_errors, graph_query_requests, + graph_query_errors, knn_query_requests, cache_capacity_reached, + load_success_count, load_exception_count, indices_in_cache, + script_compilations, script_compilation_errors, script_query_requests, + script_query_errors, nmslib_initialized, faiss_initialized, + model_index_status, indexing_from_model_degraded, training_requests, + training_errors, training_memory_usage, + training_memory_usage_percentage. + :arg timeout: Operation timeout. + """ + return self.transport.perform_request( + "GET", + _make_path("_plugins", "_knn", nodeId, "stats", stat), + params=params, + headers=headers, + ) + + @query_params("preference") + def train_model( + self, + body: Any, + model_id: Any = None, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Create and train a model that can be used for initializing k-NN native library + indexes during indexing. + + + :arg model_id: The id of the model. + :arg preference: Preferred node to execute training. + """ + if body in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'body'.") + + return self.transport.perform_request( + "POST", + _make_path("_plugins", "_knn", "models", model_id, "_train"), + params=params, + headers=headers, + body=body, + ) + + @query_params() + def warmup( + self, + index: Any, + params: Any = None, + headers: Any = None, + ) -> Any: + """ + Preloads native library files into memory, reducing initial search latency for + specified indexes + + + :arg index: Comma-separated list of indices; use `_all` or empty + string to perform the operation on all indices. + """ + if index in SKIP_IN_PATH: + raise ValueError("Empty value passed for a required argument 'index'.") + + return self.transport.perform_request( + "GET", + _make_path("_plugins", "_knn", "warmup", index), + params=params, + headers=headers, + ) diff --git a/test_opensearchpy/test_server/__init__.py b/test_opensearchpy/test_server/__init__.py index 36b548b5..52c14407 100644 --- a/test_opensearchpy/test_server/__init__.py +++ b/test_opensearchpy/test_server/__init__.py @@ -35,7 +35,7 @@ def get_client(**kwargs: Any) -> Any: - global client + global client # pylint: disable=invalid-name if client is False: raise SkipTest("No client is available") if client is not None and not kwargs: diff --git a/utils/generate_api.py b/utils/generate_api.py index 76f6353a..10ac653d 100644 --- a/utils/generate_api.py +++ b/utils/generate_api.py @@ -101,9 +101,10 @@ def is_valid_url(url: str) -> bool: class Module: - def __init__(self, namespace: str) -> None: + def __init__(self, namespace: str, is_plugin: bool) -> None: self.namespace: Any = namespace self._apis: Any = [] + self.is_plugin: bool = is_plugin self.parse_orig() def add(self, api: Any) -> None: @@ -118,7 +119,12 @@ def parse_orig(self) -> None: reads the written module and updates with important code specific to this client """ self.orders = [] - self.header = "from typing import Any, Collection, Optional, Tuple, Union\n\n" + if self.is_plugin: + self.header = "from typing import Any\n\n" + else: + self.header = ( + "from typing import Any, Collection, Optional, Tuple, Union\n\n" + ) namespace_new = "".join(word.capitalize() for word in self.namespace.split("_")) self.header += "class " + namespace_new + "Client(NamespacedClient):" @@ -209,8 +215,14 @@ def dump(self) -> None: # Imports are temporarily removed from the header and are regenerated # later to ensure imports are updated after code generation. + utils = ".utils" + if self.is_plugin: + utils = "..client.utils" + self.header = "\n".join( - line for line in self.header.split("\n") if "from .utils import" not in line + line + for line in self.header.split("\n") + if "from " + utils + " import" not in line ) with open(self.filepath, "w", encoding="utf-8") as file: @@ -252,7 +264,7 @@ def dump(self) -> None: present_keywords = [keyword for keyword in keywords if keyword in content] if present_keywords: - utils_imports = "from .utils import" + utils_imports = "from " + utils + " import" result = f"{utils_imports} {', '.join(present_keywords)}" utils_imports = result file_content = content.replace("#replace_token#", utils_imports) @@ -265,7 +277,10 @@ def filepath(self) -> Any: """ :return: absolute path to the module """ - return CODE_ROOT / f"opensearchpy/_async/client/{self.namespace}.py" + if self.is_plugin: + return CODE_ROOT / f"opensearchpy/_async/plugins/{self.namespace}.py" + else: + return CODE_ROOT / f"opensearchpy/_async/client/{self.namespace}.py" class API: @@ -704,8 +719,12 @@ def read_modules() -> Any: api = apply_patch(namespace, name, api) + is_plugin = False + if "_plugins" in api["url"]["paths"][0]["path"] and namespace != "security": + is_plugin = True + if namespace not in modules: - modules[namespace] = Module(namespace) + modules[namespace] = Module(namespace, is_plugin) modules[namespace].add(API(namespace, name, api)) @@ -752,13 +771,20 @@ def dump_modules(modules: Any) -> None: todir="/opensearchpy/client/", additional_replacements=additional_replacements, ), + unasync.Rule( + fromdir="/opensearchpy/_async/plugins/", + todir="/opensearchpy/plugins/", + additional_replacements=additional_replacements, + ), ] filepaths = [] for root, _, filenames in os.walk(CODE_ROOT / "opensearchpy/_async"): for filename in filenames: - if filename.rpartition(".")[-1] in ("py",) and not filename.startswith( - "utils.py" + if filename.rpartition(".")[-1] in ("py",) and filename not in ( + "utils.py", + "index_management.py", + "alerting.py", ): filepaths.append(os.path.join(root, filename)) diff --git a/utils/license_headers.py b/utils/license_headers.py index 0405476e..86582c9d 100644 --- a/utils/license_headers.py +++ b/utils/license_headers.py @@ -79,7 +79,7 @@ def add_header_to_file(filepath: str) -> None: for i, line in enumerate(lines): if len(line) > 0 and line not in LINES_TO_KEEP: break - lines = lines[:i] + [LICENSE_HEADER] + lines[i:] + lines = lines[:i] + [LICENSE_HEADER + "\n\n"] + lines[i:] with open(filepath, mode="w", encoding="utf-8") as file: file.truncate() file.write("".join(lines))