Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu committed Mar 6, 2024
1 parent dc63913 commit 60b4a2a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 55 deletions.
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn test_read_version_basic() {

read_version.update(VersionUpdate::Staging(StagingData::ImmMem(imm)));

let key = iterator_test_table_key_of((1) as usize);
let key = iterator_test_table_key_of(1_usize);
let key_range = map_table_key_range((
Bound::Included(Bytes::from(key.to_vec())),
Bound::Included(Bytes::from(key.to_vec())),
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/common/log_store_impl/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context};
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::util::epoch::{test_epoch, EpochPair, INVALID_EPOCH};
use risingwave_common::util::epoch::{EpochExt, EpochPair, INVALID_EPOCH};
use risingwave_connector::sink::log_store::{
LogReader, LogStoreFactory, LogStoreReadItem, LogStoreResult, LogWriter, TruncateOffset,
};
Expand Down Expand Up @@ -133,10 +133,10 @@ impl LogReader for BoundedInMemLogStoreReader {
assert_eq!(self.epoch_progress, UNINITIALIZED);
self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
self.latest_offset = TruncateOffset::Barrier {
epoch: epoch - test_epoch(1),
epoch: epoch.prev_epoch(),
};
self.truncate_offset = TruncateOffset::Barrier {
epoch: epoch - test_epoch(1),
epoch: epoch.prev_epoch(),
};
Ok(())
}
Expand Down
84 changes: 36 additions & 48 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,12 @@ mod tests {
);
let (mut reader, mut writer) = factory.build().await;

let epoch1 = test_epoch(
test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
+ 1,
);
let epoch1 = test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
.next_epoch();
writer
.init(EpochPair::new_test_epoch(epoch1), false)
.await
Expand Down Expand Up @@ -424,14 +422,12 @@ mod tests {
);
let (mut reader, mut writer) = factory.build().await;

let epoch1 = test_epoch(
test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
+ 1,
);
let epoch1 = test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
.next_epoch();
writer
.init(EpochPair::new_test_epoch(epoch1), false)
.await
Expand Down Expand Up @@ -610,14 +606,12 @@ mod tests {
);
let (mut reader, mut writer) = factory.build().await;

let epoch1 = test_epoch(
test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
+ 1,
);
let epoch1 = test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
.next_epoch();
writer
.init(EpochPair::new_test_epoch(epoch1), false)
.await
Expand Down Expand Up @@ -838,14 +832,12 @@ mod tests {
let (mut reader1, mut writer1) = factory1.build().await;
let (mut reader2, mut writer2) = factory2.build().await;

let epoch1 = test_epoch(
test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
+ 1,
);
let epoch1 = test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
.next_epoch();
writer1
.init(EpochPair::new_test_epoch(epoch1), false)
.await
Expand Down Expand Up @@ -1032,14 +1024,12 @@ mod tests {
);
let (mut reader, mut writer) = factory.build().await;

let epoch1 = test_epoch(
test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
+ 1,
);
let epoch1 = test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
.next_epoch();
writer
.init(EpochPair::new_test_epoch(epoch1), false)
.await
Expand Down Expand Up @@ -1175,14 +1165,12 @@ mod tests {
);
let (mut reader, mut writer) = factory.build().await;

let epoch1 = test_epoch(
test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
+ 1,
);
let epoch1 = test_env
.storage
.get_pinned_version()
.version()
.max_committed_epoch
.next_epoch();
writer
.init(EpochPair::new_test_epoch(epoch1), false)
.await
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter};
use risingwave_common::util::epoch::test_epoch;
use risingwave_common::util::epoch::EpochExt;
use risingwave_connector::sink::log_store::{
ChunkId, LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset,
};
Expand Down Expand Up @@ -190,7 +190,7 @@ impl<S: StateStore> KvLogStoreReader<S> {
// start from the next epoch of last_persisted_epoch
Included(
self.serde
.serialize_epoch(last_persisted_epoch + test_epoch(1)),
.serialize_epoch(last_persisted_epoch.next_epoch()),
)
} else {
Unbounded
Expand Down Expand Up @@ -478,7 +478,7 @@ impl<S: StateStore> LogReader for KvLogStoreReader<S> {
let persisted_epoch =
self.truncate_offset
.map(|truncate_offset| match truncate_offset {
TruncateOffset::Chunk { epoch, .. } => epoch - test_epoch(1),
TruncateOffset::Chunk { epoch, .. } => epoch.prev_epoch(),
TruncateOffset::Barrier { epoch } => epoch,
});
self.state_store_stream = Some(self.read_persisted_log_store(persisted_epoch).await?);
Expand Down

0 comments on commit 60b4a2a

Please sign in to comment.