Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support ALTER [TABLE | [MATERIALIZED] VIEW | SOURCE | SINK | CONNECTION | FUNCTION] <name> SET SCHEMA <schema_name> syntax #13341

Merged
merged 27 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c2fca8b
stash
Rossil2012 Nov 9, 2023
1b7930a
stash
Rossil2012 Nov 10, 2023
50e9a55
support alter set schema
Rossil2012 Nov 13, 2023
b14682f
fix schema change in frontend notification
Rossil2012 Nov 13, 2023
8ca373e
add e2e tests
Rossil2012 Nov 14, 2023
2893b07
fix for tests
Rossil2012 Nov 14, 2023
4bb34f8
fix test
Rossil2012 Nov 14, 2023
835a0e8
frontend: ignore if schema unchanged
Rossil2012 Nov 14, 2023
1a6c4d9
avoid multiple commits
Rossil2012 Nov 14, 2023
652fc88
Merge branch 'main' into kanzhen/alter-set-schema
Rossil2012 Nov 14, 2023
1b60618
meta: ignore if schema unchanged
Rossil2012 Nov 14, 2023
13b2e38
alter schema of table's associated src/index/table
Rossil2012 Nov 15, 2023
0c78f14
Merge branch 'main' into kanzhen/alter-set-schema
Rossil2012 Nov 15, 2023
af10d4a
fix import IGNORED_NOTIFICATION_VERSION twice
Rossil2012 Nov 15, 2023
e04d02b
support alter connection
Rossil2012 Nov 16, 2023
35398b9
test for alter function
Rossil2012 Nov 16, 2023
d22d50f
fix test
Rossil2012 Nov 16, 2023
928ef7a
move function test to udf
Rossil2012 Nov 16, 2023
7534258
fix test
Rossil2012 Nov 16, 2023
aac772a
Merge branch 'main' into kanzhen/alter-set-schema
Rossil2012 Nov 16, 2023
654c4d6
fix test
Rossil2012 Nov 16, 2023
b2977e1
Merge branch 'kanzhen/alter-set-schema' of https://github.com/risingw…
Rossil2012 Nov 16, 2023
9404672
Merge branch 'main' into kanzhen/alter-set-schema
Rossil2012 Nov 16, 2023
3b41c58
add unit test
Rossil2012 Nov 17, 2023
3c0969f
Merge branch 'kanzhen/alter-set-schema' of https://github.com/risingw…
Rossil2012 Nov 17, 2023
3de350f
fix unit test
Rossil2012 Nov 17, 2023
1e35dac
fix when alter to old schema
Rossil2012 Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,25 @@ message AlterOwnerRequest {
uint32 owner_id = 20;
}

message AlterSetSchemaRequest {
oneof object {
uint32 table_id = 1;
uint32 view_id = 2;
uint32 source_id = 3;
uint32 sink_id = 4;
uint32 function_id = 5;
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
// ALTER INDEX xxx SET SCHEMA is not supported,
// but index's schema can be altered along with table.
uint32 index_id = 6;
}
uint32 new_schema_id = 20;
}

message AlterSetSchemaResponse {
common.Status status = 1;
uint64 version = 2;
}

