From b947def8cb3d1e1329cdac161b9c775384c4b464 Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Tue, 10 Dec 2024 17:06:56 +0800 Subject: [PATCH] Add connection check when building container from configs --- .../engine/orkes/orkes_workflow_client.py | 5 ++++ .../services/connectors/milvus.py | 26 ++++++++++++------- .../omagent_core/services/connectors/redis.py | 7 ++++- .../src/omagent_core/utils/container.py | 19 ++++++++++++++ 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/omagent-core/src/omagent_core/engine/orkes/orkes_workflow_client.py b/omagent-core/src/omagent_core/engine/orkes/orkes_workflow_client.py index b89c9c01..3b976bf1 100644 --- a/omagent-core/src/omagent_core/engine/orkes/orkes_workflow_client.py +++ b/omagent-core/src/omagent_core/engine/orkes/orkes_workflow_client.py @@ -22,6 +22,11 @@ def __init__( ): super(OrkesWorkflowClient, self).__init__(configuration) + def check_connection(self) -> bool: + """Check if the connection to Conductor server is valid""" + self.metadataResourceApi.get_all_workflows() + + def start_workflow_by_name( self, name: str, diff --git a/omagent-core/src/omagent_core/services/connectors/milvus.py b/omagent-core/src/omagent_core/services/connectors/milvus.py index a24a3101..b5ee34e6 100644 --- a/omagent-core/src/omagent_core/services/connectors/milvus.py +++ b/omagent-core/src/omagent_core/services/connectors/milvus.py @@ -1,6 +1,5 @@ from .base import ConnectorBase -from pymilvus import Collection, DataType, MilvusClient, connections, utility -from pymilvus.client import types +from pymilvus import MilvusClient from typing import Any, Optional from pydantic import Field from omagent_core.utils.registry import registry @@ -15,11 +14,18 @@ class MilvusConnector(ConnectorBase): db: Optional[str] = Field(default="default") alias: Optional[str] = Field(default="alias") - def model_post_init(self, __context: Any) -> None: - self._client = MilvusClient( - uri=self.host, - user=self.username, - password=self.password, - db_name=self.db, - ) - \ No newline at end of file + def model_post_init(self, __context: Any) -> None: + try: + self._client = MilvusClient( + uri=self.host, + user=self.username, + password=self.password, + db_name=self.db, + ) + except Exception as e: + raise ConnectionError(f"Connection to Milvus failed. Please check your connector config in container.yaml. \n Error Message: {e}") + + def check_connection(self) -> bool: + """Check if the connection to Milvus is valid.""" + # Try to list collections to verify connection + self._client.list_collections() \ No newline at end of file diff --git a/omagent-core/src/omagent_core/services/connectors/redis.py b/omagent-core/src/omagent_core/services/connectors/redis.py index d3dfd416..a3569f84 100644 --- a/omagent-core/src/omagent_core/services/connectors/redis.py +++ b/omagent-core/src/omagent_core/services/connectors/redis.py @@ -21,4 +21,9 @@ def model_post_init(self, __context: Any) -> None: db=self.db, decode_responses=False ) - self._client = Redis(connection_pool=pool) \ No newline at end of file + self._client = Redis(connection_pool=pool) + + def check_connection(self) -> bool: + """Check if Redis connection is valid by executing a simple ping command""" + self._client.ping() + diff --git a/omagent-core/src/omagent_core/utils/container.py b/omagent-core/src/omagent_core/utils/container.py index 739e0bae..433e02df 100644 --- a/omagent-core/src/omagent_core/utils/container.py +++ b/omagent-core/src/omagent_core/utils/container.py @@ -203,5 +203,24 @@ def clean_config_dict(config_dict: dict) -> dict: overwrite=True, ) + self.check_connection() + def check_connection(self): + for name, connector in self._connectors.items(): + try: + connector.check_connection() + except Exception as e: + raise ConnectionError(f"Connection to {name} failed. Please check your connector config in container.yaml. \n Error Message: {e}") + + try: + from omagent_core.engine.orkes.orkes_workflow_client import OrkesWorkflowClient + conductor_client = OrkesWorkflowClient(self.conductor_config) + conductor_client.check_connection() + except Exception as e: + raise ConnectionError(f"Connection to Conductor failed. Please check your conductor config in container.yaml. \n Error Message: {e}") + + print("--------------------------------") + print("All connections passed the connection check") + print("--------------------------------") + container = Container()