From 1b1d2f1d0d9bbb22506a8ada5c0e12a91ba9ed2d Mon Sep 17 00:00:00 2001
From: Runji Wang <wangrunji0408@163.com>
Date: Sun, 18 Feb 2024 12:23:29 +0800
Subject: [PATCH] refactor(frontend): rewrite 3 system functions using context
 (#14960)

Signed-off-by: Runji Wang <wangrunji0408@163.com>
---
 proto/expr.proto                              |   3 +
 .../tests/testdata/output/pg_catalog.yaml     |  17 +-
 .../tests/testdata/output/subquery.yaml       | 141 +++++------
 src/frontend/src/binder/expr/function.rs      | 125 ++--------
 src/frontend/src/binder/expr/mod.rs           |  16 +-
 src/frontend/src/binder/select.rs             | 231 +-----------------
 .../system_catalog/pg_catalog/pg_user.rs      |   2 -
 .../rw_catalog/rw_table_stats.rs              |   2 -
 src/frontend/src/expr/function_call.rs        |   2 +-
 .../src/expr/function_impl/context.rs         |   1 +
 src/frontend/src/expr/function_impl/mod.rs    |   3 +
 .../src/expr/function_impl/pg_get_userbyid.rs |  31 +++
 .../src/expr/function_impl/pg_indexes_size.rs |  51 ++++
 .../expr/function_impl/pg_relation_size.rs    |  50 ++++
 src/frontend/src/expr/mod.rs                  |  20 ++
 src/frontend/src/expr/pure.rs                 |   3 +
 src/frontend/src/scheduler/local.rs           |  17 +-
 17 files changed, 262 insertions(+), 453 deletions(-)
 create mode 100644 src/frontend/src/expr/function_impl/pg_get_userbyid.rs
 create mode 100644 src/frontend/src/expr/function_impl/pg_indexes_size.rs
 create mode 100644 src/frontend/src/expr/function_impl/pg_relation_size.rs

diff --git a/proto/expr.proto b/proto/expr.proto
index 14f9eb8c102cd..4c9be4d15ea24 100644
--- a/proto/expr.proto
+++ b/proto/expr.proto
@@ -283,6 +283,9 @@ message ExprNode {
     PG_GET_INDEXDEF = 2400;
     COL_DESCRIPTION = 2401;
     PG_GET_VIEWDEF = 2402;
+    PG_GET_USERBYID = 2403;
+    PG_INDEXES_SIZE = 2404;
+    PG_RELATION_SIZE = 2405;
 
     // EXTERNAL
     ICEBERG_TRANSFORM = 2201;
diff --git a/src/frontend/planner_test/tests/testdata/output/pg_catalog.yaml b/src/frontend/planner_test/tests/testdata/output/pg_catalog.yaml
index 7842e311e47ae..b369b85de69f5 100644
--- a/src/frontend/planner_test/tests/testdata/output/pg_catalog.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/pg_catalog.yaml
@@ -204,20 +204,11 @@
 - sql: |
     select pg_catalog.pg_get_userbyid(1)
   logical_plan: |-
-    LogicalProject { exprs: [rw_users.name] }
-    └─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
-      ├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
-      └─LogicalProject { exprs: [rw_users.name] }
-        └─LogicalFilter { predicate: (1:Int32 = rw_users.id) }
-          └─LogicalShare { id: 2 }
-            └─LogicalProject { exprs: [rw_users.id, rw_users.name, rw_users.create_db, rw_users.is_super, '********':Varchar] }
-              └─LogicalSysScan { table: rw_users, columns: [rw_users.id, rw_users.name, rw_users.is_super, rw_users.create_db, rw_users.create_user, rw_users.can_login] }
+    LogicalProject { exprs: [PgGetUserbyid(1:Int32) as $expr1] }
+    └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
   batch_plan: |-
-    BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
-    ├─BatchValues { rows: [[]] }
-    └─BatchProject { exprs: [rw_users.name] }
-      └─BatchFilter { predicate: (1:Int32 = rw_users.id) }
-        └─BatchScan { table: rw_users, columns: [rw_users.name, rw_users.id], distribution: Single }
+    BatchProject { exprs: [PgGetUserbyid(1:Int32) as $expr1] }
+    └─BatchValues { rows: [[]] }
 - sql: |
     select 'pg_namespace'::regclass
   logical_plan: |-
diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml
index 6dda70d6f98ef..6196244e193a5 100644
--- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml
@@ -194,31 +194,21 @@
 - sql: |
     SELECT (SELECT pg_catalog.pg_get_userbyid(1))
   logical_plan: |-
-    LogicalProject { exprs: [rw_users.name] }
+    LogicalProject { exprs: [$expr1] }
     └─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
       ├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
-      └─LogicalProject { exprs: [rw_users.name] }
-        └─LogicalApply { type: LeftOuter, on: true, correlated_id: 2, max_one_row: true }
-          ├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
-          └─LogicalProject { exprs: [rw_users.name] }
-            └─LogicalFilter { predicate: (1:Int32 = rw_users.id) }
-              └─LogicalShare { id: 2 }
-                └─LogicalProject { exprs: [rw_users.id, rw_users.name, rw_users.create_db, rw_users.is_super, '********':Varchar] }
-                  └─LogicalSysScan { table: rw_users, columns: [rw_users.id, rw_users.name, rw_users.is_super, rw_users.create_db, rw_users.create_user, rw_users.can_login] }
+      └─LogicalProject { exprs: [PgGetUserbyid(1:Int32) as $expr1] }
+        └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
   optimized_logical_plan_for_batch: |-
     LogicalJoin { type: LeftOuter, on: true, output: all }
     ├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
-    └─LogicalJoin { type: LeftOuter, on: true, output: all }
-      ├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
-      └─LogicalSysScan { table: rw_users, output_columns: [rw_users.name], required_columns: [rw_users.name, rw_users.id], predicate: (1:Int32 = rw_users.id) }
+    └─LogicalProject { exprs: [PgGetUserbyid(1:Int32) as $expr1] }
+      └─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
   batch_plan: |-
     BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
     ├─BatchValues { rows: [[]] }
-    └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
-      ├─BatchValues { rows: [[]] }
-      └─BatchProject { exprs: [rw_users.name] }
-        └─BatchFilter { predicate: (1:Int32 = rw_users.id) }
-          └─BatchScan { table: rw_users, columns: [rw_users.name, rw_users.id], distribution: Single }
+    └─BatchProject { exprs: [PgGetUserbyid(1:Int32) as $expr1] }
+      └─BatchValues { rows: [[]] }
 - sql: |
     SELECT n.nspname as "Schema",
     c.relname as "Name",
@@ -233,73 +223,62 @@
     AND pg_catalog.pg_table_is_visible(c.oid)
     ORDER BY 1,2;
   logical_plan: |-
-    LogicalProject { exprs: [rw_schemas.name, rw_tables.name, Case(($expr1 = 'r':Varchar), 'table':Varchar, ($expr1 = 'v':Varchar), 'view':Varchar, ($expr1 = 'm':Varchar), 'materialized view':Varchar, ($expr1 = 'i':Varchar), 'index':Varchar, ($expr1 = 'S':Varchar), 'sequence':Varchar, ($expr1 = 's':Varchar), 'special':Varchar, ($expr1 = 't':Varchar), 'TOAST table':Varchar, ($expr1 = 'f':Varchar), 'foreign table':Varchar, ($expr1 = 'p':Varchar), 'partitioned table':Varchar, ($expr1 = 'I':Varchar), 'partitioned index':Varchar) as $expr3, rw_users.name] }
-    └─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
-      ├─LogicalFilter { predicate: In($expr1, 'r':Varchar, 'p':Varchar, 'v':Varchar, 'm':Varchar, 'S':Varchar, 'f':Varchar, '':Varchar) AND (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) }
-      │ └─LogicalJoin { type: LeftOuter, on: (rw_schemas.id = rw_tables.schema_id), output: all }
-      │   ├─LogicalShare { id: 16 }
-      │   │ └─LogicalProject { exprs: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, 'p':Varchar, Case(('table':Varchar = 'table':Varchar), 'r':Varchar, ('table':Varchar = 'system table':Varchar), 'r':Varchar, ('table':Varchar = 'index':Varchar), 'i':Varchar, ('table':Varchar = 'view':Varchar), 'v':Varchar, ('table':Varchar = 'materialized view':Varchar), 'm':Varchar) as $expr1, 0:Int32, 0:Int32, Array as $expr2] }
-      │   │   └─LogicalShare { id: 14 }
-      │   │     └─LogicalUnion { all: true }
-      │   │       ├─LogicalUnion { all: true }
-      │   │       │ ├─LogicalUnion { all: true }
-      │   │       │ │ ├─LogicalUnion { all: true }
-      │   │       │ │ │ ├─LogicalUnion { all: true }
-      │   │       │ │ │ │ ├─LogicalUnion { all: true }
-      │   │       │ │ │ │ │ ├─LogicalProject { exprs: [rw_tables.id, rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl] }
-      │   │       │ │ │ │ │ │ └─LogicalSysScan { table: rw_tables, columns: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl, rw_tables.initialized_at, rw_tables.created_at, rw_tables.initialized_at_cluster_version, rw_tables.created_at_cluster_version] }
-      │   │       │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
-      │   │       │ │ │ │ │   └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
-      │   │       │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] }
-      │   │       │ │ │ │   └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] }
-      │   │       │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] }
-      │   │       │ │ │   └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.indkey, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] }
-      │   │       │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] }
-      │   │       │ │   └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version] }
-      │   │       │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] }
-      │   │       │   └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at, rw_materialized_views.initialized_at_cluster_version, rw_materialized_views.created_at_cluster_version] }
-      │   │       └─LogicalProject { exprs: [rw_views.id, rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] }
-      │   │         └─LogicalSysScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] }
-      │   └─LogicalShare { id: 18 }
-      │     └─LogicalProject { exprs: [rw_schemas.id, rw_schemas.name, rw_schemas.owner, rw_schemas.acl] }
-      │       └─LogicalSysScan { table: rw_schemas, columns: [rw_schemas.id, rw_schemas.name, rw_schemas.owner, rw_schemas.acl] }
-      └─LogicalProject { exprs: [rw_users.name] }
-        └─LogicalFilter { predicate: (CorrelatedInputRef { index: 3, correlated_id: 1 } = rw_users.id) }
-          └─LogicalShare { id: 22 }
-            └─LogicalProject { exprs: [rw_users.id, rw_users.name, rw_users.create_db, rw_users.is_super, '********':Varchar] }
-              └─LogicalSysScan { table: rw_users, columns: [rw_users.id, rw_users.name, rw_users.is_super, rw_users.create_db, rw_users.create_user, rw_users.can_login] }
+    LogicalProject { exprs: [rw_schemas.name, rw_tables.name, Case(($expr1 = 'r':Varchar), 'table':Varchar, ($expr1 = 'v':Varchar), 'view':Varchar, ($expr1 = 'm':Varchar), 'materialized view':Varchar, ($expr1 = 'i':Varchar), 'index':Varchar, ($expr1 = 'S':Varchar), 'sequence':Varchar, ($expr1 = 's':Varchar), 'special':Varchar, ($expr1 = 't':Varchar), 'TOAST table':Varchar, ($expr1 = 'f':Varchar), 'foreign table':Varchar, ($expr1 = 'p':Varchar), 'partitioned table':Varchar, ($expr1 = 'I':Varchar), 'partitioned index':Varchar) as $expr3, PgGetUserbyid(rw_tables.owner) as $expr4] }
+    └─LogicalFilter { predicate: In($expr1, 'r':Varchar, 'p':Varchar, 'v':Varchar, 'm':Varchar, 'S':Varchar, 'f':Varchar, '':Varchar) AND (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) }
+      └─LogicalJoin { type: LeftOuter, on: (rw_schemas.id = rw_tables.schema_id), output: all }
+        ├─LogicalShare { id: 16 }
+        │ └─LogicalProject { exprs: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, 'p':Varchar, Case(('table':Varchar = 'table':Varchar), 'r':Varchar, ('table':Varchar = 'system table':Varchar), 'r':Varchar, ('table':Varchar = 'index':Varchar), 'i':Varchar, ('table':Varchar = 'view':Varchar), 'v':Varchar, ('table':Varchar = 'materialized view':Varchar), 'm':Varchar) as $expr1, 0:Int32, 0:Int32, Array as $expr2] }
+        │   └─LogicalShare { id: 14 }
+        │     └─LogicalUnion { all: true }
+        │       ├─LogicalUnion { all: true }
+        │       │ ├─LogicalUnion { all: true }
+        │       │ │ ├─LogicalUnion { all: true }
+        │       │ │ │ ├─LogicalUnion { all: true }
+        │       │ │ │ │ ├─LogicalUnion { all: true }
+        │       │ │ │ │ │ ├─LogicalProject { exprs: [rw_tables.id, rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl] }
+        │       │ │ │ │ │ │ └─LogicalSysScan { table: rw_tables, columns: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl, rw_tables.initialized_at, rw_tables.created_at, rw_tables.initialized_at_cluster_version, rw_tables.created_at_cluster_version] }
+        │       │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
+        │       │ │ │ │ │   └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
+        │       │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] }
+        │       │ │ │ │   └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] }
+        │       │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] }
+        │       │ │ │   └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.indkey, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] }
+        │       │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] }
+        │       │ │   └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version] }
+        │       │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] }
+        │       │   └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at, rw_materialized_views.initialized_at_cluster_version, rw_materialized_views.created_at_cluster_version] }
+        │       └─LogicalProject { exprs: [rw_views.id, rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] }
+        │         └─LogicalSysScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] }
+        └─LogicalShare { id: 18 }
+          └─LogicalProject { exprs: [rw_schemas.id, rw_schemas.name, rw_schemas.owner, rw_schemas.acl] }
+            └─LogicalSysScan { table: rw_schemas, columns: [rw_schemas.id, rw_schemas.name, rw_schemas.owner, rw_schemas.acl] }
   batch_plan: |-
     BatchExchange { order: [rw_schemas.name ASC, rw_tables.name ASC], dist: Single }
