Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deletion between two put may not work in last_non_null mode #5168

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/mito2/src/read/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ pub(crate) struct DedupMetrics {
}

/// Buffer to store fields in the last row to merge.
///
/// Usage:
/// We should call `maybe_init()` to initialize the builder and then call `push_first_row()`
/// to push the first row of batches that the timestamp is the same as the row in this builder.
/// Finally we should call `merge_last_non_null()` to merge the last non-null fields and
/// return the merged batch.
struct LastFieldsBuilder {
/// Filter deleted rows.
filter_deleted: bool,
Expand Down Expand Up @@ -311,6 +317,16 @@ impl LastFieldsBuilder {
return;
}

// Both `maybe_init()` and `push_first_row()` can update the builder. If the delete
// op is not in the latest row, then we can't set the deletion flag in the `maybe_init()`.
// We must check the batch and update the deletion flag here to prevent
// the builder from merging non-null fields in rows that insert before the deleted row.
self.contains_deletion = batch.op_types().get_data(0).unwrap() == OpType::Delete as u8;
evenyag marked this conversation as resolved.
Show resolved Hide resolved
if self.contains_deletion {
// Deletes this row.
return;
}

let fields = batch.fields();
for (idx, value) in self.last_fields.iter_mut().enumerate() {
if value.is_null() && !fields[idx].data.is_null(0) {
Expand All @@ -323,7 +339,8 @@ impl LastFieldsBuilder {
}

/// Merges last non-null fields, builds a new batch and resets the builder.
/// It may overwrites the last row of the `buffer`.
/// It may overwrites the last row of the `buffer`. The `buffer` is the batch
/// that initialized the builder.
fn merge_last_non_null(
&mut self,
buffer: Batch,
Expand Down Expand Up @@ -1082,6 +1099,32 @@ mod tests {
);
}

#[test]
fn test_last_non_null_strategy_delete_middle() {
let input = [
new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[1], &[4], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k1", &[1], &[1], &[OpType::Put], &[(Some(12), Some(1))]),
new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
new_batch_multi_fields(b"k1", &[2], &[5], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k1", &[2], &[2], &[OpType::Put], &[(Some(22), Some(2))]),
new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
new_batch_multi_fields(b"k1", &[3], &[6], &[OpType::Delete], &[(None, None)]),
new_batch_multi_fields(b"k1", &[3], &[3], &[OpType::Put], &[(Some(32), Some(3))]),
];

let mut strategy = LastNonNull::new(true);
check_dedup_strategy(
&input,
&mut strategy,
&[
new_batch_multi_fields(b"k1", &[1], &[7], &[OpType::Put], &[(Some(11), None)]),
new_batch_multi_fields(b"k1", &[2], &[8], &[OpType::Put], &[(Some(21), None)]),
new_batch_multi_fields(b"k1", &[3], &[9], &[OpType::Put], &[(Some(31), None)]),
],
);
}

#[test]
fn test_last_non_null_iter_on_batch() {
let input = [new_batch_multi_fields(
Expand Down
65 changes: 65 additions & 0 deletions tests/cases/standalone/common/insert/merge_mode.result
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,71 @@ DROP TABLE last_row_table;

Affected Rows: 0

CREATE TABLE IF NOT EXISTS `delete_between` (
`time` TIMESTAMP(0) NOT NULL,
`code` STRING NULL,
`name` STRING NULL,
`status` TINYINT NULL,
TIME INDEX (`time`),
PRIMARY KEY (`code`)
) ENGINE=mito WITH(
merge_mode = 'last_non_null'
);

Affected Rows: 0

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png', 0);

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png', 0);

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png', 1);

Affected Rows: 1

SELECT * FROM `delete_between`;

+---------------------+------+-------+--------+
| time | code | name | status |
+---------------------+------+-------+--------+
| 2024-11-26T10:00:00 | achn | 1.png | 0 |
| 2024-11-26T10:01:00 | achn | 2.png | 0 |
| 2024-11-26T10:02:00 | achn | 3.png | 1 |
+---------------------+------+-------+--------+

DELETE FROM `delete_between`;

Affected Rows: 3

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png');

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png');

Affected Rows: 1

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png');

Affected Rows: 1

SELECT * FROM `delete_between`;

+---------------------+------+-------+--------+
| time | code | name | status |
+---------------------+------+-------+--------+
| 2024-11-26T10:00:00 | achn | 1.png | |
| 2024-11-26T10:01:00 | achn | 2.png | |
| 2024-11-26T10:02:00 | achn | 3.png | |
+---------------------+------+-------+--------+

DROP TABLE `delete_between`;

Affected Rows: 0

create table if not exists invalid_merge_mode(
host string,
ts timestamp,
Expand Down
27 changes: 27 additions & 0 deletions tests/cases/standalone/common/insert/merge_mode.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ SELECT * from last_row_table ORDER BY host, ts;

DROP TABLE last_row_table;

CREATE TABLE IF NOT EXISTS `delete_between` (
`time` TIMESTAMP(0) NOT NULL,
`code` STRING NULL,
`name` STRING NULL,
`status` TINYINT NULL,
TIME INDEX (`time`),
PRIMARY KEY (`code`)
) ENGINE=mito WITH(
merge_mode = 'last_non_null'
);

INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png', 0);
INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png', 0);
INSERT INTO `delete_between` (`time`, `code`, `name`, `status`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png', 1);

SELECT * FROM `delete_between`;

DELETE FROM `delete_between`;

INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:00:00', 'achn', '1.png');
INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:01:00', 'achn', '2.png');
INSERT INTO `delete_between` (`time`, `code`, `name`) VALUES ('2024-11-26 10:02:00', 'achn', '3.png');

SELECT * FROM `delete_between`;

DROP TABLE `delete_between`;

create table if not exists invalid_merge_mode(
host string,
ts timestamp,
Expand Down
Loading