diff --git a/Cargo.lock b/Cargo.lock index c23acf60636d..8f5b1037193b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5270,6 +5270,7 @@ dependencies = [ name = "index" version = "0.12.0" dependencies = [ + "async-stream", "async-trait", "asynchronous-codec", "bytemuck", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index f46c64a17606..f91a8eac9708 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true workspace = true [dependencies] +async-stream.workspace = true async-trait.workspace = true asynchronous-codec = "0.7.0" bytemuck.workspace = true diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index b3c95d3a7626..04d2edc3d1b3 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -12,21 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod finalize_segment; +mod intermediate_codec; + use std::collections::HashSet; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; -use fastbloom::BloomFilter; -use futures::{AsyncWrite, AsyncWriteExt}; +use finalize_segment::FinalizedBloomFilterStorage; +use futures::{AsyncWrite, AsyncWriteExt, StreamExt}; use snafu::ResultExt; use super::error::{IoSnafu, SerdeJsonSnafu}; use crate::bloom_filter::error::Result; use crate::bloom_filter::{BloomFilterMeta, BloomFilterSegmentLocation, Bytes}; +use crate::external_provider::ExternalTempFileProvider; /// The seed used for the Bloom filter. -const SEED: u128 = 42; +pub const SEED: u128 = 42; /// The false positive rate of the Bloom filter. -const FALSE_POSITIVE_RATE: f64 = 0.01; +pub const FALSE_POSITIVE_RATE: f64 = 0.01; /// `BloomFilterCreator` is responsible for creating and managing bloom filters /// for a set of elements. It divides the rows into segments and creates @@ -58,6 +64,9 @@ pub struct BloomFilterCreator { /// Storage for finalized Bloom filters. finalized_bloom_filters: FinalizedBloomFilterStorage, + + /// Global memory usage of the bloom filter creator. + global_memory_usage: Arc, } impl BloomFilterCreator { @@ -66,7 +75,12 @@ impl BloomFilterCreator { /// # PANICS /// /// `rows_per_segment` <= 0 - pub fn new(rows_per_segment: usize) -> Self { + pub fn new( + rows_per_segment: usize, + intermediate_provider: Box, + global_memory_usage: Arc, + global_memory_usage_threshold: Option, + ) -> Self { assert!( rows_per_segment > 0, "rows_per_segment must be greater than 0" @@ -77,54 +91,67 @@ impl BloomFilterCreator { accumulated_row_count: 0, cur_seg_distinct_elems: HashSet::default(), cur_seg_distinct_elems_mem_usage: 0, - finalized_bloom_filters: FinalizedBloomFilterStorage::default(), + global_memory_usage: global_memory_usage.clone(), + finalized_bloom_filters: FinalizedBloomFilterStorage::new( + intermediate_provider, + global_memory_usage, + global_memory_usage_threshold, + ), } } /// Adds a row of elements to the bloom filter. If the number of accumulated rows /// reaches `rows_per_segment`, it finalizes the current segment. - pub fn push_row_elems(&mut self, elems: impl IntoIterator) { + pub async fn push_row_elems(&mut self, elems: impl IntoIterator) -> Result<()> { self.accumulated_row_count += 1; + + let mut mem_diff = 0; for elem in elems.into_iter() { let len = elem.len(); let is_new = self.cur_seg_distinct_elems.insert(elem); if is_new { - self.cur_seg_distinct_elems_mem_usage += len; + mem_diff += len; } } + self.cur_seg_distinct_elems_mem_usage += mem_diff; + self.global_memory_usage + .fetch_add(mem_diff, Ordering::Relaxed); if self.accumulated_row_count % self.rows_per_segment == 0 { - self.finalize_segment(); + self.finalize_segment().await?; } + + Ok(()) } /// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer. pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> { if !self.cur_seg_distinct_elems.is_empty() { - self.finalize_segment(); + self.finalize_segment().await?; } let mut meta = BloomFilterMeta { rows_per_segment: self.rows_per_segment, - seg_count: self.finalized_bloom_filters.len(), row_count: self.accumulated_row_count, ..Default::default() }; - let mut buf = Vec::new(); - for segment in self.finalized_bloom_filters.drain() { - let slice = segment.bloom_filter.as_slice(); - buf.clear(); - write_u64_slice(&mut buf, slice); - writer.write_all(&buf).await.context(IoSnafu)?; + let mut segs = self.finalized_bloom_filters.drain().await?; + while let Some(segment) = segs.next().await { + let segment = segment?; + writer + .write_all(&segment.bloom_filter_bytes) + .await + .context(IoSnafu)?; - let size = buf.len(); + let size = segment.bloom_filter_bytes.len(); meta.bloom_filter_segments.push(BloomFilterSegmentLocation { offset: meta.bloom_filter_segments_size as _, size: size as _, elem_count: segment.element_count, }); meta.bloom_filter_segments_size += size; + meta.seg_count += 1; } let meta_bytes = serde_json::to_vec(&meta).context(SerdeJsonSnafu)?; @@ -145,91 +172,29 @@ impl BloomFilterCreator { self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage() } - fn finalize_segment(&mut self) { + async fn finalize_segment(&mut self) -> Result<()> { let elem_count = self.cur_seg_distinct_elems.len(); self.finalized_bloom_filters - .add(self.cur_seg_distinct_elems.drain(), elem_count); - self.cur_seg_distinct_elems_mem_usage = 0; - } -} - -/// Storage for finalized Bloom filters. -/// -/// TODO(zhongzc): Add support for storing intermediate bloom filters on disk to control memory usage. -#[derive(Debug, Default)] -struct FinalizedBloomFilterStorage { - /// Bloom filters that are stored in memory. - in_memory: Vec, -} - -impl FinalizedBloomFilterStorage { - fn memory_usage(&self) -> usize { - self.in_memory.iter().map(|s| s.size).sum() - } - - /// Adds a new finalized Bloom filter to the storage. - /// - /// TODO(zhongzc): Add support for flushing to disk. - fn add(&mut self, elems: impl IntoIterator, elem_count: usize) { - let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE) - .seed(&SEED) - .expected_items(elem_count); - for elem in elems.into_iter() { - bf.insert(&elem); - } + .add(self.cur_seg_distinct_elems.drain(), elem_count) + .await?; - let cbf = FinalizedBloomFilterSegment::new(bf, elem_count); - self.in_memory.push(cbf); - } - - fn len(&self) -> usize { - self.in_memory.len() - } - - fn drain(&mut self) -> impl Iterator + '_ { - self.in_memory.drain(..) - } -} - -/// A finalized Bloom filter segment. -#[derive(Debug)] -struct FinalizedBloomFilterSegment { - /// The underlying Bloom filter. - bloom_filter: BloomFilter, - - /// The number of elements in the Bloom filter. - element_count: usize, - - /// The occupied memory size of the Bloom filter. - size: usize, -} - -impl FinalizedBloomFilterSegment { - fn new(bloom_filter: BloomFilter, elem_count: usize) -> Self { - let memory_usage = std::mem::size_of_val(bloom_filter.as_slice()); - Self { - bloom_filter, - element_count: elem_count, - size: memory_usage, - } - } -} - -/// Writes a slice of `u64` to the buffer in little-endian order. -fn write_u64_slice(buf: &mut Vec, slice: &[u64]) { - buf.reserve(std::mem::size_of_val(slice)); - for &x in slice { - buf.extend_from_slice(&x.to_le_bytes()); + self.global_memory_usage + .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed); + self.cur_seg_distinct_elems_mem_usage = 0; + Ok(()) } } #[cfg(test)] mod tests { + use fastbloom::BloomFilter; use futures::io::Cursor; use super::*; + use crate::external_provider::MockExternalTempFileProvider; - fn u64_vec_from_bytes(bytes: &[u8]) -> Vec { + /// Converts a slice of bytes to a vector of `u64`. + pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec { bytes .chunks_exact(std::mem::size_of::()) .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap())) @@ -239,18 +204,32 @@ mod tests { #[tokio::test] async fn test_bloom_filter_creator() { let mut writer = Cursor::new(Vec::new()); - let mut creator = BloomFilterCreator::new(2); + let mut creator = BloomFilterCreator::new( + 2, + Box::new(MockExternalTempFileProvider::new()), + Arc::new(AtomicUsize::new(0)), + None, + ); - creator.push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]); + creator + .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()]) + .await + .unwrap(); assert!(creator.cur_seg_distinct_elems_mem_usage > 0); assert!(creator.memory_usage() > 0); - creator.push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]); + creator + .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()]) + .await + .unwrap(); // Finalize the first segment - assert!(creator.cur_seg_distinct_elems_mem_usage == 0); + assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0); assert!(creator.memory_usage() > 0); - creator.push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]); + creator + .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()]) + .await + .unwrap(); assert!(creator.cur_seg_distinct_elems_mem_usage > 0); assert!(creator.memory_usage() > 0); diff --git a/src/index/src/bloom_filter/creator/finalize_segment.rs b/src/index/src/bloom_filter/creator/finalize_segment.rs new file mode 100644 index 000000000000..65b090de3eee --- /dev/null +++ b/src/index/src/bloom_filter/creator/finalize_segment.rs @@ -0,0 +1,293 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use asynchronous_codec::{FramedRead, FramedWrite}; +use fastbloom::BloomFilter; +use futures::stream::StreamExt; +use futures::{stream, AsyncWriteExt, Stream}; +use snafu::ResultExt; + +use super::intermediate_codec::IntermediateBloomFilterCodecV1; +use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED}; +use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result}; +use crate::bloom_filter::Bytes; +use crate::external_provider::ExternalTempFileProvider; + +/// The minimum memory usage threshold for flushing in-memory Bloom filters to disk. +const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB + +/// Storage for finalized Bloom filters. +pub struct FinalizedBloomFilterStorage { + /// Bloom filters that are stored in memory. + in_memory: Vec, + + /// Used to generate unique file IDs for intermediate Bloom filters. + intermediate_file_id_counter: usize, + + /// Prefix for intermediate Bloom filter files. + intermediate_prefix: String, + + /// The provider for intermediate Bloom filter files. + intermediate_provider: Box, + + /// The memory usage of the in-memory Bloom filters. + memory_usage: usize, + + /// The global memory usage provided by the user to track the + /// total memory usage of the creating Bloom filters. + global_memory_usage: Arc, + + /// The threshold of the global memory usage of the creating Bloom filters. + global_memory_usage_threshold: Option, +} + +impl FinalizedBloomFilterStorage { + /// Creates a new `FinalizedBloomFilterStorage`. + pub fn new( + intermediate_provider: Box, + global_memory_usage: Arc, + global_memory_usage_threshold: Option, + ) -> Self { + let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4()); + Self { + in_memory: Vec::new(), + intermediate_file_id_counter: 0, + intermediate_prefix: external_prefix, + intermediate_provider, + memory_usage: 0, + global_memory_usage, + global_memory_usage_threshold, + } + } + + /// Returns the memory usage of the storage. + pub fn memory_usage(&self) -> usize { + self.memory_usage + } + + /// Adds a new finalized Bloom filter to the storage. + /// + /// If the memory usage exceeds the threshold, flushes the in-memory Bloom filters to disk. + pub async fn add( + &mut self, + elems: impl IntoIterator, + element_count: usize, + ) -> Result<()> { + let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE) + .seed(&SEED) + .expected_items(element_count); + for elem in elems.into_iter() { + bf.insert(&elem); + } + + let fbf = FinalizedBloomFilterSegment::from(bf, element_count); + + // Update memory usage. + let memory_diff = fbf.bloom_filter_bytes.len(); + self.memory_usage += memory_diff; + self.global_memory_usage + .fetch_add(memory_diff, Ordering::Relaxed); + + // Add the finalized Bloom filter to the in-memory storage. + self.in_memory.push(fbf); + + // Flush to disk if necessary. + + // Do not flush if memory usage is too low. + if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD { + return Ok(()); + } + + // Check if the global memory usage exceeds the threshold and flush to disk if necessary. + if let Some(threshold) = self.global_memory_usage_threshold { + let global = self.global_memory_usage.load(Ordering::Relaxed); + + if global > threshold { + self.flush_in_memory_to_disk().await?; + + self.global_memory_usage + .fetch_sub(self.memory_usage, Ordering::Relaxed); + self.memory_usage = 0; + } + } + + Ok(()) + } + + /// Drains the storage and returns a stream of finalized Bloom filter segments. + pub async fn drain( + &mut self, + ) -> Result> + '_>>> { + // FAST PATH: memory only + if self.intermediate_file_id_counter == 0 { + return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok)))); + } + + // SLOW PATH: memory + disk + let mut on_disk = self + .intermediate_provider + .read_all(&self.intermediate_prefix) + .await + .context(IntermediateSnafu)?; + on_disk.sort_unstable_by(|x, y| x.0.cmp(&y.0)); + + let streams = on_disk + .into_iter() + .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default())); + + let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok); + Ok(Box::pin( + stream::iter(streams).flatten().chain(in_memory_stream), + )) + } + + /// Flushes the in-memory Bloom filters to disk. + async fn flush_in_memory_to_disk(&mut self) -> Result<()> { + let file_id = self.intermediate_file_id_counter; + self.intermediate_file_id_counter += 1; + + let file_id = format!("{:08}", file_id); + let mut writer = self + .intermediate_provider + .create(&self.intermediate_prefix, &file_id) + .await + .context(IntermediateSnafu)?; + + let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default()); + // `forward()` will flush and close the writer when the stream ends + if let Err(e) = stream::iter(self.in_memory.drain(..).map(Ok)) + .forward(fw) + .await + { + writer.close().await.context(IoSnafu)?; + writer.flush().await.context(IoSnafu)?; + return Err(e); + } + + Ok(()) + } +} + +/// A finalized Bloom filter segment. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FinalizedBloomFilterSegment { + /// The underlying Bloom filter bytes. + pub bloom_filter_bytes: Vec, + + /// The number of elements in the Bloom filter. + pub element_count: usize, +} + +impl FinalizedBloomFilterSegment { + fn from(bf: BloomFilter, elem_count: usize) -> Self { + let bf_slice = bf.as_slice(); + let mut bloom_filter_bytes = Vec::with_capacity(std::mem::size_of_val(bf_slice)); + for &x in bf_slice { + bloom_filter_bytes.extend_from_slice(&x.to_le_bytes()); + } + + Self { + bloom_filter_bytes, + element_count: elem_count, + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Mutex; + + use futures::AsyncRead; + use tokio::io::duplex; + use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + + use super::*; + use crate::bloom_filter::creator::tests::u64_vec_from_bytes; + use crate::external_provider::MockExternalTempFileProvider; + + #[tokio::test] + async fn test_finalized_bloom_filter_storage() { + let mut mock_provider = MockExternalTempFileProvider::new(); + + let mock_files: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + mock_provider.expect_create().returning({ + let files = Arc::clone(&mock_files); + move |file_group, file_id| { + assert!(file_group.starts_with("intm-bloom-filters-")); + let mut files = files.lock().unwrap(); + let (writer, reader) = duplex(2 * 1024 * 1024); + files.insert(file_id.to_string(), Box::new(reader.compat())); + Ok(Box::new(writer.compat_write())) + } + }); + + mock_provider.expect_read_all().returning({ + let files = Arc::clone(&mock_files); + move |file_group| { + assert!(file_group.starts_with("intm-bloom-filters-")); + let mut files = files.lock().unwrap(); + Ok(files.drain().collect::>()) + } + }); + + let global_memory_usage = Arc::new(AtomicUsize::new(0)); + let global_memory_usage_threshold = Some(1024 * 1024); // 1MB + let provider = Box::new(mock_provider); + let mut storage = FinalizedBloomFilterStorage::new( + provider, + global_memory_usage.clone(), + global_memory_usage_threshold, + ); + + let elem_count = 2000; + let batch = 1000; + + for i in 0..batch { + let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()); + storage.add(elems, elem_count).await.unwrap(); + } + + // Flush happens. + assert!(storage.intermediate_file_id_counter > 0); + + // Drain the storage. + let mut stream = storage.drain().await.unwrap(); + + let mut i = 0; + while let Some(segment) = stream.next().await { + let segment = segment.unwrap(); + assert_eq!(segment.element_count, elem_count); + + let v = u64_vec_from_bytes(&segment.bloom_filter_bytes); + + // Check the correctness of the Bloom filter. + let bf = BloomFilter::from_vec(v) + .seed(&SEED) + .expected_items(segment.element_count); + for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) { + assert!(bf.contains(&elem)); + } + i += 1; + } + + assert_eq!(i, batch); + } +} diff --git a/src/index/src/bloom_filter/creator/intermediate_codec.rs b/src/index/src/bloom_filter/creator/intermediate_codec.rs new file mode 100644 index 000000000000..a01d7d72510d --- /dev/null +++ b/src/index/src/bloom_filter/creator/intermediate_codec.rs @@ -0,0 +1,248 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use asynchronous_codec::{BytesMut, Decoder, Encoder}; +use bytes::{Buf, BufMut}; +use snafu::{ensure, ResultExt}; + +use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment; +use crate::bloom_filter::error::{Error, InvalidIntermediateMagicSnafu, IoSnafu, Result}; + +/// The magic number for the codec version 1 of the intermediate bloom filter. +const CODEC_V1_MAGIC: &[u8; 4] = b"bi01"; + +/// Codec of the intermediate finalized bloom filter segment. +/// +/// # Format +/// +/// [ magic ][ elem count ][ size ][ bloom filter ][ elem count ][ size ][ bloom filter ]... +/// [4] [8] [8] [size] [8] [8] [size] +#[derive(Debug, Default)] +pub struct IntermediateBloomFilterCodecV1 { + handled_header_magic: bool, +} + +impl Encoder for IntermediateBloomFilterCodecV1 { + type Item<'a> = FinalizedBloomFilterSegment; + type Error = Error; + + fn encode(&mut self, item: FinalizedBloomFilterSegment, dst: &mut BytesMut) -> Result<()> { + if !self.handled_header_magic { + dst.extend_from_slice(CODEC_V1_MAGIC); + self.handled_header_magic = true; + } + + let segment_bytes = item.bloom_filter_bytes; + let elem_count = item.element_count; + + dst.reserve(2 * std::mem::size_of::() + segment_bytes.len()); + dst.put_u64_le(elem_count as u64); + dst.put_u64_le(segment_bytes.len() as u64); + dst.extend_from_slice(&segment_bytes); + Ok(()) + } +} + +impl Decoder for IntermediateBloomFilterCodecV1 { + type Item = FinalizedBloomFilterSegment; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result> { + if !self.handled_header_magic { + let m_len = CODEC_V1_MAGIC.len(); + if src.remaining() < m_len { + return Ok(None); + } + let magic_bytes = &src[..m_len]; + ensure!( + magic_bytes == CODEC_V1_MAGIC, + InvalidIntermediateMagicSnafu { + invalid: magic_bytes, + } + ); + self.handled_header_magic = true; + src.advance(m_len); + } + + let s = &src[..]; + + let u64_size = std::mem::size_of::(); + let n_size = u64_size * 2; + if s.len() < n_size { + return Ok(None); + } + + let element_count = u64::from_le_bytes(s[0..u64_size].try_into().unwrap()) as usize; + let segment_size = u64::from_le_bytes(s[u64_size..n_size].try_into().unwrap()) as usize; + + if s.len() < n_size + segment_size { + return Ok(None); + } + + let bloom_filter_bytes = s[n_size..n_size + segment_size].to_vec(); + src.advance(n_size + segment_size); + Ok(Some(FinalizedBloomFilterSegment { + element_count, + bloom_filter_bytes, + })) + } +} + +/// Required for [`Encoder`] and [`Decoder`] implementations. +impl From for Error { + fn from(error: std::io::Error) -> Self { + Err::<(), std::io::Error>(error) + .context(IoSnafu) + .unwrap_err() + } +} + +#[cfg(test)] +mod tests { + use asynchronous_codec::{FramedRead, FramedWrite}; + use futures::io::Cursor; + use futures::{SinkExt, StreamExt}; + + use super::*; + use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment; + + #[test] + fn test_intermediate_bloom_filter_codec_v1_basic() { + let mut encoder = IntermediateBloomFilterCodecV1::default(); + let mut buf = BytesMut::new(); + + let item1 = FinalizedBloomFilterSegment { + element_count: 2, + bloom_filter_bytes: vec![1, 2, 3, 4], + }; + let item2 = FinalizedBloomFilterSegment { + element_count: 3, + bloom_filter_bytes: vec![5, 6, 7, 8], + }; + let item3 = FinalizedBloomFilterSegment { + element_count: 4, + bloom_filter_bytes: vec![9, 10, 11, 12], + }; + + encoder.encode(item1.clone(), &mut buf).unwrap(); + encoder.encode(item2.clone(), &mut buf).unwrap(); + encoder.encode(item3.clone(), &mut buf).unwrap(); + + let mut buf = buf.freeze().try_into_mut().unwrap(); + + let mut decoder = IntermediateBloomFilterCodecV1::default(); + let decoded_item1 = decoder.decode(&mut buf).unwrap().unwrap(); + let decoded_item2 = decoder.decode(&mut buf).unwrap().unwrap(); + let decoded_item3 = decoder.decode(&mut buf).unwrap().unwrap(); + + assert_eq!(item1, decoded_item1); + assert_eq!(item2, decoded_item2); + assert_eq!(item3, decoded_item3); + } + + #[tokio::test] + async fn test_intermediate_bloom_filter_codec_v1_frame_read_write() { + let item1 = FinalizedBloomFilterSegment { + element_count: 2, + bloom_filter_bytes: vec![1, 2, 3, 4], + }; + let item2 = FinalizedBloomFilterSegment { + element_count: 3, + bloom_filter_bytes: vec![5, 6, 7, 8], + }; + let item3 = FinalizedBloomFilterSegment { + element_count: 4, + bloom_filter_bytes: vec![9, 10, 11, 12], + }; + + let mut bytes = Cursor::new(vec![]); + + let mut writer = FramedWrite::new(&mut bytes, IntermediateBloomFilterCodecV1::default()); + writer.send(item1.clone()).await.unwrap(); + writer.send(item2.clone()).await.unwrap(); + writer.send(item3.clone()).await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let bytes = bytes.into_inner(); + let mut reader = + FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default()); + let decoded_item1 = reader.next().await.unwrap().unwrap(); + let decoded_item2 = reader.next().await.unwrap().unwrap(); + let decoded_item3 = reader.next().await.unwrap().unwrap(); + assert!(reader.next().await.is_none()); + + assert_eq!(item1, decoded_item1); + assert_eq!(item2, decoded_item2); + assert_eq!(item3, decoded_item3); + } + + #[tokio::test] + async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_only_magic() { + let bytes = CODEC_V1_MAGIC.to_vec(); + let mut reader = + FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default()); + assert!(reader.next().await.is_none()); + } + + #[tokio::test] + async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_magic() { + let bytes = CODEC_V1_MAGIC[..3].to_vec(); + let mut reader = + FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default()); + let e = reader.next().await.unwrap(); + assert!(e.is_err()); + } + + #[tokio::test] + async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_item() { + let mut bytes = vec![]; + bytes.extend_from_slice(CODEC_V1_MAGIC); + bytes.extend_from_slice(&2u64.to_le_bytes()); + bytes.extend_from_slice(&4u64.to_le_bytes()); + + let mut reader = + FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default()); + let e = reader.next().await.unwrap(); + assert!(e.is_err()); + } + + #[tokio::test] + async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_magic() { + let mut bytes = vec![]; + bytes.extend_from_slice(b"bi02"); + bytes.extend_from_slice(&2u64.to_le_bytes()); + bytes.extend_from_slice(&4u64.to_le_bytes()); + bytes.extend_from_slice(&[1, 2, 3, 4]); + + let mut reader = + FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default()); + let e = reader.next().await.unwrap(); + assert!(e.is_err()); + } + + #[tokio::test] + async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_length() { + let mut bytes = vec![]; + bytes.extend_from_slice(CODEC_V1_MAGIC); + bytes.extend_from_slice(&2u64.to_le_bytes()); + bytes.extend_from_slice(&4u64.to_le_bytes()); + bytes.extend_from_slice(&[1, 2, 3]); + + let mut reader = + FramedRead::new(bytes.as_slice(), IntermediateBloomFilterCodecV1::default()); + let e = reader.next().await.unwrap(); + assert!(e.is_err()); + } +} diff --git a/src/index/src/bloom_filter/error.rs b/src/index/src/bloom_filter/error.rs index 8e95dc52255e..7b91061aea05 100644 --- a/src/index/src/bloom_filter/error.rs +++ b/src/index/src/bloom_filter/error.rs @@ -39,6 +39,20 @@ pub enum Error { location: Location, }, + #[snafu(display("Intermediate error"))] + Intermediate { + source: crate::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid intermediate magic"))] + InvalidIntermediateMagic { + invalid: Vec, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("External error"))] External { source: BoxedError, @@ -52,8 +66,11 @@ impl ErrorExt for Error { use Error::*; match self { - Io { .. } | Self::SerdeJson { .. } => StatusCode::Unexpected, + Io { .. } | SerdeJson { .. } | InvalidIntermediateMagic { .. } => { + StatusCode::Unexpected + } + Intermediate { source, .. } => source.status_code(), External { source, .. } => source.status_code(), } } diff --git a/src/index/src/error.rs b/src/index/src/error.rs new file mode 100644 index 000000000000..7214437bbbe4 --- /dev/null +++ b/src/index/src/error.rs @@ -0,0 +1,48 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("External error"))] + External { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + External { source, .. } => source.status_code(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result = std::result::Result; diff --git a/src/index/src/inverted_index/create/sort/external_provider.rs b/src/index/src/external_provider.rs similarity index 58% rename from src/index/src/inverted_index/create/sort/external_provider.rs rename to src/index/src/external_provider.rs index a86f3e06aad4..60d2fac2eaec 100644 --- a/src/index/src/inverted_index/create/sort/external_provider.rs +++ b/src/index/src/external_provider.rs @@ -15,25 +15,24 @@ use async_trait::async_trait; use futures::{AsyncRead, AsyncWrite}; -use crate::inverted_index::error::Result; +use crate::error::Error; -/// Trait for managing intermediate files during external sorting for a particular index. +pub type Writer = Box; +pub type Reader = Box; + +/// Trait for managing intermediate files to control memory usage for a particular index. #[mockall::automock] #[async_trait] pub trait ExternalTempFileProvider: Send + Sync { - /// Creates and opens a new intermediate file associated with a specific index for writing. + /// Creates and opens a new intermediate file associated with a specific `file_group` for writing. /// The implementation should ensure that the file does not already exist. /// - /// - `index_name`: the name of the index for which the file will be associated + /// - `file_group`: a unique identifier for the group of files /// - `file_id`: a unique identifier for the new file - async fn create( - &self, - index_name: &str, - file_id: &str, - ) -> Result>; + async fn create(&self, file_group: &str, file_id: &str) -> Result; - /// Retrieves all intermediate files associated with a specific index for an external sorting operation. + /// Retrieves all intermediate files and their associated file identifiers for a specific `file_group`. /// - /// `index_name`: the name of the index to retrieve intermediate files for - async fn read_all(&self, index_name: &str) -> Result>>; + /// `file_group` is a unique identifier for the group of files. + async fn read_all(&self, file_group: &str) -> Result, Error>; } diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs index 369017835643..81ca9aeca690 100644 --- a/src/index/src/inverted_index/create/sort.rs +++ b/src/index/src/inverted_index/create/sort.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod external_provider; pub mod external_sort; mod intermediate_rw; mod merge_stream; diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index 17afd7ced31c..f4e1d9f9101d 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -23,15 +23,16 @@ use async_trait::async_trait; use common_base::BitVec; use common_telemetry::{debug, error}; use futures::stream; +use snafu::ResultExt; -use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider; +use crate::external_provider::ExternalTempFileProvider; use crate::inverted_index::create::sort::intermediate_rw::{ IntermediateReader, IntermediateWriter, }; use crate::inverted_index::create::sort::merge_stream::MergeSortedStream; use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter}; use crate::inverted_index::create::sort_create::SorterFactory; -use crate::inverted_index::error::Result; +use crate::inverted_index::error::{IntermediateSnafu, Result}; use crate::inverted_index::{Bytes, BytesRef}; /// `ExternalSorter` manages the sorting of data using both in-memory structures and external files. @@ -107,7 +108,11 @@ impl Sorter for ExternalSorter { /// Finalizes the sorting operation, merging data from both in-memory buffer and external files /// into a sorted stream async fn output(&mut self) -> Result { - let readers = self.temp_file_provider.read_all(&self.index_name).await?; + let readers = self + .temp_file_provider + .read_all(&self.index_name) + .await + .context(IntermediateSnafu)?; // TODO(zhongzc): k-way merge instead of 2-way merge @@ -122,7 +127,7 @@ impl Sorter for ExternalSorter { Ok((value, bitmap)) }), ))); - for reader in readers { + for (_, reader) in readers { tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?); } @@ -241,7 +246,11 @@ impl ExternalSorter { let file_id = &format!("{:012}", self.total_row_count); let index_name = &self.index_name; - let writer = self.temp_file_provider.create(index_name, file_id).await?; + let writer = self + .temp_file_provider + .create(index_name, file_id) + .await + .context(IntermediateSnafu)?; let values = mem::take(&mut self.values_buffer); self.global_memory_usage @@ -302,7 +311,7 @@ mod tests { use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use super::*; - use crate::inverted_index::create::sort::external_provider::MockExternalTempFileProvider; + use crate::external_provider::MockExternalTempFileProvider; async fn test_external_sorter( current_memory_usage_threshold: Option, @@ -332,7 +341,7 @@ mod tests { move |index_name| { assert_eq!(index_name, "test"); let mut files = files.lock().unwrap(); - Ok(files.drain().map(|f| f.1).collect::>()) + Ok(files.drain().collect::>()) } }); diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 7e861beda6d1..c53e2ae9f57e 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -213,6 +213,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Intermediate error"))] + Intermediate { + source: crate::error::Error, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -245,6 +252,7 @@ impl ErrorExt for Error { | InconsistentRowCount { .. } | IndexNotFound { .. } => StatusCode::InvalidArguments, + Intermediate { source, .. } => source.status_code(), External { source, .. } => source.status_code(), } } diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index e52a93138f68..e490dbc06464 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -16,5 +16,7 @@ #![feature(assert_matches)] pub mod bloom_filter; +pub mod error; +pub mod external_provider; pub mod fulltext_index; pub mod inverted_index; diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index 1568261e206f..d0da804c745b 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -104,16 +104,28 @@ impl IntermediateLocation { &self.files_dir } - /// Returns the path of the directory for intermediate files associated with a column: - /// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/` - pub fn column_path(&self, column_id: &str) -> String { - util::join_path(&self.files_dir, &format!("{column_id}/")) + /// Returns the path of the directory for intermediate files associated with the `file_group`: + /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/` + pub fn file_group_path(&self, file_group: &str) -> String { + util::join_path(&self.files_dir, &format!("{file_group}/")) } - /// Returns the path of the intermediate file with the given id for a column: - /// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im` - pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String { - util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im")) + /// Returns the path of the intermediate file with the given `file_group` and `im_file_id`: + /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im` + pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String { + util::join_path( + &self.file_group_path(file_group), + &format!("{im_file_id}.im"), + ) + } + + /// Returns the intermediate file id from the path. + pub fn im_file_id_from_path(&self, path: &str) -> String { + path.rsplit('/') + .next() + .and_then(|s| s.strip_suffix(".im")) + .unwrap_or_default() + .to_string() } } @@ -161,17 +173,20 @@ mod tests { let uuid = location.files_dir.split('/').nth(3).unwrap(); - let column_id = "1"; + let file_group = "1"; assert_eq!( - location.column_path(column_id), - format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/") + location.file_group_path(file_group), + format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/") ); let im_file_id = "000000000010"; + let file_path = location.file_path(file_group, im_file_id); assert_eq!( - location.file_path(column_id, im_file_id), - format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im") + file_path, + format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im") ); + + assert_eq!(location.im_file_id_from_path(&file_path), im_file_id); } #[tokio::test] diff --git a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs b/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs index ee80aaa0a63c..1822f3119459 100644 --- a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs +++ b/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs @@ -16,9 +16,9 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_telemetry::warn; use futures::{AsyncRead, AsyncWrite}; -use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider; -use index::inverted_index::error as index_error; -use index::inverted_index::error::Result as IndexResult; +use index::error as index_error; +use index::error::Result as IndexResult; +use index::external_provider::ExternalTempFileProvider; use snafu::ResultExt; use crate::error::Result; @@ -42,10 +42,10 @@ pub(crate) struct TempFileProvider { impl ExternalTempFileProvider for TempFileProvider { async fn create( &self, - column_id: &str, + file_group: &str, file_id: &str, ) -> IndexResult> { - let path = self.location.file_path(column_id, file_id); + let path = self.location.file_path(file_group, file_id); let writer = self .manager .store() @@ -63,13 +63,13 @@ impl ExternalTempFileProvider for TempFileProvider { async fn read_all( &self, - column_id: &str, - ) -> IndexResult>> { - let column_path = self.location.column_path(column_id); + file_group: &str, + ) -> IndexResult)>> { + let file_group_path = self.location.file_group_path(file_group); let entries = self .manager .store() - .list(&column_path) + .list(&file_group_path) .await .map_err(BoxedError::new) .context(index_error::ExternalSnafu)?; @@ -81,6 +81,8 @@ impl ExternalTempFileProvider for TempFileProvider { continue; } + let im_file_id = self.location.im_file_id_from_path(entry.path()); + let reader = self .manager .store() @@ -93,7 +95,7 @@ impl ExternalTempFileProvider for TempFileProvider { .await .map_err(BoxedError::new) .context(index_error::ExternalSnafu)?; - readers.push(Box::new(reader) as _); + readers.push((im_file_id, Box::new(reader) as _)); } Ok(readers) @@ -133,36 +135,36 @@ mod tests { let store = IntermediateManager::init_fs(path).await.unwrap(); let provider = TempFileProvider::new(location.clone(), store); - let column_name = "tag0"; + let file_group = "tag0"; let file_id = "0000000010"; - let mut writer = provider.create(column_name, file_id).await.unwrap(); + let mut writer = provider.create(file_group, file_id).await.unwrap(); writer.write_all(b"hello").await.unwrap(); writer.flush().await.unwrap(); writer.close().await.unwrap(); let file_id = "0000000100"; - let mut writer = provider.create(column_name, file_id).await.unwrap(); + let mut writer = provider.create(file_group, file_id).await.unwrap(); writer.write_all(b"world").await.unwrap(); writer.flush().await.unwrap(); writer.close().await.unwrap(); - let column_name = "tag1"; + let file_group = "tag1"; let file_id = "0000000010"; - let mut writer = provider.create(column_name, file_id).await.unwrap(); + let mut writer = provider.create(file_group, file_id).await.unwrap(); writer.write_all(b"foo").await.unwrap(); writer.flush().await.unwrap(); writer.close().await.unwrap(); let readers = provider.read_all("tag0").await.unwrap(); assert_eq!(readers.len(), 2); - for mut reader in readers { + for (_, mut reader) in readers { let mut buf = Vec::new(); reader.read_to_end(&mut buf).await.unwrap(); assert!(matches!(buf.as_slice(), b"hello" | b"world")); } let readers = provider.read_all("tag1").await.unwrap(); assert_eq!(readers.len(), 1); - let mut reader = readers.into_iter().next().unwrap(); + let mut reader = readers.into_iter().map(|x| x.1).next().unwrap(); let mut buf = Vec::new(); reader.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"foo");