Skip to content

Commit

Permalink
fix: deletion between two put may not work in last_non_null mode (#…
Browse files Browse the repository at this point in the history
…5168)

* fix: deletion between rows with the same key may not work

* test: add sqlness test case

* chore: comments
  • Loading branch information
evenyag committed Dec 20, 2024
1 parent 554121a commit ffdcb8c
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 1 deletion.
45 changes: 44 additions & 1 deletion src/mito2/src/read/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ pub(crate) struct DedupMetrics {
}

/// Buffer to store fields in the last row to merge.
///
/// Usage:
/// We should call `maybe_init()` to initialize the builder and then call `push_first_row()`
/// to push the first row of each following batch whose timestamp is the same as the row in
/// this builder. Finally we should call `merge_last_non_null()` to merge the last non-null
/// fields and return the merged batch.
struct LastFieldsBuilder {
/// Filter deleted rows.
filter_deleted: bool,
Expand Down Expand Up @@ -311,6 +317,16 @@ impl LastFieldsBuilder {
return;
}

// Both `maybe_init()` and `push_first_row()` can update the builder. If the delete
// op is not in the latest row, then `maybe_init()` cannot set the deletion flag.
// We must check the batch and update the deletion flag here to prevent
// the builder from merging non-null fields from rows inserted before the deleted row.
self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
if self.contains_deletion {
// Deletes this row.
return;
}

let fields = batch.fields();
for (idx, value) in self.last_fields.iter_mut().enumerate() {
if value.is_null() && !fields[idx].data.is_null(0) {
Expand All @@ -323,7 +339,8 @@ impl LastFieldsBuilder {
}

/// Merges last non-null fields, builds a new batch and resets the builder.
/// It may overwrite the last row of the `buffer`. The `buffer` is the batch
/// that initialized the builder.
fn merge_last_non_null(
&mut self,
buffer: Batch,
Expand Down Expand Up @@ -1082,6 +1099,32 @@ mod tests {
);
}

#[test]
fn test_last_non_null_strategy_delete_middle() {
    // Each timestamp carries three versions of key `k1`, listed from the highest
    // sequence to the lowest: a put, then a delete, then an older put.
    let batches = [
        // Timestamp 1.
        new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
        new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
        new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
        // Timestamp 2.
        new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
        new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
        new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
        // Timestamp 3.
        new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
        new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
        new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
    ];

    // The delete in the middle must hide the older put: the second field stays
    // null instead of being filled from the row written before the delete.
    let expected = [
        new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
        new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
        new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
    ];

    let mut dedup = LastNonNull::new(true);
    check_dedup_strategy(&batches, &mut dedup, &expected);
}

#[test]
fn test_last_non_null_iter_on_batch() {
let input = [new_batch_multi_fields(
Expand Down
65 changes: 65 additions & 0 deletions tests/cases/standalone/common/insert/merge_mode.result
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,71 @@ DROP TABLE last_row_table;

Affected Rows: 0

CREATE TABLE IF NOT EXISTS `delete_between` (
`time` TIMESTAMP(0) NOT NULL,
`code` STRING NULL,
`name` STRING NULL,
`status` TINYINT NULL,
TIME INDEX (`time`),
PRIMARY KEY (`code`)
) ENGINE=mito WITH(
merge_mode = 'last_non_null'
);

Affected Rows: 0

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png', 0);

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png', 0);

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png', 1);

Affected Rows: 1

SELECT * FROM `delete_between`;

+---------------------+------+-------+--------+
| time | code | name | status |
+---------------------+------+-------+--------+
| 2024-11-26T10:00:00 | achn | 1.png | 0 |
| 2024-11-26T10:01:00 | achn | 2.png | 0 |
| 2024-11-26T10:02:00 | achn | 3.png | 1 |
+---------------------+------+-------+--------+

DELETE FROM `delete_between`;

Affected Rows: 3

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png');

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png');

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png');

Affected Rows: 1

SELECT * FROM `delete_between`;

+---------------------+------+-------+--------+
| time | code | name | status |
+---------------------+------+-------+--------+
| 2024-11-26T10:00:00 | achn | 1.png | |
| 2024-11-26T10:01:00 | achn | 2.png | |
| 2024-11-26T10:02:00 | achn | 3.png | |
+---------------------+------+-------+--------+

DROP TABLE `delete_between`;

Affected Rows: 0

create table if not exists invalid_merge_mode(
host string,
ts timestamp,
Expand Down
27 changes: 27 additions & 0 deletions tests/cases/standalone/common/insert/merge_mode.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ SELECT * from last_row_table ORDER BY host, ts;

DROP TABLE last_row_table;

CREATE TABLE IF NOT EXISTS `delete_between` (
`time` TIMESTAMP(0) NOT NULL,
`code` STRING NULL,
`name` STRING NULL,
`status` TINYINT NULL,
TIME INDEX (`time`),
PRIMARY KEY (`code`)
) ENGINE=mito WITH(
merge_mode = 'last_non_null'
);

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png', 0);
INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png', 0);
INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png', 1);

SELECT * FROM `delete_between`;

DELETE FROM `delete_between`;

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png');
INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png');
INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png');

SELECT * FROM `delete_between`;

DROP TABLE `delete_between`;

create table if not exists invalid_merge_mode(
host string,
ts timestamp,
Expand Down

0 comments on commit ffdcb8c

Please sign in to comment.