Skip to content

Commit

Permalink
MINOR: trino integration test (open-metadata#16291)
Browse files Browse the repository at this point in the history
* added trino integration test

* - removed warnings for classes which are not real tests
- removed "helpers" as its being used

* use a docker network instead of host

* print logs for hive failure

* removed superset unit tests

* try pinning requests for test

* try pinning requests for test

* wait for hive to be ready

* fix trino fixture

* - reduced testcontainers_config.max_tries to 5
- remove intermediate containers

* print with logs

* disable capture logging

* updated db host

* removed debug stuff

* removed debug stuff

* removed version pin for requests

* reverted superset

* ignore trino integration on python 3.8
  • Loading branch information
sushi30 authored May 22, 2024
1 parent bbe9389 commit d5bf30c
Show file tree
Hide file tree
Showing 19 changed files with 410 additions and 29 deletions.
3 changes: 2 additions & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
VERSIONS["pymysql"],
"python-dateutil>=2.8.1",
"PyYAML~=6.0",
"requests>=2.23, <2.32",
"requests>=2.23",
"requests-aws4auth~=1.1", # Only depends on requests as external package. Leaving as base.
"sqlalchemy>=1.4.0,<2",
"collate-sqllineage~=1.4.0",
Expand Down Expand Up @@ -324,6 +324,7 @@
"minio==7.2.5",
*plugins["mlflow"],
*plugins["datalake-s3"],
"requests==2.31.0",
}

