Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remote_wal): add skeleton for remote wal related to meta srv #2933

Merged
merged 45 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
7c209e9
feat: introduce wal config and kafka config
niebayes Dec 14, 2023
83c8d85
feat: introduce kafka topic manager and selector
niebayes Dec 14, 2023
0ac7cd7
feat: introduce region wal options
niebayes Dec 14, 2023
861cced
chore: build region wal options upon starting meta srv
niebayes Dec 14, 2023
5154865
feat: integrate region wal options allocator into table meta allocator
niebayes Dec 14, 2023
74f7c2b
chore: add wal config to metasrv.example.toml
niebayes Dec 14, 2023
cd149dd
chore: add region wal options map to create table procedure
niebayes Dec 14, 2023
ef1c819
feat: augment region create request with wal options
niebayes Dec 14, 2023
61e1741
feat: augment DatanodeTableValue with region wal options map
niebayes Dec 14, 2023
f5e42ad
chore: encode region wal options upon constructing table creator
niebayes Dec 14, 2023
b22af80
feat: persist region wal options when creating table meta
niebayes Dec 14, 2023
9d16265
fix: sqlness test
niebayes Dec 14, 2023
90d3069
chore: set default wal provider to raft-engine
niebayes Dec 14, 2023
f6e019a
refactor: refactor wal options
niebayes Dec 15, 2023
0c4a798
chore: update wal options allocator
niebayes Dec 15, 2023
4ee06f6
refactor: rename region wal options to wal options
niebayes Dec 15, 2023
a6d04c2
chore: update usages of region wal options
niebayes Dec 15, 2023
29cbacf
chore: add some comments to kafka
niebayes Dec 15, 2023
0ea5461
chore: fill in kafka config
niebayes Dec 15, 2023
1a4623e
test: add tests for serde wal config
niebayes Dec 15, 2023
8543f2f
test: add tests for wal options
niebayes Dec 15, 2023
4aeb686
refactor: refactor wal options allocator to enum
niebayes Dec 15, 2023
153d460
refactor: store wal options into the request options instead
niebayes Dec 15, 2023
6da6de0
fix: typo
niebayes Dec 15, 2023
f1f5905
fix: typo
niebayes Dec 15, 2023
56c325d
refactor: move wal options map to region info
niebayes Dec 15, 2023
c430bc9
refactor: refacto serialization and deserialization of wal options
niebayes Dec 18, 2023
de9cdb5
refactor: use serde_json to encode wal options
niebayes Dec 18, 2023
6fa7205
chore: rename wal_options_map to region_wal_options
niebayes Dec 18, 2023
4d8c844
chore: resolve some review comments
niebayes Dec 18, 2023
3a420df
chore: merge develop
niebayes Dec 18, 2023
4be162d
fix: typo
niebayes Dec 18, 2023
c3efada
refactor: replace kecab-case with snake_case
niebayes Dec 18, 2023
7bb3981
fix: sqlness and converage tests
niebayes Dec 18, 2023
97a0fee
fix: typo
niebayes Dec 18, 2023
6781a5f
fix: coverage test
niebayes Dec 18, 2023
b2ec3d1
fix: coverage test
niebayes Dec 18, 2023
4350751
chore: resolve some review conversations
niebayes Dec 19, 2023
f6b8464
fix: resolve some review conversations
niebayes Dec 19, 2023
33c4b70
chore: format comments in metasrv.example.toml
niebayes Dec 19, 2023
cd1afdd
chore: update import style
niebayes Dec 19, 2023
9a3a967
feat: integrate wal options allocator to standalone mode
niebayes Dec 19, 2023
53bf3a5
test: add compatible test for OpenRegion
niebayes Dec 19, 2023
157fc71
test: add compatible test for UpdateRegionMetadata
niebayes Dec 19, 2023
d8ef9bc
chore: remove based suffix from topic selector type
niebayes Dec 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,27 @@ first_heartbeat_estimate = "1000ms"
# timeout = "10s"
# connect_timeout = "10s"
# tcp_nodelay = true

[wal]
# Available wal providers:
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

# There're none raft-engine wal config since meta srv only involves in remote wal currently.
niebayes marked this conversation as resolved.
Show resolved Hide resolved

niebayes marked this conversation as resolved.
Show resolved Hide resolved
# Kafka wal config.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created upon start.
num_topics = 64
# Topic selector type.
# Available selector types:
# - "round_robin" (default)
selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
topic_name_prefix = "greptimedb_kafka_wal"
# Number of partitions per topic.
num_partitions = 1
# Expected number of replicas of each partition.
replication_factor = 3
niebayes marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
Expand Down
19 changes: 13 additions & 6 deletions src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -28,6 +28,7 @@ use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};

use self::metadata::TableMetadataBencher;
Expand Down Expand Up @@ -137,12 +138,12 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
}
}

