Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remote_wal): make meta srv able to alloc kafka topics #2753

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ reqwest = { version = "0.11", default-features = false, features = [
"rustls-tls-native-roots",
"stream",
] }
rskafka = "0.5"
rust_decimal = "1.33"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
24 changes: 24 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,27 @@ first_heartbeat_estimate = "1000ms"
# timeout = "10s"
# connect_timeout = "10s"
# tcp_nodelay = true

# WAL options.
[wal]
niebayes marked this conversation as resolved.
Show resolved Hide resolved
# Available wal providers:
# - "RaftEngine" (default)
# - "Kafka"
provider = "Kafka"

# Kafka WAL options.
[wal.kafka]
# The broker endpoints of the Kafka cluster. Defaults to ["127.0.0.1:9090"].
broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created upon start.
num_topics = 64
# Topic selector type.
# Available selector types:
# - "RoundRobin" (default)
selector_type = "RoundRobin"
# The name of a Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
topic_name_prefix = "greptime_wal"
# Number of partitions per topic.
num_partitions = 1
# Expected number of replicas of each partition.
replication_factor = 3
2 changes: 1 addition & 1 deletion src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mod tests {
Some("mybucket"),
),
(
// wal.dir = /other/wal/dir
// wal.raft_engine_opts.dir = /other/wal/dir
[
env_prefix.to_string(),
"wal".to_uppercase(),
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ humantime-serde.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
tonic.workspace = true

[dev-dependencies]
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
use crate::wal::kafka::KafkaTopic;

pub mod alter_table;
pub mod create_table;
Expand Down Expand Up @@ -53,14 +54,20 @@ pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,
}

pub struct TableMetadata {
niebayes marked this conversation as resolved.
Show resolved Hide resolved
pub table_id: TableId,
pub region_routes: Vec<RegionRoute>,
pub region_topics: Vec<KafkaTopic>,
niebayes marked this conversation as resolved.
Show resolved Hide resolved
}

#[async_trait::async_trait]
pub trait TableMetadataAllocator: Send + Sync {
async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)>;
) -> Result<TableMetadata>;
}

pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;
Expand Down
49 changes: 45 additions & 4 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
Expand All @@ -32,11 +34,15 @@ use table::metadata::{RawTableInfo, TableId};

use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::error::{self, Result, UnexpectedNumRegionTopicsSnafu};
use crate::key::table_name::TableNameKey;
use crate::metrics;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::wal::kafka::KafkaTopic;

// TODO(niebayes): Maybe move `TOPIC_KEY` into a more appropriate crate.
pub const TOPIC_KEY: &str = "kafka_topic";
niebayes marked this conversation as resolved.
Show resolved Hide resolved

pub struct CreateTableProcedure {
pub context: DdlContext,
Expand All @@ -50,11 +56,12 @@ impl CreateTableProcedure {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_topics: Vec<KafkaTopic>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes),
creator: TableCreator::new(cluster_id, task, region_routes, region_topics),
}
}

Expand Down Expand Up @@ -170,6 +177,27 @@ impl CreateTableProcedure {
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_topics = &create_table_data.region_topics;

// The following check is redundant, since the WAL meta allocator guarantees that the
// allocated region topics have the same length as the region routes.
if !region_topics.is_empty() {
niebayes marked this conversation as resolved.
Show resolved Hide resolved
let num_region_routes = region_routes.len();
let num_region_topics = region_topics.len();
ensure!(
num_region_routes == num_region_topics,
UnexpectedNumRegionTopicsSnafu {
num_region_topics,
num_region_routes
}
);
}
niebayes marked this conversation as resolved.
Show resolved Hide resolved

let region_topic_map: HashMap<_, _> = region_routes
.iter()
.map(|route| route.region.id.region_number())
.zip(region_topics)
.collect();

let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
Expand All @@ -193,6 +221,12 @@ impl CreateTableProcedure {
let mut create_region_request = request_template.clone();
create_region_request.region_id = region_id.as_u64();
create_region_request.path = storage_path.clone();
region_topic_map.get(region_number).and_then(|topic| {
create_region_request
.options
.insert(TOPIC_KEY.to_string(), topic.to_string())
});
niebayes marked this conversation as resolved.
Show resolved Hide resolved

PbRegionRequest::Create(create_region_request)
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -285,13 +319,19 @@ pub struct TableCreator {
}

impl TableCreator {
pub fn new(cluster_id: u64, task: CreateTableTask, region_routes: Vec<RegionRoute>) -> Self {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_topics: Vec<KafkaTopic>,
niebayes marked this conversation as resolved.
Show resolved Hide resolved
) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
region_topics,
},
}
}
Expand All @@ -311,8 +351,9 @@ pub enum CreateTableState {
pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
pub cluster_id: u64,
pub region_routes: Vec<RegionRoute>,
pub region_topics: Vec<KafkaTopic>,
}

impl CreateTableData {
Expand Down
23 changes: 18 additions & 5 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext,
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext,
TableMetadataAllocatorRef,
};
use crate::error::{
Expand All @@ -43,6 +43,7 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
use crate::wal::kafka::KafkaTopic;

pub type DdlManagerRef = Arc<DdlManager>;

Expand Down Expand Up @@ -164,11 +165,17 @@ impl DdlManager {
cluster_id: u64,
create_table_task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_topics: Vec<KafkaTopic>,
) -> Result<ProcedureId> {
let context = self.create_context();

let procedure =
CreateTableProcedure::new(cluster_id, create_table_task, region_routes, context);
let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
region_routes,
region_topics,
context,
);

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

Expand Down Expand Up @@ -360,7 +367,7 @@ async fn handle_create_table_task(
cluster_id: u64,
mut create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let (table_id, region_routes) = ddl_manager
let table_metadata = ddl_manager
.table_meta_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
Expand All @@ -369,8 +376,14 @@ async fn handle_create_table_task(
)
.await?;

let TableMetadata {
table_id,
region_routes,
region_topics,
} = table_metadata;

let id = ddl_manager
.submit_create_table_task(cluster_id, create_table_task, region_routes)
.submit_create_table_task(cluster_id, create_table_task, region_routes, region_topics)
.await?;

info!("Table: {table_id:?} is created via procedure_id {id:?}");
Expand Down
Loading
Loading