feat(batch): support time travel for iceberg source (#15866)
chenzl25 authored Mar 26, 2024
1 parent 46a6304 commit 3607db5
Showing 9 changed files with 110 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion integration_tests/iceberg-source/python/main.py
@@ -88,7 +88,9 @@ def check_risingwave_iceberg_source(docker):
config = read_config(f"{docker.case_dir()}/config.ini")

sqls = [
-     "select count(*) from iceberg_source"
+     "select count(*) from iceberg_source",
+     "select count(*) from iceberg_source for system_time as of '2100-01-01 00:00:00+00:00'",
+     "select count(*) from iceberg_source for system_time as of 4102444800"
]

rw_config = config['risingwave']
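The two new queries exercise both accepted timestamp forms: an RFC3339 string and a Unix timestamp in seconds. Both literals name the same instant, 2100-01-01 00:00:00 UTC, far enough in the future that every snapshot qualifies and all three counts agree. A standalone sketch of the equivalence, using the same chrono call the connector uses when it builds error messages:

```rust
fn main() {
    // 4102444800 seconds since the Unix epoch is 2100-01-01T00:00:00Z,
    // i.e. the same instant as the string literal in the query above.
    let time = chrono::NaiveDateTime::from_timestamp_millis(4_102_444_800 * 1000)
        .expect("timestamp within chrono's supported range");
    assert_eq!(time.to_string(), "2100-01-01 00:00:00");
}
```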
44 changes: 43 additions & 1 deletion src/connector/src/source/iceberg/mod.rs
@@ -157,16 +157,58 @@ impl SplitEnumerator for IcebergSplitEnumerator {
}
}

pub enum IcebergTimeTravelInfo {
Version(i64),
TimestampMs(i64),
}

impl IcebergSplitEnumerator {
pub async fn list_splits_batch(
&self,
time_travel_info: Option<IcebergTimeTravelInfo>,
batch_parallelism: usize,
) -> ConnectorResult<Vec<IcebergSplit>> {
if batch_parallelism == 0 {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table().await?;
- let snapshot_id = table.current_table_metadata().current_snapshot_id.unwrap();
+ let snapshot_id = match time_travel_info {
Some(IcebergTimeTravelInfo::Version(version)) => {
let Some(snapshot) = table.current_table_metadata().snapshot(version) else {
bail!("Cannot find the snapshot id in the iceberg table.");
};
snapshot.snapshot_id
}
Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
match &table.current_table_metadata().snapshots {
Some(snapshots) => {
let snapshot = snapshots
.iter()
.filter(|snapshot| snapshot.timestamp_ms <= timestamp)
.max_by_key(|snapshot| snapshot.timestamp_ms);
match snapshot {
Some(snapshot) => snapshot.snapshot_id,
None => {
// Convert the Unix time to a human-readable time for the error message.
let time = chrono::NaiveDateTime::from_timestamp_millis(timestamp);
if let Some(time) = time {
bail!("Cannot find a snapshot older than {}", time);
} else {
bail!("Cannot find a snapshot");
}
}
}
}
None => {
bail!("Cannot find the snapshots in the iceberg table.");
}
}
}
None => match table.current_table_metadata().current_snapshot_id {
Some(snapshot_id) => snapshot_id,
None => bail!("Cannot find the current snapshot id in the iceberg table."),
},
};
let mut files = vec![];
for file in table.current_data_files().await? {
if file.content != DataContentType::Data {
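The timestamp branch above resolves a snapshot by taking, among all snapshots no newer than the requested time, the newest one; if none qualifies, it bails with the human-readable time. A minimal standalone sketch of that rule, with a hypothetical `Snapshot` struct standing in for the icelake metadata type:

```rust
// Hypothetical stand-in for the icelake snapshot metadata type.
struct Snapshot {
    snapshot_id: i64,
    timestamp_ms: i64,
}

/// Latest snapshot at or before `timestamp_ms`, if any.
fn resolve(snapshots: &[Snapshot], timestamp_ms: i64) -> Option<i64> {
    snapshots
        .iter()
        .filter(|s| s.timestamp_ms <= timestamp_ms)
        .max_by_key(|s| s.timestamp_ms)
        .map(|s| s.snapshot_id)
}

fn main() {
    let history = [
        Snapshot { snapshot_id: 1, timestamp_ms: 1_000 },
        Snapshot { snapshot_id: 2, timestamp_ms: 2_000 },
    ];
    assert_eq!(resolve(&history, 1_500), Some(1)); // between snapshots: use the older one
    assert_eq!(resolve(&history, 500), None); // earlier than all snapshots: error path above
}
```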
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
@@ -78,6 +78,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10.7"
smallvec = { version = "1.13.1", features = ["serde"] }
speedate = "0.13.0"
tempfile = "3"
thiserror = "1"
thiserror-ext = { workspace = true }
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -17,6 +17,7 @@ use std::rc::Rc;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SourceNode;
use risingwave_sqlparser::ast::AsOf;

use super::batch::prelude::*;
use super::utils::{childless_record, column_names_pretty, Distill};
@@ -59,6 +60,10 @@ impl BatchSource {
self.core.kafka_timestamp_range_value()
}

pub fn as_of(&self) -> Option<AsOf> {
self.core.as_of.clone()
}

pub fn clone_with_dist(&self) -> Self {
let base = self
.base
@@ -75,11 +80,14 @@ impl_plan_tree_node_for_leaf! { BatchSource }
impl Distill for BatchSource {
fn distill<'a>(&self) -> XmlNode<'a> {
let src = Pretty::from(self.source_catalog().unwrap().name.clone());
- let fields = vec![
+ let mut fields = vec![
("source", src),
("columns", column_names_pretty(self.schema())),
("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
];
if let Some(as_of) = &self.core.as_of {
fields.push(("as_of", Pretty::debug(as_of)));
}
childless_record("BatchSource", fields)
}
}
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -107,6 +107,17 @@ impl Source {
.is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
}

pub fn is_iceberg_connector(&self) -> bool {
self.catalog
.as_ref()
.is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
}

/// Currently, only the iceberg source supports time travel.
pub fn support_time_travel(&self) -> bool {
self.is_iceberg_connector()
}

/// The columns in stream/batch source node indicate the actual columns it will produce,
/// instead of the columns defined in source catalog. The difference is generated columns.
pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
13 changes: 11 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -19,6 +19,7 @@ use std::rc::Rc;

use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::bail;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME,
};
@@ -85,6 +86,10 @@ impl LogicalSource {
as_of,
};

if core.as_of.is_some() && !core.support_time_travel() {
bail!("Time travel is not supported for the source")
}

let base = PlanBase::new_logical_with_core(&core);

let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
@@ -262,11 +267,15 @@ impl Distill for LogicalSource {
let fields = if let Some(catalog) = self.source_catalog() {
let src = Pretty::from(catalog.name.clone());
let time = Pretty::debug(&self.core.kafka_timestamp_range);
- vec![
+ let mut fields = vec![
("source", src),
("columns", column_names_pretty(self.schema())),
("time_range", time),
- ]
+ ];
if let Some(as_of) = &self.core.as_of {
fields.push(("as_of", Pretty::debug(as_of)));
}
fields
} else {
vec![]
};
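Together with `support_time_travel` from generic/source.rs above, this check rejects an AS OF clause on any non-iceberg source at plan-construction time. A condensed sketch of the gate, using hypothetical stand-in types:

```rust
// Hypothetical stand-in for the planner's source node core.
struct SourceCore {
    is_iceberg: bool,
    as_of: Option<String>, // stand-in for the parsed AS OF clause
}

impl SourceCore {
    /// Currently, only the iceberg source supports time travel.
    fn support_time_travel(&self) -> bool {
        self.is_iceberg
    }
}

fn new_logical_source(core: &SourceCore) -> Result<(), String> {
    if core.as_of.is_some() && !core.support_time_travel() {
        return Err("Time travel is not supported for the source".into());
    }
    Ok(())
}

fn main() {
    let kafka = SourceCore { is_iceberg: false, as_of: Some("4102444800".into()) };
    assert!(new_logical_source(&kafka).is_err());
}
```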
10 changes: 5 additions & 5 deletions src/frontend/src/planner/relation.rs
@@ -104,14 +104,14 @@ impl Planner {
} else {
let as_of = source.as_of.clone();
match as_of {
- None => {}
+ None
+ | Some(AsOf::VersionNum(_))
+ | Some(AsOf::TimestampString(_))
+ | Some(AsOf::TimestampNum(_)) => {}
Some(AsOf::ProcessTime) => {
bail_not_implemented!("As Of ProcessTime() is not supported yet.")
}
- Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => {
- bail_not_implemented!("As Of Timestamp is not supported yet.")
- }
- Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
+ Some(AsOf::VersionString(_)) => {
bail_not_implemented!("As Of Version is not supported yet.")
}
}
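The planner now forwards version numbers and both timestamp forms to the batch scheduler; only `ProcessTime` and string-valued versions are still rejected here. A condensed sketch of the updated match, with a hypothetical `AsOf` mirroring the variants of `risingwave_sqlparser::ast::AsOf`:

```rust
// Hypothetical mirror of risingwave_sqlparser::ast::AsOf, for illustration.
#[allow(dead_code)]
enum AsOf {
    ProcessTime,
    VersionNum(i64),
    VersionString(String),
    TimestampNum(i64),
    TimestampString(String),
}

fn plan_check(as_of: Option<&AsOf>) -> Result<(), String> {
    match as_of {
        // These forms are resolved later by the plan fragmenter.
        None
        | Some(AsOf::VersionNum(_) | AsOf::TimestampString(_) | AsOf::TimestampNum(_)) => Ok(()),
        Some(AsOf::ProcessTime) => Err("As Of ProcessTime() is not supported yet.".into()),
        Some(AsOf::VersionString(_)) => Err("As Of Version is not supported yet.".into()),
    }
}

fn main() {
    assert!(plan_check(Some(&AsOf::TimestampNum(4_102_444_800))).is_ok());
    assert!(plan_check(Some(&AsOf::ProcessTime)).is_err());
}
```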
29 changes: 26 additions & 3 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -24,14 +24,15 @@ use enum_as_inner::EnumAsInner;
use futures::TryStreamExt;
use itertools::Itertools;
use pgwire::pg_server::SessionId;
use risingwave_common::bail;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableDesc;
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
- use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
+ use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo};
use risingwave_connector::source::kafka::KafkaSplitEnumerator;
use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
use risingwave_connector::source::{
@@ -41,6 +42,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
use risingwave_pb::common::Buffer;
use risingwave_pb::plan_common::Field as FieldPb;
use risingwave_sqlparser::ast::AsOf;
use serde::ser::SerializeStruct;
use serde::Serialize;
use uuid::Uuid;
@@ -50,7 +52,7 @@ use crate::catalog::catalog_service::CatalogReader;
use crate::catalog::TableId;
use crate::error::RwError;
use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
- use crate::optimizer::plan_node::{PlanNodeId, PlanNodeType};
+ use crate::optimizer::plan_node::{BatchSource, PlanNodeId, PlanNodeType};
use crate::optimizer::property::Distribution;
use crate::optimizer::PlanRef;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
@@ -266,6 +268,7 @@ impl Query {
pub struct SourceFetchInfo {
pub connector: ConnectorProperties,
pub timebound: (Option<i64>, Option<i64>),
pub as_of: Option<AsOf>,
}

#[derive(Clone, Debug)]
@@ -328,8 +331,25 @@ impl SourceScanInfo {
IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into())
.await?;

let time_travel_info = match fetch_info.as_of {
Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(v)),
Some(AsOf::TimestampNum(ts)) => {
Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000))
}
Some(AsOf::VersionString(_)) => {
bail!("Unsupported version string in iceberg time travel")
}
Some(AsOf::TimestampString(ts)) => Some(
speedate::DateTime::parse_str_rfc3339(&ts)
.map(|t| IcebergTimeTravelInfo::TimestampMs(t.timestamp_tz() * 1000))
.map_err(|_e| anyhow!("fail to parse timestamp"))?,
),
Some(AsOf::ProcessTime) => unreachable!(),
None => None,
};

let split_info = iceberg_enumerator
- .list_splits_batch(batch_parallelism)
+ .list_splits_batch(time_travel_info, batch_parallelism)
.await?
.into_iter()
.map(SplitImpl::Iceberg)
@@ -979,16 +999,19 @@ impl BatchPlanFragmenter {
}

if let Some(source_node) = node.as_batch_source() {
let source_node: &BatchSource = source_node;
let source_catalog = source_node.source_catalog();
if let Some(source_catalog) = source_catalog {
let property = ConnectorProperties::extract(
source_catalog.with_properties.clone().into_iter().collect(),
false,
)?;
let timestamp_bound = source_node.kafka_timestamp_range_value();
let as_of = source_node.as_of();
return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
connector: property,
timebound: timestamp_bound,
as_of,
})));
}
}
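The timestamp-string arm above parses RFC3339 with the speedate crate added to src/frontend/Cargo.toml, and both timestamp arms scale seconds to milliseconds before handing the value to the enumerator. A minimal standalone sketch of that conversion, assuming speedate 0.13 as a dependency:

```rust
// Sketch of the timestamp-string conversion above; assumes the
// speedate 0.13 dependency this commit adds to the frontend crate.
fn to_timestamp_ms(ts: &str) -> Result<i64, String> {
    speedate::DateTime::parse_str_rfc3339(ts)
        .map(|t| t.timestamp_tz() * 1000) // timestamp_tz() yields epoch seconds
        .map_err(|_| "fail to parse timestamp".to_string())
}

fn main() {
    // The same literal the integration test sends through this path.
    let ms = to_timestamp_ms("2100-01-01 00:00:00+00:00").unwrap();
    assert_eq!(ms, 4_102_444_800_000);
}
```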
