Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql-backend): [PART 2] introduce metadata manager and adapt it in all RPC services, dashboard, metric etc #14191

Merged
merged 17 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public MetaClient(String metaAddr, ScheduledExecutorService scheduler) {
.build();
AddWorkerNodeResponse resp = clusterStub.addWorkerNode(req);

this.workerId = resp.getNode().getId();
this.workerId = resp.getNodeId();
}

public HummockVersion pinVersion() {
Expand Down
30 changes: 10 additions & 20 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
catalog.Database db = 1;
}

message CreateDatabaseResponse {

Check failure on line 16 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateDatabaseResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 database_id = 2;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to return the id for all create ddl, they are never used. And it doesn't have any compatible issues, since they are never persisted.

uint64 version = 3;
uint64 version = 2;
}

message DropDatabaseRequest {
Expand All @@ -32,10 +31,9 @@
catalog.Schema schema = 1;
}

message CreateSchemaResponse {

Check failure on line 34 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateSchemaResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 schema_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropSchemaRequest {
Expand All @@ -52,10 +50,9 @@
stream_plan.StreamFragmentGraph fragment_graph = 2;
}

message CreateSourceResponse {

Check failure on line 53 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateSourceResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 source_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropSourceRequest {
Expand Down Expand Up @@ -84,10 +81,9 @@
optional ReplaceTablePlan affected_table_change = 3;
}

message CreateSinkResponse {

Check failure on line 84 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateSinkResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 sink_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropSinkRequest {
Expand All @@ -106,10 +102,9 @@
stream_plan.StreamFragmentGraph fragment_graph = 2;
}

message CreateMaterializedViewResponse {

Check failure on line 105 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateMaterializedViewResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 table_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropMaterializedViewRequest {
Expand All @@ -126,10 +121,9 @@
catalog.View view = 1;
}

message CreateViewResponse {

Check failure on line 124 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateViewResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 view_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropViewRequest {
Expand Down Expand Up @@ -163,10 +157,9 @@
TableJobType job_type = 4;
}

message CreateTableResponse {

Check failure on line 160 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateTableResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 table_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message AlterNameRequest {
Expand Down Expand Up @@ -225,10 +218,9 @@
catalog.Function function = 1;
}

message CreateFunctionResponse {

Check failure on line 221 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "version" on message "CreateFunctionResponse" was deleted without reserving the number "3".
common.Status status = 1;
uint32 function_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropFunctionRequest {
Expand Down Expand Up @@ -267,10 +259,9 @@
stream_plan.StreamFragmentGraph fragment_graph = 3;
}

message CreateIndexResponse {

Check failure on line 262 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "4" with name "version" on message "CreateIndexResponse" was deleted without reserving the number "4".
common.Status status = 1;
uint32 index_id = 2;
uint64 version = 4;
uint64 version = 2;
}

message DropIndexRequest {
Expand Down Expand Up @@ -343,10 +334,9 @@
uint32 owner_id = 5;
}

message CreateConnectionResponse {

Check failure on line 337 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "version" on message "CreateConnectionResponse" was deleted without reserving the number "2".
uint32 connection_id = 1;
// global catalog version
uint64 version = 2;
uint64 version = 1;
}

message ListConnectionsRequest {}
Expand Down
3 changes: 2 additions & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,12 @@ message AddWorkerNodeResponse {
reserved 3;
reserved "system_params";
common.Status status = 1;
common.WorkerNode node = 2;
optional uint32 node_id = 2;
}

message ActivateWorkerNodeRequest {
common.HostAddress host = 1;
uint32 node_id = 2;
}

message ActivateWorkerNodeResponse {
Expand Down
22 changes: 11 additions & 11 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub struct CatalogWriterImpl {
#[async_trait::async_trait]
impl CatalogWriter for CatalogWriterImpl {
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_database(PbDatabase {
name: db_name.to_string(),
Expand All @@ -208,7 +208,7 @@ impl CatalogWriter for CatalogWriterImpl {
schema_name: &str,
owner: UserId,
) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_schema(PbSchema {
id: 0,
Expand All @@ -227,7 +227,7 @@ impl CatalogWriter for CatalogWriterImpl {
graph: StreamFragmentGraph,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let (_, version) = self
let version = self
.meta_client
.create_materialized_view(table, graph)
.await?;
Expand All @@ -238,7 +238,7 @@ impl CatalogWriter for CatalogWriterImpl {
}

async fn create_view(&self, view: PbView) -> Result<()> {
let (_, version) = self.meta_client.create_view(view).await?;
let version = self.meta_client.create_view(view).await?;
self.wait_version(version).await
}

Expand All @@ -248,7 +248,7 @@ impl CatalogWriter for CatalogWriterImpl {
table: PbTable,
graph: StreamFragmentGraph,
) -> Result<()> {
let (_, version) = self.meta_client.create_index(index, table, graph).await?;
let version = self.meta_client.create_index(index, table, graph).await?;
self.wait_version(version).await
}

Expand All @@ -259,7 +259,7 @@ impl CatalogWriter for CatalogWriterImpl {
graph: StreamFragmentGraph,
job_type: PbTableJobType,
) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_table(source, table, graph, job_type)
.await?;
Expand All @@ -286,7 +286,7 @@ impl CatalogWriter for CatalogWriterImpl {
}

async fn create_source(&self, source: PbSource) -> Result<()> {
let (_id, version) = self.meta_client.create_source(source).await?;
let version = self.meta_client.create_source(source).await?;
self.wait_version(version).await
}

Expand All @@ -295,7 +295,7 @@ impl CatalogWriter for CatalogWriterImpl {
source: PbSource,
graph: StreamFragmentGraph,
) -> Result<()> {
let (_id, version) = self
let version = self
.meta_client
.create_source_with_graph(source, graph)
.await?;
Expand All @@ -308,15 +308,15 @@ impl CatalogWriter for CatalogWriterImpl {
graph: StreamFragmentGraph,
affected_table_change: Option<ReplaceTablePlan>,
) -> Result<()> {
let (_id, version) = self
let version = self
.meta_client
.create_sink(sink, graph, affected_table_change)
.await?;
self.wait_version(version).await
}

async fn create_function(&self, function: PbFunction) -> Result<()> {
let (_, version) = self.meta_client.create_function(function).await?;
let version = self.meta_client.create_function(function).await?;
self.wait_version(version).await
}

Expand All @@ -328,7 +328,7 @@ impl CatalogWriter for CatalogWriterImpl {
owner_id: u32,
connection: create_connection_request::Payload,
) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_connection(
connection_name,
Expand Down
Loading
Loading