Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): add table_id parameter in state store write and sync method for future partial checkpoint support #1812

Closed
wants to merge 9 commits into from
27 changes: 0 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions src/bench/ss_bench/operations/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::compactor::{Compactor, CompactorContext};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::GLOBAL_STORAGE_TABLE_ID;
use risingwave_storage::StateStore;

use super::{Batch, Operations, PerfMetrics};
Expand Down Expand Up @@ -187,14 +188,23 @@ impl Operations {
.map(|(k, v)| (k, StorageValue::new(Default::default(), v)))
.collect_vec();
let epoch = ctx.epoch.load(Ordering::Acquire);
store.ingest_batch(batch, epoch).await.unwrap();
store
.ingest_batch(batch, epoch, GLOBAL_STORAGE_TABLE_ID)
.await
.unwrap();
let last_batch = i + 1 == l;
if ctx.epoch_barrier_finish(last_batch) {
store.sync(Some(epoch)).await.unwrap();
store
.sync(Some(epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID]))
.await
.unwrap();
ctx.meta_client.commit_epoch(epoch).await.unwrap();
ctx.epoch.fetch_add(1, Ordering::SeqCst);
}
store.wait_epoch(epoch).await.unwrap();
store
.wait_epoch(epoch, GLOBAL_STORAGE_TABLE_ID)
.await
.unwrap();
let time_nano = start.elapsed().as_nanos();
latencies.push(time_nano);
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl<S: StateStore> SourceStateHandler<S> {
// TODO should be a clear Error Code
Err(anyhow!("states require not null"))
} else {
let mut write_batch = self.keyspace.state_store().start_write_batch();
let mut write_batch = self.keyspace.start_write_batch();
let mut local_batch = write_batch.prefixify(&self.keyspace);
states.iter().for_each(|state| {
// state inner key format (state_identifier | epoch)
Expand Down
1 change: 0 additions & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ byteorder = "1"
bytes = { version = "1", features = ["serde"] }
chrono = "0.4"
crc32fast = "1"
crossbeam = "0.8.1"
dashmap = { version = "5", default-features = false }
either = "1"
enum-as-inner = "0.4"
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod tests {
use crate::monitor::StateStoreMetrics;
use crate::object::{InMemObjectStore, ObjectStoreImpl};
use crate::storage_value::StorageValue;
use crate::store::GLOBAL_STORAGE_TABLE_ID;
use crate::StateStore;

async fn get_hummock_storage(
Expand Down Expand Up @@ -102,10 +103,14 @@ mod tests {
.ingest_batch(
vec![(key.clone(), StorageValue::new_default_put(val.clone()))],
epoch,
GLOBAL_STORAGE_TABLE_ID,
)
.await
.unwrap();
storage.sync(Some(epoch)).await.unwrap();
storage
.sync(Some(epoch), Some(vec![GLOBAL_STORAGE_TABLE_ID]))
.await
.unwrap();
hummock_meta_client.commit_epoch(epoch).await.unwrap();
}

Expand Down
129 changes: 92 additions & 37 deletions src/storage/src/hummock/conflict_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,51 +14,63 @@

//! This mod implements a `ConflictDetector` that detect write key conflict in each epoch

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use bytes::Bytes;
use crossbeam::atomic::AtomicCell;
use dashmap::DashMap;

use crate::hummock::value::HummockValue;
use crate::hummock::HummockEpoch;
use crate::store::StorageTableId;

pub struct ConflictDetector {
// epoch -> key-sets
epoch_history: DashMap<HummockEpoch, HashSet<Bytes>>,
epoch_watermark: AtomicCell<HummockEpoch>,
// epoch -> table_id -> key-sets
epoch_history: DashMap<HummockEpoch, HashMap<StorageTableId, HashSet<Vec<u8>>>>,
table_epoch_watermark: DashMap<StorageTableId, HummockEpoch>,
}

impl ConflictDetector {
pub fn new() -> ConflictDetector {
ConflictDetector {
epoch_history: DashMap::new(),
epoch_watermark: AtomicCell::new(HummockEpoch::MIN),
table_epoch_watermark: DashMap::new(),
}
}

pub fn get_epoch_watermark(&self) -> HummockEpoch {
self.epoch_watermark.load()
/// Returns the archived-epoch watermark recorded for `table_id`.
///
/// Tables that have never been archived fall back to `HummockEpoch::MIN`,
/// i.e. every epoch is still writable for them.
pub fn get_epoch_watermark(&self, table_id: StorageTableId) -> HummockEpoch {
    match self.table_epoch_watermark.get(&table_id) {
        Some(entry) => *entry.value(),
        None => HummockEpoch::MIN,
    }
}

// Sets the new watermark with CAS to enable detection in concurrent update
pub fn set_watermark(&self, epoch: HummockEpoch) {
loop {
let current_watermark = self.get_epoch_watermark();
assert!(
epoch > current_watermark,
"not allowed to set epoch watermark to equal to or lower than current watermark: current is {}, epoch to set {}",
current_watermark,
epoch
);
if self
.epoch_watermark
.compare_exchange(current_watermark, epoch)
.is_ok()
{
return;
}
}
/// Advances the epoch watermark of one storage table to `epoch`.
///
/// # Panics
///
/// Panics if `epoch` is not strictly greater than the table's current
/// watermark (a table missing from the map counts as `HummockEpoch::MIN`).
pub fn set_single_table_watermark(&self, epoch: HummockEpoch, table_id: StorageTableId) {
    // The entry guard keeps the shard locked for the whole read-check-write,
    // so the assert and the store below are atomic w.r.t. other writers.
    let mut watermark_entry = self
        .table_epoch_watermark
        .entry(table_id)
        .or_insert(HummockEpoch::MIN);
    let current = *watermark_entry.value();
    assert!(
        epoch > current,
        "not allowed to set epoch watermark to equal to or lower than current watermark: current is {}, epoch to set {}",
        current,
        epoch
    );
    *watermark_entry = epoch;
}

/// Advances the epoch watermark to `epoch` for the given storage tables.
///
/// When `table_ids` is `None`, every table currently tracked in
/// `table_epoch_watermark` is advanced. The key set is copied out first so
/// no map iteration guard is held while the per-table update takes entry
/// locks on the same map.
pub fn set_watermark(&self, epoch: HummockEpoch, table_ids: Option<&Vec<StorageTableId>>) {
    let targets: Vec<StorageTableId> = match table_ids {
        Some(ids) => ids.clone(),
        None => self
            .table_epoch_watermark
            .iter()
            .map(|entry| *entry.key())
            .collect(),
    };
    for table_id in &targets {
        self.set_single_table_watermark(epoch, *table_id);
    }
}

/// Checks whether there is key conflict for the given `kv_pairs` and adds the key in `kv_pairs`
Expand All @@ -68,29 +80,60 @@ impl ConflictDetector {
&self,
kv_pairs: &[(Bytes, HummockValue<Bytes>)],
epoch: HummockEpoch,
table_id: StorageTableId,
) {
assert!(
epoch > self.get_epoch_watermark(),
epoch > self.get_epoch_watermark(table_id),
"write to an archived epoch: {}",
epoch
);

let mut written_key = self.epoch_history.entry(epoch).or_insert(HashSet::new());
let mut epoch_written_key = self.epoch_history.entry(epoch).or_insert(HashMap::new());
// check whether the key has been written in the epoch in any table
epoch_written_key.values().for_each(|table_written_key| {
for (key, value) in kv_pairs {
assert!(
!table_written_key.contains(&key.to_vec()),
"key {:?} is written again after previously written, value is {:?}",
key,
value,
);
}
});

let table_written_key = epoch_written_key
.entry(table_id)
.or_insert_with(HashSet::new);

// add the keys to history
for (key, value) in kv_pairs.iter() {
assert!(
written_key.insert(key.clone()),
table_written_key.insert(key.to_vec()),
"key {:?} is written again after previously written, value is {:?}",
key,
value,
);
}
}

/// Archives an epoch. An archived epoch cannot be written anymore.
pub fn archive_epoch(&self, epoch: HummockEpoch) {
self.epoch_history.remove(&epoch);
self.set_watermark(epoch);
/// Archives an epoch for a storage table. An archived epoch cannot be written anymore in the
/// storage table (later writes trip the watermark assert in
/// `check_conflict_and_track_write_batch`).
///
/// `table_ids` is an optional parameter that specifies which storage tables to archive. If
/// `None`, all tables are archived.
pub fn archive_epoch(&self, epoch: HummockEpoch, table_ids: Option<&Vec<StorageTableId>>) {
    // Drop the tracked key sets for the archived tables. The `get_mut` guard
    // is released at the end of this `if let` scope, before `remove_if` below
    // touches the same entry again.
    // NOTE(review): keep that ordering — holding a DashMap guard across
    // another call on the same map can deadlock; confirm before reordering.
    if let Some(mut epoch_history) = self.epoch_history.get_mut(&epoch) {
        if let Some(table_ids) = table_ids {
            for table_id in table_ids {
                epoch_history.remove(table_id);
            }
        } else {
            // No explicit table list: archive every table tracked in this epoch.
            epoch_history.clear();
        }
    }
    // Garbage-collect the epoch entry once no table has history left under it.
    self.epoch_history
        .remove_if(&epoch, |_, epoch_history| epoch_history.is_empty());
    // Raise the per-table watermarks so the epoch is treated as archived.
    self.set_watermark(epoch, table_ids);
}
}

Expand All @@ -103,6 +146,7 @@ mod test {

use crate::hummock::conflict_detector::ConflictDetector;
use crate::hummock::value::HummockValue;
use crate::store::GLOBAL_STORAGE_TABLE_ID;

#[test]
#[should_panic]
Expand All @@ -120,6 +164,7 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
}

Expand All @@ -135,6 +180,7 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
detector.check_conflict_and_track_write_batch(
once((
Expand All @@ -144,6 +190,7 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
}

Expand All @@ -158,6 +205,7 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
detector.check_conflict_and_track_write_batch(
once((
Expand All @@ -167,8 +215,9 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
detector.archive_epoch(233);
detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID]));
detector.check_conflict_and_track_write_batch(
once((
Bytes::from("key1"),
Expand All @@ -177,6 +226,7 @@ mod test {
.collect_vec()
.as_slice(),
234,
GLOBAL_STORAGE_TABLE_ID,
);
}

Expand All @@ -192,8 +242,9 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
detector.archive_epoch(233);
detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID]));
detector.check_conflict_and_track_write_batch(
once((
Bytes::from("key1"),
Expand All @@ -202,6 +253,7 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
}

Expand All @@ -216,9 +268,10 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
assert!(!detector.epoch_history.get(&233).unwrap().is_empty());
detector.archive_epoch(233);
detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID]));
assert!(detector.epoch_history.get(&233).is_none());
}

Expand All @@ -234,8 +287,9 @@ mod test {
.collect_vec()
.as_slice(),
233,
GLOBAL_STORAGE_TABLE_ID,
);
detector.archive_epoch(233);
detector.archive_epoch(233, Some(&vec![GLOBAL_STORAGE_TABLE_ID]));
detector.check_conflict_and_track_write_batch(
once((
Bytes::from("key1"),
Expand All @@ -244,6 +298,7 @@ mod test {
.collect_vec()
.as_slice(),
232,
GLOBAL_STORAGE_TABLE_ID,
);
}
}
Loading