diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index efaaa33fe4952..9cfafa91e25b2 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -161,7 +161,33 @@ private void validateTableSchema() throws SQLException { } private void validatePrivileges() throws SQLException { - // TODO: validate MySQL user privileges + String[] privilegesRequired = { + "SELECT", + "RELOAD", + "SHOW DATABASES", + "REPLICATION SLAVE", + "REPLICATION CLIENT", + "LOCK TABLES" + }; + try (var stmt = jdbcConnection.createStatement()) { + var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants")); + while (res.next()) { + String grants = res.getString(1).toUpperCase(); + // all privileges granted, check passed + if (grants.contains("ALL")) { + break; + } + // check whether each privilege is granted + for (String privilege : privilegesRequired) { + if (!grants.contains(privilege)) { + throw ValidatorUtils.invalidArgument( + String.format( + "MySQL user does not have privilege %s, which is needed for debezium connector", + privilege)); + } + } + } + } } @Override diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index aec6606cc01f0..53d300a8e6aeb 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -3,6 +3,7 @@ mysql.bin_format=show variables like 'binlog_format' mysql.bin_row_image=show variables like 'binlog_row_image' mysql.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION +mysql.grants=SHOW GRANTS FOR CURRENT_USER() postgres.wal=show wal_level postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?) postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java index 3b56dcb6f056f..dbf2a2ab152b0 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/MySQLSourceTest.java @@ -17,7 +17,9 @@ import static org.assertj.core.api.Assertions.*; import static org.junit.Assert.assertEquals; +import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.*; +import com.risingwave.proto.Data; import io.grpc.*; import java.io.IOException; import java.sql.Connection; @@ -27,10 +29,7 @@ import java.util.List; import java.util.concurrent.*; import javax.sql.DataSource; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.MySQLContainer; @@ -139,8 +138,75 @@ public void testLines() throws InterruptedException, SQLException { assertEquals(10000, count); } catch (ExecutionException e) { fail("Execution exception: ", e); + } finally { + // cleanup + query = testClient.sqlStmts.getProperty("tpch.drop.orders"); + SourceTestClient.performQuery(connection, query); + connection.close(); + } + } + + // test whether validation catches permission errors + @Test + public void testPermissionCheck() throws SQLException { + // user Root creates a superuser debezium + Connection connRoot = SourceTestClient.connect(mysqlDataSource); + String query = "CREATE USER debezium IDENTIFIED BY '" + mysql.getPassword() + "'"; + SourceTestClient.performQuery(connRoot, query); + query = + "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'"; + SourceTestClient.performQuery(connRoot, query); + // user debezium connects to Mysql + DataSource dbzDataSource = + SourceTestClient.getDataSource( + mysql.getJdbcUrl(), + "debezium", + mysql.getPassword(), + mysql.getDriverClassName()); + Connection connDbz = SourceTestClient.connect(dbzDataSource); + query = + "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))"; + SourceTestClient.performQuery(connDbz, query); + ConnectorServiceProto.TableSchema tableSchema = + ConnectorServiceProto.TableSchema.newBuilder() + .addColumns( + ConnectorServiceProto.TableSchema.Column.newBuilder() + .setName("o_key") + .setDataType(Data.DataType.TypeName.INT64) + .build()) + .addColumns( + ConnectorServiceProto.TableSchema.Column.newBuilder() + .setName("o_val") + .setDataType(Data.DataType.TypeName.INT32) + .build()) + .addPkIndices(0) + .build(); + + try { + var resp = + testClient.validateSource( + mysql.getJdbcUrl(), + mysql.getHost(), + "debezium", + mysql.getPassword(), + SourceType.MYSQL, + tableSchema, + "test", + "orders"); + assertEquals( + "INTERNAL: MySQL user does not have privilege LOCK TABLES, which is needed for debezium connector", + resp.getError().getErrorMessage()); + } catch (Exception e) { + Assert.fail("validate rpc fail: " + e.getMessage()); + } finally { + // cleanup + query = testClient.sqlStmts.getProperty("tpch.drop.orders"); + SourceTestClient.performQuery(connDbz, query); + query = "DROP USER IF EXISTS debezium"; + SourceTestClient.performQuery(connRoot, query); + connDbz.close(); + connRoot.close(); } - connection.close(); } // generates test cases for the risingwave debezium parser diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java index ffb5dab5b626a..57349479ba7f0 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java @@ -146,7 +146,7 @@ public void testLines() throws Exception { Assert.fail("validate rpc fail: " + e.getMessage()); } finally { // cleanup - query = "DROP TABLE orders"; + query = testClient.sqlStmts.getProperty("tpch.drop.orders"); SourceTestClient.performQuery(connection, query); connection.close(); } @@ -227,7 +227,7 @@ public void testPermissionCheck() throws SQLException { Assert.fail("validate rpc fail: " + e.getMessage()); } finally { // cleanup - query = "DROP TABLE orders"; + query = testClient.sqlStmts.getProperty("tpch.drop.orders"); SourceTestClient.performQuery(connDbz, query); query = "DROP USER debezium"; SourceTestClient.performQuery(connPg, query); diff --git a/java/connector-node/risingwave-source-test/src/test/resources/stored_queries.properties b/java/connector-node/risingwave-source-test/src/test/resources/stored_queries.properties index 20acafedebae5..94811a495062b 100644 --- a/java/connector-node/risingwave-source-test/src/test/resources/stored_queries.properties +++ b/java/connector-node/risingwave-source-test/src/test/resources/stored_queries.properties @@ -1,3 +1,4 @@ mysql.bin_log=show variables like 'log_bin' postgres.wal=show wal_level -tpch.create.orders=CREATE TABLE IF NOT EXISTS orders (O_ORDERKEY BIGINT NOT NULL, O_CUSTKEY BIGINT NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15, 2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY BIGINT NOT NULL, O_COMMENT VARCHAR(79) NOT NULL, PRIMARY KEY (O_ORDERKEY)) \ No newline at end of file +tpch.create.orders=CREATE TABLE IF NOT EXISTS orders (O_ORDERKEY BIGINT NOT NULL, O_CUSTKEY BIGINT NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15, 2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY BIGINT NOT NULL, O_COMMENT VARCHAR(79) NOT NULL, PRIMARY KEY (O_ORDERKEY)) +tpch.drop.orders=DROP TABLE IF EXISTS orders \ No newline at end of file