Merge pull request #24121 from bharathv/datalake_fix_cdt_tests
datalake/tests: fix cdt tests
piyushredpanda authored Nov 15, 2024
2 parents 60c6e8c + 6e670a1 commit 0764782
Showing 13 changed files with 166 additions and 134 deletions.
41 changes: 0 additions & 41 deletions tests/java/iceberg-rest-catalog/src/main/resources/core-site.xml

This file was deleted.

73 changes: 46 additions & 27 deletions tests/rptest/services/apache_iceberg_catalog.py
@@ -9,13 +9,13 @@

import os
import tempfile
from typing import Optional

from ducktape.services.service import Service
from ducktape.cluster.cluster import ClusterNode
from ducktape.utils.util import wait_until
from pyiceberg.catalog import load_catalog

import uuid
import jinja2


class IcebergRESTCatalog(Service):
@@ -39,50 +39,60 @@ class IcebergRESTCatalog(Service):
FS_CATALOG_IMPL = "org.apache.iceberg.hadoop.HadoopCatalog"
FS_CATALOG_CONF_PATH = "/opt/iceberg-rest-catalog/core-site.xml"

HADOOP_CONF_TMPL = """<?xml version="1.0"?>
HADOOP_CONF_TMPL = jinja2.Template("""<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.s3a.aws.credentials.provider</name>
{%- if fs_dedicated_nodes %}
<value>org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider</value>
{% else -%}
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
{% endif %}
</property>
{%- if fs_endpoint %}
<property>
<name>fs.s3a.endpoint</name>
<value>{fs_endpoint}</value>
<value>{{ fs_endpoint }}</value>
</property>
{%- endif %}
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
<property>
<name>fs.s3a.endpoint.region</name>
<value>{fs_region}</value>
<value>{{ fs_region }}</value>
</property>
{%- if fs_access_key %}
<property>
<name>fs.s3a.access.key</name>
<value>{fs_access_key}</value>
<value>{{ fs_access_key }}</value>
</property>
{%- endif %}
{%- if fs_secret_key %}
<property>
<name>fs.s3a.secret.key</name>
<value>{fs_secret_key}</value>
<value>{{ fs_secret_key }}</value>
</property>
{%- endif %}
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>false</value>
<value>{{ fs_dedicated_nodes }}</value>
</property>
</configuration>"""
</configuration>""")

def __init__(
self,
ctx,
cloud_storage_bucket: str,
cloud_storage_catalog_prefix: str = 'redpanda-iceberg-catalog',
cloud_storage_access_key: str = 'panda-user',
cloud_storage_secret_key: str = 'panda-secret',
cloud_storage_region: str = 'panda-region',
cloud_storage_api_endpoint: str = "http://minio-s3:9000",
cloud_storage_access_key: Optional[str] = 'panda-user',
cloud_storage_secret_key: Optional[str] = 'panda-secret',
cloud_storage_region: Optional[str] = 'panda-region',
cloud_storage_api_endpoint: Optional[str] = "http://minio-s3:9000",
filesystem_wrapper_mode: bool = False,
node: ClusterNode | None = None):
super(IcebergRESTCatalog, self).__init__(ctx,
@@ -93,6 +103,7 @@ def __init__(
self.cloud_storage_api_endpoint = cloud_storage_api_endpoint
self.cloud_storage_bucket = cloud_storage_bucket
self.cloud_storage_catalog_prefix = cloud_storage_catalog_prefix
self.dedicated_nodes = ctx.globals.get("dedicated_nodes", False)
self.set_filesystem_wrapper_mode(filesystem_wrapper_mode)
# This REST server can operate in two modes.
# 1. filesystem wrapper mode
@@ -126,10 +137,14 @@ def _make_env(self):
env = dict()

# Common envs
env["AWS_ACCESS_KEY_ID"] = self.cloud_storage_access_key
env["AWS_SECRET_ACCESS_KEY"] = self.cloud_storage_secret_key
env["AWS_REGION"] = self.cloud_storage_region
env["CATALOG_S3_ENDPOINT"] = self.cloud_storage_api_endpoint
if self.cloud_storage_region:
env["AWS_REGION"] = self.cloud_storage_region
if self.cloud_storage_access_key:
env["AWS_ACCESS_KEY_ID"] = self.cloud_storage_access_key
if self.cloud_storage_secret_key:
env["AWS_SECRET_ACCESS_KEY"] = self.cloud_storage_secret_key
if self.cloud_storage_api_endpoint:
env["CATALOG_S3_ENDPOINT"] = self.cloud_storage_api_endpoint
env["CATALOG_WAREHOUSE"] = self.cloud_storage_warehouse
if self.filesystem_wrapper_mode:
env["CATALOG_CATALOG__IMPL"] = IcebergRESTCatalog.FS_CATALOG_IMPL
@@ -152,25 +167,29 @@ def _cmd(self):

def client(self, catalog_name="default"):
assert self.catalog_url
return load_catalog(
catalog_name, **{
"uri": self.catalog_url,
"s3.endpoint": self.cloud_storage_api_endpoint,
"s3.access-key-id": self.cloud_storage_access_key,
"s3.secret-access-key": self.cloud_storage_secret_key,
"s3.region": self.cloud_storage_region,
})
conf = dict()
conf["uri"] = self.catalog_url
if self.cloud_storage_api_endpoint:
conf["s3.endpoint"] = self.cloud_storage_api_endpoint
if self.cloud_storage_access_key:
conf["s3.access-key-id"] = self.cloud_storage_access_key
if self.cloud_storage_secret_key:
conf["s3.secret-access-key"] = self.cloud_storage_secret_key
if self.cloud_storage_region:
conf["s3.region"] = self.cloud_storage_region
return load_catalog(catalog_name, **conf)

def start_node(self, node, timeout_sec=60, **kwargs):
node.account.ssh("mkdir -p %s" % IcebergRESTCatalog.PERSISTENT_ROOT,
allow_fail=False)
# Delete any existing hadoop config and repopulate
node.account.ssh(f"rm -f {IcebergRESTCatalog.FS_CATALOG_CONF_PATH}")
config_tmpl = IcebergRESTCatalog.HADOOP_CONF_TMPL.format(
config_tmpl = IcebergRESTCatalog.HADOOP_CONF_TMPL.render(
fs_endpoint=self.cloud_storage_api_endpoint,
fs_region=self.cloud_storage_region,
fs_access_key=self.cloud_storage_access_key,
fs_secret_key=self.cloud_storage_secret_key)
fs_secret_key=self.cloud_storage_secret_key,
fs_dedicated_nodes=self.dedicated_nodes)
self.logger.debug(f"Using hadoop config: {config_tmpl}")
node.account.create_file(IcebergRESTCatalog.FS_CATALOG_CONF_PATH,
config_tmpl)
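
The hunks above swap the str.format-based Hadoop template for a jinja2.Template, so optional settings simply drop out of the rendered XML and the credentials provider can follow the new fs_dedicated_nodes flag. A minimal sketch of that conditional rendering, using illustrative placeholder values rather than the harness's real ones:

import jinja2

# Hedged sketch of the conditionals used by HADOOP_CONF_TMPL; values are
# placeholders, not the service defaults.
tmpl = jinja2.Template("""<configuration>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
{%- if fs_dedicated_nodes %}
    <value>org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider</value>
{% else -%}
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
{% endif %}
  </property>
{%- if fs_access_key %}
  <property>
    <name>fs.s3a.access.key</name>
    <value>{{ fs_access_key }}</value>
  </property>
{%- endif %}
</configuration>""")

# Dedicated (cloud) nodes: no static keys, so the access.key property vanishes
# and the IAM instance credentials provider is selected.
print(tmpl.render(fs_dedicated_nodes=True, fs_access_key=None))
# Docker/local: a static key is supplied and SimpleAWSCredentialsProvider stays.
print(tmpl.render(fs_dedicated_nodes=False, fs_access_key="panda-user"))
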
38 changes: 24 additions & 14 deletions tests/rptest/services/spark_service.py
@@ -7,14 +7,14 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import subprocess
from typing import Optional
from ducktape.services.service import Service
from ducktape.cluster.cluster import ClusterNode
from ducktape.utils.util import wait_until
from pyhive import hive
from rptest.services.redpanda import SISettings
from rptest.tests.datalake.query_engine_base import QueryEngineType, QueryEngineBase
import jinja2


class SparkService(Service, QueryEngineBase):
@@ -24,17 +24,18 @@ class SparkService(Service, QueryEngineBase):
LOGS_DIR = f"{SPARK_HOME}/logs"
logs = {"spark_sql_logs": {"path": LOGS_DIR, "collect_default": True}}

SPARK_SERVER_CMD = """AWS_ACCESS_KEY_ID={akey} AWS_SECRET_ACCESS_KEY={skey} AWS_REGION={region} \
/opt/spark/sbin/start-thriftserver.sh \
SPARK_SERVER_CMD = jinja2.Template(
""" /opt/spark/sbin/start-thriftserver.sh \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.defaultCatalog=redpanda-iceberg-catalog \
--conf spark.sql.catalog.redpanda-iceberg-catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.redpanda-iceberg-catalog.type=rest \
--conf spark.sql.catalog.redpanda-iceberg-catalog.uri={catalog} \
--conf spark.sql.catalog.redpanda-iceberg-catalog.uri={{ catalog }} \
--conf spark.sql.catalog.redpanda-iceberg-catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.redpanda-iceberg-catalog.s3.endpoint={s3} \
--conf spark.sql.catalog.redpanda-iceberg-catalog.cache-enabled=false
"""
--conf spark.sql.catalog.redpanda-iceberg-catalog.cache-enabled=false \
{% if s3 -%}
--conf spark.sql.catalog.redpanda-iceberg-catalog.s3.endpoint={{ s3 }}
{% endif %}""")

def __init__(self, ctx, iceberg_catalog_rest_uri: str, si: SISettings):
super(SparkService, self).__init__(ctx, num_nodes=1)
@@ -46,20 +47,29 @@ def __init__(self, ctx, iceberg_catalog_rest_uri: str, si: SISettings):
self.spark_host: Optional[SparkService] = None
self.spark_port = 10000

def make_env(self):
env = dict()
if self.cloud_storage_access_key:
env["AWS_ACCESS_KEY_ID"] = self.cloud_storage_access_key
if self.cloud_storage_secret_key:
env["AWS_SECRET_ACCESS_KEY"] = self.cloud_storage_secret_key
if self.cloud_storage_region:
env["AWS_REGION"] = self.cloud_storage_region
return " ".join([f"{k}={v}" for k, v in env.items()])

def start_cmd(self):
return SparkService.SPARK_SERVER_CMD.format(\
akey=self.cloud_storage_access_key, \
skey=self.cloud_storage_secret_key, \
region=self.cloud_storage_region, \
catalog=self.iceberg_catalog_rest_uri, \
cmd = SparkService.SPARK_SERVER_CMD.render(
catalog=self.iceberg_catalog_rest_uri,
s3=self.cloud_storage_api_endpoint)
env = self.make_env()
return f"{env} {cmd}"

def start_node(self, node, timeout_sec=30, **kwargs):
def start_node(self, node, timeout_sec=120, **kwargs):
node.account.ssh(self.start_cmd(), allow_fail=False)
self.spark_host = node.account.hostname
self.wait(timeout_sec=timeout_sec)

def wait_node(self, node, timeout_sec=None):
def wait_node(self, node, timeout_sec):
def _ready():
try:
self.run_query_fetch_all("show databases")
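
The Spark changes move the AWS credentials out of the command template and into an environment prefix assembled only from values that are actually set, so dedicated-node runs that rely on instance credentials do not export empty variables. A rough sketch of how the prefix composes with the thrift server command, with placeholder values:

# Hedged sketch of make_env() plus start_cmd(); the credential values are
# placeholders, not what the test harness actually passes.
def make_env(access_key, secret_key, region):
    env = {}
    if access_key:
        env["AWS_ACCESS_KEY_ID"] = access_key
    if secret_key:
        env["AWS_SECRET_ACCESS_KEY"] = secret_key
    if region:
        env["AWS_REGION"] = region
    return " ".join(f"{k}={v}" for k, v in env.items())

cmd = "/opt/spark/sbin/start-thriftserver.sh --conf ..."
# Docker/local: keys are exported inline ahead of the command.
print(f"{make_env('panda-user', 'panda-secret', 'panda-region')} {cmd}")
# Dedicated nodes: empty prefix, letting the AWS SDK fall back to instance
# credentials; the s3.endpoint --conf is likewise emitted only when set.
print(f"{make_env(None, None, None)} {cmd}")
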
45 changes: 27 additions & 18 deletions tests/rptest/services/trino_service.py
@@ -16,6 +16,7 @@
from pyhive import trino
from rptest.services.redpanda import SISettings
from rptest.services.spark_service import QueryEngineBase
import jinja2
from rptest.tests.datalake.query_engine_base import QueryEngineType


@@ -29,16 +30,24 @@ class TrinoService(Service, QueryEngineBase):
LOG_FILE = os.path.join(PERSISTENT_ROOT, "trino_server.log")
logs = {"iceberg_rest_logs": {"path": LOG_FILE, "collect_default": True}}

ICEBERG_CONNECTOR_CONF = """
ICEBERG_CONNECTOR_CONF = jinja2.Template("""
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri={catalog_rest_uri}
iceberg.rest-catalog.uri={{ catalog_rest_uri }}
fs.native-s3.enabled=true
s3.endpoint={s3_endpoint}
s3.region={s3_region}
{%- if s3_region %}
s3.region={{ s3_region }}
{%- endif %}
s3.path-style-access=true
s3.aws-access-key={s3_access_key}
s3.aws-secret-key={s3_secret_key}"""
{%- if s3_endpoint %}
s3.endpoint={{ s3_endpoint }}
{%- endif %}
{%- if s3_access_key %}
s3.aws-access-key={{ s3_access_key }}
{%- endif %}
{%- if s3_secret_key %}
s3.aws-secret-key={{ s3_secret_key }}
{%- endif %}""")
TRINO_LOGGING_CONF = "io.trino=INFO\n"
TRINO_LOGGING_CONF_FILE = "/opt/trino/etc/log.properties"

@@ -52,18 +61,18 @@ def __init__(self, ctx, iceberg_catalog_rest_uri: str, si: SISettings):
self.trino_host: Optional[str] = None
self.trino_port = 8083

def start_node(self, node, timeout_sec=60, **kwargs):
def start_node(self, node, timeout_sec=120, **kwargs):
node.account.ssh(f"mkdir -p {TrinoService.PERSISTENT_ROOT}")
node.account.ssh(f"rm -f {TrinoService.TRINO_CONF_PATH}")
connector_config = TrinoService.ICEBERG_CONNECTOR_CONF.format(
catalog_rest_uri=self.iceberg_catalog_rest_uri,
s3_endpoint=self.cloud_storage_api_endpoint,
s3_region=self.cloud_storage_region,
s3_access_key=self.cloud_storage_access_key,
s3_secret_key=self.cloud_storage_secret_key)
self.logger.debug(f"Using connector config: {connector_config}")
node.account.create_file(TrinoService.TRINO_CONF_PATH,
connector_config)
connector_config = dict(catalog_rest_uri=self.iceberg_catalog_rest_uri,
s3_region=self.cloud_storage_region,
s3_endpoint=self.cloud_storage_api_endpoint,
s3_access_key=self.cloud_storage_access_key,
s3_secret_key=self.cloud_storage_secret_key)
config_str = TrinoService.ICEBERG_CONNECTOR_CONF.render(
connector_config)
self.logger.debug(f"Using connector config: {config_str}")
node.account.create_file(TrinoService.TRINO_CONF_PATH, config_str)
# Create logger configuration
node.account.ssh(f"rm -f {TrinoService.TRINO_LOGGING_CONF_FILE}")
node.account.create_file(TrinoService.TRINO_LOGGING_CONF_FILE,
@@ -72,9 +81,9 @@ def start_node(self, node, timeout_sec=60, **kwargs):
f"nohup /opt/trino/bin/trino-launcher run 1> {TrinoService.LOG_FILE} 2>&1 &",
allow_fail=False)
self.trino_host = node.account.hostname
self.wait(timeout_sec=30)
self.wait(timeout_sec=timeout_sec)

def wait_node(self, node, timeout_sec=None):
def wait_node(self, node, timeout_sec):
def _ready():
try:
self.run_query_fetch_all("show catalogs")
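
As with the catalog service, the Trino connector template now renders s3.endpoint, s3.region and the static keys only when they are present. One detail worth noting is that start_node() passes a plain dict to render(), which jinja2 accepts as the template context; a small, hypothetical illustration:

import jinja2

# Hedged illustration: a subset of the connector properties, rendered from a
# dict the way start_node() does above.
tmpl = jinja2.Template("""connector.name=iceberg
iceberg.rest-catalog.uri={{ catalog_rest_uri }}
{%- if s3_region %}
s3.region={{ s3_region }}
{%- endif %}""")

connector_config = dict(catalog_rest_uri="http://catalog:8181", s3_region=None)
# s3_region is None, so the s3.region line is omitted instead of rendering
# "s3.region=None".
print(tmpl.render(connector_config))
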
8 changes: 4 additions & 4 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -65,10 +65,10 @@ def _get_serde_client(
compression_type=compression_type)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
@matrix(storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO],
filesystem_catalog_mode=[True])
def test_e2e_basic(self, cloud_storage_type, query_engine,
def test_e2e_basic(self, storage_type, query_engine,
filesystem_catalog_mode):
# Create a topic
# Produce some events
@@ -83,9 +83,9 @@ def test_e2e_basic(self, cloud_storage_type, query_engine,
dl.wait_for_translation(self.topic_name, msg_count=count)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
@matrix(storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO])
def test_avro_schema(self, cloud_storage_type, query_engine):
def test_avro_schema(self, storage_type, query_engine):
count = 100
table_name = f"redpanda.{self.topic_name}"

6 changes: 3 additions & 3 deletions tests/rptest/tests/datalake/datalake_services.py
@@ -37,10 +37,10 @@ def __init__(self,
self.catalog_service = IcebergRESTCatalog(
test_ctx,
cloud_storage_bucket=si_settings.cloud_storage_bucket,
cloud_storage_access_key=str(si_settings.cloud_storage_access_key),
cloud_storage_secret_key=str(si_settings.cloud_storage_secret_key),
cloud_storage_access_key=si_settings.cloud_storage_access_key,
cloud_storage_secret_key=si_settings.cloud_storage_secret_key,
cloud_storage_region=si_settings.cloud_storage_region,
cloud_storage_api_endpoint=str(si_settings.endpoint_url),
cloud_storage_api_endpoint=si_settings.endpoint_url,
filesystem_wrapper_mode=filesystem_catalog_mode)
self.included_query_engines = include_query_engines
# To be populated later once we have the URI of the catalog
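
Dropping the str(...) wrappers is what lets the optional checks above work: SISettings may legitimately carry None for these fields, and coercing None to str produces the truthy string "None", which would then be rendered into configs. A one-line illustration of the difference:

# Minimal illustration of why the str(...) wrappers were removed.
endpoint = None
assert str(endpoint) == "None"   # truthy string that would leak into configs
assert not endpoint              # the raw Optional stays falsy and is skipped
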
