diff --git a/Cargo.lock b/Cargo.lock
index c8c9bf98d9412..2afdd92bbf435 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9657,6 +9657,7 @@ dependencies = [
  "serde_json",
  "sha2",
  "smallvec",
+ "speedate",
  "tempfile",
  "thiserror",
  "thiserror-ext",
diff --git a/integration_tests/iceberg-source/python/main.py b/integration_tests/iceberg-source/python/main.py
index ebfd6a6c468f5..bcc44bc77e0cf 100644
--- a/integration_tests/iceberg-source/python/main.py
+++ b/integration_tests/iceberg-source/python/main.py
@@ -88,7 +88,9 @@ def check_risingwave_iceberg_source(docker):
     config = read_config(f"{docker.case_dir()}/config.ini")
 
     sqls = [
-        "select count(*) from iceberg_source"
+        "select count(*) from iceberg_source",
+        "select count(*) from iceberg_source for system_time as of '2100-01-01 00:00:00+00:00'",
+        "select count(*) from iceberg_source for system_time as of 4102444800"
     ]
 
     rw_config = config['risingwave']
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 8b7f0e696e95d..99f4259fb3607 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -157,16 +157,58 @@ impl SplitEnumerator for IcebergSplitEnumerator {
     }
 }
 
+pub enum IcebergTimeTravelInfo {
+    Version(i64),
+    TimestampMs(i64),
+}
+
 impl IcebergSplitEnumerator {
     pub async fn list_splits_batch(
         &self,
+        time_travel_info: Option<IcebergTimeTravelInfo>,
         batch_parallelism: usize,
     ) -> ConnectorResult<Vec<IcebergSplit>> {
         if batch_parallelism == 0 {
             bail!("Batch parallelism is 0. Cannot split the iceberg files.");
         }
         let table = self.config.load_table().await?;
-        let snapshot_id = table.current_table_metadata().current_snapshot_id.unwrap();
+        let snapshot_id = match time_travel_info {
+            Some(IcebergTimeTravelInfo::Version(version)) => {
+                let Some(snapshot) = table.current_table_metadata().snapshot(version) else {
+                    bail!("Cannot find the snapshot id in the iceberg table.");
+                };
+                snapshot.snapshot_id
+            }
+            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
+                match &table.current_table_metadata().snapshots {
+                    Some(snapshots) => {
+                        let snapshot = snapshots
+                            .iter()
+                            .filter(|snapshot| snapshot.timestamp_ms <= timestamp)
+                            .max_by_key(|snapshot| snapshot.timestamp_ms);
+                        match snapshot {
+                            Some(snapshot) => snapshot.snapshot_id,
+                            None => {
+                                // convert unix time to human-readable time
+                                let time = chrono::NaiveDateTime::from_timestamp_millis(timestamp);
+                                if time.is_some() {
+                                    bail!("Cannot find a snapshot older than {}", time.unwrap());
+                                } else {
+                                    bail!("Cannot find a snapshot");
+                                }
+                            }
+                        }
+                    }
+                    None => {
+                        bail!("Cannot find the snapshots in the iceberg table.");
+                    }
+                }
+            }
+            None => match table.current_table_metadata().current_snapshot_id {
+                Some(snapshot_id) => snapshot_id,
+                None => bail!("Cannot find the current snapshot id in the iceberg table."),
+            },
+        };
         let mut files = vec![];
         for file in table.current_data_files().await?
         {
             if file.content != DataContentType::Data {
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 3ab30f4f397f2..387b370a6401a 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -78,6 +78,7 @@ serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 sha2 = "0.10.7"
 smallvec = { version = "1.13.1", features = ["serde"] }
+speedate = "0.13.0"
 tempfile = "3"
 thiserror = "1"
 thiserror-ext = { workspace = true }
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index 73547957c57a6..a0ece261dcea0 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -17,6 +17,7 @@ use std::rc::Rc;
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::SourceNode;
+use risingwave_sqlparser::ast::AsOf;
 
 use super::batch::prelude::*;
 use super::utils::{childless_record, column_names_pretty, Distill};
@@ -59,6 +60,10 @@ impl BatchSource {
         self.core.kafka_timestamp_range_value()
     }
 
+    pub fn as_of(&self) -> Option<AsOf> {
+        self.core.as_of.clone()
+    }
+
     pub fn clone_with_dist(&self) -> Self {
         let base = self
             .base
@@ -75,11 +80,14 @@ impl_plan_tree_node_for_leaf! { BatchSource }
 impl Distill for BatchSource {
     fn distill<'a>(&self) -> XmlNode<'a> {
         let src = Pretty::from(self.source_catalog().unwrap().name.clone());
-        let fields = vec![
+        let mut fields = vec![
             ("source", src),
             ("columns", column_names_pretty(self.schema())),
             ("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
         ];
+        if let Some(as_of) = &self.core.as_of {
+            fields.push(("as_of", Pretty::debug(as_of)));
+        }
         childless_record("BatchSource", fields)
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index fa347375bf2bf..8d2a3dc8cec0f 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -107,6 +107,17 @@ impl Source {
             .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
     }
 
+    pub fn is_iceberg_connector(&self) -> bool {
+        self.catalog
+            .as_ref()
+            .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
+    }
+
+    /// Currently, only iceberg source supports time travel.
+    pub fn support_time_travel(&self) -> bool {
+        self.is_iceberg_connector()
+    }
+
     /// The columns in stream/batch source node indicate the actual columns it will produce,
     /// instead of the columns defined in source catalog. The difference is generated columns.
     pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 54d3680778817..83fa891cb39a0 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -19,6 +19,7 @@ use std::rc::Rc;
 
 use fixedbitset::FixedBitSet;
 use pretty_xmlish::{Pretty, XmlNode};
+use risingwave_common::bail;
 use risingwave_common::catalog::{
     ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME,
 };
@@ -85,6 +86,10 @@ impl LogicalSource {
             as_of,
         };
 
+        if core.as_of.is_some() && !core.support_time_travel() {
+            bail!("Time travel is not supported for the source")
+        }
+
         let base = PlanBase::new_logical_with_core(&core);
 
         let output_exprs =
             Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
@@ -262,11 +267,15 @@ impl Distill for LogicalSource {
         let fields = if let Some(catalog) = self.source_catalog() {
             let src = Pretty::from(catalog.name.clone());
             let time = Pretty::debug(&self.core.kafka_timestamp_range);
-            vec![
+            let mut fields = vec![
                 ("source", src),
                 ("columns", column_names_pretty(self.schema())),
                 ("time_range", time),
-            ]
+            ];
+            if let Some(as_of) = &self.core.as_of {
+                fields.push(("as_of", Pretty::debug(as_of)));
+            }
+            fields
         } else {
             vec![]
         };
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index 16440e97c15bb..74060b484e9c0 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -104,14 +104,14 @@ impl Planner {
         } else {
             let as_of = source.as_of.clone();
             match as_of {
-                None => {}
+                None
+                | Some(AsOf::VersionNum(_))
+                | Some(AsOf::TimestampString(_))
+                | Some(AsOf::TimestampNum(_)) => {}
                 Some(AsOf::ProcessTime) => {
                     bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                 }
-                Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => {
-                    bail_not_implemented!("As Of Timestamp is not supported yet.")
-                }
-                Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
+                Some(AsOf::VersionString(_)) => {
                     bail_not_implemented!("As Of Version is not supported yet.")
                 }
             }
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index c68cd02c2eeeb..16ccbb8164bf4 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -24,6 +24,7 @@ use enum_as_inner::EnumAsInner;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use pgwire::pg_server::SessionId;
+use risingwave_common::bail;
 use risingwave_common::buffer::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::TableDesc;
 use risingwave_common::hash::table_distribution::TableDistribution;
@@ -31,7 +32,7 @@ use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
 use risingwave_common::util::scan_range::ScanRange;
 use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
 use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
-use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
+use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo};
 use risingwave_connector::source::kafka::KafkaSplitEnumerator;
 use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
 use risingwave_connector::source::{
@@ -41,6 +42,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
 use risingwave_pb::common::Buffer;
 use risingwave_pb::plan_common::Field as FieldPb;
+use risingwave_sqlparser::ast::AsOf;
 use serde::ser::SerializeStruct;
 use serde::Serialize;
 use uuid::Uuid;
@@ -50,7 +52,7 @@ use crate::catalog::catalog_service::CatalogReader;
 use crate::catalog::TableId;
 use crate::error::RwError;
 use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
-use crate::optimizer::plan_node::{PlanNodeId, PlanNodeType};
+use crate::optimizer::plan_node::{BatchSource, PlanNodeId, PlanNodeType};
 use crate::optimizer::property::Distribution;
 use crate::optimizer::PlanRef;
 use crate::scheduler::worker_node_manager::WorkerNodeSelector;
@@ -266,6 +268,7 @@ impl Query {
 pub struct SourceFetchInfo {
     pub connector: ConnectorProperties,
     pub timebound: (Option<i64>, Option<i64>),
+    pub as_of: Option<AsOf>,
 }
 
 #[derive(Clone, Debug)]
@@ -328,8 +331,25 @@ impl SourceScanInfo {
                 IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into())
                     .await?;
 
+                let time_travel_info = match fetch_info.as_of {
+                    Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(v)),
+                    Some(AsOf::TimestampNum(ts)) => {
+                        Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000))
+                    }
+                    Some(AsOf::VersionString(_)) => {
+                        bail!("Unsupported version string in iceberg time travel")
+                    }
+                    Some(AsOf::TimestampString(ts)) => Some(
+                        speedate::DateTime::parse_str_rfc3339(&ts)
+                            .map(|t| IcebergTimeTravelInfo::TimestampMs(t.timestamp_tz() * 1000))
+                            .map_err(|_e| anyhow!("fail to parse timestamp"))?,
+                    ),
+                    Some(AsOf::ProcessTime) => unreachable!(),
+                    None => None,
+                };
+
                 let split_info = iceberg_enumerator
-                    .list_splits_batch(batch_parallelism)
+                    .list_splits_batch(time_travel_info, batch_parallelism)
                     .await?
                     .into_iter()
                     .map(SplitImpl::Iceberg)
@@ -979,6 +999,7 @@ impl BatchPlanFragmenter {
         }
 
         if let Some(source_node) = node.as_batch_source() {
+            let source_node: &BatchSource = source_node;
            let source_catalog = source_node.source_catalog();
             if let Some(source_catalog) = source_catalog {
                 let property = ConnectorProperties::extract(
@@ -986,9 +1007,11 @@ impl BatchPlanFragmenter {
                     false,
                 )?;
                 let timestamp_bound = source_node.kafka_timestamp_range_value();
+                let as_of = source_node.as_of();
                 return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
                     connector: property,
                     timebound: timestamp_bound,
+                    as_of,
                 })));
             }
         }
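
Note: for readers following the planner-to-connector hand-off above, here is a minimal, self-contained sketch (not RisingWave code) of how the `AS OF` clause is mapped onto `IcebergTimeTravelInfo` in `plan_fragmenter.rs`. The two enums below are local stand-ins for `risingwave_sqlparser::ast::AsOf` and the connector-side type added by this diff; the only external dependency is the `speedate` crate (0.13) that the diff introduces. Numeric timestamps are treated as epoch seconds and scaled to milliseconds to match the snapshot log's `timestamp_ms`.

```rust
// Standalone sketch: mirrors the AsOf -> IcebergTimeTravelInfo mapping performed
// in plan_fragmenter.rs above. The enums are local stand-ins for the real types.

/// Local stand-in for `risingwave_sqlparser::ast::AsOf`.
#[allow(dead_code)]
enum AsOf {
    ProcessTime,
    VersionNum(i64),
    VersionString(String),
    TimestampNum(i64),
    TimestampString(String),
}

/// Local stand-in for the connector-side enum introduced in this diff.
#[derive(Debug, PartialEq)]
enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

fn to_time_travel_info(as_of: Option<AsOf>) -> Result<Option<IcebergTimeTravelInfo>, String> {
    match as_of {
        Some(AsOf::VersionNum(v)) => Ok(Some(IcebergTimeTravelInfo::Version(v))),
        // Numeric timestamps arrive as epoch seconds; the snapshot log stores milliseconds.
        Some(AsOf::TimestampNum(ts)) => Ok(Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000))),
        // Timestamp strings are parsed as RFC 3339 and converted to UTC epoch milliseconds.
        Some(AsOf::TimestampString(ts)) => speedate::DateTime::parse_str_rfc3339(&ts)
            .map(|t| Some(IcebergTimeTravelInfo::TimestampMs(t.timestamp_tz() * 1000)))
            .map_err(|_| "fail to parse timestamp".to_string()),
        Some(AsOf::VersionString(_)) => {
            Err("Unsupported version string in iceberg time travel".to_string())
        }
        // The planner rejects AS OF PROCTIME() before this point, so it never reaches here.
        Some(AsOf::ProcessTime) => Err("unreachable in the real planner".to_string()),
        None => Ok(None),
    }
}

fn main() {
    // 2100-01-01T00:00:00+00:00 is epoch second 4_102_444_800, the value used in the
    // integration test; the numeric and string forms should yield the same instant.
    let from_num = to_time_travel_info(Some(AsOf::TimestampNum(4_102_444_800))).unwrap();
    let from_str = to_time_travel_info(Some(AsOf::TimestampString(
        "2100-01-01T00:00:00+00:00".to_string(),
    )))
    .unwrap();
    assert_eq!(from_num, Some(IcebergTimeTravelInfo::TimestampMs(4_102_444_800_000)));
    assert_eq!(from_num, from_str);
}
```

Since both test queries in `integration_tests/iceberg-source/python/main.py` refer to the same instant, they should resolve to the same Iceberg snapshot and return the same row count.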