Skip to content

Commit

Permalink
Pass test
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Jan 28, 2024
1 parent 005a5ac commit 97036f2
Show file tree
Hide file tree
Showing 13 changed files with 1,590 additions and 12 deletions.
24 changes: 24 additions & 0 deletions integration_tests/iceberg-sink2/docker/hive/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Test-case configuration for the iceberg-sink2 integration test (hive catalog).
# Read by python/main.py via configparser; section names are significant.

[default]
# MinIO credentials used by the test harness.
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
# Connection parameters for the RisingWave frontend.
db=dev
user=root
host=127.0.0.1
port=4566

[sink]
# Options for the RisingWave iceberg sink.
connector = iceberg
type=append-only
force_append_only = true
# Hive metastore catalog exposed by the `metastore` service in docker-compose.yml.
catalog.type = hive
catalog.uri = thrift://metastore:9083
warehouse.path = s3://icebergdata/demo
# MinIO standing in for S3; endpoint and keys match the compose file.
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.name = demo
database.name=s1
table.name=t1
111 changes: 111 additions & 0 deletions integration_tests/iceberg-sink2/docker/hive/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Docker Compose stack for the iceberg-sink2 "hive" test case:
# RisingWave + MinIO (S3) + etcd + Hive metastore (backed by postgres) + a
# Spark Connect server used to read the Iceberg table back for verification.
# NOTE(review): nesting reconstructed to canonical compose indentation; the
# scraped source had lost all leading whitespace.
version: '3.8'

services:
  # Backing database for the Hive metastore service below.
  postgres:
    image: postgres:16.1
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: metastore_db
    expose:
      - 5432
    ports:
      - "5432:5432"
    networks:
      iceberg_net:

  # Spark Connect server (port 15002) used by the test driver to query Iceberg.
  spark:
    depends_on:
      - minio-0
      - metastore
    image: ghcr.io/icelake-io/icelake-spark:1.0
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      - SPARK_HOME=/opt/spark
      - PYSPARK_PYTHON=/usr/bin/python3.9
      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
    user: root
    networks:
      iceberg_net:
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - 15002
    healthcheck:
      # Ready once the Spark Connect port is listening.
      test: netstat -ltn | grep -c 15002
      interval: 1s
      retries: 1200
    volumes:
      - ./spark-script:/spark-script
    entrypoint: [ "/spark-script/spark-connect-server.sh" ]

  risingwave-standalone:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
      interval: 1s
      timeout: 30s
    networks:
      iceberg_net:

  # MinIO object store; required buckets are created before the server starts.
  minio-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: minio-0
    entrypoint: "
      /bin/sh -c '
      set -e
      mkdir -p \"/data/icebergdata/demo\"
      mkdir -p \"/data/hummock001\"
      /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
      '"
    networks:
      iceberg_net:

  etcd-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: etcd-0
    networks:
      iceberg_net:

  # Hive metastore (thrift://metastore:9083); metadata in postgres, warehouse
  # data in the MinIO bucket (endpoint/credentials match config.ini).
  metastore:
    image: naushadh/hive-metastore
    depends_on:
      - postgres
    environment:
      - DATABASE_HOST=postgres
      - DATABASE_DB=metastore_db
      - DATABASE_USER=admin
      - DATABASE_PASSWORD=123456
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - S3_ENDPOINT_URL=http://minio-0:9301
      - S3_BUCKET=icebergdata
      - S3_PREFIX=demo
    ports:
      - "9083:9083"
    expose:
      - 9083
    networks:
      iceberg_net:

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  minio-0:
    external: false

networks:
  iceberg_net:
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash

# Launches a Spark Connect server configured with an Iceberg catalog backed by
# the Hive metastore (thrift://metastore:9083) and MinIO S3 storage, then tails
# the server log so the container stays in the foreground.

set -ex

# Build a classpath from every bundled dependency jar. `tr` leaves a trailing
# ':', which the JVM classpath parser ignores.
JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

# Quote "$JARS" to prevent word splitting/globbing if any jar path contains
# whitespace or glob characters.
/opt/spark/sbin/start-connect-server.sh \
  --master local[3] \
  --driver-class-path "$JARS" \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.hive.HiveCatalog \
  --conf spark.sql.catalog.demo.uri=thrift://metastore:9083 \
  --conf spark.sql.catalog.demo.clients=10 \
  --conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://minio-0:9301 \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
  --conf spark.sql.defaultCatalog=demo

