From 2651278ac2a301fa23229b39eb66432515f33c51 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Mon, 23 Sep 2024 15:26:51 +0800
Subject: [PATCH 01/22] feat(frontend): frontend wait hummock version id for ddl (#18613)

---
 proto/ddl_service.proto                     |  65 +++----
 proto/meta.proto                            |   2 +-
 src/frontend/src/catalog/catalog_service.rs |  19 +-
 src/frontend/src/handler/flush.rs           |   4 +-
 src/frontend/src/meta_client.rs             |   4 +-
 src/frontend/src/scheduler/snapshot.rs      |   8 +-
 src/frontend/src/session.rs                 |   8 +-
 src/frontend/src/test_utils.rs              |   5 +-
 src/meta/service/src/stream_service.rs      |   4 +-
 src/meta/src/barrier/mod.rs                 |   4 +
 src/meta/src/barrier/schedule.rs            |   8 +-
 src/meta/src/hummock/manager/versioning.rs  |   4 +
 src/meta/src/rpc/ddl_controller.rs          |  17 +-
 src/rpc_client/src/meta_client.rs           | 189 +++++++++++++-------
 14 files changed, 218 insertions(+), 123 deletions(-)

diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index f51f0e1b3a42..de860593e810 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -11,13 +11,18 @@ import "stream_plan.proto";
 option java_package = "com.risingwave.proto";
 option optimize_for = SPEED;
 
+message WaitVersion {
+  uint64 catalog_version = 1;
+  uint64 hummock_version_id = 2;
+}
+
 message CreateDatabaseRequest {
   catalog.Database db = 1;
 }
 
 message CreateDatabaseResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropDatabaseRequest {
@@ -26,7 +31,7 @@ message DropDatabaseRequest {
 
 message DropDatabaseResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSchemaRequest {
@@ -35,7 +40,7 @@ message CreateSchemaRequest {
 
 message CreateSchemaResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSchemaRequest {
@@ -44,7 +49,7 @@ message DropSchemaRequest {
 
 message DropSchemaResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSourceRequest {
@@ -54,7 +59,7 @@ message CreateSourceRequest {
 
 message CreateSourceResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSourceRequest {
@@ -64,7 +69,7 @@ message DropSourceRequest {
 
 message DropSourceResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterSourceRequest {
@@ -73,7 +78,7 @@ message AlterSourceRequest {
 
 message AlterSourceResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSinkRequest {
@@ -85,7 +90,7 @@ message CreateSinkRequest {
 
 message CreateSinkResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSinkRequest {
@@ -96,7 +101,7 @@ message DropSinkRequest {
 
 message DropSinkResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSubscriptionRequest {
@@ -105,7 +110,7 @@ message CreateSubscriptionRequest {
 
 message CreateSubscriptionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSubscriptionRequest {
@@ -115,7 +120,7 @@ message DropSubscriptionRequest {
 
 message DropSubscriptionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateMaterializedViewRequest {
@@ -135,7 +140,7 @@ message CreateMaterializedViewRequest {
 
 message CreateMaterializedViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropMaterializedViewRequest {
@@ -145,7 +150,7 @@ message DropMaterializedViewRequest {
 
 message DropMaterializedViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateViewRequest {
@@ -154,7 +159,7 @@ message CreateViewRequest {
 
 message CreateViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropViewRequest {
@@ -164,7 +169,7 @@ message DropViewRequest {
 
 message DropViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 // An enum to distinguish different types of the `Table` streaming job.
@@ -197,7 +202,7 @@ message CreateTableRequest {
 
 message CreateTableResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterNameRequest {
@@ -216,7 +221,7 @@ message AlterNameRequest {
 
 message AlterNameResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterOwnerRequest {
@@ -247,7 +252,7 @@ message AlterSetSchemaRequest {
 
 message AlterSetSchemaResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterParallelismRequest {
@@ -260,7 +265,7 @@
 message AlterParallelismResponse {}
 
 message AlterOwnerResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateFunctionRequest {
@@ -269,7 +274,7 @@ message CreateFunctionRequest {
 
 message CreateFunctionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropFunctionRequest {
@@ -278,7 +283,7 @@ message DropFunctionRequest {
 
 message DropFunctionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropTableRequest {
@@ -291,7 +296,7 @@ message DropTableRequest {
 
 message DropTableResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 // Used by risectl (and in the future, dashboard)
@@ -310,7 +315,7 @@ message CreateIndexRequest {
 
 message CreateIndexResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropIndexRequest {
@@ -320,7 +325,7 @@ message DropIndexRequest {
 
 message DropIndexResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message ReplaceTablePlan {
@@ -345,7 +350,7 @@ message ReplaceTablePlanRequest {
 
 message ReplaceTablePlanResponse {
   common.Status status = 1;
   // The new global catalog version.
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message GetTableRequest {
@@ -378,7 +383,7 @@ message CreateSecretRequest {
 }
 
 message CreateSecretResponse {
-  uint64 version = 1;
+  WaitVersion version = 1;
 }
 
 message DropSecretRequest {
@@ -386,7 +391,7 @@ message DropSecretRequest {
 }
 
 message DropSecretResponse {
-  uint64 version = 1;
+  WaitVersion version = 1;
 }
 
 message CreateConnectionRequest {
@@ -406,7 +411,7 @@ message CreateConnectionRequest {
 
 message CreateConnectionResponse {
   // global catalog version
-  uint64 version = 1;
+  WaitVersion version = 1;
 }
 
 message ListConnectionsRequest {}
@@ -421,7 +426,7 @@ message DropConnectionRequest {
 
 message DropConnectionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message GetTablesRequest {
@@ -442,7 +447,7 @@ message CommentOnRequest {
 
 message CommentOnResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message TableSchemaChange {
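The commit's core idea is already visible in this proto change: a DDL response no longer carries a bare catalog version but a pair of monotonically increasing counters. A minimal Rust sketch of what that pair means to a client follows — the types here are illustrative stand-ins for the prost-generated message, not code from the patch:

// Both fields of `WaitVersion` only ever grow, so "the DDL is visible"
// reduces to two independent lower-bound checks. `LocalVersions` is a
// hypothetical stand-in for frontend-local state.
pub struct WaitVersion {
    pub catalog_version: u64,
    pub hummock_version_id: u64,
}

pub struct LocalVersions {
    pub catalog_version: u64,
    pub hummock_version_id: u64,
}

pub fn ddl_visible(local: &LocalVersions, target: &WaitVersion) -> bool {
    local.catalog_version >= target.catalog_version
        && local.hummock_version_id >= target.hummock_version_id
}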
diff --git a/proto/meta.proto b/proto/meta.proto
index 4be4f2b18d9c..b245c9322f93 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -145,7 +145,7 @@ message FlushRequest {
 
 message FlushResponse {
   common.Status status = 1;
-  hummock.HummockSnapshot snapshot = 2;
+  uint64 hummock_version_id = 2;
 }
 
 // The reason why the data sources in the cluster are paused.
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 061aeb737aa3..1164df548d5d 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -19,6 +19,7 @@ use parking_lot::lock_api::ArcRwLockReadGuard;
 use parking_lot::{RawRwLock, RwLock};
 use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
+use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::catalog::{
     PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
     PbSubscription, PbTable, PbView,
@@ -26,7 +27,7 @@ use risingwave_pb::catalog::{
 use risingwave_pb::ddl_service::alter_owner_request::Object;
 use risingwave_pb::ddl_service::{
     alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan,
-    PbTableJobType, ReplaceTablePlan, TableJobType,
+    PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
 };
 use risingwave_pb::meta::PbTableParallelism;
 use risingwave_pb::stream_plan::StreamFragmentGraph;
@@ -36,6 +37,7 @@ use tokio::sync::watch::Receiver;
 use super::root_catalog::Catalog;
 use super::{DatabaseId, SecretId, TableId};
 use crate::error::Result;
+use crate::scheduler::HummockSnapshotManagerRef;
 use crate::user::UserId;
 
 pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
@@ -216,6 +218,7 @@ pub trait CatalogWriter: Send + Sync {
 pub struct CatalogWriterImpl {
     meta_client: MetaClient,
     catalog_updated_rx: Receiver<CatalogVersion>,
+    hummock_snapshot_manager: HummockSnapshotManagerRef,
 }
 
 #[async_trait::async_trait]
@@ -578,18 +581,26 @@ impl CatalogWriter for CatalogWriterImpl {
 }
 
 impl CatalogWriterImpl {
-    pub fn new(meta_client: MetaClient, catalog_updated_rx: Receiver<CatalogVersion>) -> Self {
+    pub fn new(
+        meta_client: MetaClient,
+        catalog_updated_rx: Receiver<CatalogVersion>,
+        hummock_snapshot_manager: HummockSnapshotManagerRef,
+    ) -> Self {
         Self {
             meta_client,
             catalog_updated_rx,
+            hummock_snapshot_manager,
         }
     }
 
-    async fn wait_version(&self, version: CatalogVersion) -> Result<()> {
+    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
         let mut rx = self.catalog_updated_rx.clone();
-        while *rx.borrow_and_update() < version {
+        while *rx.borrow_and_update() < version.catalog_version {
             rx.changed().await.map_err(|e| anyhow!(e))?;
         }
+        self.hummock_snapshot_manager
+            .wait(HummockVersionId::new(version.hummock_version_id))
+            .await;
         Ok(())
     }
 }
diff --git a/src/frontend/src/handler/flush.rs b/src/frontend/src/handler/flush.rs
index 784fbb393c91..58e5a305e8ad 100644
--- a/src/frontend/src/handler/flush.rs
+++ b/src/frontend/src/handler/flush.rs
@@ -26,14 +26,14 @@ pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result<RwPgResponse> {
 
 async fn do_flush(session: &SessionImpl) -> Result<()> {
     let client = session.env().meta_client();
-    let snapshot = client.flush(true).await?;
+    let version_id = client.flush(true).await?;
 
     // Wait for the snapshot to be synchronized, so that future reads in this session can see
     // previous writes.
     session
         .env()
         .hummock_snapshot_manager()
-        .wait(snapshot)
+        .wait(version_id)
         .await;
 
     Ok(())
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index 97395dcd786b..5b6f0e79819c 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -49,7 +49,7 @@ pub trait FrontendMetaClient: Send + Sync {
     async fn try_unregister(&self);
 
     async fn get_snapshot(&self) -> Result<HummockSnapshot>;
 
-    async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot>;
+    async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId>;
 
     async fn wait(&self) -> Result<()>;
@@ -143,7 +143,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
         self.0.get_snapshot().await
     }
 
-    async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot> {
+    async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
         self.0.flush(checkpoint).await
     }
diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs
index b0d8918cfee4..8f979bf2bfdf 100644
--- a/src/frontend/src/scheduler/snapshot.rs
+++ b/src/frontend/src/scheduler/snapshot.rs
@@ -18,10 +18,10 @@ use std::sync::Arc;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
 use risingwave_hummock_sdk::{
-    FrontendHummockVersion, FrontendHummockVersionDelta, INVALID_VERSION_ID,
+    FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
 };
 use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
-use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot};
+use risingwave_pb::hummock::HummockVersionDeltas;
 use tokio::sync::watch;
 
 use crate::expr::InlineNowProcTime;
@@ -192,9 +192,9 @@ impl HummockSnapshotManager {
     }
 
     /// Wait until the latest snapshot is newer than the given one.
-    pub async fn wait(&self, snapshot: PbHummockSnapshot) {
+    pub async fn wait(&self, version_id: HummockVersionId) {
         let mut rx = self.latest_snapshot.subscribe();
-        while rx.borrow_and_update().value.max_committed_epoch < snapshot.committed_epoch {
+        while rx.borrow_and_update().value.id < version_id {
             rx.changed().await.unwrap();
         }
     }
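Both waiting sites above (`CatalogWriterImpl::wait_version` and `HummockSnapshotManager::wait`) reduce to the same primitive: block on a watch channel until a monotonic counter reaches a lower bound. A self-contained sketch of that primitive, with hypothetical channel wiring rather than the real frontend types:

use tokio::sync::watch;

// Block until the published value is at least `target`.
pub async fn wait_at_least(rx: &mut watch::Receiver<u64>, target: u64) {
    while *rx.borrow_and_update() < target {
        // `changed()` fails only if the publisher is dropped, which the
        // real code also treats as unrecoverable.
        rx.changed().await.expect("version publisher dropped");
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u64);
    tokio::spawn(async move {
        for v in 1..=3 {
            tx.send(v).unwrap();
        }
    });
    wait_at_least(&mut rx, 3).await;
}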
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 21c3e9a950b3..ff83c905688a 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -298,19 +298,21 @@ impl FrontendEnv {
         let mut join_handles = vec![heartbeat_join_handle];
         let mut shutdown_senders = vec![heartbeat_shutdown_sender];
 
+        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
+        let hummock_snapshot_manager =
+            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
+
         let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
         let catalog = Arc::new(RwLock::new(Catalog::default()));
         let catalog_writer = Arc::new(CatalogWriterImpl::new(
             meta_client.clone(),
             catalog_updated_rx,
+            hummock_snapshot_manager.clone(),
         ));
         let catalog_reader = CatalogReader::new(catalog.clone());
 
         let worker_node_manager = Arc::new(WorkerNodeManager::new());
 
-        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
-        let hummock_snapshot_manager =
-            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
         let compute_client_pool = Arc::new(ComputeClientPool::new(
             config.batch_exchange_connection_pool_size(),
         ));
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 64623cefe926..a089f0043591 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -33,6 +33,7 @@ use risingwave_common::system_param::reader::SystemParamsReader;
 use risingwave_common::util::cluster_limit::ClusterLimit;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
+use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
 use risingwave_pb::backup_service::MetaSnapshotMetadata;
 use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 use risingwave_pb::catalog::{
@@ -933,8 +934,8 @@ impl FrontendMetaClient for MockFrontendMetaClient {
         Ok(HummockSnapshot { committed_epoch: 0 })
     }
 
-    async fn flush(&self, _checkpoint: bool) -> RpcResult<HummockSnapshot> {
-        Ok(HummockSnapshot { committed_epoch: 0 })
+    async fn flush(&self, _checkpoint: bool) -> RpcResult<HummockVersionId> {
+        Ok(INVALID_VERSION_ID)
     }
 
     async fn wait(&self) -> RpcResult<()> {
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index cfbdda2e9650..6c1591a68aca 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -69,10 +69,10 @@ impl StreamManagerService for StreamServiceImpl {
         self.env.idle_manager().record_activity();
 
         let req = request.into_inner();
-        let snapshot = self.barrier_scheduler.flush(req.checkpoint).await?;
+        let version_id = self.barrier_scheduler.flush(req.checkpoint).await?;
         Ok(Response::new(FlushResponse {
             status: None,
-            snapshot: Some(snapshot),
+            hummock_version_id: version_id.to_u64(),
         }))
     }
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 5bb76ee46133..590cfc98d33c 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -1301,6 +1301,10 @@ impl GlobalBarrierManagerContext {
             }
         }
     }
+
+    pub fn hummock_manager(&self) -> &HummockManagerRef {
+        &self.hummock_manager
+    }
 }
 
 impl CreateMviewProgressTracker {
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index f213999f0a08..3e0e16a783a3 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -21,7 +21,7 @@ use anyhow::{anyhow, Context};
 use assert_matches::assert_matches;
 use parking_lot::Mutex;
 use risingwave_common::catalog::TableId;
-use risingwave_pb::hummock::HummockSnapshot;
+use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::meta::PausedReason;
 use tokio::select;
 use tokio::sync::{oneshot, watch};
@@ -325,7 +325,7 @@ impl BarrierScheduler {
     }
 
     /// Flush means waiting for the next barrier to collect.
-    pub async fn flush(&self, checkpoint: bool) -> MetaResult<HummockSnapshot> {
+    pub async fn flush(&self, checkpoint: bool) -> MetaResult<HummockVersionId> {
         let start = Instant::now();
         tracing::debug!("start barrier flush");
 
@@ -334,8 +334,8 @@ impl BarrierScheduler {
         let elapsed = Instant::now().duration_since(start);
         tracing::debug!("barrier flushed in {:?}", elapsed);
 
-        let snapshot = self.hummock_manager.latest_snapshot();
-        Ok(snapshot)
+        let version_id = self.hummock_manager.get_version_id().await;
+        Ok(version_id)
     }
 }
diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs
index fc9fe3b28f65..3a9ae5b098ae 100644
--- a/src/meta/src/hummock/manager/versioning.rs
+++ b/src/meta/src/hummock/manager/versioning.rs
@@ -148,6 +148,10 @@ impl HummockManager {
         f(&self.versioning.read().await.current_version)
     }
 
+    pub async fn get_version_id(&self) -> HummockVersionId {
+        self.on_current_version(|version| version.id).await
+    }
+
     /// Gets the mapping from table id to compaction group id
     pub async fn get_table_compaction_group_id_mapping(
         &self,
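`get_version_id` above is a thin wrapper over `on_current_version`, which hands the current version to a closure so the read guard can never escape the call. A simplified sketch of that accessor pattern, with a plain `u64` standing in for the full `HummockVersion`:

use tokio::sync::RwLock;

pub struct VersionCell {
    // Stand-in for `HummockManager`'s versioning state.
    current: RwLock<u64>,
}

impl VersionCell {
    // The guard is confined to this scope, so callers cannot hold the
    // version lock across awaits.
    pub async fn on_current_version<T>(&self, f: impl FnOnce(&u64) -> T) -> T {
        f(&*self.current.read().await)
    }

    pub async fn get_version_id(&self) -> u64 {
        self.on_current_version(|v| *v).await
    }
}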
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 43c8b52a571c..7c1499552193 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -51,7 +51,7 @@ use risingwave_pb::catalog::{
 };
 use risingwave_pb::ddl_service::alter_owner_request::Object;
 use risingwave_pb::ddl_service::{
-    alter_name_request, alter_set_schema_request, DdlProgress, TableJobType,
+    alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, WaitVersion,
 };
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::meta::table_fragments::PbFragment;
@@ -278,7 +278,9 @@ impl DdlController {
     /// has been interrupted during executing, the request will be cancelled by tonic. Since we have
     /// a lot of logic for revert, status management, notification and so on, ensuring consistency
     /// would be a huge hassle and pain if we don't spawn here.
-    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<NotificationVersion> {
+    ///
+    /// Though returning `Option`, it's always `Some`, to simplify the handling logic
+    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
         if !command.allow_in_recovery() {
             self.barrier_manager.check_status_running()?;
         }
@@ -353,7 +355,16 @@ impl DdlController {
             }
         }
         .in_current_span();
-        tokio::spawn(fut).await.unwrap()
+        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
+        Ok(Some(WaitVersion {
+            catalog_version: notification_version,
+            hummock_version_id: self
+                .barrier_manager
+                .hummock_manager()
+                .get_version_id()
+                .await
+                .to_u64(),
+        }))
     }
 
     pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
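Note the ordering in `run_command`: the hummock version id is sampled only after the spawned DDL future has returned its notification version, so the id is at least as new as the DDL's own effects. A hedged sketch of that pairing, with a generic closure standing in for the real `HummockManager` accessor:

pub struct WaitVersion {
    pub catalog_version: u64,
    pub hummock_version_id: u64,
}

// Assemble the response version; `current_version_id` is only invoked
// (and awaited) after the DDL has already completed.
pub async fn finish_ddl<F, Fut>(notification_version: u64, current_version_id: F) -> WaitVersion
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = u64>,
{
    WaitVersion {
        catalog_version: notification_version,
        hummock_version_id: current_version_id().await,
    }
}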
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index f736e23cb9e0..2648843abe19 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -26,7 +26,7 @@ use cluster_limit_service_client::ClusterLimitServiceClient;
 use either::Either;
 use futures::stream::BoxStream;
 use lru::LruCache;
-use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, SecretId, TableId};
+use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId};
 use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
 use risingwave_common::hash::WorkerSlotMapping;
 use risingwave_common::system_param::reader::SystemParamsReader;
@@ -171,7 +171,7 @@ impl MetaClient {
         schema_id: u32,
         owner_id: u32,
         req: create_connection_request::Payload,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateConnectionRequest {
             name: connection_name,
             database_id,
@@ -180,7 +180,9 @@ impl MetaClient {
             payload: Some(req),
         };
         let resp = self.inner.create_connection(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_secret(
@@ -190,7 +192,7 @@ impl MetaClient {
         schema_id: u32,
         owner_id: u32,
         value: Vec<u8>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateSecretRequest {
             name: secret_name,
             database_id,
@@ -199,7 +201,9 @@ impl MetaClient {
             value,
         };
         let resp = self.inner.create_secret(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
@@ -208,18 +212,22 @@ impl MetaClient {
         Ok(resp.connections)
     }
 
-    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<CatalogVersion> {
+    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
         let request = DropConnectionRequest { connection_id };
         let resp = self.inner.drop_connection(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<CatalogVersion> {
+    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
         let request = DropSecretRequest {
             secret_id: secret_id.into(),
         };
         let resp = self.inner.drop_secret(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     /// Register the current node to the cluster and set the corresponding worker id.
@@ -365,27 +373,31 @@ impl MetaClient {
         Ok(())
     }
 
-    pub async fn create_database(&self, db: PbDatabase) -> Result<CatalogVersion> {
+    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
         let request = CreateDatabaseRequest { db: Some(db) };
         let resp = self.inner.create_database(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn create_schema(&self, schema: PbSchema) -> Result<CatalogVersion> {
+    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
         let request = CreateSchemaRequest {
             schema: Some(schema),
         };
         let resp = self.inner.create_schema(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_materialized_view(
         &self,
         table: PbTable,
         graph: StreamFragmentGraph,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateMaterializedViewRequest {
             materialized_view: Some(table),
             fragment_graph: Some(graph),
@@ -393,35 +405,41 @@ impl MetaClient {
         };
         let resp = self.inner.create_materialized_view(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_materialized_view(
         &self,
         table_id: TableId,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropMaterializedViewRequest {
             table_id: table_id.table_id(),
             cascade,
         };
 
         let resp = self.inner.drop_materialized_view(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_source(
         &self,
         source: PbSource,
         graph: Option<StreamFragmentGraph>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateSourceRequest {
             source: Some(source),
             fragment_graph: graph,
         };
 
         let resp = self.inner.create_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_sink(
@@ -429,7 +447,7 @@ impl MetaClient {
         sink: PbSink,
         graph: StreamFragmentGraph,
         affected_table_change: Option<ReplaceTablePlan>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateSinkRequest {
             sink: Some(sink),
             fragment_graph: Some(graph),
@@ -437,27 +455,30 @@ impl MetaClient {
         };
 
         let resp = self.inner.create_sink(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn create_subscription(
-        &self,
-        subscription: PbSubscription,
-    ) -> Result<CatalogVersion> {
+    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
         let request = CreateSubscriptionRequest {
             subscription: Some(subscription),
         };
 
         let resp = self.inner.create_subscription(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn create_function(&self, function: PbFunction) -> Result<CatalogVersion> {
+    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
         let request = CreateFunctionRequest {
             function: Some(function),
         };
         let resp = self.inner.create_function(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_table(
@@ -466,7 +487,7 @@ impl MetaClient {
         table: PbTable,
         graph: StreamFragmentGraph,
         job_type: PbTableJobType,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateTableRequest {
             materialized_view: Some(table),
             fragment_graph: Some(graph),
@@ -475,58 +496,70 @@ impl MetaClient {
         };
         let resp = self.inner.create_table(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn comment_on(&self, comment: PbComment) -> Result<CatalogVersion> {
+    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
         let request = CommentOnRequest {
             comment: Some(comment),
         };
         let resp = self.inner.comment_on(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_name(
         &self,
         object: alter_name_request::Object,
         name: &str,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = AlterNameRequest {
             object: Some(object),
             new_name: name.to_string(),
         };
         let resp = self.inner.alter_name(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<CatalogVersion> {
+    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
         let request = AlterOwnerRequest {
             object: Some(object),
             owner_id,
         };
         let resp = self.inner.alter_owner(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_set_schema(
         &self,
         object: alter_set_schema_request::Object,
         new_schema_id: u32,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = AlterSetSchemaRequest {
             new_schema_id,
             object: Some(object),
         };
         let resp = self.inner.alter_set_schema(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn alter_source(&self, source: PbSource) -> Result<CatalogVersion> {
+    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
         let request = AlterSourceRequest {
             source: Some(source),
         };
         let resp = self.inner.alter_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_parallelism(
@@ -552,7 +585,7 @@
         graph: StreamFragmentGraph,
         table_col_index_mapping: ColIndexMapping,
         job_type: PbTableJobType,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = ReplaceTablePlanRequest {
             plan: Some(ReplaceTablePlan {
                 source,
@@ -564,7 +597,9 @@
         };
         let resp = self.inner.replace_table_plan(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
@@ -575,11 +610,13 @@
         Ok(())
     }
 
-    pub async fn create_view(&self, view: PbView) -> Result<CatalogVersion> {
+    pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
         let request = CreateViewRequest { view: Some(view) };
         let resp = self.inner.create_view(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_index(
@@ -587,7 +624,7 @@
         index: PbIndex,
         table: PbTable,
         graph: StreamFragmentGraph,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateIndexRequest {
             index: Some(index),
             index_table: Some(table),
@@ -595,7 +632,9 @@
         };
         let resp = self.inner.create_index(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_table(
@@ -603,7 +642,7 @@
         &self,
         source_id: Option<u32>,
         table_id: TableId,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropTableRequest {
             source_id: source_id.map(SourceId::Id),
             table_id: table_id.table_id(),
@@ -611,19 +650,25 @@
         };
 
         let resp = self.inner.drop_table(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
         let request = DropViewRequest { view_id, cascade };
         let resp = self.inner.drop_view(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
         let request = DropSourceRequest { source_id, cascade };
         let resp = self.inner.drop_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_sink(
@@ -631,27 +676,31 @@
         sink_id: u32,
         cascade: bool,
         affected_table_change: Option<ReplaceTablePlan>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropSinkRequest {
             sink_id,
             cascade,
             affected_table_change,
         };
         let resp = self.inner.drop_sink(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_subscription(
         &self,
         subscription_id: u32,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropSubscriptionRequest {
             subscription_id,
             cascade,
         };
         let resp = self.inner.drop_subscription(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn list_change_log_epochs(
@@ -669,33 +718,41 @@ impl MetaClient {
         Ok(resp.epochs)
     }
 
-    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
         let request = DropIndexRequest {
             index_id: index_id.index_id,
             cascade,
         };
         let resp = self.inner.drop_index(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
    }
 
-    pub async fn drop_function(&self, function_id: FunctionId) -> Result<CatalogVersion> {
+    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
         let request = DropFunctionRequest {
             function_id: function_id.0,
         };
         let resp = self.inner.drop_function(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<CatalogVersion> {
+    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
         let request = DropDatabaseRequest { database_id };
         let resp = self.inner.drop_database(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_schema(&self, schema_id: SchemaId) -> Result<CatalogVersion> {
+    pub async fn drop_schema(&self, schema_id: SchemaId) -> Result<WaitVersion> {
         let request = DropSchemaRequest { schema_id };
         let resp = self.inner.drop_schema(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     // TODO: using UserInfoVersion instead as return type.
@@ -879,10 +936,10 @@ impl MetaClient {
         Ok(resp.tables)
     }
 
-    pub async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot> {
+    pub async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
         let request = FlushRequest { checkpoint };
         let resp = self.inner.flush(request).await?;
-        Ok(resp.snapshot.unwrap())
+        Ok(HummockVersionId::new(resp.hummock_version_id))
     }
 
     pub async fn wait(&self) -> Result<()> {
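Because prost generates message-typed fields as `Option<T>`, every call site above repeats `resp.version.ok_or_else(...)`. The patch leaves the repetition as is; a possible (not committed) factoring would be a one-line helper:

// Not part of the patch — just a sketch of how the repeated unwrap of the
// prost-generated `Option` field could be centralized.
fn required<T>(version: Option<T>) -> anyhow::Result<T> {
    version.ok_or_else(|| anyhow::anyhow!("wait version not set"))
}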
From 039af9dd98477abd1e24cf52caeeff56bcd90e9b Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Mon, 23 Sep 2024 16:20:25 +0800
Subject: [PATCH 02/22] refactor(meta): remove BarrierInfo and reduce pause and resume log output (#18630)

---
 proto/meta.proto                          | 10 ++--------
 src/ctl/src/cmd_impl/meta/pause_resume.rs | 18 +++++------------
 src/meta/service/src/stream_service.rs    | 16 ++++-----------
 src/meta/src/barrier/mod.rs               | 10 +---------
 src/meta/src/barrier/notifier.rs          | 18 +++--------------
 src/meta/src/barrier/schedule.rs          | 24 +++++++----------------
 6 files changed, 22 insertions(+), 74 deletions(-)

diff --git a/proto/meta.proto b/proto/meta.proto
index b245c9322f93..93195e985d96 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -160,17 +160,11 @@ enum PausedReason {
 
 message PauseRequest {}
 
-message PauseResponse {
-  optional PausedReason prev = 1;
-  optional PausedReason curr = 2;
-}
+message PauseResponse {}
 
 message ResumeRequest {}
 
-message ResumeResponse {
-  optional PausedReason prev = 1;
-  optional PausedReason curr = 2;
-}
+message ResumeResponse {}
 
 message CancelCreatingJobsRequest {
   message CreatingJobInfo {
diff --git a/src/ctl/src/cmd_impl/meta/pause_resume.rs b/src/ctl/src/cmd_impl/meta/pause_resume.rs
index d274819f1db6..532bacc4241e 100644
--- a/src/ctl/src/cmd_impl/meta/pause_resume.rs
+++ b/src/ctl/src/cmd_impl/meta/pause_resume.rs
@@ -16,7 +16,7 @@ use risingwave_pb::meta::PausedReason;
 
 use crate::CtlContext;
 
-fn desc(reason: PausedReason) -> &'static str {
+pub fn desc(reason: PausedReason) -> &'static str {
     // Method on optional enums derived from `prost` will use `Unspecified` if unset. So we treat
     // `Unspecified` as not paused here.
     match reason {
@@ -29,13 +29,9 @@ fn desc(reason: PausedReason) -> &'static str {
 pub async fn pause(context: &CtlContext) -> anyhow::Result<()> {
     let meta_client = context.meta_client().await?;
 
-    let response = meta_client.pause().await?;
+    meta_client.pause().await?;
 
-    println!(
-        "Done.\nPrevious: {}\nCurrent: {}",
-        desc(response.prev()),
-        desc(response.curr())
-    );
+    println!("Done.");
 
     Ok(())
 }
@@ -43,13 +39,9 @@ pub async fn pause(context: &CtlContext) -> anyhow::Result<()> {
 pub async fn resume(context: &CtlContext) -> anyhow::Result<()> {
     let meta_client = context.meta_client().await?;
 
-    let response = meta_client.resume().await?;
+    meta_client.resume().await?;
 
-    println!(
-        "Done.\nPrevious: {}\nCurrent: {}",
-        desc(response.prev()),
-        desc(response.curr())
-    );
+    println!("Done.");
 
     Ok(())
 }
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index 6c1591a68aca..7cc43ae1c78b 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -78,26 +78,18 @@ impl StreamManagerService for StreamServiceImpl {
 
     #[cfg_attr(coverage, coverage(off))]
     async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
-        let i = self
-            .barrier_scheduler
+        self.barrier_scheduler
             .run_command(Command::pause(PausedReason::Manual))
             .await?;
-        Ok(Response::new(PauseResponse {
-            prev: i.prev_paused_reason.map(Into::into),
-            curr: i.curr_paused_reason.map(Into::into),
-        }))
+        Ok(Response::new(PauseResponse {}))
     }
 
     #[cfg_attr(coverage, coverage(off))]
     async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
-        let i = self
-            .barrier_scheduler
+        self.barrier_scheduler
             .run_command(Command::resume(PausedReason::Manual))
             .await?;
-        Ok(Response::new(ResumeResponse {
-            prev: i.prev_paused_reason.map(Into::into),
-            curr: i.curr_paused_reason.map(Into::into),
-        }))
+        Ok(Response::new(ResumeResponse {}))
     }
 
     #[cfg_attr(coverage, coverage(off))]
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 590cfc98d33c..f47e400dce81 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -54,7 +54,6 @@ use self::command::CommandContext;
 use self::notifier::Notifier;
 use crate::barrier::creating_job::CreatingStreamingJobControl;
 use crate::barrier::info::InflightGraphInfo;
-use crate::barrier::notifier::BarrierInfo;
 use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
 use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
 use crate::barrier::state::BarrierManagerState;
@@ -1075,16 +1074,9 @@ impl GlobalBarrierManager {
         };
 
         // Notify about the injection.
-        let prev_paused_reason = self.state.paused_reason();
         let curr_paused_reason = command_ctx.next_paused_reason();
 
-        let info = BarrierInfo {
-            prev_epoch: prev_epoch.value(),
-            curr_epoch: curr_epoch.value(),
-            prev_paused_reason,
-            curr_paused_reason,
-        };
-
-        notifiers.iter_mut().for_each(|n| n.notify_started(info));
+        notifiers.iter_mut().for_each(|n| n.notify_started());
 
         // Update the paused state after the barrier is injected.
         self.state.set_paused_reason(curr_paused_reason);
diff --git a/src/meta/src/barrier/notifier.rs b/src/meta/src/barrier/notifier.rs
index da201927664a..d86f43fdd0a1 100644
--- a/src/meta/src/barrier/notifier.rs
+++ b/src/meta/src/barrier/notifier.rs
@@ -12,27 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use risingwave_common::util::epoch::Epoch;
-use risingwave_pb::meta::PausedReason;
 use tokio::sync::oneshot;
 
 use crate::{MetaError, MetaResult};
 
-/// The barrier info sent back to the caller when a barrier is injected.
-#[derive(Debug, Clone, Copy)]
-pub struct BarrierInfo {
-    pub prev_epoch: Epoch,
-    pub curr_epoch: Epoch,
-
-    pub prev_paused_reason: Option<PausedReason>,
-    pub curr_paused_reason: Option<PausedReason>,
-}
-
 /// Used for notifying the status of a scheduled command/barrier.
 #[derive(Debug, Default)]
 pub(crate) struct Notifier {
     /// Get notified when scheduled barrier has started to be handled.
-    pub started: Option<oneshot::Sender<MetaResult<BarrierInfo>>>,
+    pub started: Option<oneshot::Sender<MetaResult<()>>>,
 
     /// Get notified when scheduled barrier is collected or failed.
     pub collected: Option<oneshot::Sender<MetaResult<()>>>,
@@ -40,9 +28,9 @@ pub(crate) struct Notifier {
 
 impl Notifier {
     /// Notify when we have injected a barrier to compute nodes.
-    pub fn notify_started(&mut self, info: BarrierInfo) {
+    pub fn notify_started(&mut self) {
         if let Some(tx) = self.started.take() {
-            tx.send(Ok(info)).ok();
+            tx.send(Ok(())).ok();
         }
     }
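After this simplification a notifier is just two fire-once channels: one for "barrier injected", one for "barrier collected". A minimal model of the patched type, with `anyhow::Result` standing in for `MetaResult`:

use tokio::sync::oneshot;

#[derive(Default)]
pub struct Notifier {
    pub started: Option<oneshot::Sender<anyhow::Result<()>>>,
    pub collected: Option<oneshot::Sender<anyhow::Result<()>>>,
}

impl Notifier {
    // Fires at most once; a second call is a silent no-op because the
    // sender has already been taken.
    pub fn notify_started(&mut self) {
        if let Some(tx) = self.started.take() {
            let _ = tx.send(Ok(()));
        }
    }

    pub fn notify_collected(&mut self) {
        if let Some(tx) = self.collected.take() {
            let _ = tx.send(Ok(()));
        }
    }
}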
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index 3e0e16a783a3..c39fdb56e4fb 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -27,7 +27,7 @@ use tokio::select;
 use tokio::sync::{oneshot, watch};
 use tokio::time::Interval;
 
-use super::notifier::{BarrierInfo, Notifier};
+use super::notifier::Notifier;
 use super::{Command, Scheduled};
 use crate::hummock::HummockManagerRef;
 use crate::model::ActorId;
@@ -251,7 +251,7 @@ impl BarrierScheduler {
     /// Returns the barrier info of each command.
     ///
     /// TODO: atomicity of multiple commands is not guaranteed.
-    async fn run_multiple_commands(&self, commands: Vec<Command>) -> MetaResult<Vec<BarrierInfo>> {
+    async fn run_multiple_commands(&self, commands: Vec<Command>) -> MetaResult<()> {
         let mut contexts = Vec::with_capacity(commands.len());
         let mut scheduleds = Vec::with_capacity(commands.len());
 
@@ -272,16 +272,13 @@ impl BarrierScheduler {
 
         self.push(scheduleds)?;
 
-        let mut infos = Vec::with_capacity(contexts.len());
-
         for (injected_rx, collect_rx) in contexts {
             // Wait for this command to be injected, and record the result.
             tracing::trace!("waiting for injected_rx");
-            let info = injected_rx
+            injected_rx
                 .await
                 .ok()
                 .context("failed to inject barrier")??;
-            infos.push(info);
 
             tracing::trace!("waiting for collect_rx");
             // Throw the error if it occurs when collecting this barrier.
@@ -291,35 +288,28 @@ impl BarrierScheduler {
                 .context("failed to collect barrier")??;
         }
 
-        Ok(infos)
+        Ok(())
     }
 
     /// Run a command with a `Pause` command before and `Resume` command after it. Used for
     /// configuration change.
     ///
     /// Returns the barrier info of the actual command.
-    pub async fn run_config_change_command_with_pause(
-        &self,
-        command: Command,
-    ) -> MetaResult<BarrierInfo> {
+    pub async fn run_config_change_command_with_pause(&self, command: Command) -> MetaResult<()> {
         self.run_multiple_commands(vec![
             Command::pause(PausedReason::ConfigChange),
             command,
             Command::resume(PausedReason::ConfigChange),
         ])
         .await
-        .map(|i| i[1])
     }
 
     /// Run a command and return when it's completely finished.
     ///
     /// Returns the barrier info of the actual command.
-    pub async fn run_command(&self, command: Command) -> MetaResult<BarrierInfo> {
+    pub async fn run_command(&self, command: Command) -> MetaResult<()> {
         tracing::trace!("run_command: {:?}", command);
-        let ret = self
-            .run_multiple_commands(vec![command])
-            .await
-            .map(|i| i[0]);
+        let ret = self.run_multiple_commands(vec![command]).await;
         tracing::trace!("run_command finished");
         ret
     }
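The configuration-change wrapper keeps its pause/resume bracket; it merely stops forwarding per-command barrier info. A sketch of the bracket with simplified stand-ins for `Command` and `run_multiple_commands` — as the original TODO notes, atomicity across the three commands is still not guaranteed:

#[derive(Debug)]
pub enum Command {
    Pause,
    Resume,
    Other(&'static str),
}

// `run_batch` stands in for `run_multiple_commands`: schedule all three
// commands together, then await them in order.
pub async fn run_config_change_command_with_pause<F, Fut>(
    run_batch: F,
    command: Command,
) -> anyhow::Result<()>
where
    F: FnOnce(Vec<Command>) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    run_batch(vec![Command::Pause, command, Command::Resume]).await
}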
From 8fa0468689002dc919c2560e1a264f754d5dc4d9 Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Mon, 23 Sep 2024 16:40:48 +0800
Subject: [PATCH 03/22] chore(dashboard): rename relation dependency graph to relation graph (#18612)

---
 dashboard/components/Layout.tsx                            | 2 +-
 .../{RelationDependencyGraph.tsx => RelationGraph.tsx}     | 2 +-
 dashboard/components/Relations.tsx                         | 2 +-
 .../pages/{dependency_graph.tsx => relation_graph.tsx}     | 6 ++----
 4 files changed, 5 insertions(+), 7 deletions(-)
 rename dashboard/components/{RelationDependencyGraph.tsx => RelationGraph.tsx} (99%)
 rename dashboard/pages/{dependency_graph.tsx => relation_graph.tsx} (98%)

diff --git a/dashboard/components/Layout.tsx b/dashboard/components/Layout.tsx
index f16270adb2f4..2a1eaf488d12 100644
--- a/dashboard/components/Layout.tsx
+++ b/dashboard/components/Layout.tsx
@@ -144,7 +144,7 @@ function Layout({ children }: { children: React.ReactNode }) {
           Streaming
-          Relation Graph
+          Relation Graph
           Fragment Graph
diff --git a/dashboard/components/RelationDependencyGraph.tsx b/dashboard/components/RelationGraph.tsx
similarity index 99%
rename from dashboard/components/RelationDependencyGraph.tsx
rename to dashboard/components/RelationGraph.tsx
index 1ab2fc7839a8..a61a6924a370 100644
--- a/dashboard/components/RelationDependencyGraph.tsx
+++ b/dashboard/components/RelationGraph.tsx
@@ -57,7 +57,7 @@ const rowMargin = 50
 export const nodeRadius = 12
 const layoutMargin = 50
 
-export default function RelationDependencyGraph({
+export default function RelationGraph({
   nodes,
   selectedId,
   setSelectedId,
diff --git a/dashboard/components/Relations.tsx b/dashboard/components/Relations.tsx
index 49161feea908..4636b3764947 100644
--- a/dashboard/components/Relations.tsx
+++ b/dashboard/components/Relations.tsx
@@ -60,7 +60,7 @@ export const dependentsColumn: Column<Relation> = {
   name: "Depends",
   width: 1,
   content: (r) => (
-    <RelationDependencyGraph
+    <RelationGraph