diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index f51f0e1b3a428..de860593e8105 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -11,13 +11,18 @@ import "stream_plan.proto";
 option java_package = "com.risingwave.proto";
 option optimize_for = SPEED;
 
+message WaitVersion {
+  uint64 catalog_version = 1;
+  uint64 hummock_version_id = 2;
+}
+
 message CreateDatabaseRequest {
   catalog.Database db = 1;
 }
 
 message CreateDatabaseResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropDatabaseRequest {
@@ -26,7 +31,7 @@ message DropDatabaseRequest {
 
 message DropDatabaseResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSchemaRequest {
@@ -35,7 +40,7 @@ message CreateSchemaRequest {
 
 message CreateSchemaResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSchemaRequest {
@@ -44,7 +49,7 @@ message DropSchemaRequest {
 
 message DropSchemaResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSourceRequest {
@@ -54,7 +59,7 @@ message CreateSourceRequest {
 
 message CreateSourceResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSourceRequest {
@@ -64,7 +69,7 @@ message DropSourceRequest {
 
 message DropSourceResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterSourceRequest {
@@ -73,7 +78,7 @@ message AlterSourceRequest {
 
 message AlterSourceResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSinkRequest {
@@ -85,7 +90,7 @@ message CreateSinkRequest {
 
 message CreateSinkResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSinkRequest {
@@ -96,7 +101,7 @@ message DropSinkRequest {
 
 message DropSinkResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateSubscriptionRequest {
@@ -105,7 +110,7 @@ message CreateSubscriptionRequest {
 
 message CreateSubscriptionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropSubscriptionRequest {
@@ -115,7 +120,7 @@ message DropSubscriptionRequest {
 
 message DropSubscriptionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateMaterializedViewRequest {
@@ -135,7 +140,7 @@ message CreateMaterializedViewRequest {
 
 message CreateMaterializedViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropMaterializedViewRequest {
@@ -145,7 +150,7 @@ message DropMaterializedViewRequest {
 
 message DropMaterializedViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateViewRequest {
@@ -154,7 +159,7 @@ message CreateViewRequest {
 
 message CreateViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropViewRequest {
@@ -164,7 +169,7 @@ message DropViewRequest {
 
 message DropViewResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 // An enum to distinguish different types of the `Table` streaming job.
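The `WaitVersion` message introduced above is the core of this change: instead of a bare catalog version, every DDL response now carries both the catalog notification version and the hummock version id covering the job's data. A hypothetical consumer sketch, where the `wait_*` helpers are illustrative stand-ins rather than APIs from this PR:

```rust
/// Mirrors the proto message above; both counters must be caught up with
/// before a DDL statement can be reported to the user as complete.
struct WaitVersion {
    catalog_version: u64,
    hummock_version_id: u64,
}

async fn wait_ddl_visible(v: WaitVersion) {
    wait_catalog(v.catalog_version).await; // catalog notification caught up
    wait_hummock(v.hummock_version_id).await; // storage version caught up
}

// Illustrative stand-ins for the real frontend waiters.
async fn wait_catalog(_version: u64) {}
async fn wait_hummock(_version_id: u64) {}
```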
@@ -197,7 +202,7 @@ message CreateTableRequest {
 
 message CreateTableResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterNameRequest {
@@ -216,7 +221,7 @@ message AlterNameRequest {
 
 message AlterNameResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterOwnerRequest {
@@ -247,7 +252,7 @@ message AlterSetSchemaRequest {
 
 message AlterSetSchemaResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message AlterParallelismRequest {
@@ -260,7 +265,7 @@ message AlterParallelismResponse {}
 
 message AlterOwnerResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message CreateFunctionRequest {
@@ -269,7 +274,7 @@ message CreateFunctionRequest {
 
 message CreateFunctionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropFunctionRequest {
@@ -278,7 +283,7 @@ message DropFunctionRequest {
 
 message DropFunctionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropTableRequest {
@@ -291,7 +296,7 @@ message DropTableRequest {
 
 message DropTableResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 // Used by risectl (and in the future, dashboard)
@@ -310,7 +315,7 @@ message CreateIndexRequest {
 
 message CreateIndexResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message DropIndexRequest {
@@ -320,7 +325,7 @@ message DropIndexRequest {
 
 message DropIndexResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message ReplaceTablePlan {
@@ -345,7 +350,7 @@ message ReplaceTablePlanRequest {
 
 message ReplaceTablePlanResponse {
   common.Status status = 1;
   // The new global catalog version.
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message GetTableRequest {
@@ -378,7 +383,7 @@ message CreateSecretRequest {
 }
 
 message CreateSecretResponse {
-  uint64 version = 1;
+  WaitVersion version = 1;
 }
 
 message DropSecretRequest {
@@ -386,7 +391,7 @@ message DropSecretRequest {
 }
 
 message DropSecretResponse {
-  uint64 version = 1;
+  WaitVersion version = 1;
 }
 
 message CreateConnectionRequest {
@@ -406,7 +411,7 @@ message CreateConnectionRequest {
 
 message CreateConnectionResponse {
   // global catalog version
-  uint64 version = 1;
+  WaitVersion version = 1;
 }
 
 message ListConnectionsRequest {}
@@ -421,7 +426,7 @@ message DropConnectionRequest {
 
 message DropConnectionResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message GetTablesRequest {
@@ -442,7 +447,7 @@ message CommentOnRequest {
 
 message CommentOnResponse {
   common.Status status = 1;
-  uint64 version = 2;
+  WaitVersion version = 2;
 }
 
 message TableSchemaChange {
diff --git a/proto/meta.proto b/proto/meta.proto
index 4be4f2b18d9ca..b245c9322f932 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -145,7 +145,7 @@ message FlushRequest {
 
 message FlushResponse {
   common.Status status = 1;
-  hummock.HummockSnapshot snapshot = 2;
+  uint64 hummock_version_id = 2;
 }
 
 // The reason why the data sources in the cluster are paused.
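`FlushResponse` likewise drops the `HummockSnapshot` in favor of a bare `hummock_version_id`. This relies on version ids growing monotonically as versions are committed, so a visibility check reduces to a single ordered comparison, as in this minimal sketch:

```rust
/// Sketch only: assumes hummock version ids grow monotonically, which is
/// what lets a single u64 replace the old epoch-based snapshot comparison.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct HummockVersionId(u64);

fn flush_visible(locally_observed: HummockVersionId, flushed: HummockVersionId) -> bool {
    // The flushed data is readable once the local view of the version has
    // caught up with the id returned in `FlushResponse`.
    locally_observed >= flushed
}
```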
diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs
index 061aeb737aa3f..1164df548d5d0 100644
--- a/src/frontend/src/catalog/catalog_service.rs
+++ b/src/frontend/src/catalog/catalog_service.rs
@@ -19,6 +19,7 @@ use parking_lot::lock_api::ArcRwLockReadGuard;
 use parking_lot::{RawRwLock, RwLock};
 use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
+use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::catalog::{
     PbComment, PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource,
     PbSubscription, PbTable, PbView,
@@ -26,7 +27,7 @@ use risingwave_pb::catalog::{
 use risingwave_pb::ddl_service::alter_owner_request::Object;
 use risingwave_pb::ddl_service::{
     alter_name_request, alter_set_schema_request, create_connection_request, PbReplaceTablePlan,
-    PbTableJobType, ReplaceTablePlan, TableJobType,
+    PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
 };
 use risingwave_pb::meta::PbTableParallelism;
 use risingwave_pb::stream_plan::StreamFragmentGraph;
@@ -36,6 +37,7 @@ use tokio::sync::watch::Receiver;
 use super::root_catalog::Catalog;
 use super::{DatabaseId, SecretId, TableId};
 use crate::error::Result;
+use crate::scheduler::HummockSnapshotManagerRef;
 use crate::user::UserId;
 
 pub type CatalogReadGuard = ArcRwLockReadGuard<RawRwLock, Catalog>;
@@ -216,6 +218,7 @@ pub trait CatalogWriter: Send + Sync {
 pub struct CatalogWriterImpl {
     meta_client: MetaClient,
     catalog_updated_rx: Receiver<CatalogVersion>,
+    hummock_snapshot_manager: HummockSnapshotManagerRef,
 }
 
 #[async_trait::async_trait]
@@ -578,18 +581,26 @@ impl CatalogWriter for CatalogWriterImpl {
 }
 
 impl CatalogWriterImpl {
-    pub fn new(meta_client: MetaClient, catalog_updated_rx: Receiver<CatalogVersion>) -> Self {
+    pub fn new(
+        meta_client: MetaClient,
+        catalog_updated_rx: Receiver<CatalogVersion>,
+        hummock_snapshot_manager: HummockSnapshotManagerRef,
+    ) -> Self {
         Self {
             meta_client,
             catalog_updated_rx,
+            hummock_snapshot_manager,
         }
     }
 
-    async fn wait_version(&self, version: CatalogVersion) -> Result<()> {
+    async fn wait_version(&self, version: WaitVersion) -> Result<()> {
         let mut rx = self.catalog_updated_rx.clone();
-        while *rx.borrow_and_update() < version {
+        while *rx.borrow_and_update() < version.catalog_version {
            rx.changed().await.map_err(|e| anyhow!(e))?;
         }
+        self.hummock_snapshot_manager
+            .wait(HummockVersionId::new(version.hummock_version_id))
+            .await;
         Ok(())
     }
 }
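`CatalogWriterImpl::wait_version` above now waits in two steps: first for the catalog notification version, then for the hummock version. Both use the same `tokio::sync::watch` idiom, shown here as a self-contained sketch (illustrative names, not the real frontend types):

```rust
use tokio::sync::watch;

/// Block until the watched value reaches `target`.
async fn wait_at_least(mut rx: watch::Receiver<u64>, target: u64) {
    // `borrow_and_update` marks the current value as seen, so `changed()`
    // only wakes for values published afterwards; no update can be missed.
    while *rx.borrow_and_update() < target {
        if rx.changed().await.is_err() {
            return; // sender dropped; no newer value will arrive
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0u64);
    let waiter = tokio::spawn(wait_at_least(rx, 3));
    for version in 1..=3u64 {
        tx.send(version).unwrap();
    }
    waiter.await.unwrap();
}
```

The same loop shape appears twice after this change: over the catalog version channel here, and over the frontend's latest hummock version inside `HummockSnapshotManager::wait` below.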
diff --git a/src/frontend/src/handler/flush.rs b/src/frontend/src/handler/flush.rs
index 784fbb393c916..58e5a305e8ad4 100644
--- a/src/frontend/src/handler/flush.rs
+++ b/src/frontend/src/handler/flush.rs
@@ -26,14 +26,14 @@ pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result<RwPgResponse> {
 
 pub(crate) async fn do_flush(session: &SessionImpl) -> Result<()> {
     let client = session.env().meta_client();
-    let snapshot = client.flush(true).await?;
+    let version_id = client.flush(true).await?;
 
     // Wait for the snapshot to be synchronized, so that future reads in this session can see
     // previous writes.
     session
         .env()
         .hummock_snapshot_manager()
-        .wait(snapshot)
+        .wait(version_id)
         .await;
 
     Ok(())
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index 97395dcd786bb..5b6f0e79819c6 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -49,7 +49,7 @@ pub trait FrontendMetaClient: Send + Sync {
     async fn try_unregister(&self);
 
     async fn get_snapshot(&self) -> Result<HummockSnapshot>;
 
-    async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot>;
+    async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId>;
 
     async fn wait(&self) -> Result<()>;
 
@@ -143,7 +143,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
         self.0.get_snapshot().await
     }
 
-    async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot> {
+    async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
         self.0.flush(checkpoint).await
     }
diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs
index b0d8918cfee41..8f979bf2bfdfe 100644
--- a/src/frontend/src/scheduler/snapshot.rs
+++ b/src/frontend/src/scheduler/snapshot.rs
@@ -18,10 +18,10 @@ use std::sync::Arc;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
 use risingwave_hummock_sdk::{
-    FrontendHummockVersion, FrontendHummockVersionDelta, INVALID_VERSION_ID,
+    FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
 };
 use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
-use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot};
+use risingwave_pb::hummock::HummockVersionDeltas;
 use tokio::sync::watch;
 
 use crate::expr::InlineNowProcTime;
@@ -192,9 +192,9 @@ impl HummockSnapshotManager {
     }
 
     /// Wait until the latest snapshot is newer than the given one.
-    pub async fn wait(&self, snapshot: PbHummockSnapshot) {
+    pub async fn wait(&self, version_id: HummockVersionId) {
         let mut rx = self.latest_snapshot.subscribe();
-        while rx.borrow_and_update().value.max_committed_epoch < snapshot.committed_epoch {
+        while rx.borrow_and_update().value.id < version_id {
             rx.changed().await.unwrap();
         }
     }
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 21c3e9a950b3a..ff83c905688a4 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -298,19 +298,21 @@ impl FrontendEnv {
         let mut join_handles = vec![heartbeat_join_handle];
         let mut shutdown_senders = vec![heartbeat_shutdown_sender];
 
+        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
+        let hummock_snapshot_manager =
+            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
+
         let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
         let catalog = Arc::new(RwLock::new(Catalog::default()));
         let catalog_writer = Arc::new(CatalogWriterImpl::new(
             meta_client.clone(),
             catalog_updated_rx,
+            hummock_snapshot_manager.clone(),
         ));
         let catalog_reader = CatalogReader::new(catalog.clone());
 
         let worker_node_manager = Arc::new(WorkerNodeManager::new());
 
-        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
-        let hummock_snapshot_manager =
-            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
         let compute_client_pool = Arc::new(ComputeClientPool::new(
             config.batch_exchange_connection_pool_size(),
         ));
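The reordering in `FrontendEnv::init` is forced by the new dependency edge: the catalog writer now holds the snapshot manager, so the manager has to be constructed first and shared via `Arc`. Reduced to its shape, with placeholder types:

```rust
use std::sync::Arc;

struct HummockSnapshotManager; // placeholder
struct CatalogWriter {
    // The writer keeps a handle so `wait_version` can await storage visibility.
    _snapshot_manager: Arc<HummockSnapshotManager>,
}

fn init() -> (Arc<HummockSnapshotManager>, CatalogWriter) {
    let snapshot_manager = Arc::new(HummockSnapshotManager);
    let catalog_writer = CatalogWriter {
        _snapshot_manager: snapshot_manager.clone(),
    };
    (snapshot_manager, catalog_writer)
}
```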
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 64623cefe9260..a089f00435918 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -33,6 +33,7 @@ use risingwave_common::system_param::reader::SystemParamsReader;
 use risingwave_common::util::cluster_limit::ClusterLimit;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
+use risingwave_hummock_sdk::{HummockVersionId, INVALID_VERSION_ID};
 use risingwave_pb::backup_service::MetaSnapshotMetadata;
 use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 use risingwave_pb::catalog::{
@@ -933,8 +934,8 @@ impl FrontendMetaClient for MockFrontendMetaClient {
         Ok(HummockSnapshot { committed_epoch: 0 })
     }
 
-    async fn flush(&self, _checkpoint: bool) -> RpcResult<HummockSnapshot> {
-        Ok(HummockSnapshot { committed_epoch: 0 })
+    async fn flush(&self, _checkpoint: bool) -> RpcResult<HummockVersionId> {
+        Ok(INVALID_VERSION_ID)
     }
 
     async fn wait(&self) -> RpcResult<()> {
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index cfbdda2e96509..6c1591a68acaf 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -69,10 +69,10 @@ impl StreamManagerService for StreamServiceImpl {
         self.env.idle_manager().record_activity();
 
         let req = request.into_inner();
-        let snapshot = self.barrier_scheduler.flush(req.checkpoint).await?;
+        let version_id = self.barrier_scheduler.flush(req.checkpoint).await?;
         Ok(Response::new(FlushResponse {
             status: None,
-            snapshot: Some(snapshot),
+            hummock_version_id: version_id.to_u64(),
         }))
     }
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 5bb76ee46133c..590cfc98d33c3 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -1301,6 +1301,10 @@ impl GlobalBarrierManagerContext {
             }
         }
     }
+
+    pub fn hummock_manager(&self) -> &HummockManagerRef {
+        &self.hummock_manager
+    }
 }
 
 impl CreateMviewProgressTracker {
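`MockFrontendMetaClient::flush` returns a sentinel id so tests never block in the waiters. The same test-double shape, sketched with an illustrative constant in place of the real `INVALID_VERSION_ID`:

```rust
use anyhow::Result;

const INVALID_VERSION_ID_SKETCH: u64 = 0; // assumption: the smallest possible id

#[async_trait::async_trait]
trait FlushClient {
    async fn flush(&self, checkpoint: bool) -> Result<u64>;
}

struct MockClient;

#[async_trait::async_trait]
impl FlushClient for MockClient {
    async fn flush(&self, _checkpoint: bool) -> Result<u64> {
        // Any waiter comparing its local version against this id proceeds
        // immediately, keeping unit tests hermetic.
        Ok(INVALID_VERSION_ID_SKETCH)
    }
}
```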
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index f213999f0a08f..3e0e16a783a39 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -21,7 +21,7 @@ use anyhow::{anyhow, Context};
 use assert_matches::assert_matches;
 use parking_lot::Mutex;
 use risingwave_common::catalog::TableId;
-use risingwave_pb::hummock::HummockSnapshot;
+use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::meta::PausedReason;
 use tokio::select;
 use tokio::sync::{oneshot, watch};
@@ -325,7 +325,7 @@ impl BarrierScheduler {
     }
 
     /// Flush means waiting for the next barrier to collect.
-    pub async fn flush(&self, checkpoint: bool) -> MetaResult<HummockSnapshot> {
+    pub async fn flush(&self, checkpoint: bool) -> MetaResult<HummockVersionId> {
         let start = Instant::now();
 
         tracing::debug!("start barrier flush");
@@ -334,8 +334,8 @@ impl BarrierScheduler {
         let elapsed = Instant::now().duration_since(start);
         tracing::debug!("barrier flushed in {:?}", elapsed);
 
-        let snapshot = self.hummock_manager.latest_snapshot();
-        Ok(snapshot)
+        let version_id = self.hummock_manager.get_version_id().await;
+        Ok(version_id)
     }
 }
diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs
index fc9fe3b28f650..3a9ae5b098aea 100644
--- a/src/meta/src/hummock/manager/versioning.rs
+++ b/src/meta/src/hummock/manager/versioning.rs
@@ -148,6 +148,10 @@ impl HummockManager {
         f(&self.versioning.read().await.current_version)
     }
 
+    pub async fn get_version_id(&self) -> HummockVersionId {
+        self.on_current_version(|version| version.id).await
+    }
+
     /// Gets the mapping from table id to compaction group id
     pub async fn get_table_compaction_group_id_mapping(
         &self,
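The new `HummockManager::get_version_id` is a one-liner because `on_current_version` already encapsulates the locking: callers pass a closure that borrows the guarded version, so the guard can never leak out. The pattern in isolation, with simplified types:

```rust
use tokio::sync::RwLock;

struct HummockVersion { id: u64 } // simplified stand-in
struct Manager { current_version: RwLock<HummockVersion> }

impl Manager {
    /// Run `f` against the current version while holding the read lock.
    async fn on_current_version<T>(&self, f: impl FnOnce(&HummockVersion) -> T) -> T {
        f(&*self.current_version.read().await)
    }

    async fn get_version_id(&self) -> u64 {
        self.on_current_version(|version| version.id).await
    }
}
```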
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 43c8b52a571c3..7c14995521930 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -51,7 +51,7 @@ use risingwave_pb::catalog::{
 };
 use risingwave_pb::ddl_service::alter_owner_request::Object;
 use risingwave_pb::ddl_service::{
-    alter_name_request, alter_set_schema_request, DdlProgress, TableJobType,
+    alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, WaitVersion,
 };
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::meta::table_fragments::PbFragment;
@@ -278,7 +278,9 @@ impl DdlController {
     /// has been interrupted during executing, the request will be cancelled by tonic. Since we have
     /// a lot of logic for revert, status management, notification and so on, ensuring consistency
     /// would be a huge hassle and pain if we don't spawn here.
-    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<NotificationVersion> {
+    ///
+    /// Though returning `Option`, it's always `Some`, to simplify the handling logic
+    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
         if !command.allow_in_recovery() {
             self.barrier_manager.check_status_running()?;
         }
@@ -353,7 +355,16 @@ impl DdlController {
             }
         }
         .in_current_span();
-        tokio::spawn(fut).await.unwrap()
+        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
+        Ok(Some(WaitVersion {
+            catalog_version: notification_version,
+            hummock_version_id: self
+                .barrier_manager
+                .hummock_manager()
+                .get_version_id()
+                .await
+                .to_u64(),
+        }))
     }
 
     pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
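Two details are worth noting in `run_command` above. The future is spawned so that tonic cancelling the RPC cannot abort a half-finished DDL, and the join result needs a double `?`: one layer for the `JoinError`, one for the command's own result. A reduced sketch of that error plumbing:

```rust
use anyhow::anyhow;

async fn run_command_sketch() -> anyhow::Result<u64> {
    // The real `fut` carries revert/status/notification logic; spawning it
    // decouples its lifetime from the RPC future that awaits it.
    let fut = async { Ok::<u64, anyhow::Error>(42) }; // 42: fake notification version
    let notification_version = tokio::spawn(fut)
        .await // Result<Result<u64, _>, JoinError>
        .map_err(|e| anyhow!(e))??;
    Ok(notification_version)
}
```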
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index f736e23cb9e00..2648843abe195 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -26,7 +26,7 @@ use cluster_limit_service_client::ClusterLimitServiceClient;
 use either::Either;
 use futures::stream::BoxStream;
 use lru::LruCache;
-use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, SecretId, TableId};
+use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId};
 use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
 use risingwave_common::hash::WorkerSlotMapping;
 use risingwave_common::system_param::reader::SystemParamsReader;
@@ -171,7 +171,7 @@ impl MetaClient {
         schema_id: u32,
         owner_id: u32,
         req: create_connection_request::Payload,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateConnectionRequest {
             name: connection_name,
             database_id,
@@ -180,7 +180,9 @@ impl MetaClient {
             payload: Some(req),
         };
         let resp = self.inner.create_connection(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_secret(
@@ -190,7 +192,7 @@ impl MetaClient {
         schema_id: u32,
         owner_id: u32,
         value: Vec<u8>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateSecretRequest {
             name: secret_name,
             database_id,
@@ -199,7 +201,9 @@ impl MetaClient {
             value,
         };
         let resp = self.inner.create_secret(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn list_connections(&self, _name: Option<&str>) -> Result<Vec<Connection>> {
@@ -208,18 +212,22 @@ impl MetaClient {
         Ok(resp.connections)
     }
 
-    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<CatalogVersion> {
+    pub async fn drop_connection(&self, connection_id: ConnectionId) -> Result<WaitVersion> {
         let request = DropConnectionRequest { connection_id };
         let resp = self.inner.drop_connection(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<CatalogVersion> {
+    pub async fn drop_secret(&self, secret_id: SecretId) -> Result<WaitVersion> {
         let request = DropSecretRequest {
             secret_id: secret_id.into(),
         };
         let resp = self.inner.drop_secret(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     /// Register the current node to the cluster and set the corresponding worker id.
@@ -365,27 +373,31 @@ impl MetaClient {
         Ok(())
     }
 
-    pub async fn create_database(&self, db: PbDatabase) -> Result<CatalogVersion> {
+    pub async fn create_database(&self, db: PbDatabase) -> Result<WaitVersion> {
         let request = CreateDatabaseRequest { db: Some(db) };
         let resp = self.inner.create_database(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn create_schema(&self, schema: PbSchema) -> Result<CatalogVersion> {
+    pub async fn create_schema(&self, schema: PbSchema) -> Result<WaitVersion> {
         let request = CreateSchemaRequest {
             schema: Some(schema),
         };
         let resp = self.inner.create_schema(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_materialized_view(
         &self,
         table: PbTable,
         graph: StreamFragmentGraph,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateMaterializedViewRequest {
             materialized_view: Some(table),
             fragment_graph: Some(graph),
@@ -393,35 +405,41 @@ impl MetaClient {
         };
         let resp = self.inner.create_materialized_view(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_materialized_view(
         &self,
         table_id: TableId,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropMaterializedViewRequest {
             table_id: table_id.table_id(),
             cascade,
         };
         let resp = self.inner.drop_materialized_view(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_source(
         &self,
         source: PbSource,
         graph: Option<StreamFragmentGraph>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateSourceRequest {
             source: Some(source),
             fragment_graph: graph,
         };
 
         let resp = self.inner.create_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_sink(
@@ -429,7 +447,7 @@ impl MetaClient {
         sink: PbSink,
         graph: StreamFragmentGraph,
         affected_table_change: Option<ReplaceTablePlan>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateSinkRequest {
             sink: Some(sink),
             fragment_graph: Some(graph),
@@ -437,27 +455,30 @@ impl MetaClient {
         };
 
         let resp = self.inner.create_sink(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn create_subscription(
-        &self,
-        subscription: PbSubscription,
-    ) -> Result<CatalogVersion> {
+    pub async fn create_subscription(&self, subscription: PbSubscription) -> Result<WaitVersion> {
         let request = CreateSubscriptionRequest {
             subscription: Some(subscription),
         };
 
         let resp = self.inner.create_subscription(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn create_function(&self, function: PbFunction) -> Result<CatalogVersion> {
+    pub async fn create_function(&self, function: PbFunction) -> Result<WaitVersion> {
         let request = CreateFunctionRequest {
             function: Some(function),
         };
         let resp = self.inner.create_function(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_table(
         &self,
@@ -466,7 +487,7 @@ impl MetaClient {
         table: PbTable,
         graph: StreamFragmentGraph,
         job_type: PbTableJobType,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateTableRequest {
             materialized_view: Some(table),
             fragment_graph: Some(graph),
@@ -475,58 +496,70 @@ impl MetaClient {
         };
         let resp = self.inner.create_table(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn comment_on(&self, comment: PbComment) -> Result<CatalogVersion> {
+    pub async fn comment_on(&self, comment: PbComment) -> Result<WaitVersion> {
         let request = CommentOnRequest {
             comment: Some(comment),
         };
         let resp = self.inner.comment_on(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_name(
         &self,
         object: alter_name_request::Object,
         name: &str,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = AlterNameRequest {
             object: Some(object),
             new_name: name.to_string(),
         };
         let resp = self.inner.alter_name(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<CatalogVersion> {
+    pub async fn alter_owner(&self, object: Object, owner_id: u32) -> Result<WaitVersion> {
         let request = AlterOwnerRequest {
             object: Some(object),
             owner_id,
         };
         let resp = self.inner.alter_owner(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
    }
 
     pub async fn alter_set_schema(
         &self,
         object: alter_set_schema_request::Object,
         new_schema_id: u32,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = AlterSetSchemaRequest {
             new_schema_id,
             object: Some(object),
         };
         let resp = self.inner.alter_set_schema(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn alter_source(&self, source: PbSource) -> Result<CatalogVersion> {
+    pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
         let request = AlterSourceRequest {
             source: Some(source),
         };
         let resp = self.inner.alter_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn alter_parallelism(
@@ -552,7 +585,7 @@ impl MetaClient {
         graph: StreamFragmentGraph,
         table_col_index_mapping: ColIndexMapping,
         job_type: PbTableJobType,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = ReplaceTablePlanRequest {
             plan: Some(ReplaceTablePlan {
                 source,
@@ -564,7 +597,9 @@ impl MetaClient {
         };
         let resp = self.inner.replace_table_plan(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn auto_schema_change(&self, schema_change: SchemaChangeEnvelope) -> Result<()> {
@@ -575,11 +610,13 @@ impl MetaClient {
         Ok(())
     }
 
-    pub async fn create_view(&self, view: PbView) -> Result<CatalogVersion> {
+    pub async fn create_view(&self, view: PbView) -> Result<WaitVersion> {
         let request = CreateViewRequest { view: Some(view) };
         let resp = self.inner.create_view(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn create_index(
         &self,
@@ -587,7 +624,7 @@ impl MetaClient {
         index: PbIndex,
         table: PbTable,
         graph: StreamFragmentGraph,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = CreateIndexRequest {
             index: Some(index),
             index_table: Some(table),
@@ -595,7 +632,9 @@ impl MetaClient {
             fragment_graph: Some(graph),
         };
         let resp = self.inner.create_index(request).await?;
         // TODO: handle error in `resp.status` here
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_table(
@@ -603,7 +642,7 @@ impl MetaClient {
         &self,
         source_id: Option<u32>,
         table_id: TableId,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropTableRequest {
             source_id: source_id.map(SourceId::Id),
             table_id: table_id.table_id(),
@@ -611,19 +650,25 @@ impl MetaClient {
             cascade,
         };
 
         let resp = self.inner.drop_table(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<WaitVersion> {
         let request = DropViewRequest { view_id, cascade };
         let resp = self.inner.drop_view(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<WaitVersion> {
         let request = DropSourceRequest { source_id, cascade };
         let resp = self.inner.drop_source(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_sink(
@@ -631,27 +676,31 @@ impl MetaClient {
         &self,
         sink_id: u32,
         cascade: bool,
         affected_table_change: Option<ReplaceTablePlan>,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropSinkRequest {
             sink_id,
             cascade,
             affected_table_change,
         };
         let resp = self.inner.drop_sink(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn drop_subscription(
         &self,
         subscription_id: u32,
         cascade: bool,
-    ) -> Result<CatalogVersion> {
+    ) -> Result<WaitVersion> {
         let request = DropSubscriptionRequest {
             subscription_id,
             cascade,
         };
         let resp = self.inner.drop_subscription(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     pub async fn list_change_log_epochs(
@@ -669,33 +718,41 @@ impl MetaClient {
         Ok(resp.epochs)
     }
 
-    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<CatalogVersion> {
+    pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
         let request = DropIndexRequest {
             index_id: index_id.index_id,
             cascade,
         };
         let resp = self.inner.drop_index(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_function(&self, function_id: FunctionId) -> Result<CatalogVersion> {
+    pub async fn drop_function(&self, function_id: FunctionId) -> Result<WaitVersion> {
         let request = DropFunctionRequest {
             function_id: function_id.0,
         };
         let resp = self.inner.drop_function(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<CatalogVersion> {
+    pub async fn drop_database(&self, database_id: DatabaseId) -> Result<WaitVersion> {
         let request = DropDatabaseRequest { database_id };
         let resp = self.inner.drop_database(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn drop_schema(&self, schema_id: SchemaId) -> Result<CatalogVersion> {
+    pub async fn drop_schema(&self, schema_id: SchemaId) -> Result<WaitVersion> {
         let request = DropSchemaRequest { schema_id };
         let resp = self.inner.drop_schema(request).await?;
-        Ok(resp.version)
+        Ok(resp
+            .version
+            .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
     // TODO: using UserInfoVersion instead as return type.
@@ -879,10 +936,10 @@ impl MetaClient {
         Ok(resp.tables)
     }
 
-    pub async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot> {
+    pub async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
         let request = FlushRequest { checkpoint };
         let resp = self.inner.flush(request).await?;
-        Ok(resp.snapshot.unwrap())
+        Ok(HummockVersionId::new(resp.hummock_version_id))
     }
 
     pub async fn wait(&self) -> Result<()> {
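Finally, the `ok_or_else(|| anyhow!("wait version not set"))` boilerplate repeated through this file follows from proto3 mapping: message-typed fields come out of prost-generated Rust as `Option<T>` even when the server always sets them, so each client method converts `None` into an error once. The conversion in isolation, with sketch types standing in for the generated ones:

```rust
use anyhow::{anyhow, Result};

struct WaitVersion { catalog_version: u64, hummock_version_id: u64 } // sketch
struct CreateTableResponse { version: Option<WaitVersion> } // prost-style Option

fn take_version(resp: CreateTableResponse) -> Result<WaitVersion> {
    // `None` here would indicate a meta node that predates this protocol change.
    resp.version.ok_or_else(|| anyhow!("wait version not set"))
}
```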