Skip to content

Commit

Permalink
feat: add write_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Yongxin-Hu committed Mar 15, 2024
1 parent ed6180f commit 2f460b5
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions src/engines/lsm/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ impl LsmStorage{
}
}

pub enum WriteBatchRecord<T: AsRef<[u8]>> {
Put(T, T),
Del(T),
}


/// The storage interface of the LSM tree.
pub struct LsmStorageInner {
pub(crate) state: Arc<RwLock<Arc<LsmStorageState>>>,
Expand Down Expand Up @@ -337,31 +343,46 @@ impl LsmStorageInner{
Ok(None)
}

/// Put a key-value pair into the storage by writing into the current memtable.
/// 将 kv-pair 写入 activate_memtable
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
assert!(!value.is_empty(), "value cannot be empty");
assert!(!key.is_empty(), "key cannot be empty");
let size;
{
let guard = self.state.read();
guard.active_memtable.put(key, value)?;
size = guard.active_memtable.approximate_size();
}
self.try_freeze(size)?;
Ok(())
self.write_batch(&[WriteBatchRecord::Put(key, value)])
}

/// Remove a key from the storage by writing an empty value.
/// 删除 `key` 写入空的 value
pub fn delete(&self, key: &[u8]) -> Result<()> {
assert!(!key.is_empty(), "key cannot be empty");
let size;
{
let guard = self.state.read();
guard.active_memtable.put(key, b"")?;
size = guard.active_memtable.approximate_size();
}
self.try_freeze(size)?;
self.write_batch(&[WriteBatchRecord::Del(key)])
}

/// 批量写入
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
for record in batch {
match record {
WriteBatchRecord::Put(key, value) => {
let key = key.as_ref();
let value = value.as_ref();
assert!(!key.is_empty(), "key cannot be empty");
assert!(!value.is_empty(), "value cannot be empty");
let size;
{
let guard = self.state.read();
guard.active_memtable.put(key, value)?;
size = guard.active_memtable.approximate_size();
}
self.try_freeze(size)?;
},
WriteBatchRecord::Del(key) => {
let key = key.as_ref();
assert!(!key.is_empty(), "key cannot be empty");
let size;
{
let guard = self.state.read();
guard.active_memtable.put(key, b"")?;
size = guard.active_memtable.approximate_size();
}
self.try_freeze(size)?;
}
}
}
Ok(())
}

Expand Down

0 comments on commit 2f460b5

Please sign in to comment.