From b856275c7fdc4a3f0349f9bbf2452a4d102b2b97 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Sat, 10 Dec 2022 19:30:04 +0800 Subject: [PATCH] feat(batch): query historical epoch data --- src/common/src/session_config/mod.rs | 70 +++++- src/compute/src/server.rs | 2 +- src/ctl/src/common/hummock_service.rs | 8 +- src/frontend/src/handler/query.rs | 23 +- .../src/scheduler/distributed/query.rs | 11 +- .../scheduler/distributed/query_manager.rs | 4 +- .../src/scheduler/hummock_snapshot_manager.rs | 26 +- src/frontend/src/scheduler/local.rs | 7 +- src/meta/src/backup_restore/mod.rs | 4 + src/storage/src/hummock/backup_reader.rs | 63 +++++ .../src/hummock/compaction_group_client.rs | 234 ------------------ src/storage/src/hummock/error.rs | 4 + src/storage/src/hummock/mod.rs | 8 +- src/storage/src/hummock/state_store.rs | 16 +- src/storage/src/store_impl.rs | 8 +- src/tests/compaction_test/src/runner.rs | 10 +- 16 files changed, 229 insertions(+), 269 deletions(-) create mode 100644 src/storage/src/hummock/backup_reader.rs delete mode 100644 src/storage/src/hummock/compaction_group_client.rs diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index a567079a21d8d..9a875d8f55a13 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -24,10 +24,11 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; use crate::error::{ErrorCode, RwError}; use crate::session_config::transaction_isolation_level::IsolationLevel; +use crate::util::epoch::Epoch; // This is a hack, &'static str is not allowed as a const generics argument. // TODO: refine this using the adt_const_params feature. -const CONFIG_KEYS: [&str; 10] = [ +const CONFIG_KEYS: [&str; 11] = [ "RW_IMPLICIT_FLUSH", "CREATE_COMPACTION_GROUP_FOR_MV", "QUERY_MODE", @@ -38,6 +39,7 @@ const CONFIG_KEYS: [&str; 10] = [ "MAX_SPLIT_RANGE_GAP", "SEARCH_PATH", "TRANSACTION ISOLATION LEVEL", + "QUERY_EPOCH", ]; // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] = @@ -52,6 +54,7 @@ const BATCH_ENABLE_LOOKUP_JOIN: usize = 6; const MAX_SPLIT_RANGE_GAP: usize = 7; const SEARCH_PATH: usize = 8; const TRANSACTION_ISOLATION_LEVEL: usize = 9; +const QUERY_EPOCH: usize = 10; trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { fn entry_name() -> &'static str; @@ -184,6 +187,51 @@ impl TryFrom<&[&str]> for ConfigI32(i64); + +impl Default for ConfigI64 { + fn default() -> Self { + ConfigI64(DEFAULT) + } +} + +impl Deref for ConfigI64 { + type Target = i64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ConfigEntry for ConfigI64 { + fn entry_name() -> &'static str { + CONFIG_KEYS[NAME] + } +} + +impl TryFrom<&[&str]> for ConfigI64 { + type Error = RwError; + + fn try_from(value: &[&str]) -> Result { + if value.len() != 1 { + return Err(ErrorCode::InternalError(format!( + "SET {} takes only one argument", + Self::entry_name() + )) + .into()); + } + + let s = value[0]; + s.parse::().map(ConfigI64).map_err(|_e| { + ErrorCode::InvalidConfigValue { + config_entry: Self::entry_name().to_string(), + config_value: s.to_string(), + } + .into() + }) + } +} + pub struct VariableInfo { pub name: String, pub setting: String, @@ -198,6 +246,7 @@ type ExtraFloatDigit = ConfigI32; type DateStyle = ConfigString; type BatchEnableLookupJoin = ConfigBool; type MaxSplitRangeGap = ConfigI32; +type QueryEpoch = ConfigI64; #[derive(Default)] pub struct ConfigMap { @@ -234,6 +283,9 @@ pub struct ConfigMap { /// see transaction_isolation_level: IsolationLevel, + + /// select as of specific epoch + query_epoch: QueryEpoch, } impl ConfigMap { @@ -257,6 +309,8 @@ impl ConfigMap { self.max_split_range_gap = val.as_slice().try_into()?; } else if key.eq_ignore_ascii_case(SearchPath::entry_name()) { self.search_path = val.as_slice().try_into()?; + } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { + self.query_epoch = val.as_slice().try_into()?; } else { return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()); } @@ -285,6 +339,8 @@ impl ConfigMap { Ok(self.search_path.to_string()) } else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) { Ok(self.transaction_isolation_level.to_string()) + } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { + Ok(self.query_epoch.to_string()) } else { Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()) } @@ -336,6 +392,11 @@ impl ConfigMap { name: SearchPath::entry_name().to_lowercase(), setting : self.search_path.to_string(), description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified") + }, + VariableInfo { + name: QueryEpoch::entry_name().to_lowercase(), + setting : self.query_epoch.to_string(), + description : String::from("Sets the historical epoch for querying data. If 0, querying latest data.") } ] } @@ -379,4 +440,11 @@ impl ConfigMap { pub fn get_search_path(&self) -> SearchPath { self.search_path.clone() } + + pub fn get_query_epoch(&self) -> Option { + if self.query_epoch.0 != 0 { + return Some((self.query_epoch.0 as u64).into()); + } + None + } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c96101605c18b..3cf7b26be28f3 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -116,7 +116,7 @@ pub async fn compute_node_serve( let state_store = StateStoreImpl::new( &opts.state_store, &opts.file_cache_dir, - storage_config.clone(), + &config, hummock_meta_client.clone(), state_store_metrics.clone(), object_store_metrics, diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 44d79ac522d63..12007216dfdca 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; -use risingwave_common::config::StorageConfig; +use risingwave_common::config::{RwConfig, StorageConfig}; use risingwave_rpc_client::MetaClient; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; use risingwave_storage::hummock::{HummockStorage, TieredCacheMetricsBuilder}; @@ -100,6 +100,10 @@ For `./risedev apply-compose-deploy` users, share_buffer_compaction_worker_threads_number: 0, ..Default::default() }; + let rw_config = RwConfig { + storage: config.clone(), + ..Default::default() + }; tracing::info!("using Hummock config: {:#?}", config); @@ -112,7 +116,7 @@ For `./risedev apply-compose-deploy` users, let state_store_impl = StateStoreImpl::new( &self.hummock_url, "", - Arc::new(config), + &rw_config, Arc::new(MonitoredHummockMetaClient::new( meta_client.clone(), metrics.hummock_metrics.clone(), diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index d3386c1ed760e..e0f1494418215 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -20,6 +20,7 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use postgres_types::FromSql; +use risingwave_common::bail; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::QueryMode; @@ -35,7 +36,7 @@ use crate::planner::Planner; use crate::scheduler::plan_fragmenter::Query; use crate::scheduler::{ BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef, - HummockSnapshotGuard, LocalQueryExecution, LocalQueryStream, + LocalQueryExecution, LocalQueryStream, QueryHummockSnapshot, }; use crate::session::SessionImpl; use crate::PlanRef; @@ -133,17 +134,27 @@ pub async fn handle_query( let hummock_snapshot_manager = session.env().hummock_snapshot_manager(); let query_id = query.query_id().clone(); let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?; - + let mut query_snapshot = QueryHummockSnapshot::FrontendPinned(pinned_snapshot); + if let Some(query_epoch) = session.config().get_query_epoch() { + if query_epoch.0 > query_snapshot.get_committed_epoch() { + bail!( + "cannot query with future epoch: {}, current committed epoch: {}", + query_epoch.0, + query_snapshot.get_committed_epoch() + ); + } + query_snapshot = QueryHummockSnapshot::Other(query_epoch); + } match query_mode { QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( - local_execute(session.clone(), query, pinned_snapshot).await?, + local_execute(session.clone(), query, query_snapshot).await?, column_types, format, )), // Local mode do not support cancel tasks. QueryMode::Distributed => { PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( - distribute_execute(session.clone(), query, pinned_snapshot).await?, + distribute_execute(session.clone(), query, query_snapshot).await?, column_types, format, )) @@ -224,7 +235,7 @@ fn to_statement_type(stmt: &Statement) -> Result { pub async fn distribute_execute( session: Arc, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, ) -> Result { let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into(); let query_manager = session.env().query_manager().clone(); @@ -238,7 +249,7 @@ pub async fn distribute_execute( pub async fn local_execute( session: Arc, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, ) -> Result { let front_env = session.env(); diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index a7c52d92db219..4b58592fa8173 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -36,8 +36,7 @@ use crate::scheduler::distributed::StageExecution; use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID}; use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::scheduler::{ - ExecutionContextRef, HummockSnapshotGuard, PinnedHummockSnapshot, SchedulerError, - SchedulerResult, + ExecutionContextRef, QueryHummockSnapshot, SchedulerError, SchedulerResult, }; /// Message sent to a `QueryRunner` to control its execution. @@ -115,7 +114,7 @@ impl QueryExecution { &self, context: ExecutionContextRef, worker_node_manager: WorkerNodeManagerRef, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, compute_client_pool: ComputeClientPoolRef, catalog_reader: CatalogReader, query_execution_info: QueryExecutionInfoRef, @@ -189,7 +188,7 @@ impl QueryExecution { fn gen_stage_executions( &self, - pinned_snapshot: &PinnedHummockSnapshot, + pinned_snapshot: &QueryHummockSnapshot, context: ExecutionContextRef, worker_node_manager: WorkerNodeManagerRef, compute_client_pool: ComputeClientPoolRef, @@ -225,7 +224,7 @@ impl QueryExecution { } impl QueryRunner { - async fn run(mut self, pinned_snapshot: PinnedHummockSnapshot) { + async fn run(mut self, pinned_snapshot: QueryHummockSnapshot) { // Start leaf stages. let leaf_stages = self.query.leaf_stages(); for stage_id in &leaf_stages { @@ -424,7 +423,7 @@ pub(crate) mod tests { .start( ExecutionContext::new(SessionImpl::mock().into()).into(), worker_node_manager, - pinned_snapshot, + pinned_snapshot.into(), compute_client_pool, catalog_reader, query_execution_info, diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index c64280707c63f..2c4c327e4f462 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -34,7 +34,7 @@ use crate::catalog::catalog_service::CatalogReader; use crate::scheduler::plan_fragmenter::{Query, QueryId}; use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::scheduler::{ - ExecutionContextRef, HummockSnapshotGuard, HummockSnapshotManagerRef, SchedulerResult, + ExecutionContextRef, HummockSnapshotManagerRef, QueryHummockSnapshot, SchedulerResult, }; pub struct DistributedQueryStream { @@ -160,7 +160,7 @@ impl QueryManager { &self, context: ExecutionContextRef, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, ) -> SchedulerResult { let query_id = query.query_id.clone(); let query_execution = Arc::new(QueryExecution::new(query, context.session().id())); diff --git a/src/frontend/src/scheduler/hummock_snapshot_manager.rs b/src/frontend/src/scheduler/hummock_snapshot_manager.rs index ad0c3cdbf7e3b..88794a626a8d4 100644 --- a/src/frontend/src/scheduler/hummock_snapshot_manager.rs +++ b/src/frontend/src/scheduler/hummock_snapshot_manager.rs @@ -19,7 +19,7 @@ use std::time::{Duration, Instant}; use anyhow::anyhow; use arc_swap::ArcSwap; -use risingwave_common::util::epoch::INVALID_EPOCH; +use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_pb::hummock::HummockSnapshot; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot::{channel as once_channel, Sender as Callback}; @@ -32,7 +32,27 @@ use crate::scheduler::{SchedulerError, SchedulerResult}; const UNPIN_INTERVAL_SECS: u64 = 10; pub type HummockSnapshotManagerRef = Arc; -pub type PinnedHummockSnapshot = HummockSnapshotGuard; +pub enum QueryHummockSnapshot { + FrontendPinned(HummockSnapshotGuard), + /// Other arbitrary epoch, e.g. user specified. + /// Availability and consistency of underlying data should be guaranteed accordingly. + Other(Epoch), +} + +impl QueryHummockSnapshot { + pub fn get_committed_epoch(&self) -> u64 { + match self { + QueryHummockSnapshot::FrontendPinned(s) => s.snapshot.committed_epoch, + QueryHummockSnapshot::Other(s) => s.0, + } + } +} + +impl From for QueryHummockSnapshot { + fn from(s: HummockSnapshotGuard) -> Self { + QueryHummockSnapshot::FrontendPinned(s) + } +} type SnapshotRef = Arc>; @@ -165,7 +185,7 @@ impl HummockSnapshotManager { } } - pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult { + pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult { let (sender, rc) = once_channel(); let msg = EpochOperation::RequestEpoch { query_id: query_id.clone(), diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index d0328a4d07e13..bd9912d40acb9 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -39,11 +39,10 @@ use tracing::debug; use uuid::Uuid; use super::plan_fragmenter::{PartitionInfo, QueryStageRef}; -use super::HummockSnapshotGuard; use crate::optimizer::plan_node::PlanNodeType; use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId}; use crate::scheduler::task_context::FrontendBatchTaskContext; -use crate::scheduler::SchedulerResult; +use crate::scheduler::{QueryHummockSnapshot, SchedulerResult}; use crate::session::{AuthContext, FrontendEnv}; pub struct LocalQueryStream { @@ -72,7 +71,7 @@ pub struct LocalQueryExecution { query: Query, front_env: FrontendEnv, // The snapshot will be released when LocalQueryExecution is dropped. - snapshot: HummockSnapshotGuard, + snapshot: QueryHummockSnapshot, auth_context: Arc, } @@ -81,7 +80,7 @@ impl LocalQueryExecution { query: Query, front_env: FrontendEnv, sql: S, - snapshot: HummockSnapshotGuard, + snapshot: QueryHummockSnapshot, auth_context: Arc, ) -> Self { Self { diff --git a/src/meta/src/backup_restore/mod.rs b/src/meta/src/backup_restore/mod.rs index 4b5a35b450863..b896fe21b562b 100644 --- a/src/meta/src/backup_restore/mod.rs +++ b/src/meta/src/backup_restore/mod.rs @@ -34,6 +34,8 @@ pub struct MetaSnapshotMetadata { pub id: MetaSnapshotId, pub hummock_version_id: HummockVersionId, pub ssts: Vec, + pub max_committed_epoch: u64, + pub safe_epoch: u64, } impl MetaSnapshotMetadata { @@ -42,6 +44,8 @@ impl MetaSnapshotMetadata { id, hummock_version_id: v.id, ssts: v.get_sst_ids(), + max_committed_epoch: v.max_committed_epoch, + safe_epoch: v.safe_epoch, } } } diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs new file mode 100644 index 0000000000000..733b14b3e7c30 --- /dev/null +++ b/src/storage/src/hummock/backup_reader.rs @@ -0,0 +1,63 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(dead_code)] + +use std::collections::HashMap; +use std::sync::Arc; + +use risingwave_common::config::RwConfig; +use risingwave_hummock_sdk::HummockVersionId; + +use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion}; + +pub type BackupReaderRef = Arc; + +type VersionHolder = ( + PinnedVersion, + tokio::sync::mpsc::UnboundedReceiver, +); + +/// `BackupReader` helps to access historical hummock versions, +/// which are persisted in meta snapshots (aka backups). +pub struct BackupReader { + versions: parking_lot::RwLock>, +} + +// TODO #6482: sync `versions` with `backup_store` +impl BackupReader { + pub fn new(_config: &RwConfig) -> Self { + Self { + versions: Default::default(), + } + } + + pub fn for_test() -> Self { + Self { + versions: Default::default(), + } + } + + /// Tries to get a hummock version eligible for querying `epoch`. + /// SSTs of the returned version are expected to be guarded by corresponding backup. + /// Otherwise, reading the version may encounter object store error, due to SSTs GC. + pub fn try_get_hummock_version(&self, epoch: u64) -> Option { + self.versions.read().iter().find_map(|(_, (v, _))| { + if epoch >= v.safe_epoch() && epoch <= v.max_committed_epoch() { + return Some(v.clone()); + } + None + }) + } +} diff --git a/src/storage/src/hummock/compaction_group_client.rs b/src/storage/src/hummock/compaction_group_client.rs deleted file mode 100644 index 4ead9bb960986..0000000000000 --- a/src/storage/src/hummock/compaction_group_client.rs +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; -use std::ops::DerefMut; -use std::sync::Arc; - -use parking_lot::{Mutex, RwLock}; -use risingwave_hummock_sdk::compaction_group::StateTableId; -use risingwave_hummock_sdk::CompactionGroupId; -use risingwave_pb::hummock::CompactionGroup; -use risingwave_rpc_client::HummockMetaClient; -use tokio::sync::oneshot; - -use crate::hummock::{HummockError, HummockResult}; - -pub enum CompactionGroupClientImpl { - Meta(Arc), - Dummy(DummyCompactionGroupClient), -} - -impl CompactionGroupClientImpl { - pub async fn get_compaction_group_id( - &self, - table_id: StateTableId, - ) -> HummockResult { - match self { - CompactionGroupClientImpl::Meta(c) => c.get_compaction_group_id(table_id).await, - CompactionGroupClientImpl::Dummy(c) => Ok(c.get_compaction_group_id()), - } - } - - pub fn update_by( - &self, - compaction_groups: Vec, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - match self { - CompactionGroupClientImpl::Meta(c) => { - c.update_by(compaction_groups, is_complete_snapshot, all_table_ids) - } - CompactionGroupClientImpl::Dummy(_) => (), - } - } -} - -/// `CompactionGroupClientImpl` maintains compaction group metadata cache. -pub struct MetaCompactionGroupClient { - // Lock order: wait_queue before cache - wait_queue: Mutex>>>, - cache: RwLock, - hummock_meta_client: Arc, -} - -impl MetaCompactionGroupClient { - /// TODO: cache is synced on need currently. We can refactor it to push based after #3679. - async fn get_compaction_group_id( - self: &Arc, - table_id: StateTableId, - ) -> HummockResult { - // We wait for cache update for at most twice. - // For the first time there may already be an inflight RPC when cache miss, whose response - // may not contain wanted cache entry. For the second time the new RPC must contain - // wanted cache entry, no matter the RPC is fired by this task or other. Otherwise, - // the caller is trying to get an inexistent cache entry, which indicates a bug. - let mut wait_counter = 0; - while wait_counter <= 2 { - // 1. Get from cache - if let Some(id) = self.cache.read().get(&table_id) { - return Ok(id); - } - // 2. Otherwise either update cache, or wait for previous update if any. - let waiter = { - let mut guard = self.wait_queue.lock(); - if let Some(id) = self.cache.read().get(&table_id) { - return Ok(id); - } - let wait_queue = guard.deref_mut(); - if let Some(wait_queue) = wait_queue { - let (tx, rx) = oneshot::channel(); - wait_queue.push(tx); - Some(rx) - } else { - *wait_queue = Some(vec![]); - None - } - }; - if let Some(waiter) = waiter { - // Wait for previous update - if let Ok(success) = waiter.await && success { - wait_counter += 1; - } - continue; - } - // Update cache - let this = self.clone(); - tokio::spawn(async move { - let result = this.update().await; - let mut guard = this.wait_queue.lock(); - let wait_queue = guard.deref_mut().take().unwrap(); - for notify in wait_queue { - let _ = notify.send(result.is_ok()); - } - result - }) - .await - .unwrap()?; - wait_counter += 1; - } - Err(HummockError::compaction_group_error(format!( - "compaction group not found for table id {}", - table_id - ))) - } -} - -impl MetaCompactionGroupClient { - pub fn new(hummock_meta_client: Arc) -> Self { - Self { - wait_queue: Default::default(), - cache: Default::default(), - hummock_meta_client, - } - } - - async fn update(&self) -> HummockResult<()> { - let compaction_groups = self - .hummock_meta_client - .get_compaction_groups() - .await - .map_err(HummockError::meta_error)?; - let mut guard = self.cache.write(); - guard.supply_index(compaction_groups, true, false, &[]); - Ok(()) - } - - fn update_by( - &self, - compaction_groups: Vec, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - let mut guard = self.cache.write(); - guard.supply_index( - compaction_groups, - false, - is_complete_snapshot, - all_table_ids, - ); - } -} - -#[derive(Default)] -struct CompactionGroupClientInner { - index: HashMap, -} - -impl CompactionGroupClientInner { - fn get(&self, table_id: &StateTableId) -> Option { - self.index.get(table_id).cloned() - } - - fn update_member_ids( - &mut self, - member_ids: &[StateTableId], - is_pull: bool, - cg_id: CompactionGroupId, - ) { - for table_id in member_ids { - match self.index.entry(*table_id) { - Entry::Occupied(mut entry) => { - if !is_pull { - entry.insert(cg_id); - } - } - Entry::Vacant(entry) => { - entry.insert(cg_id); - } - } - } - } - - fn supply_index( - &mut self, - compaction_groups: Vec, - is_pull: bool, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - if is_complete_snapshot { - self.index.clear(); - } else if !all_table_ids.is_empty() { - let all_table_set: HashSet = all_table_ids.iter().cloned().collect(); - self.index - .retain(|table_id, _| all_table_set.contains(table_id)); - } - for compaction_group in compaction_groups { - let member_ids = compaction_group.get_member_table_ids(); - self.update_member_ids(member_ids, is_pull, compaction_group.get_id()); - } - } -} - -pub struct DummyCompactionGroupClient { - /// Always return this `compaction_group_id`. - compaction_group_id: CompactionGroupId, -} - -impl DummyCompactionGroupClient { - pub fn new(compaction_group_id: CompactionGroupId) -> Self { - Self { - compaction_group_id, - } - } -} - -impl DummyCompactionGroupClient { - fn get_compaction_group_id(&self) -> CompactionGroupId { - self.compaction_group_id - } -} diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index a7fe279db6210..25e19a5754e29 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -117,6 +117,10 @@ impl HummockError { HummockErrorInner::ExpiredEpoch { safe_epoch, epoch }.into() } + pub fn is_expired_epoch(&self) -> bool { + matches!(self.inner, HummockErrorInner::ExpiredEpoch { .. }) + } + pub fn compaction_executor(error: impl ToString) -> HummockError { HummockErrorInner::CompactionExecutor(error.to_string()).into() } diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index f1f6ed511f09d..b1fbc325a1d1a 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -47,7 +47,6 @@ pub use tiered_cache::*; pub mod sstable; pub use sstable::*; -pub mod compaction_group_client; pub mod compactor; pub mod conflict_detector; mod error; @@ -62,6 +61,7 @@ pub mod test_utils; pub mod utils; pub use compactor::{CompactorMemoryCollector, CompactorSstableStore}; pub use utils::MemoryLimiter; +pub mod backup_reader; pub mod event_handler; pub mod local_version; pub mod observer_manager; @@ -85,6 +85,7 @@ use self::iterator::{BackwardUserIterator, HummockIterator, UserIterator}; pub use self::sstable_store::*; use super::monitor::StateStoreMetrics; use crate::error::StorageResult; +use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::Context; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::{HummockEvent, HummockEventHandler}; @@ -137,6 +138,8 @@ pub struct HummockStorage { read_version_mapping: Arc, tracing: Arc, + + backup_reader: BackupReaderRef, } impl HummockStorage { @@ -144,6 +147,7 @@ impl HummockStorage { pub async fn new( options: Arc, sstable_store: SstableStoreRef, + backup_reader: BackupReaderRef, hummock_meta_client: Arc, notification_client: impl NotificationClient, // TODO: separate `HummockStats` from `StateStoreMetrics`. @@ -206,6 +210,7 @@ impl HummockStorage { }), read_version_mapping: hummock_event_handler.read_version_mapping(), tracing, + backup_reader, }; tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker()); @@ -301,6 +306,7 @@ impl HummockStorage { Self::new( options, sstable_store, + Arc::new(BackupReader::for_test()), hummock_meta_client, notification_client, Arc::new(StateStoreMetrics::unused()), diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index ab8912d9e4996..e4ed329ee229c 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -90,13 +90,21 @@ impl HummockStorage { key_range: &TableKeyRange, ) -> StorageResult<(Vec, Vec, CommittedVersion)> { let pinned_version = self.pinned_version.load(); - validate_epoch(pinned_version.safe_epoch(), epoch)?; + let mut committed_version = (**pinned_version).clone(); + if let Err(e) = validate_epoch(pinned_version.safe_epoch(), epoch) { + if e.is_expired_epoch() && let Some(backup_version) = self.backup_reader.try_get_hummock_version(epoch) { + committed_version = backup_version; + assert!(epoch <= committed_version.max_committed_epoch()) + } else { + return Err(e.into()); + } + } // check epoch if lower mce let read_version_tuple: (Vec, Vec, CommittedVersion) = - if epoch <= pinned_version.max_committed_epoch() { + if epoch <= committed_version.max_committed_epoch() { // read committed_version directly without build snapshot - (Vec::default(), Vec::default(), (**pinned_version).clone()) + (Vec::default(), Vec::default(), committed_version) } else { let read_version_vec = { let read_guard = self.read_version_mapping.read(); @@ -111,7 +119,7 @@ impl HummockStorage { // When the system has just started and no state has been created, the memory state // may be empty if read_version_vec.is_empty() { - (Vec::default(), Vec::default(), (**pinned_version).clone()) + (Vec::default(), Vec::default(), committed_version) } else { read_filter_for_batch(epoch, table_id, key_range, read_version_vec)? } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 2a0139e99fdd1..50be6cd3ff94b 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; use std::sync::Arc; use enum_as_inner::EnumAsInner; -use risingwave_common::config::StorageConfig; +use risingwave_common::config::RwConfig; use risingwave_common_service::observer_manager::RpcNotificationClient; use risingwave_hummock_sdk::filter_key_extractor::FilterKeyExtractorManagerRef; use risingwave_object_store::object::{ @@ -24,6 +24,7 @@ use risingwave_object_store::object::{ }; use crate::error::StorageResult; +use crate::hummock::backup_reader::BackupReader; use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::{ @@ -464,13 +465,14 @@ impl StateStoreImpl { pub async fn new( s: &str, file_cache_dir: &str, - config: Arc, + rw_config: &RwConfig, hummock_meta_client: Arc, state_store_stats: Arc, object_store_metrics: Arc, tiered_cache_metrics_builder: TieredCacheMetricsBuilder, tracing: Arc, ) -> StorageResult { + let config = Arc::new(rw_config.storage.clone()); #[cfg(not(target_os = "linux"))] let tiered_cache = TieredCache::none(); @@ -531,9 +533,11 @@ impl StateStoreImpl { RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); if !config.enable_state_store_v1 { + let backup_reader = Arc::new(BackupReader::new(rw_config)); let inner = HummockStorage::new( config.clone(), sstable_store, + backup_reader, hummock_meta_client.clone(), notification_client, state_store_stats.clone(), diff --git a/src/tests/compaction_test/src/runner.rs b/src/tests/compaction_test/src/runner.rs index 6d6cd6fcc9dbb..bdf48c853485d 100644 --- a/src/tests/compaction_test/src/runner.rs +++ b/src/tests/compaction_test/src/runner.rs @@ -14,7 +14,7 @@ use std::collections::{BTreeMap, HashSet}; use std::net::SocketAddr; -use std::ops::Bound; +use std::ops::{Bound, Deref}; use std::pin::Pin; use std::sync::Arc; use std::thread::JoinHandle; @@ -26,7 +26,7 @@ use clap::Parser; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::{load_config, StorageConfig}; +use risingwave_common::config::{load_config, RwConfig, StorageConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; use risingwave_pb::common::WorkerType; @@ -657,11 +657,15 @@ pub async fn create_hummock_store_with_metrics( state_store_metrics: Arc::new(StateStoreMetrics::unused()), object_store_metrics: Arc::new(ObjectStoreMetrics::unused()), }; + let rw_config = RwConfig { + storage: storage_config.deref().clone(), + ..Default::default() + }; let state_store_impl = StateStoreImpl::new( &opts.state_store, "", - storage_config, + &rw_config, Arc::new(MonitoredHummockMetaClient::new( meta_client.clone(), metrics.hummock_metrics.clone(),