From cd790455e3fa96b70f2afb51599ec539cd5a1c5c Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 6 Feb 2024 00:44:38 +0800 Subject: [PATCH] compare result --- .../event_handler/hummock_event_handler.rs | 18 +++++ .../hummock/store/local_hummock_storage.rs | 80 +++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index fd4dd2b709fb4..0ac917baf444c 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -22,6 +22,7 @@ use await_tree::InstrumentAwait; use itertools::Itertools; use parking_lot::{Mutex, RwLock}; use prometheus::core::{AtomicU64, GenericGauge}; +use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use risingwave_pb::stream_plan::StreamActor; use thiserror_ext::AsReport; @@ -111,6 +112,17 @@ pub struct BufferTracker { global_upload_task_size: GenericGauge, } +pub static VERSION_STORE: LazyLock>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + +fn store_version(version: HummockVersion) { + VERSION_STORE.lock().insert(version.id, version); +} + +pub fn get_version(id: u64) -> HummockVersion { + VERSION_STORE.lock().get(&id).unwrap().clone() +} + impl BufferTracker { pub fn from_storage_opts( config: &StorageOpts, @@ -494,6 +506,10 @@ impl HummockEventHandler { .map(Arc::new) .unwrap_or_else(|| self.pinned_version.load().clone()); + if pinned_version.id() == 48311 { + warn!(?version_payload, "update on target version"); + } + let mut sst_delta_infos = vec![]; let newly_pinned_version = match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { @@ -565,6 +581,8 @@ impl HummockEventHandler { "update hummock version" ); + store_version(new_pinned_version.version().clone()); + self.uploader.update_pinned_version(new_pinned_version); } } diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 4d5898187cf46..a1148535e1d47 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -14,25 +14,30 @@ use std::future::Future; use std::ops::Bound; +use std::ops::Bound::Unbounded; use std::sync::Arc; use await_tree::InstrumentAwait; use bytes::Bytes; +use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{is_empty_key_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; use tokio::sync::mpsc; +use tokio::sync::mpsc::unbounded_channel; use tracing::{warn, Instrument}; use super::version::{StagingData, VersionUpdate}; use crate::error::StorageResult; +use crate::hummock::event_handler::hummock_event_handler::get_version; use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard}; use crate::hummock::iterator::{ ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, UserIterator, }; +use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, }; @@ -126,9 +131,84 @@ impl LocalHummockStorage { version_id = read_snapshot.2.id(), "get value" ); + + if read_snapshot.2.id() == 48312 { + let prev_version = get_version(48311); + + let origin_return = self + .hummock_version_reader + .get( + table_key.clone(), + epoch, + read_options.clone(), + read_snapshot.clone(), + ) + .await + .inspect_err(|_| warn!("get error when making original request"))?; + + let origin_scan = self + .hummock_version_reader + .iter( + (Unbounded, Unbounded), + epoch, + read_options.clone(), + read_snapshot.clone(), + ) + .await + .inspect_err(|_| warn!("get error when create iter on original version"))? + .try_collect::>() + .await + .inspect_err(|_| warn!("get error when scan on original version"))?; + + let read_snapshot = ( + read_snapshot.0, + read_snapshot.1, + PinnedVersion::new(prev_version, unbounded_channel().0), + read_snapshot.3, + ); + + let new_return = self + .hummock_version_reader + .get( + table_key.clone(), + epoch, + read_options.clone(), + read_snapshot.clone(), + ) + .await + .inspect_err(|_| warn!("get error when making new request"))?; + + let new_scan = self + .hummock_version_reader + .iter( + (Unbounded, Unbounded), + epoch, + read_options.clone(), + read_snapshot.clone(), + ) + .await + .inspect_err(|_| warn!("get error when create iter on new version"))? + .try_collect::>() + .await + .inspect_err(|_| warn!("get error when scan on new version"))?; + + warn!(?origin_return, ?new_return, "compare result"); + warn!( + ?origin_scan, + ?new_scan, + eq = origin_scan == new_scan, + "compare scan" + ); + + return Ok(origin_return); + } } if is_empty_key_range(&table_key_range) { + warn!( + "return None with empty range of table watermark: {:?}", + table_key_range + ); return Ok(None); }