feat: implement the handle_batch_open_requests
WenyXu committed May 30, 2024
1 parent 276da59 commit 188f6e6
Showing 11 changed files with 304 additions and 40 deletions.
160 changes: 157 additions & 3 deletions src/mito2/src/engine.rs
@@ -50,6 +50,7 @@ mod set_readonly_test;
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

@@ -58,22 +59,29 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
-use store_api::region_request::{AffectedRows, RegionRequest};
+use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
-use tokio::sync::oneshot;
+use tokio::sync::{oneshot, Semaphore};

use crate::config::MitoConfig;
-use crate::error::{InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result};
+use crate::error::{
+    InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
+};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::wal::entry_distributor::build_wal_entry_distributor_and_receivers;
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";
@@ -211,6 +219,45 @@ struct EngineInner {
workers: WorkerGroup,
/// Config of the engine.
config: Arc<MitoConfig>,
/// The WAL raw entry reader.
wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// **For Kafka remote WAL:**
/// Groups the requests by topic, so all regions sharing a topic can be replayed together.
///
/// **For RaftEngine WAL:**
/// Does nothing; the requests are returned ungrouped.
fn prepare_batch_open_requests(
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
TopicGroupedRegionOpenRequests,
Vec<(RegionId, RegionOpenRequest)>,
)> {
let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
for (region_id, request) in requests {
let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
serde_json::from_str(options).context(SerdeJsonSnafu)?
} else {
WalOptions::RaftEngine
};
match options {
WalOptions::Kafka(options) => {
topic_to_regions
.entry(options.topic)
.or_default()
.push((region_id, request));
}
WalOptions::RaftEngine => {
remaining_regions.push((region_id, request));
}
}
}

Ok((topic_to_regions, remaining_regions))
}
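The `WAL_OPTIONS_KEY` entry carries a JSON-encoded `WalOptions`. A minimal, self-contained sketch of the same group-by-topic step, using a simplified stand-in enum (the real `WalOptions` lives in `common_wal::options` and its serde tags may differ):

```rust
use std::collections::HashMap;

use serde::Deserialize;

/// Simplified stand-in for `common_wal::options::WalOptions`;
/// the real serde representation may differ.
#[derive(Deserialize)]
#[serde(tag = "provider", rename_all = "snake_case")]
enum WalOptions {
    Kafka { topic: String },
    RaftEngine,
}

/// Groups (region id, serialized WAL options) pairs by Kafka topic;
/// regions without options fall back to the local RaftEngine WAL.
fn group_by_topic(
    requests: Vec<(u64, Option<&str>)>,
) -> serde_json::Result<(HashMap<String, Vec<u64>>, Vec<u64>)> {
    let mut topic_to_regions: HashMap<String, Vec<u64>> = HashMap::new();
    let mut remaining = Vec::new();
    for (region_id, raw) in requests {
        let options = match raw {
            Some(raw) => serde_json::from_str(raw)?,
            None => WalOptions::RaftEngine,
        };
        match options {
            WalOptions::Kafka { topic } => {
                topic_to_regions.entry(topic).or_default().push(region_id)
            }
            WalOptions::RaftEngine => remaining.push(region_id),
        }
    }
    Ok((topic_to_regions, remaining))
}

fn main() -> serde_json::Result<()> {
    let (grouped, local) = group_by_topic(vec![
        (1, Some(r#"{"provider":"kafka","topic":"wal_0"}"#)),
        (2, Some(r#"{"provider":"kafka","topic":"wal_0"}"#)),
        (3, None),
    ])?;
    assert_eq!(grouped["wal_0"], vec![1, 2]);
    assert_eq!(local, vec![3]);
    Ok(())
}
```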

impl EngineInner {
@@ -221,9 +268,11 @@ impl EngineInner {
object_store_manager: ObjectStoreManagerRef,
) -> Result<EngineInner> {
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(EngineInner {
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
config,
wal_raw_entry_reader,
})
}

@@ -244,6 +293,96 @@ impl EngineInner {
Ok(region.metadata())
}

async fn open_topic_regions(
&self,
topic: String,
region_requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<()> {
let mut response_receivers = Vec::with_capacity(region_requests.len());
let region_ids = region_requests
.iter()
.map(|(region_id, _)| *region_id)
.collect();
let provider = Provider::kafka_provider(topic);
let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
provider,
self.wal_raw_entry_reader.clone(),
region_ids,
2048,
);

for (region_id, request, receiver) in region_requests.into_iter().zip(entry_receivers).map(
|((region_id, request), entry_receiver)| {
let (request, receiver) = WorkerRequest::new_open_region_request(
region_id,
request,
Some(entry_receiver),
);

(region_id, request, receiver)
},
) {
self.workers.submit_to_worker(region_id, request).await?;
response_receivers.push((region_id, receiver));
}
// Distributes the WAL entries in the background; the handle is awaited below.
let distribution =
common_runtime::spawn_read(async move { distributor.distribute().await });

// Waits for the workers to finish opening the regions.
try_join_all(
response_receivers
.into_iter()
.map(|(_, receiver)| async { receiver.await.context(RecvSnafu)? }),
)
.await?;

let _ = distribution.await.context(JoinSnafu)?;
Ok(())
}
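`build_wal_entry_distributor_and_receivers` hands back one distributor plus one entry receiver per region; the distributor reads the topic once and routes each entry to the region that owns it, so N regions sharing a topic no longer cost N topic scans. A minimal tokio sketch of that fan-out shape (channel types are illustrative, not the real `WalEntryDistributor` internals):

```rust
use std::collections::HashMap;

use tokio::sync::mpsc;

type RegionId = u64;

/// Routes entries from one source (the topic) to per-region sinks.
async fn distribute(
    mut source: mpsc::Receiver<(RegionId, Vec<u8>)>,
    mut sinks: HashMap<RegionId, mpsc::Sender<Vec<u8>>>,
) {
    while let Some((region_id, payload)) = source.recv().await {
        if let Some(sink) = sinks.get(&region_id) {
            // A dropped receiver means the region already finished replaying.
            let _ = sink.send(payload).await;
        }
    }
    // Dropping the senders closes every per-region channel, ending replay.
    sinks.clear();
}

#[tokio::main]
async fn main() {
    let (topic_tx, topic_rx) = mpsc::channel(16);
    let (r1_tx, mut r1_rx) = mpsc::channel(16);
    let distributor = tokio::spawn(distribute(topic_rx, HashMap::from([(1u64, r1_tx)])));

    topic_tx.send((1, b"put".to_vec())).await.unwrap();
    drop(topic_tx); // end of topic

    assert_eq!(r1_rx.recv().await, Some(b"put".to_vec()));
    assert_eq!(r1_rx.recv().await, None);
    distributor.await.unwrap();
}
```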

async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(parallelism));
let (topic_to_region_requests, remaining_region_requests) =
prepare_batch_open_requests(requests)?;

if !topic_to_region_requests.is_empty() {
let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
for (topic, region_requests) in topic_to_region_requests {
let self_ref = self;
let semaphore_moved = semaphore.clone();
tasks.push(async move {
// Safety: the semaphore is never closed, so `acquire` cannot fail.
let _permit = semaphore_moved.acquire().await.unwrap();
self_ref.open_topic_regions(topic, region_requests).await
})
}
try_join_all(tasks).await?;
}

if !remaining_region_requests.is_empty() {
let mut tasks = Vec::with_capacity(remaining_region_requests.len());
for (region_id, request) in remaining_region_requests {
tasks.push(async move {
let (request, receiver) =
WorkerRequest::new_open_region_request(region_id, request, None);

self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)?
})
}

try_join_all(tasks).await?;
}

Ok(())
}
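The semaphore caps how many topics (and, in the second loop, how many futures overall) run concurrently, while `try_join_all` fails fast on the first error. The same bounded-concurrency pattern in isolation, as a sketch with stand-in job types:

```rust
use std::sync::Arc;

use futures::future::try_join_all;
use tokio::sync::Semaphore;

/// Runs all jobs, at most `parallelism` at a time, failing fast on the first error.
async fn run_bounded(parallelism: usize, jobs: Vec<u64>) -> Result<Vec<u64>, String> {
    let semaphore = Arc::new(Semaphore::new(parallelism));
    let tasks = jobs.into_iter().map(|job| {
        let semaphore = semaphore.clone();
        async move {
            // The semaphore is never closed, so `acquire` cannot fail.
            let _permit = semaphore.acquire().await.unwrap();
            // Stand-in for `open_topic_regions`.
            Ok::<_, String>(job * 2)
        }
    });
    // The first `Err` drops the remaining futures; results keep input order.
    try_join_all(tasks).await
}

#[tokio::main]
async fn main() {
    assert_eq!(run_bounded(2, vec![1, 2, 3]).await.unwrap(), vec![2, 4, 6]);
}
```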

/// Handles [RegionRequest] and return its executed result.
async fn handle_request(
&self,
@@ -323,6 +462,19 @@ impl RegionEngine for MitoEngine {
MITO_ENGINE_NAME
}

#[tracing::instrument(skip_all)]
async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(), BoxedError> {
// TODO(weny): add metrics.
self.inner
.handle_batch_open_requests(parallelism, requests)
.await
.map_err(BoxedError::new)
}

#[tracing::instrument(skip_all)]
async fn handle_request(
&self,
@@ -421,6 +573,7 @@ impl MitoEngine {
config.sanitize(data_home)?;

let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(
Expand All @@ -433,6 +586,7 @@ impl MitoEngine {
)
.await?,
config,
wal_raw_entry_reader,
}),
})
}
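End to end, a caller holding the engine can then open a whole batch in one call. A hypothetical call site (the `requests` vector and the parallelism of 8 are illustrative):

```rust
// Kafka-backed regions are grouped per topic, raft-engine regions are opened
// individually, and at most 8 opens run in flight.
engine.handle_batch_open_requests(8, requests).await?;
```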
40 changes: 30 additions & 10 deletions src/mito2/src/region/opener.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
Expand Down Expand Up @@ -48,6 +49,7 @@ use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// Builder to create a new [MitoRegion] or open an existing one.
@@ -64,6 +66,7 @@ pub(crate) struct RegionOpener {
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
stats: ManifestStats,
wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}

impl RegionOpener {
@@ -89,6 +92,7 @@ impl RegionOpener {
intermediate_manager,
time_provider: None,
stats: Default::default(),
wal_entry_reader: None,
}
}

@@ -104,6 +108,12 @@ impl RegionOpener {
Ok(self)
}

/// Sets the [WalEntryReader] to use when opening the region.
/// If unset, the opener builds one from the [Wal] during WAL replay.
pub(crate) fn wal_entry_reader(mut self, wal_entry_reader: Box<dyn WalEntryReader>) -> Self {
self.wal_entry_reader = Some(wal_entry_reader);
self
}

/// Sets options for the region.
pub(crate) fn options(mut self, options: RegionOptions) -> Self {
self.options = Some(options);
Expand Down Expand Up @@ -165,8 +175,8 @@ impl RegionOpener {
}
}
let options = self.options.take().unwrap();
-let provider = self.provider(&options.wal_options);
let object_store = self.object_store(&options.storage)?.clone();
+let provider = self.provider(&options.wal_options);

// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
@@ -231,7 +241,7 @@ impl RegionOpener {
///
/// Returns error if the region doesn't exist.
pub(crate) async fn open<S: LogStore>(
-self,
+mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<MitoRegion> {
@@ -267,7 +277,7 @@ impl RegionOpener {

/// Tries to open the region and returns `None` if the region directory is empty.
async fn maybe_open<S: LogStore>(
-&self,
+&mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<Option<MitoRegion>> {
@@ -288,6 +298,11 @@

let region_id = self.region_id;
let provider = self.provider(&region_options.wal_options);
let wal_entry_reader = self
.wal_entry_reader
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();

debug!("Open region {} with options: {:?}", region_id, self.options);
@@ -331,12 +346,13 @@
region_id
);
replay_memtable(
-wal,
&provider,
+wal_entry_reader,
region_id,
flushed_entry_id,
&version_control,
config.allow_stale_entries,
+on_region_opened,
)
.await?;
} else {
@@ -357,7 +373,7 @@
RegionState::ReadOnly,
)),
file_purger,
-provider,
+provider: provider.clone(),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
memtable_builder,
@@ -448,21 +464,25 @@ pub(crate) fn check_recovered_region(
}

/// Replays mutations from the WAL and inserts them into the memtable of the given region.
-pub(crate) async fn replay_memtable<S: LogStore>(
-wal: &Wal<S>,
+pub(crate) async fn replay_memtable<F>(
provider: &Provider,
+mut wal_entry_reader: Box<dyn WalEntryReader>,
region_id: RegionId,
flushed_entry_id: EntryId,
version_control: &VersionControlRef,
allow_stale_entries: bool,
-) -> Result<EntryId> {
+on_region_opened: F,
+) -> Result<EntryId>
+where
+F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
+{
let mut rows_replayed = 0;
// Last entry id should start from flushed entry id since there might be no
// data in the WAL.
let mut last_entry_id = flushed_entry_id;
let replay_from_entry_id = flushed_entry_id + 1;

-let mut wal_stream = wal.scan(region_id, replay_from_entry_id, provider)?;
+let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
while let Some(res) = wal_stream.next().await {
let (entry_id, entry) = res?;
if entry_id <= flushed_entry_id {
Expand Down Expand Up @@ -496,7 +516,7 @@ pub(crate) async fn replay_memtable<S: LogStore>(

// TODO(weny): We need to update `flushed_entry_id` in the region manifest
// to avoid reading potentially incomplete entries in the future.
-wal.obsolete(region_id, flushed_entry_id, provider).await?;
+(on_region_opened)(region_id, flushed_entry_id, provider).await?;

info!(
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
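`replay_memtable` no longer borrows a concrete `Wal<S>`: it reads from any `WalEntryReader` and, after a successful replay, invokes the `on_region_opened` callback (the engine passes `wal.on_region_opened()`, which takes over the old `wal.obsolete(..)` call). A minimal sketch of satisfying the `FnOnce(..) -> BoxFuture<..>` bound, with stand-in `Provider` and `Result` types:

```rust
use futures::future::BoxFuture;

type RegionId = u64;
type EntryId = u64;
/// Stand-in for the crate's WAL `Provider`.
struct Provider;
type Result<T> = std::result::Result<T, String>;

/// Mirrors the shape of `replay_memtable`'s new bound.
async fn replay<F>(
    region_id: RegionId,
    flushed_entry_id: EntryId,
    provider: &Provider,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    // ...WAL replay into the memtable would happen here...
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;
    Ok(flushed_entry_id)
}

/// Stand-in for `wal.on_region_opened()`; the real callback obsoletes
/// WAL entries up to the flushed entry id.
fn log_opened(
    region_id: RegionId,
    entry_id: EntryId,
    _provider: &Provider,
) -> BoxFuture<'_, Result<()>> {
    Box::pin(async move {
        println!("region {region_id} opened; flushed entry id {entry_id}");
        Ok(())
    })
}

#[tokio::main]
async fn main() -> Result<()> {
    let last = replay(1, 42, &Provider, log_opened).await?;
    assert_eq!(last, 42);
    Ok(())
}
```

Passing a callback instead of the whole `Wal<S>` is what lets the batch-open path feed replay from a shared, distributor-backed topic reader.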