diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py
index 13022a06d5ebe..5822d8e2f4946 100644
--- a/ci/scripts/gen-integration-test-yaml.py
+++ b/ci/scripts/gen-integration-test-yaml.py
@@ -36,6 +36,7 @@
'doris-sink': ['json'],
'starrocks-sink': ['json'],
'deltalake-sink': ['json'],
+ 'client-library': ['none'],
}
def gen_pipeline_steps():
diff --git a/ci/scripts/integration-tests.sh b/ci/scripts/integration-tests.sh
index 47ccda049dc8b..9eed69256e50e 100755
--- a/ci/scripts/integration-tests.sh
+++ b/ci/scripts/integration-tests.sh
@@ -34,14 +34,6 @@ docker volume prune -f
echo "--- ghcr login"
echo "$GHCR_TOKEN" | docker login ghcr.io -u "$GHCR_USERNAME" --password-stdin
-echo "--- install postgresql"
-sudo yum install -y postgresql15
-
-echo "--- download rwctest-key"
-aws secretsmanager get-secret-value --secret-id "gcp-buildkite-rwctest-key" --region us-east-2 --query "SecretString" --output text >gcp-rwctest.json
-
-cd integration_tests/scripts
-
echo "--- case: ${case}, format: ${format}"
if [[ -n "${RW_IMAGE_TAG+x}" ]]; then
@@ -55,6 +47,20 @@ if [ "${BUILDKITE_SOURCE}" == "schedule" ]; then
echo Docker image: $RW_IMAGE
fi
+if [ "${case}" == "client-library" ]; then
+ cd integration_tests/client-library
+ python3 client_test.py
+ exit 0
+fi
+
+echo "--- install postgresql"
+sudo yum install -y postgresql15
+
+echo "--- download rwctest-key"
+aws secretsmanager get-secret-value --secret-id "gcp-buildkite-rwctest-key" --region us-east-2 --query "SecretString" --output text >gcp-rwctest.json
+
+cd integration_tests/scripts
+
echo "--- rewrite docker compose for protobuf"
if [ "${format}" == "protobuf" ]; then
python3 gen_pb_compose.py ${case} ${format}
diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py
index 2f68733b26022..dc5d062a1efe8 100755
--- a/ci/scripts/notify.py
+++ b/ci/scripts/notify.py
@@ -59,6 +59,7 @@
"doris-sink": ["xinhao"],
"starrocks-sink": ["xinhao"],
"deltalake-sink": ["xinhao"],
+ "client-library": ["tao"],
}
def get_failed_tests(get_test_status, test_map):
diff --git a/integration_tests/client-library/client_test.py b/integration_tests/client-library/client_test.py
new file mode 100644
index 0000000000000..cd1eaa9304372
--- /dev/null
+++ b/integration_tests/client-library/client_test.py
@@ -0,0 +1,51 @@
+import subprocess
+import sys
+from time import sleep
+
+
+def check_go():
+ print("--- go client test")
+ subprocess.run(["docker", "compose", "exec", "go-lang", "bash", "-c", "cd /go-client && ./run.sh"], check=True)
+
+
+def check_python():
+ print("--- python client test")
+ subprocess.run(["docker", "compose", "exec", "python", "bash", "-c",
+ "cd /python-client && pip3 install -r requirements.txt && pytest"], check=True)
+
+
+def check_java():
+ print("--- java client test")
+ subprocess.run(["docker", "compose", "exec", "java", "bash", "-c", "apt-get update && apt-get install -y maven"],
+ check=True)
+ subprocess.run(["docker", "compose", "exec", "java", "bash", "-c", "cd /java-client && mvn clean test"], check=True)
+
+
+subprocess.run(["docker", "compose", "up", "-d"], check=True)
+sleep(10)
+
+failed_cases = []
+
+try:
+ check_go()
+except Exception as e:
+ print(e)
+ failed_cases.append("go client failed")
+
+try:
+ check_python()
+except Exception as e:
+ print(e)
+ failed_cases.append("python client failed")
+
+try:
+ check_java()
+except Exception as e:
+ print(e)
+ failed_cases.append("java client failed")
+
+if len(failed_cases) != 0:
+ print(f"--- client check failed for case\n{failed_cases}")
+ sys.exit(1)
+
+subprocess.run(["docker", "compose", "down", "--remove-orphans", "-v"], check=True)
diff --git a/integration_tests/client-library/docker-compose.yml b/integration_tests/client-library/docker-compose.yml
new file mode 100644
index 0000000000000..0bad2564d453d
--- /dev/null
+++ b/integration_tests/client-library/docker-compose.yml
@@ -0,0 +1,54 @@
+---
+version: "3"
+services:
+ risingwave-standalone:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: risingwave-standalone
+ etcd-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: etcd-0
+ grafana-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: grafana-0
+ minio-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: minio-0
+ prometheus-0:
+ extends:
+ file: ../../docker/docker-compose.yml
+ service: prometheus-0
+
+ go-lang:
+ image: golang:bullseye
+ command: tail -f /dev/null
+ volumes:
+ - ./go:/go-client
+ python:
+ image: python:3.9.18-slim-bullseye
+ command: tail -f /dev/null
+ volumes:
+ - ./python:/python-client
+ java:
+ image: eclipse-temurin:11.0.21_9-jdk-jammy
+ command: tail -f /dev/null
+ volumes:
+ - ./java:/java-client
+
+volumes:
+ risingwave-standalone:
+ external: false
+ etcd-0:
+ external: false
+ grafana-0:
+ external: false
+ minio-0:
+ external: false
+ prometheus-0:
+ external: false
+ message_queue:
+ external: false
+name: risingwave-compose
diff --git a/integration_tests/client-library/go/go.mod b/integration_tests/client-library/go/go.mod
new file mode 100644
index 0000000000000..04c94f20889a2
--- /dev/null
+++ b/integration_tests/client-library/go/go.mod
@@ -0,0 +1,20 @@
+module github.com/risingwave/risingwave-test/client-library-test/go
+
+go 1.20
+
+require (
+ github.com/jackc/pgx/v5 v5.4.3
+ github.com/stretchr/testify v1.8.4
+)
+
+require (
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/jackc/pgpassfile v1.0.0 // indirect
+ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
+ github.com/kr/text v0.2.0 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/rogpeppe/go-internal v1.11.0 // indirect
+ golang.org/x/crypto v0.9.0 // indirect
+ golang.org/x/text v0.9.0 // indirect
+ gopkg.in/yaml.v3 v3.0.1 // indirect
+)
diff --git a/integration_tests/client-library/go/go.sum b/integration_tests/client-library/go/go.sum
new file mode 100644
index 0000000000000..788dc1f7787a1
--- /dev/null
+++ b/integration_tests/client-library/go/go.sum
@@ -0,0 +1,31 @@
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
+github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
+github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
+github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
+github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
+github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
+github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
+github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
+golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
+golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/integration_tests/client-library/go/pgx_test/crud_test.go b/integration_tests/client-library/go/pgx_test/crud_test.go
new file mode 100644
index 0000000000000..5656199550fe4
--- /dev/null
+++ b/integration_tests/client-library/go/pgx_test/crud_test.go
@@ -0,0 +1,204 @@
+package pgx_test
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "math"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/stretchr/testify/assert"
+)
+
+func connect() (*pgx.Conn, error) {
+ cfg, err := pgx.ParseConfig("postgres://root@risingwave-standalone:4566/dev")
+ if err != nil {
+ log.Fatalf("Failed to parse config: %v", err)
+ }
+ // TODO: Investigate into why simple protocol is required
+ cfg.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
+ conn, err := pgx.ConnectConfig(context.Background(), cfg)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to connect to database: %w", err)
+ }
+ return conn, nil
+}
+
+func disconnect(conn *pgx.Conn) {
+ conn.Close(context.Background())
+ fmt.Println("Disconnected from the database.")
+}
+
+func createTable(t *testing.T, conn *pgx.Conn) {
+ createTableQuery := `
+ CREATE TABLE IF NOT EXISTS sample_table_go (
+ name VARCHAR,
+ age INTEGER,
+ salary BIGINT,
+ trip_id VARCHAR[],
+ birthdate DATE,
+ deci DOUBLE PRECISION,
+ fare STRUCT <
+ initial_charge DOUBLE PRECISION,
+ subsequent_charge DOUBLE PRECISION,
+ surcharge DOUBLE PRECISION,
+ tolls DOUBLE PRECISION
+ >,
+ starttime TIME,
+ timest TIMESTAMP,
+ timestz TIMESTAMPTZ,
+ timegap INTERVAL
+ );
+ `
+ _, err := conn.Exec(context.Background(), createTableQuery)
+ assert.Nil(t, err, "Table creation failed")
+ fmt.Println("Table created successfully.")
+}
+func insertData(t *testing.T, conn *pgx.Conn, name string, age int, salary int64, tripIDs []string, birthdate string, deci float64, fareData map[string]float64, starttime string, timest time.Time, timestz time.Time, timegap time.Duration) {
+ insertDataQuery := `
+ INSERT INTO sample_table_go (name, age, salary, trip_id, birthdate, deci, fare, starttime, timest, timestz, timegap)
+ VALUES ($1, $2, $3, $4, $5, $6, ROW( $7, $8, $9, $10), $11, $12, $13, $14);
+ `
+ _, err := conn.Exec(context.Background(), insertDataQuery,
+ name, age, salary, tripIDs,
+ birthdate, deci, fareData["initial_charge"], fareData["subsequent_charge"], fareData["surcharge"], fareData["tolls"],
+ starttime,
+ timest.Format("2006-01-02 15:04:05.000000"),
+ timestz.Format("2006-01-02 15:04:05.000000"),
+ timegap,
+ )
+ fmt.Println(timest)
+ fmt.Println(timestz)
+ assert.Nil(t, err, "Data insertion failed")
+ fmt.Println("Data inserted successfully.")
+}
+
+func updateData(conn *pgx.Conn, name string, salary int64) {
+ updateDataQuery := `
+ UPDATE sample_table_go
+ SET salary=$1
+ WHERE name=$2;
+ `
+ _, err := conn.Exec(context.Background(), updateDataQuery, salary, name)
+ if err != nil {
+ log.Fatalf("Data updation failed: %v", err)
+ }
+ fmt.Println("Data updated successfully.")
+}
+
+func deleteData(conn *pgx.Conn, name string) {
+ deleteDataQuery := `
+ DELETE FROM sample_table_go WHERE name=$1;
+ `
+ _, err := conn.Exec(context.Background(), deleteDataQuery, name)
+ if err != nil {
+ log.Fatalf("Data deletion failed: %v", err)
+ }
+ fmt.Println("Data deletion successfully.")
+}
+
+func tableDrop(conn *pgx.Conn) {
+ resetQuery := `
+ DROP TABLE IF EXISTS sample_table_go;
+ `
+ _, err := conn.Exec(context.Background(), resetQuery)
+ if err != nil {
+ log.Fatalf("Table drop failed: %v", err)
+ }
+ fmt.Println("Table dropped successfully.")
+}
+
+func TestCrud(t *testing.T) {
+ conn, err := connect()
+ if err != nil {
+ log.Fatalf("Error connecting to the database: %v", err)
+ }
+ defer disconnect(conn)
+
+ createTable(t, conn)
+ name := "John Doe"
+ age := 30
+ salary := int64(50000)
+ tripIDs := []string{"12345", "67890"}
+ fareData := map[string]float64{
+ "initial_charge": 3.0,
+ "subsequent_charge": 1.5,
+ "surcharge": 0.5,
+ "tolls": 2.0,
+ }
+ birth := time.Date(1993, time.May, 15, 0, 0, 0, 0, time.UTC)
+ birthdate := birth.Format("2006-01-02")
+ start := time.Date(2023, time.August, 7, 18, 20, 0, 0, time.UTC)
+ starttime := start.Format("15:04:05")
+ timest := time.Now()
+ timestz := time.Now().UTC()
+ timegap := 2 * time.Hour
+ deci := 3.14159
+ insertData(t, conn, name, age, salary, tripIDs, birthdate, deci, fareData, starttime, timest, timestz, timegap)
+
+ // Insert data with null values
+ nullName := "Null Person"
+ nullAge := 0
+ nullSalary := int64(0)
+ nullTripIDs := []string{}
+ nullFareData := map[string]float64{}
+ nullBirth := time.Time{}
+ nullBirthdate := nullBirth.Format("2006-01-02")
+ nullStart := time.Time{}
+ nullStarttime := nullStart.Format("15:04:05")
+ nullTimest := time.Time{}
+ nullTimestz := time.Time{}
+ nullTimegap := time.Duration(0)
+ nullDeci := 0.0
+ insertData(t, conn, nullName, nullAge, nullSalary, nullTripIDs, nullBirthdate, nullDeci, nullFareData, nullStarttime, nullTimest, nullTimestz, nullTimegap)
+ checkInsertedData(t, conn, nullName, nullAge, nullSalary, nullTripIDs, nullBirthdate, nullDeci, nullFareData, nullStarttime, nullTimest, nullTimestz, nullTimegap)
+ updateData(conn, "John Doe", 60000)
+ deleteData(conn, "John Doe")
+ tableDrop(conn)
+}
+
+func checkInsertedData(t *testing.T, conn *pgx.Conn, name string, age int, salary int64, tripIDs []string, birthdate string, deci float64, fareData map[string]float64, starttime string, timest time.Time, timestz time.Time, timegap time.Duration) {
+ flushQuery := `
+ FLUSH;
+ `
+ _, err := conn.Exec(context.Background(), flushQuery)
+ assert.Nil(t, err, "Materialized View flush failed")
+
+ query := "SELECT name, age, salary, trip_id, birthdate, deci, fare, starttime, timest, timestz, timegap FROM sample_table_go WHERE name=$1"
+ row := conn.QueryRow(context.Background(), query, name)
+
+ var retrievedName string
+ var retrievedAge int
+ var retrievedSalary int64
+ var retrievedTripIDs []string
+ var retrievedBirthdate string
+ var retrievedDeci float64
+ var retrievedFareData string
+ var retrievedStarttime string
+ var retrievedTimest time.Time
+ var retrievedTimestz time.Time
+ var retrievedTimegap time.Duration
+
+ err = row.Scan(
+ &retrievedName, &retrievedAge, &retrievedSalary, &retrievedTripIDs,
+ &retrievedBirthdate, &retrievedDeci, &retrievedFareData, &retrievedStarttime,
+ &retrievedTimest, &retrievedTimestz, &retrievedTimegap,
+ )
+ assert.Nil(t, err, "Error retrieving inserted data")
+
+ if retrievedName != name ||
+ retrievedAge != age ||
+ retrievedSalary != salary ||
+ !reflect.DeepEqual(retrievedTripIDs, tripIDs) ||
+ retrievedBirthdate != birthdate ||
+ math.Abs(retrievedDeci-deci) > 0.00001 ||
+ retrievedStarttime != starttime ||
+ retrievedTimest != timest ||
+ (!retrievedTimestz.IsZero() && retrievedTimestz.UTC() != timestz.UTC()) ||
+ retrievedTimegap != timegap {
+ t.Errorf("Data didn't matched properly")
+ }
+}
diff --git a/integration_tests/client-library/go/pgx_test/mv_test.go b/integration_tests/client-library/go/pgx_test/mv_test.go
new file mode 100644
index 0000000000000..6a2a2c33cf74a
--- /dev/null
+++ b/integration_tests/client-library/go/pgx_test/mv_test.go
@@ -0,0 +1,115 @@
+package pgx_test
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "math"
+ "testing"
+ "time"
+
+ "github.com/jackc/pgx/v5"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestMaterializedView(t *testing.T) {
+ conn, err := connect()
+ if err != nil {
+ log.Fatalf("Error connecting to the database: %v", err)
+ }
+ createTable(t, conn)
+
+ name := "John Doe"
+ age := 30
+ salary := int64(50000)
+ tripIDs := []string{"12345", "67890"}
+ fareData := map[string]float64{
+ "initial_charge": 3.0,
+ "subsequent_charge": 1.5,
+ "surcharge": 0.5,
+ "tolls": 2.0,
+ }
+ birth := time.Date(1993, time.May, 15, 0, 0, 0, 0, time.UTC)
+ birthdate := birth.Format("2006-01-02")
+ start := time.Date(2023, time.August, 7, 18, 20, 0, 0, time.UTC)
+ starttime := start.Format("15:04:05")
+ timest := time.Now()
+ deci := 3.14159
+ insertData(t, conn, name, age, salary, tripIDs, birthdate, deci, fareData, starttime, timest, timest, 2*time.Hour)
+
+ name2 := "Jane Smith"
+ age2 := 28
+ salary2 := int64(60000)
+ tripIDs2 := []string{"98765", "54321"}
+ fareData2 := map[string]float64{
+ "initial_charge": 2.5,
+ "subsequent_charge": 1.2,
+ "surcharge": 0.4,
+ "tolls": 1.5,
+ }
+ birth2 := time.Date(1995, time.June, 20, 0, 0, 0, 0, time.UTC)
+ birthdate2 := birth2.Format("2006-01-02")
+ start2 := time.Date(2023, time.August, 7, 14, 30, 0, 0, time.UTC)
+ starttime2 := start2.Format("15:04:05")
+ timest2 := time.Now().Add(-time.Hour * 3)
+ deci2 := 2.71828
+ insertData(t, conn, name2, age2, salary2, tripIDs2, birthdate2, deci2, fareData2, starttime2, timest2, timest2, 2*time.Hour)
+
+ createMVQuery := `
+ CREATE MATERIALIZED VIEW IF NOT EXISTS customer_earnings_mv_go AS
+ SELECT
+ name AS customer_name,
+ age AS customer_age,
+ SUM((fare).initial_charge + (fare).subsequent_charge + (fare).surcharge + (fare).tolls) AS total_earnings
+ FROM
+ sample_table_go
+ GROUP BY
+ name, age;
+ `
+ _, err = conn.Exec(context.Background(), createMVQuery)
+ assert.Nil(t, err, "Materialized View creation failed")
+
+ fmt.Println("Materialized View created successfully.")
+ checkMVData(t, conn)
+ dropMV(t, conn)
+}
+
+func checkMVData(t *testing.T, conn *pgx.Conn) {
+ flushQuery := `
+ FLUSH;
+ `
+ _, err := conn.Exec(context.Background(), flushQuery)
+ assert.Nil(t, err, "Materialized View flush failed")
+ fmt.Println("Materialized View flushed successfully.")
+ checkMVQuery := `
+ SELECT * FROM customer_earnings_mv_go;
+ `
+ rows, err := conn.Query(context.Background(), checkMVQuery)
+ assert.Nil(t, err, "Materialized View data check failed")
+ defer rows.Close()
+
+ for rows.Next() {
+ var customerName string
+ var customerAge int
+ var totalEarnings float64
+ err := rows.Scan(&customerName, &customerAge, &totalEarnings)
+ assert.Nil(t, err, "Error scanning MV row")
+
+ switch customerName {
+ case "John Doe":
+ if customerAge != 30 || math.Abs(totalEarnings-7.0) > 0.001 {
+ t.Errorf("Incorrect data for John Doe: Age=%d, Total Earnings=%.2f", customerAge, totalEarnings)
+ }
+ case "Jane Smith":
+ if customerAge != 28 || math.Abs(totalEarnings-5.6) > 0.001 {
+ t.Errorf("Incorrect data for Jane Smith: Age=%d, Total Earnings=%.2f", customerAge, totalEarnings)
+ }
+ }
+ }
+}
+func dropMV(t *testing.T, conn *pgx.Conn) {
+ dropMVQuery := "DROP MATERIALIZED VIEW IF EXISTS customer_earnings_mv_go;"
+ _, err := conn.Exec(context.Background(), dropMVQuery)
+ assert.Nil(t, err, "Materialized View drop failed")
+ fmt.Println("Materialized View dropped successfully.")
+}
diff --git a/integration_tests/client-library/go/run.sh b/integration_tests/client-library/go/run.sh
new file mode 100755
index 0000000000000..701e40f9c2ffe
--- /dev/null
+++ b/integration_tests/client-library/go/run.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+go mod tidy
+go test ./...
diff --git a/integration_tests/client-library/java/.gitignore b/integration_tests/client-library/java/.gitignore
new file mode 100644
index 0000000000000..7f995f335e528
--- /dev/null
+++ b/integration_tests/client-library/java/.gitignore
@@ -0,0 +1,2 @@
+*.iml
+target/
diff --git a/integration_tests/client-library/java/pom.xml b/integration_tests/client-library/java/pom.xml
new file mode 100644
index 0000000000000..dd176ce286442
--- /dev/null
+++ b/integration_tests/client-library/java/pom.xml
@@ -0,0 +1,34 @@
+
+
+ 4.0.0
+
+ org.example
+ jdbc-test
+ 1.0-SNAPSHOT
+
+
+ 11
+ 11
+ UTF-8
+
+
+
+
+
+ org.postgresql
+ postgresql
+ 42.5.4
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.9.2
+ test
+
+
+
+
\ No newline at end of file
diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestCreateTable.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestCreateTable.java
new file mode 100644
index 0000000000000..14bf61ab06595
--- /dev/null
+++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestCreateTable.java
@@ -0,0 +1,57 @@
+package com.risingwave;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TestCreateTable {
+
+ public void createSourceTable() throws SQLException {
+ String createTableQuery;
+ Statement statement;
+ try (Connection connection = TestUtils.establishConnection()) {
+ createTableQuery = "CREATE TABLE s1_java (i1 int [], v1 struct, t1 timestamp, c1 varchar) " +
+ "WITH (" +
+ " connector = 'datagen'," +
+ " fields.i1.length = '3'," +
+ " fields.i1._.kind = 'sequence'," +
+ " fields.i1._.start = '1'," +
+ " fields.v1.v2.kind = 'random'," +
+ " fields.v1.v2.min = '-10'," +
+ " fields.v1.v2.max = '10'," +
+ " fields.v1.v2.seed = '1'," +
+ " fields.v1.v3.kind = 'random'," +
+ " fields.v1.v3.min = '15'," +
+ " fields.v1.v3.max = '55'," +
+ " fields.v1.v3.seed = '1'," +
+ " fields.t1.kind = 'random'," +
+ " fields.t1.max_past = '10 day'," +
+ " fields.t1.seed = '3'," +
+ " fields.c1.kind = 'random'," +
+ " fields.c1.length = '16'," +
+ " fields.c1.seed = '3'," +
+ " datagen.rows.per.second = '10'" +
+ ") FORMAT plain ENCODE json ;";
+ statement = connection.createStatement();
+ statement.executeUpdate(createTableQuery);
+ System.out.println("Source table s1_java created successfully.");
+ }
+ }
+
+ public void dropSourceTable() throws SQLException {
+ String dropSourceQuery = "DROP TABLE s1_java;";
+ try (Connection connection = TestUtils.establishConnection()) {
+ Statement statement = connection.createStatement();
+ statement.executeUpdate(dropSourceQuery);
+ System.out.println("Source table s1_java dropped successfully.");
+ }
+ }
+
+ @Test
+ public void testCreateSourceTable() throws SQLException {
+ createSourceTable();
+ dropSourceTable();
+ }
+}
diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestDatabaseConnection.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestDatabaseConnection.java
new file mode 100644
index 0000000000000..30b42b3015808
--- /dev/null
+++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestDatabaseConnection.java
@@ -0,0 +1,38 @@
+package com.risingwave;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.*;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class TestDatabaseConnection {
+ static Connection establishConnection() throws SQLException {
+ final String url = "jdbc:postgresql://risingwave-standalone:4566/dev";
+
+ Properties props = new Properties();
+ props.setProperty("user", "root");
+ props.setProperty("password", "");
+ props.setProperty("sslmode", "disable");
+ // To reproduce the bug: https://github.com/risingwavelabs/metabase-risingwave-driver/issues/1
+ props.setProperty("ApplicationName", "01234567890123456789012345678901234567890123456");
+
+ return DriverManager.getConnection(url, props);
+ }
+
+ @Test
+ public void testEstablishConnection() throws SQLException {
+ Connection conn = establishConnection();
+ assertNotNull(conn, "Connection should not be null");
+
+ String query = "SELECT 1";
+ Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(query);
+ assertTrue(resultSet.next(), "Expected a result");
+ int resultValue = resultSet.getInt(1);
+ assertEquals(1, resultValue, "Expected result value to be 1");
+
+ conn.close(); // Close the connection to release resources
+ }
+}
diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestMaterializedView.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestMaterializedView.java
new file mode 100644
index 0000000000000..2d3d537493129
--- /dev/null
+++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestMaterializedView.java
@@ -0,0 +1,66 @@
+package com.risingwave;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TestMaterializedView {
+
+ public void clearDatabase() throws SQLException {
+ try (Connection connection = TestUtils.establishConnection()) {
+ // Drop the materialized view
+ String dropViewQuery = "DROP MATERIALIZED VIEW IF EXISTS my_materialized_view_java;";
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate(dropViewQuery);
+ System.out.println("Materialized view dropped successfully.");
+ }
+ String truncateTableQuery = "DROP TABLE my_table_java;";
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate(truncateTableQuery);
+ System.out.println("Table dropped successfully.");
+ }
+ }
+ }
+
+ @Test
+ public void testCompatibility() throws SQLException {
+ try (Connection connection = TestUtils.establishConnection()) {
+ Statement statement = connection.createStatement();
+
+ String createTableSQL = "CREATE TABLE IF NOT EXISTS my_table_java (id int, name VARCHAR)";
+ statement.executeUpdate(createTableSQL);
+
+ String insertDataSQL = "INSERT INTO my_table_java (id,name) VALUES (1,'John'), (2,'Jane'), (3,'Alice')";
+ statement.execute(insertDataSQL);
+ statement.execute("FLUSH;");
+
+ String viewName = "my_materialized_view_java";
+ String createViewSQL = "CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM my_table_java";
+ statement.execute(createViewSQL);
+
+ String query = "SELECT * FROM " + viewName;
+ ResultSet resultSet = statement.executeQuery(query);
+
+ while (resultSet.next()) {
+ int id = resultSet.getInt("id");
+ String name = resultSet.getString("name");
+ System.out.println("ID: " + id + ", Name: " + name);
+ }
+
+ String updateDataSQL = "UPDATE my_table_java SET name = 'Bob' WHERE id = 1";
+ statement.execute(updateDataSQL);
+
+ resultSet = statement.executeQuery(query);
+ while (resultSet.next()) {
+ int id = resultSet.getInt("id");
+ String name = resultSet.getString("name");
+ System.out.println("ID: " + id + ", Name: " + name);
+ }
+ }
+
+ clearDatabase();
+ }
+}
diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java
new file mode 100644
index 0000000000000..756da4eb3eb70
--- /dev/null
+++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java
@@ -0,0 +1,20 @@
+package com.risingwave;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class TestUtils {
+ public static Connection establishConnection() throws SQLException {
+ final String url = "jdbc:postgresql://risingwave-standalone:4566/dev";
+ final String user = "root";
+ final String password = "";
+
+ Properties props = new Properties();
+ props.setProperty("user", user);
+ props.setProperty("password", password);
+
+ return DriverManager.getConnection(url, props);
+ }
+}
diff --git a/integration_tests/client-library/python/client.py b/integration_tests/client-library/python/client.py
new file mode 100644
index 0000000000000..dbd2ad9b4581c
--- /dev/null
+++ b/integration_tests/client-library/python/client.py
@@ -0,0 +1,30 @@
+import psycopg2
+
+class client:
+ def __init__(self, host, port,database, user, password):
+ self.host = host
+ self.port=port
+ self.database = database
+ self.user = user
+ self.password = password
+ self.connection = None
+
+ def connect(self):
+ try:
+ self.connection = psycopg2.connect(
+ host=self.host,
+ port=self.port,
+ database=self.database,
+ user=self.user,
+ password=self.password
+ )
+ cursor = self.connection.cursor() # Create a cursor object
+ return cursor # Return the cursor object
+ except psycopg2.Error as e:
+ print(e)
+ return e
+
+ def disconnect(self):
+ if self.connection:
+ self.connection.close()
+ self.connection = None
diff --git a/integration_tests/client-library/python/crud.py b/integration_tests/client-library/python/crud.py
new file mode 100644
index 0000000000000..f90233c24e336
--- /dev/null
+++ b/integration_tests/client-library/python/crud.py
@@ -0,0 +1,90 @@
+import psycopg2
+from client import client
+
+class crud:
+ def __init__(self, host, port, database, user, password):
+ self.host = host
+ self.database = database
+ self.user = user
+ self.password = password
+ self.connection = None
+ self.port=port
+
+ def create_table(self):
+ create_table_query = """
+ CREATE TABLE IF NOT EXISTS sample_table_py (
+ name VARCHAR,
+ age INTEGER,
+ salary BIGINT
+ );
+ """
+ try:
+ databaseconnection = client(self.host, self.port,self.database, self.user, self.password)
+ cursor=databaseconnection.connect()
+ cursor.execute(create_table_query)
+ databaseconnection.connection.commit()
+ print("Table created successfully.")
+ except psycopg2.Error as e:
+ print("Table creation failed: ", str(e))
+
+ def insert_data(self, name, age, salary):
+ insert_data_query = """
+ INSERT INTO sample_table_py (name, age, salary)
+ VALUES (%s, %s,%s);
+ """
+ try:
+ databaseconnection = client(self.host, self.port,self.database, self.user, self.password)
+ cursor=databaseconnection.connect()
+ cursor.execute(insert_data_query, (name, age, salary))
+ databaseconnection.connection.commit()
+ print("Data inserted successfully.")
+ except psycopg2.Error as e:
+ print("Data insertion failed: ", str(e))
+
+ def update_data(self, name, salary):
+ update_data_query = """
+ UPDATE sample_table_py
+ SET salary=%s
+ WHERE name=%s;
+ """
+ try:
+ databaseconnection = client(self.host, self.port,self.database, self.user, self.password)
+ cursor=databaseconnection.connect()
+ cursor.execute(update_data_query, (salary, name))
+ databaseconnection.connection.commit()
+ print("Data updated successfully.")
+ except psycopg2.Error as e:
+ print("Data updation failed: ", str(e))
+
+ def delete_data(self, name):
+ insert_data_query = """
+ DELETE FROM sample_table_py WHERE name='%s';
+ """
+ try:
+ databaseconnection = client(self.host, self.port,self.database, self.user, self.password)
+ cursor=databaseconnection.connect()
+ cursor.execute(insert_data_query, (name,))
+ databaseconnection.connection.commit()
+ print("Data deletion successfully.")
+ except psycopg2.Error as e:
+ print("Data deletion failed: ", str(e))
+
+ def table_drop(self):
+ reset_query = """
+ DROP TABLE sample_table_py;
+ """
+ try:
+ databaseconnection = client(self.host, self.port,self.database, self.user, self.password)
+ cursor=databaseconnection.connect()
+ cursor.execute(reset_query)
+ databaseconnection.connection.commit()
+ print("Table Dropped successfully")
+ except psycopg2.Error as e:
+ print("Table Drop Failed: ", str(e))
+
+crud_ins=crud(host="risingwave-standalone",
+ port="4566",
+ database="dev",
+ user="root",
+ password="")
+crud_ins.create_table()
diff --git a/integration_tests/client-library/python/init.py b/integration_tests/client-library/python/init.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/integration_tests/client-library/python/materializeview.py b/integration_tests/client-library/python/materializeview.py
new file mode 100644
index 0000000000000..a5cf7fbac5f68
--- /dev/null
+++ b/integration_tests/client-library/python/materializeview.py
@@ -0,0 +1,46 @@
+import psycopg2
+from client import client
+from crud import crud
+
+class MaterializeView:
+ def __init__(self, host, port, database, user, password):
+ self.host = host
+ self.database = database
+ self.user = user
+ self.password = password
+ self.connection = None
+ self.port=port
+
+ def create_mv(self):
+ crud_ins = crud(self.host, self.port, self.database, self.user, self.password)
+ crud_ins.create_table()
+ crud_ins.insert_data("John",25,10000)
+ crud_ins.insert_data("Shaun",25,11000)
+ crud_ins.insert_data("Caul",25,14000)
+ crud_ins.insert_data("Mantis",28,18000)
+ crud_ins.insert_data("Tony",28,19000)
+ mv_query="""
+ CREATE MATERIALIZED VIEW average_salary_view_py AS
+ SELECT age, AVG(salary) AS average_salary
+ FROM sample_table_py
+ GROUP BY age;
+ """
+ try:
+ databaseconnection = client(self.host, self.port,self.database, self.user, self.password)
+ cursor=databaseconnection.connect()
+ cursor.execute(mv_query)
+ databaseconnection.connection.commit()
+ print("MV created successfully.")
+ except psycopg2.Error as e:
+ print("MV creation failed: ", str(e))
+
+ def drop_mv(self):
+ mv_drop_query = "DROP materialized view average_salary_view_py;"
+ try:
+ databaseconnection = client(self.host, self.port,self.database, self.user, self.password)
+ cursor=databaseconnection.connect()
+ cursor.execute(mv_drop_query)
+ databaseconnection.connection.commit()
+ print("MV dropped successfully.")
+ except psycopg2.Error as e:
+ print("MV drop failed: ", str(e))
diff --git a/integration_tests/client-library/python/requirements.txt b/integration_tests/client-library/python/requirements.txt
new file mode 100644
index 0000000000000..8391ca9c83332
--- /dev/null
+++ b/integration_tests/client-library/python/requirements.txt
@@ -0,0 +1,2 @@
+psycopg2-binary
+pytest
\ No newline at end of file
diff --git a/integration_tests/client-library/python/test_database.py b/integration_tests/client-library/python/test_database.py
new file mode 100644
index 0000000000000..78ceea33f8373
--- /dev/null
+++ b/integration_tests/client-library/python/test_database.py
@@ -0,0 +1,139 @@
+import pytest
+from client import client
+from crud import crud
+from materializeview import MaterializeView
+
+
+@pytest.fixture
+def db_connection():
+ db = client(
+ host="risingwave-standalone",
+ port="4566",
+ database="dev",
+ user="root",
+ password=""
+ )
+ yield db
+ db.disconnect()
+
+
+@pytest.fixture
+def crud_instance():
+ return crud(
+ host="risingwave-standalone",
+ port="4566",
+ database="dev",
+ user="root",
+ password=""
+ )
+
+
+@pytest.fixture
+def mv_instance():
+ return MaterializeView(
+ host="risingwave-standalone",
+ port="4566",
+ database="dev",
+ user="root",
+ password=""
+ )
+
+
+def test_connect(db_connection):
+ assert db_connection.connect() != None
+
+
+def test_disconnect(db_connection):
+ db_connection.connect()
+ db_connection.disconnect()
+ assert db_connection.connection is None
+
+def test_table_creation(crud_instance, db_connection):
+ cursor = db_connection.connect()
+ cursor.execute("SET TRANSACTION READ WRITE;")
+ crud_instance.create_table()
+
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_name = 'sample_table_py';")
+ result = cursor.fetchone()[0]
+ cursor.close()
+ assert result == 'sample_table_py'
+
+def test_data_insertion(crud_instance, db_connection):
+ crud_instance.insert_data("John Doe", 25,10000)
+
+ cursor = db_connection.connect()
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT COUNT(*) FROM sample_table_py;")
+ result = cursor.fetchone()
+ result = result[0]
+ cursor.close()
+
+ assert result == 1
+
+def test_data_updation(crud_instance, db_connection):
+ crud_instance.update_data("John Doe", 12000)
+
+ cursor = db_connection.connect()
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT salary FROM sample_table_py WHERE name='John Doe';")
+ result = cursor.fetchone()
+ result = result[0]
+ cursor.close()
+ assert result == 12000
+
+def test_data_deletion(crud_instance, db_connection):
+ crud_instance.delete_data("John Doe")
+
+ cursor = db_connection.connect()
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT EXISTS (SELECT 1 FROM sample_table_py WHERE name = 'John Doe');")
+ result = cursor.fetchone()
+ result = result[0]
+ cursor.close()
+
+ assert result == True
+
+def test_table_drop(crud_instance, db_connection):
+ crud_instance.table_drop()
+
+ cursor = db_connection.connect()
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'sample_table_py');")
+ result = cursor.fetchone()
+ result = result[0]
+ cursor.close()
+
+ assert result is False
+
+def test_mv_creation(mv_instance,db_connection):
+ mv_instance.create_mv()
+ cursor = db_connection.connect()
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT EXISTS (SELECT 1 FROM pg_matviews WHERE matviewname = 'average_salary_view_py');")
+ result = cursor.fetchone()[0]
+ cursor.close()
+ assert result is True
+
+def test_mv_updation(db_connection,crud_instance):
+ crud_instance.insert_data("Stark", 25, 13000)
+ cursor = db_connection.connect()
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT average_salary FROM average_salary_view_py WHERE age=25;")
+ result = cursor.fetchone()[0]
+ cursor.close()
+ # assert result == 11250
+ assert result == 12000
+
+
+def test_mv_drop(crud_instance,mv_instance,db_connection):
+ mv_instance.drop_mv()
+ crud_instance.table_drop()
+ cursor = db_connection.connect()
+ cursor.execute("FLUSH;")
+ cursor.execute("SELECT EXISTS (SELECT 1 FROM pg_matviews WHERE matviewname = 'average_salary_view_py');")
+ result = cursor.fetchone()
+ result = result[0]
+ cursor.close()
+
+ assert result is False