Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(remote_wal): add sqlness with kafka wal #3027

Merged
merged 12 commits into from
Dec 29, 2023
31 changes: 31 additions & 0 deletions .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,37 @@ jobs:
path: ${{ runner.temp }}/greptime-*.log
retention-days: 3

sqlness-kafka-wal:
name: Sqlness Test with Kafka Wal
if: github.event.pull_request.draft == false
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-20.04-8-cores ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Setup kafka server
working-directory: tests-integration/fixtures/kafka
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run sqlness
run: cargo sqlness -w kafka -k 127.0.0.1:9092
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
retention-days: 3

fmt:
name: Rustfmt
if: github.event.pull_request.draft == false
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions tests-integration/fixtures/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Starts a standalone Kafka
```bash
docker compose -f docker-compose-standalone.yml up kafka -d
```

## Lists running services
```bash
docker compose -f docker-compose-standalone.yml ps
```

## Stops the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml stop kafka
```

## Stops and removes the standalone Kafka
```bash
docker compose -f docker-compose-standalone.yml down
```
5 changes: 5 additions & 0 deletions tests/conf/datanode-test.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8

[wal]
# Rendered by the sqlness runner via tinytemplate: exactly one of the two
# provider branches below is emitted into the final datanode config.
{{ if is_raft_engine }}
# Local raft-engine based WAL (the default).
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
# Remote WAL backed by Kafka; the endpoints value is a pre-serialized JSON
# array, hence the `unescaped` filter.
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
{{ endif }}

[storage]
type = 'File'
Expand Down
5 changes: 5 additions & 0 deletions tests/conf/standalone-test.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ enable_memory_catalog = false
require_lease_before_startup = true

[wal]
# Rendered by the sqlness runner via tinytemplate: exactly one of the two
# provider branches below is emitted into the final standalone config.
{{ if is_raft_engine }}
# Local raft-engine based WAL (the default).
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
# Remote WAL backed by Kafka; the endpoints value is a pre-serialized JSON
# array, hence the `unescaped` filter.
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
{{ endif }}

[storage]
type = 'File'
Expand Down
1 change: 1 addition & 0 deletions tests/runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlness = { version = "0.5" }
tinytemplate = "1.2"
tokio.workspace = true
36 changes: 32 additions & 4 deletions tests/runner/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ const METASRV_ADDR: &str = "127.0.0.1:3002";
const SERVER_ADDR: &str = "127.0.0.1:4001";
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";

/// Which WAL provider the GreptimeDB instances under test are configured with.
// `Debug` added: public types should be debuggable (e.g. for test failure output).
#[derive(Clone, Debug)]
pub enum WalConfig {
    /// Local raft-engine based WAL (the default).
    RaftEngine,
    /// Remote WAL backed by a Kafka cluster.
    Kafka {
        /// Kafka broker addresses, e.g. `127.0.0.1:9092`.
        broker_endpoints: Vec<String>,
    },
}

/// Shared sqlness environment handle; cloned into each GreptimeDB context.
#[derive(Clone)]
pub struct Env {
    /// Root directory where test data (wal, storage, procedures) is placed.
    data_home: PathBuf,
    /// When set, connect to an already-running server instead of spawning one.
    server_addr: Option<String>,
    /// WAL provider configuration applied to spawned instances.
    wal: WalConfig,
}

#[allow(clippy::print_stdout)]
Expand All @@ -68,10 +75,11 @@ impl EnvController for Env {

#[allow(clippy::print_stdout)]
impl Env {
pub fn new(data_home: PathBuf, server_addr: Option<String>) -> Self {
pub fn new(data_home: PathBuf, server_addr: Option<String>, wal: WalConfig) -> Self {
Self {
data_home,
server_addr,
wal,
}
}

Expand All @@ -81,7 +89,7 @@ impl Env {
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new(self.wal.clone());

let server_process = self.start_server("standalone", &db_ctx, true).await;

Expand All @@ -106,7 +114,7 @@ impl Env {
} else {
Self::build_db().await;

let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new(self.wal.clone());

// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
Expand Down Expand Up @@ -145,6 +153,7 @@ impl Env {
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
wal: self.wal.clone(),
},
is_standalone: false,
env: self.clone(),
Expand Down Expand Up @@ -321,6 +330,8 @@ impl Env {
wal_dir: String,
data_home: String,
procedure_dir: String,
is_raft_engine: bool,
kafka_wal_broker_endpoints: String,
}

let data_home = self
Expand All @@ -334,6 +345,8 @@ impl Env {
wal_dir,
data_home: data_home.display().to_string(),
procedure_dir,
is_raft_engine: db_ctx.is_raft_engine(),
kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
};
let rendered = tt.render(subcommand, &ctx).unwrap();

Expand Down Expand Up @@ -447,13 +460,28 @@ struct GreptimeDBContext {
/// Start time in millisecond
time: i64,
datanode_id: AtomicU32,
wal: WalConfig,
}

impl GreptimeDBContext {
pub fn new() -> Self {
pub fn new(wal: WalConfig) -> Self {
Self {
time: common_time::util::current_time_millis(),
datanode_id: AtomicU32::new(0),
wal,
}
}

fn is_raft_engine(&self) -> bool {
matches!(self.wal, WalConfig::RaftEngine)
}

fn kafka_wal_broker_endpoints(&self) -> String {
match &self.wal {
WalConfig::RaftEngine => String::new(),
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
WalConfig::Kafka { broker_endpoints } => {
serde_json::to_string(&broker_endpoints).unwrap()
}
}
}

Expand Down
35 changes: 31 additions & 4 deletions tests/runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@

use std::path::PathBuf;

use clap::Parser;
use env::Env;
use clap::{Parser, ValueEnum};
use env::{Env, WalConfig};
use sqlness::{ConfigBuilder, Runner};

mod env;
mod util;

/// CLI selector for the WAL provider, passed as `--wal` / `-w`.
// snake_case rename makes the CLI values `raft_engine` and `kafka`.
#[derive(ValueEnum, Debug, Clone)]
#[clap(rename_all = "snake_case")]
enum Wal {
    // Local raft-engine based WAL (the default).
    RaftEngine,
    // Remote WAL backed by Kafka.
    Kafka,
}

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
/// SQL Harness for GrepTimeDB
Expand All @@ -41,9 +48,17 @@ struct Args {
#[clap(short, long, default_value = ".*")]
test_filter: String,

/// Address of the server.
#[clap(short, long)]
server_addr: Option<String>,

/// The type of Wal.
#[clap(short, long, default_value = "raft_engine")]
wal: Wal,

/// The kafka wal broker endpoints.
#[clap(short, long, default_value = "127.0.0.1:9092")]
kafka_wal_broker_endpoints: String,
}

#[tokio::main]
Expand All @@ -63,6 +78,18 @@ async fn main() {
.env_config_file(args.env_config_file)
.build()
.unwrap();
let runner = Runner::new(config, Env::new(data_home, args.server_addr));

let wal = match args.wal {
Wal::RaftEngine => WalConfig::RaftEngine,
Wal::Kafka => WalConfig::Kafka {
broker_endpoints: args
.kafka_wal_broker_endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect(),
},
};

let runner = Runner::new(config, Env::new(data_home, args.server_addr, wal));
runner.run().await.unwrap();
}
Loading