diff --git a/.typos.toml b/.typos.toml index 225e69a6997bf..4d4bbfca1c082 100644 --- a/.typos.toml +++ b/.typos.toml @@ -14,7 +14,9 @@ mosquitto = "mosquitto" # This is a MQTT broker. abd = "abd" iy = "iy" Pn = "Pn" + [default.extend-identifiers] +TABLE_SCHEM = "TABLE_SCHEM" [files] extend-exclude = [ diff --git a/integration_tests/client-library/java/pom.xml b/integration_tests/client-library/java/pom.xml index d9383161d1261..66d83eee92651 100644 --- a/integration_tests/client-library/java/pom.xml +++ b/integration_tests/client-library/java/pom.xml @@ -19,7 +19,13 @@ org.postgresql postgresql - 42.5.5 + 42.7.3 + + + + commons-dbutils + commons-dbutils + 1.8.1 diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestCreateTable.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestCreateTable.java index 14bf61ab06595..7694d094120ca 100644 --- a/integration_tests/client-library/java/src/test/java/com/risingwave/TestCreateTable.java +++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestCreateTable.java @@ -8,7 +8,7 @@ public class TestCreateTable { - public void createSourceTable() throws SQLException { + public static void createSourceTable() throws SQLException { String createTableQuery; Statement statement; try (Connection connection = TestUtils.establishConnection()) { @@ -40,7 +40,7 @@ public void createSourceTable() throws SQLException { } } - public void dropSourceTable() throws SQLException { + public static void dropSourceTable() throws SQLException { String dropSourceQuery = "DROP TABLE s1_java;"; try (Connection connection = TestUtils.establishConnection()) { Statement statement = connection.createStatement(); diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestDatabaseConnection.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestDatabaseConnection.java index 30b42b3015808..65a18a8b0f02f 100644 --- a/integration_tests/client-library/java/src/test/java/com/risingwave/TestDatabaseConnection.java +++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestDatabaseConnection.java @@ -23,16 +23,15 @@ static Connection establishConnection() throws SQLException { @Test public void testEstablishConnection() throws SQLException { - Connection conn = establishConnection(); - assertNotNull(conn, "Connection should not be null"); - - String query = "SELECT 1"; - Statement statement = conn.createStatement(); - ResultSet resultSet = statement.executeQuery(query); - assertTrue(resultSet.next(), "Expected a result"); - int resultValue = resultSet.getInt(1); - assertEquals(1, resultValue, "Expected result value to be 1"); - - conn.close(); // Close the connection to release resources + try (Connection conn = TestUtils.establishConnection()) { + assertNotNull(conn, "Connection should not be null"); + + String query = "SELECT 1"; + Statement statement = conn.createStatement(); + ResultSet resultSet = statement.executeQuery(query); + assertTrue(resultSet.next(), "Expected a result"); + int resultValue = resultSet.getInt(1); + assertEquals(1, resultValue, "Expected result value to be 1"); + } } } diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestMaterializedView.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestMaterializedView.java index 2d3d537493129..9bcb0207f62aa 100644 --- a/integration_tests/client-library/java/src/test/java/com/risingwave/TestMaterializedView.java +++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestMaterializedView.java @@ -1,11 +1,16 @@ package com.risingwave; +import org.apache.commons.dbutils.QueryRunner; +import org.apache.commons.dbutils.handlers.MapListHandler; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.List; +import java.util.Map; public class TestMaterializedView { @@ -17,11 +22,38 @@ public void clearDatabase() throws SQLException { statement.executeUpdate(dropViewQuery); System.out.println("Materialized view dropped successfully."); } - String truncateTableQuery = "DROP TABLE my_table_java;"; + String truncateTableQuery = "DROP TABLE IF EXISTS my_table_java;"; try (Statement statement = connection.createStatement()) { statement.executeUpdate(truncateTableQuery); - System.out.println("Table dropped successfully."); + System.out.println("Table my_table_java dropped successfully."); } + truncateTableQuery = "DROP TABLE IF EXISTS test_struct;"; + try (Statement statement = connection.createStatement()) { + statement.executeUpdate(truncateTableQuery); + System.out.println("Table test_struct dropped successfully."); + } + } + } + + @Test + public void testStruct() throws SQLException { + try (Connection conn = TestUtils.establishConnection()) { + Statement statement = conn.createStatement(); + statement.executeUpdate("CREATE TABLE IF NOT EXISTS test_struct(" + + "i1 int [], v1 struct, t1 timestamptz, c1 varchar" + + ")"); + + String insertDataSQL = "INSERT INTO test_struct (i1, v1, t1, c1) VALUES ('{1}', (2, 3), '2020-01-01 01:02:03', 'abc')"; + statement.execute(insertDataSQL); + statement.execute("FLUSH;"); + + QueryRunner runner = new QueryRunner(); + String query = "SELECT * FROM test_struct"; + List> resultList = runner.query(conn, query, new MapListHandler()); + Assertions.assertEquals(resultList.size(), 1); + Assertions.assertEquals(resultList.get(0).get("v1"), "(2,3)"); + } finally { + clearDatabase(); } } diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestMetadata.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestMetadata.java new file mode 100644 index 0000000000000..b076da30d233a --- /dev/null +++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestMetadata.java @@ -0,0 +1,238 @@ +package com.risingwave; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +public class TestMetadata { + + public List fetchAllStrings(ResultSet resultSet, String columnName) throws SQLException { + List stringColumn = new ArrayList<>(); + + // Iterate through the ResultSet and collect schema names + while (resultSet.next()) { + String element = resultSet.getString(columnName); + stringColumn.add(element); + } + + return stringColumn; + } + + @Test + public void testGetSchemas() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet schemas = metaData.getSchemas()) { + // Check that the ResultSet is not null + assertNotNull(schemas); + + List schemaList = fetchAllStrings(schemas, "TABLE_SCHEM"); + + // Check that the schema list is not empty + assertFalse(schemaList.isEmpty()); + + // Check that "pg_catalog" schema exists in the list + assertTrue(schemaList.contains("pg_catalog")); + } + } + } + + @Test + public void testGetCatalogs() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet catalogs = metaData.getCatalogs()) { + // Check that the ResultSet is not null + assertNotNull(catalogs); + + List catalogList = fetchAllStrings(catalogs, "TABLE_CAT"); + + // Check that the catalog list is not empty + assertFalse(catalogList.isEmpty()); + + // Check that a must-have database "dev" exists in the list + assertTrue(catalogList.contains("dev")); + } + } + } + + /* + TODO: Support the parameter 'default_transaction_isolation'. + @Test + public void testGetDefaultTransactionIsolation() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + metaData.getDefaultTransactionIsolation(); + } + } + */ + + /* + TODO: Support the parameter 'max_index_keys'. + @Test + public void testGetMaxColumnsInIndex() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + metaData.getMaxColumnsInIndex(); + metaData.getImportedKeys("dev", "information_schema", "tables"); + metaData.getExportedKeys("dev", "information_schema", "tables"); + metaData.getCrossReference("dev", "information_schema", "tables", "dev", "pg_catalog", "pg_tables"); + } + } + */ + + /* + TODO: Support the `name` type and give it a type length. + @Test + public void testGetMaxNameLength() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + metaData.getMaxCursorNameLength(); + metaData.getMaxSchemaNameLength(); + metaData.getMaxTableNameLength(); + metaData.getMaxSchemaNameLength(); + metaData.getMaxUserNameLength(); + metaData.getMaxProcedureNameLength(); + metaData.getMaxCatalogNameLength(); + metaData.getClientInfoProperties(); + } + } + */ + + @Test + public void testGetProcedures() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet procedures = metaData.getProcedures("dev", "public", "")) { + assertNotNull(procedures); + List procedureList = fetchAllStrings(procedures, "PROCEDURE_NAME"); + assertTrue(procedureList.isEmpty()); + } + } + } + + @Test + public void testGetTables() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet tables = metaData.getTables("dev", "pg_catalog", "", null)) { + assertNotNull(tables); + List tablesList = fetchAllStrings(tables, "TABLE_NAME"); + assertFalse(tablesList.isEmpty()); + } + + /* + TODO: `relacl` is missing in `pg_class`. + try (ResultSet acl = metaData.getTablePrivileges("dev", "information_schema", "tables")) { + assertNotNull(acl); + } + */ + } + } + + @Test + public void testGetColumns() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet columns = metaData.getColumns("dev", "information_schema", "tables", "")) { + assertNotNull(columns); + List columnList = fetchAllStrings(columns, "COLUMN_NAME"); + assertFalse(columnList.isEmpty()); + } + + /* + TODO: `relacl` is missing in `pg_class`. + try (ResultSet acl = metaData.getColumnPrivileges("dev", "information_schema", "tables", "")) { + assertNotNull(acl); + } + */ + } + } + + /* + TODO: the query used in getPrimaryKeys retrieves nothing in RisingWave. + @Test + public void testGetPrimaryKeys() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + + TestUtils.executeDdl(connection, "CREATE TABLE t_test_get_pks (id INT PRIMARY KEY, value TEXT);"); + + try (ResultSet pKeys = metaData.getPrimaryKeys("dev", "public", "t_test_get_pks")) { + assertNotNull(pKeys); + List pKeyList = fetchAllStrings(pKeys, "PK_NAME"); + assertFalse(pKeyList.isEmpty()); + } + + try (ResultSet pKeys = metaData.getPrimaryUniqueKeys("dev", "public", "t_test_get_pks")) { + assertNotNull(pKeys); + List pKeyList = fetchAllStrings(pKeys, "PK_NAME"); + assertFalse(pKeyList.isEmpty()); + } + + TestUtils.executeDdl(connection, "DROP TABLE t_test_get_pks;"); + } + } + */ + + + /* + TODO: Support pg_get_function_result. + TODO: prokind is missing in pg_proc. + @Test + public void testGetFunctions() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + + try (ResultSet functions = metaData.getFunctions("dev", "public", "")) { + assertNotNull(functions); + List fList = fetchAllStrings(functions, "FUNCTION_NAME"); + assertFalse(fList.isEmpty()); + } + + try (ResultSet cols = metaData.getFunctionColumns("dev", "public", "")) { + assertNotNull(cols); + } + } + } + */ + + @Test + public void testGetUDTs() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + + try (ResultSet types = metaData.getUDTs("dev", "public", "", null)) { + assertNotNull(types); + } + } + } + + /** + * Only checks the API callability, not results. + */ + @Test + public void testApiCallable() throws SQLException { + try (Connection connection = TestUtils.establishConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + + metaData.getTableTypes(); + metaData.getTypeInfo(); + + /// metaData.getIndexInfo(); + + // TODO: Actually implement `pg_catalog.pg_get_keywords()`. Now it returns empty. + metaData.getSQLKeywords(); + + metaData.getBestRowIdentifier("dev", "information_schema", "tables", 1, true); + metaData.getVersionColumns("dev", "information_schema", "tables"); + } + } +} diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestSubscription.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestSubscription.java new file mode 100644 index 0000000000000..0c834e23806b2 --- /dev/null +++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestSubscription.java @@ -0,0 +1,57 @@ +package com.risingwave; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class TestSubscription { + public void createAndTest() throws SQLException { + TestCreateTable.createSourceTable(); + try (Connection conn = TestUtils.establishConnection()) { + // Step 1: Create a subscription + String createSubscription = "CREATE SUBSCRIPTION sub4 FROM s1_java WITH (retention = '1 hour')"; + try (PreparedStatement createStmt = conn.prepareStatement(createSubscription)) { + createStmt.execute(); + System.out.println("Subscription created successfully."); + } + // Step 2: Declare a subscription cursor + String declareCursor = "DECLARE cur4 SUBSCRIPTION CURSOR FOR sub4"; + try (PreparedStatement declareStmt = conn.prepareStatement(declareCursor)) { + declareStmt.execute(); + System.out.println("Subscription cursor declared successfully."); + } + // Step 3: Fetch data from the subscription cursor + String fetchData = "FETCH NEXT FROM cur4"; + try (PreparedStatement fetchStmt = conn.prepareStatement(fetchData)) { + ResultSet rs = fetchStmt.executeQuery(); + while (rs.next()) { + Object v1 = rs.getObject("v1"); + assertNotNull(v1); + } + } + } + } + + public void dropSubscription() throws SQLException { + try (Connection conn = TestUtils.establishConnection()) { + PreparedStatement stmt = conn.prepareStatement("DROP SUBSCRIPTION sub4"); + stmt.execute(); + System.out.println("Subscription dropped successfully."); + } + } + + @Test + public void testSubscription() throws SQLException { + try { + createAndTest(); + } finally { + dropSubscription(); + TestCreateTable.dropSourceTable(); + } + } +} \ No newline at end of file diff --git a/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java b/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java index 756da4eb3eb70..245c2f36a06d2 100644 --- a/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java +++ b/integration_tests/client-library/java/src/test/java/com/risingwave/TestUtils.java @@ -7,7 +7,8 @@ public class TestUtils { public static Connection establishConnection() throws SQLException { - final String url = "jdbc:postgresql://risingwave-standalone:4566/dev"; + // TODO: remove preferQueryMode=simple. + final String url = "jdbc:postgresql://risingwave-standalone:4566/dev?preferQueryMode=simple"; final String user = "root"; final String password = ""; diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_proc.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_proc.rs index 259f42376abe9..2f2abce7a0b8e 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_proc.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_proc.rs @@ -30,4 +30,5 @@ struct PgProc { proargdefaults: i32, // Data type of the return value, refer to pg_type. prorettype: i32, + prokind: String, }