Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support batch read iceberg source #15214

Merged
merged 30 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7888b8f
support iceberg scan executor
chenzl25 Feb 1, 2024
5e4a47a
refactor
chenzl25 Feb 1, 2024
11fdfcd
Merge branch 'main' into dylan/support_iceberg_scan_executor
chenzl25 Feb 1, 2024
8f0681b
fmt
chenzl25 Feb 1, 2024
52de44c
fix doc
chenzl25 Feb 1, 2024
898edaa
Merge remote-tracking branch 'origin/main' into dylan/support_create_…
chenzl25 Feb 1, 2024
f86f8ee
Merge remote-tracking branch 'origin/main' into dylan/support_create_…
chenzl25 Feb 1, 2024
d3a6609
Merge remote-tracking branch 'origin/main' into dylan/support_create_…
chenzl25 Feb 2, 2024
b5423ea
support create iceberg source
chenzl25 Feb 2, 2024
d3e1d48
support batch read iceberg source
chenzl25 Feb 4, 2024
aa41d6a
Merge remote-tracking branch 'origin/main' into dylan/support_batch_r…
chenzl25 Feb 4, 2024
1cada1c
Merge remote-tracking branch 'origin/main' into dylan/support_create_…
chenzl25 Feb 4, 2024
b60fe1a
introduce none format
chenzl25 Feb 5, 2024
72d7787
resolve conflict
chenzl25 Feb 5, 2024
6a61245
fix
chenzl25 Feb 5, 2024
5182d50
fix test
chenzl25 Feb 6, 2024
db5dcd4
fix test
chenzl25 Feb 6, 2024
aaada21
improve
chenzl25 Feb 6, 2024
ea78079
refactor
chenzl25 Feb 7, 2024
7ea7e0d
resolve conflict
chenzl25 Feb 7, 2024
379867b
resolve conflicts
chenzl25 Feb 23, 2024
f9d74fa
split iceberg files based on batch parallelism
chenzl25 Feb 23, 2024
fd3137c
resolve conflicts
chenzl25 Feb 23, 2024
b043292
fix
chenzl25 Feb 23, 2024
efa4c1b
fix test
chenzl25 Feb 26, 2024
43951bd
comment
chenzl25 Feb 26, 2024
cb9db2f
refine
chenzl25 Feb 26, 2024
eff76ef
Merge branch 'main' into dylan/support_batch_read_iceberg_source
chenzl25 Feb 26, 2024
b6d167d
fix
chenzl25 Feb 26, 2024
5290bec
resolve conflicts
chenzl25 Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 42 additions & 21 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::types::DataType;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
Expand All @@ -30,7 +31,9 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::executor::{
BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, FileSelector, IcebergScanExecutor,
};
use crate::task::BatchTaskContext;

