feat: implement the handle_batch_open_requests #4075

Merged · 8 commits · Jun 5, 2024

Changes from 6 commits
182 changes: 178 additions & 4 deletions src/mito2/src/engine.rs
@@ -21,6 +21,8 @@ mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
@@ -50,6 +52,7 @@ mod set_readonly_test;
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

@@ -58,22 +61,33 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, Semaphore};

use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";
@@ -211,6 +225,41 @@ struct EngineInner {
workers: WorkerGroup,
/// Config of the engine.
config: Arc<MitoConfig>,
/// The WAL raw entry reader.
wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// Splits the requests into requests grouped by Kafka topic ([TopicGroupedRegionOpenRequests]) and the remaining (raft-engine) requests.
fn prepare_batch_open_requests(
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
TopicGroupedRegionOpenRequests,
Vec<(RegionId, RegionOpenRequest)>,
)> {
let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
for (region_id, request) in requests {
let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
serde_json::from_str(options).context(SerdeJsonSnafu)?
} else {
WalOptions::RaftEngine
};
match options {
WalOptions::Kafka(options) => {
topic_to_regions
.entry(options.topic)
.or_default()
.push((region_id, request));
}
WalOptions::RaftEngine => {
remaining_regions.push((region_id, request));
}
}
}

Ok((topic_to_regions, remaining_regions))
}
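A minimal sketch of the grouping behavior, for reviewers skimming this function. The `RegionOpenRequest` field layout, the `KafkaWalOptions { topic }` constructor, and `WalOptions` being `Serialize` are assumptions here, not taken from this diff:

```rust
// Hypothetical requests: two regions on the same Kafka topic, one on raft-engine.
let make_request = |wal: &WalOptions| RegionOpenRequest {
    engine: MITO_ENGINE_NAME.to_string(),
    region_dir: "data/".to_string(), // hypothetical directory
    options: HashMap::from([(
        WAL_OPTIONS_KEY.to_string(),
        serde_json::to_string(wal).unwrap(),
    )]),
    skip_wal_replay: false,
};
let kafka = WalOptions::Kafka(KafkaWalOptions {
    topic: "topic_a".to_string(),
});
let (grouped, remaining) = prepare_batch_open_requests(vec![
    (RegionId::new(1, 1), make_request(&kafka)),
    (RegionId::new(1, 2), make_request(&kafka)),
    (RegionId::new(2, 1), make_request(&WalOptions::RaftEngine)),
])
.unwrap();
// Regions sharing a Kafka topic form one replay group; raft-engine regions
// fall through to the remaining list and are opened individually.
assert_eq!(grouped["topic_a"].len(), 2);
assert_eq!(remaining.len(), 1);
```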

impl EngineInner {
@@ -221,9 +270,11 @@ impl EngineInner {
object_store_manager: ObjectStoreManagerRef,
) -> Result<EngineInner> {
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(EngineInner {
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
config,
wal_raw_entry_reader,
})
}

@@ -244,6 +295,103 @@ impl EngineInner {
Ok(region.metadata())
}

async fn open_topic_regions(
&self,
topic: String,
region_requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let mut responses = Vec::with_capacity(region_requests.len());
let region_ids = region_requests
.iter()
.map(|(region_id, _)| *region_id)
.collect();
let provider = Provider::kafka_provider(topic);
let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
provider,
self.wal_raw_entry_reader.clone(),
region_ids,
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
);

for ((region_id, request), entry_receiver) in
region_requests.into_iter().zip(entry_receivers)
{
let (request, receiver) =
WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
self.workers.submit_to_worker(region_id, request).await?;
responses.push((region_id, async move { receiver.await.context(RecvSnafu)? }));
}

// Spawns a task that distributes the WAL entries to the region receivers.
let distribution =
common_runtime::spawn_read(async move { distributor.distribute().await });

// Waits for the workers to finish opening the regions.
let responses = join_all(
responses
.into_iter()
.map(|(region_id, rows)| async move { (region_id, rows.await) }),
)
.await;

distribution.await.context(JoinSnafu)??;
Ok(responses)
}
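Note the ordering above: the distributor is spawned, and only awaited after the per-region futures are driven, because the entry receivers are bounded (`DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE`). A standalone sketch of the same producer/consumer shape with a bounded tokio channel (all names hypothetical):

```rust
use tokio::sync::mpsc;

async fn replay_pattern() {
    // Bounded, like the per-region entry receivers.
    let (tx, mut rx) = mpsc::channel::<u64>(8);

    // The producer (the distributor) must run concurrently with the
    // consumers: with a bounded buffer, `send` blocks once the buffer fills,
    // so awaiting the producer to completion before draining would deadlock.
    let distribution = tokio::spawn(async move {
        for entry in 0..1024u64 {
            tx.send(entry).await.unwrap();
        }
    });

    // The consumer (a region worker replaying its WAL entries) drains the
    // channel until the producer finishes and drops the sender.
    let mut replayed = 0;
    while rx.recv().await.is_some() {
        replayed += 1;
    }

    distribution.await.unwrap();
    assert_eq!(replayed, 1024);
}
```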

async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let semaphore = Arc::new(Semaphore::new(parallelism));
let (topic_to_region_requests, remaining_region_requests) =
prepare_batch_open_requests(requests)?;
let mut responses =
Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

if !topic_to_region_requests.is_empty() {
let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
for (topic, region_requests) in topic_to_region_requests {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
// Safety: the semaphore is never closed, so `acquire` cannot fail.
let _permit = semaphore_moved.acquire().await.unwrap();
self.open_topic_regions(topic, region_requests).await
});
}
let r = try_join_all(tasks).await?;
responses.extend(r.into_iter().flatten());
}

if !remaining_region_requests.is_empty() {
let mut tasks = Vec::with_capacity(remaining_region_requests.len());
for (region_id, request) in remaining_region_requests {
let semaphore_moved = semaphore.clone();
tasks.push((region_id, async move {
// Safety: the semaphore is never closed, so `acquire` cannot fail.
let _permit = semaphore_moved.acquire().await.unwrap();
let (request, receiver) =
WorkerRequest::new_open_region_request(region_id, request, None);

self.workers.submit_to_worker(region_id, request).await?;

receiver.await.context(RecvSnafu)?
}));
}

let r = join_all(
tasks
.into_iter()
.map(|(region_id, rows)| async move { (region_id, rows.await) }),
)
.await;

responses.extend(r);
}

Ok(responses)
}
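Both branches clone the same `Semaphore`, so `parallelism` caps topic groups and raft-engine regions combined. A minimal sketch of that bounding pattern, detached from the engine types:

```rust
use std::sync::Arc;

use futures::future::join_all;
use tokio::sync::Semaphore;

// Sixteen open tasks, but at most `parallelism` hold a permit at once.
async fn bounded_open(parallelism: usize) {
    let semaphore = Arc::new(Semaphore::new(parallelism));
    let tasks = (0..16u32).map(|i| {
        let semaphore = semaphore.clone();
        async move {
            // The semaphore is never closed, so `acquire` cannot fail.
            let _permit = semaphore.acquire().await.unwrap();
            // ... open one region, or one topic's group of regions, here ...
            i
        }
    });
    assert_eq!(join_all(tasks).await.len(), 16);
}
```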

/// Handles a [RegionRequest] and returns its execution result.
async fn handle_request(
&self,
@@ -323,6 +471,30 @@ impl RegionEngine for MitoEngine {
MITO_ENGINE_NAME
}

#[tracing::instrument(skip_all)]
async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses, BoxedError> {
// TODO(weny): add metrics.
self.inner
.handle_batch_open_requests(parallelism, requests)
.await
.map(|responses| {
responses
.into_iter()
.map(|(region_id, response)| {
(
region_id,
response.map(RegionResponse::new).map_err(BoxedError::new),
)
})
.collect::<Vec<_>>()
})
.map_err(BoxedError::new)
}
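A call-site sketch, hedged: `open_all` is a hypothetical helper, and it assumes `BatchResponses` is `Vec<(RegionId, Result<RegionResponse, BoxedError>)>` with the `RegionEngine` trait in scope. The point it illustrates is that a per-region failure lands in the response vector instead of failing the whole batch:

```rust
// Hypothetical caller: open many regions with bounded parallelism and
// collect the per-region failures for the caller to retry or report.
async fn open_all(
    engine: &MitoEngine,
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<(RegionId, BoxedError)>, BoxedError> {
    // The engine-level error (`?`) only covers failures before the
    // per-region futures run; everything else is reported per region.
    let responses = engine.handle_batch_open_requests(8, requests).await?;
    Ok(responses
        .into_iter()
        .filter_map(|(region_id, response)| response.err().map(|err| (region_id, err)))
        .collect())
}
```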

#[tracing::instrument(skip_all)]
async fn handle_request(
&self,
@@ -421,6 +593,7 @@ impl MitoEngine {
config.sanitize(data_home)?;

let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(
@@ -433,6 +606,7 @@
)
.await?,
config,
wal_raw_entry_reader,
}),
})
}