Skip to content

Commit

Permalink
feat(remote_wal): introduce wal meta to organize wal codes
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Nov 23, 2023
1 parent c328c43 commit e77f0fa
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mod tests {
Some("mybucket"),
),
(
// wal.dir = /other/wal/dir
// wal.raft_engine_opts.dir = /other/wal/dir
[
env_prefix.to_string(),
"wal".to_uppercase(),
Expand Down
5 changes: 2 additions & 3 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
use crate::wal::meta::WalMeta;

pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod truncate_table;
pub mod utils;

use crate::wal::kafka::KafkaTopic;

#[derive(Debug, Default)]
pub struct ExecutorContext {
pub cluster_id: Option<u64>,
Expand All @@ -58,7 +57,7 @@ pub struct TableMetadataAllocatorContext {
pub struct TableMetadata {
pub table_id: TableId,
pub region_routes: Vec<RegionRoute>,
pub region_topics: Option<Vec<KafkaTopic>>,
pub wal_meta: WalMeta,
}

#[async_trait::async_trait]
Expand Down
29 changes: 16 additions & 13 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ use crate::key::table_name::TableNameKey;
use crate::metrics;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::wal::kafka::KafkaTopic;
use crate::wal::meta::WalMeta;

const TOPIC_KEY: &str = "kafka_topic";
// TODO(niebayes): remove `WAL_PROVIDER_KEY` since there's no need to store wal provider in the region options any more.
pub const WAL_PROVIDER_KEY: &str = "wal_provider";
pub const TOPIC_KEY: &str = "kafka_topic";

pub struct CreateTableProcedure {
pub context: DdlContext,
Expand All @@ -53,12 +55,12 @@ impl CreateTableProcedure {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_topics: Option<Vec<KafkaTopic>>,
wal_meta: WalMeta,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes, region_topics),
creator: TableCreator::new(cluster_id, task, region_routes, wal_meta),
}
}

Expand Down Expand Up @@ -174,7 +176,7 @@ impl CreateTableProcedure {
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_topics = create_table_data.region_topics.as_ref();
let region_topics = &create_table_data.wal_meta.region_topics;

let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
Expand All @@ -190,6 +192,11 @@ impl CreateTableProcedure {
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
// Safety: `TableMetadataAllocator` ensures the region routes and topics are of the same length.
// Besides, `find_leader_regions` may filter out some regions. Therefore, the following condition must be met
// and the indexing on `region_topics` is safe definitely.
assert!(regions.len() <= region_topics.len());

let requests = regions
.iter()
.enumerate()
Expand All @@ -200,11 +207,7 @@ impl CreateTableProcedure {
create_region_request.region_id = region_id.as_u64();
create_region_request.path = storage_path.clone();

if let Some(region_topics) = region_topics {
// Safety: `TableMetadataAllocator` ensures the region routes and topics are of the same length.
// and hence the following indexing operation is safe.
assert_eq!(region_routes.len(), region_topics.len());

if !region_topics.is_empty() {
create_region_request
.options
.insert(TOPIC_KEY.to_string(), region_topics[i].clone());
Expand Down Expand Up @@ -306,15 +309,15 @@ impl TableCreator {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_topics: Option<Vec<KafkaTopic>>,
wal_meta: WalMeta,
) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
region_topics,
wal_meta,
},
}
}
Expand All @@ -336,7 +339,7 @@ pub struct CreateTableData {
pub task: CreateTableTask,
pub cluster_id: u64,
pub region_routes: Vec<RegionRoute>,
pub region_topics: Option<Vec<KafkaTopic>>,
pub wal_meta: WalMeta,
}

impl CreateTableData {
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
use crate::wal::kafka::KafkaTopic;
use crate::wal::meta::WalMeta;

pub type DdlManagerRef = Arc<DdlManager>;

Expand Down Expand Up @@ -165,15 +165,15 @@ impl DdlManager {
cluster_id: u64,
create_table_task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_topics: Option<Vec<KafkaTopic>>,
wal_meta: WalMeta,
) -> Result<ProcedureId> {
let context = self.create_context();

let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
region_routes,
region_topics,
wal_meta,
context,
);

Expand Down Expand Up @@ -379,11 +379,11 @@ async fn handle_create_table_task(
let TableMetadata {
table_id,
region_routes,
region_topics,
wal_meta,
} = table_metadata;

let id = ddl_manager
.submit_create_table_task(cluster_id, create_table_task, region_routes, region_topics)
.submit_create_table_task(cluster_id, create_table_task, region_routes, wal_meta)
.await?;

info!("Table: {table_id:?} is created via procedure_id {id:?}");
Expand Down
10 changes: 7 additions & 3 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ pub enum Error {
#[snafu(source)]
error: RsKafkaError,
},

#[snafu(display("Missing required Kafka topic manager"))]
MissingKafkaTopicManager { location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -382,14 +385,15 @@ impl ErrorExt for Error {
InvalidCatalogValue { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),

Error::MissingKafkaOpts { .. }
| Error::DeserKafkaTopics { .. }
Error::DeserKafkaTopics { .. }
| Error::SerKafkaTopics { .. }
| Error::InvalidNumTopics { .. }
| Error::BuildKafkaClient { .. }
| Error::BuildKafkaCtrlClient { .. }
| Error::TooManyCreatedKafkaTopics { .. }
| Error::CreateKafkaTopic { .. } => StatusCode::Unexpected,
| Error::CreateKafkaTopic { .. }
| Error::MissingKafkaOpts { .. }
| Error::MissingKafkaTopicManager { .. } => StatusCode::Unexpected,
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
// limitations under the License.

pub mod kafka;
pub mod meta;

use serde::{Deserialize, Serialize};

use crate::wal::kafka::KafkaOptions;

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default)]
pub enum WalProvider {
#[default]
RaftEngine,
Kafka,
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ impl TopicManager {
})
}

pub fn select_topics(&self, num_regions: usize) -> Vec<Topic> {
(0..num_regions)
pub fn select_topics(&self, num_topics: usize) -> Vec<Topic> {
(0..num_topics)
.map(|_| self.topic_selector.select(&self.topic_pool))
.collect()
}
Expand Down
82 changes: 82 additions & 0 deletions src/common/meta/src/wal/meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod demand_builder;

pub use demand_builder::WalMetaDemandBuilder;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;

use crate::error::{MissingKafkaTopicManagerSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal::kafka::{KafkaTopic as Topic, KafkaTopicManager as TopicManager};
use crate::wal::{WalOptions, WalProvider};

/// Wal metadata allocated to a table.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct WalMeta {
pub region_topics: Vec<Topic>,
}

/// The allocator user shall state what wal meta it demands.
pub struct WalMetaDemand {
pub num_topics: Option<usize>,
}

/// The allocator responsible for allocating wal metadata for a table.
#[derive(Default)]
pub struct WalMetaAllocator {
wal_provider: WalProvider,
topic_manager: Option<TopicManager>,
}

impl WalMetaAllocator {
pub async fn try_new(wal_opts: &WalOptions, kv_backend: &KvBackendRef) -> Result<Self> {
let mut this = Self {
wal_provider: wal_opts.provider.clone(),
..Default::default()
};

match this.wal_provider {
WalProvider::RaftEngine => {}
WalProvider::Kafka => {
let topic_manager =
TopicManager::try_new(wal_opts.kafka_opts.as_ref(), kv_backend).await?;
this.topic_manager = Some(topic_manager);
}
}

Ok(this)
}

/// Allocate wal meta according to the wal provider.
pub async fn try_alloc(&self, demand: WalMetaDemand) -> Result<WalMeta> {
let mut allocated = WalMeta::default();

if let Some(num_topics) = demand.num_topics.as_ref() {
allocated.region_topics = self.try_alloc_topics(*num_topics).await?;
}

Ok(allocated)
}

pub async fn try_alloc_topics(&self, num_topics: usize) -> Result<Vec<Topic>> {
let topics = self
.topic_manager
.as_ref()
.context(MissingKafkaTopicManagerSnafu)?
.select_topics(num_topics);
Ok(topics)
}
}
37 changes: 37 additions & 0 deletions src/common/meta/src/wal/meta/demand_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::wal::meta::WalMetaDemand;

#[derive(Default)]
pub struct WalMetaDemandBuilder {
num_topics: Option<usize>,
}

impl WalMetaDemandBuilder {
pub fn new() -> Self {
Self::default()
}

pub fn with_num_topics(&mut self, num_topics: usize) -> &mut Self {
self.num_topics = Some(num_topics);
self
}

pub fn build(&self) -> WalMetaDemand {
WalMetaDemand {
num_topics: self.num_topics,
}
}
}
3 changes: 2 additions & 1 deletion src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::{Sequence, SequenceRef};
use common_meta::wal::meta::WalMeta;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
Expand Down Expand Up @@ -152,7 +153,7 @@ impl TableMetadataAllocator for StandaloneTableMetadataCreator {
Ok(TableMetadata {
table_id,
region_routes,
region_topics: None,
wal_meta: WalMeta::default(),
})
}
}
9 changes: 8 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,12 @@ pub enum Error {
source: common_meta::error::Error,
location: Location,
},

#[snafu(display("Failed to build a wal meta allocator"))]
BuildWalMetaAllocator {
source: common_meta::error::Error,
location: Location,
},
}

impl Error {
Expand Down Expand Up @@ -693,7 +699,8 @@ impl ErrorExt for Error {

Error::InitMetadata { source, .. } => source.status_code(),

Error::BuildKafkaTopicManager { source, .. } => source.status_code(),
Error::BuildKafkaTopicManager { source, .. }
| Error::BuildWalMetaAllocator { source, .. } => source.status_code(),

Error::Other { source, .. } => source.status_code(),
}
Expand Down
Loading

0 comments on commit e77f0fa

Please sign in to comment.