update iceberg: bump the pinned iceberg-rust revision and adapt to the new reader and writer APIs
ZENOTME committed Dec 30, 2024
1 parent 99e74d7 commit 381464f
Showing 4 changed files with 73 additions and 56 deletions.
30 changes: 15 additions & 15 deletions Cargo.lock

(Generated file; diff not rendered.)

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -153,9 +153,9 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f
"prometheus",
] }
# branch dev-rebase-main-20241030
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "2a97e6fe6eb9e29ebd801d011f2a5b23aa7e1d21" }
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "667c173d4c1d8fed2f4ef046ffa0c80a6ecec6ef" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "667c173d4c1d8fed2f4ef046ffa0c80a6ecec6ef" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "667c173d4c1d8fed2f4ef046ffa0c80a6ecec6ef" }
opendal = "0.49"
# used only by arrow-udf-flight
arrow-flight = "53"
7 changes: 5 additions & 2 deletions src/batch/executors/src/executor/iceberg_scan.rs
@@ -190,7 +190,8 @@ impl IcebergScanExecutor {
.build();
let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

let mut record_batch_stream = reader.read(Box::pin(file_scan_stream))?.enumerate();
let mut record_batch_stream =
reader.read(Box::pin(file_scan_stream)).await?.enumerate();

while let Some((index, record_batch)) = record_batch_stream.next().await {
let record_batch = record_batch?;
@@ -321,7 +322,9 @@ impl PositionDeleteFilter {

let reader = table.reader_builder().with_batch_size(batch_size).build();

let mut record_batch_stream = reader.read(Box::pin(position_delete_file_scan_stream))?;
let mut record_batch_stream = reader
.read(Box::pin(position_delete_file_scan_stream))
.await?;

while let Some(record_batch) = record_batch_stream.next().await {
let record_batch = record_batch?;
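Both hunks in this file adapt to the reader's read method becoming async in the pinned iceberg-rust revision: the record-batch stream is only available after awaiting the call. A minimal sketch of the new call pattern, reconstructed from the call sites above (the signatures are assumed from this diff rather than checked against the crate):

    // Build the reader as the executor does; `batch_size` is the executor's chunk size.
    let reader = table.reader_builder().with_batch_size(batch_size).build();

    // `read` now returns a future; await it to obtain the RecordBatch stream,
    // then consume the stream exactly as before.
    let mut record_batch_stream = reader
        .read(Box::pin(file_scan_stream))
        .await?
        .enumerate();

    while let Some((index, record_batch)) = record_batch_stream.next().await {
        let record_batch = record_batch?;
        // ... convert the batch into a chunk, as the surrounding executor code does
    }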
86 changes: 50 additions & 36 deletions src/connector/src/sink/iceberg/mod.rs
@@ -21,13 +21,17 @@ use std::sync::Arc;

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{DataFile, SerializedDataFile};
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder;
use iceberg::writer::base_writer::sort_position_delete_writer::SortPositionDeleteWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::sort_position_delete_writer::{
SortPositionDeleteWriterBuilder, POSITION_DELETE_SCHEMA,
};
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
@@ -540,6 +544,7 @@ impl IcebergSinkWriter {

let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
schema.clone(),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -549,8 +554,7 @@
iceberg::spec::DataFileFormat::Parquet,
),
);
let data_file_builder =
DataFileWriterBuilder::new(schema.clone(), parquet_writer_builder, None);
let data_file_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
if let Some(_extra_partition_col_idx) = extra_partition_col_idx {
Err(SinkError::Iceberg(anyhow!(
"Extra partition column is not supported in append-only mode"
@@ -586,8 +590,12 @@ impl IcebergSinkWriter {
})
} else {
let partition_builder = MonitoredGeneralWriterBuilder::new(
FanoutPartitionWriterBuilder::new(data_file_builder, partition_spec.clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
FanoutPartitionWriterBuilder::new(
data_file_builder,
partition_spec.clone(),
schema.clone(),
)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
write_qps.clone(),
write_latency.clone(),
);
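The three hunks above track a signature shuffle in the writer builders: ParquetWriterBuilder::new now takes the Iceberg schema as its second argument, DataFileWriterBuilder::new no longer takes the schema, and FanoutPartitionWriterBuilder::new now requires it. A condensed sketch of the append-only partitioned path assembled from these call sites (all signatures are assumed from this diff; the generators stand in for the DefaultLocationGenerator and DefaultFileNameGenerator built earlier in the function):

    // Parquet writer: the schema now rides with the file-writer builder.
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::new(),
        schema.clone(),
        table.file_io().clone(),
        location_generator,   // placeholder for DefaultLocationGenerator::new(...)
        file_name_generator,  // placeholder for DefaultFileNameGenerator::new(...)
    );

    // Data-file writer: the schema argument is gone.
    let data_file_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);

    // Fanout partition writer: the schema argument moved here instead.
    let partition_writer_builder = FanoutPartitionWriterBuilder::new(
        data_file_builder,
        partition_spec.clone(),
        schema.clone(),
    )
    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;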
@@ -662,6 +670,7 @@ impl IcebergSinkWriter {
let data_file_builder = {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
schema.clone(),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -671,11 +680,12 @@
iceberg::spec::DataFileFormat::Parquet,
),
);
DataFileWriterBuilder::new(schema.clone(), parquet_writer_builder.clone(), None)
DataFileWriterBuilder::new(parquet_writer_builder.clone(), None)
};
let position_delete_builder = {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
POSITION_DELETE_SCHEMA.clone(),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -691,8 +701,18 @@
)
};
let equality_delete_builder = {
let config = EqualityDeleteWriterConfig::new(
unique_column_ids.clone(),
table.metadata().current_schema().clone(),
None,
)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::new(),
Arc::new(
arrow_schema_to_schema(config.projected_arrow_schema_ref())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
),
table.file_io().clone(),
DefaultLocationGenerator::new(table.metadata().clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -703,13 +723,7 @@
),
);

EqualityDeleteFileWriterBuilder::new(
parquet_writer_builder.clone(),
unique_column_ids.clone(),
table.metadata().current_schema().clone(),
None,
)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?
EqualityDeleteFileWriterBuilder::new(parquet_writer_builder.clone(), config)
};
let delta_builder = EqualityDeltaWriterBuilder::new(
data_file_builder,
@@ -762,19 +776,6 @@ impl IcebergSinkWriter {
},
})
} else {
let partition_builder = MonitoredGeneralWriterBuilder::new(
FanoutPartitionWriterBuilder::new(delta_builder, partition_spec.clone())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
write_qps.clone(),
write_latency.clone(),
);
let inner_writer = Some(Box::new(
partition_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
) as Box<dyn IcebergWriter>);
let original_arrow_schema = Arc::new(
schema_to_arrow_schema(table.metadata().current_schema())
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -788,6 +789,23 @@
)));
Arc::new(ArrowSchema::new(new_fields))
};
let partition_builder = MonitoredGeneralWriterBuilder::new(
FanoutPartitionWriterBuilder::new_with_custom_schema(
delta_builder,
schema_with_extra_op_column.clone(),
partition_spec.clone(),
table.metadata().current_schema().clone(),
),
write_qps.clone(),
write_latency.clone(),
);
let inner_writer = Some(Box::new(
partition_builder
.clone()
.build()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
) as Box<dyn IcebergWriter>);
Ok(Self {
arrow_schema: original_arrow_schema,
metrics: IcebergWriterMetrics {
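Taken together, the upsert hunks switch the position-delete writer to the crate's POSITION_DELETE_SCHEMA, route the equality-delete writer through EqualityDeleteWriterConfig (whose projected Arrow schema is converted back into an Iceberg schema for its Parquet writer), and build the partitioned writer with FanoutPartitionWriterBuilder::new_with_custom_schema so it accepts batches that carry the extra op column. A condensed sketch of that flow, using only names visible in this diff (the generators are placeholders, and the remaining EqualityDeltaWriterBuilder arguments are truncated in the hunk above, so they are not reproduced here):

    // Position deletes are written with the fixed schema exported by the writer module.
    let position_delete_parquet = ParquetWriterBuilder::new(
        WriterProperties::new(),
        POSITION_DELETE_SCHEMA.clone(),
        table.file_io().clone(),
        location_generator.clone(),
        file_name_generator.clone(),
    );

    // Equality deletes: the config owns the key column ids and computes the projected
    // Arrow schema for those columns.
    let config = EqualityDeleteWriterConfig::new(
        unique_column_ids.clone(),
        table.metadata().current_schema().clone(),
        None,
    )
    .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
    let equality_delete_parquet = ParquetWriterBuilder::new(
        WriterProperties::new(),
        Arc::new(
            arrow_schema_to_schema(config.projected_arrow_schema_ref())
                .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
        ),
        table.file_io().clone(),
        location_generator.clone(),
        file_name_generator.clone(),
    );
    let equality_delete_builder =
        EqualityDeleteFileWriterBuilder::new(equality_delete_parquet, config);

    // Partitioned upsert: the fanout writer is given both the Arrow schema extended
    // with the op column and the table's Iceberg schema.
    let partition_builder = FanoutPartitionWriterBuilder::new_with_custom_schema(
        delta_builder,
        schema_with_extra_op_column.clone(),
        partition_spec.clone(),
        table.metadata().current_schema().clone(),
    );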
@@ -1020,11 +1038,7 @@ impl SinkWriter for IcebergSinkWriter {
match close_result {
Some(Ok(result)) => {
let version = self.table.metadata().format_version() as u8;
let partition_type = self
.table
.metadata()
.default_partition_spec()
.partition_type();
let partition_type = self.table.metadata().default_partition_type();
let data_files = result
.into_iter()
.map(|f| {
@@ -1216,17 +1230,17 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
write_results[0].partition_spec_id
)));
};
let bound_partition_spec = partition_spec
let partition_type = partition_spec
.as_ref()
.clone()
.bind(schema.clone())
.partition_type(schema)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

let data_files = write_results
.into_iter()
.flat_map(|r| {
r.data_files.into_iter().map(|f| {
f.try_into(bound_partition_spec.partition_type(), schema)
f.try_into(&partition_type, schema)
.map_err(|err| SinkError::Iceberg(anyhow!(err)))
})
})
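The last two hunks change how the partition struct type is obtained. On the writer side, table.metadata().default_partition_type() replaces going through default_partition_spec().partition_type(); on the coordinator side, the stored partition_spec yields the type directly via partition_type(schema) instead of being bound to the schema first, and each serialized data file is converted with a reference to that type. A small sketch with names taken from the hunks above (serialized_file stands in for the closure variable f):

    // Writer side: the default partition struct type is exposed directly on the metadata.
    let partition_type = self.table.metadata().default_partition_type();

    // Coordinator side: derive the type from the stored spec and the schema, then
    // convert each serialized data file back into a DataFile.
    let partition_type = partition_spec
        .as_ref()
        .clone()
        .partition_type(schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
    let data_file: DataFile = serialized_file
        .try_into(&partition_type, schema)
        .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;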
