Skip to content

Commit

Permalink
compare result
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 5, 2024
1 parent 03579ae commit cd79045
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use await_tree::InstrumentAwait;
use itertools::Itertools;
use parking_lot::{Mutex, RwLock};
use prometheus::core::{AtomicU64, GenericGauge};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
use risingwave_pb::stream_plan::StreamActor;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -111,6 +112,17 @@ pub struct BufferTracker {
global_upload_task_size: GenericGauge<AtomicU64>,
}

pub static VERSION_STORE: LazyLock<Mutex<HashMap<u64, HummockVersion>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));

fn store_version(version: HummockVersion) {
VERSION_STORE.lock().insert(version.id, version);
}

pub fn get_version(id: u64) -> HummockVersion {
VERSION_STORE.lock().get(&id).unwrap().clone()
}

impl BufferTracker {
pub fn from_storage_opts(
config: &StorageOpts,
Expand Down Expand Up @@ -494,6 +506,10 @@ impl HummockEventHandler {
.map(Arc::new)
.unwrap_or_else(|| self.pinned_version.load().clone());

if pinned_version.id() == 48311 {
warn!(?version_payload, "update on target version");
}

let mut sst_delta_infos = vec![];
let newly_pinned_version = match version_payload {
HummockVersionUpdate::VersionDeltas(version_deltas) => {
Expand Down Expand Up @@ -565,6 +581,8 @@ impl HummockEventHandler {
"update hummock version"
);

store_version(new_pinned_version.version().clone());

self.uploader.update_pinned_version(new_pinned_version);
}
}
Expand Down
80 changes: 80 additions & 0 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,30 @@

use std::future::Future;
use std::ops::Bound;
use std::ops::Bound::Unbounded;
use std::sync::Arc;

use await_tree::InstrumentAwait;
use bytes::Bytes;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
use tokio::sync::mpsc;
use tokio::sync::mpsc::unbounded_channel;
use tracing::{warn, Instrument};

use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::get_version;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator,
};
Expand Down Expand Up @@ -126,9 +131,84 @@ impl LocalHummockStorage {
version_id = read_snapshot.2.id(),
"get value"
);

if read_snapshot.2.id() == 48312 {
let prev_version = get_version(48311);

let origin_return = self
.hummock_version_reader
.get(
table_key.clone(),
epoch,
read_options.clone(),
read_snapshot.clone(),
)
.await
.inspect_err(|_| warn!("get error when making original request"))?;

let origin_scan = self
.hummock_version_reader
.iter(
(Unbounded, Unbounded),
epoch,
read_options.clone(),
read_snapshot.clone(),
)
.await
.inspect_err(|_| warn!("get error when create iter on original version"))?
.try_collect::<Vec<_>>()
.await
.inspect_err(|_| warn!("get error when scan on original version"))?;

let read_snapshot = (
read_snapshot.0,
read_snapshot.1,
PinnedVersion::new(prev_version, unbounded_channel().0),
read_snapshot.3,
);

let new_return = self
.hummock_version_reader
.get(
table_key.clone(),
epoch,
read_options.clone(),
read_snapshot.clone(),
)
.await
.inspect_err(|_| warn!("get error when making new request"))?;

let new_scan = self
.hummock_version_reader
.iter(
(Unbounded, Unbounded),
epoch,
read_options.clone(),
read_snapshot.clone(),
)
.await
.inspect_err(|_| warn!("get error when create iter on new version"))?
.try_collect::<Vec<_>>()
.await
.inspect_err(|_| warn!("get error when scan on new version"))?;

warn!(?origin_return, ?new_return, "compare result");
warn!(
?origin_scan,
?new_scan,
eq = origin_scan == new_scan,
"compare scan"
);

return Ok(origin_return);
}
}

if is_empty_key_range(&table_key_range) {
warn!(
"return None with empty range of table watermark: {:?}",
table_key_range
);
return Ok(None);
}

Expand Down

0 comments on commit cd79045

Please sign in to comment.