diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 5c04d75b2292..546b2c21e1b9 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -21,6 +21,8 @@ mod append_mode_test; #[cfg(test)] mod basic_test; #[cfg(test)] +mod batch_open_test; +#[cfg(test)] mod catchup_test; #[cfg(test)] mod close_test; @@ -50,6 +52,7 @@ mod set_readonly_test; mod truncate_test; use std::any::Any; +use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -58,22 +61,33 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing; +use common_wal::options::{WalOptions, WAL_OPTIONS_KEY}; +use futures::future::{join_all, try_join_all}; use object_store::manager::ObjectStoreManagerRef; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; -use store_api::region_request::{AffectedRows, RegionRequest}; +use store_api::region_engine::{ + BatchResponses, RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, +}; +use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, Semaphore}; use crate::config::MitoConfig; -use crate::error::{InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{ + InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu, +}; use crate::manifest::action::RegionEdit; use crate::metrics::HANDLE_REQUEST_ELAPSED; use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner}; use crate::region::RegionUsage; use crate::request::WorkerRequest; +use crate::wal::entry_distributor::{ + build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, +}; +use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader}; use crate::worker::WorkerGroup; pub const MITO_ENGINE_NAME: &str = "mito"; @@ -211,6 +225,41 @@ struct EngineInner { workers: WorkerGroup, /// Config of the engine. config: Arc, + /// The Wal raw entry reader. + wal_raw_entry_reader: Arc, +} + +type TopicGroupedRegionOpenRequests = HashMap>; + +/// Returns requests([TopicGroupedRegionOpenRequests]) grouped by topic and remaining requests. +fn prepare_batch_open_requests( + requests: Vec<(RegionId, RegionOpenRequest)>, +) -> Result<( + TopicGroupedRegionOpenRequests, + Vec<(RegionId, RegionOpenRequest)>, +)> { + let mut topic_to_regions: HashMap> = HashMap::new(); + let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new(); + for (region_id, request) in requests { + let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) { + serde_json::from_str(options).context(SerdeJsonSnafu)? + } else { + WalOptions::RaftEngine + }; + match options { + WalOptions::Kafka(options) => { + topic_to_regions + .entry(options.topic) + .or_default() + .push((region_id, request)); + } + WalOptions::RaftEngine => { + remaining_regions.push((region_id, request)); + } + } + } + + Ok((topic_to_regions, remaining_regions)) } impl EngineInner { @@ -221,9 +270,11 @@ impl EngineInner { object_store_manager: ObjectStoreManagerRef, ) -> Result { let config = Arc::new(config); + let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone())); Ok(EngineInner { workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?, config, + wal_raw_entry_reader, }) } @@ -244,6 +295,93 @@ impl EngineInner { Ok(region.metadata()) } + async fn open_topic_regions( + &self, + topic: String, + region_requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result)>> { + let region_ids = region_requests + .iter() + .map(|(region_id, _)| *region_id) + .collect::>(); + let provider = Provider::kafka_provider(topic); + let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers( + provider, + self.wal_raw_entry_reader.clone(), + ®ion_ids, + DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, + ); + + let mut responses = Vec::with_capacity(region_requests.len()); + for ((region_id, request), entry_receiver) in + region_requests.into_iter().zip(entry_receivers) + { + let (request, receiver) = + WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver)); + self.workers.submit_to_worker(region_id, request).await?; + responses.push(async move { receiver.await.context(RecvSnafu)? }); + } + + // Waits for entries distribution. + let distribution = + common_runtime::spawn_read(async move { distributor.distribute().await }); + // Waits for worker returns. + let responses = join_all(responses).await; + + distribution.await.context(JoinSnafu)??; + Ok(region_ids.into_iter().zip(responses).collect()) + } + + async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result)>> { + let semaphore = Arc::new(Semaphore::new(parallelism)); + let (topic_to_region_requests, remaining_region_requests) = + prepare_batch_open_requests(requests)?; + let mut responses = + Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len()); + + if !topic_to_region_requests.is_empty() { + let mut tasks = Vec::with_capacity(topic_to_region_requests.len()); + for (topic, region_requests) in topic_to_region_requests { + let semaphore_moved = semaphore.clone(); + tasks.push(async move { + // Safety: semaphore must exist + let _permit = semaphore_moved.acquire().await.unwrap(); + self.open_topic_regions(topic, region_requests).await + }) + } + let r = try_join_all(tasks).await?; + responses.extend(r.into_iter().flatten()); + } + + if !remaining_region_requests.is_empty() { + let mut tasks = Vec::with_capacity(remaining_region_requests.len()); + let mut region_ids = Vec::with_capacity(remaining_region_requests.len()); + for (region_id, request) in remaining_region_requests { + let semaphore_moved = semaphore.clone(); + region_ids.push(region_id); + tasks.push(async move { + // Safety: semaphore must exist + let _permit = semaphore_moved.acquire().await.unwrap(); + let (request, receiver) = + WorkerRequest::new_open_region_request(region_id, request, None); + + self.workers.submit_to_worker(region_id, request).await?; + + receiver.await.context(RecvSnafu)? + }) + } + + let results = join_all(tasks).await; + responses.extend(region_ids.into_iter().zip(results)); + } + + Ok(responses) + } + /// Handles [RegionRequest] and return its executed result. async fn handle_request( &self, @@ -323,6 +461,30 @@ impl RegionEngine for MitoEngine { MITO_ENGINE_NAME } + #[tracing::instrument(skip_all)] + async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result { + // TODO(weny): add metrics. + self.inner + .handle_batch_open_requests(parallelism, requests) + .await + .map(|responses| { + responses + .into_iter() + .map(|(region_id, response)| { + ( + region_id, + response.map(RegionResponse::new).map_err(BoxedError::new), + ) + }) + .collect::>() + }) + .map_err(BoxedError::new) + } + #[tracing::instrument(skip_all)] async fn handle_request( &self, @@ -421,6 +583,7 @@ impl MitoEngine { config.sanitize(data_home)?; let config = Arc::new(config); + let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone())); Ok(MitoEngine { inner: Arc::new(EngineInner { workers: WorkerGroup::start_for_test( @@ -433,6 +596,7 @@ impl MitoEngine { ) .await?, config, + wal_raw_entry_reader, }), }) } diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs new file mode 100644 index 000000000000..793e79ca56cf --- /dev/null +++ b/src/mito2/src/engine/batch_open_test.rs @@ -0,0 +1,203 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::Rows; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_recordbatch::RecordBatches; +use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY}; +use rstest::rstest; +use rstest_reuse::apply; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{RegionOpenRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use super::MitoEngine; +use crate::config::MitoConfig; +use crate::test_util::{ + build_rows, kafka_log_store_factory, multiple_log_store_factories, + prepare_test_for_kafka_log_store, put_rows, raft_engine_log_store_factory, rows_schema, + CreateRequestBuilder, LogStoreFactory, TestEnv, +}; + +#[apply(multiple_log_store_factories)] +async fn test_batch_open(factory: Option) { + common_telemetry::init_default_ut_logging(); + let Some(factory) = factory else { + return; + }; + let mut env = + TestEnv::with_prefix("open-batch-regions").with_log_store_factory(factory.clone()); + let engine = env.create_engine(MitoConfig::default()).await; + let topic = prepare_test_for_kafka_log_store(&factory).await; + + let num_regions = 3u32; + let region_dir = |region_id| format!("test/{region_id}"); + let mut region_schema = HashMap::new(); + + for id in 1..=num_regions { + let engine = engine.clone(); + let topic = topic.clone(); + let region_id = RegionId::new(1, id); + let request = CreateRequestBuilder::new() + .region_dir(®ion_dir(region_id)) + .kafka_topic(topic.clone()) + .build(); + let column_schemas = rows_schema(&request); + region_schema.insert(region_id, column_schemas); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + } + + for i in 0..10 { + for region_number in 1..=num_regions { + let region_id = RegionId::new(1, region_number); + let rows = Rows { + schema: region_schema[®ion_id].clone(), + rows: build_rows( + (region_number as usize) * 120 + i as usize, + (region_number as usize) * 120 + i as usize + 1, + ), + }; + put_rows(&engine, region_id, rows).await; + } + } + + let assert_result = |engine: MitoEngine| async move { + for i in 1..num_regions { + let region_id = RegionId::new(1, i); + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let mut expected = String::new(); + expected.push_str( + "+-------+---------+---------------------+\n| tag_0 | field_0 | ts |\n+-------+---------+---------------------+\n", + ); + for row in 0..10 { + expected.push_str(&format!( + "| {} | {}.0 | 1970-01-01T00:{:02}:{:02} |\n", + i * 120 + row, + i * 120 + row, + 2 * i, + row + )); + } + expected.push_str("+-------+---------+---------------------+"); + assert_eq!(expected, batches.pretty_print().unwrap()); + } + }; + assert_result(engine.clone()).await; + + let mut options = HashMap::new(); + if let Some(topic) = &topic { + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + })) + .unwrap(), + ); + }; + let mut requests = (1..=num_regions) + .map(|id| { + let region_id = RegionId::new(1, id); + ( + region_id, + RegionOpenRequest { + engine: String::new(), + region_dir: region_dir(region_id), + options: options.clone(), + skip_wal_replay: false, + }, + ) + }) + .collect::>(); + requests.push(( + RegionId::new(1, 4), + RegionOpenRequest { + engine: String::new(), + region_dir: "no-exists".to_string(), + options: options.clone(), + skip_wal_replay: false, + }, + )); + + // Reopen engine. + let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let mut results = engine + .handle_batch_open_requests(4, requests) + .await + .unwrap(); + let (_, result) = results.pop().unwrap(); + assert_eq!( + result.unwrap_err().status_code(), + StatusCode::RegionNotFound + ); + for (_, result) in results { + assert!(result.is_ok()); + } + assert_result(engine.clone()).await; +} + +#[apply(multiple_log_store_factories)] +async fn test_batch_open_err(factory: Option) { + common_telemetry::init_default_ut_logging(); + let Some(factory) = factory else { + return; + }; + let mut env = + TestEnv::with_prefix("open-batch-regions-err").with_log_store_factory(factory.clone()); + let engine = env.create_engine(MitoConfig::default()).await; + let topic = prepare_test_for_kafka_log_store(&factory).await; + let mut options = HashMap::new(); + if let Some(topic) = &topic { + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + })) + .unwrap(), + ); + }; + let num_regions = 3u32; + let region_dir = "test".to_string(); + let requests = (1..=num_regions) + .map(|id| { + ( + RegionId::new(1, id), + RegionOpenRequest { + engine: String::new(), + region_dir: region_dir.to_string(), + options: options.clone(), + skip_wal_replay: false, + }, + ) + }) + .collect::>(); + + let results = engine + .handle_batch_open_requests(3, requests) + .await + .unwrap(); + for (_, result) in results { + assert_eq!( + result.unwrap_err().status_code(), + StatusCode::RegionNotFound + ); + } +} diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ed9cbf037b30..f046471cfc2b 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use common_telemetry::{debug, error, info, warn}; use common_wal::options::WalOptions; +use futures::future::BoxFuture; use futures::StreamExt; use object_store::manager::ObjectStoreManagerRef; use object_store::util::{join_dir, normalize_dir}; @@ -48,6 +49,7 @@ use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; +use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; /// Builder to create a new [MitoRegion] or open an existing one. @@ -64,6 +66,7 @@ pub(crate) struct RegionOpener { intermediate_manager: IntermediateManager, time_provider: Option, stats: ManifestStats, + wal_entry_reader: Option>, } impl RegionOpener { @@ -89,6 +92,7 @@ impl RegionOpener { intermediate_manager, time_provider: None, stats: Default::default(), + wal_entry_reader: None, } } @@ -104,6 +108,16 @@ impl RegionOpener { Ok(self) } + /// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of + /// constructing a new one from scratch. + pub(crate) fn wal_entry_reader( + mut self, + wal_entry_reader: Option>, + ) -> Self { + self.wal_entry_reader = wal_entry_reader; + self + } + /// Sets options for the region. pub(crate) fn options(mut self, options: RegionOptions) -> Self { self.options = Some(options); @@ -165,8 +179,8 @@ impl RegionOpener { } } let options = self.options.take().unwrap(); - let provider = self.provider(&options.wal_options); let object_store = self.object_store(&options.storage)?.clone(); + let provider = self.provider(&options.wal_options); // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = self.manifest_options(config, &options)?; @@ -231,7 +245,7 @@ impl RegionOpener { /// /// Returns error if the region doesn't exist. pub(crate) async fn open( - self, + mut self, config: &MitoConfig, wal: &Wal, ) -> Result { @@ -267,7 +281,7 @@ impl RegionOpener { /// Tries to open the region and returns `None` if the region directory is empty. async fn maybe_open( - &self, + &mut self, config: &MitoConfig, wal: &Wal, ) -> Result> { @@ -288,6 +302,11 @@ impl RegionOpener { let region_id = self.region_id; let provider = self.provider(®ion_options.wal_options); + let wal_entry_reader = self + .wal_entry_reader + .take() + .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id)); + let on_region_opened = wal.on_region_opened(); let object_store = self.object_store(®ion_options.storage)?.clone(); debug!("Open region {} with options: {:?}", region_id, self.options); @@ -331,12 +350,13 @@ impl RegionOpener { region_id ); replay_memtable( - wal, &provider, + wal_entry_reader, region_id, flushed_entry_id, &version_control, config.allow_stale_entries, + on_region_opened, ) .await?; } else { @@ -357,7 +377,7 @@ impl RegionOpener { RegionState::ReadOnly, )), file_purger, - provider, + provider: provider.clone(), last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), time_provider, memtable_builder, @@ -448,21 +468,25 @@ pub(crate) fn check_recovered_region( } /// Replays the mutations from WAL and inserts mutations to memtable of given region. -pub(crate) async fn replay_memtable( - wal: &Wal, +pub(crate) async fn replay_memtable( provider: &Provider, + mut wal_entry_reader: Box, region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, allow_stale_entries: bool, -) -> Result { + on_region_opened: F, +) -> Result +where + F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture> + Send, +{ let mut rows_replayed = 0; // Last entry id should start from flushed entry id since there might be no // data in the WAL. let mut last_entry_id = flushed_entry_id; let replay_from_entry_id = flushed_entry_id + 1; - let mut wal_stream = wal.scan(region_id, replay_from_entry_id, provider)?; + let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; if entry_id <= flushed_entry_id { @@ -496,7 +520,7 @@ pub(crate) async fn replay_memtable( // TODO(weny): We need to update `flushed_entry_id` in the region manifest // to avoid reading potentially incomplete entries in the future. - wal.obsolete(region_id, flushed_entry_id, provider).await?; + (on_region_opened)(region_id, flushed_entry_id, provider).await?; info!( "Replay WAL for region: {}, rows recovered: {}, last entry id: {}", diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index b7eed9327561..efd796b44db9 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -44,6 +44,7 @@ use crate::error::{ }; use crate::manifest::action::RegionEdit; use crate::metrics::COMPACTION_ELAPSED_TOTAL; +use crate::wal::entry_distributor::WalEntryReceiver; use crate::wal::EntryId; /// Request to write a region. @@ -497,6 +498,22 @@ pub(crate) enum WorkerRequest { } impl WorkerRequest { + pub(crate) fn new_open_region_request( + region_id: RegionId, + request: RegionOpenRequest, + entry_receiver: Option, + ) -> (WorkerRequest, Receiver>) { + let (sender, receiver) = oneshot::channel(); + + let worker_request = WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::Open((request, entry_receiver)), + }); + + (worker_request, receiver) + } + /// Converts request from a [RegionRequest]. pub(crate) fn try_from_region_request( region_id: RegionId, @@ -531,7 +548,7 @@ impl WorkerRequest { RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, sender: sender.into(), - request: DdlRequest::Open(v), + request: DdlRequest::Open((v, None)), }), RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, @@ -585,7 +602,7 @@ impl WorkerRequest { pub(crate) enum DdlRequest { Create(RegionCreateRequest), Drop(RegionDropRequest), - Open(RegionOpenRequest), + Open((RegionOpenRequest, Option)), Close(RegionCloseRequest), Alter(RegionAlterRequest), Flush(RegionFlushRequest), diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index c0cca52dcbbd..042fdf926478 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -30,6 +30,7 @@ use std::sync::Arc; use api::v1::WalEntry; use common_error::ext::BoxedError; +use futures::future::BoxFuture; use futures::stream::BoxStream; use prost::Message; use snafu::ResultExt; @@ -86,6 +87,39 @@ impl Wal { } } + /// Returns a [OnRegionOpened] function. + pub(crate) fn on_region_opened( + &self, + ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture> { + let store = self.store.clone(); + move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + store + .obsolete(provider, last_entry_id) + .await + .map_err(BoxedError::new) + .context(DeleteWalSnafu { region_id }) + }) + } + } + + /// Returns a [WalEntryReader] + pub(crate) fn wal_entry_reader( + &self, + provider: &Provider, + region_id: RegionId, + ) -> Box { + match provider { + Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new( + LogStoreRawEntryReader::new(self.store.clone()), + )), + Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new( + LogStoreRawEntryReader::new(self.store.clone()), + region_id, + ))), + } + } + /// Scan entries of specific region starting from `start_id` (inclusive). pub fn scan<'a>( &'a self, diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs index dacb1d3ae9dc..bad80fa9d312 100644 --- a/src/mito2/src/wal/entry_distributor.rs +++ b/src/mito2/src/wal/entry_distributor.rs @@ -20,7 +20,7 @@ use api::v1::WalEntry; use async_stream::stream; use common_telemetry::{debug, error}; use futures::future::join_all; -use snafu::ensure; +use snafu::{ensure, OptionExt}; use store_api::logstore::entry::Entry; use store_api::logstore::provider::Provider; use store_api::storage::RegionId; @@ -101,9 +101,9 @@ impl WalEntryDistributor { pub(crate) struct WalEntryReceiver { region_id: RegionId, /// Receives the [Entry] from the [WalEntryDistributor]. - entry_receiver: Receiver, + entry_receiver: Option>, /// Sends the `start_id` to the [WalEntryDistributor]. - arg_sender: oneshot::Sender, + arg_sender: Option>, } impl WalEntryReceiver { @@ -114,19 +114,22 @@ impl WalEntryReceiver { ) -> Self { Self { region_id, - entry_receiver, - arg_sender, + entry_receiver: Some(entry_receiver), + arg_sender: Some(arg_sender), } } } impl WalEntryReader for WalEntryReceiver { - fn read(self, provider: &Provider, start_id: EntryId) -> Result> { - let WalEntryReceiver { - region_id: expected_region_id, - mut entry_receiver, - arg_sender, - } = self; + fn read(&mut self, provider: &Provider, start_id: EntryId) -> Result> { + let mut arg_sender = + self.arg_sender + .take() + .with_context(|| error::InvalidWalReadRequestSnafu { + reason: format!("Call WalEntryReceiver multiple time, start_id: {start_id}"), + })?; + // Safety: check via arg_sender + let mut entry_receiver = self.entry_receiver.take().unwrap(); if arg_sender.send(start_id).is_err() { return error::InvalidWalReadRequestSnafu { @@ -167,6 +170,9 @@ struct EntryReceiver { sender: Sender, } +/// The default buffer size of the [Entry] receiver. +pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048; + /// Returns [WalEntryDistributor] and batch [WalEntryReceiver]s. /// /// ### Note: @@ -186,14 +192,14 @@ struct EntryReceiver { pub fn build_wal_entry_distributor_and_receivers( provider: Provider, raw_wal_reader: Arc, - region_ids: Vec, + region_ids: &[RegionId], buffer_size: usize, ) -> (WalEntryDistributor, Vec) { let mut senders = HashMap::with_capacity(region_ids.len()); let mut readers = Vec::with_capacity(region_ids.len()); let mut arg_receivers = Vec::with_capacity(region_ids.len()); - for region_id in region_ids { + for ®ion_id in region_ids { let (entry_sender, entry_receiver) = mpsc::channel(buffer_size); let (arg_sender, arg_receiver) = oneshot::channel(); @@ -257,7 +263,7 @@ mod tests { let (distributor, receivers) = build_wal_entry_distributor_and_receivers( provider, reader, - vec![RegionId::new(1024, 1), RegionId::new(1025, 1)], + &[RegionId::new(1024, 1), RegionId::new(1025, 1)], 128, ); @@ -317,7 +323,7 @@ mod tests { let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( provider.clone(), reader, - vec![ + &[ RegionId::new(1024, 1), RegionId::new(1024, 2), RegionId::new(1024, 3), @@ -331,7 +337,7 @@ mod tests { drop(last); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 0).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); @@ -427,12 +433,12 @@ mod tests { let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( provider.clone(), Arc::new(corrupted_stream), - vec![region1, region2, region3], + &[region1, region2, region3], 128, ); assert_eq!(receivers.len(), 3); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 0).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); @@ -510,12 +516,12 @@ mod tests { let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( provider.clone(), Arc::new(corrupted_stream), - vec![region1, region2], + &[region1, region2], 128, ); assert_eq!(receivers.len(), 2); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 0).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); @@ -602,12 +608,12 @@ mod tests { let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers( provider.clone(), reader, - vec![RegionId::new(1024, 1), RegionId::new(1024, 2)], + &[RegionId::new(1024, 1), RegionId::new(1024, 2)], 128, ); assert_eq!(receivers.len(), 2); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 4).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs index c29a5e629d5c..5db4eb9efe5f 100644 --- a/src/mito2/src/wal/entry_reader.rs +++ b/src/mito2/src/wal/entry_reader.rs @@ -38,8 +38,10 @@ pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> } /// [WalEntryReader] provides the ability to read and decode entries from the underlying store. +/// +/// Notes: It will consume the inner stream and only allow invoking the `read` at once. pub(crate) trait WalEntryReader: Send + Sync { - fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result>; + fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result>; } /// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry]. @@ -54,7 +56,7 @@ impl LogStoreEntryReader { } impl WalEntryReader for LogStoreEntryReader { - fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result> { + fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result> { let LogStoreEntryReader { reader } = self; let mut stream = reader.read(ns, start_id)?; @@ -136,7 +138,7 @@ mod tests { ], }; - let reader = LogStoreEntryReader::new(raw_entry_stream); + let mut reader = LogStoreEntryReader::new(raw_entry_stream); let entries = reader .read(&provider, 0) .unwrap() @@ -172,7 +174,7 @@ mod tests { ], }; - let reader = LogStoreEntryReader::new(raw_entry_stream); + let mut reader = LogStoreEntryReader::new(raw_entry_stream); let err = reader .read(&provider, 0) .unwrap() diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2122a052c839..b34d5b8566ec 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -721,8 +721,8 @@ impl RegionWorkerLoop { let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, - DdlRequest::Open(req) => { - self.handle_open_request(ddl.region_id, req, ddl.sender) + DdlRequest::Open((req, wal_entry_receiver)) => { + self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender) .await; continue; } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 595b6ee56635..93a84b92e272 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -73,13 +73,16 @@ impl RegionWorkerLoop { let flushed_entry_id = region.version_control.current().last_entry_id; info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}"); let timer = Instant::now(); + let wal_entry_reader = self.wal.wal_entry_reader(®ion.provider, region_id); + let on_region_opened = self.wal.on_region_opened(); let last_entry_id = replay_memtable( - &self.wal, ®ion.provider, + wal_entry_reader, region_id, flushed_entry_id, ®ion.version_control, self.config.allow_stale_entries, + on_region_opened, ) .await?; info!( diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 6c63abf1535c..840e19583c49 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -29,6 +29,7 @@ use crate::error::{ use crate::metrics::REGION_COUNT; use crate::region::opener::RegionOpener; use crate::request::OptionOutputTx; +use crate::wal::entry_distributor::WalEntryReceiver; use crate::worker::handle_drop::remove_region_dir_once; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; @@ -66,6 +67,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionOpenRequest, + wal_entry_receiver: Option, sender: OptionOutputTx, ) { if self.regions.is_region_exists(region_id) { @@ -95,6 +97,7 @@ impl RegionWorkerLoop { ) .skip_wal_replay(request.skip_wal_replay) .cache(Some(self.cache_manager.clone())) + .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _)) .parse_options(request.options) { Ok(opener) => opener, diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 821f98c6fbf7..5b6f5ddc1603 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -26,8 +26,8 @@ serde.workspace = true serde_json.workspace = true snafu.workspace = true strum.workspace = true +tokio.workspace = true [dev-dependencies] async-stream.workspace = true serde_json.workspace = true -tokio.workspace = true diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 3f9d58a95588..337effbd0374 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -26,12 +26,14 @@ use common_query::error::ExecuteRepeatedlySnafu; use common_recordbatch::SendableRecordBatchStream; use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; +use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::OptionExt; +use tokio::sync::Semaphore; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::RegionRequest; +use crate::region_request::{RegionOpenRequest, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; /// The result of setting readonly for the region. @@ -177,11 +179,38 @@ pub trait RegionScanner: Debug + DisplayAs + Send + Sync { pub type RegionScannerRef = Arc; +pub type BatchResponses = Vec<(RegionId, Result)>; + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine fn name(&self) -> &str; + /// Handles batch open region requests. + async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result { + let semaphore = Arc::new(Semaphore::new(parallelism)); + let mut tasks = Vec::with_capacity(requests.len()); + + for (region_id, request) in requests { + let semaphore_moved = semaphore.clone(); + + tasks.push(async move { + // Safety: semaphore must exist + let _permit = semaphore_moved.acquire().await.unwrap(); + let result = self + .handle_request(region_id, RegionRequest::Open(request)) + .await; + (region_id, result) + }); + } + + Ok(join_all(tasks).await) + } + /// Handles non-query request to the region. Returns the count of affected rows. async fn handle_request( &self,