From 1863fe172c6d8dc83fa3b8522448f4737d99e899 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 24 Sep 2024 17:53:15 +0800 Subject: [PATCH 1/8] support block cursor --- proto/hummock.proto | 11 -- src/frontend/src/catalog/root_catalog.rs | 20 +++ src/frontend/src/catalog/schema_catalog.rs | 4 + .../src/catalog/subscription_catalog.rs | 15 +- src/frontend/src/handler/fetch_cursor.rs | 17 +- src/frontend/src/handler/util.rs | 19 ++- src/frontend/src/meta_client.rs | 18 -- src/frontend/src/observer/observer_manager.rs | 69 ++++++-- src/frontend/src/scheduler/snapshot.rs | 15 ++ src/frontend/src/session.rs | 15 +- src/frontend/src/session/cursor_manager.rs | 160 ++++++++++++++++-- src/frontend/src/test_utils.rs | 9 - src/meta/service/src/hummock_service.rs | 16 -- src/meta/src/hummock/manager/versioning.rs | 20 --- src/rpc_client/src/meta_client.rs | 16 -- src/sqlparser/src/ast/statement.rs | 8 +- src/storage/hummock_sdk/src/change_log.rs | 2 +- .../hummock_sdk/src/frontend_version.rs | 42 ++++- 18 files changed, 331 insertions(+), 145 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 92c1494707fb..0781c5416c00 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -837,7 +837,6 @@ service HummockManagerService { rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse); rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse); rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); - rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse); rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse); rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse); } @@ -909,13 +908,3 @@ message BranchedObject { // Compaction group id the SST belongs to. 
  uint64 compaction_group_id = 3;
}
-
-message ListChangeLogEpochsRequest {
-  uint32 table_id = 1;
-  uint64 min_epoch = 2;
-  uint32 max_count = 3;
-}
-
-message ListChangeLogEpochsResponse {
-  repeated uint64 epochs = 1;
-}
diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs
index 85c76927be77..f052452ae074 100644
--- a/src/frontend/src/catalog/root_catalog.rs
+++ b/src/frontend/src/catalog/root_catalog.rs
@@ -287,6 +287,14 @@ impl Catalog {
         }
     }
 
+    pub fn get_all_tables_id_in_database(&self, db_id: DatabaseId) -> Vec<TableId> {
+        if let Ok(database) = self.get_database_by_id(&db_id) {
+            database.iter_all_table_ids().collect()
+        } else {
+            vec![]
+        }
+    }
+
     pub fn drop_database(&mut self, db_id: DatabaseId) {
         let name = self.db_name_by_id.remove(&db_id).unwrap();
         let database = self.database_by_name.remove(&name).unwrap();
@@ -295,6 +303,18 @@ impl Catalog {
         });
     }
 
+    pub fn get_all_tables_id_in_schema(
+        &self,
+        db_id: DatabaseId,
+        schema_id: SchemaId,
+    ) -> Vec<TableId> {
+        if let Ok(schema) = self.get_schema_by_id(&db_id, &schema_id) {
+            schema.iter_all_table_ids().cloned().collect()
+        } else {
+            vec![]
+        }
+    }
+
     pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
         self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
     }
diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs
index 0394da2a70f8..be8db9ad66b2 100644
--- a/src/frontend/src/catalog/schema_catalog.rs
+++ b/src/frontend/src/catalog/schema_catalog.rs
@@ -543,6 +543,10 @@ impl SchemaCatalog {
             .map(|(_, v)| v)
     }
 
+    pub fn iter_all_table_ids(&self) -> impl Iterator<Item = &TableId> {
+        self.table_by_id.keys()
+    }
+
     pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
         self.table_by_name
             .iter()
diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs
index 36a5a71a0e9b..b7129ca709a9 100644
--- a/src/frontend/src/catalog/subscription_catalog.rs
+++ b/src/frontend/src/catalog/subscription_catalog.rs
@@ -12,17 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use core::str::FromStr;
-
 use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER};
-use risingwave_common::types::Interval;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::catalog::subscription::PbSubscriptionState;
 use risingwave_pb::catalog::PbSubscription;
-use thiserror_ext::AsReport;
 
 use super::OwnedByUserCatalog;
 use crate::error::{ErrorCode, Result};
+use crate::handler::util::convert_interval_to_logstore_u64;
 use crate::WithOptions;
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -86,15 +83,7 @@ impl SubscriptionCatalog {
         let retention_seconds_str = properties.get("retention").ok_or_else(|| {
             ErrorCode::InternalError("Subscription retention time not set.".to_string())
         })?;
-        let retention_seconds = (Interval::from_str(retention_seconds_str)
-            .map_err(|err| {
-                ErrorCode::InternalError(format!(
-                    "Retention needs to be set in Interval format: {:?}",
-                    err.to_report_string()
-                ))
-            })?
-            .epoch_in_micros()
-            / 1000000) as u64;
+        let retention_seconds = convert_interval_to_logstore_u64(retention_seconds_str)?;
         self.retention_seconds = retention_seconds;
         Ok(())
     }
diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs
index d339e3e7a1ac..7e29ab611db5 100644
--- a/src/frontend/src/handler/fetch_cursor.rs
+++ b/src/frontend/src/handler/fetch_cursor.rs
@@ -22,11 +22,12 @@ use risingwave_sqlparser::ast::{FetchCursorStatement, Statement};
 
 use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
 use super::query::BoundResult;
+use super::util::convert_interval_to_logstore_u64;
 use super::RwPgResponse;
 use crate::binder::BoundStatement;
 use crate::error::Result;
 use crate::handler::HandlerArgs;
-use crate::{Binder, PgResponseStream};
+use crate::{Binder, PgResponseStream, WithOptions};
 
 pub async fn handle_fetch_cursor_execute(
     handler_args: HandlerArgs,
@@ -61,10 +62,22 @@ pub async fn handle_fetch_cursor(
     let (_, cursor_name) =
         Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?;
 
+    let with_options = WithOptions::try_from(stmt.with_properties.0.as_slice())?;
+    let timeout_seconds = with_options
+        .get("timeout")
+        .map(convert_interval_to_logstore_u64)
+        .transpose()?;
+
     let cursor_manager = session.get_cursor_manager();
 
     let (rows, pg_descs) = cursor_manager
-        .get_rows_with_cursor(cursor_name, stmt.count, handler_args, formats)
+        .get_rows_with_cursor(
+            cursor_name,
+            stmt.count,
+            handler_args,
+            formats,
+            timeout_seconds,
+        )
         .await?;
     Ok(build_fetch_cursor_response(rows, pg_descs))
 }
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 0531ce5a6528..6a3b76b65a83 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::str::FromStr;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -28,13 +29,16 @@ use pin_project_lite::pin_project;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::Field;
 use risingwave_common::row::Row as _;
-use risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Timestamptz};
+use risingwave_common::types::{
+    write_date_time_tz, DataType, Interval, ScalarRefImpl, Timestamptz,
+};
 use risingwave_common::util::epoch::Epoch;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_sqlparser::ast::{
     CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr,
     TableFactor, TableWithJoins,
 };
+use thiserror_ext::AsReport;
 
 use crate::error::{ErrorCode, Result as RwResult};
 use crate::session::{current, SessionImpl};
@@ -238,6 +242,19 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 {
     Epoch::from(logstore_u64).as_unix_millis()
 }
 
+pub fn convert_interval_to_logstore_u64(interval: &String) -> RwResult<u64> {
+    let retention_seconds = (Interval::from_str(interval)
+        .map_err(|err| {
+            ErrorCode::InternalError(format!(
+                "Convert interval to u64 error, please check the format, error: {:?}",
+                err.to_report_string()
+            ))
+        })?
+ .epoch_in_micros() + / 1000000) as u64; + Ok(retention_seconds) +} + #[cfg(test)] mod tests { use postgres_types::{ToSql, Type}; diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 97395dcd786b..aaf4b48744ba 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -119,13 +119,6 @@ pub trait FrontendMetaClient: Send + Sync { rate_limit: Option, ) -> Result<()>; - async fn list_change_log_epochs( - &self, - table_id: u32, - min_epoch: u64, - max_count: u32, - ) -> Result>; - async fn get_cluster_recovery_status(&self) -> Result; async fn get_cluster_limits(&self) -> Result>; @@ -297,17 +290,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl { .map(|_| ()) } - async fn list_change_log_epochs( - &self, - table_id: u32, - min_epoch: u64, - max_count: u32, - ) -> Result> { - self.0 - .list_change_log_epochs(table_id, min_epoch, max_count) - .await - } - async fn get_cluster_recovery_status(&self) -> Result { self.0.get_cluster_recovery_status().await } diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index b1ecf4182d1d..920c36ab887d 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use itertools::Itertools; use parking_lot::RwLock; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; -use risingwave_common::catalog::CatalogVersion; +use risingwave_common::catalog::{CatalogVersion, TableId}; use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::secret::LocalSecretManager; use risingwave_common::session_config::SessionConfig; @@ -36,6 +36,7 @@ use tokio::sync::watch::Sender; use crate::catalog::root_catalog::Catalog; use crate::catalog::{FragmentId, SecretId}; use crate::scheduler::HummockSnapshotManagerRef; +use crate::session::SessionMapRef; use crate::user::user_manager::UserInfoManager; use crate::user::UserInfoVersion; @@ -49,6 +50,7 @@ pub struct FrontendObserverNode { system_params_manager: LocalSystemParamsManagerRef, session_params: Arc>, compute_client_pool: ComputeClientPoolRef, + sessions_map: SessionMapRef, } impl ObserverState for FrontendObserverNode { @@ -87,7 +89,14 @@ impl ObserverState for FrontendObserverNode { ) } Info::HummockVersionDeltas(deltas) => { + let table_ids = deltas + .version_deltas + .iter() + .flat_map(|version_deltas| version_deltas.change_log_delta.keys()) + .map(|table_id| TableId::new(*table_id)) + .collect_vec(); self.handle_hummock_snapshot_notification(deltas); + self.handle_cursor_notify(table_ids); } Info::MetaBackupManifestId(_) => { panic!("frontend node should not receive MetaBackupManifestId"); @@ -191,10 +200,14 @@ impl ObserverState for FrontendObserverNode { convert_worker_slot_mapping(&streaming_worker_slot_mappings), convert_worker_slot_mapping(&serving_worker_slot_mappings), ); - self.hummock_snapshot_manager - .init(FrontendHummockVersion::from_protobuf( - hummock_version.unwrap(), - )); + let hummock_version = FrontendHummockVersion::from_protobuf(hummock_version.unwrap()); + let table_ids = hummock_version + .table_change_log + .keys() + .cloned() + .collect_vec(); + self.hummock_snapshot_manager.init(hummock_version); + self.handle_cursor_notify(table_ids); let snapshot_version = version.unwrap(); catalog_guard.set_version(snapshot_version.catalog_version); @@ -222,6 +235,7 @@ impl FrontendObserverNode { system_params_manager: LocalSystemParamsManagerRef, session_params: 
Arc>, compute_client_pool: ComputeClientPoolRef, + sessions_map: SessionMapRef, ) -> Self { Self { worker_node_manager, @@ -233,6 +247,7 @@ impl FrontendObserverNode { system_params_manager, session_params, compute_client_pool, + sessions_map, } } @@ -250,13 +265,22 @@ impl FrontendObserverNode { match info { Info::Database(database) => match resp.operation() { Operation::Add => catalog_guard.create_database(database), - Operation::Delete => catalog_guard.drop_database(database.id), + Operation::Delete => { + let table_ids = catalog_guard.get_all_tables_id_in_database(database.id); + catalog_guard.drop_database(database.id); + self.handle_cursor_remove_table_ids(table_ids); + } Operation::Update => catalog_guard.update_database(database), _ => panic!("receive an unsupported notify {:?}", resp), }, Info::Schema(schema) => match resp.operation() { Operation::Add => catalog_guard.create_schema(schema), - Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id), + Operation::Delete => { + let table_ids = + catalog_guard.get_all_tables_id_in_schema(schema.database_id, schema.id); + catalog_guard.drop_schema(schema.database_id, schema.id); + self.handle_cursor_remove_table_ids(table_ids); + } Operation::Update => catalog_guard.update_schema(schema), _ => panic!("receive an unsupported notify {:?}", resp), }, @@ -268,11 +292,14 @@ impl FrontendObserverNode { match relation { RelationInfo::Table(table) => match resp.operation() { Operation::Add => catalog_guard.create_table(table), - Operation::Delete => catalog_guard.drop_table( - table.database_id, - table.schema_id, - table.id.into(), - ), + Operation::Delete => { + catalog_guard.drop_table( + table.database_id, + table.schema_id, + table.id.into(), + ); + self.handle_cursor_remove_table_ids(vec![table.id.into()]); + } Operation::Update => { let old_fragment_id = catalog_guard .get_any_table_by_id(&table.id.into()) @@ -468,6 +495,24 @@ impl FrontendObserverNode { self.hummock_snapshot_manager.update(deltas); } + fn handle_cursor_notify(&self, table_ids: Vec) { + for session in self.sessions_map.read().values() { + session + .get_cursor_manager() + .get_cursor_notifies() + .notify_cursors(&table_ids); + } + } + + fn handle_cursor_remove_table_ids(&self, table_ids: Vec) { + for session in self.sessions_map.read().values() { + session + .get_cursor_manager() + .get_cursor_notifies() + .remove_tables_ids(&table_ids); + } + } + fn handle_secret_notification(&mut self, resp: SubscribeResponse) { let resp_op = resp.operation(); let Some(Info::Secret(secret)) = resp.info else { diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index b0d8918cfee4..e07360717b51 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_hummock_sdk::{ @@ -105,6 +106,20 @@ impl PinnedSnapshot { pub fn committed_epoch(&self) -> u64 { self.value.max_committed_epoch } + + pub fn list_change_log_epochs( + &self, + table_id: u32, + min_epoch: u64, + max_count: u32, + ) -> Vec { + if let Some(table_change_log) = self.value.table_change_log.get(&TableId::new(table_id)) { + let table_change_log = table_change_log.clone(); + table_change_log.get_non_empty_epochs(min_epoch, max_count as usize) + } else { + vec![] + } + } } /// Returns an invalid 
snapshot, used for initial values.
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 21c3e9a950b3..b6da8522ec76 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -342,6 +342,9 @@ impl FrontendEnv {
         // This `session_params` should be initialized during the initial notification in `observer_manager`
         let session_params = Arc::new(RwLock::new(SessionConfig::default()));
 
+        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
+        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
+
         let frontend_observer_node = FrontendObserverNode::new(
             worker_node_manager.clone(),
             catalog,
@@ -352,6 +355,7 @@ impl FrontendEnv {
             system_params_manager.clone(),
             session_params.clone(),
             compute_client_pool.clone(),
+            sessions_map.clone(),
         );
         let observer_manager =
             ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
@@ -413,10 +417,7 @@ impl FrontendEnv {
                 .unwrap(),
         ));
 
-        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
-        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
         let sessions = sessions_map.clone();
-
         // Idle transaction background monitor
         let join_handle = tokio::spawn(async move {
             let mut check_idle_txn_interval =
@@ -1086,7 +1087,7 @@ impl SessionImpl {
         Ok(secret.clone())
     }
 
-    pub async fn list_change_log_epochs(
+    pub fn list_change_log_epochs(
         &self,
         table_id: u32,
         min_epoch: u64,
@@ -1094,9 +1095,9 @@ impl SessionImpl {
     ) -> Result<Vec<u64>> {
         Ok(self
             .env
-            .meta_client()
-            .list_change_log_epochs(table_id, min_epoch, max_count)
-            .await?)
+            .hummock_snapshot_manager()
+            .acquire()
+            .list_change_log_epochs(table_id, min_epoch, max_count))
     }
 
     pub fn clear_cancel_query_flag(&self) {
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index c4e0ab709621..758b3ac45c9a 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -30,6 +30,8 @@ use risingwave_common::error::BoxedError;
 use risingwave_common::session_config::QueryMode;
 use risingwave_common::types::DataType;
 use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
+use tokio::sync::watch::{channel, Receiver, Sender};
+use tokio::sync::{mpsc, oneshot, RwLock};
 
 use super::SessionImpl;
 use crate::catalog::subscription_catalog::SubscriptionCatalog;
@@ -110,13 +112,18 @@ impl Cursor {
         count: u32,
         handle_args: HandlerArgs,
         formats: &Vec<Format>,
+        timeout_seconds: Option<u64>,
     ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
         match self {
             Cursor::Subscription(cursor) => cursor
-                .next(count, handle_args, formats)
+                .next(count, handle_args, formats, timeout_seconds)
                 .await
                 .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
-            Cursor::Query(cursor) => cursor.next(count, formats, handle_args).await,
+            Cursor::Query(cursor) => {
+                cursor
+                    .next(count, formats, handle_args, timeout_seconds)
+                    .await
+            }
         }
     }
 
@@ -161,9 +168,11 @@ impl QueryCursor {
         count: u32,
         formats: &Vec<Format>,
         handle_args: HandlerArgs,
+        timeout_seconds: Option<u64>,
     ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
         // `FETCH NEXT` is equivalent to `FETCH 1`.
         // min with 100 to avoid allocating too much memory at once.
+ let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s)); let session = handle_args.session; let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; @@ -175,6 +184,11 @@ impl QueryCursor { { cur += 1; ans.push(row); + if let Some(timeout_instant) = timeout_instant + && Instant::now() > timeout_instant + { + break; + } } Ok((ans, desc)) } @@ -221,6 +235,7 @@ pub struct SubscriptionCursor { // and will be reset each time it is created chunk_stream, this is to avoid changes in the catalog due to alter. fields: Vec, cursor_metrics: Arc, + cursor_notifies: Arc, last_fetch: Instant, } @@ -232,6 +247,7 @@ impl SubscriptionCursor { dependent_table_id: TableId, handle_args: &HandlerArgs, cursor_metrics: Arc, + cursor_notifies: Arc, ) -> Result { let (state, fields) = if let Some(start_timestamp) = start_timestamp { let table_catalog = handle_args.session.get_table_by_id(&dependent_table_id)?; @@ -290,6 +306,7 @@ impl SubscriptionCursor { fields, cursor_metrics, last_fetch: Instant::now(), + cursor_notifies, }) } @@ -313,9 +330,7 @@ impl SubscriptionCursor { *expected_timestamp, handle_args.clone(), &self.subscription, - ) - .await - { + ) { Ok((Some(rw_timestamp), expected_timestamp)) => { let (mut chunk_stream, fields, init_query_timer) = Self::initiate_query( @@ -429,7 +444,9 @@ impl SubscriptionCursor { count: u32, handle_args: HandlerArgs, formats: &Vec, + timeout_seconds: Option, ) -> Result<(Vec, Vec)> { + let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s)); if Instant::now() > self.cursor_need_drop_time { return Err(ErrorCode::InternalError( "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_string(), @@ -467,16 +484,27 @@ impl SubscriptionCursor { ans.push(row); } None => { - break; + if cur == 0 { + self.cursor_notifies + .wait_next_epoch(self.dependent_table_id.table_id(), timeout_seconds) + .await?; + } else { + break; + } } } + if let Some(timeout_instant) = timeout_instant + && Instant::now() > timeout_instant + { + break; + } } self.last_fetch = Instant::now(); Ok((ans, desc)) } - async fn get_next_rw_timestamp( + fn get_next_rw_timestamp( seek_timestamp: u64, table_id: &TableId, expected_timestamp: Option, @@ -491,9 +519,7 @@ impl SubscriptionCursor { )?; // The epoch here must be pulled every time, otherwise there will be cache consistency issues - let new_epochs = session - .list_change_log_epochs(table_id.table_id(), seek_timestamp, 2) - .await?; + let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?; if let Some(expected_timestamp) = expected_timestamp && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap()) { @@ -673,6 +699,7 @@ impl SubscriptionCursor { pub struct CursorManager { cursor_map: tokio::sync::Mutex>, + cursor_notifies: Arc, cursor_metrics: Arc, } @@ -681,9 +708,14 @@ impl CursorManager { Self { cursor_map: tokio::sync::Mutex::new(HashMap::new()), cursor_metrics, + cursor_notifies: Arc::new(CursorNotifies::new()), } } + pub fn get_cursor_notifies(&self) -> Arc { + self.cursor_notifies.clone() + } + pub async fn add_subscription_cursor( &self, cursor_name: String, @@ -701,6 +733,7 @@ impl CursorManager { dependent_table_id, handle_args, self.cursor_metrics.clone(), + self.cursor_notifies.clone(), ) .await?; let mut cursor_map = self.cursor_map.lock().await; @@ -773,9 +806,12 @@ impl CursorManager { count: u32, handle_args: HandlerArgs, formats: &Vec, + 
timeout_seconds: Option, ) -> Result<(Vec, Vec)> { if let Some(cursor) = self.cursor_map.lock().await.get_mut(&cursor_name) { - cursor.next(count, handle_args, formats).await + cursor + .next(count, handle_args, formats, timeout_seconds) + .await } else { Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into()) } @@ -875,3 +911,105 @@ impl CursorManager { ) } } + +enum CursorNotifyState { + RemoveTables(Vec), + NotifyTables(Vec), +} +pub type CursorNotifyMapRef = Arc, Receiver<()>)>>>; +pub struct CursorNotifies { + cursor_notify_map: CursorNotifyMapRef, + sender: mpsc::UnboundedSender, + shutdown_tx: Option>, +} + +impl CursorNotifies { + pub fn new() -> Self { + let (sender, mut receiver) = mpsc::unbounded_channel(); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let cursor_notify_map = Arc::new(RwLock::new( + HashMap::, Receiver<()>)>::new(), + )); + let cursor_notify_map_clone = cursor_notify_map.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + break; + } + cursor_notify_state = receiver.recv() => { + match cursor_notify_state { + Some(CursorNotifyState::RemoveTables(table_ids)) => { + for table_id in table_ids { + let mut cursor_notify_map_clone_write = cursor_notify_map_clone.write().await; + cursor_notify_map_clone_write.remove(&table_id.table_id()); + } + } + Some(CursorNotifyState::NotifyTables(table_ids)) => { + let cursor_notify_map_clone_read = cursor_notify_map_clone.read().await; + for table_id in table_ids { + if let Some((tx,_rx)) = cursor_notify_map_clone_read.get(&table_id.table_id()) { + tx.send(()).unwrap(); + } + } + } + None => { + break; + } + } + } + } + } + }); + Self { + cursor_notify_map, + sender, + shutdown_tx: Some(shutdown_tx), + } + } + + pub async fn wait_next_epoch(&self, id: u32, timeout_seconds: Option) -> Result<()> { + let mut rx = { + let mut cursor_notify_map_write = self.cursor_notify_map.write().await; + let (_tx, rx) = cursor_notify_map_write.entry(id).or_insert_with(|| { + let (tx, rx) = channel(()); + (tx, rx) + }); + rx.clone() + }; + let mut timeout_interval = + tokio::time::interval(Duration::from_secs(timeout_seconds.unwrap_or(u64::MAX))); + timeout_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + timeout_interval.tick().await; + tokio::select! 
{ + result = rx.changed() => { + result.map_err(|_| { + ErrorCode::InternalError(format!("Cursor dependent table deleted: table_id is {:?}", id)) + })?; + } + _ = timeout_interval.tick() => { + tracing::debug!("Cursor wait next epoch timeout"); + } + } + Ok(()) + } + + pub fn notify_cursors(&self, ids: &Vec) { + self.sender + .send(CursorNotifyState::NotifyTables(ids.clone())) + .unwrap(); + } + + pub fn remove_tables_ids(&self, ids: &Vec) { + self.sender + .send(CursorNotifyState::RemoveTables(ids.clone())) + .unwrap(); + } +} +impl Drop for CursorNotifies { + fn drop(&mut self) { + if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) { + shutdown_tx.send(()).ok(); + } + } +} diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 64623cefe926..a54d09f3ae78 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -1065,15 +1065,6 @@ impl FrontendMetaClient for MockFrontendMetaClient { Ok(RecoveryStatus::StatusRunning) } - async fn list_change_log_epochs( - &self, - _table_id: u32, - _min_epoch: u64, - _max_count: u32, - ) -> RpcResult> { - unimplemented!() - } - async fn get_cluster_limits(&self) -> RpcResult> { Ok(vec![]) } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 5830951d3e5f..29e58c74b18f 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -626,22 +626,6 @@ impl HummockManagerService for HummockServiceImpl { return Ok(response); } - async fn list_change_log_epochs( - &self, - request: Request, - ) -> Result, Status> { - let ListChangeLogEpochsRequest { - table_id, - min_epoch, - max_count, - } = request.into_inner(); - let epochs = self - .hummock_manager - .list_change_log_epochs(table_id, min_epoch, max_count) - .await; - Ok(Response::new(ListChangeLogEpochsResponse { epochs })) - } - async fn get_version_by_epoch( &self, request: Request, diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index fc9fe3b28f65..a4136f39004f 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -16,7 +16,6 @@ use std::cmp; use std::collections::{BTreeMap, HashMap, HashSet}; use itertools::Itertools; -use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ get_compaction_group_ids, get_table_compaction_group_id_mapping, BranchedSstInfo, }; @@ -270,25 +269,6 @@ impl HummockManager { let snapshot = self.latest_snapshot.load(); HummockSnapshot::clone(&snapshot) } - - pub async fn list_change_log_epochs( - &self, - table_id: u32, - min_epoch: u64, - max_count: u32, - ) -> Vec { - let versioning = self.versioning.read().await; - if let Some(table_change_log) = versioning - .current_version - .table_change_log - .get(&TableId::new(table_id)) - { - let table_change_log = table_change_log.clone(); - table_change_log.get_non_empty_epochs(min_epoch, max_count as usize) - } else { - vec![] - } - } } /// Calculates write limits for `target_groups`. 
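With the meta-side RPC removed above, listing change-log epochs is now served entirely from the change log that ships with the frontend's pinned Hummock version. A minimal, self-contained sketch of that lookup under simplified stand-in types (the `MockChangeLog` struct and the plain `HashMap` below are assumptions for illustration, not the actual `TableChangeLog`/`PinnedSnapshot` code):

    use std::collections::HashMap;

    // Simplified stand-in for the per-table change log kept in the pinned
    // frontend version; each entry is (epoch, has_data). This type is an
    // assumption for illustration only.
    struct MockChangeLog(Vec<(u64, bool)>);

    impl MockChangeLog {
        // Mirrors `get_non_empty_epochs`: epochs >= `min_epoch` whose log
        // entry carries data, capped at `max_count`.
        fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
            self.0
                .iter()
                .filter(|(epoch, has_data)| *epoch >= min_epoch && *has_data)
                .map(|(epoch, _)| *epoch)
                .take(max_count)
                .collect()
        }
    }

    // Same shape as the new `PinnedSnapshot::list_change_log_epochs`:
    // a local map lookup instead of a meta RPC round trip.
    fn list_change_log_epochs(
        logs: &HashMap<u32, MockChangeLog>,
        table_id: u32,
        min_epoch: u64,
        max_count: u32,
    ) -> Vec<u64> {
        logs.get(&table_id)
            .map(|log| log.get_non_empty_epochs(min_epoch, max_count as usize))
            .unwrap_or_default()
    }

    fn main() {
        let mut logs = HashMap::new();
        logs.insert(1, MockChangeLog(vec![(10, true), (20, false), (30, true)]));
        assert_eq!(list_change_log_epochs(&logs, 1, 15, 2), vec![30]);
        assert!(list_change_log_epochs(&logs, 2, 0, 10).is_empty());
    }

The design point is that once `table_change_log` is part of `FrontendHummockVersion`, a blocked `FETCH ... WITH (timeout = ...)` can re-check for new epochs locally on every version-delta notification instead of paying a meta round trip per poll.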
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index f736e23cb9e0..6b2579aec5bc 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -654,21 +654,6 @@ impl MetaClient { Ok(resp.version) } - pub async fn list_change_log_epochs( - &self, - table_id: u32, - min_epoch: u64, - max_count: u32, - ) -> Result> { - let request = ListChangeLogEpochsRequest { - table_id, - min_epoch, - max_count, - }; - let resp = self.inner.list_change_log_epochs(request).await?; - Ok(resp.epochs) - } - pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result { let request = DropIndexRequest { index_id: index_id.index_id, @@ -2073,7 +2058,6 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse } ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse } ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} - ,{ hummock_client, list_change_log_epochs, ListChangeLogEpochsRequest, ListChangeLogEpochsResponse } ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse } ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 7dd7564fc666..8164194b36bc 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -717,6 +717,7 @@ impl fmt::Display for DeclareCursorStatement { pub struct FetchCursorStatement { pub cursor_name: ObjectName, pub count: u32, + pub with_properties: WithProperties, } impl ParseTo for FetchCursorStatement { @@ -728,8 +729,13 @@ impl ParseTo for FetchCursorStatement { }; p.expect_keyword(Keyword::FROM)?; impl_parse_to!(cursor_name: ObjectName, p); + impl_parse_to!(with_properties: WithProperties, p); - Ok(Self { cursor_name, count }) + Ok(Self { + cursor_name, + count, + with_properties, + }) } } diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index cf3ded58b946..10ae557159fa 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -101,7 +101,7 @@ impl TableChangeLogCommon { &self.0[start..end] } - /// Returns epochs where value is non-null and >= `min_epoch`. + // Returns epochs where value is non-null and >= `min_epoch`. 
    pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
         self.filter_epoch((min_epoch, u64::MAX))
             .iter()
diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs
index 2086832d0771..6c24676c4ea1 100644
--- a/src/storage/hummock_sdk/src/frontend_version.rs
+++ b/src/storage/hummock_sdk/src/frontend_version.rs
@@ -14,10 +14,11 @@
 
 use std::collections::{HashMap, HashSet};
 
+use itertools::Itertools;
 use risingwave_common::catalog::TableId;
 use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
 use risingwave_pb::hummock::{
-    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbTableChangeLog,
+    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
     StateTableInfoDelta,
 };
 
@@ -166,8 +167,8 @@ impl FrontendHummockVersionDelta {
                         truncate_epoch: change_log_delta.truncate_epoch,
                         new_log: change_log_delta.new_log.as_ref().map(|new_log| {
                             EpochNewChangeLogCommon {
-                                new_value: vec![],
-                                old_value: vec![],
+                                new_value: new_log.new_value.iter().map(|_| ()).collect(),
+                                old_value: new_log.old_value.iter().map(|_| ()).collect(),
                                 epochs: new_log.epochs.clone(),
                             }
                         }),
@@ -199,8 +200,16 @@ impl FrontendHummockVersionDelta {
                     table_id.table_id,
                     PbChangeLogDelta {
                         new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog {
-                            old_value: vec![],
-                            new_value: vec![],
+                            old_value: new_log
+                                .old_value
+                                .iter()
+                                .map(|_| mock_pb_sstable_info())
+                                .collect_vec(),
+                            new_value: new_log
+                                .new_value
+                                .iter()
+                                .map(|_| mock_pb_sstable_info())
+                                .collect_vec(),
                             epochs: new_log.epochs.clone(),
                         }),
                         truncate_epoch: delta.truncate_epoch,
@@ -241,8 +250,8 @@ impl FrontendHummockVersionDelta {
                         truncate_epoch: change_log_delta.truncate_epoch,
                         new_log: change_log_delta.new_log.as_ref().map(|new_log| {
                             EpochNewChangeLogCommon {
-                                new_value: vec![],
-                                old_value: vec![],
+                                new_value: new_log.new_value.iter().map(|_| ()).collect(),
+                                old_value: new_log.old_value.iter().map(|_| ()).collect(),
                                 epochs: new_log.epochs.clone(),
                             }
                         }),
@@ -253,3 +262,22 @@ impl FrontendHummockVersionDelta {
         }
     }
 }
+
+fn mock_pb_sstable_info() -> PbSstableInfo {
+    PbSstableInfo {
+        object_id: 0,
+        sst_id: 0,
+        key_range: None,
+        file_size: 0,
+        table_ids: vec![],
+        meta_offset: 0,
+        stale_key_count: 0,
+        total_key_count: 0,
+        min_epoch: 0,
+        max_epoch: 0,
+        uncompressed_file_size: 0,
+        range_tombstone_count: 0,
+        bloom_filter_kind: 0,
+        sst_size: 0,
+    }
+}

From ca927e8d79181ed2e3b512404c7339661e0726a4 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Tue, 24 Sep 2024 18:23:13 +0800
Subject: [PATCH 2/8] add ci

---
 e2e_test/subscription/main.py              | 124 ++++++++++++++-------
 src/frontend/src/session/cursor_manager.rs |   3 +-
 2 files changed, 85 insertions(+), 42 deletions(-)

diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py
index 60db0df82c33..87c2b6e7f4ab 100644
--- a/e2e_test/subscription/main.py
+++ b/e2e_test/subscription/main.py
@@ -1,5 +1,6 @@
 import subprocess
 import psycopg2
+import threading
 import time
 
@@ -54,9 +55,9 @@ def test_cursor_snapshot():
     )
 
     execute_insert("declare cur subscription cursor for sub full",conn)
-    row = execute_query("fetch next from cur",conn)
+    row = execute_query("fetch next from cur with (timeout = '2s')",conn)
     check_rows_data([1,2],row[0],"Insert")
-    row = execute_query("fetch next from cur",conn)
+    row = execute_query("fetch next from cur with (timeout = '2s')",conn)
     assert row == []
     execute_insert("close cur",conn)
drop_table_subscription() @@ -73,19 +74,19 @@ def test_cursor_snapshot_log_store(): ) execute_insert("declare cur subscription cursor for sub full",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([1,2],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("insert into t1 values(4,4)",conn) execute_insert("flush",conn) execute_insert("insert into t1 values(5,5)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([5,5],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -107,13 +108,13 @@ def test_cursor_since_begin(): execute_insert("declare cur subscription cursor for sub since begin()",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([5,5],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -136,9 +137,9 @@ def test_cursor_since_now(): time.sleep(2) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -161,9 +162,9 @@ def test_cursor_without_since(): time.sleep(2) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -185,34 +186,34 @@ def test_cursor_since_rw_timestamp(): execute_insert("declare cur subscription cursor for sub since begin()",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) valuelen = len(row[0]) rw_timestamp_1 = row[0][valuelen - 1] check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from 
cur with (timeout = '2s')",conn) valuelen = len(row[0]) rw_timestamp_2 = row[0][valuelen - 1] - 1 check_rows_data([5,5],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) valuelen = len(row[0]) rw_timestamp_3 = row[0][valuelen - 1] + 1 check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,4],row[0],"Insert") execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([5,5],row[0],"Insert") execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("close cur",conn) @@ -229,29 +230,29 @@ def test_cursor_op(): ) execute_insert("declare cur subscription cursor for sub full",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([1,2],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("insert into t1 values(4,4)",conn) execute_insert("flush",conn) execute_insert("update t1 set v2 = 10 where v1 = 4",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,4],row[0],"UpdateDelete") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,10],row[0],"UpdateInsert") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("delete from t1 where v1 = 4",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,10],row[0],"Delete") - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("close cur",conn) @@ -271,22 +272,18 @@ def test_cursor_with_table_alter(): execute_insert("alter table t1 add v3 int",conn) execute_insert("insert into t1 values(4,4,4)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([1,2],row[0],"Insert") - row = execute_query("fetch next from cur",conn) - assert(row == []) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) 
check_rows_data([4,4,4],row[0],"Insert") execute_insert("insert into t1 values(5,5,5)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([5,5,5],row[0],"Insert") execute_insert("alter table t1 drop column v2",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur",conn) - assert(row == []) - row = execute_query("fetch next from cur",conn) + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([6,6],row[0],"Insert") drop_table_subscription() @@ -317,7 +314,7 @@ def test_cursor_fetch_n(): execute_insert("flush",conn) execute_insert("update t1 set v2 = 100 where v1 = 10",conn) execute_insert("flush",conn) - row = execute_query("fetch 6 from cur",conn) + row = execute_query("fetch 6 from cur with (timeout = '5s')",conn) assert len(row) == 6 check_rows_data([1,2],row[0],"Insert") check_rows_data([4,4],row[1],"Insert") @@ -325,7 +322,7 @@ def test_cursor_fetch_n(): check_rows_data([6,6],row[3],"Insert") check_rows_data([7,7],row[4],"Insert") check_rows_data([8,8],row[5],"Insert") - row = execute_query("fetch 6 from cur",conn) + row = execute_query("fetch 6 from cur with (timeout = '5s')",conn) assert len(row) == 4 check_rows_data([9,9],row[0],"Insert") check_rows_data([10,10],row[1],"Insert") @@ -348,13 +345,57 @@ def test_rebuild_table(): execute_insert("flush",conn) execute_insert("update t2 set v2 = 100 where v1 = 1",conn) execute_insert("flush",conn) - row = execute_query("fetch 4 from cur",conn) + row = execute_query("fetch 4 from cur with (timeout = '5s')",conn) assert len(row) == 3 check_rows_data([1,1],row[0],"Insert") check_rows_data([1,1],row[1],"UpdateDelete") check_rows_data([1,100],row[2],"UpdateInsert") drop_table_subscription() +def test_blcok_cursor(): + print(f"test_blcok_cursor") + create_table_subscription() + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + execute_insert("declare cur subscription cursor for sub2 full",conn) + execute_insert("insert into t2 values(1,1)",conn) + execute_insert("flush",conn) + execute_insert("update t2 set v2 = 100 where v1 = 1",conn) + execute_insert("flush",conn) + row = execute_query("fetch 100 from cur",conn) + assert len(row) == 3 + check_rows_data([1,1],row[0],"Insert") + check_rows_data([1,1],row[1],"UpdateDelete") + check_rows_data([1,100],row[2],"UpdateInsert") + + # Test block cursor fetches data successfully + thread = threading.Thread(target=insert_into_table) + thread.start() + row = execute_query("fetch 100 from cur",conn) + check_rows_data([10,10],row[0],"Insert") + thread.join() + + # Test block cursor timeout + row = execute_query("fetch 100 from cur with (timeout = '5s')",conn) + assert row == [] + + drop_table_subscription() + +def insert_into_table(): + time.sleep(2) + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + execute_insert("insert into t2 values(10,10)",conn) + if __name__ == "__main__": test_cursor_snapshot() test_cursor_op() @@ -366,3 +407,4 @@ def test_rebuild_table(): test_cursor_with_table_alter() test_cursor_fetch_n() test_rebuild_table() + test_blcok_cursor() diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 758b3ac45c9a..5faa360269d5 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ 
b/src/frontend/src/session/cursor_manager.rs @@ -456,7 +456,6 @@ impl SubscriptionCursor { let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; - let desc = self.fields.iter().map(to_pg_field).collect(); if let State::Fetch { from_snapshot, chunk_stream, @@ -500,6 +499,8 @@ impl SubscriptionCursor { } } self.last_fetch = Instant::now(); + let desc = self.fields.iter().map(to_pg_field).collect(); + println!("ans: {:?},{:?}", ans,desc); Ok((ans, desc)) } From 895eab4756a33dc0a56103e331fb6214ad8877a9 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 25 Sep 2024 13:00:51 +0800 Subject: [PATCH 3/8] fix notify fix notify fmt --- e2e_test/subscription/main.py | 2 ++ src/frontend/src/observer/observer_manager.rs | 31 ++++++++++++++++--- src/frontend/src/session/cursor_manager.rs | 24 +++++++------- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index 87c2b6e7f4ab..6107d072fc82 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -275,6 +275,8 @@ def test_cursor_with_table_alter(): row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) + assert row == [] + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([4,4,4],row[0],"Insert") execute_insert("insert into t1 values(5,5,5)",conn) execute_insert("flush",conn) diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 920c36ab887d..cc0c4cd93b7f 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -92,8 +92,19 @@ impl ObserverState for FrontendObserverNode { let table_ids = deltas .version_deltas .iter() - .flat_map(|version_deltas| version_deltas.change_log_delta.keys()) - .map(|table_id| TableId::new(*table_id)) + .flat_map(|version_deltas| &version_deltas.change_log_delta) + .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() { + Some(new_log) => { + let new_value_empty = new_log.new_value.is_empty(); + let old_value_empty = new_log.old_value.is_empty(); + if !new_value_empty || !old_value_empty { + Some(TableId::new(table_id.clone())) + } else { + None + } + } + None => None, + }) .collect_vec(); self.handle_hummock_snapshot_notification(deltas); self.handle_cursor_notify(table_ids); @@ -203,8 +214,14 @@ impl ObserverState for FrontendObserverNode { let hummock_version = FrontendHummockVersion::from_protobuf(hummock_version.unwrap()); let table_ids = hummock_version .table_change_log - .keys() - .cloned() + .iter() + .filter_map(|(table_id, change_log)| { + if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() { + None + } else { + Some(table_id.clone()) + } + }) .collect_vec(); self.hummock_snapshot_manager.init(hummock_version); self.handle_cursor_notify(table_ids); @@ -496,6 +513,9 @@ impl FrontendObserverNode { } fn handle_cursor_notify(&self, table_ids: Vec) { + if table_ids.is_empty() { + return; + } for session in self.sessions_map.read().values() { session .get_cursor_manager() @@ -505,6 +525,9 @@ impl FrontendObserverNode { } fn handle_cursor_remove_table_ids(&self, table_ids: Vec) { + if table_ids.is_empty() { + return; + } for session in self.sessions_map.read().values() { session .get_cursor_manager() diff --git a/src/frontend/src/session/cursor_manager.rs 
b/src/frontend/src/session/cursor_manager.rs index 9118579dddf5..616eb2a56938 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -31,7 +31,7 @@ use risingwave_common::error::BoxedError; use risingwave_common::session_config::QueryMode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; -use tokio::sync::watch::{channel, Receiver, Sender}; +use tokio::sync::broadcast::{channel, Sender}; use tokio::sync::{mpsc, oneshot, RwLock}; use super::SessionImpl; @@ -921,9 +921,8 @@ enum CursorNotifyState { RemoveTables(Vec), NotifyTables(Vec), } -pub type CursorNotifyMapRef = Arc, Receiver<()>)>>>; pub struct CursorNotifies { - cursor_notify_map: CursorNotifyMapRef, + cursor_notify_map: Arc>>>, sender: mpsc::UnboundedSender, shutdown_tx: Option>, } @@ -932,9 +931,7 @@ impl CursorNotifies { pub fn new() -> Self { let (sender, mut receiver) = mpsc::unbounded_channel(); let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let cursor_notify_map = Arc::new(RwLock::new( - HashMap::, Receiver<()>)>::new(), - )); + let cursor_notify_map = Arc::new(RwLock::new(HashMap::>::new())); let cursor_notify_map_clone = cursor_notify_map.clone(); tokio::spawn(async move { loop { @@ -953,8 +950,9 @@ impl CursorNotifies { Some(CursorNotifyState::NotifyTables(table_ids)) => { let cursor_notify_map_clone_read = cursor_notify_map_clone.read().await; for table_id in table_ids { - if let Some((tx,_rx)) = cursor_notify_map_clone_read.get(&table_id.table_id()) { - tx.send(()).unwrap(); + if let Some(tx) = cursor_notify_map_clone_read.get(&table_id.table_id()) { + // Maybe there's no cursor. + tx.send(()).ok(); } } } @@ -976,18 +974,18 @@ impl CursorNotifies { pub async fn wait_next_epoch(&self, id: u32, timeout_seconds: Option) -> Result<()> { let mut rx = { let mut cursor_notify_map_write = self.cursor_notify_map.write().await; - let (_tx, rx) = cursor_notify_map_write.entry(id).or_insert_with(|| { - let (tx, rx) = channel(()); - (tx, rx) + let tx = cursor_notify_map_write.entry(id).or_insert_with(|| { + let (tx, _rx) = channel(10); + tx }); - rx.clone() + tx.subscribe() }; let mut timeout_interval = tokio::time::interval(Duration::from_secs(timeout_seconds.unwrap_or(u64::MAX))); timeout_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); timeout_interval.tick().await; tokio::select! 
{ - result = rx.changed() => { + result = rx.recv() => { result.map_err(|_| { ErrorCode::InternalError(format!("Cursor dependent table deleted: table_id is {:?}", id)) })?; From c41303951745106e99a7249082f2256edf14518b Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 25 Sep 2024 13:41:06 +0800 Subject: [PATCH 4/8] fmt rename --- src/frontend/src/catalog/root_catalog.rs | 4 +-- src/frontend/src/observer/observer_manager.rs | 16 +++++----- src/frontend/src/session/cursor_manager.rs | 31 ++++++++++--------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index f052452ae074..112277d0adcc 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -287,7 +287,7 @@ impl Catalog { } } - pub fn get_all_tables_id_in_database(&self, db_id: DatabaseId) -> Vec { + pub fn get_all_table_ids_in_database(&self, db_id: DatabaseId) -> Vec { if let Ok(database) = self.get_database_by_id(&db_id) { database.iter_all_table_ids().collect() } else { @@ -303,7 +303,7 @@ impl Catalog { }); } - pub fn get_all_tables_id_in_schema( + pub fn get_all_table_ids_in_schema( &self, db_id: DatabaseId, schema_id: SchemaId, diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index cc0c4cd93b7f..b442a494bf6e 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -98,7 +98,7 @@ impl ObserverState for FrontendObserverNode { let new_value_empty = new_log.new_value.is_empty(); let old_value_empty = new_log.old_value.is_empty(); if !new_value_empty || !old_value_empty { - Some(TableId::new(table_id.clone())) + Some(TableId::new(*table_id)) } else { None } @@ -219,7 +219,7 @@ impl ObserverState for FrontendObserverNode { if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() { None } else { - Some(table_id.clone()) + Some(*table_id) } }) .collect_vec(); @@ -283,7 +283,7 @@ impl FrontendObserverNode { Info::Database(database) => match resp.operation() { Operation::Add => catalog_guard.create_database(database), Operation::Delete => { - let table_ids = catalog_guard.get_all_tables_id_in_database(database.id); + let table_ids = catalog_guard.get_all_table_ids_in_database(database.id); catalog_guard.drop_database(database.id); self.handle_cursor_remove_table_ids(table_ids); } @@ -294,7 +294,7 @@ impl FrontendObserverNode { Operation::Add => catalog_guard.create_schema(schema), Operation::Delete => { let table_ids = - catalog_guard.get_all_tables_id_in_schema(schema.database_id, schema.id); + catalog_guard.get_all_table_ids_in_schema(schema.database_id, schema.id); catalog_guard.drop_schema(schema.database_id, schema.id); self.handle_cursor_remove_table_ids(table_ids); } @@ -519,8 +519,8 @@ impl FrontendObserverNode { for session in self.sessions_map.read().values() { session .get_cursor_manager() - .get_cursor_notifies() - .notify_cursors(&table_ids); + .get_cursor_notifier() + .notify_cursors_by_table_ids(&table_ids); } } @@ -531,8 +531,8 @@ impl FrontendObserverNode { for session in self.sessions_map.read().values() { session .get_cursor_manager() - .get_cursor_notifies() - .remove_tables_ids(&table_ids); + .get_cursor_notifier() + .remove_table_ids(&table_ids); } } diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 616eb2a56938..0f79d71b55d6 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ 
b/src/frontend/src/session/cursor_manager.rs @@ -236,7 +236,7 @@ pub struct SubscriptionCursor { // and will be reset each time it is created chunk_stream, this is to avoid changes in the catalog due to alter. fields: Vec, cursor_metrics: Arc, - cursor_notifies: Arc, + cursor_notifier: Arc, last_fetch: Instant, } @@ -248,7 +248,7 @@ impl SubscriptionCursor { dependent_table_id: TableId, handle_args: &HandlerArgs, cursor_metrics: Arc, - cursor_notifies: Arc, + cursor_notifier: Arc, ) -> Result { let (state, fields) = if let Some(start_timestamp) = start_timestamp { let table_catalog = handle_args.session.get_table_by_id(&dependent_table_id)?; @@ -310,7 +310,7 @@ impl SubscriptionCursor { fields, cursor_metrics, last_fetch: Instant::now(), - cursor_notifies, + cursor_notifier, }) } @@ -488,7 +488,9 @@ impl SubscriptionCursor { } None => { if cur == 0 { - self.cursor_notifies + // It's only blocked when there's no data + // This method will only be called once, either to trigger a timeout or to get the return value in the next loop via `next_row`. + self.cursor_notifier .wait_next_epoch(self.dependent_table_id.table_id(), timeout_seconds) .await?; } else { @@ -496,6 +498,7 @@ impl SubscriptionCursor { } } } + // Timeout, return with current value if let Some(timeout_instant) = timeout_instant && Instant::now() > timeout_instant { @@ -704,7 +707,7 @@ impl SubscriptionCursor { pub struct CursorManager { cursor_map: tokio::sync::Mutex>, - cursor_notifies: Arc, + cursor_notifier: Arc, cursor_metrics: Arc, } @@ -713,12 +716,12 @@ impl CursorManager { Self { cursor_map: tokio::sync::Mutex::new(HashMap::new()), cursor_metrics, - cursor_notifies: Arc::new(CursorNotifies::new()), + cursor_notifier: Arc::new(CursorNotifier::new()), } } - pub fn get_cursor_notifies(&self) -> Arc { - self.cursor_notifies.clone() + pub fn get_cursor_notifier(&self) -> Arc { + self.cursor_notifier.clone() } pub async fn add_subscription_cursor( @@ -738,7 +741,7 @@ impl CursorManager { dependent_table_id, handle_args, self.cursor_metrics.clone(), - self.cursor_notifies.clone(), + self.cursor_notifier.clone(), ) .await?; let mut cursor_map = self.cursor_map.lock().await; @@ -921,13 +924,13 @@ enum CursorNotifyState { RemoveTables(Vec), NotifyTables(Vec), } -pub struct CursorNotifies { +pub struct CursorNotifier { cursor_notify_map: Arc>>>, sender: mpsc::UnboundedSender, shutdown_tx: Option>, } -impl CursorNotifies { +impl CursorNotifier { pub fn new() -> Self { let (sender, mut receiver) = mpsc::unbounded_channel(); let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); @@ -997,19 +1000,19 @@ impl CursorNotifies { Ok(()) } - pub fn notify_cursors(&self, ids: &Vec) { + pub fn notify_cursors_by_table_ids(&self, ids: &Vec) { self.sender .send(CursorNotifyState::NotifyTables(ids.clone())) .unwrap(); } - pub fn remove_tables_ids(&self, ids: &Vec) { + pub fn remove_table_ids(&self, ids: &Vec) { self.sender .send(CursorNotifyState::RemoveTables(ids.clone())) .unwrap(); } } -impl Drop for CursorNotifies { +impl Drop for CursorNotifier { fn drop(&mut self) { if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) { shutdown_tx.send(()).ok(); From c47a2e8fce4a1a5bbd4f422b816be2b18460215f Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 25 Sep 2024 15:05:01 +0800 Subject: [PATCH 5/8] fix ci --- e2e_test/subscription/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index 6107d072fc82..2ba715742eaa 100644 --- a/e2e_test/subscription/main.py 
+++ b/e2e_test/subscription/main.py @@ -286,6 +286,8 @@ def test_cursor_with_table_alter(): execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur with (timeout = '2s')",conn) + assert row == [] + row = execute_query("fetch next from cur with (timeout = '2s')",conn) check_rows_data([6,6],row[0],"Insert") drop_table_subscription() From 4d80f98e8f48ca1a24b0ef068067c85027ca2b1d Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Fri, 27 Sep 2024 17:18:10 +0800 Subject: [PATCH 6/8] fix comm fix comm fmt --- e2e_test/subscription/main.py | 52 +++---- src/frontend/src/catalog/root_catalog.rs | 20 --- .../src/catalog/subscription_catalog.rs | 4 +- src/frontend/src/handler/fetch_cursor.rs | 13 +- src/frontend/src/handler/util.rs | 6 +- src/frontend/src/observer/observer_manager.rs | 92 ++----------- src/frontend/src/scheduler/snapshot.rs | 98 ++++++++++++- src/frontend/src/session.rs | 1 - src/frontend/src/session/cursor_manager.rs | 129 ++---------------- src/storage/hummock_sdk/src/change_log.rs | 2 +- .../hummock_sdk/src/frontend_version.rs | 40 +----- 11 files changed, 172 insertions(+), 285 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index 2ba715742eaa..2c7703e04615 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -55,7 +55,7 @@ def test_cursor_snapshot(): ) execute_insert("declare cur subscription cursor for sub full",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -74,7 +74,7 @@ def test_cursor_snapshot_log_store(): ) execute_insert("declare cur subscription cursor for sub full",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -82,9 +82,9 @@ def test_cursor_snapshot_log_store(): execute_insert("flush",conn) execute_insert("insert into t1 values(5,5)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([5,5],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -108,11 +108,11 @@ def test_cursor_since_begin(): execute_insert("declare cur subscription cursor for sub since begin()",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([5,5],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -137,7 +137,7 @@ def test_cursor_since_now(): time.sleep(2) 
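     # The cursor in this test is declared `since now()` (see the test name), so only rows
     # inserted after this sleep are expected to show up in the fetches below.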
execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -162,7 +162,7 @@ def test_cursor_without_since(): time.sleep(2) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -186,15 +186,15 @@ def test_cursor_since_rw_timestamp(): execute_insert("declare cur subscription cursor for sub since begin()",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_1 = row[0][valuelen - 1] check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_2 = row[0][valuelen - 1] - 1 check_rows_data([5,5],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) valuelen = len(row[0]) rw_timestamp_3 = row[0][valuelen - 1] + 1 check_rows_data([6,6],row[0],"Insert") @@ -203,12 +203,12 @@ def test_cursor_since_rw_timestamp(): execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([4,4],row[0],"Insert") execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([5,5],row[0],"Insert") execute_insert("close cur",conn) @@ -230,7 +230,7 @@ def test_cursor_op(): ) execute_insert("declare cur subscription cursor for sub full",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -239,18 +239,18 @@ def test_cursor_op(): execute_insert("flush",conn) execute_insert("update t1 set v2 = 10 where v1 = 4",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([4,4],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([4,4],row[0],"UpdateDelete") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([4,10],row[0],"UpdateInsert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] execute_insert("delete from t1 where v1 = 4",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next 
from cur",conn) check_rows_data([4,10],row[0],"Delete") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] @@ -272,22 +272,22 @@ def test_cursor_with_table_alter(): execute_insert("alter table t1 add v3 int",conn) execute_insert("insert into t1 values(4,4,4)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([4,4,4],row[0],"Insert") execute_insert("insert into t1 values(5,5,5)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([5,5,5],row[0],"Insert") execute_insert("alter table t1 drop column v2",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur with (timeout = '2s')",conn) assert row == [] - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") drop_table_subscription() @@ -318,7 +318,7 @@ def test_cursor_fetch_n(): execute_insert("flush",conn) execute_insert("update t1 set v2 = 100 where v1 = 10",conn) execute_insert("flush",conn) - row = execute_query("fetch 6 from cur with (timeout = '5s')",conn) + row = execute_query("fetch 6 from cur",conn) assert len(row) == 6 check_rows_data([1,2],row[0],"Insert") check_rows_data([4,4],row[1],"Insert") @@ -326,7 +326,7 @@ def test_cursor_fetch_n(): check_rows_data([6,6],row[3],"Insert") check_rows_data([7,7],row[4],"Insert") check_rows_data([8,8],row[5],"Insert") - row = execute_query("fetch 6 from cur with (timeout = '5s')",conn) + row = execute_query("fetch 6 from cur",conn) assert len(row) == 4 check_rows_data([9,9],row[0],"Insert") check_rows_data([10,10],row[1],"Insert") @@ -349,7 +349,7 @@ def test_rebuild_table(): execute_insert("flush",conn) execute_insert("update t2 set v2 = 100 where v1 = 1",conn) execute_insert("flush",conn) - row = execute_query("fetch 4 from cur with (timeout = '5s')",conn) + row = execute_query("fetch 4 from cur",conn) assert len(row) == 3 check_rows_data([1,1],row[0],"Insert") check_rows_data([1,1],row[1],"UpdateDelete") diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 112277d0adcc..85c76927be77 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -287,14 +287,6 @@ impl Catalog { } } - pub fn get_all_table_ids_in_database(&self, db_id: DatabaseId) -> Vec { - if let Ok(database) = self.get_database_by_id(&db_id) { - database.iter_all_table_ids().collect() - } else { - vec![] - } - } - pub fn drop_database(&mut self, db_id: DatabaseId) { let name = self.db_name_by_id.remove(&db_id).unwrap(); let database = self.database_by_name.remove(&name).unwrap(); @@ -303,18 +295,6 @@ impl Catalog { }); } - pub fn get_all_table_ids_in_schema( - &self, - db_id: DatabaseId, - schema_id: SchemaId, - ) -> Vec { - if let Ok(schema) = self.get_schema_by_id(&db_id, &schema_id) { - schema.iter_all_table_ids().cloned().collect() - } else { - vec![] - } - } - pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) { 
         self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
     }
 
diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs
index b7129ca709a9..93c726953c75 100644
--- a/src/frontend/src/catalog/subscription_catalog.rs
+++ b/src/frontend/src/catalog/subscription_catalog.rs
@@ -19,7 +19,7 @@ use risingwave_pb::catalog::PbSubscription;
 
 use super::OwnedByUserCatalog;
 use crate::error::{ErrorCode, Result};
-use crate::handler::util::convert_interval_to_logstore_u64;
+use crate::handler::util::convert_interval_to_u64_seconds;
 use crate::WithOptions;
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -83,7 +83,7 @@ impl SubscriptionCatalog {
         let retention_seconds_str = properties.get("retention").ok_or_else(|| {
             ErrorCode::InternalError("Subscription retention time not set.".to_string())
         })?;
-        let retention_seconds = convert_interval_to_logstore_u64(retention_seconds_str)?;
+        let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
         self.retention_seconds = retention_seconds;
         Ok(())
     }
diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs
index 7e29ab611db5..8759e7c5f591 100644
--- a/src/frontend/src/handler/fetch_cursor.rs
+++ b/src/frontend/src/handler/fetch_cursor.rs
@@ -22,7 +22,7 @@ use risingwave_sqlparser::ast::{FetchCursorStatement, Statement};
 
 use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
 use super::query::BoundResult;
-use super::util::convert_interval_to_logstore_u64;
+use super::util::convert_interval_to_u64_seconds;
 use super::RwPgResponse;
 use crate::binder::BoundStatement;
 use crate::error::Result;
@@ -63,11 +63,20 @@ pub async fn handle_fetch_cursor(
         Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?;
 
     let with_options = WithOptions::try_from(stmt.with_properties.0.as_slice())?;
+
+    if with_options.len() > 1 {
+        bail_not_implemented!("only `timeout` is supported in with options")
+    }
+
     let timeout_seconds = with_options
         .get("timeout")
-        .map(convert_interval_to_logstore_u64)
+        .map(convert_interval_to_u64_seconds)
         .transpose()?;
 
+    if with_options.len() == 1 && timeout_seconds.is_none() {
+        bail_not_implemented!("only `timeout` is supported in with options")
+    }
+
     let cursor_manager = session.get_cursor_manager();
 
     let (rows, pg_descs) = cursor_manager
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 6a3b76b65a83..5551154bd8ec 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -242,8 +242,8 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 {
     Epoch::from(logstore_u64).as_unix_millis()
 }
 
-pub fn convert_interval_to_logstore_u64(interval: &String) -> RwResult<u64> {
-    let retention_seconds = (Interval::from_str(interval)
+pub fn convert_interval_to_u64_seconds(interval: &String) -> RwResult<u64> {
+    let seconds = (Interval::from_str(interval)
         .map_err(|err| {
             ErrorCode::InternalError(format!(
                 "Convert interval to u64 error, please check format, error: {:?}",
                 err.to_report_string()
             ))
         })?
.epoch_in_micros() / 1000000) as u64; - Ok(retention_seconds) + Ok(seconds) } #[cfg(test)] diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index b442a494bf6e..b1ecf4182d1d 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use itertools::Itertools; use parking_lot::RwLock; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; -use risingwave_common::catalog::{CatalogVersion, TableId}; +use risingwave_common::catalog::CatalogVersion; use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::secret::LocalSecretManager; use risingwave_common::session_config::SessionConfig; @@ -36,7 +36,6 @@ use tokio::sync::watch::Sender; use crate::catalog::root_catalog::Catalog; use crate::catalog::{FragmentId, SecretId}; use crate::scheduler::HummockSnapshotManagerRef; -use crate::session::SessionMapRef; use crate::user::user_manager::UserInfoManager; use crate::user::UserInfoVersion; @@ -50,7 +49,6 @@ pub struct FrontendObserverNode { system_params_manager: LocalSystemParamsManagerRef, session_params: Arc>, compute_client_pool: ComputeClientPoolRef, - sessions_map: SessionMapRef, } impl ObserverState for FrontendObserverNode { @@ -89,25 +87,7 @@ impl ObserverState for FrontendObserverNode { ) } Info::HummockVersionDeltas(deltas) => { - let table_ids = deltas - .version_deltas - .iter() - .flat_map(|version_deltas| &version_deltas.change_log_delta) - .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() { - Some(new_log) => { - let new_value_empty = new_log.new_value.is_empty(); - let old_value_empty = new_log.old_value.is_empty(); - if !new_value_empty || !old_value_empty { - Some(TableId::new(*table_id)) - } else { - None - } - } - None => None, - }) - .collect_vec(); self.handle_hummock_snapshot_notification(deltas); - self.handle_cursor_notify(table_ids); } Info::MetaBackupManifestId(_) => { panic!("frontend node should not receive MetaBackupManifestId"); @@ -211,20 +191,10 @@ impl ObserverState for FrontendObserverNode { convert_worker_slot_mapping(&streaming_worker_slot_mappings), convert_worker_slot_mapping(&serving_worker_slot_mappings), ); - let hummock_version = FrontendHummockVersion::from_protobuf(hummock_version.unwrap()); - let table_ids = hummock_version - .table_change_log - .iter() - .filter_map(|(table_id, change_log)| { - if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() { - None - } else { - Some(*table_id) - } - }) - .collect_vec(); - self.hummock_snapshot_manager.init(hummock_version); - self.handle_cursor_notify(table_ids); + self.hummock_snapshot_manager + .init(FrontendHummockVersion::from_protobuf( + hummock_version.unwrap(), + )); let snapshot_version = version.unwrap(); catalog_guard.set_version(snapshot_version.catalog_version); @@ -252,7 +222,6 @@ impl FrontendObserverNode { system_params_manager: LocalSystemParamsManagerRef, session_params: Arc>, compute_client_pool: ComputeClientPoolRef, - sessions_map: SessionMapRef, ) -> Self { Self { worker_node_manager, @@ -264,7 +233,6 @@ impl FrontendObserverNode { system_params_manager, session_params, compute_client_pool, - sessions_map, } } @@ -282,22 +250,13 @@ impl FrontendObserverNode { match info { Info::Database(database) => match resp.operation() { Operation::Add => catalog_guard.create_database(database), - Operation::Delete => { - let table_ids = catalog_guard.get_all_table_ids_in_database(database.id); 
- catalog_guard.drop_database(database.id); - self.handle_cursor_remove_table_ids(table_ids); - } + Operation::Delete => catalog_guard.drop_database(database.id), Operation::Update => catalog_guard.update_database(database), _ => panic!("receive an unsupported notify {:?}", resp), }, Info::Schema(schema) => match resp.operation() { Operation::Add => catalog_guard.create_schema(schema), - Operation::Delete => { - let table_ids = - catalog_guard.get_all_table_ids_in_schema(schema.database_id, schema.id); - catalog_guard.drop_schema(schema.database_id, schema.id); - self.handle_cursor_remove_table_ids(table_ids); - } + Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id), Operation::Update => catalog_guard.update_schema(schema), _ => panic!("receive an unsupported notify {:?}", resp), }, @@ -309,14 +268,11 @@ impl FrontendObserverNode { match relation { RelationInfo::Table(table) => match resp.operation() { Operation::Add => catalog_guard.create_table(table), - Operation::Delete => { - catalog_guard.drop_table( - table.database_id, - table.schema_id, - table.id.into(), - ); - self.handle_cursor_remove_table_ids(vec![table.id.into()]); - } + Operation::Delete => catalog_guard.drop_table( + table.database_id, + table.schema_id, + table.id.into(), + ), Operation::Update => { let old_fragment_id = catalog_guard .get_any_table_by_id(&table.id.into()) @@ -512,30 +468,6 @@ impl FrontendObserverNode { self.hummock_snapshot_manager.update(deltas); } - fn handle_cursor_notify(&self, table_ids: Vec) { - if table_ids.is_empty() { - return; - } - for session in self.sessions_map.read().values() { - session - .get_cursor_manager() - .get_cursor_notifier() - .notify_cursors_by_table_ids(&table_ids); - } - } - - fn handle_cursor_remove_table_ids(&self, table_ids: Vec) { - if table_ids.is_empty() { - return; - } - for session in self.sessions_map.read().values() { - session - .get_cursor_manager() - .get_cursor_notifier() - .remove_table_ids(&table_ids); - } - } - fn handle_secret_notification(&mut self, resp: SubscribeResponse) { let resp_op = resp.operation(); let Some(Info::Secret(secret)) = resp.info else { diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index e108c96352e9..62facfc4ff86 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::time::Duration; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -26,6 +27,7 @@ use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta}; use tokio::sync::watch; +use crate::error::{ErrorCode, RwError}; use crate::expr::InlineNowProcTime; use crate::meta_client::FrontendMetaClient; use crate::scheduler::SchedulerError; @@ -173,6 +175,14 @@ pub struct HummockSnapshotManager { /// `current_epoch` is always in the shared buffer, so it will never be gc before the data /// of `committed_epoch`. 
latest_snapshot: watch::Sender, + + table_change_log_notification_sender: watch::Sender, +} + +#[derive(Default)] +struct TableChangeLogNotificationMsg { + updated_change_log_table_ids: HashSet, + deleted_table_ids: HashSet, } pub type HummockSnapshotManagerRef = Arc; @@ -185,7 +195,13 @@ impl HummockSnapshotManager { let (latest_snapshot, _) = watch::channel(latest_snapshot); - Self { latest_snapshot } + let (table_change_log_notification_sender, _) = + watch::channel(TableChangeLogNotificationMsg::default()); + + Self { + latest_snapshot, + table_change_log_notification_sender, + } } /// Acquire the latest snapshot by increasing its reference count. @@ -194,6 +210,24 @@ impl HummockSnapshotManager { } pub fn init(&self, version: FrontendHummockVersion) { + let updated_change_log_table_ids: HashSet<_> = version + .table_change_log + .iter() + .filter_map(|(table_id, change_log)| { + if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() { + None + } else { + Some(table_id.table_id()) + } + }) + .collect(); + self.table_change_log_notification_sender + .send(TableChangeLogNotificationMsg { + updated_change_log_table_ids, + deleted_table_ids: Default::default(), + }) + .ok(); + self.update_inner(|_| Some(version)); } @@ -201,6 +235,35 @@ impl HummockSnapshotManager { /// /// Should only be called by the observer manager. pub fn update(&self, deltas: HummockVersionDeltas) { + let updated_change_log_table_ids: HashSet<_> = deltas + .version_deltas + .iter() + .flat_map(|version_deltas| &version_deltas.change_log_delta) + .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() { + Some(new_log) => { + let new_value_empty = new_log.new_value.is_empty(); + let old_value_empty = new_log.old_value.is_empty(); + if !new_value_empty || !old_value_empty { + Some(*table_id) + } else { + None + } + } + None => None, + }) + .collect(); + let deleted_table_ids: HashSet<_> = deltas + .version_deltas + .iter() + .flat_map(|version_deltas| version_deltas.removed_table_ids.clone()) + .collect(); + self.table_change_log_notification_sender + .send(TableChangeLogNotificationMsg { + updated_change_log_table_ids, + deleted_table_ids, + }) + .ok(); + self.update_inner(|old_snapshot| { if deltas.version_deltas.is_empty() { return None; @@ -260,4 +323,37 @@ impl HummockSnapshotManager { rx.changed().await.unwrap(); } } + + pub async fn wait_table_change_log_notification( + &self, + table_id: u32, + timeout_seconds: Option, + ) -> Result<(), RwError> { + let mut rx = self.table_change_log_notification_sender.subscribe(); + let mut timeout_interval = + tokio::time::interval(Duration::from_secs(timeout_seconds.unwrap_or(u64::MAX))); + timeout_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + timeout_interval.tick().await; + loop { + tokio::select! 
{ + result = rx.changed() => { + result.map_err(|_| { + ErrorCode::InternalError("cursor notify channel is closed.".into()) + })?; + let table_change_log_notification_msg = rx.borrow_and_update(); + if table_change_log_notification_msg.deleted_table_ids.contains(&table_id) { + return Err(ErrorCode::InternalError(format!("Cursor dependent table deleted: table_id is {:?}", table_id)).into()); + } + if table_change_log_notification_msg.updated_change_log_table_ids.contains(&table_id) { + break; + } + } + _ = timeout_interval.tick() => { + tracing::debug!("Cursor wait next epoch timeout"); + break; + } + } + } + Ok(()) + } } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index d51944c74290..696da6ae8616 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -360,7 +360,6 @@ impl FrontendEnv { system_params_manager.clone(), session_params.clone(), compute_client_pool.clone(), - sessions_map.clone(), ); let observer_manager = ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 0f79d71b55d6..8a961db50f6b 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -31,8 +31,6 @@ use risingwave_common::error::BoxedError; use risingwave_common::session_config::QueryMode; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{Ident, ObjectName, Statement}; -use tokio::sync::broadcast::{channel, Sender}; -use tokio::sync::{mpsc, oneshot, RwLock}; use super::SessionImpl; use crate::catalog::subscription_catalog::SubscriptionCatalog; @@ -236,7 +234,6 @@ pub struct SubscriptionCursor { // and will be reset each time it is created chunk_stream, this is to avoid changes in the catalog due to alter. fields: Vec, cursor_metrics: Arc, - cursor_notifier: Arc, last_fetch: Instant, } @@ -248,7 +245,6 @@ impl SubscriptionCursor { dependent_table_id: TableId, handle_args: &HandlerArgs, cursor_metrics: Arc, - cursor_notifier: Arc, ) -> Result { let (state, fields) = if let Some(start_timestamp) = start_timestamp { let table_catalog = handle_args.session.get_table_by_id(&dependent_table_id)?; @@ -310,7 +306,6 @@ impl SubscriptionCursor { fields, cursor_metrics, last_fetch: Instant::now(), - cursor_notifier, }) } @@ -458,6 +453,7 @@ impl SubscriptionCursor { .into()); } + let session = &handle_args.session; let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize); let mut cur = 0; if let State::Fetch { @@ -471,7 +467,7 @@ impl SubscriptionCursor { formats, from_snapshot, &self.fields, - handle_args.session.clone(), + session.clone(), ); } while cur < count { @@ -490,14 +486,24 @@ impl SubscriptionCursor { if cur == 0 { // It's only blocked when there's no data // This method will only be called once, either to trigger a timeout or to get the return value in the next loop via `next_row`. 
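                         // For example, a client running `fetch next from cur with (timeout = '2s')`
                         // parks here until the subscribed table's change log advances or the 2s elapse.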
- self.cursor_notifier - .wait_next_epoch(self.dependent_table_id.table_id(), timeout_seconds) + session + .env + .hummock_snapshot_manager() + .wait_table_change_log_notification( + self.dependent_table_id.table_id(), + timeout_seconds, + ) .await?; } else { break; } } } + println!( + "timeout_instant: {:?},{:?}", + timeout_instant, + Instant::now() + ); // Timeout, return with current value if let Some(timeout_instant) = timeout_instant && Instant::now() > timeout_instant @@ -707,7 +713,6 @@ impl SubscriptionCursor { pub struct CursorManager { cursor_map: tokio::sync::Mutex>, - cursor_notifier: Arc, cursor_metrics: Arc, } @@ -716,14 +721,9 @@ impl CursorManager { Self { cursor_map: tokio::sync::Mutex::new(HashMap::new()), cursor_metrics, - cursor_notifier: Arc::new(CursorNotifier::new()), } } - pub fn get_cursor_notifier(&self) -> Arc { - self.cursor_notifier.clone() - } - pub async fn add_subscription_cursor( &self, cursor_name: String, @@ -741,7 +741,6 @@ impl CursorManager { dependent_table_id, handle_args, self.cursor_metrics.clone(), - self.cursor_notifier.clone(), ) .await?; let mut cursor_map = self.cursor_map.lock().await; @@ -919,103 +918,3 @@ impl CursorManager { ) } } - -enum CursorNotifyState { - RemoveTables(Vec), - NotifyTables(Vec), -} -pub struct CursorNotifier { - cursor_notify_map: Arc>>>, - sender: mpsc::UnboundedSender, - shutdown_tx: Option>, -} - -impl CursorNotifier { - pub fn new() -> Self { - let (sender, mut receiver) = mpsc::unbounded_channel(); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let cursor_notify_map = Arc::new(RwLock::new(HashMap::>::new())); - let cursor_notify_map_clone = cursor_notify_map.clone(); - tokio::spawn(async move { - loop { - tokio::select! { - _ = &mut shutdown_rx => { - break; - } - cursor_notify_state = receiver.recv() => { - match cursor_notify_state { - Some(CursorNotifyState::RemoveTables(table_ids)) => { - for table_id in table_ids { - let mut cursor_notify_map_clone_write = cursor_notify_map_clone.write().await; - cursor_notify_map_clone_write.remove(&table_id.table_id()); - } - } - Some(CursorNotifyState::NotifyTables(table_ids)) => { - let cursor_notify_map_clone_read = cursor_notify_map_clone.read().await; - for table_id in table_ids { - if let Some(tx) = cursor_notify_map_clone_read.get(&table_id.table_id()) { - // Maybe there's no cursor. - tx.send(()).ok(); - } - } - } - None => { - break; - } - } - } - } - } - }); - Self { - cursor_notify_map, - sender, - shutdown_tx: Some(shutdown_tx), - } - } - - pub async fn wait_next_epoch(&self, id: u32, timeout_seconds: Option) -> Result<()> { - let mut rx = { - let mut cursor_notify_map_write = self.cursor_notify_map.write().await; - let tx = cursor_notify_map_write.entry(id).or_insert_with(|| { - let (tx, _rx) = channel(10); - tx - }); - tx.subscribe() - }; - let mut timeout_interval = - tokio::time::interval(Duration::from_secs(timeout_seconds.unwrap_or(u64::MAX))); - timeout_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - timeout_interval.tick().await; - tokio::select! 
{ - result = rx.recv() => { - result.map_err(|_| { - ErrorCode::InternalError(format!("Cursor dependent table deleted: table_id is {:?}", id)) - })?; - } - _ = timeout_interval.tick() => { - tracing::debug!("Cursor wait next epoch timeout"); - } - } - Ok(()) - } - - pub fn notify_cursors_by_table_ids(&self, ids: &Vec) { - self.sender - .send(CursorNotifyState::NotifyTables(ids.clone())) - .unwrap(); - } - - pub fn remove_table_ids(&self, ids: &Vec) { - self.sender - .send(CursorNotifyState::RemoveTables(ids.clone())) - .unwrap(); - } -} -impl Drop for CursorNotifier { - fn drop(&mut self) { - if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) { - shutdown_tx.send(()).ok(); - } - } -} diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 10ae557159fa..cf3ded58b946 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -101,7 +101,7 @@ impl TableChangeLogCommon { &self.0[start..end] } - // Returns epochs where value is non-null and >= `min_epoch`. + /// Returns epochs where value is non-null and >= `min_epoch`. pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec { self.filter_epoch((min_epoch, u64::MAX)) .iter() diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 93d637978a65..d263bd5b54af 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -14,7 +14,6 @@ use std::collections::{HashMap, HashSet}; -use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta; @@ -162,8 +161,8 @@ impl FrontendHummockVersionDelta { truncate_epoch: change_log_delta.truncate_epoch, new_log: change_log_delta.new_log.as_ref().map(|new_log| { EpochNewChangeLogCommon { - new_value: new_log.new_value.iter().map(|_| ()).collect(), - old_value: new_log.new_value.iter().map(|_| ()).collect(), + new_value: vec![(); new_log.new_value.len()], + old_value: vec![(); new_log.old_value.len()], epochs: new_log.epochs.clone(), } }), @@ -195,16 +194,8 @@ impl FrontendHummockVersionDelta { table_id.table_id, PbChangeLogDelta { new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog { - old_value: new_log - .old_value - .iter() - .map(|_| mock_pb_sstable_info()) - .collect_vec(), - new_value: new_log - .new_value - .iter() - .map(|_| mock_pb_sstable_info()) - .collect_vec(), + old_value: vec![PbSstableInfo::default(); new_log.old_value.len()], + new_value: vec![PbSstableInfo::default(); new_log.new_value.len()], epochs: new_log.epochs.clone(), }), truncate_epoch: delta.truncate_epoch, @@ -244,8 +235,8 @@ impl FrontendHummockVersionDelta { truncate_epoch: change_log_delta.truncate_epoch, new_log: change_log_delta.new_log.as_ref().map(|new_log| { EpochNewChangeLogCommon { - new_value: new_log.new_value.iter().map(|_| ()).collect(), - old_value: new_log.old_value.iter().map(|_| ()).collect(), + new_value: vec![(); new_log.new_value.len()], + old_value: vec![(); new_log.old_value.len()], epochs: new_log.epochs.clone(), } }), @@ -256,22 +247,3 @@ impl FrontendHummockVersionDelta { } } } - -fn mock_pb_sstable_info() -> PbSstableInfo { - PbSstableInfo { - object_id: 0, - sst_id: 0, - key_range: None, - file_size: 0, - table_ids: vec![], - meta_offset: 0, - stale_key_count: 0, - total_key_count: 0, - min_epoch: 0, - max_epoch: 0, - 
uncompressed_file_size: 0, - range_tombstone_count: 0, - bloom_filter_kind: 0, - sst_size: 0, - } -} From bed212c776c6f50c2256ff2a39fa3293d8f32258 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Sun, 29 Sep 2024 12:59:41 +0800 Subject: [PATCH 7/8] remove print --- src/frontend/src/session/cursor_manager.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 8a961db50f6b..7e561ae5e81f 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -499,11 +499,6 @@ impl SubscriptionCursor { } } } - println!( - "timeout_instant: {:?},{:?}", - timeout_instant, - Instant::now() - ); // Timeout, return with current value if let Some(timeout_instant) = timeout_instant && Instant::now() > timeout_instant From c3c7cce3524110e0831d4afc2ee0ab230183e123 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 8 Oct 2024 18:17:59 +0800 Subject: [PATCH 8/8] fix comm --- e2e_test/subscription/main.py | 38 ++++++++------- src/frontend/src/catalog/schema_catalog.rs | 4 -- src/frontend/src/scheduler/snapshot.rs | 47 ++++++++----------- src/frontend/src/session/cursor_manager.rs | 27 +++++++---- .../hummock_sdk/src/frontend_version.rs | 3 ++ 5 files changed, 60 insertions(+), 59 deletions(-) diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py index 2c7703e04615..a3079e2e6847 100644 --- a/e2e_test/subscription/main.py +++ b/e2e_test/subscription/main.py @@ -57,7 +57,7 @@ def test_cursor_snapshot(): execute_insert("declare cur subscription cursor for sub full",conn) row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -76,7 +76,7 @@ def test_cursor_snapshot_log_store(): execute_insert("declare cur subscription cursor for sub full",conn) row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("insert into t1 values(4,4)",conn) execute_insert("flush",conn) @@ -86,7 +86,7 @@ def test_cursor_snapshot_log_store(): check_rows_data([4,4],row[0],"Insert") row = execute_query("fetch next from cur",conn) check_rows_data([5,5],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -114,7 +114,7 @@ def test_cursor_since_begin(): check_rows_data([5,5],row[0],"Insert") row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -139,7 +139,7 @@ def test_cursor_since_now(): execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -164,7 +164,7 @@ def test_cursor_without_since(): 
execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) drop_table_subscription() @@ -198,7 +198,7 @@ def test_cursor_since_rw_timestamp(): valuelen = len(row[0]) rw_timestamp_3 = row[0][valuelen - 1] + 1 check_rows_data([6,6],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -213,7 +213,7 @@ def test_cursor_since_rw_timestamp(): execute_insert("close cur",conn) execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -232,7 +232,7 @@ def test_cursor_op(): execute_insert("declare cur subscription cursor for sub full",conn) row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("insert into t1 values(4,4)",conn) @@ -245,14 +245,14 @@ def test_cursor_op(): check_rows_data([4,4],row[0],"UpdateDelete") row = execute_query("fetch next from cur",conn) check_rows_data([4,10],row[0],"UpdateInsert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("delete from t1 where v1 = 4",conn) execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) check_rows_data([4,10],row[0],"Delete") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] execute_insert("close cur",conn) @@ -274,7 +274,7 @@ def test_cursor_with_table_alter(): execute_insert("flush",conn) row = execute_query("fetch next from cur",conn) check_rows_data([1,2],row[0],"Insert") - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] row = execute_query("fetch next from cur",conn) check_rows_data([4,4,4],row[0],"Insert") @@ -285,7 +285,7 @@ def test_cursor_with_table_alter(): execute_insert("alter table t1 drop column v2",conn) execute_insert("insert into t1 values(6,6)",conn) execute_insert("flush",conn) - row = execute_query("fetch next from cur with (timeout = '2s')",conn) + row = execute_query("fetch next from cur",conn) assert row == [] row = execute_query("fetch next from cur",conn) check_rows_data([6,6],row[0],"Insert") @@ -356,8 +356,8 @@ def test_rebuild_table(): check_rows_data([1,100],row[2],"UpdateInsert") drop_table_subscription() -def test_blcok_cursor(): - print(f"test_blcok_cursor") +def test_block_cursor(): + print(f"test_block_cursor") create_table_subscription() conn = psycopg2.connect( host="localhost", @@ -371,7 +371,9 @@ def test_blcok_cursor(): execute_insert("flush",conn) execute_insert("update t2 set v2 = 100 where v1 = 1",conn) execute_insert("flush",conn) - row = execute_query("fetch 100 from cur",conn) + start_time = time.time() + row = execute_query("fetch 100 from cur with (timeout = '30s')",conn) + assert (time.time() - start_time) < 3 assert len(row) == 3 
check_rows_data([1,1],row[0],"Insert") check_rows_data([1,1],row[1],"UpdateDelete") @@ -380,7 +382,7 @@ def test_blcok_cursor(): # Test block cursor fetches data successfully thread = threading.Thread(target=insert_into_table) thread.start() - row = execute_query("fetch 100 from cur",conn) + row = execute_query("fetch 100 from cur with (timeout = '5s')",conn) check_rows_data([10,10],row[0],"Insert") thread.join() @@ -411,4 +413,4 @@ def insert_into_table(): test_cursor_with_table_alter() test_cursor_fetch_n() test_rebuild_table() - test_blcok_cursor() + test_block_cursor() diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index be8db9ad66b2..0394da2a70f8 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -543,10 +543,6 @@ impl SchemaCatalog { .map(|(_, v)| v) } - pub fn iter_all_table_ids(&self) -> impl Iterator { - self.table_by_id.keys() - } - pub fn iter_internal_table(&self) -> impl Iterator> { self.table_by_name .iter() diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 62facfc4ff86..3b1f92f3f20c 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use core::time::Duration; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -324,34 +323,28 @@ impl HummockSnapshotManager { } } - pub async fn wait_table_change_log_notification( - &self, - table_id: u32, - timeout_seconds: Option, - ) -> Result<(), RwError> { + pub async fn wait_table_change_log_notification(&self, table_id: u32) -> Result<(), RwError> { let mut rx = self.table_change_log_notification_sender.subscribe(); - let mut timeout_interval = - tokio::time::interval(Duration::from_secs(timeout_seconds.unwrap_or(u64::MAX))); - timeout_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - timeout_interval.tick().await; loop { - tokio::select! 
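             // The timeout is now handled by the caller (`tokio::time::timeout` in
             // cursor_manager.rs), so this loop only waits for a notification for the table.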
{ - result = rx.changed() => { - result.map_err(|_| { - ErrorCode::InternalError("cursor notify channel is closed.".into()) - })?; - let table_change_log_notification_msg = rx.borrow_and_update(); - if table_change_log_notification_msg.deleted_table_ids.contains(&table_id) { - return Err(ErrorCode::InternalError(format!("Cursor dependent table deleted: table_id is {:?}", table_id)).into()); - } - if table_change_log_notification_msg.updated_change_log_table_ids.contains(&table_id) { - break; - } - } - _ = timeout_interval.tick() => { - tracing::debug!("Cursor wait next epoch timeout"); - break; - } + rx.changed() + .await + .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?; + let table_change_log_notification_msg = rx.borrow_and_update(); + if table_change_log_notification_msg + .deleted_table_ids + .contains(&table_id) + { + return Err(ErrorCode::InternalError(format!( + "Cursor dependent table deleted: table_id is {:?}", + table_id + )) + .into()); + } + if table_change_log_notification_msg + .updated_change_log_table_ids + .contains(&table_id) + { + break; } } Ok(()) diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 7e561ae5e81f..47765182a279 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -483,19 +483,26 @@ impl SubscriptionCursor { ans.push(row); } None => { - if cur == 0 { - // It's only blocked when there's no data - // This method will only be called once, either to trigger a timeout or to get the return value in the next loop via `next_row`. + let timeout_seconds = timeout_seconds.unwrap_or(0); + if cur > 0 || timeout_seconds == 0 { + break; + } + // It's only blocked when there's no data + // This method will only be called once, either to trigger a timeout or to get the return value in the next loop via `next_row`. 
+                    match tokio::time::timeout(
+                        Duration::from_secs(timeout_seconds),
                         session
                             .env
                             .hummock_snapshot_manager()
-                            .wait_table_change_log_notification(
-                                self.dependent_table_id.table_id(),
-                                timeout_seconds,
-                            )
-                            .await?;
+                            .wait_table_change_log_notification(self.dependent_table_id.table_id()),
+                    )
+                    .await
+                    {
+                        Ok(result) => result?,
+                        Err(_) => {
+                            tracing::debug!("Cursor wait next epoch timeout");
+                            break;
+                        }
                     }
                 }
             }
diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs
index d263bd5b54af..baab12293bac 100644
--- a/src/storage/hummock_sdk/src/frontend_version.rs
+++ b/src/storage/hummock_sdk/src/frontend_version.rs
@@ -161,6 +161,7 @@ impl FrontendHummockVersionDelta {
                             truncate_epoch: change_log_delta.truncate_epoch,
                             new_log: change_log_delta.new_log.as_ref().map(|new_log| {
                                 EpochNewChangeLogCommon {
+                                    // Here we only need to know whether the value is null, not what it is, so we fill it in with `()`
                                     new_value: vec![(); new_log.new_value.len()],
                                     old_value: vec![(); new_log.old_value.len()],
                                     epochs: new_log.epochs.clone(),
@@ -194,6 +195,7 @@ impl FrontendHummockVersionDelta {
                     table_id.table_id,
                     PbChangeLogDelta {
                         new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog {
+                            // Here we only need to know whether the value is null, not what it is, so we fill it in with `PbSstableInfo::default()`
                             old_value: vec![PbSstableInfo::default(); new_log.old_value.len()],
                             new_value: vec![PbSstableInfo::default(); new_log.new_value.len()],
                             epochs: new_log.epochs.clone(),
@@ -235,6 +237,7 @@ impl FrontendHummockVersionDelta {
                             truncate_epoch: change_log_delta.truncate_epoch,
                             new_log: change_log_delta.new_log.as_ref().map(|new_log| {
                                 EpochNewChangeLogCommon {
+                                    // Here we only need to know whether the value is null, not what it is, so we fill it in with `()`
                                     new_value: vec![(); new_log.new_value.len()],
                                     old_value: vec![(); new_log.old_value.len()],
                                     epochs: new_log.epochs.clone(),