feat: implement the handle_batch_open_requests
WenyXu committed Jun 4, 2024
1 parent c0aed1d commit 4345b98
Showing 11 changed files with 328 additions and 41 deletions.
184 changes: 180 additions & 4 deletions src/mito2/src/engine.rs
@@ -50,6 +50,7 @@ mod set_readonly_test;
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

@@ -58,22 +59,31 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, Semaphore};

use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result};
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::wal::entry_distributor::build_wal_entry_distributor_and_receivers;
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";
@@ -211,6 +221,45 @@ struct EngineInner {
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// The WAL raw entry reader.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// **For Kafka Remote WAL:**
/// Groups requests by topic.
///
/// **For RaftEngine WAL:**
/// Does nothing.
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}
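
For illustration, a minimal sketch of the round-trip that drives this grouping. Hedged: the `KafkaWalOptions` struct shape and the JSON encoding live in `common_wal`, outside this diff; only `WalOptions` and `WAL_OPTIONS_KEY` appear above.

use common_wal::options::{KafkaWalOptions, WalOptions};

// A request whose options map carries serialized Kafka options is grouped
// under its topic; a request without WAL_OPTIONS_KEY falls back to RaftEngine.
let kafka = WalOptions::Kafka(KafkaWalOptions {
    topic: "greptimedb_wal_topic_0".to_string(),
});
let encoded = serde_json::to_string(&kafka).unwrap();
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert!(matches!(decoded, WalOptions::Kafka(_)));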

impl EngineInner {
@@ -221,9 +270,11 @@ impl EngineInner {
        object_store_manager: ObjectStoreManagerRef,
    ) -> Result<EngineInner> {
        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(EngineInner {
            workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
            config,
            wal_raw_entry_reader,
        })
    }

@@ -244,6 +295,106 @@ impl EngineInner {
        Ok(region.metadata())
    }

    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<Result<(RegionId, AffectedRows)>>> {
        let mut response_receivers = Vec::with_capacity(region_requests.len());
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            self.wal_raw_entry_reader.clone(),
            region_ids,
            // Channel buffer size for WAL entry distribution.
            2048,
        );

        for (region_id, request, receiver) in region_requests.into_iter().zip(entry_receivers).map(
            |((region_id, request), entry_receiver)| {
                let (request, receiver) = WorkerRequest::new_open_region_request(
                    region_id,
                    request,
                    Some(entry_receiver),
                );

                (region_id, request, receiver)
            },
        ) {
            self.workers.submit_to_worker(region_id, request).await?;
            response_receivers.push((region_id, receiver));
        }

        // Spawns a task to distribute raw WAL entries to the regions' receivers.
        let distribution =
            common_runtime::spawn_read(async move { distributor.distribute().await });

        // Waits for the workers to finish opening (and replaying) the regions.
        let responses = join_all(response_receivers.into_iter().map(
            |(region_id, receiver)| async move {
                receiver
                    .await
                    .context(RecvSnafu)?
                    .map(|response| (region_id, response))
            },
        ))
        .await;

        let _ = distribution.await.context(JoinSnafu)?;
        Ok(responses)
    }
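
The design point of this method: every region on one topic shares a single read pass over that topic's WAL, with the distributor fanning raw entries out to per-region channels; the spawn lets distribution proceed while this task awaits the workers' responses. A generic sketch of that fan-out shape, using plain tokio channels rather than the crate's entry distributor (names here are illustrative, not the crate's API):

use std::collections::HashMap;
use tokio::sync::mpsc;

// One reader consumes the shared log once and routes each raw entry to the
// channel of its owning region; workers replay from the receiving ends.
async fn fan_out(
    entries: Vec<(u64, Vec<u8>)>, // (region number, raw entry), in log order
    senders: HashMap<u64, mpsc::Sender<Vec<u8>>>,
) {
    for (region, entry) in entries {
        if let Some(tx) = senders.get(&region) {
            // A bounded channel applies backpressure to the single reader.
            let _ = tx.send(entry).await;
        }
    }
    // Dropping the senders closes the receivers, signaling end of replay.
}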

    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<Result<(RegionId, AffectedRows)>>> {
        // Bounds how many topics are replayed concurrently.
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let self_ref = self;
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safety: the semaphore is never closed, so `acquire` cannot fail.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self_ref.open_topic_regions(topic, region_requests).await
                });
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                tasks.push(async move {
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver
                        .await
                        .context(RecvSnafu)?
                        .map(|response| (region_id, response))
                });
            }

            let r = join_all(tasks).await;
            responses.extend(r);
        }

        Ok(responses)
    }
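
Note the asymmetry above: a topic-level failure (e.g. the distributor task failing) aborts the whole batch through `try_join_all`, while per-region open errors ride inside the returned `Vec<Result<...>>`. A caller-side sketch of separating the per-region outcomes (illustrative only):

// `responses` has type Vec<Result<(RegionId, AffectedRows)>>.
let (opened, failed): (Vec<_>, Vec<_>) = responses.into_iter().partition(|r| r.is_ok());
for result in failed {
    // Each Err carries the open failure of a single region.
    common_telemetry::error!("failed to open region: {:?}", result.unwrap_err());
}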

    /// Handles [RegionRequest] and returns its execution result.
    async fn handle_request(
        &self,
Expand Down Expand Up @@ -323,6 +474,29 @@ impl RegionEngine for MitoEngine {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        // TODO(weny): add metrics.
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|response| {
                        response
                            .map(|(region_id, row)| (region_id, RegionResponse::new(row)))
                            .map_err(BoxedError::new)
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }
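
A hedged caller-side sketch of the new trait method; the `RegionOpenRequest` field names and the `wal_options_json` value are assumptions based on the surrounding codebase, not shown in this diff:

use std::collections::HashMap;

use common_wal::options::WAL_OPTIONS_KEY;
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;

// Open a batch of regions that share one Kafka topic; the engine replays the
// topic's WAL once for all of them. `wal_options_json` is the serialized
// WalOptions stored with each region's metadata (assumed available here).
let requests: Vec<(RegionId, RegionOpenRequest)> = region_ids
    .iter()
    .map(|&region_id| {
        let mut options = HashMap::new();
        options.insert(WAL_OPTIONS_KEY.to_string(), wal_options_json.clone());
        (
            region_id,
            RegionOpenRequest {
                engine: MITO_ENGINE_NAME.to_string(),
                region_dir: format!("data/{}", region_id),
                options,
                skip_wal_replay: false,
            },
        )
    })
    .collect();

// `parallelism` caps how many topics are replayed at once.
let responses = engine.handle_batch_open_requests(16, requests).await?;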

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
Expand Down Expand Up @@ -421,6 +595,7 @@ impl MitoEngine {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
@@ -433,6 +608,7 @@
                )
                .await?,
                config,
                wal_raw_entry_reader,
            }),
        })
    }