-    └─BatchProject { exprs: [rw_schemas.name, rw_tables.name, Case(($expr1 = 'r':Varchar), 'table':Varchar, ($expr1 = 'v':Varchar), 'view':Varchar, ($expr1 = 'm':Varchar), 'materialized view':Varchar, ($expr1 = 'i':Varchar), 'index':Varchar, ($expr1 = 'S':Varchar), 'sequence':Varchar, ($expr1 = 's':Varchar), 'special':Varchar, ($expr1 = 't':Varchar), 'TOAST table':Varchar, ($expr1 = 'f':Varchar), 'foreign table':Varchar, ($expr1 = 'p':Varchar), 'partitioned table':Varchar, ($expr1 = 'I':Varchar), 'partitioned index':Varchar) as $expr2, rw_users.name] }
-      └─BatchProject { exprs: [rw_tables.name, Case(('table':Varchar = 'table':Varchar), 'r':Varchar, ('table':Varchar = 'system table':Varchar), 'r':Varchar, ('table':Varchar = 'index':Varchar), 'i':Varchar, ('table':Varchar = 'view':Varchar), 'v':Varchar, ('table':Varchar = 'materialized view':Varchar), 'm':Varchar) as $expr1, rw_schemas.name, rw_users.name] }
+    └─BatchProject { exprs: [rw_schemas.name, rw_tables.name, Case(($expr1 = 'r':Varchar), 'table':Varchar, ($expr1 = 'v':Varchar), 'view':Varchar, ($expr1 = 'm':Varchar), 'materialized view':Varchar, ($expr1 = 'i':Varchar), 'index':Varchar, ($expr1 = 'S':Varchar), 'sequence':Varchar, ($expr1 = 's':Varchar), 'special':Varchar, ($expr1 = 't':Varchar), 'TOAST table':Varchar, ($expr1 = 'f':Varchar), 'foreign table':Varchar, ($expr1 = 'p':Varchar), 'partitioned table':Varchar, ($expr1 = 'I':Varchar), 'partitioned index':Varchar) as $expr2, PgGetUserbyid(rw_tables.owner) as $expr3] }
+      └─BatchProject { exprs: [rw_tables.name, rw_tables.owner, Case(('table':Varchar = 'table':Varchar), 'r':Varchar, ('table':Varchar = 'system table':Varchar), 'r':Varchar, ('table':Varchar = 'index':Varchar), 'i':Varchar, ('table':Varchar = 'view':Varchar), 'v':Varchar, ('table':Varchar = 'materialized view':Varchar), 'm':Varchar) as $expr1, rw_schemas.name] }
         └─BatchSort { order: [rw_schemas.name ASC, rw_tables.name ASC] }
