feat(iceberg): support eq delete merge on read for iceberg #18448

Merged: 27 commits, Sep 13, 2024. Changes shown from 19 commits.
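For reviewers new to Iceberg's merge-on-read path: an equality delete file records the key values of deleted rows together with the sequence number at which the delete was committed. On read, the scanner builds a map from delete key to the highest delete sequence number, then filters each data file's rows against it: a row is dropped only when a matching delete entry has a strictly newer sequence number than the data file. Below is a minimal sketch of that rule using plain std types instead of the executor's OwnedRow/Bitmap machinery; all names are illustrative, not part of this PR.

use std::collections::HashMap;

/// Key of an equality delete: the values of the delete's equality columns.
type DeleteKey = Vec<String>;

/// Map each delete key to the highest sequence number among all equality
/// delete files that contain it, mirroring eq_delete_file_scan_tasks_map.
fn build_delete_map(delete_rows: &[(DeleteKey, i64)]) -> HashMap<DeleteKey, i64> {
    let mut map = HashMap::new();
    for (key, seq) in delete_rows {
        let entry = map.entry(key.clone()).or_insert(i64::MIN);
        *entry = (*entry).max(*seq);
    }
    map
}

/// A data row stays visible unless a delete with a strictly newer sequence
/// number (i.e. committed after the data file) matches its key.
fn is_visible(map: &HashMap<DeleteKey, i64>, key: &DeleteKey, data_seq: i64) -> bool {
    map.get(key).map_or(true, |delete_seq| *delete_seq <= data_seq)
}

fn main() {
    // A delete committed at sequence number 4 removes the row keyed (7, "8").
    let map = build_delete_map(&[(vec!["7".into(), "8".into()], 4)]);
    assert!(!is_visible(&map, &vec!["7".into(), "8".into()], 3));
    // Keys absent from every delete file are always visible.
    assert!(is_visible(&map, &vec!["1".into(), "2".into()], 3));
}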
171 changes: 63 additions & 108 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions Cargo.toml
@@ -143,9 +143,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
iceberg = "0.3.0"
iceberg-catalog-rest = "0.3.0"
iceberg-catalog-glue = "0.3.0"
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
opendal = "0.47"
arrow-array = "50"
arrow-arith = "50"
@@ -343,7 +343,6 @@ opt-level = 2

[patch.crates-io]
# Patch third-party crates for deterministic simulation.
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" }
# Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies.
# Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -46,6 +46,7 @@ poetry run python main.py -t ./test_case/range_partition_append_only.toml
poetry run python main.py -t ./test_case/range_partition_upsert.toml
poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml
poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml
poetry run python main.py -t ./test_case/iceberg_source_eq_delete.toml


echo "--- Kill cluster"
108 changes: 108 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt
@@ -0,0 +1,108 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
create_table_if_not_exists = 'true',
primary_key = 'i1,i2',
);

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

statement ok
insert into s1 values(1,'2','3');

statement ok
insert into s1 values(4,'5','6');

statement ok
insert into s1 values(7,'8','9');

statement ok
delete from s1 where i1 = 7;

statement ok
flush;

query I
select * from iceberg_t1_source order by i1;
----
1 2 3
4 5 6

query I
select i1,i2,i3 from iceberg_t1_source order by i1;
----
1 2 3
4 5 6

query I
select i3,i2 from iceberg_t1_source order by i2;
----
3 2
6 5

query I
select i2,i1 from iceberg_t1_source order by i1;
----
2 1
5 4

query I
select i1 from iceberg_t1_source order by i1;
----
1
4

query I
select i2 from iceberg_t1_source order by i2;
----
2
5

query I
select i3 from iceberg_t1_source order by i3;
----
3
6

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
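The test exercises the full path end to end: three rows flow through the upsert sink, the delete of i1 = 7 produces an equality delete file in the Iceberg table, and every subsequent projection over iceberg_t1_source must return only the two surviving rows regardless of which columns are selected or how they are ordered.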
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'test_case/iceberg_source_eq_delete.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
165 changes: 139 additions & 26 deletions src/batch/src/executor/iceberg_scan.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::mem;

use futures_async_stream::try_stream;
@@ -20,8 +21,11 @@ use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
@@ -38,7 +42,8 @@ pub struct IcebergScanExecutor {
#[allow(dead_code)]
snapshot_id: Option<i64>,
table_meta: TableMetadata,
file_scan_tasks: Vec<FileScanTask>,
data_file_scan_tasks: Vec<FileScanTask>,
eq_delete_file_scan_tasks: Vec<FileScanTask>,
batch_size: usize,
schema: Schema,
identity: String,
@@ -63,7 +68,8 @@ impl IcebergScanExecutor {
iceberg_config: IcebergConfig,
snapshot_id: Option<i64>,
table_meta: TableMetadata,
file_scan_tasks: Vec<FileScanTask>,
data_file_scan_tasks: Vec<FileScanTask>,
eq_delete_file_scan_tasks: Vec<FileScanTask>,
batch_size: usize,
schema: Schema,
identity: String,
@@ -72,7 +78,8 @@ impl IcebergScanExecutor {
iceberg_config,
snapshot_id,
table_meta,
file_scan_tasks,
data_file_scan_tasks,
eq_delete_file_scan_tasks,
batch_size,
schema,
identity,
@@ -86,33 +93,134 @@ impl IcebergScanExecutor {
.load_table_v2_with_metadata(self.table_meta)
.await?;
let data_types = self.schema.data_types();
let chunk_schema_name = self.schema.names();

let file_scan_tasks = mem::take(&mut self.file_scan_tasks);
let mut eq_delete_file_scan_tasks_map: HashMap<OwnedRow, i64> = HashMap::default();
let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks);

let file_scan_stream = {
#[try_stream]
async move {
for file_scan_task in file_scan_tasks {
yield file_scan_task;
// Iterate all the delete files to get the value and seq_num
let mut delete_equality_ids: Option<Vec<_>> = None;
for eq_delete_file_scan_task in eq_delete_file_scan_tasks {
let mut sequence_number = eq_delete_file_scan_task.sequence_number;

if delete_equality_ids.is_none() {
delete_equality_ids = Some(eq_delete_file_scan_task.project_field_ids.clone());
}

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
.build();
let delete_file_scan_stream = tokio_stream::once(Ok(eq_delete_file_scan_task));

let mut delete_record_batch_stream = reader
.read(Box::pin(delete_file_scan_stream))
.map_err(BatchError::Iceberg)?;

while let Some(record_batch) = delete_record_batch_stream.next().await {
let record_batch = record_batch.map_err(BatchError::Iceberg)?;

let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
for row in chunk.rows() {
let entry = eq_delete_file_scan_tasks_map
.entry(row.to_owned_row())
.or_default();
*entry = *entry.max(&mut sequence_number);
}
}
};

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
.build();

let record_batch_stream = reader
.read(Box::pin(file_scan_stream))
.map_err(BatchError::Iceberg)?;

#[for_await]
for record_batch in record_batch_stream {
let record_batch = record_batch.map_err(BatchError::Iceberg)?;
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
debug_assert_eq!(chunk.data_types(), data_types);
yield chunk;
}

let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);

// Delete rows in the data file that need to be deleted by map
for data_file_scan_task in data_file_scan_tasks {
let data_sequence_number = data_file_scan_task.sequence_number;

let column_names: Vec<_> = data_file_scan_task
.project_field_ids
.iter()
.filter_map(|id| {
data_file_scan_task
.schema
.name_by_field_id(*id)
.map(|name| name.to_string())
})
.collect();

// The order of the field_ids in the data file and delete file may be different, so need to correct them here
let delete_column_ids = delete_equality_ids.as_ref().map(|delete_column_ids| {
delete_column_ids
.iter()
.map(|delete_column_id| {
data_file_scan_task
.project_field_ids
.iter()
.position(|project_field_id| delete_column_id == project_field_id)
.expect("delete_column_id not found in delete_equality_ids")
})
.collect_vec()
});

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
.build();
let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

let mut record_batch_stream = reader
.read(Box::pin(file_scan_stream))
.map_err(BatchError::Iceberg)?;

while let Some(record_batch) = record_batch_stream.next().await {
let record_batch = record_batch.map_err(BatchError::Iceberg)?;

let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
let chunk = match delete_column_ids.as_ref() {
Some(delete_column_ids) => {
let visibility = Bitmap::from_iter(
// Project with the schema of the delete file
chunk.project(delete_column_ids).rows().map(|row_ref| {
let row = OwnedRow::new(
row_ref
.iter()
.map(|scalar_ref| scalar_ref.map(Into::into))
.collect_vec(),
);
if let Some(delete_sequence_number) =
eq_delete_file_scan_tasks_map.get(&row)
&& delete_sequence_number > &data_sequence_number
{
// delete_sequence_number > data_sequence_number means the delete file is written later than data file,
// so it needs to be deleted
false
} else {
true
}
}),
)
.clone();
let (data, _chunk_visibilities) = chunk.into_parts_v2();
// Keep the schema consistent(chunk and executor)
let data = data
.iter()
.zip_eq_fast(&column_names)
.filter_map(|(array, columns)| {
if chunk_schema_name.contains(columns) {
Some(array.clone())
} else {
None
}
})
.collect_vec();
let chunk = DataChunk::new(data, visibility);
debug_assert_eq!(chunk.data_types(), data_types);
chunk
}
// If there is no delete file, the data file is directly output
None => chunk,
};
yield chunk;
}
}
}
}
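Because the data file and the delete file may project the same field ids in different orders, the executor first translates each delete equality id into a column position within the data file's projection (the delete_column_ids computation above) before projecting the chunk for the key lookup. A hypothetical reduction of that alignment step, outside the executor (names are illustrative):

fn delete_column_positions(delete_ids: &[i32], data_ids: &[i32]) -> Vec<usize> {
    delete_ids
        .iter()
        .map(|delete_id| {
            data_ids
                .iter()
                .position(|data_id| data_id == delete_id)
                .expect("delete equality id must appear in the data projection")
        })
        .collect()
}

fn main() {
    // Data file projects field ids [10, 11, 12]; the delete file keys on
    // [12, 10], so its columns map to positions 2 and 0 of the data chunk.
    assert_eq!(delete_column_positions(&[12, 10], &[10, 11, 12]), vec![2, 0]);
}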
@@ -171,6 +279,11 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
Some(split.snapshot_id),
split.table_meta.deserialize(),
split.files.into_iter().map(|x| x.deserialize()).collect(),
split
.eq_delete_files
.into_iter()
.map(|x| x.deserialize())
.collect(),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -76,7 +76,7 @@ jni = { version = "0.21.1", features = ["invocation"] }
jsonbb = { workspace = true }
jsonwebtoken = "9.2.0"
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
moka = { version = "0.12.8", features = ["future"] }
mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
mysql_async = { version = "0.34", default-features = false, features = [
"default",
4 changes: 2 additions & 2 deletions src/connector/src/sink/iceberg/jni_catalog.rs
@@ -288,7 +288,7 @@ impl CatalogV2 for JniCatalog {
"Failed to crete iceberg table.",
)
.with_source(e)
})
})?
}

/// Load table from the catalog.
@@ -338,7 +338,7 @@ impl CatalogV2 for JniCatalog {
"Failed to load iceberg table.",
)
.with_source(e)
})
})?
}

/// Drop a table from the catalog.
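Both hunks in jni_catalog.rs make the same one-character fix: the fallible block handed to the JNI helper evaluates to a Result, so the surrounding expression ends up with a nested Result, and the added ? unwraps the outer layer so the function can return the inner one. A generic illustration of the pattern (hypothetical names, not the JniCatalog API):

fn helper() -> Result<Result<i32, String>, String> {
    Ok(Ok(42))
}

fn load() -> Result<i32, String> {
    // `helper()?` unwraps the outer Result; the inner Result is then
    // returned directly, so errors from either layer reach the caller.
    helper()?
}

fn main() {
    assert_eq!(load(), Ok(42));
}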