refactor: refine replace into by caching individual BlockMeta
dantengsky committed Jan 25, 2025
1 parent e057f2e commit 21c1796
Showing 3 changed files with 51 additions and 20 deletions.
6 changes: 3 additions & 3 deletions src/query/storages/common/cache/src/caches.rs
@@ -44,7 +44,7 @@ pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>;
 pub type SegmentBlockMetasCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;

 /// In-memory cache of individual BlockMeta.
-pub type BlockMetaCache = InMemoryLruCache<Arc<BlockMeta>>;
+pub type BlockMetaCache = InMemoryLruCache<BlockMeta>;

 /// In memory object cache of TableSnapshot
 pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>;
@@ -186,8 +186,8 @@ impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> {
     }
 }

-impl From<Arc<BlockMeta>> for CacheValue<Arc<BlockMeta>> {
-    fn from(value: Arc<BlockMeta>) -> Self {
+impl From<BlockMeta> for CacheValue<BlockMeta> {
+    fn from(value: BlockMeta) -> Self {
         CacheValue {
             inner: Arc::new(value),
             mem_bytes: 0,
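
Why the key type change works: `CacheValue` already Arc-wraps whatever it stores, so the old `InMemoryLruCache<Arc<BlockMeta>>` ended up holding `Arc<Arc<BlockMeta>>`; caching `BlockMeta` by value drops the extra layer. A minimal sketch of the wrapper pattern, with simplified stand-ins rather than the real cache types:

```rust
use std::sync::Arc;

// Simplified stand-in for the CacheValue wrapper in caches.rs; the real
// type tracks actual memory usage rather than a hard-coded 0.
struct CacheValue<T> {
    inner: Arc<T>,
    mem_bytes: usize,
}

impl<T> From<T> for CacheValue<T> {
    // The value is Arc-wrapped exactly once, at insertion time, so callers
    // insert an owned T and later receive cheap Arc<T> clones.
    fn from(value: T) -> Self {
        CacheValue {
            inner: Arc::new(value),
            mem_bytes: 0,
        }
    }
}

fn main() {
    // A String stands in for BlockMeta here.
    let cached: CacheValue<String> = String::from("block meta").into();
    let shared: Arc<String> = Arc::clone(&cached.inner);
    println!("{shared} ({} bytes tracked)", cached.mem_bytes);
}
```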
1 change: 1 addition & 0 deletions src/query/storages/common/cache/src/lib.rs
@@ -25,6 +25,7 @@ mod temp_dir;

 pub use cache::CacheAccessor;
 pub use cache::Unit;
+pub use caches::BlockMetaCache;
 pub use caches::CacheValue;
 pub use caches::CachedObject;
 pub use caches::SegmentBlockMetasCache;
@@ -43,6 +43,9 @@ use databend_common_metrics::storage::*;
 use databend_common_sql::evaluator::BlockOperator;
 use databend_common_sql::executor::physical_plans::OnConflictField;
 use databend_common_sql::StreamContext;
+use databend_storages_common_cache::BlockMetaCache;
+use databend_storages_common_cache::CacheAccessor;
+use databend_storages_common_cache::CacheManager;
 use databend_storages_common_cache::LoadParams;
 use databend_storages_common_index::filters::Filter;
 use databend_storages_common_index::filters::Xor8Filter;
@@ -100,6 +103,8 @@ struct AggregationContext {
     io_request_semaphore: Arc<Semaphore>,
     // generate stream columns if necessary
     stream_ctx: Option<StreamContext>,
+
+    block_meta_cache: Option<BlockMetaCache>,
 }

 // Apply MergeIntoOperations to segments
@@ -209,6 +214,7 @@ impl ReplaceIntoOperationAggregator {
             block_builder,
             io_request_semaphore,
             stream_ctx,
+            block_meta_cache: CacheManager::instance().get_block_meta_cache(),
         }),
     })
 }
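
Note that the field is an `Option<BlockMetaCache>`, yet the code below calls `get`/`insert` on it directly; that works if the accessor trait is also implemented for `Option<C>`, so a disabled cache degrades to a no-op. A small sketch of that pattern, with a deliberately simplified trait rather than the real `CacheAccessor` signature:

```rust
use std::sync::Arc;

// Deliberately simplified accessor trait; the real CacheAccessor is richer.
trait Accessor {
    type V;
    fn get(&self, key: &str) -> Option<Arc<Self::V>>;
}

// Stand-in for an LRU cache that always hits.
struct Lru;

impl Accessor for Lru {
    type V = String;
    fn get(&self, _key: &str) -> Option<Arc<String>> {
        Some(Arc::new("hit".to_string()))
    }
}

// Blanket impl: a disabled cache (None) simply always misses.
impl<C: Accessor> Accessor for Option<C> {
    type V = C::V;
    fn get(&self, key: &str) -> Option<Arc<C::V>> {
        self.as_ref().and_then(|c| c.get(key))
    }
}

fn main() {
    let enabled: Option<Lru> = Some(Lru);
    let disabled: Option<Lru> = None;
    assert!(enabled.get("k").is_some());
    assert!(disabled.get("k").is_none());
}
```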
@@ -291,6 +297,8 @@
 impl ReplaceIntoOperationAggregator {
     #[async_backtrace::framed]
     pub async fn apply(&mut self) -> Result<Option<MutationLogs>> {
+        let block_meta_cache = &self.aggregation_ctx.block_meta_cache;
+
         metrics_inc_replace_number_apply_deletion();

         // track number of segments and blocks after pruning (per merge action application)
@@ -317,7 +325,7 @@
         let mut mutation_log_handlers = Vec::new();
         let mut num_rows_mutated = 0;
         for (segment_idx, block_deletion) in self.deletion_accumulator.deletions.drain() {
-            let (path, ver) = self
+            let (segment_path, ver) = self
                 .aggregation_ctx
                 .segment_locations
                 .get(&segment_idx)
@@ -329,19 +337,41 @@
                 })?;

             let load_param = LoadParams {
-                location: path.clone(),
+                location: segment_path.clone(),
                 len_hint: None,
                 ver: *ver,
                 put_cache: true,
             };

-            let compact_segment_info = aggregation_ctx.segment_reader.read(&load_param).await?;
-            let segment_info: SegmentInfo = compact_segment_info.try_into()?;
+            // Retain SegmentInfo to avoid repeatedly extracting it from CompactSegmentInfo later.
+            let mut opt_segment_info: Option<SegmentInfo> = None;

             for (block_index, keys) in block_deletion {
+                let block_cache_key = format!("{segment_path}-{block_index}");
+                let block_meta = match block_meta_cache.get(&block_cache_key) {
+                    Some(block_meta) => block_meta,
+                    None => {
+                        let block_meta = if let Some(segment_info) = &opt_segment_info {
+                            segment_info.blocks[block_index].clone()
+                        } else {
+                            let compact_segment_info =
+                                aggregation_ctx.segment_reader.read(&load_param).await?;
+                            let segment_info: SegmentInfo = compact_segment_info.try_into()?;
+                            let block_meta = segment_info.blocks[block_index].clone();
+                            opt_segment_info = Some(segment_info);
+                            block_meta
+                        };
+                        // A query node typically processes only a subset of the BlockMeta in a given segment.
+                        // Therefore, even though all BlockMeta of a segment are available here, not all are populated into the cache.
+                        block_meta_cache.insert(block_cache_key, block_meta.as_ref().clone());
+                        block_meta
+                    }
+                };
+
                 let permit =
                     acquire_task_permit(aggregation_ctx.io_request_semaphore.clone()).await?;
-                let block_meta = segment_info.blocks[block_index].clone();
+
+                // let block_meta = segment_info.blocks[block_index].clone();
                 let aggregation_ctx = aggregation_ctx.clone();
                 num_rows_mutated += block_meta.row_count;
                 // self.aggregation_ctx.
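
The shape of the new lookup: try the per-block cache first; on a miss, decode the segment at most once per drained (segment, deletions) pair and publish only the block metas that were actually requested. A self-contained sketch of that flow, with a `HashMap` standing in for the LRU cache and a `String` standing in for `BlockMeta` (the helper names are illustrative, not databend's API):

```rust
use std::collections::HashMap;
use std::sync::Arc;

fn block_meta_for(
    cache: &mut HashMap<String, Arc<String>>,
    segment_path: &str,
    block_index: usize,
    opt_segment: &mut Option<Vec<Arc<String>>>,
) -> Arc<String> {
    // Same key scheme as above: "{segment_path}-{block_index}".
    let key = format!("{segment_path}-{block_index}");
    if let Some(hit) = cache.get(&key) {
        return hit.clone(); // cache hit: no segment decode needed
    }
    // Decode the segment lazily, and at most once.
    let segment = opt_segment.get_or_insert_with(|| load_segment(segment_path));
    let meta = segment[block_index].clone();
    // Only blocks that were actually touched are populated into the cache.
    cache.insert(key, meta.clone());
    meta
}

// Stand-in for reading and decoding a segment into its block metas.
fn load_segment(path: &str) -> Vec<Arc<String>> {
    (0..4).map(|i| Arc::new(format!("{path}:block-{i}"))).collect()
}

fn main() {
    let mut cache = HashMap::new();
    let mut opt_segment = None;
    // The first access decodes the segment; the second is served from the cache.
    let a = block_meta_for(&mut cache, "seg/1", 2, &mut opt_segment);
    let b = block_meta_for(&mut cache, "seg/1", 2, &mut opt_segment);
    assert!(Arc::ptr_eq(&a, &b));
}
```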
@@ -604,7 +634,7 @@ impl AggregationContext {
         if let Some(stats) = column_stats {
             let max = stats.max();
             let min = stats.min();
-            std::cmp::min(key_max, max) >= std::cmp::max(key_min,min)
+            std::cmp::min(key_max, max) >= std::cmp::max(key_min, min)
             || // coincide overlap
             (max == key_max && min == key_min)
         } else {
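
Besides the added space after the comma, nothing changes here: the predicate is the standard closed-interval intersection test, where `[min, max]` and `[key_min, key_max]` overlap exactly when the smaller of the two maxima is at least the larger of the two minima. A quick standalone check:

```rust
// Closed intervals [min, max] and [key_min, key_max] intersect iff the
// smaller maximum is not below the larger minimum.
fn ranges_overlap(min: i64, max: i64, key_min: i64, key_max: i64) -> bool {
    std::cmp::min(key_max, max) >= std::cmp::max(key_min, min)
}

fn main() {
    assert!(ranges_overlap(5, 10, 8, 20));   // partial overlap
    assert!(ranges_overlap(5, 10, 10, 20));  // touching endpoints count
    assert!(!ranges_overlap(5, 10, 11, 20)); // disjoint
}
```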
@@ -630,22 +660,22 @@
         let reader = reader.clone();
         GlobalIORuntime::instance()
             .spawn(async move {
                 let column_chunks = merged_io_read_result.columns_chunks()?;
                 reader.deserialize_chunks(
                     block_meta_ptr.location.0.as_str(),
                     block_meta_ptr.row_count as usize,
                     &block_meta_ptr.compression,
                     &block_meta_ptr.col_metas,
                     column_chunks,
                     &storage_format,
                 )
             })
             .await
             .map_err(|e| {
                 ErrorCode::Internal(
                     "unexpected, failed to join aggregation context read block tasks for replace into.",
                 )
                 .add_message_back(e.to_string())
             })?
         }
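
This hunk only re-indents the block, but the pattern it contains is worth a note: the deserialization work is spawned onto the global IO runtime, and a failure to join the spawned task is folded back into the caller's error. A generic tokio sketch of the same shape (tokio and the helper names here are assumptions, not databend's runtime API):

```rust
// Spawn the decode work, await the JoinHandle, and convert a JoinError
// into the caller's error type before unwrapping the inner Result.
async fn read_and_decode(raw: Vec<u8>) -> Result<usize, String> {
    tokio::spawn(async move { decode(&raw) })
        .await
        .map_err(|e| format!("failed to join decode task: {e}"))?
}

// Stand-in for reader.deserialize_chunks(...).
fn decode(raw: &[u8]) -> Result<usize, String> {
    Ok(raw.len())
}

#[tokio::main]
async fn main() {
    let n = read_and_decode(vec![1, 2, 3]).await.unwrap();
    println!("decoded {n} bytes");
}
```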
