Skip to content

Commit

Permalink
refactor: refactor wal options
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 15, 2023
1 parent 90d3069 commit f6e019a
Show file tree
Hide file tree
Showing 15 changed files with 113 additions and 111 deletions.
4 changes: 2 additions & 2 deletions src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::table_name::TableName;
use common_meta::wal::region_wal_options::EncodedRegionWalOptions;
use common_meta::wal::EncodedWalOptions;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
Expand Down Expand Up @@ -165,7 +165,7 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {

fn create_region_wal_options_map(
regions: Vec<RegionNumber>,
) -> HashMap<RegionNumber, EncodedRegionWalOptions> {
) -> HashMap<RegionNumber, EncodedWalOptions> {
// TODO(niebayes): construct region wal options for benchmark.
let _ = regions;
HashMap::default()
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
use crate::wal::region_wal_options::RegionWalOptionsMap;
use crate::wal::WalOptionsMap;

pub mod alter_table;
pub mod create_table;
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct TableMetadataAllocatorContext {
pub struct TableMetadata {
pub table_id: TableId,
pub region_routes: Vec<RegionRoute>,
pub region_wal_options_map: RegionWalOptionsMap,
pub region_wal_options_map: WalOptionsMap,
}

#[async_trait::async_trait]
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::wal::region_wal_options::{EncodedRegionWalOptions, RegionWalOptionsMap};
use crate::wal::{EncodedWalOptions, WalOptionsMap};

pub struct CreateTableProcedure {
pub context: DdlContext,
Expand All @@ -59,7 +59,7 @@ impl CreateTableProcedure {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options_map: RegionWalOptionsMap,
region_wal_options_map: WalOptionsMap,
context: DdlContext,
) -> Self {
Self {
Expand Down Expand Up @@ -96,7 +96,7 @@ impl CreateTableProcedure {
&self.creator.data.region_routes
}

pub fn region_wal_options_map(&self) -> &HashMap<RegionNumber, EncodedRegionWalOptions> {
pub fn region_wal_options_map(&self) -> &HashMap<RegionNumber, EncodedWalOptions> {
&self.creator.data.region_wal_options_map
}

Expand Down Expand Up @@ -328,7 +328,7 @@ impl TableCreator {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options_map: RegionWalOptionsMap,
region_wal_options_map: WalOptionsMap,
) -> Self {
let region_wal_options_map = region_wal_options_map
.into_iter()
Expand Down Expand Up @@ -390,7 +390,7 @@ pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
pub region_wal_options_map: HashMap<RegionNumber, EncodedRegionWalOptions>,
pub region_wal_options_map: HashMap<RegionNumber, EncodedWalOptions>,
pub cluster_id: u64,
}

Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
use crate::wal::region_wal_options::RegionWalOptionsMap;
use crate::wal::WalOptionsMap;
pub type DdlManagerRef = Arc<DdlManager>;

/// The [DdlManager] provides the ability to execute Ddl.
Expand Down Expand Up @@ -177,7 +177,7 @@ impl DdlManager {
cluster_id: u64,
create_table_task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options_map: RegionWalOptionsMap,
region_wal_options_map: WalOptionsMap,
) -> Result<ProcedureId> {
let context = self.create_context();

Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ use crate::error::{self, Result, SerdeJsonSnafu};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::wal::region_wal_options::EncodedRegionWalOptions;
use crate::wal::EncodedWalOptions;
use crate::DatanodeId;

pub const REMOVED_PREFIX: &str = "__removed";
Expand Down Expand Up @@ -365,7 +365,7 @@ impl TableMetadataManager {
&self,
mut table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
region_wal_options_map: HashMap<RegionNumber, EncodedRegionWalOptions>,
region_wal_options_map: HashMap<RegionNumber, EncodedWalOptions>,
) -> Result<()> {
let region_numbers = region_routes
.iter()
Expand Down
8 changes: 4 additions & 4 deletions src/common/meta/src/key/datanode_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::wal::region_wal_options::EncodedRegionWalOptions;
use crate::wal::EncodedWalOptions;
use crate::DatanodeId;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
Expand Down Expand Up @@ -105,7 +105,7 @@ pub struct DatanodeTableValue {
#[serde(flatten)]
pub region_info: RegionInfo,
#[serde(default)]
pub region_wal_options_map: HashMap<RegionNumber, EncodedRegionWalOptions>,
pub region_wal_options_map: HashMap<RegionNumber, EncodedWalOptions>,
version: u64,
}

Expand All @@ -114,7 +114,7 @@ impl DatanodeTableValue {
table_id: TableId,
regions: Vec<RegionNumber>,
region_info: RegionInfo,
region_wal_options_map: HashMap<RegionNumber, EncodedRegionWalOptions>,
region_wal_options_map: HashMap<RegionNumber, EncodedWalOptions>,
) -> Self {
Self {
table_id,
Expand Down Expand Up @@ -174,7 +174,7 @@ impl DatanodeTableManager {
engine: &str,
region_storage_path: &str,
region_options: HashMap<String, String>,
region_wal_options_map: HashMap<RegionNumber, EncodedRegionWalOptions>,
region_wal_options_map: HashMap<RegionNumber, EncodedWalOptions>,
distribution: RegionDistribution,
) -> Result<Txn> {
let txns = distribution
Expand Down
37 changes: 34 additions & 3 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
// limitations under the License.

pub mod kafka;
pub mod region_wal_options;
pub mod options_allocator;

use std::default;
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use store_api::storage::RegionNumber;

pub use crate::wal::kafka::KafkaConfig;
use crate::error::Result;
use crate::wal::kafka::{KafkaConfig, KafkaWalOptions};
pub use crate::wal::options_allocator::WalOptionsAllocator;

/// Wal configurations for bootstraping meta srv.

Check warning on line 27 in src/common/meta/src/wal.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"bootstraping" should be "bootstrapping".

Check warning on line 27 in src/common/meta/src/wal.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"bootstraping" should be "bootstrapping".
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(tag = "provider")]
pub enum WalConfig {
Expand All @@ -30,3 +34,30 @@ pub enum WalConfig {
#[serde(rename = "kafka")]
Kafka(KafkaConfig),
}

/// Wal options for a region.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
pub enum WalOptions {
#[default]
RaftEngine,
Kafka(KafkaWalOptions),
}

// TODO(niebayes): determine how to encode wal options.
pub type WalOptionsMap = HashMap<RegionNumber, WalOptions>;
pub type EncodedWalOptions = HashMap<String, String>;

impl From<WalOptions> for EncodedWalOptions {
fn from(value: WalOptions) -> Self {
// TODO(niebayes): implement encoding/decoding for wal options.
EncodedWalOptions::default()
}
}

impl TryFrom<EncodedWalOptions> for WalOptions {
type Error = crate::error::Error;

fn try_from(value: EncodedWalOptions) -> Result<Self> {
todo!()
}
}
3 changes: 3 additions & 0 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ pub use crate::wal::kafka::topic_manager::TopicManager as KafkaTopicManager;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub struct KafkaConfig;

#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct KafkaWalOptions;
44 changes: 44 additions & 0 deletions src/common/meta/src/wal/options_allocator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::error::Result;
use crate::wal::{WalConfig, WalOptions};

pub struct WalOptionsAllocator {
// TODO(niebayes): uncomment this.
// kafka_topic_manager: KafkaTopicManager,
}

impl WalOptionsAllocator {
/// Creates a WalOptionsAllocator.
pub fn new(config: &WalConfig) -> Self {
// TODO(niebayes): properly init.
Self {}
}

pub fn try_init(&self) -> Result<()> {
todo!()
}

/// Allocates a wal options for a region.
pub fn alloc(&self) -> WalOptions {
todo!()
}

/// Allocates a wal options for each region.
pub fn alloc_batch(&self, num_regions: usize) -> Vec<WalOptions> {
// TODO(niebayes): allocate a batch of region wal options.
vec![WalOptions::default(); num_regions]
}
}
73 changes: 0 additions & 73 deletions src/common/meta/src/wal/region_wal_options.rs

This file was deleted.

4 changes: 2 additions & 2 deletions src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::{Sequence, SequenceRef};
use common_meta::wal::region_wal_options::RegionWalOptionsMap;
use common_meta::wal::WalOptionsMap;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
Expand Down Expand Up @@ -154,7 +154,7 @@ impl TableMetadataAllocator for StandaloneTableMetadataCreator {
Ok(TableMetadata {
table_id,
region_routes,
region_wal_options_map: RegionWalOptionsMap::default(),
region_wal_options_map: WalOptionsMap::default(),
})
}
}
13 changes: 5 additions & 8 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::sequence::Sequence;
use common_meta::state_store::KvStateStore;
use common_meta::wal::region_wal_options::RegionWalOptionsAllocator;
use common_meta::wal::WalConfig;
use common_meta::wal::{WalConfig, WalOptionsAllocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -205,13 +204,13 @@ impl MetaSrvBuilder {
table_id: None,
};

let region_wal_options_allocator = build_region_wal_options_allocator(&options.wal).await?;
let wal_options_allocator = build_wal_options_allocator(&options.wal).await?;
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
Arc::new(MetaSrvTableMetadataAllocator::new(
selector_ctx.clone(),
selector.clone(),
table_id_sequence.clone(),
region_wal_options_allocator,
wal_options_allocator,
))
});

Expand Down Expand Up @@ -349,11 +348,9 @@ fn build_procedure_manager(
Arc::new(LocalManager::new(manager_config, state_store))
}

async fn build_region_wal_options_allocator(
config: &WalConfig,
) -> Result<RegionWalOptionsAllocator> {
async fn build_wal_options_allocator(config: &WalConfig) -> Result<WalOptionsAllocator> {
// TODO(niebayes): init the allocator.
Ok(RegionWalOptionsAllocator::new(config))
Ok(WalOptionsAllocator::new(config))
}

fn build_ddl_manager(
Expand Down
Loading

0 comments on commit f6e019a

Please sign in to comment.