message AlterOwnerResponse {
common.Status status = 1;
uint64 version = 2;
Expand Down Expand Up @@ -373,6 +392,7 @@ service DdlService {
rpc AlterRelationName(AlterRelationNameRequest) returns (AlterRelationNameResponse);
rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse);
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
Expand Down
22 changes: 21 additions & 1 deletion src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::{create_connection_request, PbTableJobType};
use risingwave_pb::ddl_service::{
alter_set_schema_request, create_connection_request, PbTableJobType,
};
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;
Expand Down Expand Up @@ -158,6 +160,12 @@ pub trait CatalogWriter: Send + Sync {
async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()>;

async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<()>;

async fn alter_set_schema(
&self,
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -421,6 +429,18 @@ impl CatalogWriter for CatalogWriterImpl {
let version = self.meta_client.alter_owner(object, owner_id).await?;
self.wait_version(version).await
}

async fn alter_set_schema(
&self,
object: alter_set_schema_request::Object,
new_schema_id: u32,
) -> Result<()> {
let version = self
.meta_client
.alter_set_schema(object, new_schema_id)
.await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ impl DatabaseCatalog {
self.schema_by_name.values()
}

pub fn iter_schemas_mut(&mut self) -> impl Iterator<Item = &mut SchemaCatalog> {
self.schema_by_name.values_mut()
}

pub fn get_schema_by_name(&self, name: &str) -> Option<&SchemaCatalog> {
self.schema_by_name.get(name)
}
Expand Down
136 changes: 110 additions & 26 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,24 @@ impl Catalog {
}

pub fn update_table(&mut self, proto: &PbTable) {
let table = self
.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_table(proto);
let database = self.get_database_mut(proto.database_id).unwrap();
let schema = database.get_schema_mut(proto.schema_id).unwrap();
let table = if schema.get_table_by_id(&proto.id.into()).is_some() {
schema.update_table(proto)
} else {
// Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
let new_table = schema.create_table(proto);
database
.iter_schemas_mut()
.find(|schema| {
schema.id() != proto.schema_id
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
&& schema.get_table_by_id(&proto.id.into()).is_some()
})
.unwrap()
.drop_table(proto.id.into());
new_table
};

self.table_by_id.insert(proto.id.into(), table);
}

Expand All @@ -275,11 +287,22 @@ impl Catalog {
}

pub fn update_index(&mut self, proto: &PbIndex) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_index(proto);
let database = self.get_database_mut(proto.database_id).unwrap();
let schema = database.get_schema_mut(proto.schema_id).unwrap();
if schema.get_index_by_id(&proto.id.into()).is_some() {
schema.update_index(proto);
} else {
// Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
schema.create_index(proto);
database
.iter_schemas_mut()
.find(|schema| {
schema.id() != proto.schema_id
&& schema.get_index_by_id(&proto.id.into()).is_some()
})
.unwrap()
.drop_index(proto.id.into());
}
}

pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
Expand All @@ -291,11 +314,21 @@ impl Catalog {
}

pub fn update_source(&mut self, proto: &PbSource) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_source(proto);
let database = self.get_database_mut(proto.database_id).unwrap();
let schema = database.get_schema_mut(proto.schema_id).unwrap();
if schema.get_source_by_id(&proto.id).is_some() {
schema.update_source(proto);
} else {
// Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
schema.create_source(proto);
database
.iter_schemas_mut()
.find(|schema| {
schema.id() != proto.schema_id && schema.get_source_by_id(&proto.id).is_some()
})
.unwrap()
.drop_source(proto.id);
}
}

pub fn drop_sink(&mut self, db_id: DatabaseId, schema_id: SchemaId, sink_id: SinkId) {
Expand All @@ -307,11 +340,21 @@ impl Catalog {
}

pub fn update_sink(&mut self, proto: &PbSink) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_sink(proto);
let database = self.get_database_mut(proto.database_id).unwrap();
let schema = database.get_schema_mut(proto.schema_id).unwrap();
if schema.get_sink_by_id(&proto.id).is_some() {
schema.update_sink(proto);
} else {
// Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
schema.create_sink(proto);
database
.iter_schemas_mut()
.find(|schema| {
schema.id() != proto.schema_id && schema.get_sink_by_id(&proto.id).is_some()
})
.unwrap()
.drop_sink(proto.id);
}
}

pub fn drop_index(&mut self, db_id: DatabaseId, schema_id: SchemaId, index_id: IndexId) {
Expand All @@ -331,11 +374,21 @@ impl Catalog {
}

pub fn update_view(&mut self, proto: &PbView) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_view(proto);
let database = self.get_database_mut(proto.database_id).unwrap();
let schema = database.get_schema_mut(proto.schema_id).unwrap();
if schema.get_view_by_id(&proto.id).is_some() {
schema.update_view(proto);
} else {
// Enter this branch when schema is changed by `ALTER ... SET SCHEMA ...` statement.
schema.create_view(proto);
database
.iter_schemas_mut()
.find(|schema| {
schema.id() != proto.schema_id && schema.get_view_by_id(&proto.id).is_some()
})
.unwrap()
.drop_view(proto.id);
}
}

pub fn drop_function(
Expand All @@ -351,6 +404,14 @@ impl Catalog {
.drop_function(function_id);
}

pub fn update_function(&mut self, proto: &PbFunction) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
.unwrap()
.update_function(proto);
}

pub fn get_database_by_name(&self, db_name: &str) -> CatalogResult<&DatabaseCatalog> {
self.database_by_name
.get(db_name)
Expand Down Expand Up @@ -681,6 +742,29 @@ impl Catalog {
}
}

pub fn check_function_name_duplicated(
&self,
db_name: &str,
schema_name: &str,
function_name: &str,
arg_types: &[DataType],
) -> CatalogResult<()> {
let schema = self.get_schema_by_name(db_name, schema_name)?;

if schema
.get_function_by_name_args(function_name, arg_types)
.is_some()
{
let name = format!(
"{function_name}({})",
arg_types.iter().map(|t| t.to_string()).join(",")
);
Err(CatalogError::Duplicated("function", name))
} else {
Ok(())
}
}

/// Check if the name duplicates with existing connection.
pub fn check_connection_name_duplicated(
&self,
Expand Down
26 changes: 26 additions & 0 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,32 @@ impl SchemaCatalog {
.expect("function not found by argument types");
}

pub fn update_function(&mut self, prost: &PbFunction) {
let name = prost.name.clone();
let id = prost.id.into();
let function = FunctionCatalog::from(prost);
let function_ref = Arc::new(function);

let old_function_by_id = self.function_by_id.get(&id).unwrap();
let old_function_by_name = self
.function_by_name
.get_mut(&old_function_by_id.name)
.unwrap();
// check if function name get updated.
if old_function_by_id.name != name {
old_function_by_name.remove(&old_function_by_id.arg_types);
if old_function_by_name.is_empty() {
self.function_by_name.remove(&old_function_by_id.name);
}
}

self.function_by_name
.entry(name)
.or_default()
.insert(old_function_by_id.arg_types.clone(), function_ref.clone());
self.function_by_id.insert(id, function_ref);
}

pub fn create_connection(&mut self, prost: &PbConnection) {
let name = prost.name.clone();
let id = prost.id;
Expand Down
Loading
Loading