-          └─BatchHashJoin { type: LeftOuter, predicate: rw_tables.owner = rw_users.id, output: all }
-            ├─BatchExchange { order: [], dist: HashShard(rw_tables.owner) }
-            │ └─BatchHashJoin { type: Inner, predicate: rw_tables.schema_id = rw_schemas.id, output: all }
-            │   ├─BatchExchange { order: [], dist: HashShard(rw_tables.schema_id) }
-            │   │ └─BatchUnion { all: true }
-            │   │   ├─BatchProject { exprs: [rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner] }
-            │   │   │ └─BatchScan { table: rw_tables, columns: [rw_tables.name, rw_tables.schema_id, rw_tables.owner], distribution: Single }
-            │   │   ├─BatchProject { exprs: [rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner] }
-            │   │   │ └─BatchScan { table: rw_system_tables, columns: [rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner], distribution: Single }
-            │   │   ├─BatchProject { exprs: [rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner] }
-            │   │   │ └─BatchFilter { predicate: null:Boolean }
-            │   │   │   └─BatchScan { table: rw_sources, columns: [rw_sources.name, rw_sources.schema_id, rw_sources.owner], distribution: Single }
-            │   │   ├─BatchProject { exprs: [rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner] }
-            │   │   │ └─BatchValues { rows: [] }
-            │   │   ├─BatchProject { exprs: [rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner] }
-            │   │   │ └─BatchFilter { predicate: null:Boolean }
-            │   │   │   └─BatchScan { table: rw_sinks, columns: [rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner], distribution: Single }
-            │   │   ├─BatchProject { exprs: [rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner] }
-            │   │   │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner], distribution: Single }
-            │   │   └─BatchProject { exprs: [rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner] }
-            │   │     └─BatchScan { table: rw_views, columns: [rw_views.name, rw_views.schema_id, rw_views.owner], distribution: Single }
-            │   └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
-            │     └─BatchFilter { predicate: (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) }
-            │       └─BatchScan { table: rw_schemas, columns: [rw_schemas.id, rw_schemas.name], distribution: Single }
-            └─BatchExchange { order: [], dist: HashShard(rw_users.id) }
-              └─BatchProject { exprs: [rw_users.name, rw_users.id] }
-                └─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name], distribution: Single }
+          └─BatchHashJoin { type: Inner, predicate: rw_tables.schema_id = rw_schemas.id, output: all }
+            ├─BatchExchange { order: [], dist: HashShard(rw_tables.schema_id) }
+            │ └─BatchUnion { all: true }
+            │   ├─BatchProject { exprs: [rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner] }
+            │   │ └─BatchScan { table: rw_tables, columns: [rw_tables.name, rw_tables.schema_id, rw_tables.owner], distribution: Single }
+            │   ├─BatchProject { exprs: [rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner] }
+            │   │ └─BatchScan { table: rw_system_tables, columns: [rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner], distribution: Single }
+            │   ├─BatchProject { exprs: [rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner] }
+            │   │ └─BatchFilter { predicate: null:Boolean }
+            │   │   └─BatchScan { table: rw_sources, columns: [rw_sources.name, rw_sources.schema_id, rw_sources.owner], distribution: Single }
+            │   ├─BatchProject { exprs: [rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner] }
+            │   │ └─BatchValues { rows: [] }
+            │   ├─BatchProject { exprs: [rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner] }
+            │   │ └─BatchFilter { predicate: null:Boolean }
+            │   │   └─BatchScan { table: rw_sinks, columns: [rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner], distribution: Single }
+            │   ├─BatchProject { exprs: [rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner] }
+            │   │ └─BatchScan { table: rw_materialized_views, columns: [rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner], distribution: Single }
+            │   └─BatchProject { exprs: [rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner] }
+            │     └─BatchScan { table: rw_views, columns: [rw_views.name, rw_views.schema_id, rw_views.owner], distribution: Single }
+            └─BatchExchange { order: [], dist: HashShard(rw_schemas.id) }
+              └─BatchFilter { predicate: (rw_schemas.name <> 'pg_catalog':Varchar) AND Not(RegexpEq(rw_schemas.name, '^pg_toast':Varchar)) AND (rw_schemas.name <> 'information_schema':Varchar) }
+                └─BatchScan { table: rw_schemas, columns: [rw_schemas.id, rw_schemas.name], distribution: Single }
 - sql: |
     create table auction (date_time date);
     select * from hop( auction, auction.date_time, INTERVAL '1', INTERVAL '3600' ) AS hop_1
diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs
index 477c493ef2ab9..b787632846e98 100644
--- a/src/frontend/src/binder/expr/function.rs
+++ b/src/frontend/src/binder/expr/function.rs
@@ -37,11 +37,11 @@ use risingwave_sqlparser::parser::ParserError;
 use thiserror_ext::AsReport;
 
 use crate::binder::bind_context::Clause;
-use crate::binder::{Binder, BoundQuery, BoundSetExpr, UdfContext};
+use crate::binder::{Binder, UdfContext};
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{
     AggCall, Expr, ExprImpl, ExprType, FunctionCall, FunctionCallWithLambda, Literal, Now, OrderBy,
-    Subquery, SubqueryKind, TableFunction, TableFunctionType, UserDefinedFunction, WindowFunction,
+    TableFunction, TableFunctionType, UserDefinedFunction, WindowFunction,
 };
 use crate::utils::Condition;
 
@@ -1230,112 +1230,27 @@ impl Binder {
                 ("current_role", current_user()),
                 ("current_user", current_user()),
                 ("user", current_user()),
-                ("pg_get_userbyid", guard_by_len(1, raw(|binder, inputs|{
-                        let input = &inputs[0];
-                        let bound_query = binder.bind_get_user_by_id_select(input)?;
-                        Ok(ExprImpl::Subquery(Box::new(Subquery::new(
-                            BoundQuery {
-                                body: BoundSetExpr::Select(Box::new(bound_query)),
-                                order: vec![],
-                                limit: None,
-                                offset: None,
-                                with_ties: false,
-                                extra_order_exprs: vec![],
-                            },
-                            SubqueryKind::Scalar,
-                        ))))
-                    }
-                ))),
+                ("pg_get_userbyid", raw_call(ExprType::PgGetUserbyid)),
                 ("pg_get_indexdef", raw_call(ExprType::PgGetIndexdef)),
                 ("pg_get_viewdef", raw_call(ExprType::PgGetViewdef)),
-                ("pg_relation_size", dispatch_by_len(vec![
-                    (1, raw(|binder, inputs|{
-                        let table_name = &inputs[0];
-                        let bound_query = binder.bind_get_table_size_select("pg_relation_size", table_name)?;
-                        Ok(ExprImpl::Subquery(Box::new(Subquery::new(
-                            BoundQuery {
-                                body: BoundSetExpr::Select(Box::new(bound_query)),
-                                order: vec![],
-                                limit: None,
-                                offset: None,
-                                with_ties: false,
-                                extra_order_exprs: vec![],
-                            },
-                            SubqueryKind::Scalar,
-                        ))))
-                    })),
-                    (2, raw(|binder, inputs|{
-                        let table_name = &inputs[0];
-                        match inputs[1].as_literal() {
-                            Some(literal) if literal.return_type() == DataType::Varchar => {
-                                match literal
-                                     .get_data()
-                                     .as_ref()
-                                     .expect("ExprImpl value is a Literal but cannot get ref to data")
-                                     .as_utf8()
-                                     .as_ref() {
-                                        "main" => {
-                                            let bound_query = binder.bind_get_table_size_select("pg_relation_size", table_name)?;
-                                            Ok(ExprImpl::Subquery(Box::new(Subquery::new(
-                                                BoundQuery {
-                                                    body: BoundSetExpr::Select(Box::new(bound_query)),
-                                                    order: vec![],
-                                                    limit: None,
-                                                    offset: None,
-                                                    with_ties: false,
-                                                    extra_order_exprs: vec![],
-                                                },
-                                                SubqueryKind::Scalar,
-                                            ))))
-                                        },
-                                        // These options are invalid in RW so we return 0 value as the result
-                                        "fsm"|"vm"|"init" => {
-                                            Ok(ExprImpl::literal_int(0))
-                                        },
-                                        _ => Err(ErrorCode::InvalidInputSyntax(
-                                            "invalid fork name. Valid fork names are \"main\", \"fsm\", \"vm\", and \"init\"".into()).into())
-                                    }
-                            },
-                            _ => Err(ErrorCode::ExprError(
-                                "The 2nd argument of `pg_relation_size` must be string literal.".into(),
-                            )
-                            .into())
-                        }
-                    })),
-                    ]
-                )),
-                ("pg_table_size", guard_by_len(1, raw(|binder, inputs|{
-                        let input = &inputs[0];
-                        let bound_query = binder.bind_get_table_size_select("pg_table_size", input)?;
-                        Ok(ExprImpl::Subquery(Box::new(Subquery::new(
-                            BoundQuery {
-                                body: BoundSetExpr::Select(Box::new(bound_query)),
-                                order: vec![],
-                                limit: None,
-                                offset: None,
-                                with_ties: false,
-                                extra_order_exprs: vec![],
-                            },
-                            SubqueryKind::Scalar,
-                        ))))
-                    }
-                ))),
-                ("pg_indexes_size", guard_by_len(1, raw(|binder, inputs|{
-                        let input = &inputs[0];
-                        let bound_query = binder.bind_get_indexes_size_select(input)?;
-                        Ok(ExprImpl::Subquery(Box::new(Subquery::new(
-                            BoundQuery {
-                                body: BoundSetExpr::Select(Box::new(bound_query)),
-                                order: vec![],
-                                limit: None,
-                                offset: None,
-                                with_ties: false,
-                                extra_order_exprs: vec![],
-                            },
-                            SubqueryKind::Scalar,
-                        ))))
+                ("pg_relation_size", raw(|_binder, mut inputs|{
+                    if inputs.is_empty() {
+                        return Err(ErrorCode::ExprError(
+                            "function pg_relation_size() does not exist".into(),
+                        )
+                        .into());
                     }
-                ))),
+                    inputs[0].cast_to_regclass_mut()?;
+                    Ok(FunctionCall::new(ExprType::PgRelationSize, inputs)?.into())
+                })),
+                ("pg_table_size", guard_by_len(1, raw(|_binder, mut inputs|{
+                    inputs[0].cast_to_regclass_mut()?;
+                    Ok(FunctionCall::new(ExprType::PgRelationSize, inputs)?.into())
+                }))),
+                ("pg_indexes_size", guard_by_len(1, raw(|_binder, mut inputs|{
+                    inputs[0].cast_to_regclass_mut()?;
+                    Ok(FunctionCall::new(ExprType::PgIndexesSize, inputs)?.into())
+                }))),
                 ("pg_get_expr", raw(|_binder, inputs|{
                     if inputs.len() == 2 || inputs.len() == 3 {
                         // TODO: implement pg_get_expr rather than just return empty as an workaround.
diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs
index 2baf9cb1f84e7..0bc12545984ca 100644
--- a/src/frontend/src/binder/expr/mod.rs
+++ b/src/frontend/src/binder/expr/mod.rs
@@ -612,23 +612,9 @@ impl Binder {
         match &data_type {
             // Casting to Regclass type means getting the oid of expr.
             // See https://www.postgresql.org/docs/current/datatype-oid.html.
-            // Currently only string liter expr is supported since we cannot handle subquery in join
-            // on condition: https://github.com/risingwavelabs/risingwave/issues/6852
-            // TODO: Add generic expr support when needed
             AstDataType::Regclass => {
                 let input = self.bind_expr_inner(expr)?;
-                match input.return_type() {
-                    DataType::Varchar => Ok(ExprImpl::FunctionCall(Box::new(
-                        FunctionCall::new_unchecked(
-                            ExprType::CastRegclass,
-                            vec![input],
-                            DataType::Int32,
-                        ),
-                    ))),
-                    DataType::Int32 => Ok(input),
-                    dt if dt.is_int() => Ok(input.cast_explicit(DataType::Int32)?),
-                    _ => Err(ErrorCode::BindError("Unsupported input type".to_string()).into()),
-                }
+                Ok(input.cast_to_regclass()?)
             }
             AstDataType::Regproc => {
                 let lhs = self.bind_expr_inner(expr)?;
diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs
index b9689e007297e..d9848ed769732 100644
--- a/src/frontend/src/binder/select.rs
+++ b/src/frontend/src/binder/select.rs
@@ -16,13 +16,11 @@ use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 
 use itertools::Itertools;
-use risingwave_common::catalog::{Field, Schema, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME};
-use risingwave_common::types::{DataType, ScalarImpl};
+use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::types::ScalarImpl;
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_expr::aggregate::AggKind;
 use risingwave_sqlparser::ast::{
-    BinaryOperator, DataType as AstDataType, Distinct, Expr, Ident, Join, JoinConstraint,
-    JoinOperator, ObjectName, Select, SelectItem, TableFactor, TableWithJoins, Value,
+    DataType as AstDataType, Distinct, Expr, Select, SelectItem, Value,
 };
 
 use super::bind_context::{Clause, ColumnBinding};
@@ -30,18 +28,8 @@ use super::statement::RewriteExprsRecursive;
 use super::UNNAMED_COLUMN;
 use crate::binder::{Binder, Relation};
 use crate::catalog::check_valid_column_name;
-use crate::catalog::system_catalog::pg_catalog::{
-    PG_INDEX_COLUMNS, PG_INDEX_TABLE_NAME, PG_USER_ID_INDEX, PG_USER_NAME_INDEX, PG_USER_TABLE_NAME,
-};
-use crate::catalog::system_catalog::rw_catalog::{
-    RW_TABLE_STATS_COLUMNS, RW_TABLE_STATS_KEY_SIZE_INDEX, RW_TABLE_STATS_TABLE_ID_INDEX,
-    RW_TABLE_STATS_TABLE_NAME, RW_TABLE_STATS_VALUE_SIZE_INDEX,
-};
 use crate::error::{ErrorCode, Result, RwError};
-use crate::expr::{
-    AggCall, CorrelatedId, CorrelatedInputRef, Depth, Expr as _, ExprImpl, ExprType, FunctionCall,
-    InputRef,
-};
+use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef};
 use crate::utils::group_by::GroupBy;
 
 #[derive(Debug, Clone)]
@@ -530,217 +518,6 @@ impl Binder {
         Ok((returning_list, fields))
     }
 
-    /// `bind_get_user_by_id_select` binds a select statement that returns a single user name by id,
-    /// this is used for function `pg_catalog.get_user_by_id()`.
-    pub fn bind_get_user_by_id_select(&mut self, input: &ExprImpl) -> Result<BoundSelect> {
-        let select_items = vec![InputRef::new(PG_USER_NAME_INDEX, DataType::Varchar).into()];
-        let schema = Schema {
-            fields: vec![Field::with_name(
-                DataType::Varchar,
-                UNNAMED_COLUMN.to_string(),
-            )],
-        };
-        let input = match input {
-            ExprImpl::InputRef(input_ref) => {
-                CorrelatedInputRef::new(input_ref.index(), input_ref.return_type(), 1).into()
-            }
-            ExprImpl::CorrelatedInputRef(col_input_ref) => CorrelatedInputRef::new(
-                col_input_ref.index(),
-                col_input_ref.return_type(),
-                col_input_ref.depth() + 1,
-            )
-            .into(),
-            ExprImpl::Literal(_) => input.clone(),
-            _ => return Err(ErrorCode::BindError("Unsupported input type".to_string()).into()),
-        };
-        let from = Some(self.bind_relation_by_name_inner(
-            Some(PG_CATALOG_SCHEMA_NAME),
-            PG_USER_TABLE_NAME,
-            None,
-            false,
-        )?);
-        let where_clause = Some(
-            FunctionCall::new(
-                ExprType::Equal,
-                vec![
-                    input,
-                    InputRef::new(PG_USER_ID_INDEX, DataType::Int32).into(),
-                ],
-            )?
-            .into(),
-        );
-
-        Ok(BoundSelect {
-            distinct: BoundDistinct::All,
-            select_items,
-            aliases: vec![None],
-            from,
-            where_clause,
-            group_by: GroupBy::GroupKey(vec![]),
-            having: None,
-            schema,
-        })
-    }
-
-    /// This returns the size of all the indexes that are on the specified table.
-    pub fn bind_get_indexes_size_select(&mut self, table: &ExprImpl) -> Result<BoundSelect> {
-        // this function is implemented with the following query:
-        //     SELECT sum(total_key_size + total_value_size)
-        //     FROM rw_catalog.rw_table_stats as stats
-        //     JOIN pg_index on stats.id = pg_index.indexrelid
-        //     WHERE pg_index.indrelid = 'table_name'::regclass
-
-        let indexrelid_col = PG_INDEX_COLUMNS[0].1;
-        let tbl_stats_id_col = RW_TABLE_STATS_COLUMNS[0].1;
-
-        // Filter to only the Indexes on this table
-        let table_id = self.table_id_query(table)?;
-
-        let constraint = JoinConstraint::On(Expr::BinaryOp {
-            left: Box::new(Expr::Identifier(Ident::new_unchecked(tbl_stats_id_col))),
-            op: BinaryOperator::Eq,
-            right: Box::new(Expr::Identifier(Ident::new_unchecked(indexrelid_col))),
-        });
-        let indexes_with_stats = self.bind_table_with_joins(TableWithJoins {
-            relation: TableFactor::Table {
-                name: ObjectName(vec![
-                    RW_CATALOG_SCHEMA_NAME.into(),
-                    RW_TABLE_STATS_TABLE_NAME.into(),
-                ]),
-                alias: None,
-                for_system_time_as_of_proctime: false,
-            },
-            joins: vec![Join {
-                relation: TableFactor::Table {
-                    name: ObjectName(vec![
-                        PG_CATALOG_SCHEMA_NAME.into(),
-                        PG_INDEX_TABLE_NAME.into(),
-                    ]),
-                    alias: None,
-                    for_system_time_as_of_proctime: false,
-                },
-                join_operator: JoinOperator::Inner(constraint),
-            }],
-        })?;
-
-        // Get the size of an index by adding the size of the keys and the size of the values
-        let sum = FunctionCall::new(
-            ExprType::Add,
-            vec![
-                InputRef::new(RW_TABLE_STATS_KEY_SIZE_INDEX, DataType::Int64).into(),
-                InputRef::new(RW_TABLE_STATS_VALUE_SIZE_INDEX, DataType::Int64).into(),
-            ],
-        )?
-        .into();
-
-        // There could be multiple indexes on a table so aggregate the sizes of all indexes
-        let select_items: Vec<ExprImpl> =
-            vec![AggCall::new_unchecked(AggKind::Sum0, vec![sum], DataType::Int64)?.into()];
-
-        let indrelid_col = PG_INDEX_COLUMNS[1].1;
-        let indrelid_ref = self.bind_column(&[indrelid_col.into()])?;
-        let where_clause: Option<ExprImpl> =
-            Some(FunctionCall::new(ExprType::Equal, vec![indrelid_ref, table_id])?.into());
-
-        // define the output schema
-        let result_schema = Schema {
-            fields: vec![Field::with_name(
-                DataType::Int64,
-                "pg_indexes_size".to_string(),
-            )],
-        };
-
-        Ok(BoundSelect {
-            distinct: BoundDistinct::All,
-            select_items,
-            aliases: vec![None],
-            from: Some(indexes_with_stats),
-            where_clause,
-            group_by: GroupBy::GroupKey(vec![]),
-            having: None,
-            schema: result_schema,
-        })
-    }
-
-    pub fn bind_get_table_size_select(
-        &mut self,
-        output_name: &str,
-        table: &ExprImpl,
-    ) -> Result<BoundSelect> {
-        // define the output schema
-        let result_schema = Schema {
-            fields: vec![Field::with_name(DataType::Int64, output_name.to_string())],
-        };
-
-        // Get table stats data
-        let from = Some(self.bind_relation_by_name_inner(
-            Some(RW_CATALOG_SCHEMA_NAME),
-            RW_TABLE_STATS_TABLE_NAME,
-            None,
-            false,
-        )?);
-
-        let table_id = self.table_id_query(table)?;
-
-        // Filter to only the Indexes on this table
-        let where_clause: Option<ExprImpl> = Some(
-            FunctionCall::new(
-                ExprType::Equal,
-                vec![
-                    table_id,
-                    InputRef::new(RW_TABLE_STATS_TABLE_ID_INDEX, DataType::Int32).into(),
-                ],
-            )?
-            .into(),
-        );
-
-        // Add the space used by keys and the space used by values to get the total space used by
-        // the table
-        let key_value_size_sum = FunctionCall::new(
-            ExprType::Add,
-            vec![
-                InputRef::new(RW_TABLE_STATS_KEY_SIZE_INDEX, DataType::Int64).into(),
-                InputRef::new(RW_TABLE_STATS_VALUE_SIZE_INDEX, DataType::Int64).into(),
-            ],
-        )?
-        .into();
-        let select_items = vec![key_value_size_sum];
-
-        Ok(BoundSelect {
-            distinct: BoundDistinct::All,
-            select_items,
-            aliases: vec![None],
-            from,
-            where_clause,
-            group_by: GroupBy::GroupKey(vec![]),
-            having: None,
-            schema: result_schema,
-        })
-    }
-
-    /// Given literal varchar this will return the Object ID of the table or index whose
-    /// name matches the varchar.  Given a literal integer, this will return the integer regardless
-    /// of whether an object exists with an Object ID that matches the integer.
-    fn table_id_query(&mut self, table: &ExprImpl) -> Result<ExprImpl> {
-        match table.as_literal() {
-            Some(literal) if literal.return_type().is_int() => Ok(table.clone()),
-            Some(literal) if literal.return_type() == DataType::Varchar => {
-                let table_name = literal
-                    .get_data()
-                    .as_ref()
-                    .expect("ExprImpl value is a Literal but cannot get ref to data")
-                    .as_utf8();
-                self.bind_cast(
-                    Expr::Value(Value::SingleQuotedString(table_name.to_string())),
-                    AstDataType::Regclass,
-                )
-            }
-            _ => Err(RwError::from(ErrorCode::ExprError(
-                "Expected an integer or varchar literal".into(),
-            ))),
-        }
-    }
-
     pub fn iter_bound_columns<'a>(
         column_binding: impl Iterator<Item = &'a ColumnBinding>,
     ) -> (Vec<ExprImpl>, Vec<Option<String>>) {
diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_user.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_user.rs
index bb1321a795408..20b9c7a04d9cb 100644
--- a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_user.rs
+++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_user.rs
@@ -22,8 +22,6 @@ use crate::catalog::system_catalog::BuiltinView;
 /// The catalog `pg_user` provides access to information about database users.
 /// Ref: [`https://www.postgresql.org/docs/current/view-pg-user.html`]
 pub const PG_USER_TABLE_NAME: &str = "pg_user";
-pub const PG_USER_ID_INDEX: usize = 0;
-pub const PG_USER_NAME_INDEX: usize = 1;
 
 pub static PG_USER: LazyLock<BuiltinView> = LazyLock::new(|| BuiltinView {
     name: PG_USER_TABLE_NAME,
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_stats.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_stats.rs
index eeae9167d1af8..189ecdb4a1725 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_stats.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_table_stats.rs
@@ -21,8 +21,6 @@ use crate::error::Result;
 
 pub const RW_TABLE_STATS_TABLE_NAME: &str = "rw_table_stats";
 pub const RW_TABLE_STATS_TABLE_ID_INDEX: usize = 0;
-pub const RW_TABLE_STATS_KEY_SIZE_INDEX: usize = 2;
-pub const RW_TABLE_STATS_VALUE_SIZE_INDEX: usize = 3;
 
 pub const RW_TABLE_STATS_COLUMNS: &[SystemCatalogColumnsDef<'_>] = &[
     (DataType::Int32, "id"),
diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs
index 5868c74355ccb..af1f84b321eb5 100644
--- a/src/frontend/src/expr/function_call.rs
+++ b/src/frontend/src/expr/function_call.rs
@@ -425,7 +425,7 @@ pub fn is_row_function(expr: &ExprImpl) -> bool {
 
 #[derive(Debug, Error)]
 #[error("{0}")]
-pub struct CastError(String);
+pub struct CastError(pub(super) String);
 
 impl From<CastError> for ErrorCode {
     fn from(value: CastError) -> Self {
diff --git a/src/frontend/src/expr/function_impl/context.rs b/src/frontend/src/expr/function_impl/context.rs
index 1a5ecf88ce444..74cc5001043ad 100644
--- a/src/frontend/src/expr/function_impl/context.rs
+++ b/src/frontend/src/expr/function_impl/context.rs
@@ -22,6 +22,7 @@ use crate::session::AuthContext;
 // Only for local mode.
 define_context! {
     pub(super) CATALOG_READER: crate::catalog::CatalogReader,
+    pub(super) USER_INFO_READER: crate::user::user_service::UserInfoReader,
     pub(super) AUTH_CONTEXT: Arc<AuthContext>,
     pub(super) DB_NAME: String,
     pub(super) SEARCH_PATH: SearchPath,
diff --git a/src/frontend/src/expr/function_impl/mod.rs b/src/frontend/src/expr/function_impl/mod.rs
index 0541c392d1878..ad0b3b7a853d8 100644
--- a/src/frontend/src/expr/function_impl/mod.rs
+++ b/src/frontend/src/expr/function_impl/mod.rs
@@ -16,4 +16,7 @@ mod cast_regclass;
 mod col_description;
 pub mod context;
 mod pg_get_indexdef;
+mod pg_get_userbyid;
 mod pg_get_viewdef;
+mod pg_indexes_size;
+mod pg_relation_size;
diff --git a/src/frontend/src/expr/function_impl/pg_get_userbyid.rs b/src/frontend/src/expr/function_impl/pg_get_userbyid.rs
new file mode 100644
index 0000000000000..175d07e6aef73
--- /dev/null
+++ b/src/frontend/src/expr/function_impl/pg_get_userbyid.rs
@@ -0,0 +1,31 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_expr::{capture_context, function, Result};
+
+use super::context::USER_INFO_READER;
+use crate::user::user_service::UserInfoReader;
+
+#[function("pg_get_userbyid(int4) -> varchar")]
+fn pg_get_userbyid(oid: i32) -> Result<Option<Box<str>>> {
+    pg_get_userbyid_impl_captured(oid)
+}
+
+#[capture_context(USER_INFO_READER)]
+fn pg_get_userbyid_impl(reader: &UserInfoReader, oid: i32) -> Result<Option<Box<str>>> {
+    Ok(reader
+        .read_guard()
+        .get_user_name_by_id(oid as u32)
+        .map(|s| s.into_boxed_str()))
+}
diff --git a/src/frontend/src/expr/function_impl/pg_indexes_size.rs b/src/frontend/src/expr/function_impl/pg_indexes_size.rs
new file mode 100644
index 0000000000000..cbcf631d415ee
--- /dev/null
+++ b/src/frontend/src/expr/function_impl/pg_indexes_size.rs
@@ -0,0 +1,51 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_expr::{capture_context, function, ExprError, Result};
+use thiserror_ext::AsReport;
+
+use super::context::{CATALOG_READER, DB_NAME};
+use crate::catalog::CatalogReader;
+
+/// Computes the total disk space used by indexes attached to the specified table.
+#[function("pg_indexes_size(int4) -> int8")]
+fn pg_indexes_size(oid: i32) -> Result<i64> {
+    pg_indexes_size_impl_captured(oid)
+}
+
+#[capture_context(CATALOG_READER, DB_NAME)]
+fn pg_indexes_size_impl(catalog: &CatalogReader, db_name: &str, oid: i32) -> Result<i64> {
+    let catalog = catalog.read_guard();
+    let database = catalog
+        .get_database_by_name(db_name)
+        .map_err(|e| ExprError::InvalidParam {
+            name: "oid",
+            reason: e.to_report_string().into(),
+        })?;
+    let mut sum = 0;
+    for schema in database.iter_schemas() {
+        for index in schema.iter_index() {
+            if index.primary_table.id().table_id == oid as u32 {
+                if let Some(table_stats) = catalog
+                    .table_stats()
+                    .table_stats
+                    .get(&index.primary_table.id().table_id)
+                {
+                    sum += table_stats.total_key_size + table_stats.total_value_size;
+                }
+            }
+        }
+    }
+    Ok(sum)
+}
diff --git a/src/frontend/src/expr/function_impl/pg_relation_size.rs b/src/frontend/src/expr/function_impl/pg_relation_size.rs
new file mode 100644
index 0000000000000..b851688ceedec
--- /dev/null
+++ b/src/frontend/src/expr/function_impl/pg_relation_size.rs
@@ -0,0 +1,50 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_expr::{capture_context, function, ExprError, Result};
+
+use super::context::CATALOG_READER;
+use crate::catalog::CatalogReader;
+
+/// Computes the disk space used by one “fork” of the specified relation.
+#[function("pg_relation_size(int4) -> int8")]
+fn pg_relation_size(oid: i32) -> Result<i64> {
+    pg_relation_size_impl_captured(oid, "main")
+}
+
+#[function("pg_relation_size(int4, varchar) -> int8")]
+fn pg_relation_size_fork(oid: i32, fork: &str) -> Result<i64> {
+    pg_relation_size_impl_captured(oid, fork)
+}
+
+#[capture_context(CATALOG_READER)]
+fn pg_relation_size_impl(catalog: &CatalogReader, oid: i32, fork: &str) -> Result<i64> {
+    match fork {
+        "main" => {}
+        // These options are invalid in RW so we return 0 value as the result
+        "fsm" | "vm" | "init" => return Ok(0),
+        _ => return Err(ExprError::InvalidParam {
+            name: "fork",
+            reason:
+                "invalid fork name. Valid fork names are \"main\", \"fsm\", \"vm\", and \"init\""
+                    .into(),
+        }),
+    }
+    let catalog = catalog.read_guard();
+    if let Some(stats) = catalog.table_stats().table_stats.get(&(oid as u32)) {
+        Ok(stats.total_key_size + stats.total_value_size)
+    } else {
+        Ok(0)
+    }
+}
diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs
index 1241ef6e4e288..78ae2db726a39 100644
--- a/src/frontend/src/expr/mod.rs
+++ b/src/frontend/src/expr/mod.rs
@@ -265,6 +265,26 @@ impl ExprImpl {
         FunctionCall::cast_mut(self, target, CastContext::Explicit)
     }
 
+    /// Casting to Regclass type means getting the oid of expr.
+    /// See <https://www.postgresql.org/docs/current/datatype-oid.html>
+    pub fn cast_to_regclass(self) -> Result<ExprImpl, CastError> {
+        match self.return_type() {
+            DataType::Varchar => Ok(ExprImpl::FunctionCall(Box::new(
+                FunctionCall::new_unchecked(ExprType::CastRegclass, vec![self], DataType::Int32),
+            ))),
+            DataType::Int32 => Ok(self),
+            dt if dt.is_int() => Ok(self.cast_explicit(DataType::Int32)?),
+            _ => Err(CastError("Unsupported input type".to_string())),
+        }
+    }
+
+    /// Shorthand to inplace cast expr to `regclass` type.
+    pub fn cast_to_regclass_mut(&mut self) -> Result<(), CastError> {
+        let owned = std::mem::replace(self, ExprImpl::literal_bool(false));
+        *self = owned.cast_to_regclass()?;
+        Ok(())
+    }
+
     /// Ensure the return type of this expression is an array of some type.
     pub fn ensure_array_type(&self) -> Result<(), ErrorCode> {
         if self.is_untyped() {
diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs
index c50f1cc2460b8..5528b4614c355 100644
--- a/src/frontend/src/expr/pure.rs
+++ b/src/frontend/src/expr/pure.rs
@@ -250,6 +250,9 @@ impl ExprVisitor for ImpureAnalyzer {
             | expr_node::Type::PgGetIndexdef
             | expr_node::Type::ColDescription
             | expr_node::Type::PgGetViewdef
+            | expr_node::Type::PgGetUserbyid
+            | expr_node::Type::PgIndexesSize
+            | expr_node::Type::PgRelationSize
             | expr_node::Type::MakeTimestamptz => self.impure = true,
         }
     }
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index 2cb642dc9054c..c155cfe9aa234 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -20,7 +20,7 @@ use std::time::Duration;
 
 use anyhow::anyhow;
 use futures::stream::BoxStream;
-use futures::StreamExt;
+use futures::{FutureExt, StreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use pgwire::pg_server::BoxedError;
@@ -147,6 +147,7 @@ impl LocalQueryExecution {
         let shutdown_rx = self.shutdown_rx().clone();
 
         let catalog_reader = self.front_env.catalog_reader().clone();
+        let user_info_reader = self.front_env.user_info_reader().clone();
         let auth_context = self.session.auth_context().clone();
         let db_name = self.session.database().to_string();
         let search_path = self.session.config().search_path();
@@ -173,14 +174,16 @@ impl LocalQueryExecution {
         use risingwave_expr::expr_context::TIME_ZONE;
 
         use crate::expr::function_impl::context::{
-            AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH,
+            AUTH_CONTEXT, CATALOG_READER, DB_NAME, SEARCH_PATH, USER_INFO_READER,
         };
 
-        let exec = async move { CATALOG_READER::scope(catalog_reader, exec).await };
-        let exec = async move { DB_NAME::scope(db_name, exec).await };
-        let exec = async move { SEARCH_PATH::scope(search_path, exec).await };
-        let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await };
-        let exec = async move { TIME_ZONE::scope(time_zone, exec).await };
+        // box is necessary, otherwise the size of `exec` will double each time it is nested.
+        let exec = async move { CATALOG_READER::scope(catalog_reader, exec).await }.boxed();
+        let exec = async move { USER_INFO_READER::scope(user_info_reader, exec).await }.boxed();
+        let exec = async move { DB_NAME::scope(db_name, exec).await }.boxed();
+        let exec = async move { SEARCH_PATH::scope(search_path, exec).await }.boxed();
+        let exec = async move { AUTH_CONTEXT::scope(auth_context, exec).await }.boxed();
+        let exec = async move { TIME_ZONE::scope(time_zone, exec).await }.boxed();
 
         if let Some(timeout) = timeout {
             let exec = async move {