diff --git a/Cargo.lock b/Cargo.lock index 52bec01bd833f..30c03a679ace0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5954,6 +5954,7 @@ dependencies = [ "prost 0.11.2", "rand 0.8.5", "regex", + "risingwave_backup", "risingwave_common", "risingwave_common_service", "risingwave_hummock_sdk", diff --git a/proto/backup_service.proto b/proto/backup_service.proto index c5fd3ec01cc78..9efa7d9fa5c00 100644 --- a/proto/backup_service.proto +++ b/proto/backup_service.proto @@ -4,6 +4,10 @@ package backup_service; option optimize_for = SPEED; +message MetaBackupManifestId { + uint64 id = 1; +} + enum BackupJobStatus { UNSPECIFIED = 0; RUNNING = 1; diff --git a/proto/meta.proto b/proto/meta.proto index 6dd48b1e7aa33..5c0a34eae494a 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -8,6 +8,7 @@ import "hummock.proto"; import "source.proto"; import "stream_plan.proto"; import "user.proto"; +import "backup_service.proto"; option optimize_for = SPEED; @@ -205,6 +206,7 @@ message MetaSnapshot { hummock.HummockVersion hummock_version = 12; SnapshotVersion version = 13; + backup_service.MetaBackupManifestId meta_backup_manifest_id = 14; } message SubscribeResponse { @@ -232,6 +234,7 @@ message SubscribeResponse { hummock.HummockSnapshot hummock_snapshot = 14; hummock.HummockVersionDeltas hummock_version_deltas = 15; MetaSnapshot snapshot = 16; + backup_service.MetaBackupManifestId meta_backup_manifest_id = 17; } } diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index dcf7129af8506..c0d3dbe0496c8 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -130,6 +130,7 @@ where version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id } Info::HummockSnapshot(_) => true, + Info::MetaBackupManifestId(_) => true, Info::Snapshot(_) => unreachable!(), }); diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index a567079a21d8d..e8f688deb5e39 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -24,10 +24,11 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; use crate::error::{ErrorCode, RwError}; use crate::session_config::transaction_isolation_level::IsolationLevel; +use crate::util::epoch::Epoch; // This is a hack, &'static str is not allowed as a const generics argument. // TODO: refine this using the adt_const_params feature. -const CONFIG_KEYS: [&str; 10] = [ +const CONFIG_KEYS: [&str; 11] = [ "RW_IMPLICIT_FLUSH", "CREATE_COMPACTION_GROUP_FOR_MV", "QUERY_MODE", @@ -38,6 +39,7 @@ const CONFIG_KEYS: [&str; 10] = [ "MAX_SPLIT_RANGE_GAP", "SEARCH_PATH", "TRANSACTION ISOLATION LEVEL", + "QUERY_EPOCH", ]; // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. 
CONFIG_KEYS[IMPLICIT_FLUSH] = @@ -52,6 +54,7 @@ const BATCH_ENABLE_LOOKUP_JOIN: usize = 6; const MAX_SPLIT_RANGE_GAP: usize = 7; const SEARCH_PATH: usize = 8; const TRANSACTION_ISOLATION_LEVEL: usize = 9; +const QUERY_EPOCH: usize = 10; trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> { fn entry_name() -> &'static str; @@ -184,6 +187,51 @@ impl TryFrom<&[&str]> for ConfigI32(u64); + +impl Default for ConfigU64 { + fn default() -> Self { + ConfigU64(DEFAULT) + } +} + +impl Deref for ConfigU64 { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ConfigEntry for ConfigU64 { + fn entry_name() -> &'static str { + CONFIG_KEYS[NAME] + } +} + +impl TryFrom<&[&str]> for ConfigU64 { + type Error = RwError; + + fn try_from(value: &[&str]) -> Result { + if value.len() != 1 { + return Err(ErrorCode::InternalError(format!( + "SET {} takes only one argument", + Self::entry_name() + )) + .into()); + } + + let s = value[0]; + s.parse::().map(ConfigU64).map_err(|_e| { + ErrorCode::InvalidConfigValue { + config_entry: Self::entry_name().to_string(), + config_value: s.to_string(), + } + .into() + }) + } +} + pub struct VariableInfo { pub name: String, pub setting: String, @@ -198,6 +246,7 @@ type ExtraFloatDigit = ConfigI32; type DateStyle = ConfigString; type BatchEnableLookupJoin = ConfigBool; type MaxSplitRangeGap = ConfigI32; +type QueryEpoch = ConfigU64; #[derive(Default)] pub struct ConfigMap { @@ -234,6 +283,9 @@ pub struct ConfigMap { /// see transaction_isolation_level: IsolationLevel, + + /// select as of specific epoch + query_epoch: QueryEpoch, } impl ConfigMap { @@ -257,6 +309,8 @@ impl ConfigMap { self.max_split_range_gap = val.as_slice().try_into()?; } else if key.eq_ignore_ascii_case(SearchPath::entry_name()) { self.search_path = val.as_slice().try_into()?; + } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { + self.query_epoch = val.as_slice().try_into()?; } else { return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()); } @@ -285,6 +339,8 @@ impl ConfigMap { Ok(self.search_path.to_string()) } else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) { Ok(self.transaction_isolation_level.to_string()) + } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) { + Ok(self.query_epoch.to_string()) } else { Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into()) } @@ -336,6 +392,11 @@ impl ConfigMap { name: SearchPath::entry_name().to_lowercase(), setting : self.search_path.to_string(), description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified") + }, + VariableInfo { + name: QueryEpoch::entry_name().to_lowercase(), + setting : self.query_epoch.to_string(), + description : String::from("Sets the historical epoch for querying data. 
If 0, querying latest data.") } ] } @@ -379,4 +440,11 @@ impl ConfigMap { pub fn get_search_path(&self) -> SearchPath { self.search_path.clone() } + + pub fn get_query_epoch(&self) -> Option { + if self.query_epoch.0 != 0 { + return Some((self.query_epoch.0).into()); + } + None + } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c96101605c18b..3cf7b26be28f3 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -116,7 +116,7 @@ pub async fn compute_node_serve( let state_store = StateStoreImpl::new( &opts.state_store, &opts.file_cache_dir, - storage_config.clone(), + &config, hummock_meta_client.clone(), state_store_metrics.clone(), object_store_metrics, diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index 44d79ac522d63..12007216dfdca 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; -use risingwave_common::config::StorageConfig; +use risingwave_common::config::{RwConfig, StorageConfig}; use risingwave_rpc_client::MetaClient; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; use risingwave_storage::hummock::{HummockStorage, TieredCacheMetricsBuilder}; @@ -100,6 +100,10 @@ For `./risedev apply-compose-deploy` users, share_buffer_compaction_worker_threads_number: 0, ..Default::default() }; + let rw_config = RwConfig { + storage: config.clone(), + ..Default::default() + }; tracing::info!("using Hummock config: {:#?}", config); @@ -112,7 +116,7 @@ For `./risedev apply-compose-deploy` users, let state_store_impl = StateStoreImpl::new( &self.hummock_url, "", - Arc::new(config), + &rw_config, Arc::new(MonitoredHummockMetaClient::new( meta_client.clone(), metrics.hummock_metrics.clone(), diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index d3386c1ed760e..e0f1494418215 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -20,6 +20,7 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use postgres_types::FromSql; +use risingwave_common::bail; use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::session_config::QueryMode; @@ -35,7 +36,7 @@ use crate::planner::Planner; use crate::scheduler::plan_fragmenter::Query; use crate::scheduler::{ BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef, - HummockSnapshotGuard, LocalQueryExecution, LocalQueryStream, + LocalQueryExecution, LocalQueryStream, QueryHummockSnapshot, }; use crate::session::SessionImpl; use crate::PlanRef; @@ -133,17 +134,27 @@ pub async fn handle_query( let hummock_snapshot_manager = session.env().hummock_snapshot_manager(); let query_id = query.query_id().clone(); let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?; - + let mut query_snapshot = QueryHummockSnapshot::FrontendPinned(pinned_snapshot); + if let Some(query_epoch) = session.config().get_query_epoch() { + if query_epoch.0 > query_snapshot.get_committed_epoch() { + bail!( + "cannot query with future epoch: {}, current committed epoch: {}", + query_epoch.0, + query_snapshot.get_committed_epoch() + ); + } + query_snapshot = QueryHummockSnapshot::Other(query_epoch); + } match query_mode { QueryMode::Local => 
PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( - local_execute(session.clone(), query, pinned_snapshot).await?, + local_execute(session.clone(), query, query_snapshot).await?, column_types, format, )), // Local mode do not support cancel tasks. QueryMode::Distributed => { PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( - distribute_execute(session.clone(), query, pinned_snapshot).await?, + distribute_execute(session.clone(), query, query_snapshot).await?, column_types, format, )) @@ -224,7 +235,7 @@ fn to_statement_type(stmt: &Statement) -> Result { pub async fn distribute_execute( session: Arc, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, ) -> Result { let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into(); let query_manager = session.env().query_manager().clone(); @@ -238,7 +249,7 @@ pub async fn distribute_execute( pub async fn local_execute( session: Arc, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, ) -> Result { let front_env = session.env(); diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 01be3d159ec0e..16e902cdde46c 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -75,6 +75,9 @@ impl ObserverState for FrontendObserverNode { Info::HummockVersionDeltas(_) => { panic!("frontend node should not receive HummockVersionDeltas"); } + Info::MetaBackupManifestId(_) => { + panic!("frontend node should not receive MetaBackupManifestId"); + } } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index a7c52d92db219..4b58592fa8173 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -36,8 +36,7 @@ use crate::scheduler::distributed::StageExecution; use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID}; use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::scheduler::{ - ExecutionContextRef, HummockSnapshotGuard, PinnedHummockSnapshot, SchedulerError, - SchedulerResult, + ExecutionContextRef, QueryHummockSnapshot, SchedulerError, SchedulerResult, }; /// Message sent to a `QueryRunner` to control its execution. @@ -115,7 +114,7 @@ impl QueryExecution { &self, context: ExecutionContextRef, worker_node_manager: WorkerNodeManagerRef, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, compute_client_pool: ComputeClientPoolRef, catalog_reader: CatalogReader, query_execution_info: QueryExecutionInfoRef, @@ -189,7 +188,7 @@ impl QueryExecution { fn gen_stage_executions( &self, - pinned_snapshot: &PinnedHummockSnapshot, + pinned_snapshot: &QueryHummockSnapshot, context: ExecutionContextRef, worker_node_manager: WorkerNodeManagerRef, compute_client_pool: ComputeClientPoolRef, @@ -225,7 +224,7 @@ impl QueryExecution { } impl QueryRunner { - async fn run(mut self, pinned_snapshot: PinnedHummockSnapshot) { + async fn run(mut self, pinned_snapshot: QueryHummockSnapshot) { // Start leaf stages. 
let leaf_stages = self.query.leaf_stages(); for stage_id in &leaf_stages { @@ -424,7 +423,7 @@ pub(crate) mod tests { .start( ExecutionContext::new(SessionImpl::mock().into()).into(), worker_node_manager, - pinned_snapshot, + pinned_snapshot.into(), compute_client_pool, catalog_reader, query_execution_info, diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index c64280707c63f..2c4c327e4f462 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -34,7 +34,7 @@ use crate::catalog::catalog_service::CatalogReader; use crate::scheduler::plan_fragmenter::{Query, QueryId}; use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::scheduler::{ - ExecutionContextRef, HummockSnapshotGuard, HummockSnapshotManagerRef, SchedulerResult, + ExecutionContextRef, HummockSnapshotManagerRef, QueryHummockSnapshot, SchedulerResult, }; pub struct DistributedQueryStream { @@ -160,7 +160,7 @@ impl QueryManager { &self, context: ExecutionContextRef, query: Query, - pinned_snapshot: HummockSnapshotGuard, + pinned_snapshot: QueryHummockSnapshot, ) -> SchedulerResult { let query_id = query.query_id.clone(); let query_execution = Arc::new(QueryExecution::new(query, context.session().id())); diff --git a/src/frontend/src/scheduler/hummock_snapshot_manager.rs b/src/frontend/src/scheduler/hummock_snapshot_manager.rs index ad0c3cdbf7e3b..88794a626a8d4 100644 --- a/src/frontend/src/scheduler/hummock_snapshot_manager.rs +++ b/src/frontend/src/scheduler/hummock_snapshot_manager.rs @@ -19,7 +19,7 @@ use std::time::{Duration, Instant}; use anyhow::anyhow; use arc_swap::ArcSwap; -use risingwave_common::util::epoch::INVALID_EPOCH; +use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_pb::hummock::HummockSnapshot; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot::{channel as once_channel, Sender as Callback}; @@ -32,7 +32,27 @@ use crate::scheduler::{SchedulerError, SchedulerResult}; const UNPIN_INTERVAL_SECS: u64 = 10; pub type HummockSnapshotManagerRef = Arc; -pub type PinnedHummockSnapshot = HummockSnapshotGuard; +pub enum QueryHummockSnapshot { + FrontendPinned(HummockSnapshotGuard), + /// Other arbitrary epoch, e.g. user specified. + /// Availability and consistency of underlying data should be guaranteed accordingly. 
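+    /// In this patch, `Other` is produced by `handle_query` when the `QUERY_EPOCH` session
+    /// variable is set to a non-zero epoch no greater than the current committed epoch.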
+ Other(Epoch), +} + +impl QueryHummockSnapshot { + pub fn get_committed_epoch(&self) -> u64 { + match self { + QueryHummockSnapshot::FrontendPinned(s) => s.snapshot.committed_epoch, + QueryHummockSnapshot::Other(s) => s.0, + } + } +} + +impl From for QueryHummockSnapshot { + fn from(s: HummockSnapshotGuard) -> Self { + QueryHummockSnapshot::FrontendPinned(s) + } +} type SnapshotRef = Arc>; @@ -165,7 +185,7 @@ impl HummockSnapshotManager { } } - pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult { + pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult { let (sender, rc) = once_channel(); let msg = EpochOperation::RequestEpoch { query_id: query_id.clone(), diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index d0328a4d07e13..bd9912d40acb9 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -39,11 +39,10 @@ use tracing::debug; use uuid::Uuid; use super::plan_fragmenter::{PartitionInfo, QueryStageRef}; -use super::HummockSnapshotGuard; use crate::optimizer::plan_node::PlanNodeType; use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId}; use crate::scheduler::task_context::FrontendBatchTaskContext; -use crate::scheduler::SchedulerResult; +use crate::scheduler::{QueryHummockSnapshot, SchedulerResult}; use crate::session::{AuthContext, FrontendEnv}; pub struct LocalQueryStream { @@ -72,7 +71,7 @@ pub struct LocalQueryExecution { query: Query, front_env: FrontendEnv, // The snapshot will be released when LocalQueryExecution is dropped. - snapshot: HummockSnapshotGuard, + snapshot: QueryHummockSnapshot, auth_context: Arc, } @@ -81,7 +80,7 @@ impl LocalQueryExecution { query: Query, front_env: FrontendEnv, sql: S, - snapshot: HummockSnapshotGuard, + snapshot: QueryHummockSnapshot, auth_context: Arc, ) -> Self { Self { diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 03d1f49b4c82a..2524e29497183 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -16,11 +16,12 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_backup::error::BackupError; -use risingwave_backup::storage::BackupStorageRef; -use risingwave_backup::{MetaBackupJobId, MetaSnapshotId}; +use risingwave_backup::storage::MetaSnapshotStorageRef; +use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest}; use risingwave_common::bail; use risingwave_hummock_sdk::HummockSstableId; -use risingwave_pb::backup_service::BackupJobStatus; +use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId}; +use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::task::JoinHandle; use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder; @@ -37,6 +38,7 @@ pub enum BackupJobResult { /// `BackupJobHandle` tracks running job. struct BackupJobHandle { job_id: u64, + #[expect(dead_code)] hummock_version_safe_point: HummockVersionSafePoint, } @@ -55,7 +57,7 @@ pub type BackupManagerRef = Arc>; pub struct BackupManager { env: MetaSrvEnv, hummock_manager: HummockManagerRef, - backup_store: BackupStorageRef, + backup_store: MetaSnapshotStorageRef, /// Tracks the running backup job. Concurrent jobs is not supported. 
running_backup_job: tokio::sync::Mutex>, } @@ -64,7 +66,7 @@ impl BackupManager { pub fn new( env: MetaSrvEnv, hummock_manager: HummockManagerRef, - backup_store: BackupStorageRef, + backup_store: MetaSnapshotStorageRef, ) -> Self { Self { env, @@ -79,7 +81,7 @@ impl BackupManager { Self { env, hummock_manager, - backup_store: Arc::new(risingwave_backup::storage::DummyBackupStorage {}), + backup_store: Arc::new(risingwave_backup::storage::DummyMetaSnapshotStorage::default()), running_backup_job: Default::default(), } } @@ -123,8 +125,8 @@ impl BackupManager { } if self .backup_store - .list() - .await? + .manifest() + .snapshot_metadata .iter() .any(|m| m.id == job_id) { @@ -143,6 +145,14 @@ impl BackupManager { match job_result { BackupJobResult::Succeeded => { tracing::info!("succeeded backup job {}", job_id); + self.env + .notification_manager() + .notify_hummock_without_version( + Operation::Update, + Info::MetaBackupManifestId(MetaBackupManifestId { + id: self.backup_store.manifest().manifest_id, + }), + ); } BackupJobResult::Failed(e) => { tracing::warn!("failed backup job {}: {}", job_id, e); @@ -172,16 +182,18 @@ impl BackupManager { } /// List all `SSTables` required by backups. - pub async fn list_pinned_ssts(&self) -> MetaResult> { - let r = self - .backup_store - .list() - .await? - .into_iter() - .flat_map(|s| s.ssts) + pub fn list_pinned_ssts(&self) -> Vec { + self.backup_store + .manifest() + .snapshot_metadata + .iter() + .flat_map(|s| s.ssts.clone()) .dedup() - .collect_vec(); - Ok(r) + .collect_vec() + } + + pub fn manifest(&self) -> Arc { + self.backup_store.manifest() } } diff --git a/src/meta/src/backup_restore/mod.rs b/src/meta/src/backup_restore/mod.rs index 7399a042624ca..e329b8eaaea0f 100644 --- a/src/meta/src/backup_restore/mod.rs +++ b/src/meta/src/backup_restore/mod.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#![allow(dead_code)] - mod backup_manager; pub use backup_manager::*; mod error; diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index d6da77b11641c..2c6bae9f20fac 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -16,7 +16,7 @@ use clap::Parser; use itertools::Itertools; use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot::MetaSnapshot; -use risingwave_backup::storage::BackupStorageRef; +use risingwave_backup::storage::MetaSnapshotStorageRef; use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl}; use crate::hummock::compaction_group::CompactionGroup; @@ -123,7 +123,7 @@ async fn restore_metadata(meta_store: S, snapshot: MetaSnapshot) - async fn restore_impl( opts: RestoreOpts, meta_store: Option, - backup_store: Option, + backup_store: Option, ) -> BackupResult<()> { if cfg!(not(test)) { assert!(meta_store.is_none()); @@ -138,7 +138,7 @@ async fn restore_impl( Some(b) => b, }; let target_id = opts.meta_snapshot_id; - let snapshot_list = backup_store.list().await?; + let snapshot_list = backup_store.manifest().snapshot_metadata.clone(); if !snapshot_list.iter().any(|m| m.id == target_id) { return Err(BackupError::Other(anyhow::anyhow!( "snapshot id {} not found", diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 0f9e210fffa04..eff377ba716a5 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -17,7 +17,7 @@ use std::time::Duration; use etcd_client::{Client as EtcdClient, ConnectOptions}; use risingwave_backup::error::BackupResult; -use risingwave_backup::storage::{BackupStorageRef, ObjectStoreMetaSnapshotStorage}; +use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; @@ -76,7 +76,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult BackupResult { +pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult { let object_store = parse_remote_object_store( &opts.storage_url, Arc::new(ObjectStoreMetrics::unused()), diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 123830f3d73ee..4e749ea2522b1 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -165,12 +165,8 @@ where &self, ssts_to_delete: &mut Vec, ) -> MetaResult<()> { - let reject: HashSet = self - .backup_manager - .list_pinned_ssts() - .await? - .into_iter() - .collect(); + let reject: HashSet = + self.backup_manager.list_pinned_ssts().into_iter().collect(); // Ack these pinned SSTs directly. Otherwise delta log containing them cannot be GCed. // These SSTs will be GCed during full GC when they are no longer pinned. 
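        // Note: `list_pinned_ssts` is now a synchronous read of the locally cached
        // `MetaSnapshotManifest`, rather than an async listing of the backup store.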
let to_ack = ssts_to_delete diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 619607c74ff27..6fdd73bf507a6 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -479,6 +479,7 @@ pub async fn rpc_serve_with_store( cluster_manager.clone(), hummock_manager.clone(), fragment_manager.clone(), + backup_manager.clone(), ); let health_srv = HealthServiceImpl::new(); let backup_srv = BackupServiceImpl::new(backup_manager); diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs index 49fa61d729772..d016101b02b45 100644 --- a/src/meta/src/rpc/service/notification_service.rs +++ b/src/meta/src/rpc/service/notification_service.rs @@ -13,6 +13,7 @@ // limitations under the License. use itertools::Itertools; +use risingwave_pb::backup_service::MetaBackupManifestId; use risingwave_pb::catalog::Table; use risingwave_pb::common::worker_node::State::Running; use risingwave_pb::common::{ParallelUnitMapping, WorkerNode, WorkerType}; @@ -24,6 +25,7 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{Request, Response, Status}; +use crate::backup_restore::BackupManagerRef; use crate::hummock::HummockManagerRef; use crate::manager::{ Catalog, CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, MetaSrvEnv, Notification, @@ -38,6 +40,7 @@ pub struct NotificationServiceImpl { cluster_manager: ClusterManagerRef, hummock_manager: HummockManagerRef, fragment_manager: FragmentManagerRef, + backup_manager: BackupManagerRef, } impl NotificationServiceImpl @@ -50,6 +53,7 @@ where cluster_manager: ClusterManagerRef, hummock_manager: HummockManagerRef, fragment_manager: FragmentManagerRef, + backup_manager: BackupManagerRef, ) -> Self { Self { env, @@ -57,6 +61,7 @@ where cluster_manager, hummock_manager, fragment_manager, + backup_manager, } } @@ -148,6 +153,7 @@ where .await .current_version .clone(); + let meta_backup_manifest_id = self.backup_manager.manifest().manifest_id; MetaSnapshot { tables, @@ -156,6 +162,9 @@ where catalog_version, ..Default::default() }), + meta_backup_manifest_id: Some(MetaBackupManifestId { + id: meta_backup_manifest_id, + }), ..Default::default() } } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 7badd7734e05a..3c5727ed60420 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -45,6 +45,7 @@ prometheus = { version = "0.13", features = ["process"] } prost = "0.11" rand = "0.8" regex = "1" +risingwave_backup = { path = "../storage/backup" } risingwave_common = { path = "../common" } risingwave_common_service = { path = "../common/common_service" } risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } diff --git a/src/storage/backup/integration_tests/common.sh b/src/storage/backup/integration_tests/common.sh index 37714028000f1..282e3ef4f91e9 100644 --- a/src/storage/backup/integration_tests/common.sh +++ b/src/storage/backup/integration_tests/common.sh @@ -75,6 +75,42 @@ function restore() { --storage-url minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001 } +function execute_sql() { + local sql + sql=$1 + echo "execute sql ${sql}" + echo "${sql}" | psql -h localhost -p 4566 -d dev -U root 2>&1 +} + +function get_max_committed_epoch() { + mce=$(cargo make ctl hummock list-version | grep max_committed_epoch | sed -n 's/max_committed_epoch: \(.*\),/\1/p') + echo "${mce}" +} + +function get_safe_epoch() { + safe_epoch=$(cargo make ctl hummock list-version | grep safe_epoch | sed -n 's/safe_epoch: 
\(.*\),/\1/p') + echo "${safe_epoch}" +} + function get_total_sst_count() { - find "${BACKUP_TEST_PREFIX_DATA}/minio/hummock001" -type f -name "*.data" |wc -l + find "${BACKUP_TEST_PREFIX_DATA}/minio/hummock001/hummock_001" -type f -name "*.data" |wc -l +} + +function get_max_committed_epoch_in_backup() { + local id + id=$1 + sed_str="s/.*{\"id\":${id},\"hummock_version_id\":.*,\"ssts\":\[.*\],\"max_committed_epoch\":\([[:digit:]]*\),\"safe_epoch\":.*}.*/\1/p" + cat "${BACKUP_TEST_PREFIX_DATA}/minio/hummock001/backup/manifest.json" | sed -n "${sed_str}" +} + +function get_safe_epoch_in_backup() { + local id + id=$1 + sed_str="s/.*{\"id\":${id},\"hummock_version_id\":.*,\"ssts\":\[.*\],\"max_committed_epoch\":.*,\"safe_epoch\":\([[:digit:]]*\)}.*/\1/p" + cat "${BACKUP_TEST_PREFIX_DATA}/minio/hummock001/backup/manifest.json" | sed -n "${sed_str}" +} + +function get_min_pinned_snapshot() { + s=$(cargo make ctl hummock list-pinned-snapshots | grep "min_pinned_snapshot" | sed -n 's/.*min_pinned_snapshot \(.*\)/\1/p' | sort -n | head -1) + echo "${s}" } diff --git a/src/storage/backup/integration_tests/run_all.sh b/src/storage/backup/integration_tests/run_all.sh index 9b6aa91434b23..6abe76b5bd883 100644 --- a/src/storage/backup/integration_tests/run_all.sh +++ b/src/storage/backup/integration_tests/run_all.sh @@ -2,5 +2,12 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -bash "${DIR}/test_basic.sh" -bash "${DIR}/test_pin_sst.sh" +tests=( \ +#"test_basic.sh" \ +#"test_pin_sst.sh" \ +"test_query_backup.sh" \ +) +for t in "${tests[@]}" +do + bash "${DIR}/${t}" +done diff --git a/src/storage/backup/integration_tests/test_query_backup.sh b/src/storage/backup/integration_tests/test_query_backup.sh new file mode 100644 index 0000000000000..8f7832caab315 --- /dev/null +++ b/src/storage/backup/integration_tests/test_query_backup.sh @@ -0,0 +1,128 @@ +#!/bin/bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +. "${DIR}/common.sh" + +stop_cluster +clean_all_data +start_cluster + +execute_sql " +SET RW_IMPLICIT_FLUSH TO true; +create table t1(v1 int, v2 int); +insert into t1 values (2,1),(1,2),(1,1); +" + +result=$( +execute_sql " +select * from t1; +" | grep "3 row" +) +[ -n "${result}" ] + +# backup before delete rows + +job_id=$(backup) +echo "${job_id}" +backup_mce=$(get_max_committed_epoch_in_backup "${job_id}") +backup_safe_epoch=$(get_safe_epoch_in_backup "${job_id}") +echo "backup MCE: ${backup_mce}" +echo "backup safe_epoch: ${backup_safe_epoch}" + +execute_sql " +SET RW_IMPLICIT_FLUSH TO true; +delete from t1 where v1=1; +" + +result=$( +execute_sql " +select * from t1; +" | grep "1 row" +) +[ -n "${result}" ] + +result=$( +execute_sql " +SET QUERY_EPOCH TO 0; +select * from t1; +" | grep "1 row" +) +[ -n "${result}" ] + +min_pinned_snapshot=$(get_min_pinned_snapshot) +while [ "${min_pinned_snapshot}" -le "${backup_mce}" ] ; +do + echo "wait frontend to unpin snapshot. 
current: ${min_pinned_snapshot}, expect: ${backup_mce}" + sleep 5 + min_pinned_snapshot=$(get_min_pinned_snapshot) +done +# safe epoch equals to 0 because no compaction has been done +safe_epoch=$(get_safe_epoch) +[ "${safe_epoch}" -eq 0 ] +# trigger a compaction to increase safe_epoch +cargo make ctl hummock trigger-manual-compaction -c 3 -l 0 +# wait until compaction is done +while [ "${safe_epoch}" -le "${backup_mce}" ] ; +do + safe_epoch=$(get_safe_epoch) + sleep 5 +done +echo "safe epoch after compaction: ${safe_epoch}" + +# query with safe_epoch +result=$( +execute_sql " +SET QUERY_EPOCH TO ${safe_epoch}; +select * from t1; +" | grep "1 row\|3 row" +) +[ -n "${result}" ] + +# query with safe_epoch - 1 +result=$( +execute_sql " +SET QUERY_EPOCH TO $(( safe_epoch - 1 )); +select * from t1; +" | grep "Expired Epoch" +) +[ -n "${result}" ] + +# query with QUERY_EPOCH=0 means use latest epoch +result=$( +execute_sql " +SET QUERY_EPOCH TO 0; +select * from t1; +" | grep "1 row" +) +[ -n "${result}" ] + +echo "query with backup_safe_epoch + 1 < safe_epoch but covered by backup" +# query with backup_safe_epoch < safe_epoch but covered by backup +[ $((backup_safe_epoch + 1)) -eq 1 ] +result=$( +execute_sql " +SET QUERY_EPOCH TO $((backup_safe_epoch + 1)); +select * from t1; +" | grep "0 row" +) +[ -n "${result}" ] + +echo "query with backup_mce < safe_epoch but covered by backup" +# query with backup_mce < safe_epoch but covered by backup +result=$( +execute_sql " +SET QUERY_EPOCH TO $((backup_mce)); +select * from t1; +" | grep "3 row" +) +[ -n "${result}" ] + +echo "query with future epoch" +# query with future epoch +result=$( +execute_sql " +SET QUERY_EPOCH TO 18446744073709551615; +select * from t1; +" | grep "cannot query with future epoch" +) +[ -n "${result}" ] \ No newline at end of file diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 7d5eadf9748ef..351c29ba5a8ae 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -30,7 +30,6 @@ #![feature(error_generic_member_access)] #![feature(provide_any)] #![cfg_attr(coverage, feature(no_coverage))] -#![allow(dead_code)] pub mod error; pub mod meta_snapshot; @@ -54,6 +53,8 @@ pub struct MetaSnapshotMetadata { pub id: MetaSnapshotId, pub hummock_version_id: HummockVersionId, pub ssts: Vec, + pub max_committed_epoch: u64, + pub safe_epoch: u64, } impl MetaSnapshotMetadata { @@ -62,10 +63,19 @@ impl MetaSnapshotMetadata { id, hummock_version_id: v.id, ssts: v.get_sst_ids(), + max_committed_epoch: v.max_committed_epoch, + safe_epoch: v.safe_epoch, } } } +/// `MetaSnapshotManifest` is the source of truth for valid `MetaSnapshot`. +#[derive(Serialize, Deserialize, Default, Clone)] +pub struct MetaSnapshotManifest { + pub manifest_id: u64, + pub snapshot_metadata: Vec, +} + // Code is copied from storage crate. TODO #6482: extract method. 
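// With `max_committed_epoch` and `safe_epoch` added above, each `snapshot_metadata` entry in
// `manifest.json` serializes roughly as
// {"id":..,"hummock_version_id":..,"ssts":[..],"max_committed_epoch":..,"safe_epoch":..},
// which is the shape the sed patterns in the backup integration-test scripts extract epochs from.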
pub fn xxhash64_checksum(data: &[u8]) -> u64 { let mut hasher = twox_hash::XxHash64::with_seed(0); diff --git a/src/storage/backup/src/storage.rs b/src/storage/backup/src/storage.rs index 16dd5749294b0..9157d933ca6b7 100644 --- a/src/storage/backup/src/storage.rs +++ b/src/storage/backup/src/storage.rs @@ -17,12 +17,13 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_object_store::object::{ObjectError, ObjectStoreRef}; -use serde::{Deserialize, Serialize}; use crate::meta_snapshot::MetaSnapshot; -use crate::{BackupError, BackupResult, MetaSnapshotId, MetaSnapshotMetadata}; +use crate::{ + BackupError, BackupResult, MetaSnapshotId, MetaSnapshotManifest, MetaSnapshotMetadata, +}; -pub type BackupStorageRef = Arc; +pub type MetaSnapshotStorageRef = Arc; #[async_trait::async_trait] pub trait MetaSnapshotStorage: 'static + Sync + Send { @@ -32,40 +33,32 @@ pub trait MetaSnapshotStorage: 'static + Sync + Send { /// Gets a snapshot by id. async fn get(&self, id: MetaSnapshotId) -> BackupResult; - /// List all snapshots' metadata. - async fn list(&self) -> BackupResult>; + /// Gets local snapshot manifest. + fn manifest(&self) -> Arc; + + /// Refreshes local snapshot manifest. + async fn refresh_manifest(&self) -> BackupResult<()>; /// Deletes snapshots by ids. async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()>; } -/// `MetaSnapshotManifest` is the source of truth for valid `MetaSnapshot`. -#[derive(Serialize, Deserialize, Default, Clone)] -struct MetaSnapshotManifest { - pub manifest_id: u64, - pub snapshot_metadata: Vec, -} - #[derive(Clone)] pub struct ObjectStoreMetaSnapshotStorage { path: String, store: ObjectStoreRef, - manifest: Arc>, + manifest: Arc>>, } // TODO #6482: purge stale snapshots that is not in manifest. impl ObjectStoreMetaSnapshotStorage { pub async fn new(path: &str, store: ObjectStoreRef) -> BackupResult { - let mut instance = Self { + let instance = Self { path: path.to_string(), store, manifest: Default::default(), }; - let manifest = match instance.get_manifest().await? { - None => MetaSnapshotManifest::default(), - Some(manifest) => manifest, - }; - instance.manifest = Arc::new(parking_lot::RwLock::new(manifest)); + instance.refresh_manifest().await?; Ok(instance) } @@ -75,7 +68,7 @@ impl ObjectStoreMetaSnapshotStorage { self.store .upload(&self.get_manifest_path(), bytes.into()) .await?; - *self.manifest.write() = new_manifest; + *self.manifest.write() = Arc::new(new_manifest); Ok(()) } @@ -107,6 +100,7 @@ impl ObjectStoreMetaSnapshotStorage { format!("{}/{}.snapshot", self.path, id) } + #[allow(dead_code)] fn get_snapshot_id_from_path(path: &str) -> MetaSnapshotId { let split = path.split(&['/', '.']).collect_vec(); debug_assert!(split.len() > 2); @@ -124,7 +118,7 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage { self.store.upload(&path, snapshot.encode().into()).await?; // update manifest last - let mut new_manifest = self.manifest.read().clone(); + let mut new_manifest = (**self.manifest.read()).clone(); new_manifest.manifest_id += 1; new_manifest .snapshot_metadata @@ -142,14 +136,24 @@ impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage { MetaSnapshot::decode(&data) } - async fn list(&self) -> BackupResult> { - Ok(self.manifest.read().snapshot_metadata.clone()) + fn manifest(&self) -> Arc { + self.manifest.read().clone() + } + + async fn refresh_manifest(&self) -> BackupResult<()> { + if let Some(manifest) = self.get_manifest().await? 
{ + let mut guard = self.manifest.write(); + if manifest.manifest_id > guard.manifest_id { + *guard = Arc::new(manifest); + } + } + Ok(()) } async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()> { // update manifest first let to_delete: HashSet = HashSet::from_iter(ids.iter().cloned()); - let mut new_manifest = self.manifest.read().clone(); + let mut new_manifest = (**self.manifest.read()).clone(); new_manifest.manifest_id += 1; new_manifest .snapshot_metadata @@ -171,10 +175,13 @@ impl From for BackupError { } } -pub struct DummyBackupStorage {} +#[derive(Default)] +pub struct DummyMetaSnapshotStorage { + manifest: Arc, +} #[async_trait::async_trait] -impl MetaSnapshotStorage for DummyBackupStorage { +impl MetaSnapshotStorage for DummyMetaSnapshotStorage { async fn create(&self, _snapshot: &MetaSnapshot) -> BackupResult<()> { panic!("should not create from DummyBackupStorage") } @@ -183,9 +190,12 @@ impl MetaSnapshotStorage for DummyBackupStorage { panic!("should not get from DummyBackupStorage") } - async fn list(&self) -> BackupResult> { - // Satisfy `BackupManager` - Ok(vec![]) + fn manifest(&self) -> Arc { + self.manifest.clone() + } + + async fn refresh_manifest(&self) -> BackupResult<()> { + Ok(()) } async fn delete(&self, _ids: &[MetaSnapshotId]) -> BackupResult<()> { diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index 7777ee10d0773..ac3f380f19fb2 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -31,10 +31,12 @@ use risingwave_meta::hummock::test_utils::setup_compute_env; use risingwave_meta::hummock::{HummockManager, HummockManagerRef, MockHummockMetaClient}; use risingwave_meta::manager::{MessageStatus, MetaSrvEnv, NotificationManagerRef, WorkerKey}; use risingwave_meta::storage::{MemStore, MetaStore}; +use risingwave_pb::backup_service::MetaBackupManifestId; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::pin_version_response; use risingwave_pb::meta::{MetaSnapshot, SubscribeResponse, SubscribeType}; use risingwave_storage::error::StorageResult; +use risingwave_storage::hummock::backup_reader::BackupReader; use risingwave_storage::hummock::event_handler::HummockEvent; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; @@ -106,6 +108,7 @@ impl NotificationClient for TestNotificationClient { let meta_snapshot = MetaSnapshot { hummock_version: Some(hummock_version), version: Some(Default::default()), + meta_backup_manifest_id: Some(MetaBackupManifestId { id: 0 }), ..Default::default() }; @@ -140,9 +143,14 @@ pub async fn prepare_first_valid_version( let (tx, mut rx) = unbounded_channel(); let notification_client = get_test_notification_client(env, hummock_manager_ref.clone(), worker_node.clone()); + let backup_manager = BackupReader::unused(); let observer_manager = ObserverManager::new( notification_client, - HummockObserverNode::new(Arc::new(FilterKeyExtractorManager::default()), tx.clone()), + HummockObserverNode::new( + Arc::new(FilterKeyExtractorManager::default()), + backup_manager, + tx.clone(), + ), ) .await; let _ = observer_manager.start().await; diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs new file mode 100644 index 0000000000000..f438df2ebb81c --- /dev/null +++ b/src/storage/src/hummock/backup_reader.rs @@ -0,0 +1,202 @@ +// Copyright 2022 Singularity Data +// +// Licensed 
under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Duration; + +use futures::future::Shared; +use futures::FutureExt; +use risingwave_backup::error::BackupError; +use risingwave_backup::meta_snapshot::MetaSnapshot; +use risingwave_backup::storage::{ + DummyMetaSnapshotStorage, MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage, +}; +use risingwave_backup::MetaSnapshotId; +use risingwave_common::config::RwConfig; +use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; +use risingwave_object_store::object::parse_remote_object_store; + +use crate::error::{StorageError, StorageResult}; +use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion}; +use crate::hummock::HummockError; + +pub type BackupReaderRef = Arc; + +type VersionHolder = ( + PinnedVersion, + tokio::sync::mpsc::UnboundedReceiver, +); + +pub async fn parse_meta_snapshot_storage( + config: &RwConfig, +) -> StorageResult { + let backup_object_store = Arc::new( + parse_remote_object_store( + &config.backup.storage_url, + Arc::new(ObjectStoreMetrics::unused()), + true, + ) + .await, + ); + let store = Arc::new( + ObjectStoreMetaSnapshotStorage::new(&config.backup.storage_directory, backup_object_store) + .await?, + ); + Ok(store) +} + +type InflightRequest = Shared> + Send>>>; +/// `BackupReader` helps to access historical hummock versions, +/// which are persisted in meta snapshots (aka backups). +pub struct BackupReader { + versions: parking_lot::RwLock>, + inflight_request: parking_lot::Mutex>, + store: MetaSnapshotStorageRef, + refresh_tx: tokio::sync::mpsc::UnboundedSender, +} + +impl BackupReader { + pub fn new(store: MetaSnapshotStorageRef) -> BackupReaderRef { + let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel(); + let instance = Arc::new(Self { + store, + versions: Default::default(), + inflight_request: Default::default(), + refresh_tx, + }); + tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx)); + instance + } + + pub fn unused() -> BackupReaderRef { + Self::new(Arc::new(DummyMetaSnapshotStorage::default())) + } + + async fn start_manifest_refresher( + backup_reader: BackupReaderRef, + mut refresh_rx: tokio::sync::mpsc::UnboundedReceiver, + ) { + loop { + let expect_manifest_id = refresh_rx.recv().await; + if expect_manifest_id.is_none() { + break; + } + let expect_manifest_id = expect_manifest_id.unwrap(); + let previous_id = backup_reader.store.manifest().manifest_id; + if expect_manifest_id <= previous_id { + continue; + } + if let Err(e) = backup_reader.store.refresh_manifest().await { + // reschedule refresh request + tracing::warn!("failed to refresh backup manifest, will retry. 
{}", e); + let backup_reader_clone = backup_reader.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(60)).await; + backup_reader_clone.try_refresh_manifest(expect_manifest_id); + }); + continue; + } + // purge stale version cache + let manifest: HashSet = backup_reader + .store + .manifest() + .snapshot_metadata + .iter() + .map(|s| s.id) + .collect(); + backup_reader + .versions + .write() + .retain(|k, _v| manifest.contains(k)); + } + } + + pub fn try_refresh_manifest(self: &BackupReaderRef, min_manifest_id: u64) { + let _ = self + .refresh_tx + .send(min_manifest_id) + .inspect_err(|e| tracing::warn!("failed to send refresh_manifest request {}", e)); + } + + /// Tries to get a hummock version eligible for querying `epoch`. + /// SSTs of the returned version are expected to be guarded by corresponding backup. + /// Otherwise, reading the version may encounter object store error, due to SST absence. + pub async fn try_get_hummock_version( + self: &BackupReaderRef, + epoch: u64, + ) -> StorageResult> { + // 1. check manifest to locate snapshot, if any. + let snapshot = self + .store + .manifest() + .snapshot_metadata + .iter() + .find(|v| epoch >= v.safe_epoch && epoch <= v.max_committed_epoch) + .cloned(); + let snapshot_meta = match snapshot { + None => { + return Ok(None); + } + Some(s) => s, + }; + let snapshot_id = snapshot_meta.id; + // 2. load hummock version of chosen snapshot. + let future = { + let mut req_guard = self.inflight_request.lock(); + if let Some((v, _)) = self.versions.read().get(&snapshot_meta.id) { + return Ok(Some(v.clone())); + } + if let Some(f) = req_guard.get(&snapshot_id) { + f.clone() + } else { + let this = self.clone(); + let f = async move { + let snapshot = this + .store + .get(snapshot_id) + .await + .map_err(|e| e.to_string())?; + let version_holder = build_version_holder(snapshot); + let version_clone = version_holder.0.clone(); + this.versions.write().insert(snapshot_id, version_holder); + Ok(version_clone) + } + .boxed() + .shared(); + req_guard.insert(snapshot_id, f.clone()); + f + } + }; + let result = future + .await + .map(Some) + .map_err(|e| HummockError::other(e).into()); + self.inflight_request.lock().remove(&snapshot_id); + result + } +} + +fn build_version_holder(s: MetaSnapshot) -> VersionHolder { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + (PinnedVersion::new(s.metadata.hummock_version, tx), rx) +} + +impl From for StorageError { + fn from(e: BackupError) -> Self { + HummockError::other(e).into() + } +} diff --git a/src/storage/src/hummock/compaction_group_client.rs b/src/storage/src/hummock/compaction_group_client.rs deleted file mode 100644 index 4ead9bb960986..0000000000000 --- a/src/storage/src/hummock/compaction_group_client.rs +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; -use std::ops::DerefMut; -use std::sync::Arc; - -use parking_lot::{Mutex, RwLock}; -use risingwave_hummock_sdk::compaction_group::StateTableId; -use risingwave_hummock_sdk::CompactionGroupId; -use risingwave_pb::hummock::CompactionGroup; -use risingwave_rpc_client::HummockMetaClient; -use tokio::sync::oneshot; - -use crate::hummock::{HummockError, HummockResult}; - -pub enum CompactionGroupClientImpl { - Meta(Arc), - Dummy(DummyCompactionGroupClient), -} - -impl CompactionGroupClientImpl { - pub async fn get_compaction_group_id( - &self, - table_id: StateTableId, - ) -> HummockResult { - match self { - CompactionGroupClientImpl::Meta(c) => c.get_compaction_group_id(table_id).await, - CompactionGroupClientImpl::Dummy(c) => Ok(c.get_compaction_group_id()), - } - } - - pub fn update_by( - &self, - compaction_groups: Vec, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - match self { - CompactionGroupClientImpl::Meta(c) => { - c.update_by(compaction_groups, is_complete_snapshot, all_table_ids) - } - CompactionGroupClientImpl::Dummy(_) => (), - } - } -} - -/// `CompactionGroupClientImpl` maintains compaction group metadata cache. -pub struct MetaCompactionGroupClient { - // Lock order: wait_queue before cache - wait_queue: Mutex>>>, - cache: RwLock, - hummock_meta_client: Arc, -} - -impl MetaCompactionGroupClient { - /// TODO: cache is synced on need currently. We can refactor it to push based after #3679. - async fn get_compaction_group_id( - self: &Arc, - table_id: StateTableId, - ) -> HummockResult { - // We wait for cache update for at most twice. - // For the first time there may already be an inflight RPC when cache miss, whose response - // may not contain wanted cache entry. For the second time the new RPC must contain - // wanted cache entry, no matter the RPC is fired by this task or other. Otherwise, - // the caller is trying to get an inexistent cache entry, which indicates a bug. - let mut wait_counter = 0; - while wait_counter <= 2 { - // 1. Get from cache - if let Some(id) = self.cache.read().get(&table_id) { - return Ok(id); - } - // 2. Otherwise either update cache, or wait for previous update if any. 
- let waiter = { - let mut guard = self.wait_queue.lock(); - if let Some(id) = self.cache.read().get(&table_id) { - return Ok(id); - } - let wait_queue = guard.deref_mut(); - if let Some(wait_queue) = wait_queue { - let (tx, rx) = oneshot::channel(); - wait_queue.push(tx); - Some(rx) - } else { - *wait_queue = Some(vec![]); - None - } - }; - if let Some(waiter) = waiter { - // Wait for previous update - if let Ok(success) = waiter.await && success { - wait_counter += 1; - } - continue; - } - // Update cache - let this = self.clone(); - tokio::spawn(async move { - let result = this.update().await; - let mut guard = this.wait_queue.lock(); - let wait_queue = guard.deref_mut().take().unwrap(); - for notify in wait_queue { - let _ = notify.send(result.is_ok()); - } - result - }) - .await - .unwrap()?; - wait_counter += 1; - } - Err(HummockError::compaction_group_error(format!( - "compaction group not found for table id {}", - table_id - ))) - } -} - -impl MetaCompactionGroupClient { - pub fn new(hummock_meta_client: Arc) -> Self { - Self { - wait_queue: Default::default(), - cache: Default::default(), - hummock_meta_client, - } - } - - async fn update(&self) -> HummockResult<()> { - let compaction_groups = self - .hummock_meta_client - .get_compaction_groups() - .await - .map_err(HummockError::meta_error)?; - let mut guard = self.cache.write(); - guard.supply_index(compaction_groups, true, false, &[]); - Ok(()) - } - - fn update_by( - &self, - compaction_groups: Vec, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - let mut guard = self.cache.write(); - guard.supply_index( - compaction_groups, - false, - is_complete_snapshot, - all_table_ids, - ); - } -} - -#[derive(Default)] -struct CompactionGroupClientInner { - index: HashMap, -} - -impl CompactionGroupClientInner { - fn get(&self, table_id: &StateTableId) -> Option { - self.index.get(table_id).cloned() - } - - fn update_member_ids( - &mut self, - member_ids: &[StateTableId], - is_pull: bool, - cg_id: CompactionGroupId, - ) { - for table_id in member_ids { - match self.index.entry(*table_id) { - Entry::Occupied(mut entry) => { - if !is_pull { - entry.insert(cg_id); - } - } - Entry::Vacant(entry) => { - entry.insert(cg_id); - } - } - } - } - - fn supply_index( - &mut self, - compaction_groups: Vec, - is_pull: bool, - is_complete_snapshot: bool, - all_table_ids: &[StateTableId], - ) { - if is_complete_snapshot { - self.index.clear(); - } else if !all_table_ids.is_empty() { - let all_table_set: HashSet = all_table_ids.iter().cloned().collect(); - self.index - .retain(|table_id, _| all_table_set.contains(table_id)); - } - for compaction_group in compaction_groups { - let member_ids = compaction_group.get_member_table_ids(); - self.update_member_ids(member_ids, is_pull, compaction_group.get_id()); - } - } -} - -pub struct DummyCompactionGroupClient { - /// Always return this `compaction_group_id`. 
- compaction_group_id: CompactionGroupId, -} - -impl DummyCompactionGroupClient { - pub fn new(compaction_group_id: CompactionGroupId) -> Self { - Self { - compaction_group_id, - } - } -} - -impl DummyCompactionGroupClient { - fn get_compaction_group_id(&self) -> CompactionGroupId { - self.compaction_group_id - } -} diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index a7fe279db6210..25e19a5754e29 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -117,6 +117,10 @@ impl HummockError { HummockErrorInner::ExpiredEpoch { safe_epoch, epoch }.into() } + pub fn is_expired_epoch(&self) -> bool { + matches!(self.inner, HummockErrorInner::ExpiredEpoch { .. }) + } + pub fn compaction_executor(error: impl ToString) -> HummockError { HummockErrorInner::CompactionExecutor(error.to_string()).into() } diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index f1f6ed511f09d..02e7551743058 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -47,7 +47,6 @@ pub use tiered_cache::*; pub mod sstable; pub use sstable::*; -pub mod compaction_group_client; pub mod compactor; pub mod conflict_detector; mod error; @@ -62,6 +61,7 @@ pub mod test_utils; pub mod utils; pub use compactor::{CompactorMemoryCollector, CompactorSstableStore}; pub use utils::MemoryLimiter; +pub mod backup_reader; pub mod event_handler; pub mod local_version; pub mod observer_manager; @@ -85,6 +85,7 @@ use self::iterator::{BackwardUserIterator, HummockIterator, UserIterator}; pub use self::sstable_store::*; use super::monitor::StateStoreMetrics; use crate::error::StorageResult; +use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::Context; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::{HummockEvent, HummockEventHandler}; @@ -137,6 +138,8 @@ pub struct HummockStorage { read_version_mapping: Arc, tracing: Arc, + + backup_reader: BackupReaderRef, } impl HummockStorage { @@ -144,6 +147,7 @@ impl HummockStorage { pub async fn new( options: Arc, sstable_store: SstableStoreRef, + backup_reader: BackupReaderRef, hummock_meta_client: Arc, notification_client: impl NotificationClient, // TODO: separate `HummockStats` from `StateStoreMetrics`. 
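The extra `backup_reader` argument threads a `BackupReaderRef` into `HummockStorage`. A minimal
sketch of the wiring, assuming the surrounding values (options, SSTable store, meta client,
notification client, metrics) are already bound to the placeholder names used here, as in
`StateStoreImpl::new` below; tests can substitute `BackupReader::unused()`:

    let backup_store = parse_meta_snapshot_storage(&rw_config).await?;
    let backup_reader = BackupReader::new(backup_store);
    let storage = HummockStorage::new(
        options,
        sstable_store,
        backup_reader,
        hummock_meta_client,
        notification_client,
        state_store_metrics,
        // ...remaining arguments unchanged from the existing constructor
    )
    .await?;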
@@ -160,7 +164,11 @@ impl HummockStorage { let observer_manager = ObserverManager::new( notification_client, - HummockObserverNode::new(filter_key_extractor_manager.clone(), event_tx.clone()), + HummockObserverNode::new( + filter_key_extractor_manager.clone(), + backup_reader.clone(), + event_tx.clone(), + ), ) .await; let _ = observer_manager.start().await; @@ -206,6 +214,7 @@ impl HummockStorage { }), read_version_mapping: hummock_event_handler.read_version_mapping(), tracing, + backup_reader, }; tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker()); @@ -301,6 +310,7 @@ impl HummockStorage { Self::new( options, sstable_store, + BackupReader::unused(), hummock_meta_client, notification_client, Arc::new(StateStoreMetrics::unused()), @@ -493,10 +503,14 @@ impl HummockStorageV1 { let filter_key_extractor_manager = Arc::new(FilterKeyExtractorManager::default()); let (event_tx, mut event_rx) = unbounded_channel(); - + let backup_manager = BackupReader::unused(); let observer_manager = ObserverManager::new( notification_client, - HummockObserverNode::new(filter_key_extractor_manager.clone(), event_tx.clone()), + HummockObserverNode::new( + filter_key_extractor_manager.clone(), + backup_manager, + event_tx.clone(), + ), ) .await; let _ = observer_manager.start().await; diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 41c338259f930..e55e6b26c5423 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -25,11 +25,14 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::SubscribeResponse; use tokio::sync::mpsc::UnboundedSender; +use crate::hummock::backup_reader::BackupReaderRef; use crate::hummock::event_handler::HummockEvent; pub struct HummockObserverNode { filter_key_extractor_manager: FilterKeyExtractorManagerRef, + backup_reader: BackupReaderRef, + version_update_sender: UnboundedSender, version: u64, @@ -68,6 +71,10 @@ impl ObserverState for HummockObserverNode { }); } + Info::MetaBackupManifestId(id) => { + self.backup_reader.try_refresh_manifest(id.id); + } + _ => { panic!("error type notification"); } @@ -80,6 +87,12 @@ impl ObserverState for HummockObserverNode { }; self.handle_catalog_snapshot(snapshot.tables); + self.backup_reader.try_refresh_manifest( + snapshot + .meta_backup_manifest_id + .expect("should get meta backup manifest id") + .id, + ); let _ = self .version_update_sender .send(HummockEvent::VersionUpdate( @@ -100,10 +113,12 @@ impl ObserverState for HummockObserverNode { impl HummockObserverNode { pub fn new( filter_key_extractor_manager: FilterKeyExtractorManagerRef, + backup_reader: BackupReaderRef, version_update_sender: UnboundedSender, ) -> Self { Self { filter_key_extractor_manager, + backup_reader, version_update_sender, version: 0, } diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index ab8912d9e4996..3834c1a509103 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -61,8 +61,9 @@ impl HummockStorage { Bound::Included(TableKey(key.to_vec())), ); - let read_version_tuple = - self.build_read_version_tuple(epoch, read_options.table_id, &key_range)?; + let read_version_tuple = self + .build_read_version_tuple(epoch, read_options.table_id, &key_range) + .await?; self.hummock_version_reader .get(TableKey(key), epoch, read_options, read_version_tuple) @@ -75,28 +76,37 @@ impl HummockStorage { epoch: u64, 
read_options: ReadOptions, ) -> StorageResult> { - let read_version_tuple = - self.build_read_version_tuple(epoch, read_options.table_id, &key_range)?; + let read_version_tuple = self + .build_read_version_tuple(epoch, read_options.table_id, &key_range) + .await?; self.hummock_version_reader .iter(key_range, epoch, read_options, read_version_tuple) .await } - fn build_read_version_tuple( + async fn build_read_version_tuple( &self, epoch: u64, table_id: TableId, key_range: &TableKeyRange, ) -> StorageResult<(Vec, Vec, CommittedVersion)> { let pinned_version = self.pinned_version.load(); - validate_epoch(pinned_version.safe_epoch(), epoch)?; + let mut committed_version = (**pinned_version).clone(); + if let Err(e) = validate_epoch(pinned_version.safe_epoch(), epoch) { + if e.is_expired_epoch() && let Some(backup_version) = self.backup_reader.try_get_hummock_version(epoch).await? { + committed_version = backup_version; + assert!(epoch <= committed_version.max_committed_epoch() && epoch >= committed_version.safe_epoch()); + } else { + return Err(e.into()); + } + } // check epoch if lower mce let read_version_tuple: (Vec, Vec, CommittedVersion) = - if epoch <= pinned_version.max_committed_epoch() { + if epoch <= committed_version.max_committed_epoch() { // read committed_version directly without build snapshot - (Vec::default(), Vec::default(), (**pinned_version).clone()) + (Vec::default(), Vec::default(), committed_version) } else { let read_version_vec = { let read_guard = self.read_version_mapping.read(); @@ -111,7 +121,7 @@ impl HummockStorage { // When the system has just started and no state has been created, the memory state // may be empty if read_version_vec.is_empty() { - (Vec::default(), Vec::default(), (**pinned_version).clone()) + (Vec::default(), Vec::default(), committed_version) } else { read_filter_for_batch(epoch, table_id, key_range, read_version_vec)? 
} diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 2a0139e99fdd1..b352fd2379439 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; use std::sync::Arc; use enum_as_inner::EnumAsInner; -use risingwave_common::config::StorageConfig; +use risingwave_common::config::RwConfig; use risingwave_common_service::observer_manager::RpcNotificationClient; use risingwave_hummock_sdk::filter_key_extractor::FilterKeyExtractorManagerRef; use risingwave_object_store::object::{ @@ -24,6 +24,7 @@ use risingwave_object_store::object::{ }; use crate::error::StorageResult; +use crate::hummock::backup_reader::{parse_meta_snapshot_storage, BackupReader}; use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::{ @@ -464,13 +465,14 @@ impl StateStoreImpl { pub async fn new( s: &str, file_cache_dir: &str, - config: Arc, + rw_config: &RwConfig, hummock_meta_client: Arc, state_store_stats: Arc, object_store_metrics: Arc, tiered_cache_metrics_builder: TieredCacheMetricsBuilder, tracing: Arc, ) -> StorageResult { + let config = Arc::new(rw_config.storage.clone()); #[cfg(not(target_os = "linux"))] let tiered_cache = TieredCache::none(); @@ -531,9 +533,12 @@ impl StateStoreImpl { RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); if !config.enable_state_store_v1 { + let backup_store = parse_meta_snapshot_storage(rw_config).await?; + let backup_reader = BackupReader::new(backup_store); let inner = HummockStorage::new( config.clone(), sstable_store, + backup_reader, hummock_meta_client.clone(), notification_client, state_store_stats.clone(), diff --git a/src/tests/compaction_test/src/runner.rs b/src/tests/compaction_test/src/runner.rs index 6d6cd6fcc9dbb..bdf48c853485d 100644 --- a/src/tests/compaction_test/src/runner.rs +++ b/src/tests/compaction_test/src/runner.rs @@ -14,7 +14,7 @@ use std::collections::{BTreeMap, HashSet}; use std::net::SocketAddr; -use std::ops::Bound; +use std::ops::{Bound, Deref}; use std::pin::Pin; use std::sync::Arc; use std::thread::JoinHandle; @@ -26,7 +26,7 @@ use clap::Parser; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::{load_config, StorageConfig}; +use risingwave_common::config::{load_config, RwConfig, StorageConfig}; use risingwave_common::util::addr::HostAddr; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; use risingwave_pb::common::WorkerType; @@ -657,11 +657,15 @@ pub async fn create_hummock_store_with_metrics( state_store_metrics: Arc::new(StateStoreMetrics::unused()), object_store_metrics: Arc::new(ObjectStoreMetrics::unused()), }; + let rw_config = RwConfig { + storage: storage_config.deref().clone(), + ..Default::default() + }; let state_store_impl = StateStoreImpl::new( &opts.state_store, "", - storage_config, + &rw_config, Arc::new(MonitoredHummockMetaClient::new( meta_client.clone(), metrics.hummock_metrics.clone(),