diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 8a196eecf5021..5881c11e1db40 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -27,6 +27,7 @@ message RowSeqScanNode { // The pushed down `batch_limit`. Max rows needed to return. optional uint64 limit = 6; + optional plan_common.AsOf as_of = 7; } message SysRowSeqScanNode { @@ -277,6 +278,7 @@ message LocalLookupJoinNode { // Null safe means it treats `null = null` as true. // Each key pair can be null safe independently. (left_key, right_key, null_safe) repeated bool null_safe = 11; + optional plan_common.AsOf as_of = 12; } // RFC: A new schedule way for distributed lookup join @@ -293,6 +295,7 @@ message DistributedLookupJoinNode { // Null safe means it treats `null = null` as true. // Each key pair can be null safe independently. (left_key, right_key, null_safe) repeated bool null_safe = 9; + optional plan_common.AsOf as_of = 10; } message UnionNode {} diff --git a/proto/common.proto b/proto/common.proto index 164150379c484..12231ac80152b 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -103,6 +103,7 @@ message BatchQueryEpoch { uint64 committed = 1; uint64 current = 2; uint64 backup = 3; + uint64 time_travel = 4; } } diff --git a/proto/hummock.proto b/proto/hummock.proto index 149944831a4f9..947904df00868 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -104,7 +104,7 @@ message GroupDelta { IntraLevelDelta intra_level = 1; GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; - GroupMetaChange group_meta_change = 4; + GroupMetaChange group_meta_change = 4 [deprecated = true]; GroupTableChange group_table_change = 5 [deprecated = true]; } } @@ -821,6 +821,14 @@ message CancelCompactTaskResponse { bool ret = 1; } +message GetVersionByEpochRequest { + uint64 epoch = 1; +} + +message GetVersionByEpochResponse { + HummockVersion version = 1; +} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns 
(UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -861,6 +869,7 @@ service HummockManagerService { rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse); rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse); + rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse); } message CompactionConfig { diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 31718ed9ac5cc..42ab602fc94d2 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -97,6 +97,21 @@ message StorageTableDesc { optional uint32 retention_seconds = 11; } +message AsOf { + message ProcessTime {} + message Timestamp { + int64 timestamp = 1; + } + message Version { + int64 version = 1; + } + oneof as_of_type { + ProcessTime process_time = 1; + Timestamp timestamp = 2; + Version version = 3; + } +} + // Represents a table in external database for CDC scenario message ExternalTableDesc { uint32 table_id = 1; diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index 27f355aed48b3..c6a13f5411852 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -146,6 +146,13 @@ pub enum BatchError { #[backtrace] opendal::Error, ), + + #[error("Failed to execute time travel query")] + TimeTravel( + #[source] + #[backtrace] + anyhow::Error, + ), } // Serialize/deserialize error. 
diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index 1ff32d9631a8f..f5ad5ab5ed984 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -36,8 +36,8 @@ use risingwave_storage::{dispatch_state_store, StateStore}; use crate::error::Result; use crate::executor::join::JoinType; use crate::executor::{ - BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor, - ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, + unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, + BufferChunkExecutor, Executor, ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, }; use crate::task::{BatchTaskContext, ShutdownToken}; @@ -93,6 +93,24 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { NodeBody::DistributedLookupJoin )?; + // as_of takes precedence + let as_of = distributed_lookup_join_node + .as_of + .as_ref() + .map(AsOf::try_from) + .transpose()?; + let query_epoch = as_of + .map(|a| { + let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0; + tracing::debug!(epoch, "time travel"); + risingwave_pb::common::BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel( + epoch, + )), + } + }) + .unwrap_or_else(|| source.epoch()); + let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?); let condition = match distributed_lookup_join_node.get_condition() { Ok(cond_prost) => Some(build_from_prost(cond_prost)?), @@ -179,12 +197,11 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { let vnodes = Some(TableDistribution::all_vnodes()); dispatch_state_store!(source.context().state_store(), state_store, { let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); - let inner_side_builder = InnerSideExecutorBuilder::new( 
outer_side_key_types, inner_side_key_types.clone(), lookup_prefix_len, - source.epoch(), + query_epoch, vec![], table, chunk_size, diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 54d3185de3dca..f5fbcd682d7f8 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -42,8 +42,8 @@ use risingwave_pb::plan_common::StorageTableDesc; use crate::error::Result; use crate::executor::{ - BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor, - ExecutorBuilder, JoinType, LookupJoinBase, + unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, + DummyExecutor, Executor, ExecutorBuilder, JoinType, LookupJoinBase, }; use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; @@ -66,6 +66,7 @@ struct InnerSideExecutorBuilder { chunk_size: usize, shutdown_rx: ShutdownToken, next_stage_id: usize, + as_of: Option, } /// Used to build the executor for the inner side @@ -108,6 +109,7 @@ impl InnerSideExecutorBuilder { ordered: false, vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()), limit: None, + as_of: self.as_of.as_ref().map(Into::into), }); Ok(row_seq_scan_node) @@ -296,6 +298,24 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { source.plan_node().get_node_body().unwrap(), NodeBody::LocalLookupJoin )?; + // as_of takes precedence + let as_of = lookup_join_node + .as_of + .as_ref() + .map(AsOf::try_from) + .transpose()?; + let query_epoch = as_of + .as_ref() + .map(|a| { + let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0; + tracing::debug!(epoch, "time travel"); + risingwave_pb::common::BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel( + epoch, + )), + } + }) + .unwrap_or_else(|| source.epoch()); let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?); let condition = match 
lookup_join_node.get_condition() { @@ -402,12 +422,13 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { lookup_prefix_len, context: source.context().clone(), task_id: source.task_id.clone(), - epoch: source.epoch(), + epoch: query_epoch, worker_slot_to_scan_range_mapping: HashMap::new(), chunk_size, shutdown_rx: source.shutdown_rx.clone(), next_stage_id: 0, worker_slot_mapping, + as_of, }; let identity = source.plan_node().get_identity().clone(); diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 988122218479f..438416ba5c191 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -28,7 +28,8 @@ use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, PbScanRange}; use risingwave_pb::common::BatchQueryEpoch; -use risingwave_pb::plan_common::StorageTableDesc; +use risingwave_pb::plan_common::as_of::AsOfType; +use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc}; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{collect_data_chunk, TableDistribution}; @@ -55,6 +56,7 @@ pub struct RowSeqScanExecutor { ordered: bool, epoch: BatchQueryEpoch, limit: Option, + as_of: Option, } /// Range for batch scan. 
@@ -66,6 +68,36 @@ pub struct ScanRange { pub next_col_bounds: (Bound, Bound), } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AsOf { + pub timestamp: i64, +} + +impl TryFrom<&PbAsOf> for AsOf { + type Error = BatchError; + + fn try_from(pb: &PbAsOf) -> std::result::Result { + match pb.as_of_type.as_ref().unwrap() { + AsOfType::Timestamp(ts) => Ok(Self { + timestamp: ts.timestamp, + }), + AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel( + anyhow::anyhow!("batch query does not support as of process time or version"), + )), + } + } +} + +impl From<&AsOf> for PbAsOf { + fn from(v: &AsOf) -> Self { + PbAsOf { + as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp { + timestamp: v.timestamp, + })), + } + } +} + impl ScanRange { /// Create a scan range from the prost representation. pub fn new( @@ -134,6 +166,7 @@ impl RowSeqScanExecutor { identity: String, limit: Option, metrics: Option, + as_of: Option, ) -> Self { Self { chunk_size, @@ -144,6 +177,7 @@ impl RowSeqScanExecutor { ordered, epoch, limit, + as_of, } } } @@ -205,6 +239,11 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { let epoch = source.epoch.clone(); let limit = seq_scan_node.limit; + let as_of = seq_scan_node + .as_of + .as_ref() + .map(AsOf::try_from) + .transpose()?; let chunk_size = if let Some(limit) = seq_scan_node.limit { (limit as u32).min(source.context.get_config().developer.chunk_size as u32) } else { @@ -223,6 +262,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { source.plan_node().get_identity().clone(), limit, metrics, + as_of, ))) }) } @@ -254,8 +294,21 @@ impl RowSeqScanExecutor { ordered, epoch, limit, + as_of, } = *self; let table = Arc::new(table); + // as_of takes precedence + let query_epoch = as_of + .map(|a| { + let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0; + tracing::debug!(epoch, identity, "time travel"); + risingwave_pb::common::BatchQueryEpoch { + epoch: 
Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel( + epoch, + )), + } + }) + .unwrap_or_else(|| epoch); // Create collector. let histogram = metrics.as_ref().map(|metrics| { @@ -288,7 +341,8 @@ impl RowSeqScanExecutor { for point_get in point_gets { let table = table.clone(); if let Some(row) = - Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await? + Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone()) + .await? { if let Some(chunk) = data_chunk_builder.append_one_row(row) { returned += chunk.cardinality() as u64; @@ -319,7 +373,7 @@ impl RowSeqScanExecutor { table.clone(), range, ordered, - epoch.clone(), + query_epoch.clone(), chunk_size, limit, histogram.clone(), @@ -443,3 +497,10 @@ impl RowSeqScanExecutor { } } } + +pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch { + let ts = ts.checked_add(1).unwrap(); + risingwave_common::util::epoch::Epoch::from_unix_millis( + u64::try_from(ts).unwrap().checked_mul(1000).unwrap(), + ) +} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 8b8025d33eb43..41bd3aeccd92f 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -220,6 +220,20 @@ pub struct MetaConfig { #[serde(default = "default::meta::enable_hummock_data_archive")] pub enable_hummock_data_archive: bool, + /// If enabled, time travel query is available. + #[serde(default = "default::meta::enable_hummock_time_travel")] + pub enable_hummock_time_travel: bool, + + /// The data retention period for time travel. + #[serde(default = "default::meta::hummock_time_travel_retention_ms")] + pub hummock_time_travel_retention_ms: u64, + + /// The interval at which a Hummock version snapshot is taken for time travel. + /// + /// Larger value indicates less storage overhead but worse query performance. 
+ #[serde(default = "default::meta::hummock_time_travel_snapshot_interval")] + pub hummock_time_travel_snapshot_interval: u64, + /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint /// attempt is rejected. #[serde(default = "default::meta::min_delta_log_num_for_hummock_version_checkpoint")] @@ -1268,7 +1282,7 @@ pub mod default { } pub fn vacuum_spin_interval_ms() -> u64 { - 10 + 200 } pub fn hummock_version_checkpoint_interval_sec() -> u64 { @@ -1279,6 +1293,18 @@ pub mod default { false } + pub fn enable_hummock_time_travel() -> bool { + false + } + + pub fn hummock_time_travel_retention_ms() -> u64 { + 24 * 3600 * 1000 + } + + pub fn hummock_time_travel_snapshot_interval() -> u64 { + 100 + } + pub fn min_delta_log_num_for_hummock_version_checkpoint() -> u64 { 10 } diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 26f822bae558f..ac48c42c22e92 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -381,6 +381,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { "RowSeqExecutor2".to_string(), None, None, + None, )); // check result diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 14dbfed466158..fbeda5c59e834 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -267,6 +267,7 @@ async fn test_table_materialize() -> StreamResult<()> { "RowSeqExecutor2".to_string(), None, None, + None, )); let mut stream = scan.execute(); let result = stream.next().await; @@ -337,6 +338,7 @@ async fn test_table_materialize() -> StreamResult<()> { "RowSeqScanExecutor2".to_string(), None, None, + None, )); let mut stream = scan.execute(); @@ -416,6 +418,7 @@ async fn test_table_materialize() -> StreamResult<()> { "RowSeqScanExecutor2".to_string(), None, None, + None, )); let mut stream = scan.execute(); @@ -487,6 +490,7 @@ async fn test_row_seq_scan() -> StreamResult<()> { 
"RowSeqScanExecutor2".to_string(), None, None, + None, )); assert_eq!(executor.schema().fields().len(), 3); diff --git a/src/config/docs.md b/src/config/docs.md index ab33559260162..71bb8bd26e18c 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -36,9 +36,12 @@ This page is automatically generated by `./risedev generate-example-config` | enable_compaction_deterministic | Whether to enable deterministic compaction scheduling, which will disable all auto scheduling of compaction tasks. Should only be used in e2e tests. | false | | enable_dropped_column_reclaim | Whether compactor should rewrite row to remove dropped column. | false | | enable_hummock_data_archive | If enabled, `SSTable` object file and version delta will be retained. `SSTable` object file need to be deleted via full GC. version delta need to be manually deleted. | false | +| enable_hummock_time_travel | If enabled, time travel query is available. | false | | event_log_channel_max_size | Keeps the latest N events per channel. | 10 | | event_log_enabled | | true | | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | +| hummock_time_travel_retention_ms | The data retention period for time travel. | 86400000 | +| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | | hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. 
| 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | @@ -62,7 +65,7 @@ This page is automatically generated by `./risedev generate-example-config` | table_write_throughput_threshold | The threshold of write throughput to trigger a group split. Increase this configuration value to avoid split too many groups with few data write. | 16777216 | | unrecognized | | | | vacuum_interval_sec | Interval of invoking a vacuum job, to remove stale metadata from meta store and objects from object store. | 30 | -| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 10 | +| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 200 | ## meta.compaction_config diff --git a/src/config/example.toml b/src/config/example.toml index 0e33ba465c9ae..7dc2d9050402f 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -19,9 +19,12 @@ full_gc_interval_sec = 86400 collect_gc_watermark_spin_interval_sec = 5 periodic_compaction_interval_sec = 60 vacuum_interval_sec = 30 -vacuum_spin_interval_ms = 10 +vacuum_spin_interval_ms = 200 hummock_version_checkpoint_interval_sec = 30 enable_hummock_data_archive = false +enable_hummock_time_travel = false +hummock_time_travel_retention_ms = 86400000 +hummock_time_travel_snapshot_interval = 100 min_delta_log_num_for_hummock_version_checkpoint = 10 max_heartbeat_interval_secs = 60 disable_recovery = false diff --git a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs index a181c590e0e65..b09f45b65574e 100644 --- a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs @@ -16,9 +16,10 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnId, TableDesc}; use 
risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode}; +use risingwave_sqlparser::ast::AsOf; use super::batch::prelude::*; -use super::utils::{childless_record, Distill}; +use super::utils::{childless_record, to_pb_time_travel_as_of, Distill}; use super::{generic, ExprRewritable}; use crate::error::Result; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; @@ -54,6 +55,8 @@ pub struct BatchLookupJoin { /// If `distributed_lookup` is true, it will generate `DistributedLookupJoinNode` for /// `ToBatchPb`. Otherwise, it will generate `LookupJoinNode`. distributed_lookup: bool, + + as_of: Option, } impl BatchLookupJoin { @@ -64,6 +67,7 @@ impl BatchLookupJoin { right_output_column_ids: Vec, lookup_prefix_len: usize, distributed_lookup: bool, + as_of: Option, ) -> Self { // We cannot create a `BatchLookupJoin` without any eq keys. We require eq keys to do the // lookup. @@ -79,6 +83,7 @@ impl BatchLookupJoin { right_output_column_ids, lookup_prefix_len, distributed_lookup, + as_of, } } @@ -157,6 +162,7 @@ impl PlanTreeNodeUnary for BatchLookupJoin { self.right_output_column_ids.clone(), self.lookup_prefix_len, self.distributed_lookup, + self.as_of.clone(), ) } } @@ -231,6 +237,7 @@ impl TryToBatchPb for BatchLookupJoin { output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), null_safe: self.eq_join_predicate.null_safes(), lookup_prefix_len: self.lookup_prefix_len as u32, + as_of: to_pb_time_travel_as_of(&self.as_of)?, }) } else { NodeBody::LocalLookupJoin(LocalLookupJoinNode { @@ -263,6 +270,7 @@ impl TryToBatchPb for BatchLookupJoin { worker_nodes: vec![], // To be filled in at local.rs null_safe: self.eq_join_predicate.null_safes(), lookup_prefix_len: self.lookup_prefix_len as u32, + as_of: to_pb_time_travel_as_of(&self.as_of)?, }) }) } diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index 
a504df99f317f..576793f4dd450 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -20,9 +20,10 @@ use risingwave_common::types::ScalarImpl; use risingwave_common::util::scan_range::{is_full_range, ScanRange}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::RowSeqScanNode; +use risingwave_sqlparser::ast::AsOf; use super::batch::prelude::*; -use super::utils::{childless_record, Distill}; +use super::utils::{childless_record, to_pb_time_travel_as_of, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch}; use crate::catalog::ColumnId; use crate::error::Result; @@ -39,6 +40,7 @@ pub struct BatchSeqScan { core: generic::TableScan, scan_ranges: Vec, limit: Option, + as_of: Option, } impl BatchSeqScan { @@ -68,12 +70,14 @@ impl BatchSeqScan { ); }) } + let as_of = core.as_of.clone(); Self { base, core, scan_ranges, limit, + as_of, } } @@ -248,6 +252,7 @@ impl TryToBatchPb for BatchSeqScan { vnode_bitmap: None, ordered: !self.order().is_any(), limit: *self.limit(), + as_of: to_pb_time_travel_as_of(&self.as_of)?, })) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index e9dac0de38b5a..f253cadec686a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -418,6 +418,7 @@ impl LogicalJoin { .collect_vec(); let new_scan_output_column_ids = new_scan.output_column_ids(); + let as_of = new_scan.as_of.clone(); // Construct a new logical join, because we have change its RHS. 
let new_logical_join = generic::Join::new( @@ -435,6 +436,7 @@ impl LogicalJoin { new_scan_output_column_ids, lookup_prefix_len, false, + as_of, )) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 9ac61ed759c6d..b9b175ed6f094 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -326,10 +326,13 @@ macro_rules! plan_node_name { pub(crate) use plan_node_name; use risingwave_common::types::DataType; use risingwave_expr::aggregate::AggKind; +use risingwave_pb::plan_common::as_of::AsOfType; +use risingwave_pb::plan_common::{as_of, PbAsOf}; +use risingwave_sqlparser::ast::AsOf; use super::generic::{self, GenericPlanRef, PhysicalPlanRef}; use super::pretty_config; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::expr::InputRef; use crate::optimizer::plan_node::generic::Agg; use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall}; @@ -388,3 +391,27 @@ pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool { plan.inputs().iter().all(plan_has_backfill_leaf_nodes) } } + +pub fn to_pb_time_travel_as_of(a: &Option) -> Result> { + let Some(ref a) = a else { + return Ok(None); + }; + let as_of_type = match a { + AsOf::ProcessTime => AsOfType::ProcessTime(as_of::ProcessTime {}), + AsOf::TimestampNum(ts) => AsOfType::Timestamp(as_of::Timestamp { timestamp: *ts }), + AsOf::TimestampString(ts) => AsOfType::Timestamp(as_of::Timestamp { + // should already have been validated by the parser + timestamp: ts.parse().unwrap(), + }), + AsOf::VersionNum(_) | AsOf::VersionString(_) => { + return Err(ErrorCode::NotSupported( + "do not support as of version".to_string(), + "please use as of timestamp".to_string(), + ) + .into()); + } + }; + Ok(Some(PbAsOf { + as_of_type: Some(as_of_type), + })) +} diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs 
index d47ad04de2763..e65b249379750 100644 --- a/src/frontend/src/optimizer/rule/index_selection_rule.rs +++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs @@ -58,6 +58,7 @@ use risingwave_common::types::{ }; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::plan_common::JoinType; +use risingwave_sqlparser::ast::AsOf; use super::{BoxedRule, Rule}; use crate::catalog::IndexCatalog; @@ -95,9 +96,6 @@ impl Rule for IndexSelectionRule { if indexes.is_empty() { return None; } - if logical_scan.as_of().is_some() { - return None; - } let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan); let primary_cost = min( self.estimate_table_scan_cost(logical_scan, primary_table_row_size), @@ -230,7 +228,7 @@ impl IndexSelectionRule { index.index_table.clone(), vec![], logical_scan.ctx(), - None, + logical_scan.as_of().clone(), index.index_table.cardinality, ); @@ -239,7 +237,7 @@ impl IndexSelectionRule { index.primary_table.clone(), vec![], logical_scan.ctx(), - None, + logical_scan.as_of().clone(), index.primary_table.cardinality, ); @@ -338,7 +336,7 @@ impl IndexSelectionRule { logical_scan.table_catalog(), vec![], logical_scan.ctx(), - None, + logical_scan.as_of().clone(), logical_scan.table_cardinality(), ); @@ -543,9 +541,12 @@ impl IndexSelectionRule { let condition = Condition { conjunctions: conj.iter().map(|&x| x.to_owned()).collect(), }; - if let Some(index_access) = - self.build_index_access(index.clone(), condition, logical_scan.ctx().clone()) - { + if let Some(index_access) = self.build_index_access( + index.clone(), + condition, + logical_scan.ctx().clone(), + logical_scan.as_of().clone(), + ) { result.push(index_access); } } @@ -573,7 +574,7 @@ impl IndexSelectionRule { Condition { conjunctions: conjunctions.to_vec(), }, - None, + logical_scan.as_of().clone(), logical_scan.table_cardinality(), ); @@ -588,6 +589,7 @@ impl IndexSelectionRule { index: Rc, predicate: Condition, ctx: OptimizerContextRef, + as_of: 
Option, ) -> Option { let mut rewriter = IndexPredicateRewriter::new( index.primary_to_secondary_mapping(), @@ -613,7 +615,7 @@ impl IndexSelectionRule { vec![], ctx, new_predicate, - None, + as_of, index.index_table.cardinality, ) .into(), diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 51d475616e210..594c696d22bd7 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -74,9 +74,11 @@ impl Planner { pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result { let as_of = base_table.as_of.clone(); match as_of { - None | Some(AsOf::ProcessTime) => {} - Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => { - bail_not_implemented!("As Of Timestamp is not supported yet.") + None | Some(AsOf::ProcessTime) | Some(AsOf::TimestampNum(_)) => {} + Some(AsOf::TimestampString(ref s)) => { + if s.parse::().is_err() { + return Err(ErrorCode::InvalidParameterValue(s.to_owned()).into()); + } } Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => { bail_not_implemented!("As Of Version is not supported yet.") diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 36285b59b5e36..9d13573e03a7b 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -65,7 +65,7 @@ impl ReadSnapshot { match self.batch_query_epoch().epoch.unwrap() { batch_query_epoch::Epoch::Committed(epoch) | batch_query_epoch::Epoch::Current(epoch) => Some(epoch.into()), - batch_query_epoch::Epoch::Backup(_) => None, + batch_query_epoch::Epoch::Backup(_) | batch_query_epoch::Epoch::TimeTravel(_) => None, } } @@ -74,7 +74,8 @@ impl ReadSnapshot { match self.batch_query_epoch().epoch.unwrap() { batch_query_epoch::Epoch::Committed(epoch) | batch_query_epoch::Epoch::Current(epoch) - | batch_query_epoch::Epoch::Backup(epoch) => epoch.into(), + | batch_query_epoch::Epoch::Backup(epoch) + | 
batch_query_epoch::Epoch::TimeTravel(epoch) => epoch.into(), } } diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index 267a75c7d6f70..8d67dc0ba2df7 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -12,6 +12,7 @@ mod m20240418_142249_function_runtime; mod m20240506_112555_subscription_partial_ckpt; mod m20240525_090457_secret; mod m20240618_072634_function_compressed_binary; +mod m20240701_060504_hummock_time_travel; mod m20240702_080451_system_param_value; mod m20240702_084927_unnecessary_fk; @@ -33,6 +34,7 @@ impl MigratorTrait for Migrator { Box::new(m20240618_072634_function_compressed_binary::Migration), Box::new(m20240702_080451_system_param_value::Migration), Box::new(m20240702_084927_unnecessary_fk::Migration), + Box::new(m20240701_060504_hummock_time_travel::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs b/src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs new file mode 100644 index 0000000000000..7dec44913dc8f --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs @@ -0,0 +1,138 @@ +use sea_orm_migration::prelude::*; + +use crate::drop_tables; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(HummockSstableInfo::Table) + .if_not_exists() + .col( + ColumnDef::new(HummockSstableInfo::SstId) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockSstableInfo::ObjectId) + .big_integer() + .not_null(), + ) + .col( + ColumnDef::new(HummockSstableInfo::SstableInfo) + .blob(BlobSize::Long) + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(HummockTimeTravelVersion::Table) + 
.if_not_exists() + .col( + ColumnDef::new(HummockTimeTravelVersion::VersionId) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockTimeTravelVersion::Version) + .blob(BlobSize::Long) + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(HummockTimeTravelDelta::Table) + .if_not_exists() + .col( + ColumnDef::new(HummockTimeTravelDelta::VersionId) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockTimeTravelDelta::VersionDelta) + .blob(BlobSize::Long) + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(HummockEpochToVersion::Table) + .if_not_exists() + .col( + ColumnDef::new(HummockEpochToVersion::Epoch) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockEpochToVersion::VersionId) + .big_integer() + .not_null(), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + drop_tables!( + manager, + HummockSstableInfo, + HummockTimeTravelVersion, + HummockTimeTravelDelta, + HummockEpochToVersion + ); + Ok(()) + } +} + +#[derive(DeriveIden)] +enum HummockSstableInfo { + Table, + SstId, + ObjectId, + SstableInfo, +} + +#[derive(DeriveIden)] +enum HummockTimeTravelVersion { + Table, + VersionId, + Version, +} + +#[derive(DeriveIden)] +enum HummockTimeTravelDelta { + Table, + VersionId, + VersionDelta, +} + +#[derive(DeriveIden)] +enum HummockEpochToVersion { + Table, + Epoch, + VersionId, +} diff --git a/src/meta/model_v2/src/hummock_epoch_to_version.rs b/src/meta/model_v2/src/hummock_epoch_to_version.rs new file mode 100644 index 0000000000000..181b1b320bc54 --- /dev/null +++ b/src/meta/model_v2/src/hummock_epoch_to_version.rs @@ -0,0 +1,31 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::{Epoch, HummockVersionId}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_epoch_to_version")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub epoch: Epoch, + pub version_id: HummockVersionId, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} diff --git a/src/meta/model_v2/src/hummock_sstable_info.rs b/src/meta/model_v2/src/hummock_sstable_info.rs new file mode 100644 index 0000000000000..a9ca4f33361ea --- /dev/null +++ b/src/meta/model_v2/src/hummock_sstable_info.rs @@ -0,0 +1,35 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_pb::hummock::PbSstableInfo; +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::HummockSstableObjectId; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_sstable_info")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub sst_id: HummockSstableObjectId, + pub object_id: HummockSstableObjectId, + pub sstable_info: SstableInfo, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +crate::derive_from_blob!(SstableInfo, PbSstableInfo); diff --git a/src/meta/model_v2/src/hummock_time_travel_delta.rs b/src/meta/model_v2/src/hummock_time_travel_delta.rs new file mode 100644 index 0000000000000..f807c6ec082fa --- /dev/null +++ b/src/meta/model_v2/src/hummock_time_travel_delta.rs @@ -0,0 +1,34 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_pb::hummock::PbHummockVersionDelta; +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::HummockVersionId; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_time_travel_delta")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub version_id: HummockVersionId, + pub version_delta: HummockVersionDelta, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +crate::derive_from_blob!(HummockVersionDelta, PbHummockVersionDelta); diff --git a/src/meta/model_v2/src/hummock_time_travel_version.rs b/src/meta/model_v2/src/hummock_time_travel_version.rs new file mode 100644 index 0000000000000..91eb42fb52096 --- /dev/null +++ b/src/meta/model_v2/src/hummock_time_travel_version.rs @@ -0,0 +1,34 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_pb::hummock::PbHummockVersion; +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::HummockVersionId; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_time_travel_version")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub version_id: HummockVersionId, + pub version: HummockVersion, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +crate::derive_from_blob!(HummockVersion, PbHummockVersion); diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index e88dba32ca371..7e8ee1eaa6c80 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -35,9 +35,13 @@ pub mod connection; pub mod database; pub mod fragment; pub mod function; +pub mod hummock_epoch_to_version; pub mod hummock_pinned_snapshot; pub mod hummock_pinned_version; pub mod hummock_sequence; +pub mod hummock_sstable_info; +pub mod hummock_time_travel_delta; +pub mod hummock_time_travel_version; pub mod hummock_version_delta; pub mod hummock_version_stats; pub mod index; @@ -393,6 +397,12 @@ derive_from_blob!( risingwave_pb::common::ParallelUnitMapping ); +derive_array_from_blob!( + HummockVersionDeltaArray, + risingwave_pb::hummock::PbHummockVersionDelta, + PbHummockVersionDeltaArray +); + #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)] pub enum StreamingParallelism { Adaptive, diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 0e49f0805bf1b..ab8b12041974c 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -309,6 +309,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { .meta .hummock_version_checkpoint_interval_sec, enable_hummock_data_archive: config.meta.enable_hummock_data_archive, + enable_hummock_time_travel: config.meta.enable_hummock_time_travel, + 
hummock_time_travel_retention_ms: config.meta.hummock_time_travel_retention_ms, + hummock_time_travel_snapshot_interval: config + .meta + .hummock_time_travel_snapshot_interval, min_delta_log_num_for_hummock_version_checkpoint: config .meta .min_delta_log_num_for_hummock_version_checkpoint, diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 1e46438cb8ddd..ab69d6da4ea2e 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -682,6 +682,17 @@ impl HummockManagerService for HummockServiceImpl { .await; Ok(Response::new(ListChangeLogEpochsResponse { epochs })) } + + async fn get_version_by_epoch( + &self, + request: Request, + ) -> Result, Status> { + let GetVersionByEpochRequest { epoch } = request.into_inner(); + let version = self.hummock_manager.epoch_to_version(epoch).await?; + Ok(Response::new(GetVersionByEpochResponse { + version: Some(version.to_protobuf()), + })) + } } #[cfg(test)] diff --git a/src/meta/src/hummock/error.rs b/src/meta/src/hummock/error.rs index 434de47623310..af82cc4690b04 100644 --- a/src/meta/src/hummock/error.rs +++ b/src/meta/src/hummock/error.rs @@ -27,7 +27,7 @@ pub type Result = std::result::Result; pub enum Error { #[error("invalid hummock context {0}")] InvalidContext(HummockContextId), - #[error("failed to access meta store: {0}")] + #[error("failed to access meta store")] MetaStore( #[source] #[backtrace] @@ -45,6 +45,12 @@ pub enum Error { CompactionGroup(String), #[error("SST {0} is invalid")] InvalidSst(HummockSstableObjectId), + #[error("time travel")] + TimeTravel( + #[source] + #[backtrace] + anyhow::Error, + ), #[error(transparent)] Internal( #[from] diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 70bbef6bd3db2..676b2ada15577 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -205,6 +205,12 @@ impl HummockManager { 
.collect(), }); } + // Whenever data archive or time travel is enabled, we can directly discard reference to stale objects that will no longer be used. + if self.env.opts.enable_hummock_data_archive || self.env.opts.enable_hummock_time_travel { + let context_info = self.context_info.read().await; + let min_pinned_version_id = context_info.min_pinned_version_id(); + stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id); + } let new_checkpoint = HummockVersionCheckpoint { version: current_version.clone(), stale_objects, @@ -227,8 +233,8 @@ impl HummockManager { let context_info = self.context_info.read().await; assert!(new_checkpoint.version.id > versioning.checkpoint.version.id); versioning.checkpoint = new_checkpoint; - // Not delete stale objects when archive is enabled - if !self.env.opts.enable_hummock_data_archive { + // Not delete stale objects when archive or time travel is enabled + if !self.env.opts.enable_hummock_data_archive && !self.env.opts.enable_hummock_time_travel { versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker); } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 44117139dc192..99108d918f8a8 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -28,8 +28,10 @@ use risingwave_pb::hummock::compact_task::{self}; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta; use risingwave_pb::hummock::{GroupDelta, HummockSnapshot, IntraLevelDelta, StateTableInfoDelta}; +use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; +use crate::hummock::manager::time_travel::require_sql_meta_store_err; use crate::hummock::manager::transaction::{ HummockVersionStatsTransaction, HummockVersionTransaction, }; @@ -39,7 +41,10 @@ use crate::hummock::metrics_utils::{ get_or_create_local_table_stat, 
trigger_local_table_stat, trigger_sst_stat, }; use crate::hummock::sequence::next_sstable_object_id; -use crate::hummock::{commit_multi_var, start_measure_real_process_timer, HummockManager}; +use crate::hummock::{ + commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, + HummockManager, +}; #[derive(Debug, Clone)] pub struct NewTableFragmentInfo { @@ -338,7 +343,7 @@ impl HummockManager { ); } }); - + let time_travel_delta = new_version_delta.clone(); new_version_delta.pre_apply(); // Apply stats changes. @@ -373,7 +378,42 @@ impl HummockManager { ); table_metrics.inc_write_throughput(stats_value as u64); } - commit_multi_var!(self.meta_store_ref(), version, version_stats)?; + if self.env.opts.enable_hummock_time_travel { + let mut time_travel_version = None; + if versioning.time_travel_snapshot_interval_counter + >= self.env.opts.hummock_time_travel_snapshot_interval + { + versioning.time_travel_snapshot_interval_counter = 0; + time_travel_version = Some(version.latest_version()); + } else { + versioning.time_travel_snapshot_interval_counter = versioning + .time_travel_snapshot_interval_counter + .saturating_add(1); + } + let group_parents = version + .latest_version() + .levels + .values() + .map(|g| (g.group_id, g.parent_group_id)) + .collect(); + let sql_store = self.sql_store().ok_or_else(require_sql_meta_store_err)?; + let mut txn = sql_store.conn.begin().await?; + let version_snapshot_sst_ids = self + .write_time_travel_metadata( + &txn, + time_travel_version, + time_travel_delta, + &group_parents, + &versioning.last_time_travel_snapshot_sst_ids, + ) + .await?; + commit_multi_var_with_provided_txn!(txn, version, version_stats)?; + if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { + versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; + } + } else { + commit_multi_var!(self.meta_store_ref(), version, version_stats)?; + } let snapshot = HummockSnapshot { committed_epoch: epoch, diff --git 
a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 79596c8a3e774..c0edd701cf6a6 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -361,7 +361,7 @@ impl HummockManager { version.latest_version(), ))); commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; - + // No need to handle DeltaType::GroupDestroy during time travel. Ok(()) } @@ -577,7 +577,8 @@ impl HummockManager { new_version_delta.pre_apply(); commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; } - + // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot. + versioning.mark_next_time_travel_version_snapshot(); let mut canceled_tasks = vec![]; for task_assignment in compaction_guard.compact_task_assignment.values() { if let Some(task) = task_assignment.compact_task.as_ref() { diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 5f5150b7777cb..27c3bfc0ade71 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -216,23 +216,33 @@ impl HummockManager { let watermark = collect_global_gc_watermark(self.metadata_manager().clone(), spin_interval).await?; metrics.full_gc_last_object_id_watermark.set(watermark as _); - let candidate_sst_number = object_ids.len(); + let candidate_object_number = object_ids.len(); metrics .full_gc_candidate_object_count - .observe(candidate_sst_number as _); + .observe(candidate_object_number as _); + let pinned_object_ids = self + .all_object_ids_in_time_travel() + .await? + .collect::>(); // 1. filter by watermark let object_ids = object_ids .into_iter() .filter(|s| *s < watermark) .collect_vec(); - // 2. filter by version - let selected_sst_number = self.extend_objects_to_delete_from_scan(&object_ids).await; + let after_watermark = object_ids.len(); + // 2. 
filter by time travel archive + let object_ids = object_ids + .into_iter() + .filter(|s| !pinned_object_ids.contains(s)) + .collect_vec(); + let after_time_travel = object_ids.len(); + // 3. filter by version + let selected_object_number = self.extend_objects_to_delete_from_scan(&object_ids).await; metrics .full_gc_selected_object_count - .observe(selected_sst_number as _); - tracing::info!("GC watermark is {}. SST full scan returns {} SSTs. {} remains after filtered by GC watermark. {} remains after filtered by hummock version.", - watermark, candidate_sst_number, object_ids.len(), selected_sst_number); - Ok(selected_sst_number) + .observe(selected_object_number as _); + tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version."); + Ok(selected_object_number) } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 8a49d91a55fc3..896a564f91df3 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -61,6 +61,7 @@ pub(crate) mod checkpoint; mod commit_epoch; mod compaction; pub mod sequence; +mod time_travel; mod timer_task; mod transaction; mod utils; @@ -289,6 +290,7 @@ impl HummockManager { compaction_state: CompactionState::new(), }; let instance = Arc::new(instance); + instance.init_time_travel_state().await?; instance.start_worker(rx).await; instance.load_meta_store_state().await?; instance.release_invalid_contexts().await?; @@ -464,8 +466,8 @@ impl HummockManager { }; self.delete_object_tracker.clear(); - // Not delete stale objects when archive is enabled - if !self.env.opts.enable_hummock_data_archive { + // Not delete stale objects when archive or time travel is enabled + if !self.env.opts.enable_hummock_data_archive && 
!self.env.opts.enable_hummock_time_travel { versioning_guard.mark_objects_for_deletion(context_info, &self.delete_object_tracker); } diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs new file mode 100644 index 0000000000000..eec78c70fab94 --- /dev/null +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -0,0 +1,508 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet, VecDeque}; + +use anyhow::anyhow; +use itertools::Itertools; +use risingwave_common::util::epoch::Epoch; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use risingwave_hummock_sdk::time_travel::{ + refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta, +}; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; +use risingwave_hummock_sdk::{ + CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId, +}; +use risingwave_meta_model_v2::{ + hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta, + hummock_time_travel_version, +}; +use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta, PbSstableInfo}; +use sea_orm::sea_query::OnConflict; +use sea_orm::ActiveValue::Set; +use sea_orm::{ + ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, QuerySelect, + TransactionTrait, +}; + +use crate::controller::SqlMetaStore; +use 
crate::hummock::error::{Error, Result}; +use crate::hummock::HummockManager; +use crate::manager::MetaStoreImpl; + +/// Time travel. +impl HummockManager { + pub(crate) fn sql_store(&self) -> Option { + match self.env.meta_store() { + MetaStoreImpl::Sql(sql_store) => Some(sql_store), + _ => None, + } + } + + pub(crate) async fn init_time_travel_state(&self) -> Result<()> { + if self.env.opts.enable_hummock_time_travel && self.sql_store().is_none() { + return Err(require_sql_meta_store_err()); + } + let Some(sql_store) = self.sql_store() else { + return Ok(()); + }; + let mut guard = self.versioning.write().await; + guard.mark_next_time_travel_version_snapshot(); + + guard.last_time_travel_snapshot_sst_ids = HashSet::new(); + let Some(version) = hummock_time_travel_version::Entity::find() + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .one(&sql_store.conn) + .await? + .map(|v| HummockVersion::from_persisted_protobuf(&v.version.to_protobuf())) + else { + return Ok(()); + }; + guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(); + Ok(()) + } + + pub(crate) async fn truncate_time_travel_metadata( + &self, + epoch_watermark: HummockEpoch, + ) -> Result<()> { + let sql_store = match self.sql_store() { + Some(sql_store) => sql_store, + None => { + return Ok(()); + } + }; + let txn = sql_store.conn.begin().await?; + + let version_watermark = hummock_epoch_to_version::Entity::find() + .filter( + hummock_epoch_to_version::Column::Epoch + .lt(risingwave_meta_model_v2::Epoch::try_from(epoch_watermark).unwrap()), + ) + .order_by_desc(hummock_epoch_to_version::Column::Epoch) + .one(&txn) + .await?; + let Some(version_watermark) = version_watermark else { + txn.commit().await?; + return Ok(()); + }; + let res = hummock_epoch_to_version::Entity::delete_many() + .filter( + hummock_epoch_to_version::Column::Epoch + .lt(risingwave_meta_model_v2::Epoch::try_from(epoch_watermark).unwrap()), + ) + .exec(&txn) + .await?; + tracing::debug!( 
epoch_watermark, + "delete {} rows from hummock_epoch_to_version", + res.rows_affected + ); + let earliest_valid_version = hummock_time_travel_version::Entity::find() + .select_only() + .column(hummock_time_travel_version::Column::VersionId) + .filter( + hummock_time_travel_version::Column::VersionId.lte(version_watermark.version_id), + ) + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .one(&txn) + .await? + .map(|m| HummockVersion::from_persisted_protobuf(&m.version.to_protobuf())); + let Some(earliest_valid_version) = earliest_valid_version else { + txn.commit().await?; + return Ok(()); + }; + let (earliest_valid_version_id, earliest_valid_version_sst_ids) = { + ( + earliest_valid_version.id, + earliest_valid_version.get_sst_ids(), + ) + }; + let version_ids_to_delete: Vec = + hummock_time_travel_version::Entity::find() + .select_only() + .column(hummock_time_travel_version::Column::VersionId) + .filter( + hummock_time_travel_version::Column::VersionId.lt(earliest_valid_version_id), + ) + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .into_tuple() + .all(&txn) + .await?; + let delta_ids_to_delete: Vec = + hummock_time_travel_delta::Entity::find() + .select_only() + .column(hummock_time_travel_delta::Column::VersionId) + .filter(hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id)) + .into_tuple() + .all(&txn) + .await?; + for delta_id_to_delete in delta_ids_to_delete { + let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete) + .one(&txn) + .await? + .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "version delta {} not found", + delta_id_to_delete + ))) + })?; + let new_sst_ids = HummockVersionDelta::from_persisted_protobuf( + &delta_to_delete.version_delta.to_protobuf(), + ) + .newly_added_sst_ids(); + // The SST ids added and then deleted by compaction between the 2 versions. 
+ let sst_ids_to_delete = &new_sst_ids - &earliest_valid_version_sst_ids; + let res = hummock_sstable_info::Entity::delete_many() + .filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete)) + .exec(&txn) + .await?; + tracing::debug!( + delta_id = delta_to_delete.version_id, + "delete {} rows from hummock_sstable_info", + res.rows_affected + ); + } + let mut next_version_sst_ids = earliest_valid_version_sst_ids; + for prev_version_id in version_ids_to_delete { + let sst_ids = { + let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id) + .one(&txn) + .await? + .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "prev_version {} not found", + prev_version_id + ))) + })?; + HummockVersion::from_persisted_protobuf(&prev_version.version.to_protobuf()) + .get_sst_ids() + }; + // The SST ids deleted by compaction between the 2 versions. + let sst_ids_to_delete = &sst_ids - &next_version_sst_ids; + let res = hummock_sstable_info::Entity::delete_many() + .filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete)) + .exec(&txn) + .await?; + tracing::debug!( + prev_version_id, + "delete {} rows from hummock_sstable_info", + res.rows_affected + ); + next_version_sst_ids = sst_ids; + } + + let res = hummock_time_travel_version::Entity::delete_many() + .filter(hummock_time_travel_version::Column::VersionId.lt(earliest_valid_version_id)) + .exec(&txn) + .await?; + tracing::debug!( + epoch_watermark_version_id = version_watermark.version_id, + earliest_valid_version_id, + "delete {} rows from hummock_time_travel_version", + res.rows_affected + ); + + let res = hummock_time_travel_delta::Entity::delete_many() + .filter(hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id)) + .exec(&txn) + .await?; + tracing::debug!( + epoch_watermark_version_id = version_watermark.version_id, + earliest_valid_version_id, + "delete {} rows from hummock_time_travel_delta", + res.rows_affected + ); + + txn.commit().await?; + 
Ok(()) + } + + pub(crate) async fn all_object_ids_in_time_travel( + &self, + ) -> Result> { + let object_ids: Vec = + match self.sql_store() { + Some(sql_store) => { + hummock_sstable_info::Entity::find() + .select_only() + .column(hummock_sstable_info::Column::ObjectId) + .into_tuple() + .all(&sql_store.conn) + .await? + } + None => { + vec![] + } + }; + let object_ids = object_ids + .into_iter() + .unique() + .map(|object_id| HummockSstableObjectId::try_from(object_id).unwrap()); + Ok(object_ids) + } + + /// Attempt to locate the version corresponding to `query_epoch`. + /// + /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`. + /// + /// The resulting version is complete, i.e. with correct `SstableInfo`. + pub async fn epoch_to_version(&self, query_epoch: HummockEpoch) -> Result { + let sql_store = self.sql_store().ok_or_else(require_sql_meta_store_err)?; + let epoch_to_version = hummock_epoch_to_version::Entity::find() + .filter( + hummock_epoch_to_version::Column::Epoch + .lte(risingwave_meta_model_v2::Epoch::try_from(query_epoch).unwrap()), + ) + .order_by_desc(hummock_epoch_to_version::Column::Epoch) + .one(&sql_store.conn) + .await? + .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "version not found for epoch {}", + query_epoch + ))) + })?; + let actual_version_id = epoch_to_version.version_id; + tracing::debug!( + query_epoch, + query_tz = ?(Epoch(query_epoch).as_timestamptz()), + actual_epoch = epoch_to_version.epoch, + actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()), + actual_version_id, + "convert query epoch" + ); + + let replay_version = hummock_time_travel_version::Entity::find() + .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id)) + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .one(&sql_store.conn) + .await? 
+ .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "no replay version found for epoch {}, version {}", + query_epoch, actual_version_id, + ))) + })?; + let deltas = hummock_time_travel_delta::Entity::find() + .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id)) + .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id)) + .order_by_asc(hummock_time_travel_delta::Column::VersionId) + .all(&sql_store.conn) + .await?; + let mut actual_version = replay_archive( + replay_version.version.to_protobuf(), + deltas.into_iter().map(|d| d.version_delta.to_protobuf()), + ); + + let mut sst_ids = actual_version + .get_sst_ids() + .into_iter() + .collect::>(); + let sst_count = sst_ids.len(); + let mut sst_id_to_info = HashMap::with_capacity(sst_count); + let sst_info_fetch_batch_size = std::env::var("RW_TIME_TRAVEL_SST_INFO_FETCH_BATCH_SIZE") + .unwrap_or_else(|_| "100".into()) + .parse() + .unwrap(); + while !sst_ids.is_empty() { + let sst_infos = hummock_sstable_info::Entity::find() + .filter(hummock_sstable_info::Column::SstId.is_in( + sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())), + )) + .all(&sql_store.conn) + .await?; + for sst_info in sst_infos { + let sst_info = sst_info.sstable_info.to_protobuf(); + sst_id_to_info.insert(sst_info.sst_id, sst_info); + } + } + if sst_count != sst_id_to_info.len() { + return Err(Error::TimeTravel(anyhow!(format!( + "some SstableInfos not found for epoch {}, version {}", + query_epoch, actual_version_id, + )))); + } + refill_version(&mut actual_version, &sst_id_to_info); + Ok(actual_version) + } + + pub(crate) async fn write_time_travel_metadata( + &self, + txn: &DatabaseTransaction, + version: Option<&HummockVersion>, + delta: HummockVersionDelta, + group_parents: &HashMap, + skip_sst_ids: &HashSet, + ) -> Result>> { + async fn write_sstable_infos( + sst_infos: impl Iterator, + txn: &DatabaseTransaction, + ) -> Result { + let mut count = 0; + for sst_info 
in sst_infos { + let m = hummock_sstable_info::ActiveModel { + sst_id: Set(sst_info.sst_id.try_into().unwrap()), + object_id: Set(sst_info.object_id.try_into().unwrap()), + sstable_info: Set(sst_info.into()), + }; + hummock_sstable_info::Entity::insert(m) + .on_conflict( + OnConflict::column(hummock_sstable_info::Column::SstId) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(txn) + .await?; + count += 1; + } + Ok(count) + } + + let epoch = delta.max_committed_epoch; + let version_id = delta.id; + let m = hummock_epoch_to_version::ActiveModel { + epoch: Set(epoch.try_into().unwrap()), + version_id: Set(version_id.try_into().unwrap()), + }; + hummock_epoch_to_version::Entity::insert(m) + .exec(txn) + .await?; + + let mut version_sst_ids = None; + let select_groups = group_parents + .iter() + .filter_map(|(cg_id, _)| { + if should_ignore_group(find_root_group(*cg_id, group_parents)) { + None + } else { + Some(*cg_id) + } + }) + .collect::>(); + if let Some(version) = version { + version_sst_ids = Some( + version + .get_sst_infos_from_groups(&select_groups) + .map(|s| s.sst_id) + .collect(), + ); + write_sstable_infos( + version + .get_sst_infos_from_groups(&select_groups) + .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + txn, + ) + .await?; + let m = hummock_time_travel_version::ActiveModel { + version_id: Set( + risingwave_meta_model_v2::HummockVersionId::try_from(version.id).unwrap(), + ), + version: Set((&IncompleteHummockVersion::from((version, &select_groups)) + .to_protobuf()) + .into()), + }; + hummock_time_travel_version::Entity::insert(m) + .on_conflict( + OnConflict::column(hummock_time_travel_version::Column::VersionId) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(txn) + .await?; + } + let written = write_sstable_infos( + delta + .newly_added_sst_infos(&select_groups) + .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + txn, + ) + .await?; + // Ignore delta which adds no data. 
+ if written > 0 { + let m = hummock_time_travel_delta::ActiveModel { + version_id: Set( + risingwave_meta_model_v2::HummockVersionId::try_from(delta.id).unwrap(), + ), + version_delta: Set((&IncompleteHummockVersionDelta::from(( + &delta, + &select_groups, + )) + .to_protobuf()) + .into()), + }; + hummock_time_travel_delta::Entity::insert(m) + .on_conflict( + OnConflict::column(hummock_time_travel_delta::Column::VersionId) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(txn) + .await?; + } + + Ok(version_sst_ids) + } +} + +fn replay_archive( + version: PbHummockVersion, + deltas: impl Iterator, +) -> HummockVersion { + let mut last_version = HummockVersion::from_persisted_protobuf(&version); + let mut mce = last_version.max_committed_epoch; + for d in deltas { + let d = HummockVersionDelta::from_persisted_protobuf(&d); + assert!( + d.max_committed_epoch > mce, + "time travel expects delta from commit_epoch only" + ); + mce = d.max_committed_epoch; + // Need to work around the assertion in `apply_version_delta`. + // Because compaction deltas are not included in time travel archive. + while last_version.id < d.prev_id { + last_version.id += 1; + } + last_version.apply_version_delta(&d); + } + last_version +} + +fn find_root_group( + group_id: CompactionGroupId, + parents: &HashMap, +) -> CompactionGroupId { + let mut root = group_id; + while let Some(parent) = parents.get(&root) + && *parent != 0 + { + root = *parent; + } + root +} + +fn should_ignore_group(root_group_id: CompactionGroupId) -> bool { + // It is possible some intermediate groups have been dropped, + // so it's impossible to tell whether the root group is MaterializedView or not. + // Just treat them as MaterializedView for correctness. 
+ root_group_id == StaticCompactionGroupId::StateDefault as CompactionGroupId +} + +pub(crate) fn require_sql_meta_store_err() -> Error { + Error::TimeTravel(anyhow!("time travel requires SQL meta store")) +} diff --git a/src/meta/src/hummock/manager/utils.rs b/src/meta/src/hummock/manager/utils.rs index 3d8cb04546284..f4d204c393137 100644 --- a/src/meta/src/hummock/manager/utils.rs +++ b/src/meta/src/hummock/manager/utils.rs @@ -24,11 +24,11 @@ macro_rules! commit_multi_var { $crate::manager::MetaStoreImpl::Kv(meta_store) => { use crate::storage::Transaction; use crate::storage::meta_store::MetaStore; - let mut trx = Transaction::default(); + let mut txn = Transaction::default(); $( - $val_txn.apply_to_txn(&mut trx).await?; + $val_txn.apply_to_txn(&mut txn).await?; )* - meta_store.txn(trx).await?; + meta_store.txn(txn).await?; $( $val_txn.commit(); )* @@ -37,11 +37,11 @@ macro_rules! commit_multi_var { crate::manager::MetaStoreImpl::Sql(sql_meta_store) => { use sea_orm::TransactionTrait; use crate::model::MetadataModelError; - let mut trx = sql_meta_store.conn.begin().await.map_err(MetadataModelError::from)?; + let mut txn = sql_meta_store.conn.begin().await.map_err(MetadataModelError::from)?; $( - $val_txn.apply_to_txn(&mut trx).await?; + $val_txn.apply_to_txn(&mut txn).await?; )* - trx.commit().await.map_err(MetadataModelError::from)?; + txn.commit().await.map_err(MetadataModelError::from)?; $( $val_txn.commit(); )* @@ -52,8 +52,27 @@ macro_rules! commit_multi_var { } }; } -pub(crate) use commit_multi_var; +macro_rules! 
commit_multi_var_with_provided_txn { + ($txn:expr, $($val_txn:expr),*) => { + { + async { + use crate::model::{InMemValTransaction, ValTransaction}; + use crate::model::MetadataModelError; + $( + $val_txn.apply_to_txn(&mut $txn).await?; + )* + $txn.commit().await.map_err(MetadataModelError::from)?; + $( + $val_txn.commit(); + )* + Result::Ok(()) + }.await + } + }; +} + use risingwave_hummock_sdk::SstObjectIdRange; +pub(crate) use {commit_multi_var, commit_multi_var_with_provided_txn}; use crate::hummock::error::Result; use crate::hummock::sequence::next_sstable_object_id; diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 2e6b2512a8be0..0603325c2e62a 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -24,7 +24,8 @@ use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, + CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableId, HummockSstableObjectId, + HummockVersionId, }; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::write_limits::WriteLimit; @@ -56,6 +57,9 @@ pub struct Versioning { /// Latest hummock version pub current_version: HummockVersion, pub local_metrics: HashMap, + pub time_travel_snapshot_interval_counter: u64, + /// Used to avoid the attempts to rewrite the same SST to meta store + pub last_time_travel_snapshot_sst_ids: HashSet, // Persistent states below pub hummock_version_deltas: BTreeMap, @@ -95,6 +99,10 @@ impl Versioning { .flat_map(|(_, stale_objects)| stale_objects.id.iter().cloned()), ); } + + pub(super) fn mark_next_time_travel_version_snapshot(&mut self) { + self.time_travel_snapshot_interval_counter = u64::MAX; + } } impl 
HummockManager { diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 90f70a4f74f3a..b30d39dceea76 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -33,7 +33,7 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask}; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::{ - compact_task, CompactTask, HummockSnapshot, SubscribeCompactionEventRequest, + compact_task, CompactTask, HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, }; use risingwave_rpc_client::error::{Result, RpcError}; @@ -342,6 +342,10 @@ impl HummockMetaClient for MockHummockMetaClient { }), )) } + + async fn get_version_by_epoch(&self, _epoch: HummockEpoch) -> Result { + unimplemented!() + } } impl MockHummockMetaClient { diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index 5becdcc9b9850..6f40523f1e72b 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -50,7 +50,7 @@ impl From for MetadataModelError { impl Transactional for CompactionGroup { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = compaction_config::ActiveModel { - compaction_group_id: Set(self.group_id as _), + compaction_group_id: Set(self.group_id.try_into().unwrap()), config: Set(CompactionConfig::from(&(*self.compaction_config))), }; compaction_config::Entity::insert(m) @@ -65,9 +65,11 @@ impl Transactional for CompactionGroup { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - compaction_config::Entity::delete_by_id(self.group_id as CompactionGroupId) - .exec(trx) - .await?; + 
compaction_config::Entity::delete_by_id( + CompactionGroupId::try_from(self.group_id).unwrap(), + ) + .exec(trx) + .await?; Ok(()) } } @@ -76,7 +78,7 @@ impl Transactional for CompactionGroup { impl Transactional for CompactStatus { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = compaction_status::ActiveModel { - compaction_group_id: Set(self.compaction_group_id as _), + compaction_group_id: Set(self.compaction_group_id.try_into().unwrap()), status: Set(LevelHandlers::from( self.level_handlers.iter().map_into().collect_vec(), )), @@ -93,9 +95,11 @@ impl Transactional for CompactStatus { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - compaction_status::Entity::delete_by_id(self.compaction_group_id as CompactionGroupId) - .exec(trx) - .await?; + compaction_status::Entity::delete_by_id( + CompactionGroupId::try_from(self.compaction_group_id).unwrap(), + ) + .exec(trx) + .await?; Ok(()) } } @@ -105,8 +109,8 @@ impl Transactional for CompactTaskAssignment { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let task = self.compact_task.to_owned().unwrap(); let m = compaction_task::ActiveModel { - id: Set(task.task_id as _), - context_id: Set(self.context_id as _), + id: Set(task.task_id.try_into().unwrap()), + context_id: Set(self.context_id.try_into().unwrap()), task: Set(CompactionTask::from(&task)), }; compaction_task::Entity::insert(m) @@ -125,7 +129,7 @@ impl Transactional for CompactTaskAssignment { async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { compaction_task::Entity::delete_by_id( - self.compact_task.as_ref().unwrap().task_id as CompactionTaskId, + CompactionTaskId::try_from(self.compact_task.as_ref().unwrap().task_id).unwrap(), ) .exec(trx) .await?; @@ -137,8 +141,8 @@ impl Transactional for CompactTaskAssignment { impl Transactional for HummockPinnedVersion { async fn 
upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_pinned_version::ActiveModel { - context_id: Set(self.context_id as _), - min_pinned_id: Set(self.min_pinned_id as _), + context_id: Set(self.context_id.try_into().unwrap()), + min_pinned_id: Set(self.min_pinned_id.try_into().unwrap()), }; hummock_pinned_version::Entity::insert(m) .on_conflict( @@ -152,7 +156,7 @@ impl Transactional for HummockPinnedVersion { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_pinned_version::Entity::delete_by_id(self.context_id as WorkerId) + hummock_pinned_version::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap()) .exec(trx) .await?; Ok(()) @@ -163,8 +167,8 @@ impl Transactional for HummockPinnedVersion { impl Transactional for HummockPinnedSnapshot { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_pinned_snapshot::ActiveModel { - context_id: Set(self.context_id as _), - min_pinned_snapshot: Set(self.minimal_pinned_snapshot as _), + context_id: Set(self.context_id.try_into().unwrap()), + min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()), }; hummock_pinned_snapshot::Entity::insert(m) .on_conflict( @@ -178,7 +182,7 @@ impl Transactional for HummockPinnedSnapshot { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_pinned_snapshot::Entity::delete_by_id(self.context_id as i32) + hummock_pinned_snapshot::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap()) .exec(trx) .await?; Ok(()) @@ -189,7 +193,7 @@ impl Transactional for HummockPinnedSnapshot { impl Transactional for HummockVersionStats { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_version_stats::ActiveModel { - id: Set(self.hummock_version_id as _), + id: Set(self.hummock_version_id.try_into().unwrap()), stats: 
Set(TableStats(self.table_stats.clone())), }; hummock_version_stats::Entity::insert(m) @@ -204,9 +208,11 @@ impl Transactional for HummockVersionStats { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_version_stats::Entity::delete_by_id(self.hummock_version_id as i64) - .exec(trx) - .await?; + hummock_version_stats::Entity::delete_by_id( + HummockVersionId::try_from(self.hummock_version_id).unwrap(), + ) + .exec(trx) + .await?; Ok(()) } } @@ -215,10 +221,10 @@ impl Transactional for HummockVersionStats { impl Transactional for HummockVersionDelta { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_version_delta::ActiveModel { - id: Set(self.id as _), - prev_id: Set(self.prev_id as _), - max_committed_epoch: Set(self.max_committed_epoch as _), - safe_epoch: Set(self.visible_table_safe_epoch() as _), + id: Set(self.id.try_into().unwrap()), + prev_id: Set(self.prev_id.try_into().unwrap()), + max_committed_epoch: Set(self.max_committed_epoch.try_into().unwrap()), + safe_epoch: Set(self.visible_table_safe_epoch().try_into().unwrap()), trivial_move: Set(self.trivial_move), full_version_delta: Set(FullVersionDelta::from(&self.to_protobuf())), }; @@ -240,7 +246,7 @@ impl Transactional for HummockVersionDelta { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_version_delta::Entity::delete_by_id(self.id as HummockVersionId) + hummock_version_delta::Entity::delete_by_id(HummockVersionId::try_from(self.id).unwrap()) .exec(trx) .await?; Ok(()) @@ -249,14 +255,17 @@ impl Transactional for HummockVersionDelta { impl From for CompactionGroup { fn from(value: compaction_config::Model) -> Self { - Self::new(value.compaction_group_id as _, value.config.to_protobuf()) + Self::new( + value.compaction_group_id.try_into().unwrap(), + value.config.to_protobuf(), + ) } } impl From for CompactStatus { fn from(value: 
compaction_status::Model) -> Self { Self { - compaction_group_id: value.compaction_group_id as _, + compaction_group_id: value.compaction_group_id.try_into().unwrap(), level_handlers: value.status.to_protobuf().iter().map_into().collect(), } } diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 6cde13507836b..5dffb35dfc6c7 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use itertools::Itertools; +use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::VacuumTask; @@ -76,6 +77,16 @@ impl VacuumManager { break; } } + if self.env.opts.enable_hummock_time_travel { + let current_epoch_time = Epoch::now().physical_time(); + let epoch_watermark = Epoch::from_physical_time( + current_epoch_time.saturating_sub(self.env.opts.hummock_time_travel_retention_ms), + ) + .0; + self.hummock_manager + .truncate_time_travel_metadata(epoch_watermark) + .await?; + } Ok(total_deleted) } @@ -165,6 +176,9 @@ impl VacuumManager { &self, objects_to_delete: &mut Vec, ) -> MetaResult<()> { + if objects_to_delete.is_empty() { + return Ok(()); + } let reject = self.backup_manager.list_pinned_ssts(); // Ack these SSTs immediately, because they tend to be pinned for long time. // They will be GCed during full GC when they are no longer pinned. diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index b623e441c0c22..2e722dda622a9 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -169,6 +169,9 @@ pub struct MetaOpts { /// Interval of hummock version checkpoint. 
pub hummock_version_checkpoint_interval_sec: u64, pub enable_hummock_data_archive: bool, + pub enable_hummock_time_travel: bool, + pub hummock_time_travel_retention_ms: u64, + pub hummock_time_travel_snapshot_interval: u64, /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is @@ -302,6 +305,9 @@ impl MetaOpts { vacuum_spin_interval_ms: 0, hummock_version_checkpoint_interval_sec: 30, enable_hummock_data_archive: false, + enable_hummock_time_travel: false, + hummock_time_travel_retention_ms: 0, + hummock_time_travel_snapshot_interval: 0, min_delta_log_num_for_hummock_version_checkpoint: 1, min_sst_retention_time_sec: 3600 * 24 * 7, full_gc_interval_sec: 3600 * 24 * 7, diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 6e1dfec3b7be3..e037114d843c9 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -19,7 +19,8 @@ use risingwave_hummock_sdk::{ HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult, }; use risingwave_pb::hummock::{ - HummockSnapshot, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, + HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, + SubscribeCompactionEventResponse, VacuumTask, }; use tokio::sync::mpsc::UnboundedSender; @@ -61,4 +62,6 @@ pub trait HummockMetaClient: Send + Sync + 'static { UnboundedSender, BoxStream<'static, CompactionEventItem>, )>; + + async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result; } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 296f8de4d888f..3f3d179a6b12a 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1364,6 +1364,12 @@ impl 
MetaClient { let resp = self.inner.cancel_compact_task(req).await?; Ok(resp.ret) } + + pub async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { + let req = GetVersionByEpochRequest { epoch }; + let resp = self.inner.get_version_by_epoch(req).await?; + Ok(resp.version.unwrap()) + } } #[async_trait] @@ -1525,6 +1531,10 @@ impl HummockMetaClient for MetaClient { Ok((request_sender, Box::pin(stream))) } + + async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { + self.get_version_by_epoch(epoch).await + } } #[async_trait] @@ -2019,6 +2029,7 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse } ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} ,{ hummock_client, list_change_log_epochs, ListChangeLogEpochsRequest, ListChangeLogEpochsResponse } + ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0c5e635091c77..4cfbc9488876b 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -159,23 +159,62 @@ impl HummockVersion { } pub fn get_object_ids(&self) -> HashSet { + self.get_sst_infos().map(|s| s.object_id).collect() + } + + pub fn get_sst_ids(&self) -> HashSet { + self.get_sst_infos().map(|s| s.sst_id).collect() + } + + pub fn get_sst_infos(&self) -> impl Iterator { self.get_combined_levels() + .flat_map(|level| level.table_infos.iter()) + .chain(self.table_change_log.values().flat_map(|change_log| { + 
change_log.0.iter().flat_map(|epoch_change_log| { + epoch_change_log + .old_value + .iter() + .chain(epoch_change_log.new_value.iter()) + }) + })) + } + + /// `get_sst_infos_from_groups` doesn't guarantee that all returned sst info belongs to `select_group`. + /// i.e. `select_group` is just a hint. + /// We separate `get_sst_infos_from_groups` and `get_sst_infos` because `get_sst_infos_from_groups` may be further customized in the future. + pub fn get_sst_infos_from_groups<'a>( + &'a self, + select_group: &'a HashSet, + ) -> impl Iterator + 'a { + self.levels + .iter() + .filter_map(|(cg_id, level)| { + if select_group.contains(cg_id) { + Some(level) + } else { + None + } + }) .flat_map(|level| { level - .table_infos + .l0 + .as_ref() + .unwrap() + .sub_levels .iter() - .map(|table_info| table_info.get_object_id()) + .rev() + .chain(level.levels.iter()) }) + .flat_map(|level| level.table_infos.iter()) .chain(self.table_change_log.values().flat_map(|change_log| { + // TODO: optimization: strip table change log change_log.0.iter().flat_map(|epoch_change_log| { epoch_change_log .old_value .iter() - .map(|sst| sst.object_id) - .chain(epoch_change_log.new_value.iter().map(|sst| sst.object_id)) + .chain(epoch_change_log.new_value.iter()) }) })) - .collect() } pub fn level_iter bool>( diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 97e1a334dcf98..7582e6ae4cecd 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -45,6 +45,7 @@ pub mod key_range; pub mod prost_key_range; pub mod table_stats; pub mod table_watermark; +pub mod time_travel; pub mod version; pub use compact::*; @@ -207,6 +208,7 @@ pub enum HummockReadEpoch { NoWait(HummockEpoch), /// We don't need to wait epoch. 
Backup(HummockEpoch), + TimeTravel(HummockEpoch), } impl From for HummockReadEpoch { @@ -215,6 +217,7 @@ impl From for HummockReadEpoch { batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::Committed(epoch), batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::Current(epoch), batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch), + batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch), } } } @@ -232,6 +235,7 @@ impl HummockReadEpoch { HummockReadEpoch::Current(epoch) => epoch, HummockReadEpoch::NoWait(epoch) => epoch, HummockReadEpoch::Backup(epoch) => epoch, + HummockReadEpoch::TimeTravel(epoch) => epoch, } } } diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs new file mode 100644 index 0000000000000..2a848c272e238 --- /dev/null +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -0,0 +1,348 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use risingwave_common::catalog::TableId; +use risingwave_pb::hummock::group_delta::DeltaType; +use risingwave_pb::hummock::hummock_version::PbLevels; +use risingwave_pb::hummock::hummock_version_delta::{ChangeLogDelta, PbGroupDeltas}; +use risingwave_pb::hummock::{ + EpochNewChangeLog, PbGroupDelta, PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, + PbLevel, PbOverlappingLevel, PbSstableInfo, SstableInfo, StateTableInfoDelta, +}; + +use crate::change_log::TableChangeLog; +use crate::table_watermark::TableWatermarks; +use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo}; +use crate::{CompactionGroupId, HummockSstableId}; + +/// [`IncompleteHummockVersion`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: +/// - `PbLevels` +/// - `TableChangeLog` +#[derive(Debug, Clone, PartialEq)] +pub struct IncompleteHummockVersion { + pub id: u64, + pub levels: HashMap, + pub max_committed_epoch: u64, + safe_epoch: u64, + pub table_watermarks: HashMap>, + pub table_change_log: HashMap, + pub state_table_info: HummockVersionStateTableInfo, +} + +/// Clone from an `SstableInfo`, but only set the `sst_id` for the target, leaving other fields as default. +/// The goal is to reduce the size of pb object generated afterward. 
+fn stripped_sstable_info(origin: &SstableInfo) -> SstableInfo { + SstableInfo { + object_id: Default::default(), + sst_id: origin.sst_id, + key_range: Default::default(), + file_size: Default::default(), + table_ids: Default::default(), + meta_offset: Default::default(), + stale_key_count: Default::default(), + total_key_count: Default::default(), + min_epoch: Default::default(), + max_epoch: Default::default(), + uncompressed_file_size: Default::default(), + range_tombstone_count: Default::default(), + bloom_filter_kind: Default::default(), + } +} + +fn stripped_epoch_new_change_log(origin: &EpochNewChangeLog) -> EpochNewChangeLog { + EpochNewChangeLog { + old_value: origin.old_value.iter().map(stripped_sstable_info).collect(), + new_value: origin.new_value.iter().map(stripped_sstable_info).collect(), + epochs: origin.epochs.clone(), + } +} + +fn stripped_change_log_delta(origin: &ChangeLogDelta) -> ChangeLogDelta { + ChangeLogDelta { + new_log: origin.new_log.as_ref().map(stripped_epoch_new_change_log), + truncate_epoch: origin.truncate_epoch, + } +} + +fn stripped_level(origin: &PbLevel) -> PbLevel { + PbLevel { + level_idx: origin.level_idx, + level_type: origin.level_type, + table_infos: origin + .table_infos + .iter() + .map(stripped_sstable_info) + .collect(), + total_file_size: origin.total_file_size, + sub_level_id: origin.sub_level_id, + uncompressed_file_size: origin.uncompressed_file_size, + vnode_partition_count: origin.vnode_partition_count, + } +} + +pub fn refill_version( + version: &mut HummockVersion, + sst_id_to_info: &HashMap, +) { + for level in version.levels.values_mut().flat_map(|level| { + level + .l0 + .as_mut() + .unwrap() + .sub_levels + .iter_mut() + .rev() + .chain(level.levels.iter_mut()) + }) { + refill_level(level, sst_id_to_info); + } + + for t in version.table_change_log.values_mut() { + refill_table_change_log(t, sst_id_to_info); + } +} + +fn refill_level(level: &mut PbLevel, sst_id_to_info: &HashMap) { + for s in &mut 
level.table_infos { + refill_sstable_info(s, sst_id_to_info); + } +} + +fn refill_table_change_log( + table_change_log: &mut TableChangeLog, + sst_id_to_info: &HashMap, +) { + for c in &mut table_change_log.0 { + for s in &mut c.old_value { + refill_sstable_info(s, sst_id_to_info); + } + for s in &mut c.new_value { + refill_sstable_info(s, sst_id_to_info); + } + } +} + +/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`. +fn refill_sstable_info( + sstable_info: &mut PbSstableInfo, + sst_id_to_info: &HashMap, +) { + *sstable_info = sst_id_to_info + .get(&sstable_info.sst_id) + .unwrap_or_else(|| panic!("SstableInfo should exist")) + .clone(); +} + +fn stripped_l0(origin: &PbOverlappingLevel) -> PbOverlappingLevel { + PbOverlappingLevel { + sub_levels: origin.sub_levels.iter().map(stripped_level).collect(), + total_file_size: origin.total_file_size, + uncompressed_file_size: origin.uncompressed_file_size, + } +} + +#[allow(deprecated)] +fn stripped_levels(origin: &PbLevels) -> PbLevels { + PbLevels { + levels: origin.levels.iter().map(stripped_level).collect(), + l0: origin.l0.as_ref().map(stripped_l0), + group_id: origin.group_id, + parent_group_id: origin.parent_group_id, + member_table_ids: Default::default(), + } +} + +fn stripped_intra_level_delta(origin: &PbIntraLevelDelta) -> PbIntraLevelDelta { + PbIntraLevelDelta { + level_idx: origin.level_idx, + l0_sub_level_id: origin.l0_sub_level_id, + removed_table_ids: origin.removed_table_ids.clone(), + inserted_table_infos: origin + .inserted_table_infos + .iter() + .map(stripped_sstable_info) + .collect(), + vnode_partition_count: origin.vnode_partition_count, + } +} + +fn stripped_group_delta(origin: &PbGroupDelta) -> PbGroupDelta { + let delta_type = origin.delta_type.as_ref().map(|d| match d { + DeltaType::IntraLevel(l) => DeltaType::IntraLevel(stripped_intra_level_delta(l)), + _ => panic!("time travel expects DeltaType::IntraLevel only"), + }); + PbGroupDelta { delta_type 
} +} + +fn stripped_group_deltas(origin: &PbGroupDeltas) -> PbGroupDeltas { + let group_deltas = origin + .group_deltas + .iter() + .map(stripped_group_delta) + .collect(); + PbGroupDeltas { group_deltas } +} + +/// `SstableInfo` will be stripped. +impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { + fn from(p: (&HummockVersion, &HashSet)) -> Self { + let (version, select_group) = p; + Self { + id: version.id, + levels: version + .levels + .iter() + .filter_map(|(group_id, levels)| { + if select_group.contains(group_id) { + Some((*group_id as CompactionGroupId, stripped_levels(levels))) + } else { + None + } + }) + .collect(), + max_committed_epoch: version.max_committed_epoch, + safe_epoch: version.visible_table_safe_epoch(), + table_watermarks: version.table_watermarks.clone(), + // TODO: optimization: strip table change log + table_change_log: version + .table_change_log + .iter() + .map(|(table_id, change_log)| { + let incomplete_table_change_log = change_log + .0 + .iter() + .map(stripped_epoch_new_change_log) + .collect(); + (*table_id, TableChangeLog(incomplete_table_change_log)) + }) + .collect(), + state_table_info: version.state_table_info.clone(), + } + } +} + +impl IncompleteHummockVersion { + /// The resulting `SstableInfo` is incomplete. 
+ pub fn to_protobuf(&self) -> PbHummockVersion { + PbHummockVersion { + id: self.id, + levels: self + .levels + .iter() + .map(|(group_id, levels)| (*group_id as _, levels.clone())) + .collect(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: self.safe_epoch, + table_watermarks: self + .table_watermarks + .iter() + .map(|(table_id, watermark)| (table_id.table_id, watermark.to_protobuf())) + .collect(), + table_change_logs: self + .table_change_log + .iter() + .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) + .collect(), + state_table_info: self.state_table_info.to_protobuf(), + } + } +} + +/// [`IncompleteHummockVersionDelta`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: +/// - `PbGroupDeltas` +/// - `ChangeLogDelta` +#[derive(Debug, PartialEq, Clone)] +pub struct IncompleteHummockVersionDelta { + pub id: u64, + pub prev_id: u64, + pub group_deltas: HashMap, + pub max_committed_epoch: u64, + pub safe_epoch: u64, + pub trivial_move: bool, + pub new_table_watermarks: HashMap, + pub removed_table_ids: HashSet, + pub change_log_delta: HashMap, + pub state_table_info_delta: HashMap, +} + +/// `SStableInfo` will be stripped. 
+impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { + fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { + let (delta, select_group) = p; + Self { + id: delta.id, + prev_id: delta.prev_id, + group_deltas: delta + .group_deltas + .iter() + .filter_map(|(cg_id, deltas)| { + if select_group.contains(cg_id) { + Some((*cg_id, stripped_group_deltas(deltas))) + } else { + None + } + }) + .collect(), + max_committed_epoch: delta.max_committed_epoch, + safe_epoch: delta.visible_table_safe_epoch(), + trivial_move: delta.trivial_move, + new_table_watermarks: delta.new_table_watermarks.clone(), + removed_table_ids: delta.removed_table_ids.clone(), + // TODO: optimization: strip table change log + change_log_delta: delta + .change_log_delta + .iter() + .map(|(table_id, log_delta)| (*table_id, stripped_change_log_delta(log_delta))) + .collect(), + state_table_info_delta: delta.state_table_info_delta.clone(), + } + } +} + +impl IncompleteHummockVersionDelta { + /// The resulting `SstableInfo` is incomplete. 
+ pub fn to_protobuf(&self) -> PbHummockVersionDelta { + PbHummockVersionDelta { + id: self.id, + prev_id: self.prev_id, + group_deltas: self.group_deltas.clone(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: self.safe_epoch, + trivial_move: self.trivial_move, + new_table_watermarks: self + .new_table_watermarks + .iter() + .map(|(table_id, watermarks)| (table_id.table_id, watermarks.to_protobuf())) + .collect(), + removed_table_ids: self + .removed_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect(), + change_log_delta: self + .change_log_delta + .iter() + .map(|(table_id, log_delta)| (table_id.table_id, log_delta.clone())) + .collect(), + state_table_info_delta: self + .state_table_info_delta + .iter() + .map(|(table_id, delta)| (table_id.table_id, delta.clone())) + .collect(), + } + } +} diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 611e177db74d6..920c43df1df1b 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -538,6 +538,70 @@ impl HummockVersionDelta { .collect() } + pub fn newly_added_sst_ids(&self) -> HashSet { + self.group_deltas + .values() + .flat_map(|group_deltas| { + group_deltas.group_deltas.iter().flat_map(|group_delta| { + group_delta.delta_type.iter().flat_map(|delta_type| { + static EMPTY_VEC: Vec = Vec::new(); + let sst_slice = match delta_type { + DeltaType::IntraLevel(level_delta) => &level_delta.inserted_table_infos, + DeltaType::GroupConstruct(_) + | DeltaType::GroupDestroy(_) + | DeltaType::GroupMetaChange(_) + | DeltaType::GroupTableChange(_) => &EMPTY_VEC, + }; + sst_slice.iter().map(|sst| sst.sst_id) + }) + }) + }) + .chain(self.change_log_delta.values().flat_map(|delta| { + let new_log = delta.new_log.as_ref().unwrap(); + new_log + .new_value + .iter() + .map(|sst| sst.sst_id) + .chain(new_log.old_value.iter().map(|sst| sst.sst_id)) + })) + .collect() + } + + pub fn newly_added_sst_infos<'a>( + &'a 
self, + select_group: &'a HashSet, + ) -> impl Iterator + 'a { + self.group_deltas + .iter() + .filter_map(|(cg_id, group_deltas)| { + if select_group.contains(cg_id) { + Some(group_deltas) + } else { + None + } + }) + .flat_map(|group_deltas| { + group_deltas.group_deltas.iter().flat_map(|group_delta| { + group_delta.delta_type.iter().flat_map(|delta_type| { + static EMPTY_VEC: Vec = Vec::new(); + let sst_slice = match delta_type { + DeltaType::IntraLevel(level_delta) => &level_delta.inserted_table_infos, + DeltaType::GroupConstruct(_) + | DeltaType::GroupDestroy(_) + | DeltaType::GroupMetaChange(_) + | DeltaType::GroupTableChange(_) => &EMPTY_VEC, + }; + sst_slice.iter() + }) + }) + }) + .chain(self.change_log_delta.values().flat_map(|delta| { + // TODO: optimization: strip table change log + let new_log = delta.new_log.as_ref().unwrap(); + new_log.new_value.iter().chain(new_log.old_value.iter()) + })) + } + pub fn visible_table_safe_epoch(&self) -> u64 { self.safe_epoch } diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 07c87f4688042..5b56c618852cc 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -109,6 +109,7 @@ pub struct TracedReadOptions { pub retention_seconds: Option, pub table_id: TracedTableId, pub read_version_from_backup: bool, + pub read_version_from_time_travel: bool, } impl TracedReadOptions { @@ -123,7 +124,8 @@ impl TracedReadOptions { cache_policy: TracedCachePolicy::Disable, retention_seconds: None, table_id: TracedTableId { table_id }, - read_version_from_backup: true, + read_version_from_backup: false, + read_version_from_time_travel: false, } } } @@ -195,6 +197,7 @@ pub enum TracedHummockReadEpoch { Current(TracedHummockEpoch), NoWait(TracedHummockEpoch), Backup(TracedHummockEpoch), + TimeTravel(TracedHummockEpoch), } impl From for TracedHummockReadEpoch { @@ -204,6 +207,7 @@ impl From for TracedHummockReadEpoch { HummockReadEpoch::Current(epoch) 
=> Self::Current(epoch), HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), HummockReadEpoch::Backup(epoch) => Self::Backup(epoch), + HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), } } } @@ -215,6 +219,7 @@ impl From<TracedHummockReadEpoch> for HummockReadEpoch { TracedHummockReadEpoch::Current(epoch) => Self::Current(epoch), TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch), + TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), } } } diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 5cf380285cf1e..f30240b1264ce 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -18,7 +18,9 @@ use async_trait::async_trait; use futures::stream::BoxStream; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange, SyncResult}; -use risingwave_pb::hummock::{HummockSnapshot, SubscribeCompactionEventRequest, VacuumTask}; +use risingwave_pb::hummock::{ + HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, VacuumTask, +}; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient, MetaClient}; use tokio::sync::mpsc::UnboundedSender; @@ -127,4 +129,8 @@ impl HummockMetaClient for MonitoredHummockMetaClient { )> { self.meta_client.subscribe_compaction_event().await } + + async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result<PbHummockVersion> { + self.meta_client.get_version_by_epoch(epoch).await + } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 7f9f956b62ddf..419f4e7af952d 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -29,9 +29,11 @@ use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode,
vnode_range, TableKey, TableKeyRange, }; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; +use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; use tracing::error; @@ -114,6 +116,8 @@ pub struct HummockStorage { write_limiter: WriteLimiterRef, compact_await_tree_reg: Option<CompactionAwaitTreeRegRef>, + + hummock_meta_client: Arc<dyn HummockMetaClient>, } pub type ReadVersionTuple = (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion); @@ -231,6 +235,7 @@ impl HummockStorage { min_current_epoch, write_limiter, compact_await_tree_reg: await_tree_reg, + hummock_meta_client, }; tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker()); @@ -253,7 +258,10 @@ impl HummockStorage { ) -> StorageResult<Option<Bytes>> { let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone())); - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { + self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) + .await? + } else if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { @@ -275,7 +283,10 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { + self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) + .await? + } else if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await?
} else { @@ -293,7 +304,10 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { + self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) + .await? + } else if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { @@ -305,6 +319,29 @@ impl HummockStorage { .await } + async fn build_read_version_by_time_travel( + &self, + epoch: u64, + table_id: TableId, + key_range: TableKeyRange, + ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + let pb_version = self + .hummock_meta_client + .get_version_by_epoch(epoch) + .await + .inspect_err(|e| tracing::error!("{}", e.to_report_string())) + .map_err(|e| HummockError::meta_error(e.to_report_string()))?; + let version = HummockVersion::from_rpc_protobuf(&pb_version); + validate_safe_epoch(&version, table_id, epoch)?; + let (tx, _rx) = unbounded_channel(); + Ok(get_committed_read_version_tuple( + PinnedVersion::new(version, tx), + table_id, + key_range, + epoch, + )) + } + async fn build_read_version_tuple_from_backup( &self, epoch: u64, @@ -637,9 +674,6 @@ impl StateStore for HummockStorage { } } -#[cfg(any(test, feature = "test"))] -use risingwave_hummock_sdk::version::HummockVersion; - #[cfg(any(test, feature = "test"))] impl HummockStorage { pub async fn seal_and_sync_epoch( diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index beb19a0a98c5d..ef7f0f7a13ff2 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -500,6 +500,7 @@ pub struct ReadOptions { /// Read from historical hummock version of meta snapshot backup. /// It should only be used by `StorageTable` for batch query. 
pub read_version_from_backup: bool, + pub read_version_from_time_travel: bool, } impl From<TracedReadOptions> for ReadOptions { @@ -512,6 +513,7 @@ impl From<TracedReadOptions> for ReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, + read_version_from_time_travel: value.read_version_from_time_travel, } } } @@ -526,6 +528,7 @@ impl From<ReadOptions> for TracedReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, + read_version_from_time_travel: value.read_version_from_time_travel, } } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 795668dfe8b1b..72ed713ed89a0 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -340,6 +340,7 @@ impl StorageTableInner { ) -> StorageResult<Option<OwnedRow>> { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); + let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); self.store.try_wait_epoch(wait_epoch).await?; let serialized_pk = serialize_pk_with_vnode( &pk, @@ -360,6 +361,7 @@ impl StorageTableInner { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, + read_version_from_time_travel: read_time_travel, cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; @@ -465,12 +467,14 @@ impl StorageTableInner { let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| { let prefix_hint = prefix_hint.clone(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); + let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); async move { let read_options = ReadOptions { prefix_hint, retention_seconds: self.table_option.retention_seconds, table_id:
self.table_id, read_version_from_backup: read_backup, + read_version_from_time_travel: read_time_travel, prefetch_options, cache_policy, ..Default::default()