Skip to content

Commit

Permalink
feat(source): add mysql permission checks in validation (#9575)
Browse files Browse the repository at this point in the history
  • Loading branch information
WillyKidd authored and Li0k committed May 5, 2023
1 parent 5f7a9b1 commit f04829b
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,33 @@ private void validateTableSchema() throws SQLException {
}

private void validatePrivileges() throws SQLException {
// TODO: validate MySQL user privileges
String[] privilegesRequired = {
"SELECT",
"RELOAD",
"SHOW DATABASES",
"REPLICATION SLAVE",
"REPLICATION CLIENT",
"LOCK TABLES"
};
try (var stmt = jdbcConnection.createStatement()) {
var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants"));
while (res.next()) {
String grants = res.getString(1).toUpperCase();
// all privileges granted, check passed
if (grants.contains("ALL")) {
break;
}
// check whether each privilege is granted
for (String privilege : privilegesRequired) {
if (!grants.contains(privilege)) {
throw ValidatorUtils.invalidArgument(
String.format(
"MySQL user does not have privilege %s, which is needed for debezium connector",
privilege));
}
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mysql.bin_format=show variables like 'binlog_format'
mysql.bin_row_image=show variables like 'binlog_row_image'
mysql.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION
mysql.grants=SHOW GRANTS FOR CURRENT_USER()
postgres.wal=show wal_level
postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?)
postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;

import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.*;
import com.risingwave.proto.Data;
import io.grpc.*;
import java.io.IOException;
import java.sql.Connection;
Expand All @@ -27,10 +29,7 @@
import java.util.List;
import java.util.concurrent.*;
import javax.sql.DataSource;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
Expand Down Expand Up @@ -139,8 +138,75 @@ public void testLines() throws InterruptedException, SQLException {
assertEquals(10000, count);
} catch (ExecutionException e) {
fail("Execution exception: ", e);
} finally {
// cleanup
query = testClient.sqlStmts.getProperty("tpch.drop.orders");
SourceTestClient.performQuery(connection, query);
connection.close();
}
}

// test whether validation catches permission errors
@Test
public void testPermissionCheck() throws SQLException {
// user Root creates a superuser debezium
Connection connRoot = SourceTestClient.connect(mysqlDataSource);
String query = "CREATE USER debezium IDENTIFIED BY '" + mysql.getPassword() + "'";
SourceTestClient.performQuery(connRoot, query);
query =
"GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'";
SourceTestClient.performQuery(connRoot, query);
// user debezium connects to Mysql
DataSource dbzDataSource =
SourceTestClient.getDataSource(
mysql.getJdbcUrl(),
"debezium",
mysql.getPassword(),
mysql.getDriverClassName());
Connection connDbz = SourceTestClient.connect(dbzDataSource);
query =
"CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))";
SourceTestClient.performQuery(connDbz, query);
ConnectorServiceProto.TableSchema tableSchema =
ConnectorServiceProto.TableSchema.newBuilder()
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("o_key")
.setDataType(Data.DataType.TypeName.INT64)
.build())
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("o_val")
.setDataType(Data.DataType.TypeName.INT32)
.build())
.addPkIndices(0)
.build();

try {
var resp =
testClient.validateSource(
mysql.getJdbcUrl(),
mysql.getHost(),
"debezium",
mysql.getPassword(),
SourceType.MYSQL,
tableSchema,
"test",
"orders");
assertEquals(
"INTERNAL: MySQL user does not have privilege LOCK TABLES, which is needed for debezium connector",
resp.getError().getErrorMessage());
} catch (Exception e) {
Assert.fail("validate rpc fail: " + e.getMessage());
} finally {
// cleanup
query = testClient.sqlStmts.getProperty("tpch.drop.orders");
SourceTestClient.performQuery(connDbz, query);
query = "DROP USER IF EXISTS debezium";
SourceTestClient.performQuery(connRoot, query);
connDbz.close();
connRoot.close();
}
connection.close();
}

// generates test cases for the risingwave debezium parser
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void testLines() throws Exception {
Assert.fail("validate rpc fail: " + e.getMessage());
} finally {
// cleanup
query = "DROP TABLE orders";
query = testClient.sqlStmts.getProperty("tpch.drop.orders");
SourceTestClient.performQuery(connection, query);
connection.close();
}
Expand Down Expand Up @@ -227,7 +227,7 @@ public void testPermissionCheck() throws SQLException {
Assert.fail("validate rpc fail: " + e.getMessage());
} finally {
// cleanup
query = "DROP TABLE orders";
query = testClient.sqlStmts.getProperty("tpch.drop.orders");
SourceTestClient.performQuery(connDbz, query);
query = "DROP USER debezium";
SourceTestClient.performQuery(connPg, query);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mysql.bin_log=show variables like 'log_bin'
postgres.wal=show wal_level
tpch.create.orders=CREATE TABLE IF NOT EXISTS orders (O_ORDERKEY BIGINT NOT NULL, O_CUSTKEY BIGINT NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15, 2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY BIGINT NOT NULL, O_COMMENT VARCHAR(79) NOT NULL, PRIMARY KEY (O_ORDERKEY))
tpch.create.orders=CREATE TABLE IF NOT EXISTS orders (O_ORDERKEY BIGINT NOT NULL, O_CUSTKEY BIGINT NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15, 2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY BIGINT NOT NULL, O_COMMENT VARCHAR(79) NOT NULL, PRIMARY KEY (O_ORDERKEY))
tpch.drop.orders=DROP TABLE IF EXISTS orders

0 comments on commit f04829b

Please sign in to comment.