diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index c77d0c3fabe1..a29781b94746 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -224,6 +224,12 @@ pub(crate) struct DedupMetrics { } /// Buffer to store fields in the last row to merge. +/// +/// Usage: +/// We should call `maybe_init()` to initialize the builder and then call `push_first_row()` +/// to push the first row of batches that the timestamp is the same as the row in this builder. +/// Finally we should call `merge_last_non_null()` to merge the last non-null fields and +/// return the merged batch. struct LastFieldsBuilder { /// Filter deleted rows. filter_deleted: bool, @@ -311,6 +317,16 @@ impl LastFieldsBuilder { return; } + // Both `maybe_init()` and `push_first_row()` can update the builder. If the delete + // op is not in the latest row, then we can't set the deletion flag in the `maybe_init()`. + // We must check the batch and update the deletion flag here to prevent + // the builder from merging non-null fields in rows that insert before the deleted row. + self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8; + if self.contains_deletion { + // Deletes this row. + return; + } + let fields = batch.fields(); for (idx, value) in self.last_fields.iter_mut().enumerate() { if value.is_null() && !fields[idx].data.is_null(0) { @@ -323,7 +339,8 @@ impl LastFieldsBuilder { } /// Merges last non-null fields, builds a new batch and resets the builder. - /// It may overwrites the last row of the `buffer`. + /// It may overwrites the last row of the `buffer`. The `buffer` is the batch + /// that initialized the builder. fn merge_last_non_null( &mut self, buffer: Batch, @@ -1082,6 +1099,32 @@ mod tests { ); } + #[test] + fn test_last_non_null_strategy_delete_middle() { + let input = [ + new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]), + new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]), + new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]), + new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]), + new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]), + new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]), + ]; + + let mut strategy = LastNonNull::new(true); + check_dedup_strategy( + &input, + &mut strategy, + &[ + new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]), + new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]), + new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]), + ], + ); + } + #[test] fn test_last_non_null_iter_on_batch() { let input = [new_batch_multi_fields( diff --git a/tests/cases/standalone/common/insert/merge_mode.result b/tests/cases/standalone/common/insert/merge_mode.result index f96ad2c8bce2..a98f6b6e38e7 100644 --- a/tests/cases/standalone/common/insert/merge_mode.result +++ b/tests/cases/standalone/common/insert/merge_mode.result @@ -92,6 +92,71 @@ DROP TABLE last_row_table; Affected Rows: 0 +CREATE TABLE IF NOT EXISTS `delete_between` ( + `time` TIMESTAMP(0) NOT NULL, + `code` STRING NULL, + `name` STRING NULL, + `status` TINYINT NULL, + TIME INDEX (`time`), + PRIMARY KEY (`code`) +) ENGINE=mito WITH( + merge_mode = 'last_non_null' +); + +Affected Rows: 0 + +INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png', 0); + +Affected Rows: 1 + +INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png', 0); + +Affected Rows: 1 + +INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png', 1); + +Affected Rows: 1 + +SELECT * FROM `delete_between`; + ++---------------------+------+-------+--------+ +| time | code | name | status | ++---------------------+------+-------+--------+ +| 2024-11-26T10:00:00 | achn | 1.png | 0 | +| 2024-11-26T10:01:00 | achn | 2.png | 0 | +| 2024-11-26T10:02:00 | achn | 3.png | 1 | ++---------------------+------+-------+--------+ + +DELETE FROM `delete_between`; + +Affected Rows: 3 + +INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png'); + +Affected Rows: 1 + +INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png'); + +Affected Rows: 1 + +INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png'); + +Affected Rows: 1 + +SELECT * FROM `delete_between`; + ++---------------------+------+-------+--------+ +| time | code | name | status | ++---------------------+------+-------+--------+ +| 2024-11-26T10:00:00 | achn | 1.png | | +| 2024-11-26T10:01:00 | achn | 2.png | | +| 2024-11-26T10:02:00 | achn | 3.png | | ++---------------------+------+-------+--------+ + +DROP TABLE `delete_between`; + +Affected Rows: 0 + create table if not exists invalid_merge_mode( host string, ts timestamp, diff --git a/tests/cases/standalone/common/insert/merge_mode.sql b/tests/cases/standalone/common/insert/merge_mode.sql index 967f94933311..9d22cc13d659 100644 --- a/tests/cases/standalone/common/insert/merge_mode.sql +++ b/tests/cases/standalone/common/insert/merge_mode.sql @@ -44,6 +44,33 @@ SELECT * from last_row_table ORDER BY host, ts; DROP TABLE last_row_table; +CREATE TABLE IF NOT EXISTS `delete_between` ( + `time` TIMESTAMP(0) NOT NULL, + `code` STRING NULL, + `name` STRING NULL, + `status` TINYINT NULL, + TIME INDEX (`time`), + PRIMARY KEY (`code`) +) ENGINE=mito WITH( + merge_mode = 'last_non_null' +); + +INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png', 0); +INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png', 0); +INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png', 1); + +SELECT * FROM `delete_between`; + +DELETE FROM `delete_between`; + +INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png'); +INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png'); +INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png'); + +SELECT * FROM `delete_between`; + +DROP TABLE `delete_between`; + create table if not exists invalid_merge_mode( host string, ts timestamp,