Skip to content

Commit

Permalink
Merge branch 'main' into bz/embedded-jaeger-ui
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao authored Jan 2, 2024
2 parents 8a079a0 + 1f4b882 commit 0625ba7
Show file tree
Hide file tree
Showing 51 changed files with 2,873 additions and 1,371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public MetaClient(String metaAddr, ScheduledExecutorService scheduler) {
.build();
AddWorkerNodeResponse resp = clusterStub.addWorkerNode(req);

this.workerId = resp.getNode().getId();
this.workerId = resp.getNodeId();
}

public HummockVersion pinVersion() {
Expand Down
30 changes: 10 additions & 20 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ message CreateDatabaseRequest {

message CreateDatabaseResponse {
common.Status status = 1;
uint32 database_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropDatabaseRequest {
Expand All @@ -35,8 +34,7 @@ message CreateSchemaRequest {

message CreateSchemaResponse {
common.Status status = 1;
uint32 schema_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropSchemaRequest {
Expand All @@ -55,8 +53,7 @@ message CreateSourceRequest {

message CreateSourceResponse {
common.Status status = 1;
uint32 source_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropSourceRequest {
Expand Down Expand Up @@ -87,8 +84,7 @@ message CreateSinkRequest {

message CreateSinkResponse {
common.Status status = 1;
uint32 sink_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropSinkRequest {
Expand All @@ -109,8 +105,7 @@ message CreateMaterializedViewRequest {

message CreateMaterializedViewResponse {
common.Status status = 1;
uint32 table_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropMaterializedViewRequest {
Expand All @@ -129,8 +124,7 @@ message CreateViewRequest {

message CreateViewResponse {
common.Status status = 1;
uint32 view_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropViewRequest {
Expand Down Expand Up @@ -166,8 +160,7 @@ message CreateTableRequest {

message CreateTableResponse {
common.Status status = 1;
uint32 table_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message AlterNameRequest {
Expand Down Expand Up @@ -235,8 +228,7 @@ message CreateFunctionRequest {

message CreateFunctionResponse {
common.Status status = 1;
uint32 function_id = 2;
uint64 version = 3;
uint64 version = 2;
}

message DropFunctionRequest {
Expand Down Expand Up @@ -277,8 +269,7 @@ message CreateIndexRequest {

message CreateIndexResponse {
common.Status status = 1;
uint32 index_id = 2;
uint64 version = 4;
uint64 version = 2;
}

message DropIndexRequest {
Expand Down Expand Up @@ -352,9 +343,8 @@ message CreateConnectionRequest {
}

message CreateConnectionResponse {
uint32 connection_id = 1;
// global catalog version
uint64 version = 2;
uint64 version = 1;
}

message ListConnectionsRequest {}
Expand Down
3 changes: 2 additions & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,12 @@ message AddWorkerNodeResponse {
reserved 3;
reserved "system_params";
common.Status status = 1;
common.WorkerNode node = 2;
optional uint32 node_id = 2;
}

message ActivateWorkerNodeRequest {
common.HostAddress host = 1;
uint32 node_id = 2;
}

message ActivateWorkerNodeResponse {
Expand Down
22 changes: 11 additions & 11 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub struct CatalogWriterImpl {
#[async_trait::async_trait]
impl CatalogWriter for CatalogWriterImpl {
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_database(PbDatabase {
name: db_name.to_string(),
Expand All @@ -212,7 +212,7 @@ impl CatalogWriter for CatalogWriterImpl {
schema_name: &str,
owner: UserId,
) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_schema(PbSchema {
id: 0,
Expand All @@ -231,7 +231,7 @@ impl CatalogWriter for CatalogWriterImpl {
graph: StreamFragmentGraph,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let (_, version) = self
let version = self
.meta_client
.create_materialized_view(table, graph)
.await?;
Expand All @@ -242,7 +242,7 @@ impl CatalogWriter for CatalogWriterImpl {
}

async fn create_view(&self, view: PbView) -> Result<()> {
let (_, version) = self.meta_client.create_view(view).await?;
let version = self.meta_client.create_view(view).await?;
self.wait_version(version).await
}

Expand All @@ -252,7 +252,7 @@ impl CatalogWriter for CatalogWriterImpl {
table: PbTable,
graph: StreamFragmentGraph,
) -> Result<()> {
let (_, version) = self.meta_client.create_index(index, table, graph).await?;
let version = self.meta_client.create_index(index, table, graph).await?;
self.wait_version(version).await
}

Expand All @@ -263,7 +263,7 @@ impl CatalogWriter for CatalogWriterImpl {
graph: StreamFragmentGraph,
job_type: PbTableJobType,
) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_table(source, table, graph, job_type)
.await?;
Expand All @@ -290,7 +290,7 @@ impl CatalogWriter for CatalogWriterImpl {
}

async fn create_source(&self, source: PbSource) -> Result<()> {
let (_id, version) = self.meta_client.create_source(source).await?;
let version = self.meta_client.create_source(source).await?;
self.wait_version(version).await
}

Expand All @@ -299,7 +299,7 @@ impl CatalogWriter for CatalogWriterImpl {
source: PbSource,
graph: StreamFragmentGraph,
) -> Result<()> {
let (_id, version) = self
let version = self
.meta_client
.create_source_with_graph(source, graph)
.await?;
Expand All @@ -312,15 +312,15 @@ impl CatalogWriter for CatalogWriterImpl {
graph: StreamFragmentGraph,
affected_table_change: Option<ReplaceTablePlan>,
) -> Result<()> {
let (_id, version) = self
let version = self
.meta_client
.create_sink(sink, graph, affected_table_change)
.await?;
self.wait_version(version).await
}

async fn create_function(&self, function: PbFunction) -> Result<()> {
let (_, version) = self.meta_client.create_function(function).await?;
let version = self.meta_client.create_function(function).await?;
self.wait_version(version).await
}

Expand All @@ -332,7 +332,7 @@ impl CatalogWriter for CatalogWriterImpl {
owner_id: u32,
connection: create_connection_request::Payload,
) -> Result<()> {
let (_, version) = self
let version = self
.meta_client
.create_connection(
connection_name,
Expand Down
Loading

0 comments on commit 0625ba7

Please sign in to comment.