diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 7d523c4168fe..df5062aa7dfb 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -331,6 +331,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid wal read request, {}", reason))] + InvalidWalReadRequest { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to convert array to vector"))] ConvertVector { #[snafu(implicit)] @@ -787,7 +794,8 @@ impl ErrorExt for Error { | ConvertColumnDataType { .. } | ColumnNotFound { .. } | InvalidMetadata { .. } - | InvalidRegionOptions { .. } => StatusCode::InvalidArguments, + | InvalidRegionOptions { .. } + | InvalidWalReadRequest { .. } => StatusCode::InvalidArguments, InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 4c80782f4d59..d7c671962c03 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -20,6 +20,7 @@ pub mod meta_util; pub mod scheduler_util; pub mod sst_util; pub mod version_util; +pub mod wal_util; use std::collections::HashMap; use std::path::Path; diff --git a/src/mito2/src/test_util/wal_util.rs b/src/mito2/src/test_util/wal_util.rs new file mode 100644 index 000000000000..823242faae23 --- /dev/null +++ b/src/mito2/src/test_util/wal_util.rs @@ -0,0 +1,68 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::WalEntry; +use futures::stream; +use prost::Message; +use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader}; +use store_api::logstore::provider::Provider; +use store_api::logstore::EntryId; +use store_api::storage::RegionId; + +use crate::error::Result; +use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader}; + +pub(crate) struct MockRawEntryStream { + pub(crate) entries: Vec, +} + +impl RawEntryReader for MockRawEntryStream { + fn read(&self, _ns: &Provider, _start_id: EntryId) -> Result> { + Ok(Box::pin(stream::iter( + self.entries.clone().into_iter().map(Ok), + ))) + } +} + +/// Puts an incomplete [`Entry`] at the end of `input`. +pub(crate) fn generate_tail_corrupted_stream( + provider: Provider, + region_id: RegionId, + input: &WalEntry, + num_parts: usize, +) -> Vec { + let encoded_entry = input.encode_to_vec(); + let parts = encoded_entry + .chunks(encoded_entry.len() / num_parts) + .map(Into::into) + .collect::>(); + + vec![ + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id, + entry_id: 0, + headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last], + parts, + }), + // The tail corrupted data. + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id, + entry_id: 0, + headers: vec![MultiplePartHeader::First], + parts: vec![vec![1; 100]], + }), + ] +} diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 18feb9620473..c0cca52dcbbd 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -16,10 +16,13 @@ /// TODO(weny): remove it #[allow(unused)] -pub(crate) mod raw_entry_reader; +pub(crate) mod entry_distributor; +/// TODO(weny): remove it +#[allow(unused)] +pub(crate) mod entry_reader; /// TODO(weny): remove it #[allow(unused)] -pub(crate) mod wal_entry_reader; +pub(crate) mod raw_entry_reader; use std::collections::HashMap; use std::mem; @@ -36,8 +39,8 @@ use store_api::logstore::{AppendBatchResponse, LogStore}; use store_api::storage::RegionId; use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu}; +use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader}; use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader}; -use crate::wal::wal_entry_reader::{LogStoreEntryReader, WalEntryReader}; /// WAL entry id. pub type EntryId = store_api::logstore::entry::Id; diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs new file mode 100644 index 000000000000..dacb1d3ae9dc --- /dev/null +++ b/src/mito2/src/wal/entry_distributor.rs @@ -0,0 +1,634 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::min; +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::WalEntry; +use async_stream::stream; +use common_telemetry::{debug, error}; +use futures::future::join_all; +use snafu::ensure; +use store_api::logstore::entry::Entry; +use store_api::logstore::provider::Provider; +use store_api::storage::RegionId; +use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; +use tokio_stream::StreamExt; + +use crate::error::{self, Result}; +use crate::wal::entry_reader::{decode_raw_entry, WalEntryReader}; +use crate::wal::raw_entry_reader::RawEntryReader; +use crate::wal::{EntryId, WalEntryStream}; + +/// [WalEntryDistributor] distributes Wal entries to specific [WalEntryReceiver]s based on [RegionId]. +pub(crate) struct WalEntryDistributor { + raw_wal_reader: Arc, + provider: Provider, + /// Sends [Entry] to receivers based on [RegionId] + senders: HashMap>, + /// Waits for the arg from the [WalEntryReader]. + arg_receivers: Vec<(RegionId, oneshot::Receiver)>, +} + +impl WalEntryDistributor { + /// Distributes entries to specific [WalEntryReceiver]s based on [RegionId]. + pub async fn distribute(mut self) -> Result<()> { + let arg_futures = self + .arg_receivers + .iter_mut() + .map(|(region_id, receiver)| async { (*region_id, receiver.await.ok()) }); + let args = join_all(arg_futures) + .await + .into_iter() + .filter_map(|(region_id, start_id)| start_id.map(|start_id| (region_id, start_id))) + .collect::>(); + + // No subscribers + if args.is_empty() { + return Ok(()); + } + // Safety: must exist + let min_start_id = args.iter().map(|(_, start_id)| *start_id).min().unwrap(); + let receivers: HashMap<_, _> = args + .into_iter() + .map(|(region_id, start_id)| { + ( + region_id, + EntryReceiver { + start_id, + sender: self.senders[®ion_id].clone(), + }, + ) + }) + .collect(); + + let mut stream = self.raw_wal_reader.read(&self.provider, min_start_id)?; + while let Some(entry) = stream.next().await { + let entry = entry?; + let entry_id = entry.entry_id(); + let region_id = entry.region_id(); + + if let Some(EntryReceiver { sender, start_id }) = receivers.get(®ion_id) { + if entry_id >= *start_id { + if let Err(err) = sender.send(entry).await { + error!(err; "Failed to distribute raw entry, entry_id:{}, region_id: {}", entry_id, region_id); + } + } + } else { + debug!("Subscriber not found, region_id: {}", region_id); + } + } + + Ok(()) + } +} + +/// Receives the Wal entries from [WalEntryDistributor]. +#[derive(Debug)] +pub(crate) struct WalEntryReceiver { + region_id: RegionId, + /// Receives the [Entry] from the [WalEntryDistributor]. + entry_receiver: Receiver, + /// Sends the `start_id` to the [WalEntryDistributor]. + arg_sender: oneshot::Sender, +} + +impl WalEntryReceiver { + pub fn new( + region_id: RegionId, + entry_receiver: Receiver, + arg_sender: oneshot::Sender, + ) -> Self { + Self { + region_id, + entry_receiver, + arg_sender, + } + } +} + +impl WalEntryReader for WalEntryReceiver { + fn read(self, provider: &Provider, start_id: EntryId) -> Result> { + let WalEntryReceiver { + region_id: expected_region_id, + mut entry_receiver, + arg_sender, + } = self; + + if arg_sender.send(start_id).is_err() { + return error::InvalidWalReadRequestSnafu { + reason: format!( + "WalEntryDistributor is dropped, failed to send arg, start_id: {start_id}" + ), + } + .fail(); + } + + let stream = stream! { + let mut buffered_entry = None; + while let Some(next_entry) = entry_receiver.recv().await { + match buffered_entry.take() { + Some(entry) => { + yield decode_raw_entry(entry); + buffered_entry = Some(next_entry); + }, + None => { + buffered_entry = Some(next_entry); + } + }; + } + if let Some(entry) = buffered_entry { + // Ignores tail corrupted data. + if entry.is_complete() { + yield decode_raw_entry(entry); + } + } + }; + + Ok(Box::pin(stream)) + } +} + +struct EntryReceiver { + start_id: EntryId, + sender: Sender, +} + +/// Returns [WalEntryDistributor] and batch [WalEntryReceiver]s. +/// +/// ### Note: +/// Ensures `receiver.read` is called before the `distributor.distribute` in the same thread. +/// +/// ```text +/// let (distributor, receivers) = build_wal_entry_distributor_and_receivers(..); +/// Thread 1 | +/// | +/// // may deadlock | +/// distributor.distribute().await; | +/// | +/// | +/// receivers[0].read().await | +/// ``` +/// +pub fn build_wal_entry_distributor_and_receivers( + provider: Provider, + raw_wal_reader: Arc, + region_ids: Vec, + buffer_size: usize, +) -> (WalEntryDistributor, Vec) { + let mut senders = HashMap::with_capacity(region_ids.len()); + let mut readers = Vec::with_capacity(region_ids.len()); + let mut arg_receivers = Vec::with_capacity(region_ids.len()); + + for region_id in region_ids { + let (entry_sender, entry_receiver) = mpsc::channel(buffer_size); + let (arg_sender, arg_receiver) = oneshot::channel(); + + senders.insert(region_id, entry_sender); + arg_receivers.push((region_id, arg_receiver)); + readers.push(WalEntryReceiver::new(region_id, entry_receiver, arg_sender)); + } + + ( + WalEntryDistributor { + provider, + raw_wal_reader, + senders, + arg_receivers, + }, + readers, + ) +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use api::v1::{Mutation, OpType}; + use futures::{stream, TryStreamExt}; + use prost::Message; + use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry}; + + use super::*; + use crate::test_util::wal_util::generate_tail_corrupted_stream; + use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader}; + use crate::wal::EntryId; + + struct MockRawEntryReader { + entries: Vec, + } + + impl MockRawEntryReader { + pub fn new(entries: Vec) -> MockRawEntryReader { + Self { entries } + } + } + + impl RawEntryReader for MockRawEntryReader { + fn read(&self, provider: &Provider, _start_id: EntryId) -> Result> { + let stream = stream::iter(self.entries.clone().into_iter().map(Ok)); + Ok(Box::pin(stream)) + } + } + + #[tokio::test] + async fn test_wal_entry_distributor_without_receivers() { + let provider = Provider::kafka_provider("my_topic".to_string()); + let reader = Arc::new(MockRawEntryReader::new(vec![Entry::Naive(NaiveEntry { + region_id: RegionId::new(1024, 1), + provider: provider.clone(), + entry_id: 1, + data: vec![1], + })])); + + let (distributor, receivers) = build_wal_entry_distributor_and_receivers( + provider, + reader, + vec![RegionId::new(1024, 1), RegionId::new(1025, 1)], + 128, + ); + + // Drops all receivers + drop(receivers); + // Returns immediately + distributor.distribute().await.unwrap(); + } + + #[tokio::test] + async fn test_wal_entry_distributor() { + common_telemetry::init_default_ut_logging(); + let provider = Provider::kafka_provider("my_topic".to_string()); + let reader = Arc::new(MockRawEntryReader::new(vec![ + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 1), + entry_id: 1, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 2), + entry_id: 2, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 2u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 3), + entry_id: 3, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 3u64, + rows: None, + }], + } + .encode_to_vec(), + }), + ])); + + // Builds distributor and receivers + let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + reader, + vec![ + RegionId::new(1024, 1), + RegionId::new(1024, 2), + RegionId::new(1024, 3), + ], + 128, + ); + assert_eq!(receivers.len(), 3); + + // Should be okay if one of receiver is dropped. + let last = receivers.pop().unwrap(); + drop(last); + + let mut streams = receivers + .into_iter() + .map(|receiver| receiver.read(&provider, 0).unwrap()) + .collect::>(); + distributor.distribute().await.unwrap(); + let entries = streams + .get_mut(0) + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert_eq!( + entries, + vec![( + 1, + WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + } + )] + ); + let entries = streams + .get_mut(1) + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert_eq!( + entries, + vec![( + 2, + WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 2u64, + rows: None, + }], + } + )] + ); + } + + #[tokio::test] + async fn test_tail_corrupted_stream() { + let mut entries = vec![]; + let region1 = RegionId::new(1, 1); + let region1_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + }; + let region2 = RegionId::new(1, 2); + let region2_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 3u64, + rows: None, + }], + }; + let region3 = RegionId::new(1, 3); + let region3_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 3u64, + rows: None, + }], + }; + let provider = Provider::kafka_provider("my_topic".to_string()); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region1, + ®ion1_expected_wal_entry, + 3, + )); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region2, + ®ion2_expected_wal_entry, + 2, + )); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region3, + ®ion3_expected_wal_entry, + 4, + )); + + let corrupted_stream = MockRawEntryReader { entries }; + // Builds distributor and receivers + let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + Arc::new(corrupted_stream), + vec![region1, region2, region3], + 128, + ); + assert_eq!(receivers.len(), 3); + let mut streams = receivers + .into_iter() + .map(|receiver| receiver.read(&provider, 0).unwrap()) + .collect::>(); + distributor.distribute().await.unwrap(); + + assert_eq!( + streams + .get_mut(0) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region1_expected_wal_entry)] + ); + + assert_eq!( + streams + .get_mut(1) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region2_expected_wal_entry)] + ); + + assert_eq!( + streams + .get_mut(2) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region3_expected_wal_entry)] + ); + } + + #[tokio::test] + async fn test_part_corrupted_stream() { + let mut entries = vec![]; + let region1 = RegionId::new(1, 1); + let region1_expected_wal_entry = WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + }; + let region2 = RegionId::new(1, 2); + let provider = Provider::kafka_provider("my_topic".to_string()); + entries.extend(generate_tail_corrupted_stream( + provider.clone(), + region1, + ®ion1_expected_wal_entry, + 3, + )); + entries.extend(vec![ + // The corrupted data. + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: region2, + entry_id: 0, + headers: vec![MultiplePartHeader::First], + parts: vec![vec![1; 100]], + }), + Entry::MultiplePart(MultiplePartEntry { + provider: provider.clone(), + region_id: region2, + entry_id: 0, + headers: vec![MultiplePartHeader::First], + parts: vec![vec![1; 100]], + }), + ]); + + let corrupted_stream = MockRawEntryReader { entries }; + // Builds distributor and receivers + let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + Arc::new(corrupted_stream), + vec![region1, region2], + 128, + ); + assert_eq!(receivers.len(), 2); + let mut streams = receivers + .into_iter() + .map(|receiver| receiver.read(&provider, 0).unwrap()) + .collect::>(); + distributor.distribute().await.unwrap(); + assert_eq!( + streams + .get_mut(0) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![(0, region1_expected_wal_entry)] + ); + + assert_matches!( + streams + .get_mut(1) + .unwrap() + .try_collect::>() + .await + .unwrap_err(), + error::Error::CorruptedEntry { .. } + ); + } + + #[tokio::test] + async fn test_wal_entry_receiver_start_id() { + let provider = Provider::kafka_provider("my_topic".to_string()); + let reader = Arc::new(MockRawEntryReader::new(vec![ + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 1), + entry_id: 1, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 1u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 2), + entry_id: 2, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 2u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 1), + entry_id: 3, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 3u64, + rows: None, + }], + } + .encode_to_vec(), + }), + Entry::Naive(NaiveEntry { + provider: provider.clone(), + region_id: RegionId::new(1024, 2), + entry_id: 4, + data: WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 4u64, + rows: None, + }], + } + .encode_to_vec(), + }), + ])); + + // Builds distributor and receivers + let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + reader, + vec![RegionId::new(1024, 1), RegionId::new(1024, 2)], + 128, + ); + assert_eq!(receivers.len(), 2); + let mut streams = receivers + .into_iter() + .map(|receiver| receiver.read(&provider, 4).unwrap()) + .collect::>(); + distributor.distribute().await.unwrap(); + + assert_eq!( + streams + .get_mut(1) + .unwrap() + .try_collect::>() + .await + .unwrap(), + vec![( + 4, + WalEntry { + mutations: vec![Mutation { + op_type: OpType::Put as i32, + sequence: 4u64, + rows: None, + }], + } + )] + ); + } +} diff --git a/src/mito2/src/wal/wal_entry_reader.rs b/src/mito2/src/wal/entry_reader.rs similarity index 93% rename from src/mito2/src/wal/wal_entry_reader.rs rename to src/mito2/src/wal/entry_reader.rs index 82db59540059..c29a5e629d5c 100644 --- a/src/mito2/src/wal/wal_entry_reader.rs +++ b/src/mito2/src/wal/entry_reader.rs @@ -13,6 +13,7 @@ // limitations under the License. use api::v1::WalEntry; +use async_stream::stream; use common_telemetry::info; use futures::StreamExt; use prost::Message; @@ -57,7 +58,7 @@ impl WalEntryReader for LogStoreEntryReader { let LogStoreEntryReader { reader } = self; let mut stream = reader.read(ns, start_id)?; - let stream = async_stream::stream! { + let stream = stream! { let mut buffered_entry = None; while let Some(next_entry) = stream.next().await { match buffered_entry.take() { @@ -94,22 +95,11 @@ mod tests { use store_api::storage::RegionId; use crate::error::{self, Result}; + use crate::test_util::wal_util::MockRawEntryStream; + use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader}; use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader}; - use crate::wal::wal_entry_reader::{LogStoreEntryReader, WalEntryReader}; use crate::wal::EntryId; - struct MockRawEntryStream { - entries: Vec, - } - - impl RawEntryReader for MockRawEntryStream { - fn read(&self, ns: &Provider, start_id: EntryId) -> Result> { - let entries = self.entries.clone().into_iter().map(Ok); - - Ok(Box::pin(stream::iter(entries))) - } - } - #[tokio::test] async fn test_tail_corrupted_stream() { common_telemetry::init_default_ut_logging();