Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): frontend wait hummock version id for ddl #18613

Merged
merged 8 commits into from
Sep 23, 2024
65 changes: 35 additions & 30 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message WaitVersion {
uint64 catalog_version = 1;
uint64 hummock_version_id = 2;
}

message CreateDatabaseRequest {
catalog.Database db = 1;
}

message CreateDatabaseResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 25 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "CreateDatabaseResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message DropDatabaseRequest {
Expand All @@ -26,7 +31,7 @@

message DropDatabaseResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 34 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "DropDatabaseResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message CreateSchemaRequest {
Expand All @@ -35,7 +40,7 @@

message CreateSchemaResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 43 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "CreateSchemaResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message DropSchemaRequest {
Expand All @@ -44,7 +49,7 @@

message DropSchemaResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 52 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "DropSchemaResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message CreateSourceRequest {
Expand All @@ -54,7 +59,7 @@

message CreateSourceResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 62 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "CreateSourceResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message DropSourceRequest {
Expand All @@ -64,7 +69,7 @@

message DropSourceResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 72 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "DropSourceResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message AlterSourceRequest {
Expand All @@ -73,7 +78,7 @@

message AlterSourceResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 81 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "AlterSourceResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message CreateSinkRequest {
Expand All @@ -85,7 +90,7 @@

message CreateSinkResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 93 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "CreateSinkResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message DropSinkRequest {
Expand All @@ -96,7 +101,7 @@

message DropSinkResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 104 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "DropSinkResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message CreateSubscriptionRequest {
Expand All @@ -105,7 +110,7 @@

message CreateSubscriptionResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;

Check failure on line 113 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "version" on message "CreateSubscriptionResponse" changed type from "uint64" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

message DropSubscriptionRequest {
Expand All @@ -115,7 +120,7 @@

message DropSubscriptionResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message CreateMaterializedViewRequest {
Expand All @@ -135,7 +140,7 @@

message CreateMaterializedViewResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message DropMaterializedViewRequest {
Expand All @@ -145,7 +150,7 @@

message DropMaterializedViewResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message CreateViewRequest {
Expand All @@ -154,7 +159,7 @@

message CreateViewResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message DropViewRequest {
Expand All @@ -164,7 +169,7 @@

message DropViewResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

// An enum to distinguish different types of the `Table` streaming job.
Expand Down Expand Up @@ -197,7 +202,7 @@

message CreateTableResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message AlterNameRequest {
Expand All @@ -216,7 +221,7 @@

message AlterNameResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message AlterOwnerRequest {
Expand Down Expand Up @@ -247,7 +252,7 @@

message AlterSetSchemaResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message AlterParallelismRequest {
Expand All @@ -260,7 +265,7 @@

message AlterOwnerResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message CreateFunctionRequest {
Expand All @@ -269,7 +274,7 @@

message CreateFunctionResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message DropFunctionRequest {
Expand All @@ -278,7 +283,7 @@

message DropFunctionResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message DropTableRequest {
Expand All @@ -291,7 +296,7 @@

message DropTableResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

// Used by risectl (and in the future, dashboard)
Expand All @@ -310,7 +315,7 @@

message CreateIndexResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message DropIndexRequest {
Expand All @@ -320,7 +325,7 @@

message DropIndexResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message ReplaceTablePlan {
Expand All @@ -345,7 +350,7 @@
message ReplaceTablePlanResponse {
common.Status status = 1;
// The new global catalog version.
uint64 version = 2;
WaitVersion version = 2;
}

message GetTableRequest {
Expand Down Expand Up @@ -378,15 +383,15 @@
}

message CreateSecretResponse {
uint64 version = 1;
WaitVersion version = 1;
}

message DropSecretRequest {
uint32 secret_id = 1;
}

message DropSecretResponse {
uint64 version = 1;
WaitVersion version = 1;
}

message CreateConnectionRequest {
Expand All @@ -406,7 +411,7 @@

message CreateConnectionResponse {
// global catalog version
uint64 version = 1;
WaitVersion version = 1;
}

message ListConnectionsRequest {}
Expand All @@ -421,7 +426,7 @@

message DropConnectionResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message GetTablesRequest {
Expand All @@ -442,7 +447,7 @@

message CommentOnResponse {
common.Status status = 1;
uint64 version = 2;
WaitVersion version = 2;
}

message TableSchemaChange {
Expand Down
2 changes: 1 addition & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ message FlushRequest {

message FlushResponse {
common.Status status = 1;
hummock.HummockSnapshot snapshot = 2;
uint64 hummock_version_id = 2;
}

// The reason why the data sources in the cluster are paused.
Expand Down
19 changes: 15 additions & 4 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ use parking_lot::lock_api::ArcRwLockReadGuard;
use parking_lot::{RawRwLock, RwLock};
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::catalog::{
PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
PbSubscription, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan,
PbTableJobType, ReplaceTablePlan, TableJobType,
PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand All @@ -36,6 +37,7 @@ use tokio::sync::watch::Receiver;
use super::root_catalog::Catalog;
use super::{DatabaseId, SecretId, TableId};
use crate::error::Result;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::user::UserId;

pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
Expand Down Expand Up @@ -216,6 +218,7 @@ pub trait CatalogWriter: Send + Sync {
pub struct CatalogWriterImpl {
meta_client: MetaClient,
catalog_updated_rx: Receiver<CatalogVersion>,
hummock_snapshot_manager: HummockSnapshotManagerRef,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -578,18 +581,26 @@ impl CatalogWriter for CatalogWriterImpl {
}

impl CatalogWriterImpl {
pub fn new(meta_client: MetaClient, catalog_updated_rx: Receiver<CatalogVersion>) -> Self {
pub fn new(
meta_client: MetaClient,
catalog_updated_rx: Receiver<CatalogVersion>,
hummock_snapshot_manager: HummockSnapshotManagerRef,
) -> Self {
Self {
meta_client,
catalog_updated_rx,
hummock_snapshot_manager,
}
}

async fn wait_version(&self, version: CatalogVersion) -> Result<()> {
async fn wait_version(&self, version: WaitVersion) -> Result<()> {
let mut rx = self.catalog_updated_rx.clone();
while *rx.borrow_and_update() < version {
while *rx.borrow_and_update() < version.catalog_version {
rx.changed().await.map_err(|e| anyhow!(e))?;
}
self.hummock_snapshot_manager
.wait(HummockVersionId::new(version.hummock_version_id))
.await;
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/handler/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result<RwPgRespon

pub(crate) async fn do_flush(session: &SessionImpl) -> Result<()> {
let client = session.env().meta_client();
let snapshot = client.flush(true).await?;
let version_id = client.flush(true).await?;

// Wait for the snapshot to be synchronized, so that future reads in this session can see
// previous writes.
session
.env()
.hummock_snapshot_manager()
.wait(snapshot)
.wait(version_id)
.await;

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait FrontendMetaClient: Send + Sync {
async fn try_unregister(&self);
async fn get_snapshot(&self) -> Result<HummockSnapshot>;

async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot>;
async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId>;

async fn wait(&self) -> Result<()>;

Expand Down Expand Up @@ -143,7 +143,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.get_snapshot().await
}

async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot> {
async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
self.0.flush(checkpoint).await
}

Expand Down
Loading
Loading