Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remote_wal): introduce kafka remote wal #3001

Merged
merged 20 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,42 @@ enable = true
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true

# WAL options.
[wal]
[wal_meta]
# Available wal providers:
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

# There're none raft-engine wal config since meta srv only involves in remote wal currently.

# Kafka wal config.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# Above which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
# backoff_deadline = "5mins"

# WAL options for datanode.
[wal_datanode]
# Available wal providers:
# - "RaftEngine" (default)
# - "Kafka"
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use clap::ArgMatches;
use common_config::KvBackendConfig;
use common_meta::wal::WalConfig as MetaSrvWalConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
Expand All @@ -37,6 +38,7 @@ pub struct MixOptions {
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
pub wal_meta: MetaSrvWalConfig,
}

impl From<MixOptions> for FrontendOptions {
Expand Down
26 changes: 16 additions & 10 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig as DatanodeWalConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
Expand All @@ -27,7 +27,9 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_meta::wal::{
WalConfig as MetaSrvWalConfig, WalOptionsAllocator, WalOptionsAllocatorRef,
};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
Expand Down Expand Up @@ -104,7 +106,8 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub wal: WalConfig,
pub wal_meta: MetaSrvWalConfig,
pub wal_datanode: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
Expand All @@ -127,7 +130,8 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
wal: WalConfig::default(),
wal_meta: MetaSrvWalConfig::default(),
wal_datanode: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
Expand Down Expand Up @@ -166,7 +170,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: self.enable_telemetry,
wal: self.wal,
wal: self.wal_datanode,
storage: self.storage,
region_engine: self.region_engine,
rpc_addr: self.grpc.addr,
Expand Down Expand Up @@ -338,7 +342,8 @@ impl StartCommand {
let procedure = opts.procedure.clone();
let frontend = opts.clone().frontend_options();
let logging = opts.logging.clone();
let datanode = opts.datanode_options();
let wal_meta = opts.wal_meta.clone();
let datanode = opts.datanode_options().clone();

Ok(Options::Standalone(Box::new(MixOptions {
procedure,
Expand All @@ -347,6 +352,7 @@ impl StartCommand {
frontend,
datanode,
logging,
wal_meta,
})))
}

Expand Down Expand Up @@ -392,9 +398,8 @@ impl StartCommand {
.step(10)
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
Expand Down Expand Up @@ -585,7 +590,7 @@ mod tests {
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);

let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
Expand Down Expand Up @@ -731,7 +736,8 @@ mod tests {
assert_eq!(options.opentsdb, default_options.opentsdb);
assert_eq!(options.influxdb, default_options.influxdb);
assert_eq!(options.prom_store, default_options.prom_store);
assert_eq!(options.wal, default_options.wal);
assert_eq!(options.wal_meta, default_options.wal_meta);
assert_eq!(options.wal_datanode, default_options.wal_datanode);
assert_eq!(options.metadata_store, default_options.metadata_store);
assert_eq!(options.procedure, default_options.procedure);
assert_eq!(options.logging, default_options.logging);
Expand Down
2 changes: 0 additions & 2 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,4 @@ mod tests {
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
}

// TODO(niebayes): the integrate test needs to test that the example config file can be successfully parsed.
}
2 changes: 1 addition & 1 deletion src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl TopicManager {
)
})
.collect::<Vec<_>>();
// TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised.
// FIXME(niebayes): try to create an already-exist topic would raise an error.
futures::future::try_join_all(tasks)
.await
.context(CreateKafkaWalTopicSnafu)
Expand Down
7 changes: 4 additions & 3 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::WalOptionsAllocator;
use common_meta::wal::{WalConfig as MetaSrvWalConfig, WalOptionsAllocator};
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
Expand Down Expand Up @@ -118,9 +118,9 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_meta = MetaSrvWalConfig::default();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
Expand Down Expand Up @@ -163,6 +163,7 @@ impl GreptimeDbStandaloneBuilder {
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
},
guard,
}
Expand Down
Loading