pub struct SourceExecutor {
Expand Down Expand Up @@ -75,16 +78,6 @@ impl BoxedExecutorBuilder for SourceExecutor {
.map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
.collect();

let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: source.context().get_config().developer.chunk_size,
rate_limit: None,
Expand All @@ -110,16 +103,44 @@ impl BoxedExecutorBuilder for SourceExecutor {
.collect();
let schema = Schema::new(fields);

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split,
schema,
identity: source.plan_node().get_identity().clone(),
source_ctrl_opts,
}))
if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_properties: IcebergProperties = *iceberg_properties;
if let SplitImpl::Iceberg(split) = split {
let split: IcebergSplit = split;
Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
Some(split.snapshot_id),
FileSelector::FileList(split.files),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
)))
} else {
unreachable!()
}
} else {
let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split,
schema,
identity: source.plan_node().get_identity().clone(),
source_ctrl_opts,
}))
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,8 @@ pub enum EncodingProperties {
Json(JsonProperties),
Bytes(BytesProperties),
Native,
/// Encoding can't be specified because the source will determines it. Now only used in Iceberg.
None,
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
#[default]
Unspecified,
}
Expand All @@ -987,6 +989,8 @@ pub enum ProtocolProperties {
Plain,
Upsert,
Native,
/// Protocol can't be specified because the source will determines it. Now only used in Iceberg.
None,
#[default]
Unspecified,
}
Expand All @@ -1004,6 +1008,7 @@ impl SpecificParserConfig {
// in the future
let protocol_config = match format {
SourceFormat::Native => ProtocolProperties::Native,
SourceFormat::None => ProtocolProperties::None,
SourceFormat::Debezium => ProtocolProperties::Debezium,
SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
SourceFormat::Maxwell => ProtocolProperties::Maxwell,
Expand Down Expand Up @@ -1114,6 +1119,7 @@ impl SpecificParserConfig {
EncodingProperties::Bytes(BytesProperties { column_name: None })
}
(SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
(SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
(format, encode) => {
bail!("Unsupported format {:?} encode {:?}", format, encode);
}
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ pub enum SourceFormat {
#[default]
Invalid,
Native,
None,
Debezium,
DebeziumMongo,
Maxwell,
Expand All @@ -274,6 +275,7 @@ pub enum SourceEncode {
#[default]
Invalid,
Native,
None,
Avro,
Csv,
Protobuf,
Expand Down Expand Up @@ -334,6 +336,7 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>
(PbFormatType::Native, PbEncodeType::Native) => {
(SourceFormat::Native, SourceEncode::Native)
}
(PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
(PbFormatType::Debezium, PbEncodeType::Avro) => {
(SourceFormat::Debezium, SourceEncode::Avro)
}
Expand Down
89 changes: 81 additions & 8 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@

use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::types::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::error::ConnectorResult;
use crate::parser::ParserConfig;
use crate::sink::iceberg::IcebergConfig;
use crate::source::{
BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
Expand Down Expand Up @@ -50,6 +54,22 @@ pub struct IcebergProperties {
pub unknown_fields: HashMap<String, String>,
}

impl IcebergProperties {
pub fn to_iceberg_config(&self) -> IcebergConfig {
IcebergConfig {
database_name: self.database_name.clone(),
table_name: self.table_name.clone(),
catalog_type: Some(self.catalog_type.clone()),
path: self.warehouse_path.clone(),
endpoint: Some(self.endpoint.clone()),
access_key: self.s3_access.clone(),
secret_key: self.s3_secret.clone(),
region: Some(self.region_name.clone()),
..Default::default()
}
}
}

impl SourceProperties for IcebergProperties {
type Split = IcebergSplit;
type SplitEnumerator = IcebergSplitEnumerator;
Expand All @@ -65,19 +85,23 @@ impl UnknownFields for IcebergProperties {
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergSplit {}
pub struct IcebergSplit {
pub split_id: i64,
pub snapshot_id: i64,
pub files: Vec<String>,
}

impl SplitMetaData for IcebergSplit {
fn id(&self) -> SplitId {
unimplemented!()
self.split_id.to_string().into()
}

fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
unimplemented!()
fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
}

fn encode_to_json(&self) -> JsonbVal {
unimplemented!()
serde_json::to_value(self.clone()).unwrap().into()
}

fn update_with_offset(&mut self, _start_offset: String) -> ConnectorResult<()> {
Expand All @@ -86,25 +110,74 @@ impl SplitMetaData for IcebergSplit {
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {}
pub struct IcebergSplitEnumerator {
config: IcebergConfig,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
type Properties = IcebergProperties;
type Split = IcebergSplit;

async fn new(
_properties: Self::Properties,
properties: Self::Properties,
_context: SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Ok(Self {})
let iceberg_config = properties.to_iceberg_config();
Ok(Self {
config: iceberg_config,
})
}

async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
// Iceberg source does not support streaming queries
Ok(vec![])
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl IcebergSplitEnumerator {
pub async fn list_splits_batch(
&self,
batch_parallelism: usize,
) -> ConnectorResult<Vec<IcebergSplit>> {
let table = self.config.load_table().await?;
let snapshot_id = table.current_table_metadata().current_snapshot_id.unwrap();
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
let files = table
.current_data_files()
.await?
.into_iter()
.map(|f| f.file_path)
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
.collect_vec();
if batch_parallelism == 0 {
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
bail!("Batch parallelism is 0. Cannot split the iceberg files.")
}
let split_num = batch_parallelism;
// evenly split the files into splits based on the parallelism.
let split_size = files.len() / split_num;
let remaining = files.len() % split_num;
let mut splits = vec![];
for i in 0..split_num {
let start = i * split_size;
let end = (i + 1) * split_size;
let split = IcebergSplit {
split_id: i as i64,
snapshot_id,
files: files[start..end].to_vec(),
};
splits.push(split);
}
for i in 0..remaining {
splits[i]
.files
.push(files[split_num * split_size + i].clone());
}
Ok(splits
.into_iter()
.filter(|split| !split.files.is_empty())
.collect_vec())
}
}

#[derive(Debug)]
pub struct IcebergFileReader {}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/planner_test/tests/testdata/output/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
},
"parent_edges": {
"0": []
}
},
"batch_parallelism": 0
}
- sql: |
create table t1(v1 int);
Expand Down
13 changes: 1 addition & 12 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use risingwave_connector::parser::{
use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::cdc::external::CdcTableType;
use risingwave_connector::source::cdc::{
CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
Expand Down Expand Up @@ -1154,17 +1153,7 @@ pub async fn check_iceberg_source(
)));
};

let iceberg_config = IcebergConfig {
database_name: properties.database_name,
table_name: properties.table_name,
catalog_type: Some(properties.catalog_type),
path: properties.warehouse_path,
endpoint: Some(properties.endpoint),
access_key: properties.s3_access,
secret_key: properties.s3_secret,
region: Some(properties.region_name),
..Default::default()
};
let iceberg_config = properties.to_iceberg_config();

let schema = Schema {
fields: columns
Expand Down
Loading
Loading