Skip to content

Commit

Permalink
feat(remote_wal): add skeleton for remote wal related to meta srv (#2933
Browse files Browse the repository at this point in the history
)

* feat: introduce wal config and kafka config

* feat: introduce kafka topic manager and selector

* feat: introduce region wal options

* chore: build region wal options upon starting meta srv

* feat: integrate region wal options allocator into table meta allocator

* chore: add wal config to metasrv.example.toml

* chore: add region wal options map to create table procedure

* feat: augment region create request with wal options

* feat: augment DatanodeTableValue with region wal options map

* chore: encode region wal options upon constructing table creator

* feat: persist region wal options when creating table meta

* fix: sqlness test

* chore: set default wal provider to raft-engine

* refactor: refactor wal options

* chore: update wal options allocator

* refactor: rename region wal options to wal options

* chore: update usages of region wal options

* chore: add some comments to kafka

* chore: fill in kafka config

* test: add tests for serde wal config

* test: add tests for wal options

* refactor: refactor wal options allocator to enum

* refactor: store wal options into the request options instead

* fix: typo

* fix: typo

* refactor: move wal options map to region info

* refactor: refacto serialization and deserialization of wal options

* refactor: use serde_json to encode wal options

* chore: rename wal_options_map to region_wal_options

* chore: resolve some review comments

* fix: typo

* refactor: replace kecab-case with snake_case

* fix: sqlness and converage tests

* fix: typo

* fix: coverage test

* fix: coverage test

* chore: resolve some review conversations

* Update config/metasrv.example.toml

Co-authored-by: Weny Xu <[email protected]>

---------

Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
niebayes and WenyXu authored Dec 19, 2023
1 parent 6b8dbcf commit d273385
Show file tree
Hide file tree
Showing 42 changed files with 790 additions and 88 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,28 @@ first_heartbeat_estimate = "1000ms"
# timeout = "10s"
# connect_timeout = "10s"
# tcp_nodelay = true

[wal]
# Available wal providers:
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

# There're none raft-engine wal config since meta srv only involves in remote wal currently.

# Kafka wal config.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_kafka_wal"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3

1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
Expand Down
19 changes: 13 additions & 6 deletions src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -28,6 +28,7 @@ use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};

use self::metadata::TableMetadataBencher;
Expand Down Expand Up @@ -137,12 +138,12 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
}
}

fn create_region_routes() -> Vec<RegionRoute> {
let mut regions = Vec::with_capacity(100);
fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
let mut region_routes = Vec::with_capacity(100);
let mut rng = rand::thread_rng();

for region_id in 0..64u64 {
regions.push(RegionRoute {
for region_id in regions.into_iter().map(u64::from) {
region_routes.push(RegionRoute {
region: Region {
id: region_id.into(),
name: String::new(),
Expand All @@ -158,5 +159,11 @@ fn create_region_routes() -> Vec<RegionRoute> {
});
}

regions
region_routes
}

fn create_region_wal_options(regions: Vec<RegionNumber>) -> HashMap<RegionNumber, String> {
// TODO(niebayes): construct region wal options for benchmark.
let _ = regions;
HashMap::default()
}
9 changes: 6 additions & 3 deletions src/cmd/src/cli/bench/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Instant;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;

use super::{bench_self_recorded, create_region_routes, create_table_info};
use crate::cli::bench::*;

pub struct TableMetadataBencher {
table_metadata_manager: TableMetadataManagerRef,
Expand All @@ -43,12 +43,15 @@ impl TableMetadataBencher {
let table_name = format!("bench_table_name_{}", i);
let table_name = TableName::new("bench_catalog", "bench_schema", table_name);
let table_info = create_table_info(i, table_name);
let region_routes = create_region_routes();

let regions: Vec<_> = (0..64).collect();
let region_routes = create_region_routes(regions.clone());
let region_wal_options = create_region_wal_options(regions);

let start = Instant::now();

self.table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, region_wal_options)
.await
.unwrap();

Expand Down
5 changes: 5 additions & 0 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -395,6 +396,9 @@ impl MigrateTableMetadata {
let region_distribution: RegionDistribution =
value.regions_id_map.clone().into_iter().collect();

// TODO(niebayes): properly fetch or construct wal options.
let region_wal_options = HashMap::default();

let datanode_table_kvs = region_distribution
.into_iter()
.map(|(datanode_id, regions)| {
Expand All @@ -409,6 +413,7 @@ impl MigrateTableMetadata {
engine: engine.to_string(),
region_storage_path: region_storage_path.clone(),
region_options: (&value.table_info.meta.options).into(),
region_wal_options: region_wal_options.clone(),
},
),
)
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ prost.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
tonic.workspace = true

[dev-dependencies]
Expand Down
16 changes: 14 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::meta::Partition;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::TableId;
use store_api::storage::{RegionNumber, TableId};
use table::metadata::RawTableInfo;

use crate::cache_invalidator::CacheInvalidatorRef;
Expand Down Expand Up @@ -54,14 +55,25 @@ pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,
}

/// Metadata allocated to a table.
pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub region_routes: Vec<RegionRoute>,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,
}

#[async_trait::async_trait]
pub trait TableMetadataAllocator: Send + Sync {
async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)>;
) -> Result<TableMetadata>;
}

pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;
Expand Down
36 changes: 31 additions & 5 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
Expand All @@ -30,7 +32,7 @@ use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
Expand All @@ -45,6 +47,7 @@ use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::wal::WAL_OPTIONS_KEY;

pub struct CreateTableProcedure {
pub context: DdlContext,
Expand All @@ -58,11 +61,12 @@ impl CreateTableProcedure {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes),
creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options),
}
}

Expand Down Expand Up @@ -94,6 +98,10 @@ impl CreateTableProcedure {
&self.creator.data.region_routes
}

pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
}

/// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
Expand Down Expand Up @@ -193,6 +201,7 @@ impl CreateTableProcedure {

let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_wal_options = &create_table_data.region_wal_options;

let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
Expand All @@ -211,12 +220,12 @@ impl CreateTableProcedure {
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);

let create_region_request = request_builder
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
region_wal_options,
)
.await?;

Expand Down Expand Up @@ -262,8 +271,9 @@ impl CreateTableProcedure {

let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_wal_options = self.region_wal_options().clone();
manager
.create_table_metadata(raw_table_info, region_routes)
.create_table_metadata(raw_table_info, region_routes, region_wal_options)
.await?;
info!("Created table metadata for table {table_id}");

Expand Down Expand Up @@ -316,13 +326,19 @@ pub struct TableCreator {
}

impl TableCreator {
pub fn new(cluster_id: u64, task: CreateTableTask, region_routes: Vec<RegionRoute>) -> Self {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
region_wal_options,
},
opening_regions: vec![],
}
Expand Down Expand Up @@ -371,6 +387,7 @@ pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
}

Expand Down Expand Up @@ -406,11 +423,20 @@ impl CreateRequestBuilder {
create_expr: &CreateTableExpr,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<PbCreateRegionRequest> {
let mut request = self.template.clone();

request.region_id = region_id.as_u64();
request.path = storage_path;
// Stores the encoded wal options into the request options.
region_wal_options
.get(&region_id.region_number())
.and_then(|wal_options| {
request
.options
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});

if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
Expand Down
Loading

0 comments on commit d273385

Please sign in to comment.