diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 592b60a5ab0a..a7a70246f0f7 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -28,6 +28,7 @@ message RowSeqScanNode { // The pushed down `batch_limit`. Max rows needed to return. optional uint64 limit = 6; + optional plan_common.AsOf as_of = 7; } message SysRowSeqScanNode { @@ -279,6 +280,7 @@ message LocalLookupJoinNode { // Null safe means it treats `null = null` as true. // Each key pair can be null safe independently. (left_key, right_key, null_safe) repeated bool null_safe = 11; + optional plan_common.AsOf as_of = 12; } // RFC: A new schedule way for distributed lookup join @@ -295,6 +297,7 @@ message DistributedLookupJoinNode { // Null safe means it treats `null = null` as true. // Each key pair can be null safe independently. (left_key, right_key, null_safe) repeated bool null_safe = 9; + optional plan_common.AsOf as_of = 10; } message UnionNode {} diff --git a/proto/common.proto b/proto/common.proto index 164150379c48..12231ac80152 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -103,6 +103,7 @@ message BatchQueryEpoch { uint64 committed = 1; uint64 current = 2; uint64 backup = 3; + uint64 time_travel = 4; } } diff --git a/proto/hummock.proto b/proto/hummock.proto index 149944831a4f..947904df0086 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -104,7 +104,7 @@ message GroupDelta { IntraLevelDelta intra_level = 1; GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; - GroupMetaChange group_meta_change = 4; + GroupMetaChange group_meta_change = 4 [deprecated = true]; GroupTableChange group_table_change = 5 [deprecated = true]; } } @@ -821,6 +821,14 @@ message CancelCompactTaskResponse { bool ret = 1; } +message GetVersionByEpochRequest { + uint64 epoch = 1; +} + +message GetVersionByEpochResponse { + HummockVersion version = 1; +} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -861,6 +869,7 @@ service HummockManagerService { rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse); rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse); rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse); + rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse); } message CompactionConfig { diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 7b763c24a44a..c019994b771c 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -98,6 +98,21 @@ message StorageTableDesc { optional uint32 retention_seconds = 11; } +message AsOf { + message ProcessTime {} + message Timestamp { + int64 timestamp = 1; + } + message Version { + int64 version = 1; + } + oneof as_of_type { + ProcessTime process_time = 1; + Timestamp timestamp = 2; + Version version = 3; + } +} + // Represents a table in external database for CDC scenario message ExternalTableDesc { uint32 table_id = 1; diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index 9f652fad233c..780b90cf178b 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -154,6 +154,13 @@ pub enum BatchError { #[backtrace] opendal::Error, ), + + #[error("Failed to execute time travel query")] + TimeTravel( + #[source] + #[backtrace] + anyhow::Error, + ), } // Serialize/deserialize error. diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index 1ff32d9631a8..f5ad5ab5ed98 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -36,8 +36,8 @@ use risingwave_storage::{dispatch_state_store, StateStore}; use crate::error::Result; use crate::executor::join::JoinType; use crate::executor::{ - BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor, - ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, + unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, + BufferChunkExecutor, Executor, ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, }; use crate::task::{BatchTaskContext, ShutdownToken}; @@ -93,6 +93,24 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { NodeBody::DistributedLookupJoin )?; + // as_of takes precedence + let as_of = distributed_lookup_join_node + .as_of + .as_ref() + .map(AsOf::try_from) + .transpose()?; + let query_epoch = as_of + .map(|a| { + let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0; + tracing::debug!(epoch, "time travel"); + risingwave_pb::common::BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel( + epoch, + )), + } + }) + .unwrap_or_else(|| source.epoch()); + let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?); let condition = match distributed_lookup_join_node.get_condition() { Ok(cond_prost) => Some(build_from_prost(cond_prost)?), @@ -179,12 +197,11 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { let vnodes = Some(TableDistribution::all_vnodes()); dispatch_state_store!(source.context().state_store(), state_store, { let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); - let inner_side_builder = InnerSideExecutorBuilder::new( outer_side_key_types, inner_side_key_types.clone(), lookup_prefix_len, - source.epoch(), + query_epoch, vec![], table, chunk_size, diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index dd428e316690..9e70a59d1773 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -42,8 +42,8 @@ use risingwave_pb::plan_common::StorageTableDesc; use crate::error::Result; use crate::executor::{ - BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor, - ExecutorBuilder, JoinType, LookupJoinBase, + unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, + DummyExecutor, Executor, ExecutorBuilder, JoinType, LookupJoinBase, }; use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; @@ -66,6 +66,7 @@ struct InnerSideExecutorBuilder { chunk_size: usize, shutdown_rx: ShutdownToken, next_stage_id: usize, + as_of: Option, } /// Used to build the executor for the inner side @@ -108,6 +109,7 @@ impl InnerSideExecutorBuilder { ordered: false, vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()), limit: None, + as_of: self.as_of.as_ref().map(Into::into), }); Ok(row_seq_scan_node) @@ -296,6 +298,24 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { source.plan_node().get_node_body().unwrap(), NodeBody::LocalLookupJoin )?; + // as_of takes precedence + let as_of = lookup_join_node + .as_of + .as_ref() + .map(AsOf::try_from) + .transpose()?; + let query_epoch = as_of + .as_ref() + .map(|a| { + let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0; + tracing::debug!(epoch, "time travel"); + risingwave_pb::common::BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel( + epoch, + )), + } + }) + .unwrap_or_else(|| source.epoch()); let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?); let condition = match lookup_join_node.get_condition() { @@ -402,12 +422,13 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { lookup_prefix_len, context: source.context().clone(), task_id: source.task_id.clone(), - epoch: source.epoch(), + epoch: query_epoch, worker_slot_to_scan_range_mapping: HashMap::new(), chunk_size, shutdown_rx: source.shutdown_rx.clone(), next_stage_id: 0, worker_slot_mapping, + as_of, }; let identity = source.plan_node().get_identity().clone(); diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index e9d4ea10b01e..455e81342fec 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -28,7 +28,8 @@ use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, PbScanRange}; use risingwave_pb::common::BatchQueryEpoch; -use risingwave_pb::plan_common::StorageTableDesc; +use risingwave_pb::plan_common::as_of::AsOfType; +use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc}; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::TableDistribution; @@ -55,6 +56,7 @@ pub struct RowSeqScanExecutor { ordered: bool, epoch: BatchQueryEpoch, limit: Option, + as_of: Option, } /// Range for batch scan. @@ -66,6 +68,36 @@ pub struct ScanRange { pub next_col_bounds: (Bound, Bound), } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AsOf { + pub timestamp: i64, +} + +impl TryFrom<&PbAsOf> for AsOf { + type Error = BatchError; + + fn try_from(pb: &PbAsOf) -> std::result::Result { + match pb.as_of_type.as_ref().unwrap() { + AsOfType::Timestamp(ts) => Ok(Self { + timestamp: ts.timestamp, + }), + AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel( + anyhow::anyhow!("batch query does not support as of process time or version"), + )), + } + } +} + +impl From<&AsOf> for PbAsOf { + fn from(v: &AsOf) -> Self { + PbAsOf { + as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp { + timestamp: v.timestamp, + })), + } + } +} + impl ScanRange { /// Create a scan range from the prost representation. pub fn new( @@ -134,6 +166,7 @@ impl RowSeqScanExecutor { identity: String, limit: Option, metrics: Option, + as_of: Option, ) -> Self { Self { chunk_size, @@ -144,6 +177,7 @@ impl RowSeqScanExecutor { ordered, epoch, limit, + as_of, } } } @@ -205,6 +239,11 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { let epoch = source.epoch.clone(); let limit = seq_scan_node.limit; + let as_of = seq_scan_node + .as_of + .as_ref() + .map(AsOf::try_from) + .transpose()?; let chunk_size = if let Some(limit) = seq_scan_node.limit { (limit as u32).min(source.context.get_config().developer.chunk_size as u32) } else { @@ -223,6 +262,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { source.plan_node().get_identity().clone(), limit, metrics, + as_of, ))) }) } @@ -254,8 +294,21 @@ impl RowSeqScanExecutor { ordered, epoch, limit, + as_of, } = *self; let table = Arc::new(table); + // as_of takes precedence + let query_epoch = as_of + .map(|a| { + let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0; + tracing::debug!(epoch, identity, "time travel"); + risingwave_pb::common::BatchQueryEpoch { + epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel( + epoch, + )), + } + }) + .unwrap_or_else(|| epoch); // Create collector. let histogram = metrics.as_ref().map(|metrics| { @@ -288,7 +341,8 @@ impl RowSeqScanExecutor { for point_get in point_gets { let table = table.clone(); if let Some(row) = - Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await? + Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone()) + .await? { if let Some(chunk) = data_chunk_builder.append_one_row(row) { returned += chunk.cardinality() as u64; @@ -319,7 +373,7 @@ impl RowSeqScanExecutor { table.clone(), range, ordered, - epoch.clone(), + query_epoch.clone(), chunk_size, limit, histogram.clone(), @@ -442,3 +496,10 @@ impl RowSeqScanExecutor { } } } + +pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch { + let ts = ts.checked_add(1).unwrap(); + risingwave_common::util::epoch::Epoch::from_unix_millis( + u64::try_from(ts).unwrap().checked_mul(1000).unwrap(), + ) +} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 4bdbf72cf273..5c9de58d898a 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -223,6 +223,20 @@ pub struct MetaConfig { #[serde(default = "default::meta::enable_hummock_data_archive")] pub enable_hummock_data_archive: bool, + /// If enabled, time travel query is available. + #[serde(default = "default::meta::enable_hummock_time_travel")] + pub enable_hummock_time_travel: bool, + + /// The data retention period for time travel. + #[serde(default = "default::meta::hummock_time_travel_retention_ms")] + pub hummock_time_travel_retention_ms: u64, + + /// The interval at which a Hummock version snapshot is taken for time travel. + /// + /// Larger value indicates less storage overhead but worse query performance. + #[serde(default = "default::meta::hummock_time_travel_snapshot_interval")] + pub hummock_time_travel_snapshot_interval: u64, + /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint /// attempt is rejected. #[serde(default = "default::meta::min_delta_log_num_for_hummock_version_checkpoint")] @@ -1281,7 +1295,7 @@ pub mod default { } pub fn vacuum_spin_interval_ms() -> u64 { - 10 + 200 } pub fn hummock_version_checkpoint_interval_sec() -> u64 { @@ -1292,6 +1306,18 @@ pub mod default { false } + pub fn enable_hummock_time_travel() -> bool { + false + } + + pub fn hummock_time_travel_retention_ms() -> u64 { + 24 * 3600 * 1000 + } + + pub fn hummock_time_travel_snapshot_interval() -> u64 { + 100 + } + pub fn min_delta_log_num_for_hummock_version_checkpoint() -> u64 { 10 } diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 26f822bae558..ac48c42c22e9 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -381,6 +381,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { "RowSeqExecutor2".to_string(), None, None, + None, )); // check result diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 5c8ec479d3be..11cbe06386a2 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -267,6 +267,7 @@ async fn test_table_materialize() -> StreamResult<()> { "RowSeqExecutor2".to_string(), None, None, + None, )); let mut stream = scan.execute(); let result = stream.next().await; @@ -337,6 +338,7 @@ async fn test_table_materialize() -> StreamResult<()> { "RowSeqScanExecutor2".to_string(), None, None, + None, )); let mut stream = scan.execute(); @@ -416,6 +418,7 @@ async fn test_table_materialize() -> StreamResult<()> { "RowSeqScanExecutor2".to_string(), None, None, + None, )); let mut stream = scan.execute(); @@ -487,6 +490,7 @@ async fn test_row_seq_scan() -> StreamResult<()> { "RowSeqScanExecutor2".to_string(), None, None, + None, )); assert_eq!(executor.schema().fields().len(), 3); diff --git a/src/config/docs.md b/src/config/docs.md index f93700f5c5e6..5b0789325945 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -36,9 +36,12 @@ This page is automatically generated by `./risedev generate-example-config` | enable_compaction_deterministic | Whether to enable deterministic compaction scheduling, which will disable all auto scheduling of compaction tasks. Should only be used in e2e tests. | false | | enable_dropped_column_reclaim | Whether compactor should rewrite row to remove dropped column. | false | | enable_hummock_data_archive | If enabled, `SSTable` object file and version delta will be retained. `SSTable` object file need to be deleted via full GC. version delta need to be manually deleted. | false | +| enable_hummock_time_travel | If enabled, time travel query is available. | false | | event_log_channel_max_size | Keeps the latest N events per channel. | 10 | | event_log_enabled | | true | | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | +| hummock_time_travel_retention_ms | The data retention period for time travel. | 86400000 | +| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | | hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | @@ -62,7 +65,7 @@ This page is automatically generated by `./risedev generate-example-config` | table_write_throughput_threshold | The threshold of write throughput to trigger a group split. Increase this configuration value to avoid split too many groups with few data write. | 16777216 | | unrecognized | | | | vacuum_interval_sec | Interval of invoking a vacuum job, to remove stale metadata from meta store and objects from object store. | 30 | -| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 10 | +| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 200 | ## meta.compaction_config diff --git a/src/config/example.toml b/src/config/example.toml index 3a394eada864..e5c4b557d3a2 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -19,9 +19,12 @@ full_gc_interval_sec = 86400 collect_gc_watermark_spin_interval_sec = 5 periodic_compaction_interval_sec = 60 vacuum_interval_sec = 30 -vacuum_spin_interval_ms = 10 +vacuum_spin_interval_ms = 200 hummock_version_checkpoint_interval_sec = 30 enable_hummock_data_archive = false +enable_hummock_time_travel = false +hummock_time_travel_retention_ms = 86400000 +hummock_time_travel_snapshot_interval = 100 min_delta_log_num_for_hummock_version_checkpoint = 10 max_heartbeat_interval_secs = 60 disable_recovery = false diff --git a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs index a181c590e0e6..b09f45b65574 100644 --- a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs @@ -16,9 +16,10 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnId, TableDesc}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode}; +use risingwave_sqlparser::ast::AsOf; use super::batch::prelude::*; -use super::utils::{childless_record, Distill}; +use super::utils::{childless_record, to_pb_time_travel_as_of, Distill}; use super::{generic, ExprRewritable}; use crate::error::Result; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; @@ -54,6 +55,8 @@ pub struct BatchLookupJoin { /// If `distributed_lookup` is true, it will generate `DistributedLookupJoinNode` for /// `ToBatchPb`. Otherwise, it will generate `LookupJoinNode`. distributed_lookup: bool, + + as_of: Option, } impl BatchLookupJoin { @@ -64,6 +67,7 @@ impl BatchLookupJoin { right_output_column_ids: Vec, lookup_prefix_len: usize, distributed_lookup: bool, + as_of: Option, ) -> Self { // We cannot create a `BatchLookupJoin` without any eq keys. We require eq keys to do the // lookup. @@ -79,6 +83,7 @@ impl BatchLookupJoin { right_output_column_ids, lookup_prefix_len, distributed_lookup, + as_of, } } @@ -157,6 +162,7 @@ impl PlanTreeNodeUnary for BatchLookupJoin { self.right_output_column_ids.clone(), self.lookup_prefix_len, self.distributed_lookup, + self.as_of.clone(), ) } } @@ -231,6 +237,7 @@ impl TryToBatchPb for BatchLookupJoin { output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), null_safe: self.eq_join_predicate.null_safes(), lookup_prefix_len: self.lookup_prefix_len as u32, + as_of: to_pb_time_travel_as_of(&self.as_of)?, }) } else { NodeBody::LocalLookupJoin(LocalLookupJoinNode { @@ -263,6 +270,7 @@ impl TryToBatchPb for BatchLookupJoin { worker_nodes: vec![], // To be filled in at local.rs null_safe: self.eq_join_predicate.null_safes(), lookup_prefix_len: self.lookup_prefix_len as u32, + as_of: to_pb_time_travel_as_of(&self.as_of)?, }) }) } diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index a504df99f317..576793f4dd45 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -20,9 +20,10 @@ use risingwave_common::types::ScalarImpl; use risingwave_common::util::scan_range::{is_full_range, ScanRange}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::RowSeqScanNode; +use risingwave_sqlparser::ast::AsOf; use super::batch::prelude::*; -use super::utils::{childless_record, Distill}; +use super::utils::{childless_record, to_pb_time_travel_as_of, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch}; use crate::catalog::ColumnId; use crate::error::Result; @@ -39,6 +40,7 @@ pub struct BatchSeqScan { core: generic::TableScan, scan_ranges: Vec, limit: Option, + as_of: Option, } impl BatchSeqScan { @@ -68,12 +70,14 @@ impl BatchSeqScan { ); }) } + let as_of = core.as_of.clone(); Self { base, core, scan_ranges, limit, + as_of, } } @@ -248,6 +252,7 @@ impl TryToBatchPb for BatchSeqScan { vnode_bitmap: None, ordered: !self.order().is_any(), limit: *self.limit(), + as_of: to_pb_time_travel_as_of(&self.as_of)?, })) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index a8a832407ba6..2b64b5fd93ad 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -418,6 +418,7 @@ impl LogicalJoin { .collect_vec(); let new_scan_output_column_ids = new_scan.output_column_ids(); + let as_of = new_scan.as_of.clone(); // Construct a new logical join, because we have change its RHS. let new_logical_join = generic::Join::new( @@ -435,6 +436,7 @@ impl LogicalJoin { new_scan_output_column_ids, lookup_prefix_len, false, + as_of, )) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 9ac61ed759c6..b9b175ed6f09 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -326,10 +326,13 @@ macro_rules! plan_node_name { pub(crate) use plan_node_name; use risingwave_common::types::DataType; use risingwave_expr::aggregate::AggKind; +use risingwave_pb::plan_common::as_of::AsOfType; +use risingwave_pb::plan_common::{as_of, PbAsOf}; +use risingwave_sqlparser::ast::AsOf; use super::generic::{self, GenericPlanRef, PhysicalPlanRef}; use super::pretty_config; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::expr::InputRef; use crate::optimizer::plan_node::generic::Agg; use crate::optimizer::plan_node::{BatchSimpleAgg, PlanAggCall}; @@ -388,3 +391,27 @@ pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool { plan.inputs().iter().all(plan_has_backfill_leaf_nodes) } } + +pub fn to_pb_time_travel_as_of(a: &Option) -> Result> { + let Some(ref a) = a else { + return Ok(None); + }; + let as_of_type = match a { + AsOf::ProcessTime => AsOfType::ProcessTime(as_of::ProcessTime {}), + AsOf::TimestampNum(ts) => AsOfType::Timestamp(as_of::Timestamp { timestamp: *ts }), + AsOf::TimestampString(ts) => AsOfType::Timestamp(as_of::Timestamp { + // should already have been validated by the parser + timestamp: ts.parse().unwrap(), + }), + AsOf::VersionNum(_) | AsOf::VersionString(_) => { + return Err(ErrorCode::NotSupported( + "do not support as of version".to_string(), + "please use as of timestamp".to_string(), + ) + .into()); + } + }; + Ok(Some(PbAsOf { + as_of_type: Some(as_of_type), + })) +} diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs index d47ad04de276..e65b24937975 100644 --- a/src/frontend/src/optimizer/rule/index_selection_rule.rs +++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs @@ -58,6 +58,7 @@ use risingwave_common::types::{ }; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::plan_common::JoinType; +use risingwave_sqlparser::ast::AsOf; use super::{BoxedRule, Rule}; use crate::catalog::IndexCatalog; @@ -95,9 +96,6 @@ impl Rule for IndexSelectionRule { if indexes.is_empty() { return None; } - if logical_scan.as_of().is_some() { - return None; - } let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan); let primary_cost = min( self.estimate_table_scan_cost(logical_scan, primary_table_row_size), @@ -230,7 +228,7 @@ impl IndexSelectionRule { index.index_table.clone(), vec![], logical_scan.ctx(), - None, + logical_scan.as_of().clone(), index.index_table.cardinality, ); @@ -239,7 +237,7 @@ impl IndexSelectionRule { index.primary_table.clone(), vec![], logical_scan.ctx(), - None, + logical_scan.as_of().clone(), index.primary_table.cardinality, ); @@ -338,7 +336,7 @@ impl IndexSelectionRule { logical_scan.table_catalog(), vec![], logical_scan.ctx(), - None, + logical_scan.as_of().clone(), logical_scan.table_cardinality(), ); @@ -543,9 +541,12 @@ impl IndexSelectionRule { let condition = Condition { conjunctions: conj.iter().map(|&x| x.to_owned()).collect(), }; - if let Some(index_access) = - self.build_index_access(index.clone(), condition, logical_scan.ctx().clone()) - { + if let Some(index_access) = self.build_index_access( + index.clone(), + condition, + logical_scan.ctx().clone(), + logical_scan.as_of().clone(), + ) { result.push(index_access); } } @@ -573,7 +574,7 @@ impl IndexSelectionRule { Condition { conjunctions: conjunctions.to_vec(), }, - None, + logical_scan.as_of().clone(), logical_scan.table_cardinality(), ); @@ -588,6 +589,7 @@ impl IndexSelectionRule { index: Rc, predicate: Condition, ctx: OptimizerContextRef, + as_of: Option, ) -> Option { let mut rewriter = IndexPredicateRewriter::new( index.primary_to_secondary_mapping(), @@ -613,7 +615,7 @@ impl IndexSelectionRule { vec![], ctx, new_predicate, - None, + as_of, index.index_table.cardinality, ) .into(), diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 7941d2837a06..98a124c72d97 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -74,9 +74,11 @@ impl Planner { pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result { let as_of = base_table.as_of.clone(); match as_of { - None | Some(AsOf::ProcessTime) => {} - Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => { - bail_not_implemented!("As Of Timestamp is not supported yet.") + None | Some(AsOf::ProcessTime) | Some(AsOf::TimestampNum(_)) => {} + Some(AsOf::TimestampString(ref s)) => { + if s.parse::().is_err() { + return Err(ErrorCode::InvalidParameterValue(s.to_owned()).into()); + } } Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => { bail_not_implemented!("As Of Version is not supported yet.") diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs index 36285b59b5e3..9d13573e03a7 100644 --- a/src/frontend/src/scheduler/snapshot.rs +++ b/src/frontend/src/scheduler/snapshot.rs @@ -65,7 +65,7 @@ impl ReadSnapshot { match self.batch_query_epoch().epoch.unwrap() { batch_query_epoch::Epoch::Committed(epoch) | batch_query_epoch::Epoch::Current(epoch) => Some(epoch.into()), - batch_query_epoch::Epoch::Backup(_) => None, + batch_query_epoch::Epoch::Backup(_) | batch_query_epoch::Epoch::TimeTravel(_) => None, } } @@ -74,7 +74,8 @@ impl ReadSnapshot { match self.batch_query_epoch().epoch.unwrap() { batch_query_epoch::Epoch::Committed(epoch) | batch_query_epoch::Epoch::Current(epoch) - | batch_query_epoch::Epoch::Backup(epoch) => epoch.into(), + | batch_query_epoch::Epoch::Backup(epoch) + | batch_query_epoch::Epoch::TimeTravel(epoch) => epoch.into(), } } diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index a835bf0b9e6e..590d398998ba 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -13,6 +13,7 @@ mod m20240506_112555_subscription_partial_ckpt; mod m20240525_090457_secret; mod m20240617_070131_index_column_properties; mod m20240618_072634_function_compressed_binary; +mod m20240701_060504_hummock_time_travel; mod m20240702_080451_system_param_value; mod m20240702_084927_unnecessary_fk; @@ -35,6 +36,7 @@ impl MigratorTrait for Migrator { Box::new(m20240617_070131_index_column_properties::Migration), Box::new(m20240702_080451_system_param_value::Migration), Box::new(m20240702_084927_unnecessary_fk::Migration), + Box::new(m20240701_060504_hummock_time_travel::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs b/src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs new file mode 100644 index 000000000000..7dec44913dc8 --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240701_060504_hummock_time_travel.rs @@ -0,0 +1,138 @@ +use sea_orm_migration::prelude::*; + +use crate::drop_tables; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(HummockSstableInfo::Table) + .if_not_exists() + .col( + ColumnDef::new(HummockSstableInfo::SstId) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockSstableInfo::ObjectId) + .big_integer() + .not_null(), + ) + .col( + ColumnDef::new(HummockSstableInfo::SstableInfo) + .blob(BlobSize::Long) + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(HummockTimeTravelVersion::Table) + .if_not_exists() + .col( + ColumnDef::new(HummockTimeTravelVersion::VersionId) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockTimeTravelVersion::Version) + .blob(BlobSize::Long) + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(HummockTimeTravelDelta::Table) + .if_not_exists() + .col( + ColumnDef::new(HummockTimeTravelDelta::VersionId) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockTimeTravelDelta::VersionDelta) + .blob(BlobSize::Long) + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(HummockEpochToVersion::Table) + .if_not_exists() + .col( + ColumnDef::new(HummockEpochToVersion::Epoch) + .big_integer() + .not_null() + .primary_key(), + ) + .col( + ColumnDef::new(HummockEpochToVersion::VersionId) + .big_integer() + .not_null(), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + drop_tables!( + manager, + HummockSstableInfo, + HummockTimeTravelVersion, + HummockTimeTravelDelta, + HummockEpochToVersion + ); + Ok(()) + } +} + +#[derive(DeriveIden)] +enum HummockSstableInfo { + Table, + SstId, + ObjectId, + SstableInfo, +} + +#[derive(DeriveIden)] +enum HummockTimeTravelVersion { + Table, + VersionId, + Version, +} + +#[derive(DeriveIden)] +enum HummockTimeTravelDelta { + Table, + VersionId, + VersionDelta, +} + +#[derive(DeriveIden)] +enum HummockEpochToVersion { + Table, + Epoch, + VersionId, +} diff --git a/src/meta/model_v2/src/hummock_epoch_to_version.rs b/src/meta/model_v2/src/hummock_epoch_to_version.rs new file mode 100644 index 000000000000..181b1b320bc5 --- /dev/null +++ b/src/meta/model_v2/src/hummock_epoch_to_version.rs @@ -0,0 +1,31 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::{Epoch, HummockVersionId}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_epoch_to_version")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub epoch: Epoch, + pub version_id: HummockVersionId, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} diff --git a/src/meta/model_v2/src/hummock_sstable_info.rs b/src/meta/model_v2/src/hummock_sstable_info.rs new file mode 100644 index 000000000000..a9ca4f33361e --- /dev/null +++ b/src/meta/model_v2/src/hummock_sstable_info.rs @@ -0,0 +1,35 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::hummock::PbSstableInfo; +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::HummockSstableObjectId; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_sstable_info")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub sst_id: HummockSstableObjectId, + pub object_id: HummockSstableObjectId, + pub sstable_info: SstableInfo, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +crate::derive_from_blob!(SstableInfo, PbSstableInfo); diff --git a/src/meta/model_v2/src/hummock_time_travel_delta.rs b/src/meta/model_v2/src/hummock_time_travel_delta.rs new file mode 100644 index 000000000000..f807c6ec082f --- /dev/null +++ b/src/meta/model_v2/src/hummock_time_travel_delta.rs @@ -0,0 +1,34 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::hummock::PbHummockVersionDelta; +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::HummockVersionId; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_time_travel_delta")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub version_id: HummockVersionId, + pub version_delta: HummockVersionDelta, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +crate::derive_from_blob!(HummockVersionDelta, PbHummockVersionDelta); diff --git a/src/meta/model_v2/src/hummock_time_travel_version.rs b/src/meta/model_v2/src/hummock_time_travel_version.rs new file mode 100644 index 000000000000..91eb42fb5209 --- /dev/null +++ b/src/meta/model_v2/src/hummock_time_travel_version.rs @@ -0,0 +1,34 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::hummock::PbHummockVersion; +use sea_orm::entity::prelude::*; +use sea_orm::{DeriveEntityModel, DeriveRelation, EnumIter}; + +use crate::HummockVersionId; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)] +#[sea_orm(table_name = "hummock_time_travel_version")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub version_id: HummockVersionId, + pub version: HummockVersion, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +crate::derive_from_blob!(HummockVersion, PbHummockVersion); diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 751ae99b64a1..20c886a5ce53 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -35,9 +35,13 @@ pub mod connection; pub mod database; pub mod fragment; pub mod function; +pub mod hummock_epoch_to_version; pub mod hummock_pinned_snapshot; pub mod hummock_pinned_version; pub mod hummock_sequence; +pub mod hummock_sstable_info; +pub mod hummock_time_travel_delta; +pub mod hummock_time_travel_version; pub mod hummock_version_delta; pub mod hummock_version_stats; pub mod index; @@ -398,6 +402,12 @@ derive_from_blob!( risingwave_pb::common::ParallelUnitMapping ); +derive_array_from_blob!( + HummockVersionDeltaArray, + risingwave_pb::hummock::PbHummockVersionDelta, + PbHummockVersionDeltaArray +); + #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)] pub enum StreamingParallelism { Adaptive, diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 5fd658c8a658..cb712914f25e 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -355,6 +355,11 @@ pub fn start( .meta .hummock_version_checkpoint_interval_sec, enable_hummock_data_archive: config.meta.enable_hummock_data_archive, + enable_hummock_time_travel: config.meta.enable_hummock_time_travel, + hummock_time_travel_retention_ms: config.meta.hummock_time_travel_retention_ms, + hummock_time_travel_snapshot_interval: config + .meta + .hummock_time_travel_snapshot_interval, min_delta_log_num_for_hummock_version_checkpoint: config .meta .min_delta_log_num_for_hummock_version_checkpoint, diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 1e46438cb8dd..ab69d6da4ea2 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -682,6 +682,17 @@ impl HummockManagerService for HummockServiceImpl { .await; Ok(Response::new(ListChangeLogEpochsResponse { epochs })) } + + async fn get_version_by_epoch( + &self, + request: Request, + ) -> Result, Status> { + let GetVersionByEpochRequest { epoch } = request.into_inner(); + let version = self.hummock_manager.epoch_to_version(epoch).await?; + Ok(Response::new(GetVersionByEpochResponse { + version: Some(version.to_protobuf()), + })) + } } #[cfg(test)] diff --git a/src/meta/src/hummock/error.rs b/src/meta/src/hummock/error.rs index 434de4762331..af82cc4690b0 100644 --- a/src/meta/src/hummock/error.rs +++ b/src/meta/src/hummock/error.rs @@ -27,7 +27,7 @@ pub type Result = std::result::Result; pub enum Error { #[error("invalid hummock context {0}")] InvalidContext(HummockContextId), - #[error("failed to access meta store: {0}")] + #[error("failed to access meta store")] MetaStore( #[source] #[backtrace] @@ -45,6 +45,12 @@ pub enum Error { CompactionGroup(String), #[error("SST {0} is invalid")] InvalidSst(HummockSstableObjectId), + #[error("time travel")] + TimeTravel( + #[source] + #[backtrace] + anyhow::Error, + ), #[error(transparent)] Internal( #[from] diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index 70bbef6bd3db..676b2ada1557 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -205,6 +205,12 @@ impl HummockManager { .collect(), }); } + // Whenever data archive or time travel is enabled, we can directly discard reference to stale objects that will no longer be used. + if self.env.opts.enable_hummock_data_archive || self.env.opts.enable_hummock_time_travel { + let context_info = self.context_info.read().await; + let min_pinned_version_id = context_info.min_pinned_version_id(); + stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id); + } let new_checkpoint = HummockVersionCheckpoint { version: current_version.clone(), stale_objects, @@ -227,8 +233,8 @@ impl HummockManager { let context_info = self.context_info.read().await; assert!(new_checkpoint.version.id > versioning.checkpoint.version.id); versioning.checkpoint = new_checkpoint; - // Not delete stale objects when archive is enabled - if !self.env.opts.enable_hummock_data_archive { + // Not delete stale objects when archive or time travel is enabled + if !self.env.opts.enable_hummock_data_archive && !self.env.opts.enable_hummock_time_travel { versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker); } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 3fe407dba620..cae89ab34d9d 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -28,8 +28,10 @@ use risingwave_hummock_sdk::{ use risingwave_pb::hummock::compact_task::{self}; use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta; use risingwave_pb::hummock::{HummockSnapshot, SstableInfo}; +use sea_orm::TransactionTrait; use crate::hummock::error::{Error, Result}; +use crate::hummock::manager::time_travel::require_sql_meta_store_err; use crate::hummock::manager::transaction::{ HummockVersionStatsTransaction, HummockVersionTransaction, }; @@ -38,7 +40,10 @@ use crate::hummock::metrics_utils::{ get_or_create_local_table_stat, trigger_local_table_stat, trigger_sst_stat, }; use crate::hummock::sequence::next_sstable_object_id; -use crate::hummock::{commit_multi_var, start_measure_real_process_timer, HummockManager}; +use crate::hummock::{ + commit_multi_var, commit_multi_var_with_provided_txn, start_measure_real_process_timer, + HummockManager, +}; #[derive(Debug, Clone)] pub struct NewTableFragmentInfo { @@ -189,7 +194,7 @@ impl HummockManager { let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); - version.pre_commit_epoch( + let time_travel_delta = version.pre_commit_epoch( epoch, commit_sstables, new_table_ids, @@ -245,8 +250,42 @@ impl HummockManager { ); table_metrics.inc_write_throughput(stats_value as u64); } - - commit_multi_var!(self.meta_store_ref(), version, version_stats)?; + if self.env.opts.enable_hummock_time_travel { + let mut time_travel_version = None; + if versioning.time_travel_snapshot_interval_counter + >= self.env.opts.hummock_time_travel_snapshot_interval + { + versioning.time_travel_snapshot_interval_counter = 0; + time_travel_version = Some(version.latest_version()); + } else { + versioning.time_travel_snapshot_interval_counter = versioning + .time_travel_snapshot_interval_counter + .saturating_add(1); + } + let group_parents = version + .latest_version() + .levels + .values() + .map(|g| (g.group_id, g.parent_group_id)) + .collect(); + let sql_store = self.sql_store().ok_or_else(require_sql_meta_store_err)?; + let mut txn = sql_store.conn.begin().await?; + let version_snapshot_sst_ids = self + .write_time_travel_metadata( + &txn, + time_travel_version, + time_travel_delta, + &group_parents, + &versioning.last_time_travel_snapshot_sst_ids, + ) + .await?; + commit_multi_var_with_provided_txn!(txn, version, version_stats)?; + if let Some(version_snapshot_sst_ids) = version_snapshot_sst_ids { + versioning.last_time_travel_snapshot_sst_ids = version_snapshot_sst_ids; + } + } else { + commit_multi_var!(self.meta_store_ref(), version, version_stats)?; + } let snapshot = HummockSnapshot { committed_epoch: epoch, diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index c0c00d18a01e..6fc637148e17 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -363,7 +363,7 @@ impl HummockManager { version.latest_version(), ))); commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; - + // No need to handle DeltaType::GroupDestroy during time travel. Ok(()) } @@ -579,7 +579,8 @@ impl HummockManager { new_version_delta.pre_apply(); commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; } - + // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot. + versioning.mark_next_time_travel_version_snapshot(); let mut canceled_tasks = vec![]; for task_assignment in compaction_guard.compact_task_assignment.values() { if let Some(task) = task_assignment.compact_task.as_ref() { diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 5f5150b7777c..27c3bfc0ade7 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -216,23 +216,33 @@ impl HummockManager { let watermark = collect_global_gc_watermark(self.metadata_manager().clone(), spin_interval).await?; metrics.full_gc_last_object_id_watermark.set(watermark as _); - let candidate_sst_number = object_ids.len(); + let candidate_object_number = object_ids.len(); metrics .full_gc_candidate_object_count - .observe(candidate_sst_number as _); + .observe(candidate_object_number as _); + let pinned_object_ids = self + .all_object_ids_in_time_travel() + .await? + .collect::>(); // 1. filter by watermark let object_ids = object_ids .into_iter() .filter(|s| *s < watermark) .collect_vec(); - // 2. filter by version - let selected_sst_number = self.extend_objects_to_delete_from_scan(&object_ids).await; + let after_watermark = object_ids.len(); + // 2. filter by time travel archive + let object_ids = object_ids + .into_iter() + .filter(|s| !pinned_object_ids.contains(s)) + .collect_vec(); + let after_time_travel = object_ids.len(); + // 3. filter by version + let selected_object_number = self.extend_objects_to_delete_from_scan(&object_ids).await; metrics .full_gc_selected_object_count - .observe(selected_sst_number as _); - tracing::info!("GC watermark is {}. SST full scan returns {} SSTs. {} remains after filtered by GC watermark. {} remains after filtered by hummock version.", - watermark, candidate_sst_number, object_ids.len(), selected_sst_number); - Ok(selected_sst_number) + .observe(selected_object_number as _); + tracing::info!("GC watermark is {watermark}. Object full scan returns {candidate_object_number} objects. {after_watermark} remains after filtered by GC watermark. {after_time_travel} remains after filtered by time travel archives. {selected_object_number} remains after filtered by hummock version."); + Ok(selected_object_number) } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 51d3c6c397ea..d4b577d808d4 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -61,6 +61,7 @@ pub(crate) mod checkpoint; mod commit_epoch; mod compaction; pub mod sequence; +mod time_travel; mod timer_task; mod transaction; mod utils; @@ -288,6 +289,7 @@ impl HummockManager { compaction_state: CompactionState::new(), }; let instance = Arc::new(instance); + instance.init_time_travel_state().await?; instance.start_worker(rx).await; instance.load_meta_store_state().await?; instance.release_invalid_contexts().await?; @@ -463,8 +465,8 @@ impl HummockManager { }; self.delete_object_tracker.clear(); - // Not delete stale objects when archive is enabled - if !self.env.opts.enable_hummock_data_archive { + // Not delete stale objects when archive or time travel is enabled + if !self.env.opts.enable_hummock_data_archive && !self.env.opts.enable_hummock_time_travel { versioning_guard.mark_objects_for_deletion(context_info, &self.delete_object_tracker); } diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs new file mode 100644 index 000000000000..eec78c70fab9 --- /dev/null +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -0,0 +1,508 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet, VecDeque}; + +use anyhow::anyhow; +use itertools::Itertools; +use risingwave_common::util::epoch::Epoch; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use risingwave_hummock_sdk::time_travel::{ + refill_version, IncompleteHummockVersion, IncompleteHummockVersionDelta, +}; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; +use risingwave_hummock_sdk::{ + CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId, +}; +use risingwave_meta_model_v2::{ + hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta, + hummock_time_travel_version, +}; +use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta, PbSstableInfo}; +use sea_orm::sea_query::OnConflict; +use sea_orm::ActiveValue::Set; +use sea_orm::{ + ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, QuerySelect, + TransactionTrait, +}; + +use crate::controller::SqlMetaStore; +use crate::hummock::error::{Error, Result}; +use crate::hummock::HummockManager; +use crate::manager::MetaStoreImpl; + +/// Time travel. +impl HummockManager { + pub(crate) fn sql_store(&self) -> Option { + match self.env.meta_store() { + MetaStoreImpl::Sql(sql_store) => Some(sql_store), + _ => None, + } + } + + pub(crate) async fn init_time_travel_state(&self) -> Result<()> { + if self.env.opts.enable_hummock_time_travel && self.sql_store().is_none() { + return Err(require_sql_meta_store_err()); + } + let Some(sql_store) = self.sql_store() else { + return Ok(()); + }; + let mut gurad = self.versioning.write().await; + gurad.mark_next_time_travel_version_snapshot(); + + gurad.last_time_travel_snapshot_sst_ids = HashSet::new(); + let Some(version) = hummock_time_travel_version::Entity::find() + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .one(&sql_store.conn) + .await? + .map(|v| HummockVersion::from_persisted_protobuf(&v.version.to_protobuf())) + else { + return Ok(()); + }; + gurad.last_time_travel_snapshot_sst_ids = version.get_sst_ids(); + Ok(()) + } + + pub(crate) async fn truncate_time_travel_metadata( + &self, + epoch_watermark: HummockEpoch, + ) -> Result<()> { + let sql_store = match self.sql_store() { + Some(sql_store) => sql_store, + None => { + return Ok(()); + } + }; + let txn = sql_store.conn.begin().await?; + + let version_watermark = hummock_epoch_to_version::Entity::find() + .filter( + hummock_epoch_to_version::Column::Epoch + .lt(risingwave_meta_model_v2::Epoch::try_from(epoch_watermark).unwrap()), + ) + .order_by_desc(hummock_epoch_to_version::Column::Epoch) + .one(&txn) + .await?; + let Some(version_watermark) = version_watermark else { + txn.commit().await?; + return Ok(()); + }; + let res = hummock_epoch_to_version::Entity::delete_many() + .filter( + hummock_epoch_to_version::Column::Epoch + .lt(risingwave_meta_model_v2::Epoch::try_from(epoch_watermark).unwrap()), + ) + .exec(&txn) + .await?; + tracing::debug!( + epoch_watermark, + "delete {} rows from hummock_epoch_to_version", + res.rows_affected + ); + let earliest_valid_version = hummock_time_travel_version::Entity::find() + .select_only() + .column(hummock_time_travel_version::Column::VersionId) + .filter( + hummock_time_travel_version::Column::VersionId.lte(version_watermark.version_id), + ) + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .one(&txn) + .await? + .map(|m| HummockVersion::from_persisted_protobuf(&m.version.to_protobuf())); + let Some(earliest_valid_version) = earliest_valid_version else { + txn.commit().await?; + return Ok(()); + }; + let (earliest_valid_version_id, earliest_valid_version_sst_ids) = { + ( + earliest_valid_version.id, + earliest_valid_version.get_sst_ids(), + ) + }; + let version_ids_to_delete: Vec = + hummock_time_travel_version::Entity::find() + .select_only() + .column(hummock_time_travel_version::Column::VersionId) + .filter( + hummock_time_travel_version::Column::VersionId.lt(earliest_valid_version_id), + ) + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .into_tuple() + .all(&txn) + .await?; + let delta_ids_to_delete: Vec = + hummock_time_travel_delta::Entity::find() + .select_only() + .column(hummock_time_travel_delta::Column::VersionId) + .filter(hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id)) + .into_tuple() + .all(&txn) + .await?; + for delta_id_to_delete in delta_ids_to_delete { + let delta_to_delete = hummock_time_travel_delta::Entity::find_by_id(delta_id_to_delete) + .one(&txn) + .await? + .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "version delta {} not found", + delta_id_to_delete + ))) + })?; + let new_sst_ids = HummockVersionDelta::from_persisted_protobuf( + &delta_to_delete.version_delta.to_protobuf(), + ) + .newly_added_sst_ids(); + // The SST ids added and then deleted by compaction between the 2 versions. + let sst_ids_to_delete = &new_sst_ids - &earliest_valid_version_sst_ids; + let res = hummock_sstable_info::Entity::delete_many() + .filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete)) + .exec(&txn) + .await?; + tracing::debug!( + delta_id = delta_to_delete.version_id, + "delete {} rows from hummock_sstable_info", + res.rows_affected + ); + } + let mut next_version_sst_ids = earliest_valid_version_sst_ids; + for prev_version_id in version_ids_to_delete { + let sst_ids = { + let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id) + .one(&txn) + .await? + .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "prev_version {} not found", + prev_version_id + ))) + })?; + HummockVersion::from_persisted_protobuf(&prev_version.version.to_protobuf()) + .get_sst_ids() + }; + // The SST ids deleted by compaction between the 2 versions. + let sst_ids_to_delete = &sst_ids - &next_version_sst_ids; + let res = hummock_sstable_info::Entity::delete_many() + .filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete)) + .exec(&txn) + .await?; + tracing::debug!( + prev_version_id, + "delete {} rows from hummock_sstable_info", + res.rows_affected + ); + next_version_sst_ids = sst_ids; + } + + let res = hummock_time_travel_version::Entity::delete_many() + .filter(hummock_time_travel_version::Column::VersionId.lt(earliest_valid_version_id)) + .exec(&txn) + .await?; + tracing::debug!( + epoch_watermark_version_id = version_watermark.version_id, + earliest_valid_version_id, + "delete {} rows from hummock_time_travel_version", + res.rows_affected + ); + + let res = hummock_time_travel_delta::Entity::delete_many() + .filter(hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id)) + .exec(&txn) + .await?; + tracing::debug!( + epoch_watermark_version_id = version_watermark.version_id, + earliest_valid_version_id, + "delete {} rows from hummock_time_travel_delta", + res.rows_affected + ); + + txn.commit().await?; + Ok(()) + } + + pub(crate) async fn all_object_ids_in_time_travel( + &self, + ) -> Result> { + let object_ids: Vec = + match self.sql_store() { + Some(sql_store) => { + hummock_sstable_info::Entity::find() + .select_only() + .column(hummock_sstable_info::Column::ObjectId) + .into_tuple() + .all(&sql_store.conn) + .await? + } + None => { + vec![] + } + }; + let object_ids = object_ids + .into_iter() + .unique() + .map(|object_id| HummockSstableObjectId::try_from(object_id).unwrap()); + Ok(object_ids) + } + + /// Attempt to locate the version corresponding to `query_epoch`. + /// + /// The version is retrieved from `hummock_epoch_to_version`, selecting the entry with the largest epoch that's lte `query_epoch`. + /// + /// The resulted version is complete, i.e. with correct `SstableInfo`. + pub async fn epoch_to_version(&self, query_epoch: HummockEpoch) -> Result { + let sql_store = self.sql_store().ok_or_else(require_sql_meta_store_err)?; + let epoch_to_version = hummock_epoch_to_version::Entity::find() + .filter( + hummock_epoch_to_version::Column::Epoch + .lte(risingwave_meta_model_v2::Epoch::try_from(query_epoch).unwrap()), + ) + .order_by_desc(hummock_epoch_to_version::Column::Epoch) + .one(&sql_store.conn) + .await? + .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "version not found for epoch {}", + query_epoch + ))) + })?; + let actual_version_id = epoch_to_version.version_id; + tracing::debug!( + query_epoch, + query_tz = ?(Epoch(query_epoch).as_timestamptz()), + actual_epoch = epoch_to_version.epoch, + actual_tz = ?(Epoch(u64::try_from(epoch_to_version.epoch).unwrap()).as_timestamptz()), + actual_version_id, + "convert query epoch" + ); + + let replay_version = hummock_time_travel_version::Entity::find() + .filter(hummock_time_travel_version::Column::VersionId.lte(actual_version_id)) + .order_by_desc(hummock_time_travel_version::Column::VersionId) + .one(&sql_store.conn) + .await? + .ok_or_else(|| { + Error::TimeTravel(anyhow!(format!( + "no replay version found for epoch {}, version {}", + query_epoch, actual_version_id, + ))) + })?; + let deltas = hummock_time_travel_delta::Entity::find() + .filter(hummock_time_travel_delta::Column::VersionId.gt(replay_version.version_id)) + .filter(hummock_time_travel_delta::Column::VersionId.lte(actual_version_id)) + .order_by_asc(hummock_time_travel_delta::Column::VersionId) + .all(&sql_store.conn) + .await?; + let mut actual_version = replay_archive( + replay_version.version.to_protobuf(), + deltas.into_iter().map(|d| d.version_delta.to_protobuf()), + ); + + let mut sst_ids = actual_version + .get_sst_ids() + .into_iter() + .collect::>(); + let sst_count = sst_ids.len(); + let mut sst_id_to_info = HashMap::with_capacity(sst_count); + let sst_info_fetch_batch_size = std::env::var("RW_TIME_TRAVEL_SST_INFO_FETCH_BATCH_SIZE") + .unwrap_or_else(|_| "100".into()) + .parse() + .unwrap(); + while !sst_ids.is_empty() { + let sst_infos = hummock_sstable_info::Entity::find() + .filter(hummock_sstable_info::Column::SstId.is_in( + sst_ids.drain(..std::cmp::min(sst_info_fetch_batch_size, sst_ids.len())), + )) + .all(&sql_store.conn) + .await?; + for sst_info in sst_infos { + let sst_info = sst_info.sstable_info.to_protobuf(); + sst_id_to_info.insert(sst_info.sst_id, sst_info); + } + } + if sst_count != sst_id_to_info.len() { + return Err(Error::TimeTravel(anyhow!(format!( + "some SstableInfos not found for epoch {}, version {}", + query_epoch, actual_version_id, + )))); + } + refill_version(&mut actual_version, &sst_id_to_info); + Ok(actual_version) + } + + pub(crate) async fn write_time_travel_metadata( + &self, + txn: &DatabaseTransaction, + version: Option<&HummockVersion>, + delta: HummockVersionDelta, + group_parents: &HashMap, + skip_sst_ids: &HashSet, + ) -> Result>> { + async fn write_sstable_infos( + sst_infos: impl Iterator, + txn: &DatabaseTransaction, + ) -> Result { + let mut count = 0; + for sst_info in sst_infos { + let m = hummock_sstable_info::ActiveModel { + sst_id: Set(sst_info.sst_id.try_into().unwrap()), + object_id: Set(sst_info.object_id.try_into().unwrap()), + sstable_info: Set(sst_info.into()), + }; + hummock_sstable_info::Entity::insert(m) + .on_conflict( + OnConflict::column(hummock_sstable_info::Column::SstId) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(txn) + .await?; + count += 1; + } + Ok(count) + } + + let epoch = delta.max_committed_epoch; + let version_id = delta.id; + let m = hummock_epoch_to_version::ActiveModel { + epoch: Set(epoch.try_into().unwrap()), + version_id: Set(version_id.try_into().unwrap()), + }; + hummock_epoch_to_version::Entity::insert(m) + .exec(txn) + .await?; + + let mut version_sst_ids = None; + let select_groups = group_parents + .iter() + .filter_map(|(cg_id, _)| { + if should_ignore_group(find_root_group(*cg_id, group_parents)) { + None + } else { + Some(*cg_id) + } + }) + .collect::>(); + if let Some(version) = version { + version_sst_ids = Some( + version + .get_sst_infos_from_groups(&select_groups) + .map(|s| s.sst_id) + .collect(), + ); + write_sstable_infos( + version + .get_sst_infos_from_groups(&select_groups) + .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + txn, + ) + .await?; + let m = hummock_time_travel_version::ActiveModel { + version_id: Set( + risingwave_meta_model_v2::HummockVersionId::try_from(version.id).unwrap(), + ), + version: Set((&IncompleteHummockVersion::from((version, &select_groups)) + .to_protobuf()) + .into()), + }; + hummock_time_travel_version::Entity::insert(m) + .on_conflict( + OnConflict::column(hummock_time_travel_version::Column::VersionId) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(txn) + .await?; + } + let written = write_sstable_infos( + delta + .newly_added_sst_infos(&select_groups) + .filter(|s| !skip_sst_ids.contains(&s.sst_id)), + txn, + ) + .await?; + // Ignore delta which adds no data. + if written > 0 { + let m = hummock_time_travel_delta::ActiveModel { + version_id: Set( + risingwave_meta_model_v2::HummockVersionId::try_from(delta.id).unwrap(), + ), + version_delta: Set((&IncompleteHummockVersionDelta::from(( + &delta, + &select_groups, + )) + .to_protobuf()) + .into()), + }; + hummock_time_travel_delta::Entity::insert(m) + .on_conflict( + OnConflict::column(hummock_time_travel_delta::Column::VersionId) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(txn) + .await?; + } + + Ok(version_sst_ids) + } +} + +fn replay_archive( + version: PbHummockVersion, + deltas: impl Iterator, +) -> HummockVersion { + let mut last_version = HummockVersion::from_persisted_protobuf(&version); + let mut mce = last_version.max_committed_epoch; + for d in deltas { + let d = HummockVersionDelta::from_persisted_protobuf(&d); + assert!( + d.max_committed_epoch > mce, + "time travel expects delta from commit_epoch only" + ); + mce = d.max_committed_epoch; + // Need to work around the assertion in `apply_version_delta`. + // Because compaction deltas are not included in time travel archive. + while last_version.id < d.prev_id { + last_version.id += 1; + } + last_version.apply_version_delta(&d); + } + last_version +} + +fn find_root_group( + group_id: CompactionGroupId, + parents: &HashMap, +) -> CompactionGroupId { + let mut root = group_id; + while let Some(parent) = parents.get(&root) + && *parent != 0 + { + root = *parent; + } + root +} + +fn should_ignore_group(root_group_id: CompactionGroupId) -> bool { + // It is possible some intermediate groups has been dropped, + // so it's impossible to tell whether the root group is MaterializedView or not. + // Just treat them as MaterializedView for correctness. + root_group_id == StaticCompactionGroupId::StateDefault as CompactionGroupId +} + +pub(crate) fn require_sql_meta_store_err() -> Error { + Error::TimeTravel(anyhow!("time travel requires SQL meta store")) +} diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 94a3fa469693..878870a6d8d2 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -108,6 +108,7 @@ impl<'a> HummockVersionTransaction<'a> { deltas.push(delta); } + /// Returns a duplicate delta, used by time travel. pub(super) fn pre_commit_epoch( &mut self, epoch: HummockEpoch, @@ -115,7 +116,7 @@ impl<'a> HummockVersionTransaction<'a> { new_table_ids: HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, - ) { + ) -> HummockVersionDelta { let mut new_version_delta = self.new_delta(); new_version_delta.max_committed_epoch = epoch; new_version_delta.new_table_watermarks = new_table_watermarks; @@ -172,7 +173,9 @@ impl<'a> HummockVersionTransaction<'a> { } }); + let time_travel_delta = (*new_version_delta).clone(); new_version_delta.pre_apply(); + time_travel_delta } } diff --git a/src/meta/src/hummock/manager/utils.rs b/src/meta/src/hummock/manager/utils.rs index 41a9d5885f88..fd1372b3a500 100644 --- a/src/meta/src/hummock/manager/utils.rs +++ b/src/meta/src/hummock/manager/utils.rs @@ -24,11 +24,11 @@ macro_rules! commit_multi_var { $crate::manager::MetaStoreImpl::Kv(meta_store) => { use crate::storage::Transaction; use crate::storage::meta_store::MetaStore; - let mut trx = Transaction::default(); + let mut txn = Transaction::default(); $( - $val_txn.apply_to_txn(&mut trx).await?; + $val_txn.apply_to_txn(&mut txn).await?; )* - meta_store.txn(trx).await?; + meta_store.txn(txn).await?; $( $val_txn.commit(); )* @@ -37,11 +37,11 @@ macro_rules! commit_multi_var { crate::manager::MetaStoreImpl::Sql(sql_meta_store) => { use sea_orm::TransactionTrait; use crate::model::MetadataModelError; - let mut trx = sql_meta_store.conn.begin().await.map_err(MetadataModelError::from)?; + let mut txn = sql_meta_store.conn.begin().await.map_err(MetadataModelError::from)?; $( - $val_txn.apply_to_txn(&mut trx).await?; + $val_txn.apply_to_txn(&mut txn).await?; )* - trx.commit().await.map_err(MetadataModelError::from)?; + txn.commit().await.map_err(MetadataModelError::from)?; $( $val_txn.commit(); )* @@ -53,8 +53,27 @@ macro_rules! commit_multi_var { }; } -pub(crate) use commit_multi_var; +macro_rules! commit_multi_var_with_provided_txn { + ($txn:expr, $($val_txn:expr),*) => { + { + async { + use crate::model::{InMemValTransaction, ValTransaction}; + use crate::model::MetadataModelError; + $( + $val_txn.apply_to_txn(&mut $txn).await?; + )* + $txn.commit().await.map_err(MetadataModelError::from)?; + $( + $val_txn.commit(); + )* + Result::Ok(()) + }.await + } + }; +} + use risingwave_hummock_sdk::SstObjectIdRange; +pub(crate) use {commit_multi_var, commit_multi_var_with_provided_txn}; use crate::hummock::error::Result; use crate::hummock::sequence::next_sstable_object_id; diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 2e6b2512a8be..0603325c2e62 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -24,7 +24,8 @@ use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, + CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableId, HummockSstableObjectId, + HummockVersionId, }; use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::write_limits::WriteLimit; @@ -56,6 +57,9 @@ pub struct Versioning { /// Latest hummock version pub current_version: HummockVersion, pub local_metrics: HashMap, + pub time_travel_snapshot_interval_counter: u64, + /// Used to avoid the attempts to rewrite the same SST to meta store + pub last_time_travel_snapshot_sst_ids: HashSet, // Persistent states below pub hummock_version_deltas: BTreeMap, @@ -95,6 +99,10 @@ impl Versioning { .flat_map(|(_, stale_objects)| stale_objects.id.iter().cloned()), ); } + + pub(super) fn mark_next_time_travel_version_snapshot(&mut self) { + self.time_travel_snapshot_interval_counter = u64::MAX; + } } impl HummockManager { diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 1657eb4aeef1..f61049980af4 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -34,7 +34,7 @@ use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask}; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::{ - compact_task, CompactTask, HummockSnapshot, SubscribeCompactionEventRequest, + compact_task, CompactTask, HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, }; use risingwave_rpc_client::error::{Result, RpcError}; @@ -341,6 +341,10 @@ impl HummockMetaClient for MockHummockMetaClient { }), )) } + + async fn get_version_by_epoch(&self, _epoch: HummockEpoch) -> Result { + unimplemented!() + } } impl MockHummockMetaClient { diff --git a/src/meta/src/hummock/model/ext/hummock.rs b/src/meta/src/hummock/model/ext/hummock.rs index 5becdcc9b985..6f40523f1e72 100644 --- a/src/meta/src/hummock/model/ext/hummock.rs +++ b/src/meta/src/hummock/model/ext/hummock.rs @@ -50,7 +50,7 @@ impl From for MetadataModelError { impl Transactional for CompactionGroup { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = compaction_config::ActiveModel { - compaction_group_id: Set(self.group_id as _), + compaction_group_id: Set(self.group_id.try_into().unwrap()), config: Set(CompactionConfig::from(&(*self.compaction_config))), }; compaction_config::Entity::insert(m) @@ -65,9 +65,11 @@ impl Transactional for CompactionGroup { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - compaction_config::Entity::delete_by_id(self.group_id as CompactionGroupId) - .exec(trx) - .await?; + compaction_config::Entity::delete_by_id( + CompactionGroupId::try_from(self.group_id).unwrap(), + ) + .exec(trx) + .await?; Ok(()) } } @@ -76,7 +78,7 @@ impl Transactional for CompactionGroup { impl Transactional for CompactStatus { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = compaction_status::ActiveModel { - compaction_group_id: Set(self.compaction_group_id as _), + compaction_group_id: Set(self.compaction_group_id.try_into().unwrap()), status: Set(LevelHandlers::from( self.level_handlers.iter().map_into().collect_vec(), )), @@ -93,9 +95,11 @@ impl Transactional for CompactStatus { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - compaction_status::Entity::delete_by_id(self.compaction_group_id as CompactionGroupId) - .exec(trx) - .await?; + compaction_status::Entity::delete_by_id( + CompactionGroupId::try_from(self.compaction_group_id).unwrap(), + ) + .exec(trx) + .await?; Ok(()) } } @@ -105,8 +109,8 @@ impl Transactional for CompactTaskAssignment { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let task = self.compact_task.to_owned().unwrap(); let m = compaction_task::ActiveModel { - id: Set(task.task_id as _), - context_id: Set(self.context_id as _), + id: Set(task.task_id.try_into().unwrap()), + context_id: Set(self.context_id.try_into().unwrap()), task: Set(CompactionTask::from(&task)), }; compaction_task::Entity::insert(m) @@ -125,7 +129,7 @@ impl Transactional for CompactTaskAssignment { async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { compaction_task::Entity::delete_by_id( - self.compact_task.as_ref().unwrap().task_id as CompactionTaskId, + CompactionTaskId::try_from(self.compact_task.as_ref().unwrap().task_id).unwrap(), ) .exec(trx) .await?; @@ -137,8 +141,8 @@ impl Transactional for CompactTaskAssignment { impl Transactional for HummockPinnedVersion { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_pinned_version::ActiveModel { - context_id: Set(self.context_id as _), - min_pinned_id: Set(self.min_pinned_id as _), + context_id: Set(self.context_id.try_into().unwrap()), + min_pinned_id: Set(self.min_pinned_id.try_into().unwrap()), }; hummock_pinned_version::Entity::insert(m) .on_conflict( @@ -152,7 +156,7 @@ impl Transactional for HummockPinnedVersion { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_pinned_version::Entity::delete_by_id(self.context_id as WorkerId) + hummock_pinned_version::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap()) .exec(trx) .await?; Ok(()) @@ -163,8 +167,8 @@ impl Transactional for HummockPinnedVersion { impl Transactional for HummockPinnedSnapshot { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_pinned_snapshot::ActiveModel { - context_id: Set(self.context_id as _), - min_pinned_snapshot: Set(self.minimal_pinned_snapshot as _), + context_id: Set(self.context_id.try_into().unwrap()), + min_pinned_snapshot: Set(self.minimal_pinned_snapshot.try_into().unwrap()), }; hummock_pinned_snapshot::Entity::insert(m) .on_conflict( @@ -178,7 +182,7 @@ impl Transactional for HummockPinnedSnapshot { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_pinned_snapshot::Entity::delete_by_id(self.context_id as i32) + hummock_pinned_snapshot::Entity::delete_by_id(WorkerId::try_from(self.context_id).unwrap()) .exec(trx) .await?; Ok(()) @@ -189,7 +193,7 @@ impl Transactional for HummockPinnedSnapshot { impl Transactional for HummockVersionStats { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_version_stats::ActiveModel { - id: Set(self.hummock_version_id as _), + id: Set(self.hummock_version_id.try_into().unwrap()), stats: Set(TableStats(self.table_stats.clone())), }; hummock_version_stats::Entity::insert(m) @@ -204,9 +208,11 @@ impl Transactional for HummockVersionStats { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_version_stats::Entity::delete_by_id(self.hummock_version_id as i64) - .exec(trx) - .await?; + hummock_version_stats::Entity::delete_by_id( + HummockVersionId::try_from(self.hummock_version_id).unwrap(), + ) + .exec(trx) + .await?; Ok(()) } } @@ -215,10 +221,10 @@ impl Transactional for HummockVersionStats { impl Transactional for HummockVersionDelta { async fn upsert_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { let m = hummock_version_delta::ActiveModel { - id: Set(self.id as _), - prev_id: Set(self.prev_id as _), - max_committed_epoch: Set(self.max_committed_epoch as _), - safe_epoch: Set(self.visible_table_safe_epoch() as _), + id: Set(self.id.try_into().unwrap()), + prev_id: Set(self.prev_id.try_into().unwrap()), + max_committed_epoch: Set(self.max_committed_epoch.try_into().unwrap()), + safe_epoch: Set(self.visible_table_safe_epoch().try_into().unwrap()), trivial_move: Set(self.trivial_move), full_version_delta: Set(FullVersionDelta::from(&self.to_protobuf())), }; @@ -240,7 +246,7 @@ impl Transactional for HummockVersionDelta { } async fn delete_in_transaction(&self, trx: &mut Transaction) -> MetadataModelResult<()> { - hummock_version_delta::Entity::delete_by_id(self.id as HummockVersionId) + hummock_version_delta::Entity::delete_by_id(HummockVersionId::try_from(self.id).unwrap()) .exec(trx) .await?; Ok(()) @@ -249,14 +255,17 @@ impl Transactional for HummockVersionDelta { impl From for CompactionGroup { fn from(value: compaction_config::Model) -> Self { - Self::new(value.compaction_group_id as _, value.config.to_protobuf()) + Self::new( + value.compaction_group_id.try_into().unwrap(), + value.config.to_protobuf(), + ) } } impl From for CompactStatus { fn from(value: compaction_status::Model) -> Self { Self { - compaction_group_id: value.compaction_group_id as _, + compaction_group_id: value.compaction_group_id.try_into().unwrap(), level_handlers: value.status.to_protobuf().iter().map_into().collect(), } } diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 6cde13507836..5dffb35dfc6c 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use itertools::Itertools; +use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::VacuumTask; @@ -76,6 +77,16 @@ impl VacuumManager { break; } } + if self.env.opts.enable_hummock_time_travel { + let current_epoch_time = Epoch::now().physical_time(); + let epoch_watermark = Epoch::from_physical_time( + current_epoch_time.saturating_sub(self.env.opts.hummock_time_travel_retention_ms), + ) + .0; + self.hummock_manager + .truncate_time_travel_metadata(epoch_watermark) + .await?; + } Ok(total_deleted) } @@ -165,6 +176,9 @@ impl VacuumManager { &self, objects_to_delete: &mut Vec, ) -> MetaResult<()> { + if objects_to_delete.is_empty() { + return Ok(()); + } let reject = self.backup_manager.list_pinned_ssts(); // Ack these SSTs immediately, because they tend to be pinned for long time. // They will be GCed during full GC when they are no longer pinned. diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index e10d64a65632..78b5f3989935 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -170,6 +170,9 @@ pub struct MetaOpts { /// Interval of hummock version checkpoint. pub hummock_version_checkpoint_interval_sec: u64, pub enable_hummock_data_archive: bool, + pub enable_hummock_time_travel: bool, + pub hummock_time_travel_retention_ms: u64, + pub hummock_time_travel_snapshot_interval: u64, /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is @@ -305,6 +308,9 @@ impl MetaOpts { vacuum_spin_interval_ms: 0, hummock_version_checkpoint_interval_sec: 30, enable_hummock_data_archive: false, + enable_hummock_time_travel: false, + hummock_time_travel_retention_ms: 0, + hummock_time_travel_snapshot_interval: 0, min_delta_log_num_for_hummock_version_checkpoint: 1, min_sst_retention_time_sec: 3600 * 24 * 7, full_gc_interval_sec: 3600 * 24 * 7, diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 6e1dfec3b7be..e037114d843c 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -19,7 +19,8 @@ use risingwave_hummock_sdk::{ HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult, }; use risingwave_pb::hummock::{ - HummockSnapshot, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, + HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, + SubscribeCompactionEventResponse, VacuumTask, }; use tokio::sync::mpsc::UnboundedSender; @@ -61,4 +62,6 @@ pub trait HummockMetaClient: Send + Sync + 'static { UnboundedSender, BoxStream<'static, CompactionEventItem>, )>; + + async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result; } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 76d5666c4ec4..130f5ef694ab 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1379,6 +1379,12 @@ impl MetaClient { let resp = self.inner.cancel_compact_task(req).await?; Ok(resp.ret) } + + pub async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { + let req = GetVersionByEpochRequest { epoch }; + let resp = self.inner.get_version_by_epoch(req).await?; + Ok(resp.version.unwrap()) + } } #[async_trait] @@ -1540,6 +1546,10 @@ impl HummockMetaClient for MetaClient { Ok((request_sender, Box::pin(stream))) } + + async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { + self.get_version_by_epoch(epoch).await + } } #[async_trait] @@ -2035,6 +2045,7 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse } ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse} ,{ hummock_client, list_change_log_epochs, ListChangeLogEpochsRequest, ListChangeLogEpochsResponse } + ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 70872b4e2583..a27a94bf319c 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -159,23 +159,62 @@ impl HummockVersion { } pub fn get_object_ids(&self) -> HashSet { + self.get_sst_infos().map(|s| s.object_id).collect() + } + + pub fn get_sst_ids(&self) -> HashSet { + self.get_sst_infos().map(|s| s.sst_id).collect() + } + + pub fn get_sst_infos(&self) -> impl Iterator { self.get_combined_levels() + .flat_map(|level| level.table_infos.iter()) + .chain(self.table_change_log.values().flat_map(|change_log| { + change_log.0.iter().flat_map(|epoch_change_log| { + epoch_change_log + .old_value + .iter() + .chain(epoch_change_log.new_value.iter()) + }) + })) + } + + /// `get_sst_infos_from_groups` doesn't guarantee that all returned sst info belongs to `select_group`. + /// i.e. `select_group` is just a hint. + /// We separate `get_sst_infos_from_groups` and `get_sst_infos` because `get_sst_infos_from_groups` may be further customized in the future. + pub fn get_sst_infos_from_groups<'a>( + &'a self, + select_group: &'a HashSet, + ) -> impl Iterator + 'a { + self.levels + .iter() + .filter_map(|(cg_id, level)| { + if select_group.contains(cg_id) { + Some(level) + } else { + None + } + }) .flat_map(|level| { level - .table_infos + .l0 + .as_ref() + .unwrap() + .sub_levels .iter() - .map(|table_info| table_info.get_object_id()) + .rev() + .chain(level.levels.iter()) }) + .flat_map(|level| level.table_infos.iter()) .chain(self.table_change_log.values().flat_map(|change_log| { + // TODO: optimization: strip table change log change_log.0.iter().flat_map(|epoch_change_log| { epoch_change_log .old_value .iter() - .map(|sst| sst.object_id) - .chain(epoch_change_log.new_value.iter().map(|sst| sst.object_id)) + .chain(epoch_change_log.new_value.iter()) }) })) - .collect() } pub fn level_iter bool>( diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 76be3fd901c4..4b74a6e790c0 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -44,6 +44,7 @@ pub mod key_range; pub mod prost_key_range; pub mod table_stats; pub mod table_watermark; +pub mod time_travel; pub mod version; pub use compact::*; @@ -146,6 +147,7 @@ pub enum HummockReadEpoch { NoWait(HummockEpoch), /// We don't need to wait epoch. Backup(HummockEpoch), + TimeTravel(HummockEpoch), } impl From for HummockReadEpoch { @@ -154,6 +156,7 @@ impl From for HummockReadEpoch { batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::Committed(epoch), batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::Current(epoch), batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch), + batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch), } } } @@ -171,6 +174,7 @@ impl HummockReadEpoch { HummockReadEpoch::Current(epoch) => epoch, HummockReadEpoch::NoWait(epoch) => epoch, HummockReadEpoch::Backup(epoch) => epoch, + HummockReadEpoch::TimeTravel(epoch) => epoch, } } } diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs new file mode 100644 index 000000000000..2a848c272e23 --- /dev/null +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -0,0 +1,348 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use risingwave_common::catalog::TableId; +use risingwave_pb::hummock::group_delta::DeltaType; +use risingwave_pb::hummock::hummock_version::PbLevels; +use risingwave_pb::hummock::hummock_version_delta::{ChangeLogDelta, PbGroupDeltas}; +use risingwave_pb::hummock::{ + EpochNewChangeLog, PbGroupDelta, PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, + PbLevel, PbOverlappingLevel, PbSstableInfo, SstableInfo, StateTableInfoDelta, +}; + +use crate::change_log::TableChangeLog; +use crate::table_watermark::TableWatermarks; +use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo}; +use crate::{CompactionGroupId, HummockSstableId}; + +/// [`IncompleteHummockVersion`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: +/// - `PbLevels` +/// - `TableChangeLog` +#[derive(Debug, Clone, PartialEq)] +pub struct IncompleteHummockVersion { + pub id: u64, + pub levels: HashMap, + pub max_committed_epoch: u64, + safe_epoch: u64, + pub table_watermarks: HashMap>, + pub table_change_log: HashMap, + pub state_table_info: HummockVersionStateTableInfo, +} + +/// Clone from an `SstableInfo`, but only set the `sst_id` for the target, leaving other fields as default. +/// The goal is to reduce the size of pb object generated afterward. +fn stripped_sstable_info(origin: &SstableInfo) -> SstableInfo { + SstableInfo { + object_id: Default::default(), + sst_id: origin.sst_id, + key_range: Default::default(), + file_size: Default::default(), + table_ids: Default::default(), + meta_offset: Default::default(), + stale_key_count: Default::default(), + total_key_count: Default::default(), + min_epoch: Default::default(), + max_epoch: Default::default(), + uncompressed_file_size: Default::default(), + range_tombstone_count: Default::default(), + bloom_filter_kind: Default::default(), + } +} + +fn stripped_epoch_new_change_log(origin: &EpochNewChangeLog) -> EpochNewChangeLog { + EpochNewChangeLog { + old_value: origin.old_value.iter().map(stripped_sstable_info).collect(), + new_value: origin.new_value.iter().map(stripped_sstable_info).collect(), + epochs: origin.epochs.clone(), + } +} + +fn stripped_change_log_delta(origin: &ChangeLogDelta) -> ChangeLogDelta { + ChangeLogDelta { + new_log: origin.new_log.as_ref().map(stripped_epoch_new_change_log), + truncate_epoch: origin.truncate_epoch, + } +} + +fn stripped_level(origin: &PbLevel) -> PbLevel { + PbLevel { + level_idx: origin.level_idx, + level_type: origin.level_type, + table_infos: origin + .table_infos + .iter() + .map(stripped_sstable_info) + .collect(), + total_file_size: origin.total_file_size, + sub_level_id: origin.sub_level_id, + uncompressed_file_size: origin.uncompressed_file_size, + vnode_partition_count: origin.vnode_partition_count, + } +} + +pub fn refill_version( + version: &mut HummockVersion, + sst_id_to_info: &HashMap, +) { + for level in version.levels.values_mut().flat_map(|level| { + level + .l0 + .as_mut() + .unwrap() + .sub_levels + .iter_mut() + .rev() + .chain(level.levels.iter_mut()) + }) { + refill_level(level, sst_id_to_info); + } + + for t in version.table_change_log.values_mut() { + refill_table_change_log(t, sst_id_to_info); + } +} + +fn refill_level(level: &mut PbLevel, sst_id_to_info: &HashMap) { + for s in &mut level.table_infos { + refill_sstable_info(s, sst_id_to_info); + } +} + +fn refill_table_change_log( + table_change_log: &mut TableChangeLog, + sst_id_to_info: &HashMap, +) { + for c in &mut table_change_log.0 { + for s in &mut c.old_value { + refill_sstable_info(s, sst_id_to_info); + } + for s in &mut c.new_value { + refill_sstable_info(s, sst_id_to_info); + } + } +} + +/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`. +fn refill_sstable_info( + sstable_info: &mut PbSstableInfo, + sst_id_to_info: &HashMap, +) { + *sstable_info = sst_id_to_info + .get(&sstable_info.sst_id) + .unwrap_or_else(|| panic!("SstableInfo should exist")) + .clone(); +} + +fn stripped_l0(origin: &PbOverlappingLevel) -> PbOverlappingLevel { + PbOverlappingLevel { + sub_levels: origin.sub_levels.iter().map(stripped_level).collect(), + total_file_size: origin.total_file_size, + uncompressed_file_size: origin.uncompressed_file_size, + } +} + +#[allow(deprecated)] +fn stripped_levels(origin: &PbLevels) -> PbLevels { + PbLevels { + levels: origin.levels.iter().map(stripped_level).collect(), + l0: origin.l0.as_ref().map(stripped_l0), + group_id: origin.group_id, + parent_group_id: origin.parent_group_id, + member_table_ids: Default::default(), + } +} + +fn stripped_intra_level_delta(origin: &PbIntraLevelDelta) -> PbIntraLevelDelta { + PbIntraLevelDelta { + level_idx: origin.level_idx, + l0_sub_level_id: origin.l0_sub_level_id, + removed_table_ids: origin.removed_table_ids.clone(), + inserted_table_infos: origin + .inserted_table_infos + .iter() + .map(stripped_sstable_info) + .collect(), + vnode_partition_count: origin.vnode_partition_count, + } +} + +fn stripped_group_delta(origin: &PbGroupDelta) -> PbGroupDelta { + let delta_type = origin.delta_type.as_ref().map(|d| match d { + DeltaType::IntraLevel(l) => DeltaType::IntraLevel(stripped_intra_level_delta(l)), + _ => panic!("time travel expects DeltaType::IntraLevel only"), + }); + PbGroupDelta { delta_type } +} + +fn stripped_group_deltas(origin: &PbGroupDeltas) -> PbGroupDeltas { + let group_deltas = origin + .group_deltas + .iter() + .map(stripped_group_delta) + .collect(); + PbGroupDeltas { group_deltas } +} + +/// `SStableInfo` will be stripped. +impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { + fn from(p: (&HummockVersion, &HashSet)) -> Self { + let (version, select_group) = p; + Self { + id: version.id, + levels: version + .levels + .iter() + .filter_map(|(group_id, levels)| { + if select_group.contains(group_id) { + Some((*group_id as CompactionGroupId, stripped_levels(levels))) + } else { + None + } + }) + .collect(), + max_committed_epoch: version.max_committed_epoch, + safe_epoch: version.visible_table_safe_epoch(), + table_watermarks: version.table_watermarks.clone(), + // TODO: optimization: strip table change log + table_change_log: version + .table_change_log + .iter() + .map(|(table_id, change_log)| { + let incomplete_table_change_log = change_log + .0 + .iter() + .map(stripped_epoch_new_change_log) + .collect(); + (*table_id, TableChangeLog(incomplete_table_change_log)) + }) + .collect(), + state_table_info: version.state_table_info.clone(), + } + } +} + +impl IncompleteHummockVersion { + /// Resulted `SStableInfo` is incompelte. + pub fn to_protobuf(&self) -> PbHummockVersion { + PbHummockVersion { + id: self.id, + levels: self + .levels + .iter() + .map(|(group_id, levels)| (*group_id as _, levels.clone())) + .collect(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: self.safe_epoch, + table_watermarks: self + .table_watermarks + .iter() + .map(|(table_id, watermark)| (table_id.table_id, watermark.to_protobuf())) + .collect(), + table_change_logs: self + .table_change_log + .iter() + .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) + .collect(), + state_table_info: self.state_table_info.to_protobuf(), + } + } +} + +/// [`IncompleteHummockVersionDelta`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: +/// - `PbGroupDeltas` +/// - `ChangeLogDelta` +#[derive(Debug, PartialEq, Clone)] +pub struct IncompleteHummockVersionDelta { + pub id: u64, + pub prev_id: u64, + pub group_deltas: HashMap, + pub max_committed_epoch: u64, + pub safe_epoch: u64, + pub trivial_move: bool, + pub new_table_watermarks: HashMap, + pub removed_table_ids: HashSet, + pub change_log_delta: HashMap, + pub state_table_info_delta: HashMap, +} + +/// `SStableInfo` will be stripped. +impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { + fn from(p: (&HummockVersionDelta, &HashSet)) -> Self { + let (delta, select_group) = p; + Self { + id: delta.id, + prev_id: delta.prev_id, + group_deltas: delta + .group_deltas + .iter() + .filter_map(|(cg_id, deltas)| { + if select_group.contains(cg_id) { + Some((*cg_id, stripped_group_deltas(deltas))) + } else { + None + } + }) + .collect(), + max_committed_epoch: delta.max_committed_epoch, + safe_epoch: delta.visible_table_safe_epoch(), + trivial_move: delta.trivial_move, + new_table_watermarks: delta.new_table_watermarks.clone(), + removed_table_ids: delta.removed_table_ids.clone(), + // TODO: optimization: strip table change log + change_log_delta: delta + .change_log_delta + .iter() + .map(|(table_id, log_delta)| (*table_id, stripped_change_log_delta(log_delta))) + .collect(), + state_table_info_delta: delta.state_table_info_delta.clone(), + } + } +} + +impl IncompleteHummockVersionDelta { + /// Resulted `SStableInfo` is incompelte. + pub fn to_protobuf(&self) -> PbHummockVersionDelta { + PbHummockVersionDelta { + id: self.id, + prev_id: self.prev_id, + group_deltas: self.group_deltas.clone(), + max_committed_epoch: self.max_committed_epoch, + safe_epoch: self.safe_epoch, + trivial_move: self.trivial_move, + new_table_watermarks: self + .new_table_watermarks + .iter() + .map(|(table_id, watermarks)| (table_id.table_id, watermarks.to_protobuf())) + .collect(), + removed_table_ids: self + .removed_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect(), + change_log_delta: self + .change_log_delta + .iter() + .map(|(table_id, log_delta)| (table_id.table_id, log_delta.clone())) + .collect(), + state_table_info_delta: self + .state_table_info_delta + .iter() + .map(|(table_id, delta)| (table_id.table_id, delta.clone())) + .collect(), + } + } +} diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 611e177db74d..920c43df1df1 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -538,6 +538,70 @@ impl HummockVersionDelta { .collect() } + pub fn newly_added_sst_ids(&self) -> HashSet { + self.group_deltas + .values() + .flat_map(|group_deltas| { + group_deltas.group_deltas.iter().flat_map(|group_delta| { + group_delta.delta_type.iter().flat_map(|delta_type| { + static EMPTY_VEC: Vec = Vec::new(); + let sst_slice = match delta_type { + DeltaType::IntraLevel(level_delta) => &level_delta.inserted_table_infos, + DeltaType::GroupConstruct(_) + | DeltaType::GroupDestroy(_) + | DeltaType::GroupMetaChange(_) + | DeltaType::GroupTableChange(_) => &EMPTY_VEC, + }; + sst_slice.iter().map(|sst| sst.sst_id) + }) + }) + }) + .chain(self.change_log_delta.values().flat_map(|delta| { + let new_log = delta.new_log.as_ref().unwrap(); + new_log + .new_value + .iter() + .map(|sst| sst.sst_id) + .chain(new_log.old_value.iter().map(|sst| sst.sst_id)) + })) + .collect() + } + + pub fn newly_added_sst_infos<'a>( + &'a self, + select_group: &'a HashSet, + ) -> impl Iterator + 'a { + self.group_deltas + .iter() + .filter_map(|(cg_id, group_deltas)| { + if select_group.contains(cg_id) { + Some(group_deltas) + } else { + None + } + }) + .flat_map(|group_deltas| { + group_deltas.group_deltas.iter().flat_map(|group_delta| { + group_delta.delta_type.iter().flat_map(|delta_type| { + static EMPTY_VEC: Vec = Vec::new(); + let sst_slice = match delta_type { + DeltaType::IntraLevel(level_delta) => &level_delta.inserted_table_infos, + DeltaType::GroupConstruct(_) + | DeltaType::GroupDestroy(_) + | DeltaType::GroupMetaChange(_) + | DeltaType::GroupTableChange(_) => &EMPTY_VEC, + }; + sst_slice.iter() + }) + }) + }) + .chain(self.change_log_delta.values().flat_map(|delta| { + // TODO: optimization: strip table change log + let new_log = delta.new_log.as_ref().unwrap(); + new_log.new_value.iter().chain(new_log.old_value.iter()) + })) + } + pub fn visible_table_safe_epoch(&self) -> u64 { self.safe_epoch } diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 709595009a32..2a5b62f046d3 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -109,6 +109,7 @@ pub struct TracedReadOptions { pub retention_seconds: Option, pub table_id: TracedTableId, pub read_version_from_backup: bool, + pub read_version_from_time_travel: bool, } impl TracedReadOptions { @@ -123,7 +124,8 @@ impl TracedReadOptions { cache_policy: TracedCachePolicy::Disable, retention_seconds: None, table_id: TracedTableId { table_id }, - read_version_from_backup: true, + read_version_from_backup: false, + read_version_from_time_travel: false, } } } @@ -195,6 +197,7 @@ pub enum TracedHummockReadEpoch { Current(TracedHummockEpoch), NoWait(TracedHummockEpoch), Backup(TracedHummockEpoch), + TimeTravel(TracedHummockEpoch), } impl From for TracedHummockReadEpoch { @@ -204,6 +207,7 @@ impl From for TracedHummockReadEpoch { HummockReadEpoch::Current(epoch) => Self::Current(epoch), HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), HummockReadEpoch::Backup(epoch) => Self::Backup(epoch), + HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), } } } @@ -215,6 +219,7 @@ impl From for HummockReadEpoch { TracedHummockReadEpoch::Current(epoch) => Self::Current(epoch), TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch), + TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), } } } diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 5cf380285cf1..f30240b1264c 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -18,7 +18,9 @@ use async_trait::async_trait; use futures::stream::BoxStream; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange, SyncResult}; -use risingwave_pb::hummock::{HummockSnapshot, SubscribeCompactionEventRequest, VacuumTask}; +use risingwave_pb::hummock::{ + HummockSnapshot, PbHummockVersion, SubscribeCompactionEventRequest, VacuumTask, +}; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient, MetaClient}; use tokio::sync::mpsc::UnboundedSender; @@ -127,4 +129,8 @@ impl HummockMetaClient for MonitoredHummockMetaClient { )> { self.meta_client.subscribe_compaction_event().await } + + async fn get_version_by_epoch(&self, epoch: HummockEpoch) -> Result { + self.meta_client.get_version_by_epoch(epoch).await + } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index f7794604e5a8..d3c13b77863a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -30,9 +30,11 @@ use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, }; use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; +use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; use tracing::error; @@ -115,6 +117,8 @@ pub struct HummockStorage { write_limiter: WriteLimiterRef, compact_await_tree_reg: Option, + + hummock_meta_client: Arc, } pub type ReadVersionTuple = (Vec, Vec, CommittedVersion); @@ -232,6 +236,7 @@ impl HummockStorage { min_current_epoch, write_limiter, compact_await_tree_reg: await_tree_reg, + hummock_meta_client, }; tokio::spawn(hummock_event_handler.start_hummock_event_handler_worker()); @@ -254,7 +259,10 @@ impl HummockStorage { ) -> StorageResult> { let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone())); - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { + self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) + .await? + } else if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { @@ -276,7 +284,10 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { + self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) + .await? + } else if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { @@ -294,7 +305,10 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { + let (key_range, read_version_tuple) = if read_options.read_version_from_time_travel { + self.build_read_version_by_time_travel(epoch, read_options.table_id, key_range) + .await? + } else if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? } else { @@ -306,6 +320,29 @@ impl HummockStorage { .await } + async fn build_read_version_by_time_travel( + &self, + epoch: u64, + table_id: TableId, + key_range: TableKeyRange, + ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + let pb_version = self + .hummock_meta_client + .get_version_by_epoch(epoch) + .await + .inspect_err(|e| tracing::error!("{}", e.to_report_string())) + .map_err(|e| HummockError::meta_error(e.to_report_string()))?; + let version = HummockVersion::from_rpc_protobuf(&pb_version); + validate_safe_epoch(&version, table_id, epoch)?; + let (tx, _rx) = unbounded_channel(); + Ok(get_committed_read_version_tuple( + PinnedVersion::new(version, tx), + table_id, + key_range, + epoch, + )) + } + async fn build_read_version_tuple_from_backup( &self, epoch: u64, @@ -641,9 +678,6 @@ impl StateStore for HummockStorage { } } -#[cfg(any(test, feature = "test"))] -use risingwave_hummock_sdk::version::HummockVersion; - #[cfg(any(test, feature = "test"))] impl HummockStorage { pub async fn seal_and_sync_epoch( diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 56395d48b624..5653927891bd 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -486,6 +486,7 @@ pub struct ReadOptions { /// Read from historical hummock version of meta snapshot backup. /// It should only be used by `StorageTable` for batch query. pub read_version_from_backup: bool, + pub read_version_from_time_travel: bool, } impl From for ReadOptions { @@ -498,6 +499,7 @@ impl From for ReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, + read_version_from_time_travel: value.read_version_from_time_travel, } } } @@ -512,6 +514,7 @@ impl From for TracedReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, + read_version_from_time_travel: value.read_version_from_time_travel, } } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 7f202bfd2b26..9a6169ff5838 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -359,6 +359,7 @@ impl StorageTableInner { ) -> StorageResult> { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); + let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); self.store.try_wait_epoch(wait_epoch).await?; let serialized_pk = serialize_pk_with_vnode( &pk, @@ -379,6 +380,7 @@ impl StorageTableInner { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, + read_version_from_time_travel: read_time_travel, cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; @@ -486,12 +488,14 @@ impl StorageTableInner { let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| { let prefix_hint = prefix_hint.clone(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); + let read_time_travel = matches!(wait_epoch, HummockReadEpoch::TimeTravel(_)); async move { let read_options = ReadOptions { prefix_hint, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, + read_version_from_time_travel: read_time_travel, prefetch_options, cache_policy, ..Default::default()