tail -f /opt/spark/logs/spark*.out
3 changes: 2 additions & 1 deletion integration_tests/iceberg-sink2/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ def check_spark_table(case_name):
if __name__ == "__main__":
# case_name = "rest"
# case_name = "storage"
case_name = "jdbc"
# case_name = "jdbc"
case_name = "hive"
config = configparser.ConfigParser()
config.read(f"{case_dir(case_name)}/config.ini")
print({section: dict(config[section]) for section in config.sections()})
Expand Down
3 changes: 3 additions & 0 deletions java/connector-node/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
<include>*:risingwave-sink-mock-flink-http-sink</include>

</includes>
<excludes>
<exclude>org.apache.iceberg:iceberg-common</exclude>
</excludes>
<useTransitiveDependencies>true</useTransitiveDependencies>
<useTransitiveFiltering>true</useTransitiveFiltering>
</dependencySet>
Expand Down
30 changes: 25 additions & 5 deletions java/connector-node/risingwave-sink-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,36 @@
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
Expand All @@ -83,11 +108,6 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>url-connection-client</artifactId>
<version>2.18.20</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static JniCatalogWrapper create(String name, String klassName, String[] p
props.length % 2 == 0,
"props should be key-value pairs, but length is: " + props.length);

Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
// Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
System.out.println("Current thread name is: " + Thread.currentThread().getName());

// try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.common;

import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

/**
 * Shadow copy of Iceberg's {@code org.apache.iceberg.common.DynClasses} that
 * always resolves classes via {@link Class#forName(String)} (i.e. the defining
 * class loader of this class) rather than a caller-supplied ClassLoader.
 *
 * <p>Improvements over the copied original: uses only the JDK standard library
 * ({@code String.join} / {@code LinkedHashSet}) instead of relocated Guava, so
 * the shadow class is self-contained; behavior and error messages unchanged.
 *
 * <p>NOTE(review): {@link Builder#loader(ClassLoader)} deliberately ignores its
 * argument in this fork — callers' loader hints are dropped. Confirm this is
 * intended for the JNI embedding before reusing elsewhere.
 */
public class DynClasses {

  private DynClasses() {}

  /** Returns a new builder for locating the first loadable class among candidates. */
  public static Builder builder() {
    return new Builder();
  }

  public static class Builder {
    // First successfully loaded candidate; null until impl() finds one.
    private Class<?> foundClass = null;
    // When true, build()/buildChecked() return null instead of throwing.
    private boolean nullOk = false;
    // All candidate names seen, in insertion order, for error messages.
    private final Set<String> classNames = new LinkedHashSet<>();

    private Builder() {}

    /**
     * Set the {@link ClassLoader} used to lookup classes by name.
     *
     * <p>No-op in this fork: classes are always resolved with
     * {@link Class#forName(String)}.
     *
     * @param newLoader a ClassLoader (ignored)
     * @return this Builder for method chaining
     */
    public Builder loader(ClassLoader newLoader) {
      return this;
    }

    /**
     * Checks for an implementation of the class by name.
     *
     * <p>The first candidate that loads wins; later calls after a match only
     * record the name for error reporting.
     *
     * @param className name of a class
     * @return this Builder for method chaining
     */
    public Builder impl(String className) {
      classNames.add(className);

      if (foundClass != null) {
        return this;
      }

      try {
        this.foundClass = Class.forName(className);
      } catch (ClassNotFoundException e) {
        // not the right implementation; try the next candidate
      }

      return this;
    }

    /**
     * Instructs this builder to return null if no class is found, rather than
     * throwing an Exception.
     *
     * @return this Builder for method chaining
     */
    public Builder orNull() {
      this.nullOk = true;
      return this;
    }

    /**
     * Returns the first implementation or throws ClassNotFoundException if one
     * was not found.
     *
     * @param <S> Java superclass
     * @return a {@link Class} for the first implementation found
     * @throws ClassNotFoundException if no implementation was found
     */
    @SuppressWarnings("unchecked")
    public <S> Class<? extends S> buildChecked() throws ClassNotFoundException {
      if (!nullOk && foundClass == null) {
        throw new ClassNotFoundException(
            "Cannot find class; alternatives: " + String.join(", ", classNames));
      }
      return (Class<? extends S>) foundClass;
    }

    /**
     * Returns the first implementation or throws RuntimeException if one was
     * not found.
     *
     * @param <S> Java superclass
     * @return a {@link Class} for the first implementation found
     * @throws RuntimeException if no implementation was found
     */
    @SuppressWarnings("unchecked")
    public <S> Class<? extends S> build() {
      if (!nullOk && foundClass == null) {
        throw new RuntimeException(
            "Cannot find class; alternatives: " + String.join(", ", classNames));
      }
      return (Class<? extends S>) foundClass;
    }
  }
}
Loading

0 comments on commit 97036f2

Please sign in to comment.