diff --git a/proto/catalog.proto b/proto/catalog.proto index 4ebf9d0af6340..01a7893383232 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -101,6 +101,10 @@ message Source { optional uint64 initialized_at_epoch = 15; optional uint64 created_at_epoch = 16; + // Cluster version (tracked by git commit) when initialized/created + optional string initialized_at_cluster_version = 17; + optional string created_at_cluster_version = 18; + // Per-source catalog version, used by schema change. uint64 version = 100; } @@ -149,6 +153,10 @@ message Sink { // Target table id (only applicable for table sink) optional uint32 target_table = 21; + + // Cluster version (tracked by git commit) when initialized/created + optional string initialized_at_cluster_version = 22; + optional string created_at_cluster_version = 23; } message Connection { @@ -194,6 +202,9 @@ message Index { // Use to record the prefix len of the index_item to reconstruct index columns provided by users. uint32 index_columns_len = 13; + // Cluster version (tracked by git commit) when initialized/created + optional string initialized_at_cluster_version = 14; + optional string created_at_cluster_version = 15; } message Function { @@ -298,6 +309,10 @@ message Table { // This field is used to mark the the sink into this table. repeated uint32 incoming_sinks = 34; + // Cluster version (tracked by git commit) when initialized/created + optional string initialized_at_cluster_version = 35; + optional string created_at_cluster_version = 36; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index f95b39d287fdb..d343c94bdd77c 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -105,3 +105,7 @@ macro_rules! git_sha { // `const_option_ext` was broken by https://github.com/rust-lang/rust/pull/110393 // Tracking issue: https://github.com/rust-lang/rust/issues/91930 pub const GIT_SHA: &str = git_sha!("GIT_SHA"); + +pub fn current_cluster_version() -> String { + format!("PostgreSQL 9.5-RisingWave-{} ({})", RW_VERSION, GIT_SHA) +} diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 3fa82e36f976b..84a3d77c728dd 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -99,6 +99,8 @@ impl SinkDesc { db_name: self.db_name, sink_from_name: self.sink_from_name, target_table: self.target_table, + created_at_cluster_version: None, + initialized_at_cluster_version: None, } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 55d450adc5ee2..49cd376703b0b 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -298,6 +298,9 @@ pub struct SinkCatalog { pub sink_from_name: String, pub target_table: Option, + + pub created_at_cluster_version: Option, + pub initialized_at_cluster_version: Option, } impl SinkCatalog { @@ -336,6 +339,8 @@ impl SinkCatalog { sink_from_name: self.sink_from_name.clone(), stream_job_status: PbStreamJobStatus::Creating.into(), target_table: self.target_table.map(|table_id| table_id.table_id()), + created_at_cluster_version: self.created_at_cluster_version.clone(), + initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), } } @@ -425,6 +430,8 @@ impl From for SinkCatalog { db_name: pb.db_name, sink_from_name: pb.sink_from_name, target_table: pb.target_table.map(TableId::new), + initialized_at_cluster_version: pb.initialized_at_cluster_version, + created_at_cluster_version: pb.created_at_cluster_version, } } } diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index ebdfed20bd32b..076a3ceb8fac1 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -247,17 +247,17 @@ │ │ │ │ │ ├─LogicalUnion { all: true } │ │ │ │ │ │ ├─LogicalUnion { all: true } │ │ │ │ │ │ │ ├─LogicalProject { exprs: [rw_tables.id, rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl] } - │ │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_tables, columns: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl, rw_tables.initialized_at, rw_tables.created_at] } + │ │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_tables, columns: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl, rw_tables.initialized_at, rw_tables.created_at, rw_tables.initialized_at_cluster_version, rw_tables.created_at_cluster_version] } │ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] } - │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at] } + │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] } │ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] } - │ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.indkey, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at] } + │ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.indkey, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] } │ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] } - │ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at] } + │ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version] } │ │ │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] } - │ │ │ └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at] } + │ │ │ └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at, rw_materialized_views.initialized_at_cluster_version, rw_materialized_views.created_at_cluster_version] } │ │ └─LogicalProject { exprs: [rw_views.id, rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] } │ │ └─LogicalSysScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] } │ └─LogicalShare { id: 18 } diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 1e80e896c2636..cf9c6a4070696 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -24,7 +24,7 @@ use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHE use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_common::types::{DataType, ScalarImpl, Timestamptz}; -use risingwave_common::{bail_not_implemented, not_implemented, GIT_SHA, RW_VERSION}; +use risingwave_common::{bail_not_implemented, current_cluster_version, not_implemented}; use risingwave_expr::aggregate::{agg_kinds, AggKind}; use risingwave_expr::window_function::{ Frame, FrameBound, FrameBounds, FrameExclusion, WindowFuncKind, @@ -1189,11 +1189,7 @@ impl Binder { // internal ("rw_vnode", raw_call(ExprType::Vnode)), // TODO: choose which pg version we should return. - ("version", raw_literal(ExprImpl::literal_varchar(format!( - "PostgreSQL 9.5-RisingWave-{} ({})", - RW_VERSION, - GIT_SHA - )))), + ("version", raw_literal(ExprImpl::literal_varchar(current_cluster_version()))), // non-deterministic ("now", now()), ("current_timestamp", now()), diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs index 4e66cea8e73ff..0fcfd836e6a43 100644 --- a/src/frontend/src/catalog/index_catalog.rs +++ b/src/frontend/src/catalog/index_catalog.rs @@ -62,6 +62,10 @@ pub struct IndexCatalog { pub created_at_epoch: Option, pub initialized_at_epoch: Option, + + pub created_at_cluster_version: Option, + + pub initialized_at_cluster_version: Option, } impl IndexCatalog { @@ -116,6 +120,8 @@ impl IndexCatalog { index_columns_len: index_prost.index_columns_len, created_at_epoch: index_prost.created_at_epoch.map(Epoch::from), initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from), + created_at_cluster_version: index_prost.created_at_cluster_version.clone(), + initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(), } } @@ -177,6 +183,8 @@ impl IndexCatalog { initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0), created_at_epoch: self.created_at_epoch.map(|e| e.0), stream_job_status: PbStreamJobStatus::Creating.into(), + initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), + created_at_cluster_version: self.created_at_cluster_version.clone(), } } diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index c5fd8232cf0c1..54c499c65e833 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -43,6 +43,8 @@ pub struct SourceCatalog { pub created_at_epoch: Option, pub initialized_at_epoch: Option, pub version: SourceVersionId, + pub created_at_cluster_version: Option, + pub initialized_at_cluster_version: Option, } impl SourceCatalog { @@ -72,6 +74,8 @@ impl SourceCatalog { .associated_table_id .map(|id| OptionalAssociatedTableId::AssociatedTableId(id.table_id)), version: self.version, + created_at_cluster_version: self.created_at_cluster_version.clone(), + initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), } } @@ -127,6 +131,8 @@ impl From<&PbSource> for SourceCatalog { created_at_epoch: prost.created_at_epoch.map(Epoch::from), initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from), version, + created_at_cluster_version: prost.created_at_cluster_version.clone(), + initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(), } } } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs index f8991b7a229c8..1b948a2347d83 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs @@ -35,6 +35,8 @@ pub static RW_INDEXES_COLUMNS: LazyLock>> = Lazy (DataType::Varchar, "acl"), (DataType::Timestamptz, "initialized_at"), (DataType::Timestamptz, "created_at"), + (DataType::Varchar, "initialized_at_cluster_version"), + (DataType::Varchar, "created_at_cluster_version"), ] }); @@ -77,6 +79,14 @@ impl SysCatalogReaderImpl { Some(ScalarImpl::Utf8("".into())), index.initialized_at_epoch.map(|e| e.as_scalar()), index.created_at_epoch.map(|e| e.as_scalar()), + index + .initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + index + .created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ]) }) }) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs index 523becd11b701..c68f54cb8a250 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_internal_tables.rs @@ -33,6 +33,8 @@ pub const RW_INTERNAL_TABLES: BuiltinTable = BuiltinTable { (DataType::Varchar, "acl"), (DataType::Timestamptz, "initialized_at"), (DataType::Timestamptz, "created_at"), + (DataType::Varchar, "initialized_at_cluster_version"), + (DataType::Varchar, "created_at_cluster_version"), ], pk: &[0], }; @@ -65,6 +67,14 @@ impl SysCatalogReaderImpl { )), table.initialized_at_epoch.map(|e| e.as_scalar()), table.created_at_epoch.map(|e| e.as_scalar()), + table + .initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + table + .created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ]) }) }) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs index ec4a211da89b3..8c1117a4fe6de 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs @@ -33,6 +33,8 @@ pub const RW_MATERIALIZED_VIEWS: BuiltinTable = BuiltinTable { (DataType::Varchar, "acl"), (DataType::Timestamptz, "initialized_at"), (DataType::Timestamptz, "created_at"), + (DataType::Varchar, "initialized_at_cluster_version"), + (DataType::Varchar, "created_at_cluster_version"), ], pk: &[0], }; @@ -65,6 +67,14 @@ impl SysCatalogReaderImpl { )), table.initialized_at_epoch.map(|e| e.as_scalar()), table.created_at_epoch.map(|e| e.as_scalar()), + table + .initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + table + .created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ]) }) }) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs index 4275956635816..6baa90f212a48 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_relation_info.rs @@ -38,6 +38,8 @@ pub const RW_RELATION_INFO: BuiltinTable = BuiltinTable { (DataType::Varchar, "fragments"), // fragments is json encoded fragment infos. (DataType::Timestamptz, "initialized_at"), (DataType::Timestamptz, "created_at"), + (DataType::Varchar, "initialized_at_cluster_version"), + (DataType::Varchar, "created_at_cluster_version"), ], pk: &[0, 1], }; @@ -93,6 +95,12 @@ impl SysCatalogReaderImpl { )), t.initialized_at_epoch.map(|e| e.as_scalar()), t.created_at_epoch.map(|e| e.as_scalar()), + t.initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + t.created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ])); } }); @@ -114,6 +122,12 @@ impl SysCatalogReaderImpl { )), t.initialized_at_epoch.map(|e| e.as_scalar()), t.created_at_epoch.map(|e| e.as_scalar()), + t.initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + t.created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ])); } }); @@ -135,6 +149,12 @@ impl SysCatalogReaderImpl { )), t.initialized_at_epoch.map(|e| e.as_scalar()), t.created_at_epoch.map(|e| e.as_scalar()), + t.initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + t.created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ])); } }); @@ -156,6 +176,12 @@ impl SysCatalogReaderImpl { )), t.initialized_at_epoch.map(|e| e.as_scalar()), t.created_at_epoch.map(|e| e.as_scalar()), + t.initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + t.created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ])); } }); @@ -173,6 +199,12 @@ impl SysCatalogReaderImpl { None, t.initialized_at_epoch.map(|e| e.as_scalar()), t.created_at_epoch.map(|e| e.as_scalar()), + t.initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + t.created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ])); }); } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs index 38ebff453ba34..37a65c742c97c 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs @@ -37,6 +37,8 @@ pub const RW_SINKS: BuiltinTable = BuiltinTable { (DataType::Varchar, "acl"), (DataType::Timestamptz, "initialized_at"), (DataType::Timestamptz, "created_at"), + (DataType::Varchar, "initialized_at_cluster_version"), + (DataType::Varchar, "created_at_cluster_version"), ], pk: &[0], }; @@ -82,6 +84,12 @@ impl SysCatalogReaderImpl { ), sink.initialized_at_epoch.map(|e| e.as_scalar()), sink.created_at_epoch.map(|e| e.as_scalar()), + sink.initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + sink.created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ]) }) }) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index 7378bfa9fdd72..a141940fee1ae 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -44,6 +44,8 @@ pub static RW_SOURCES_COLUMNS: LazyLock>> = Lazy (DataType::Varchar, "acl"), (DataType::Timestamptz, "initialized_at"), (DataType::Timestamptz, "created_at"), + (DataType::Varchar, "initialized_at_cluster_version"), + (DataType::Varchar, "created_at_cluster_version"), ] }); @@ -111,6 +113,14 @@ impl SysCatalogReaderImpl { ), source.initialized_at_epoch.map(|e| e.as_scalar()), source.created_at_epoch.map(|e| e.as_scalar()), + source + .initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + source + .created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ]) }) }) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs index 6734f456f27a7..6a75f1d50fcba 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_tables.rs @@ -33,6 +33,8 @@ pub const RW_TABLES: BuiltinTable = BuiltinTable { (DataType::Varchar, "acl"), (DataType::Timestamptz, "initialized_at"), (DataType::Timestamptz, "created_at"), + (DataType::Varchar, "initialized_at_cluster_version"), + (DataType::Varchar, "created_at_cluster_version"), ], pk: &[0], }; @@ -65,6 +67,14 @@ impl SysCatalogReaderImpl { )), table.initialized_at_epoch.map(|e| e.as_scalar()), table.created_at_epoch.map(|e| e.as_scalar()), + table + .initialized_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), + table + .created_at_cluster_version + .clone() + .map(|v| ScalarImpl::Utf8(v.into())), ]) }) }) diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 9622ea3cfcedd..84bcfa50badb7 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -157,6 +157,10 @@ pub struct TableCatalog { /// Incoming sinks, used for sink into table pub incoming_sinks: Vec, + + pub created_at_cluster_version: Option, + + pub initialized_at_cluster_version: Option, } // How the stream job was created will determine @@ -445,6 +449,8 @@ impl TableCatalog { create_type: self.create_type.to_prost().into(), description: self.description.clone(), incoming_sinks: self.incoming_sinks.clone(), + created_at_cluster_version: self.created_at_cluster_version.clone(), + initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), } } @@ -560,6 +566,8 @@ impl From for TableCatalog { create_type: CreateType::from_prost(create_type), description: tb.description, incoming_sinks: tb.incoming_sinks.clone(), + created_at_cluster_version: tb.created_at_cluster_version.clone(), + initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(), } } } @@ -656,6 +664,8 @@ mod tests { create_type: PbCreateType::Foreground.into(), description: Some("description".to_string()), incoming_sinks: vec![], + created_at_cluster_version: None, + initialized_at_cluster_version: None, } .into(); @@ -716,6 +726,8 @@ mod tests { create_type: CreateType::Foreground, description: Some("description".to_string()), incoming_sinks: vec![], + created_at_cluster_version: None, + initialized_at_cluster_version: None, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index f2fa8bca2de4e..e1082350ebbf9 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -240,6 +240,8 @@ pub(crate) fn gen_create_index_plan( initialized_at_epoch: None, created_at_epoch: None, stream_job_status: PbStreamJobStatus::Creating.into(), + initialized_at_cluster_version: None, + created_at_cluster_version: None, }; let plan: PlanRef = materialize.into(); diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 21f442af8f677..106f44f202196 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1391,6 +1391,8 @@ pub async fn handle_create_source( created_at_epoch: None, optional_associated_table_id: None, version: INITIAL_SOURCE_VERSION_ID, + initialized_at_cluster_version: None, + created_at_cluster_version: None, }; let catalog_writer = session.catalog_writer()?; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index ee93fc565958d..413f86cfc079e 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -684,6 +684,8 @@ fn gen_table_plan_inner( TableId::placeholder().table_id, )), version: INITIAL_SOURCE_VERSION_ID, + initialized_at_cluster_version: None, + created_at_cluster_version: None, }); let source_catalog = source.as_ref().map(|source| Rc::new((source).into())); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index efa059e50efd9..326f70cab0412 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -254,6 +254,8 @@ impl StreamMaterialize { create_type: CreateType::Foreground, // Will be updated in the handler itself. description: None, incoming_sinks: vec![], + initialized_at_cluster_version: None, + created_at_cluster_version: None, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 1ae1a21a04442..14d2fc25667e5 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -183,6 +183,8 @@ impl TableCatalogBuilder { create_type: CreateType::Foreground, description: None, incoming_sinks: vec![], + initialized_at_cluster_version: None, + created_at_cluster_version: None, } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 25ba6ebcec7c3..2437e4d5d60b1 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -591,6 +591,8 @@ pub(crate) mod tests { create_type: CreateType::Foreground, description: None, incoming_sinks: vec![], + initialized_at_cluster_version: None, + created_at_cluster_version: None, }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index fbee1aba46b94..5b3d55ab83bfb 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -165,6 +165,8 @@ impl MigrationTrait for Migration { .default(Expr::current_timestamp()) .not_null(), ) + .col(ColumnDef::new(Object::InitializedAtClusterVersion).string()) + .col(ColumnDef::new(Object::CreatedAtClusterVersion).string()) .foreign_key( &mut ForeignKey::create() .name("FK_object_owner_id") @@ -1110,6 +1112,8 @@ enum Object { DatabaseId, InitializedAt, CreatedAt, + InitializedAtClusterVersion, + CreatedAtClusterVersion, } #[derive(DeriveIden)] diff --git a/src/meta/model_v2/src/object.rs b/src/meta/model_v2/src/object.rs index 117992ff20511..5e0cc1d12cf85 100644 --- a/src/meta/model_v2/src/object.rs +++ b/src/meta/model_v2/src/object.rs @@ -66,6 +66,8 @@ pub struct Model { pub database_id: Option, pub initialized_at: DateTime, pub created_at: DateTime, + pub initialized_at_cluster_version: Option, + pub created_at_cluster_version: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index b6f9ccac9df44..dc66c4b904e48 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -144,6 +144,8 @@ impl CatalogController { database_id: Set(database_id), initialized_at: Default::default(), created_at: Default::default(), + initialized_at_cluster_version: Default::default(), + created_at_cluster_version: Default::default(), }; Ok(active_db.insert(txn).await?) } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 137ed6af69c76..62c52b97d6d79 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -137,6 +137,8 @@ impl From> for PbTable { description: value.0.description, // TODO: fix it for model v2. incoming_sinks: vec![], + initialized_at_cluster_version: value.1.initialized_at_cluster_version, + created_at_cluster_version: value.1.created_at_cluster_version, } } } @@ -169,6 +171,8 @@ impl From> for PbSource { .0 .optional_associated_table_id .map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)), + initialized_at_cluster_version: value.1.initialized_at_cluster_version, + created_at_cluster_version: value.1.created_at_cluster_version, } } } @@ -202,6 +206,8 @@ impl From> for PbSink { format_desc: value.0.sink_format_desc.map(|desc| desc.0), // todo: fix this for model v2 target_table: None, + initialized_at_cluster_version: value.1.initialized_at_cluster_version, + created_at_cluster_version: value.1.created_at_cluster_version, } } } @@ -225,6 +231,8 @@ impl From> for PbIndex { Epoch::from_unix_millis(value.1.created_at.timestamp_millis() as _).0, ), stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. + initialized_at_cluster_version: value.1.initialized_at_cluster_version, + created_at_cluster_version: value.1.created_at_cluster_version, } } } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 5f989df4bb5d5..64cb8f7a13fc7 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use risingwave_common::catalog::TableVersionId; +use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; @@ -48,42 +49,60 @@ impl Default for DdlType { impl StreamingJob { pub fn mark_created(&mut self) { let created_at_epoch = Some(Epoch::now().0); + let created_at_cluster_version = Some(current_cluster_version()); match self { - StreamingJob::MaterializedView(table) => table.created_at_epoch = created_at_epoch, + StreamingJob::MaterializedView(table) => { + table.created_at_epoch = created_at_epoch; + table.created_at_cluster_version = created_at_cluster_version; + } StreamingJob::Sink(table, _) => table.created_at_epoch = created_at_epoch, StreamingJob::Table(source, table, ..) => { table.created_at_epoch = created_at_epoch; + table.created_at_cluster_version = created_at_cluster_version.clone(); if let Some(source) = source { source.created_at_epoch = created_at_epoch; + source.created_at_cluster_version = created_at_cluster_version; } } StreamingJob::Index(index, _) => { index.created_at_epoch = created_at_epoch; + index.created_at_cluster_version = created_at_cluster_version; } StreamingJob::Source(source) => { source.created_at_epoch = created_at_epoch; + source.created_at_cluster_version = created_at_cluster_version; } } } pub fn mark_initialized(&mut self) { let initialized_at_epoch = Some(Epoch::now().0); + let initialized_at_cluster_version = Some(current_cluster_version()); match self { StreamingJob::MaterializedView(table) => { - table.initialized_at_epoch = initialized_at_epoch + table.initialized_at_epoch = initialized_at_epoch; + table.initialized_at_cluster_version = initialized_at_cluster_version; + } + StreamingJob::Sink(table, _) => { + table.initialized_at_epoch = initialized_at_epoch; + table.initialized_at_cluster_version = initialized_at_cluster_version; } - StreamingJob::Sink(table, _) => table.initialized_at_epoch = initialized_at_epoch, StreamingJob::Table(source, table, ..) => { table.initialized_at_epoch = initialized_at_epoch; + table.initialized_at_cluster_version = initialized_at_cluster_version.clone(); + if let Some(source) = source { source.initialized_at_epoch = initialized_at_epoch; + source.initialized_at_cluster_version = initialized_at_cluster_version; } } StreamingJob::Index(index, _) => { index.initialized_at_epoch = initialized_at_epoch; + index.initialized_at_cluster_version = initialized_at_cluster_version; } StreamingJob::Source(source) => { source.initialized_at_epoch = initialized_at_epoch; + source.initialized_at_cluster_version = initialized_at_cluster_version; } } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 158488989ec6e..b9366307e7247 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -20,12 +20,12 @@ use std::time::Duration; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::bail; use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::{ParallelUnitMapping, VirtualNode}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; +use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::dispatch_source_prop; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, @@ -372,6 +372,7 @@ impl DdlController { async fn create_source(&self, mut source: Source) -> MetaResult { // set the initialized_at_epoch to the current epoch. source.initialized_at_epoch = Some(Epoch::now().0); + source.initialized_at_cluster_version = Some(current_cluster_version()); self.catalog_manager .start_create_source_procedure(&source) diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index 528fa7326b54a..23019ae309e6f 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -553,6 +553,8 @@ mod tests { create_type: PbCreateType::Foreground.into(), description: None, incoming_sinks: vec![], + initialized_at_cluster_version: None, + created_at_cluster_version: None, } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 2aab4679bb634..9aefb2a0d2293 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -159,6 +159,8 @@ async fn compaction_test( create_type: PbCreateType::Foreground.into(), description: None, incoming_sinks: vec![], + initialized_at_cluster_version: None, + created_at_cluster_version: None, }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2;