Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: #19020 Check permissions using native Postgres functions to support GROUPs and ROLEs based access control #19021

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/connector-node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Downloading and launching MinIO is a straightforward process. For PostgreSQL, I

```shell
# create postgresql in docker
docker run --name my-postgres -e POSTGRES_PASSWORD=connector -e POSTGRES_DB=test -e POSTGRES_USER=test -d -p 5432:5432 postgres
docker run --name my-postgres -e POSTGRES_PASSWORD=connector -e POSTGRES_DB=test -e POSTGRES_USER=test -d -p 5432:5432 postgres:16
# connect postgresql
psql -h localhost -p 5432 -U test -d postgres
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class DbzConnectorConfig {
public static final String PG_PUB_CREATE = "publication.create.enable";
public static final String PG_SCHEMA_NAME = "schema.name";
public static final String PG_SSL_ROOT_CERT = "ssl.root.cert";
public static final String PG_TEST_ONLY_FORCE_RDS = "test.only.force.rds";

/* Sql Server configs */
public static final String SQL_SERVER_SCHEMA_NAME = "schema.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public PostgresValidator(
var password = userProps.get(DbzConnectorConfig.PASSWORD);
this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password);

this.isAwsRds = dbHost.contains(AWS_RDS_HOST);
this.isAwsRds =
dbHost.contains(AWS_RDS_HOST)
|| userProps.get(DbzConnectorConfig.PG_TEST_ONLY_FORCE_RDS).equals("true");
this.dbName = dbName;
this.user = user;
this.schemaName = userProps.get(DbzConnectorConfig.PG_SCHEMA_NAME);
Expand All @@ -86,8 +88,8 @@ public PostgresValidator(
@Override
public void validateDbConfig() {
try {
if (pgVersion > 16) {
throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16.");
if (pgVersion >= 17) {
throw ValidatorUtils.failedPrecondition("Postgres version should be less than 17.");
}

try (var stmt = jdbcConnection.createStatement()) {
Expand Down Expand Up @@ -254,24 +256,17 @@ private void validatePrivileges() throws SQLException {
boolean isSuperUser = false;
if (this.isAwsRds) {
// check privileges for aws rds postgres
boolean hasReplicationRole;
boolean hasReplicationRole = false;
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.rds.role.check"))) {
stmt.setString(1, this.user);
stmt.setString(2, this.user);
var res = stmt.executeQuery();
var hashSet = new HashSet<String>();
while (res.next()) {
// check rds_superuser role or rds_replication role is granted
var memberof = res.getArray("memberof");
if (memberof != null) {
var members = (String[]) memberof.getArray();
hashSet.addAll(Arrays.asList(members));
}
LOG.info("rds memberof: {}", hashSet);
isSuperUser = res.getBoolean(1);
hasReplicationRole = res.getBoolean(2);
}
isSuperUser = hashSet.contains("rds_superuser");
hasReplicationRole = hashSet.contains("rds_replication");
}

if (!isSuperUser && !hasReplicationRole) {
Expand Down Expand Up @@ -320,9 +315,8 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.table_read_privilege.check"))) {
stmt.setString(1, this.schemaName);
stmt.setString(2, this.tableName);
stmt.setString(3, this.user);
stmt.setString(1, this.user);
stmt.setString(2, this.schemaName + "." + this.tableName);
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ postgres.slot_limit.check=SELECT CASE WHEN (SELECT count(*) FROM pg_replication_
postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ?
postgres.superuser.check=SELECT rolsuper FROM pg_roles WHERE rolname = ?
postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ?
postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_schema = ? AND table_name = ? AND grantee = ? and privilege_type = 'SELECT'
postgres.table_read_privilege.check=SELECT has_table_privilege(?, ?, 'SELECT')
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
Expand Down Expand Up @@ -51,4 +51,4 @@ sqlserver.has.perms=SELECT HAS_PERMS_BY_NAME('cdc.' + ct.capture_instance + '_CT
sqlserver.sql.agent.enabled=SELECT sys.fn_cdc_get_max_lsn()
sqlserver.case.sensitive=WITH collations AS (SELECT name, CASE WHEN description like '%case-insensitive%' THEN 0 WHEN description like '%case-sensitive%' THEN 1 END isCaseSensitive FROM sys.fn_helpcollations()) SELECT * FROM collations WHERE name = CONVERT(varchar, DATABASEPROPERTYEX( ? ,'collation'));
citus.distributed_table=select citus_table_type from citus_tables where table_name=?::regclass
postgres.rds.role.check=SELECT r.rolname, r.rolsuper, r.rolinherit, r.rolcreaterole, r.rolcreatedb, r.rolcanlogin, r.rolconnlimit, r.rolvaliduntil, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid) as memberof , r.rolreplication , r.rolbypassrls FROM pg_catalog.pg_roles r WHERE r.rolname = ?
postgres.rds.role.check=SELECT pg_has_role(?, 'rds_superuser', 'member') as is_rds_superuser, pg_has_role(?, 'rds_replication', 'member') as is_rds_replication;
Loading
Loading