fn create_region_routes() -> Vec<RegionRoute> {
let mut regions = Vec::with_capacity(100);
fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
let mut region_routes = Vec::with_capacity(100);
let mut rng = rand::thread_rng();

for region_id in 0..64u64 {
regions.push(RegionRoute {
for region_id in regions.into_iter().map(u64::from) {
region_routes.push(RegionRoute {
region: Region {
id: region_id.into(),
name: String::new(),
Expand All @@ -158,5 +159,11 @@ fn create_region_routes() -> Vec<RegionRoute> {
});
}

regions
region_routes
}

fn create_region_wal_options(regions: Vec<RegionNumber>) -> HashMap<RegionNumber, String> {
// TODO(niebayes): construct region wal options for benchmark.
let _ = regions;
HashMap::default()
}
11 changes: 8 additions & 3 deletions src/cmd/src/cli/bench/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::time::Instant;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;

use super::{bench_self_recorded, create_region_routes, create_table_info};
use super::{
bench_self_recorded, create_region_routes, create_region_wal_options, create_table_info,
};
niebayes marked this conversation as resolved.
Show resolved Hide resolved

pub struct TableMetadataBencher {
table_metadata_manager: TableMetadataManagerRef,
Expand All @@ -43,12 +45,15 @@ impl TableMetadataBencher {
let table_name = format!("bench_table_name_{}", i);
let table_name = TableName::new("bench_catalog", "bench_schema", table_name);
let table_info = create_table_info(i, table_name);
let region_routes = create_region_routes();

let regions: Vec<_> = (0..64).collect();
let region_routes = create_region_routes(regions.clone());
let region_wal_options = create_region_wal_options(regions);

let start = Instant::now();

self.table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, region_wal_options)
.await
.unwrap();

Expand Down
5 changes: 5 additions & 0 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -395,6 +396,9 @@ impl MigrateTableMetadata {
let region_distribution: RegionDistribution =
value.regions_id_map.clone().into_iter().collect();

// TODO(niebayes): properly fetch or construct wal options.
let region_wal_options = HashMap::default();

let datanode_table_kvs = region_distribution
.into_iter()
.map(|(datanode_id, regions)| {
Expand All @@ -409,6 +413,7 @@ impl MigrateTableMetadata {
engine: engine.to_string(),
region_storage_path: region_storage_path.clone(),
region_options: (&value.table_info.meta.options).into(),
region_wal_options: region_wal_options.clone(),
},
),
)
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ prost.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
tonic.workspace = true

[dev-dependencies]
Expand Down
16 changes: 14 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::meta::Partition;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::TableId;
use store_api::storage::{RegionNumber, TableId};
use table::metadata::RawTableInfo;

use crate::cache_invalidator::CacheInvalidatorRef;
Expand Down Expand Up @@ -54,14 +55,25 @@ pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,
}

/// Metadata allocated to a table.
pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub region_routes: Vec<RegionRoute>,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,
}

#[async_trait::async_trait]
pub trait TableMetadataAllocator: Send + Sync {
async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)>;
) -> Result<TableMetadata>;
}

pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;
Expand Down
36 changes: 31 additions & 5 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
Expand All @@ -30,7 +32,7 @@ use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
Expand All @@ -45,6 +47,7 @@ use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::wal::WAL_OPTIONS_KEY;

pub struct CreateTableProcedure {
pub context: DdlContext,
Expand All @@ -58,11 +61,12 @@ impl CreateTableProcedure {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes),
creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options),
}
}

Expand Down Expand Up @@ -94,6 +98,10 @@ impl CreateTableProcedure {
&self.creator.data.region_routes
}

pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
}

/// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
Expand Down Expand Up @@ -193,6 +201,7 @@ impl CreateTableProcedure {

let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_wal_options = &create_table_data.region_wal_options;

let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
Expand All @@ -211,12 +220,12 @@ impl CreateTableProcedure {
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);

let create_region_request = request_builder
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
region_wal_options,
)
.await?;

Expand Down Expand Up @@ -262,8 +271,9 @@ impl CreateTableProcedure {

let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_wal_options = self.region_wal_options().clone();
manager
.create_table_metadata(raw_table_info, region_routes)
.create_table_metadata(raw_table_info, region_routes, region_wal_options)
.await?;
info!("Created table metadata for table {table_id}");

Expand Down Expand Up @@ -316,13 +326,19 @@ pub struct TableCreator {
}

impl TableCreator {
pub fn new(cluster_id: u64, task: CreateTableTask, region_routes: Vec<RegionRoute>) -> Self {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
region_wal_options,
},
opening_regions: vec![],
}
Expand Down Expand Up @@ -371,6 +387,7 @@ pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
}

Expand Down Expand Up @@ -406,11 +423,20 @@ impl CreateRequestBuilder {
create_expr: &CreateTableExpr,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<PbCreateRegionRequest> {
let mut request = self.template.clone();

request.region_id = region_id.as_u64();
request.path = storage_path;
// Stores the encoded wal options into the request options.
region_wal_options
.get(&region_id.region_number())
.and_then(|wal_options| {
request
.options
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});

if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
Expand Down
Loading
Loading