Skip to content

Commit

Permalink
fix(state table): use value_checker in MemTable::delete (#16539)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Apr 29, 2024
1 parent da899ed commit a051c25
Showing 1 changed file with 54 additions and 57 deletions.
111 changes: 54 additions & 57 deletions src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,30 +134,29 @@ impl MemTable {
pub fn insert(&mut self, pk: TableKey<Bytes>, value: Bytes) -> Result<()> {
if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level {
let key_len = std::mem::size_of::<Bytes>() + pk.len();
let insert_value = KeyOp::Insert(value);
self.kv_size.add(&pk, &insert_value);
let origin_value = self.buffer.insert(pk, insert_value);
self.sub_origin_size(origin_value, key_len);
let op = KeyOp::Insert(value);
self.kv_size.add(&pk, &op);
let old_op = self.buffer.insert(pk, op);
self.sub_old_op_size(old_op, key_len);

return Ok(());
};
let entry = self.buffer.entry(pk);
match entry {
Entry::Vacant(e) => {
let insert_value = KeyOp::Insert(value);
self.kv_size.add(e.key(), &insert_value);
e.insert(insert_value);
let op = KeyOp::Insert(value);
self.kv_size.add(e.key(), &op);
e.insert(op);
Ok(())
}
Entry::Occupied(mut e) => {
let origin_value = e.get_mut();
self.kv_size.sub_val(origin_value);
match origin_value {
KeyOp::Delete(ref mut old_value) => {
let old_val = std::mem::take(old_value);
let update_value = KeyOp::Update((old_val, value));
self.kv_size.add_val(&update_value);
e.insert(update_value);
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);
match old_op {
KeyOp::Delete(ref mut old_op_old_value) => {
let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
KeyOp::Insert(_) | KeyOp::Update(_) => {
Expand All @@ -180,26 +179,26 @@ impl MemTable {
..
} = &self.op_consistency_level
else {
let delete_value = KeyOp::Delete(old_value);
self.kv_size.add(&pk, &delete_value);
let origin_value = self.buffer.insert(pk, delete_value);
self.sub_origin_size(origin_value, key_len);
let op = KeyOp::Delete(old_value);
self.kv_size.add(&pk, &op);
let old_op = self.buffer.insert(pk, op);
self.sub_old_op_size(old_op, key_len);
return Ok(());
};
let entry = self.buffer.entry(pk);
match entry {
Entry::Vacant(e) => {
let delete_value = KeyOp::Delete(old_value);
self.kv_size.add(e.key(), &delete_value);
e.insert(delete_value);
let op = KeyOp::Delete(old_value);
self.kv_size.add(e.key(), &op);
e.insert(op);
Ok(())
}
Entry::Occupied(mut e) => {
let origin_value = e.get_mut();
self.kv_size.sub_val(origin_value);
match origin_value {
KeyOp::Insert(original_value) => {
if ENABLE_SANITY_CHECK && !value_checker(original_value, &old_value) {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);
match old_op {
KeyOp::Insert(old_op_new_value) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
Expand All @@ -218,18 +217,17 @@ impl MemTable {
new: KeyOp::Delete(old_value),
}
.into()),
KeyOp::Update(value) => {
let (original_old_value, original_new_value) = std::mem::take(value);
if ENABLE_SANITY_CHECK && original_new_value != old_value {
KeyOp::Update((old_op_old_value, old_op_new_value)) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Delete(old_value),
}));
}
let delete_value = KeyOp::Delete(original_old_value);
self.kv_size.add_val(&delete_value);
e.insert(delete_value);
let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
}
Expand All @@ -250,49 +248,48 @@ impl MemTable {
else {
let key_len = std::mem::size_of::<Bytes>() + pk.len();

let update_value = KeyOp::Update((old_value, new_value));
self.kv_size.add(&pk, &update_value);
let origin_value = self.buffer.insert(pk, update_value);
self.sub_origin_size(origin_value, key_len);
let op = KeyOp::Update((old_value, new_value));
self.kv_size.add(&pk, &op);
let old_op = self.buffer.insert(pk, op);
self.sub_old_op_size(old_op, key_len);
return Ok(());
};
let entry = self.buffer.entry(pk);
match entry {
Entry::Vacant(e) => {
let update_value = KeyOp::Update((old_value, new_value));
self.kv_size.add(e.key(), &update_value);
e.insert(update_value);
let op = KeyOp::Update((old_value, new_value));
self.kv_size.add(e.key(), &op);
e.insert(op);
Ok(())
}
Entry::Occupied(mut e) => {
let origin_value = e.get_mut();
self.kv_size.sub_val(origin_value);
match origin_value {
KeyOp::Insert(original_new_value) => {
if ENABLE_SANITY_CHECK && !value_checker(original_new_value, &old_value) {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);
match old_op {
KeyOp::Insert(old_op_new_value) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
}));
}
let new_key_op = KeyOp::Insert(new_value);
self.kv_size.add_val(&new_key_op);
e.insert(new_key_op);
let new_op = KeyOp::Insert(new_value);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
KeyOp::Update((origin_old_value, original_new_value)) => {
if ENABLE_SANITY_CHECK && !value_checker(original_new_value, &old_value) {
KeyOp::Update((old_op_old_value, old_op_new_value)) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
}));
}
let old_value = std::mem::take(origin_old_value);
let new_key_op = KeyOp::Update((old_value, new_value));
self.kv_size.add_val(&new_key_op);
e.insert(new_key_op);
let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
KeyOp::Delete(_) => Err(MemTableError::InconsistentOperation {
Expand Down Expand Up @@ -320,9 +317,9 @@ impl MemTable {
self.buffer.range(key_range)
}

fn sub_origin_size(&mut self, origin_value: Option<KeyOp>, key_len: usize) {
if let Some(origin_value) = origin_value {
self.kv_size.sub_val(&origin_value);
fn sub_old_op_size(&mut self, old_op: Option<KeyOp>, key_len: usize) {
if let Some(op) = old_op {
self.kv_size.sub_val(&op);
self.kv_size.sub_size(key_len);
}
}
Expand Down

0 comments on commit a051c25

Please sign in to comment.