Skip to content

Commit

Permalink
feat(sqlness): add kafka wal config
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 28, 2023
1 parent 485a91f commit 62b489b
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions tests-integration/fixtures/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Starts a standalone kafka
```bash
docker compose -f docker-compose-standalone.yml up kafka -d
```

## Lists running containers
```bash
docker compose -f docker-compose-standalone.yml ps
```

## Stops the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml stop kafka
```

## Stops and removes a standalone kafka
```bash
docker compose -f docker-compose-standalone.yml down kafka
```
5 changes: 5 additions & 0 deletions tests/conf/datanode-test.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8

[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints}
{{ endif }}

[storage]
type = 'File'
Expand Down
5 changes: 5 additions & 0 deletions tests/conf/standalone-test.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ enable_memory_catalog = false
require_lease_before_startup = true

[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
{{ endif }}

[storage]
type = 'File'
Expand Down
1 change: 1 addition & 0 deletions tests/runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlness = { version = "0.5" }
tinytemplate = "1.2"
tokio.workspace = true
36 changes: 32 additions & 4 deletions tests/runner/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ const METASRV_ADDR: &str = "127.0.0.1:3002";
const SERVER_ADDR: &str = "127.0.0.1:4001";
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";

#[derive(Clone)]
pub enum WalConfig {
RaftEngine,
Kafka { broker_endpoints: Vec<String> },
}

#[derive(Clone)]
pub struct Env {
data_home: PathBuf,
server_addr: Option<String>,
wal: WalConfig,
}

#[allow(clippy::print_stdout)]
Expand All @@ -68,10 +75,11 @@ impl EnvController for Env {

#[allow(clippy::print_stdout)]
impl Env {
pub fn new(data_home: PathBuf, server_addr: Option<String>) -> Self {
pub fn new(data_home: PathBuf, server_addr: Option<String>, wal: WalConfig) -> Self {
Self {
data_home,
server_addr,
wal,
}
}

Expand All @@ -81,7 +89,7 @@ impl Env {
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new(self.wal.clone());

let server_process = self.start_server("standalone", &db_ctx, true).await;

Expand All @@ -106,7 +114,7 @@ impl Env {
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new(self.wal.clone());

// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
Expand Down Expand Up @@ -145,6 +153,7 @@ impl Env {
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
wal: self.wal.clone(),
},
is_standalone: false,
env: self.clone(),
Expand Down Expand Up @@ -321,6 +330,8 @@ impl Env {
wal_dir: String,
data_home: String,
procedure_dir: String,
is_raft_engine: bool,
kafka_wal_broker_endpoints: String,
}

let data_home = self
Expand All @@ -334,6 +345,8 @@ impl Env {
wal_dir,
data_home: data_home.display().to_string(),
procedure_dir,
is_raft_engine: db_ctx.is_raft_engine(),
kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
};
let rendered = tt.render(subcommand, &ctx).unwrap();

Expand Down Expand Up @@ -447,13 +460,28 @@ struct GreptimeDBContext {
/// Start time in millisecond
time: i64,
datanode_id: AtomicU32,
wal: WalConfig,
}

impl GreptimeDBContext {
pub fn new() -> Self {
pub fn new(wal: WalConfig) -> Self {
Self {
time: common_time::util::current_time_millis(),
datanode_id: AtomicU32::new(0),
wal,
}
}

fn is_raft_engine(&self) -> bool {
matches!(self.wal, WalConfig::RaftEngine)
}

fn kafka_wal_broker_endpoints(&self) -> String {
match &self.wal {
WalConfig::RaftEngine => String::new(),
WalConfig::Kafka { broker_endpoints } => {
serde_json::to_string(&broker_endpoints).unwrap()
}
}
}

Expand Down
35 changes: 31 additions & 4 deletions tests/runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@

use std::path::PathBuf;

use clap::Parser;
use env::Env;
use clap::{Parser, ValueEnum};
use env::{Env, WalConfig};
use sqlness::{ConfigBuilder, Runner};

mod env;
mod util;

#[derive(ValueEnum, Debug, Clone)]
#[clap(rename_all = "snake_case")]
enum Wal {
RaftEngine,
Kafka,
}

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
/// SQL Harness for GrepTimeDB
Expand All @@ -41,9 +48,17 @@ struct Args {
#[clap(short, long, default_value = ".*")]
test_filter: String,

/// Address of the server
/// Address of the server.
#[clap(short, long)]
server_addr: Option<String>,

/// The type of Wal.
#[clap(short, long, default_value = "raft_engine")]
wal: Wal,

/// The kafka wal broker endpoints.
#[clap(short, long, default_value = "127.0.0.1:9092")]
kafka_wal_broker_endpoints: String,
}

#[tokio::main]
Expand All @@ -63,6 +78,18 @@ async fn main() {
.env_config_file(args.env_config_file)
.build()
.unwrap();
let runner = Runner::new(config, Env::new(data_home, args.server_addr));

let wal = match args.wal {
Wal::RaftEngine => WalConfig::RaftEngine,
Wal::Kafka => WalConfig::Kafka {
broker_endpoints: args
.kafka_wal_broker_endpoints
.split(',')
.map(|s| s.to_string())
.collect(),
},
};

let runner = Runner::new(config, Env::new(data_home, args.server_addr, wal));
runner.run().await.unwrap();
}

0 comments on commit 62b489b

Please sign in to comment.