Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: show cluster versions for database objects #14167

Merged
merged 5 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ message Source {
optional uint64 initialized_at_epoch = 15;
optional uint64 created_at_epoch = 16;

// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 17;
optional string created_at_cluster_version = 18;

// Per-source catalog version, used by schema change.
uint64 version = 100;
}
Expand Down Expand Up @@ -149,6 +153,10 @@ message Sink {

// Target table id (only applicable for table sink)
optional uint32 target_table = 21;

// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 22;
optional string created_at_cluster_version = 23;
}

message Connection {
Expand Down Expand Up @@ -194,6 +202,9 @@ message Index {

// Use to record the prefix len of the index_item to reconstruct index columns provided by users.
uint32 index_columns_len = 13;
// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 14;
optional string created_at_cluster_version = 15;
}

message Function {
Expand Down Expand Up @@ -298,6 +309,10 @@ message Table {
// This field is used to mark the the sink into this table.
repeated uint32 incoming_sinks = 34;

// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 35;
optional string created_at_cluster_version = 36;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,7 @@ macro_rules! git_sha {
// `const_option_ext` was broken by https://github.com/rust-lang/rust/pull/110393
// Tracking issue: https://github.com/rust-lang/rust/issues/91930
pub const GIT_SHA: &str = git_sha!("GIT_SHA");

pub fn current_cluster_version() -> String {
format!("PostgreSQL 9.5-RisingWave-{} ({})", RW_VERSION, GIT_SHA)
}
2 changes: 2 additions & 0 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ impl SinkDesc {
db_name: self.db_name,
sink_from_name: self.sink_from_name,
target_table: self.target_table,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ pub struct SinkCatalog {
pub sink_from_name: String,

pub target_table: Option<TableId>,

pub created_at_cluster_version: Option<String>,
pub initialized_at_cluster_version: Option<String>,
}

impl SinkCatalog {
Expand Down Expand Up @@ -336,6 +339,8 @@ impl SinkCatalog {
sink_from_name: self.sink_from_name.clone(),
stream_job_status: PbStreamJobStatus::Creating.into(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
}
}

Expand Down Expand Up @@ -425,6 +430,8 @@ impl From<PbSink> for SinkCatalog {
db_name: pb.db_name,
sink_from_name: pb.sink_from_name,
target_table: pb.target_table.map(TableId::new),
initialized_at_cluster_version: pb.initialized_at_cluster_version,
created_at_cluster_version: pb.created_at_cluster_version,
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,17 @@
│ │ │ │ │ ├─LogicalUnion { all: true }
│ │ │ │ │ │ ├─LogicalUnion { all: true }
│ │ │ │ │ │ │ ├─LogicalProject { exprs: [rw_tables.id, rw_tables.name, 'table':Varchar, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl] }
│ │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_tables, columns: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl, rw_tables.initialized_at, rw_tables.created_at] }
│ │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_tables, columns: [rw_tables.id, rw_tables.name, rw_tables.schema_id, rw_tables.owner, rw_tables.definition, rw_tables.acl, rw_tables.initialized_at, rw_tables.created_at, rw_tables.initialized_at_cluster_version, rw_tables.created_at_cluster_version] }
│ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] }
│ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] }
│ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.indkey, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at] }
│ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.indkey, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] }
│ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] }
│ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at] }
│ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version] }
│ │ │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] }
│ │ │ └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at] }
│ │ │ └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at, rw_materialized_views.initialized_at_cluster_version, rw_materialized_views.created_at_cluster_version] }
│ │ └─LogicalProject { exprs: [rw_views.id, rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] }
│ │ └─LogicalSysScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] }
│ └─LogicalShare { id: 18 }
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHE
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_common::types::{DataType, ScalarImpl, Timestamptz};
use risingwave_common::{bail_not_implemented, not_implemented, GIT_SHA, RW_VERSION};
use risingwave_common::{bail_not_implemented, current_cluster_version, not_implemented};
use risingwave_expr::aggregate::{agg_kinds, AggKind};
use risingwave_expr::window_function::{
Frame, FrameBound, FrameBounds, FrameExclusion, WindowFuncKind,
Expand Down Expand Up @@ -1189,11 +1189,7 @@ impl Binder {
// internal
("rw_vnode", raw_call(ExprType::Vnode)),
// TODO: choose which pg version we should return.
("version", raw_literal(ExprImpl::literal_varchar(format!(
"PostgreSQL 9.5-RisingWave-{} ({})",
RW_VERSION,
GIT_SHA
)))),
("version", raw_literal(ExprImpl::literal_varchar(current_cluster_version()))),
// non-deterministic
("now", now()),
("current_timestamp", now()),
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ pub struct IndexCatalog {
pub created_at_epoch: Option<Epoch>,

pub initialized_at_epoch: Option<Epoch>,

pub created_at_cluster_version: Option<String>,

pub initialized_at_cluster_version: Option<String>,
}

impl IndexCatalog {
Expand Down Expand Up @@ -116,6 +120,8 @@ impl IndexCatalog {
index_columns_len: index_prost.index_columns_len,
created_at_epoch: index_prost.created_at_epoch.map(Epoch::from),
initialized_at_epoch: index_prost.initialized_at_epoch.map(Epoch::from),
created_at_cluster_version: index_prost.created_at_cluster_version.clone(),
initialized_at_cluster_version: index_prost.initialized_at_cluster_version.clone(),
}
}

Expand Down Expand Up @@ -177,6 +183,8 @@ impl IndexCatalog {
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
stream_job_status: PbStreamJobStatus::Creating.into(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
created_at_cluster_version: self.created_at_cluster_version.clone(),
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub struct SourceCatalog {
pub created_at_epoch: Option<Epoch>,
pub initialized_at_epoch: Option<Epoch>,
pub version: SourceVersionId,
pub created_at_cluster_version: Option<String>,
pub initialized_at_cluster_version: Option<String>,
}

impl SourceCatalog {
Expand Down Expand Up @@ -72,6 +74,8 @@ impl SourceCatalog {
.associated_table_id
.map(|id| OptionalAssociatedTableId::AssociatedTableId(id.table_id)),
version: self.version,
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
}
}

Expand Down Expand Up @@ -127,6 +131,8 @@ impl From<&PbSource> for SourceCatalog {
created_at_epoch: prost.created_at_epoch.map(Epoch::from),
initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
version,
created_at_cluster_version: prost.created_at_cluster_version.clone(),
initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub static RW_INDEXES_COLUMNS: LazyLock<Vec<SystemCatalogColumnsDef<'_>>> = Lazy
(DataType::Varchar, "acl"),
(DataType::Timestamptz, "initialized_at"),
(DataType::Timestamptz, "created_at"),
(DataType::Varchar, "initialized_at_cluster_version"),
(DataType::Varchar, "created_at_cluster_version"),
]
});

Expand Down Expand Up @@ -77,6 +79,14 @@ impl SysCatalogReaderImpl {
Some(ScalarImpl::Utf8("".into())),
index.initialized_at_epoch.map(|e| e.as_scalar()),
index.created_at_epoch.map(|e| e.as_scalar()),
index
.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
index
.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
])
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub const RW_INTERNAL_TABLES: BuiltinTable = BuiltinTable {
(DataType::Varchar, "acl"),
(DataType::Timestamptz, "initialized_at"),
(DataType::Timestamptz, "created_at"),
(DataType::Varchar, "initialized_at_cluster_version"),
(DataType::Varchar, "created_at_cluster_version"),
],
pk: &[0],
};
Expand Down Expand Up @@ -65,6 +67,14 @@ impl SysCatalogReaderImpl {
)),
table.initialized_at_epoch.map(|e| e.as_scalar()),
table.created_at_epoch.map(|e| e.as_scalar()),
table
.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
table
.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
])
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub const RW_MATERIALIZED_VIEWS: BuiltinTable = BuiltinTable {
(DataType::Varchar, "acl"),
(DataType::Timestamptz, "initialized_at"),
(DataType::Timestamptz, "created_at"),
(DataType::Varchar, "initialized_at_cluster_version"),
(DataType::Varchar, "created_at_cluster_version"),
],
pk: &[0],
};
Expand Down Expand Up @@ -65,6 +67,14 @@ impl SysCatalogReaderImpl {
)),
table.initialized_at_epoch.map(|e| e.as_scalar()),
table.created_at_epoch.map(|e| e.as_scalar()),
table
.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
table
.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
])
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub const RW_RELATION_INFO: BuiltinTable = BuiltinTable {
(DataType::Varchar, "fragments"), // fragments is json encoded fragment infos.
(DataType::Timestamptz, "initialized_at"),
(DataType::Timestamptz, "created_at"),
(DataType::Varchar, "initialized_at_cluster_version"),
(DataType::Varchar, "created_at_cluster_version"),
],
pk: &[0, 1],
};
Expand Down Expand Up @@ -93,6 +95,12 @@ impl SysCatalogReaderImpl {
)),
t.initialized_at_epoch.map(|e| e.as_scalar()),
t.created_at_epoch.map(|e| e.as_scalar()),
t.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
t.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
]));
}
});
Expand All @@ -114,6 +122,12 @@ impl SysCatalogReaderImpl {
)),
t.initialized_at_epoch.map(|e| e.as_scalar()),
t.created_at_epoch.map(|e| e.as_scalar()),
t.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
t.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
]));
}
});
Expand All @@ -135,6 +149,12 @@ impl SysCatalogReaderImpl {
)),
t.initialized_at_epoch.map(|e| e.as_scalar()),
t.created_at_epoch.map(|e| e.as_scalar()),
t.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
t.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
]));
}
});
Expand All @@ -156,6 +176,12 @@ impl SysCatalogReaderImpl {
)),
t.initialized_at_epoch.map(|e| e.as_scalar()),
t.created_at_epoch.map(|e| e.as_scalar()),
t.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
t.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
]));
}
});
Expand All @@ -173,6 +199,12 @@ impl SysCatalogReaderImpl {
None,
t.initialized_at_epoch.map(|e| e.as_scalar()),
t.created_at_epoch.map(|e| e.as_scalar()),
t.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
t.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
]));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub const RW_SINKS: BuiltinTable = BuiltinTable {
(DataType::Varchar, "acl"),
(DataType::Timestamptz, "initialized_at"),
(DataType::Timestamptz, "created_at"),
(DataType::Varchar, "initialized_at_cluster_version"),
(DataType::Varchar, "created_at_cluster_version"),
],
pk: &[0],
};
Expand Down Expand Up @@ -82,6 +84,12 @@ impl SysCatalogReaderImpl {
),
sink.initialized_at_epoch.map(|e| e.as_scalar()),
sink.created_at_epoch.map(|e| e.as_scalar()),
sink.initialized_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
sink.created_at_cluster_version
.clone()
.map(|v| ScalarImpl::Utf8(v.into())),
])
})
})
Expand Down
Loading
Loading