Skip to content

Commit

Permalink
Added Tests for Search Pipeline and Notifications Plugin (opensearch-…
Browse files Browse the repository at this point in the history
…project#668)

* Added Tests for Notification and Search Pipeline.ml

Signed-off-by: AbitraryYu <[email protected]>

* Add back the lines of comment in indices.py during resolving merge conflict.

Signed-off-by: AbitraryYu <[email protected]>

* Fix notification plugin tests, from async def to def.

Signed-off-by: AbitraryYu <[email protected]>

* Fix test errors. Rename function names and rewrite assertion tests.

Signed-off-by: AbitraryYu <[email protected]>

* Fix formatting errors. Added typings to CONTENT.

Signed-off-by: AbitraryYu <[email protected]>

---------

Signed-off-by: AbitraryYu <[email protected]>
  • Loading branch information
AbitraryYu authored Apr 30, 2024
1 parent 303b6c6 commit 0caacf8
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 0 deletions.
35 changes: 35 additions & 0 deletions test_opensearchpy/test_client/test_search_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from test_opensearchpy.test_cases import OpenSearchTestCase


class TestSearchPipeline(OpenSearchTestCase):
def test_create_search_pipeline(self) -> None:
body = {
"request_processors": [
{
"filter_query": {
"tag": "tag1",
"description": "This processor returns only publicly visible documents",
"query": {"term": {"visibility": "public"}},
}
}
],
"response_processors": [
{"rename_field": {"field": "message", "target_field": "notification"}}
],
}

self.client.search_pipeline.put("my_pipeline", body)
self.assert_url_called("PUT", "/_search/pipeline/my_pipeline")

def test_get_search_pipeline(self) -> None:
self.client.search_pipeline.get("my_pipeline")
self.assert_url_called("GET", "/_search/pipeline/my_pipeline")
128 changes: 128 additions & 0 deletions test_opensearchpy/test_server/test_plugins/test_notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.


from __future__ import unicode_literals

import unittest
from typing import Any, Dict

from opensearchpy.exceptions import NotFoundError
from opensearchpy.helpers.test import OPENSEARCH_VERSION

from .. import OpenSearchTestCase


class TestNotificationPlugin(OpenSearchTestCase):
CONFIG_ID = "sample-id"
CONTENT: Dict[str, Any] = {
"config_id": "sample-id",
"name": "sample-name",
"config": {
"name": "Sample Slack Channel",
"description": "This ialerting.create_destination(dummy_destination)s a Slack channel",
"config_type": "slack",
"is_enabled": True,
"slack": {"url": "https://sample-slack-webhook"},
},
}

@unittest.skipUnless(
(OPENSEARCH_VERSION) and (OPENSEARCH_VERSION >= (2, 0, 0)),
"Plugin not supported for opensearch version",
)
def test_create_config(self) -> None:
response = self.client.plugins.notifications.create_config(self.CONTENT)

self.assertNotIn("errors", response)
self.assertIn("config_id", response)

@unittest.skipUnless(
(OPENSEARCH_VERSION) and (OPENSEARCH_VERSION >= (2, 0, 0)),
"Plugin not supported for opensearch version",
)
def test_list_channel_config(self) -> None:
self.test_create_config()

response = self.client.plugins.notifications.list_features()

self.assertNotIn("errors", response)
self.assertIn("allowed_config_type_list", response)

@unittest.skipUnless(
(OPENSEARCH_VERSION) and (OPENSEARCH_VERSION >= (2, 0, 0)),
"Plugin not supported for opensearch version",
)
def test_list_all_config(self) -> None:
self.test_create_config()

response = self.client.plugins.notifications.get_configs()

self.assertNotIn("errors", response)
self.assertIn("config_list", response)

@unittest.skipUnless(
(OPENSEARCH_VERSION) and (OPENSEARCH_VERSION >= (2, 0, 0)),
"Plugin not supported for opensearch version",
)
def test_get_config(self) -> None:
self.test_create_config()

response = self.client.plugins.notifications.get_config(
config_id=self.CONFIG_ID
)

self.assertNotIn("errors", response)
self.assertIn("config_list", response)
self.assertEqual(response["config_list"][0]["config_id"], self.CONFIG_ID)

@unittest.skipUnless(
(OPENSEARCH_VERSION) and (OPENSEARCH_VERSION >= (2, 0, 0)),
"Plugin not supported for opensearch version",
)
def test_update_config(self) -> None:
# Create configuration
self.test_create_config()

# Fetch the configuration
response = self.client.plugins.notifications.get_config(self.CONFIG_ID)

channel_content = self.CONTENT.copy()
channel_content["config"]["name"] = "Slack Channel"
channel_content["config"][
"description"
] = "This is an updated channel configuration"
channel_content["config"]["config_type"] = "slack"
channel_content["config"]["is_enabled"] = True
channel_content["config"]["slack"]["url"] = "https://hooks.slack.com/sample-url"

# Test to updat configuration
response = self.client.plugins.notifications.update_config(
config_id=self.CONFIG_ID, body=channel_content
)

self.assertNotIn("errors", response)
self.assertIn("config_id", response)

@unittest.skipUnless(
(OPENSEARCH_VERSION) and (OPENSEARCH_VERSION >= (2, 0, 0)),
"Plugin not supported for opensearch version",
)
def test_delete_config(self) -> None:
self.test_create_config()

response = self.client.plugins.notifications.delete_config(
config_id=self.CONFIG_ID
)

self.assertNotIn("errors", response)

# Try fetching the policy
with self.assertRaises(NotFoundError):
response = self.client.plugins.notifications.get_config(self.CONFIG_ID)

0 comments on commit 0caacf8

Please sign in to comment.