e2e_test = {
Expand Down
2 changes: 2 additions & 0 deletions ingestion/src/metadata/workflow/data_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class TestSuiteWorkflow(IngestionWorkflow):
this workflow. No need to do anything here if this does not pass
"""

__test__ = False

def set_steps(self):
self.source = TestSuiteSource.create(self.config.dict(), self.metadata)

Expand Down
28 changes: 28 additions & 0 deletions ingestion/tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import sys

import pytest

from .integration_base import int_admin_ometa

if not sys.version_info >= (3, 9):
collect_ignore = ["trino"]


@pytest.fixture(scope="module")
def metadata():
return int_admin_ometa()


def pytest_pycollect_makeitem(collector, name, obj):
try:
if obj.__base__.__name__ in ("BaseModel", "Enum"):
return []
except AttributeError:
pass


@pytest.fixture(scope="session", autouse=sys.version_info >= (3, 9))
def config_testcontatiners():
from testcontainers.core.config import testcontainers_config

testcontainers_config.max_tries = 10
7 changes: 0 additions & 7 deletions ingestion/tests/integration/mlflow/test_mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
)
from metadata.workflow.metadata import MetadataWorkflow

from ..integration_base import int_admin_ometa

MODEL_HYPERPARAMS = {
"alpha": {"name": "alpha", "value": "0.5", "description": None},
"l1_ratio": {"name": "l1_ratio", "value": "1.0", "description": None},
Expand Down Expand Up @@ -130,11 +128,6 @@ def create_data(mlflow_environment):
mlflow.sklearn.log_model(lr, "model")


@pytest.fixture(scope="module")
def metadata():
return int_admin_ometa()


@pytest.fixture(scope="module")
def service(metadata, mlflow_environment):
service = CreateMlModelServiceRequest(
Expand Down
7 changes: 0 additions & 7 deletions ingestion/tests/integration/postgres/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow

from ..integration_base import int_admin_ometa

if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)

Expand All @@ -53,11 +51,6 @@ def config_logging():
logging.getLogger("sqlfluff").setLevel(logging.CRITICAL)


@pytest.fixture(scope="module")
def metadata():
return int_admin_ometa()


@pytest.fixture(scope="module")
def db_service(metadata, postgres_container):
service = CreateDatabaseServiceRequest(
Expand Down
7 changes: 0 additions & 7 deletions ingestion/tests/integration/profiler/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,12 @@
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials

from ...integration.integration_base import int_admin_ometa

if TYPE_CHECKING:
from mypy_boto3_dynamodb.client import DynamoDBClient
else:
DynamoDBClient = None


@pytest.fixture(scope="session")
def metadata():
return int_admin_ometa()


@pytest.fixture(scope="session")
def localstack_container():
with LocalStackContainer("localstack/localstack:3.3") as container:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,10 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow

from ..integration_base import int_admin_ometa

if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)


@pytest.fixture(scope="module")
def metadata():
return int_admin_ometa()


@pytest.fixture(
scope="module",
params=[
Expand Down
Empty file.
165 changes: 165 additions & 0 deletions ingestion/tests/integration/trino/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import os.path
import random

import docker
import pytest
import testcontainers.core.network
from sqlalchemy import create_engine
from tenacity import retry, stop_after_delay, wait_fixed
from testcontainers.core.container import DockerContainer
from testcontainers.core.generic import DbContainer
from testcontainers.minio import MinioContainer
from testcontainers.mysql import MySqlContainer


class TrinoContainer(DbContainer):
def __init__(
self,
image: str = "trinodb/trino",
port=8080,
**kwargs,
):
super().__init__(image, **kwargs)
self.user = "admin"
self.port = port
self.with_exposed_ports(port)
self._built_image = f"trino:{random.randint(0, 10000)}"

def start(self) -> "DbContainer":
self.build()
self.image = self._built_image
return super().start()

def _connect(self) -> None:
super()._connect()
engine = create_engine(self.get_connection_url())
try:
retry(wait=wait_fixed(1), stop=stop_after_delay(120))(engine.execute)(
"select system.runtime.nodes.node_id from system.runtime.nodes"
).fetchall()
finally:
engine.dispose()

def _configure(self) -> None:
pass

def stop(self, force=True, delete_volume=True) -> None:
super().stop(force, delete_volume)
self._docker.client.images.remove(self._built_image)

def get_connection_url(self) -> str:
return f"trino://{self.user}:@{self.get_container_host_ip()}:{self.get_exposed_port(self.port)}/?http_scheme=http"

def build(self):
docker_client = docker.from_env()
docker_client.images.build(
path=os.path.dirname(__file__) + "/trino",
tag=self._built_image,
buildargs={"BASE_IMAGE": self.image},
rm=True,
)


class HiveMetaStoreContainer(DockerContainer):
def __init__(
self,
image: str = "apache/hive",
port=9083,
**kwargs,
):
super().__init__(image, **kwargs)
self.port = port
self.with_exposed_ports(port)
self._build_args = {}
self._built_image = f"hive:{random.randint(0, 10000)}"

def start(self) -> "DockerContainer":
self.build()
self.image = self._built_image
return super().start()

def with_build_args(self, key, value) -> "HiveMetaStoreContainer":
self._build_args.update({key: value})
return self

def stop(self, force=True, delete_volume=True) -> None:
super().stop(force, delete_volume)
self._docker.client.images.remove(self._built_image)

def build(self):
docker_client = docker.from_env()
docker_client.images.build(
path=os.path.dirname(__file__) + "/hive",
tag=self._built_image,
buildargs={
"BASE_IMAGE": self.image,
},
rm=True,
)


@pytest.fixture(scope="module")
def docker_network():
with testcontainers.core.network.Network() as network:
yield network


@pytest.fixture(scope="module")
def trino_container(hive_metastore_container, minio_container, docker_network):
with TrinoContainer(image="trinodb/trino:418").with_network(
docker_network
).with_env(
"HIVE_METASTORE_URI",
f"thrift://metastore:{hive_metastore_container.port}",
).with_env(
"MINIO_ENDPOINT",
f"http://minio:{minio_container.port}",
) as trino:
yield trino


@pytest.fixture(scope="module")
def mysql_container(docker_network):
with MySqlContainer(
"mariadb:10.6.16", username="admin", password="admin", dbname="metastore_db"
).with_network(docker_network).with_network_aliases("mariadb") as mysql:
yield mysql


@pytest.fixture(scope="module")
def hive_metastore_container(mysql_container, minio_container, docker_network):
with HiveMetaStoreContainer("bitsondatadev/hive-metastore:latest").with_network(
docker_network
).with_network_aliases("metastore").with_env(
"METASTORE_DB_HOSTNAME", "mariadb"
).with_env(
"METASTORE_DB_PORT", str(mysql_container.port)
).with_env(
"JDBC_CONNECTION_URL",
f"jdbc:mysql://mariadb:{mysql_container.port}/{mysql_container.dbname}",
).with_env(
"MINIO_ENDPOINT",
f"http://minio:{minio_container.port}",
) as hive:
yield hive


@pytest.fixture(scope="module")
def minio_container(docker_network):
with MinioContainer().with_network(docker_network).with_network_aliases(
"minio"
) as minio:
client = minio.get_client()
client.make_bucket("hive-warehouse")
yield minio


@pytest.fixture(scope="module")
def create_test_data(trino_container):
engine = create_engine(trino_container.get_connection_url())
engine.execute(
"create schema minio.my_schema WITH (location = 's3a://hive-warehouse/')"
)
engine.execute("create table minio.my_schema.test_table (id int)")
engine.execute("insert into minio.my_schema.test_table values (1), (2), (3)")
return
9 changes: 9 additions & 0 deletions ingestion/tests/integration/trino/hive/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
ARG BASE_IMAGE=bitsondatadev/hive-metastore:latest
FROM ${BASE_IMAGE}
COPY conf/metastore-site.xml /opt/apache-hive-metastore-3.0.0-bin/conf/metastore-site.xml
COPY entrypoint.sh /entrypoint.sh
ENV JDBC_CONNECTION_URL ""
ENV MINIO_ENDPOINT ""
USER root
RUN chmod +x /entrypoint.sh
USER hive
52 changes: 52 additions & 0 deletions ingestion/tests/integration/trino/hive/conf/metastore-site.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<configuration>
<property><value>org.apache.hadoop.hive.metastore.events.EventCleanerTask,org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask</value>
<name>metastore.thrift.uris</name>
<value>thrift://metastore:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>metastore.task.threads.always</name>
<value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
</property>
<property>
<name>metastore.expression.proxy</name>
<value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>

<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>%JDBC_CONNECTION_URL%</value>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>admin</value>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>admin</value>
</property>

<property>
<name>fs.s3a.access.key</name>
<value>minioadmin</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>minioadmin</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>%MINIO_ENDPOINT%</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>

</configuration>
Loading

0 comments on commit d5bf30c

Please sign in to comment.