Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(remote_wal): add sqlness with kafka wal #3027

Merged
merged 12 commits into from
Dec 29, 2023
31 changes: 31 additions & 0 deletions .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,37 @@ jobs:
path: ${{ runner.temp }}/greptime-*.log
retention-days: 3

sqlness-kafka-wal:
name: Sqlness Test with Kafka Wal
if: github.event.pull_request.draft == false
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-20.04-8-cores ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Setup kafka server
working-directory: tests-integration/fixtures/kafka
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run sqlness
run: cargo sqlness -w kafka -k 127.0.0.1:9092
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
retention-days: 3

fmt:
name: Rustfmt
if: github.event.pull_request.draft == false
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub use crate::wal::kafka::topic_manager::TopicManager;

/// Configurations for kafka wal.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
Expand Down
2 changes: 0 additions & 2 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ impl LogStore for KafkaLogStore {
&& entry.ns.region_id == region_id
{
yield Ok(entries);
} else {
yield Ok(vec![]);
}

// Terminates the stream if the entry with the end offset was read.
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ impl MetaSrv {
info!("MetaSrv stopped");
});
} else {
if let Err(e) = self.wal_options_allocator.start().await {
error!(e; "Failed to start wal options allocator");
}
// Always load kv into cached kv store.
self.leader_cached_kv_backend
.load()
Expand Down
19 changes: 19 additions & 0 deletions tests-integration/fixtures/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Starts a standalone kafka
```bash
docker compose -f docker-compose-standalone.yml up kafka -d
```

## Lists running services
```bash
docker compose -f docker-compose-standalone.yml ps
```

## Stops the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml stop kafka
```

## Stops and removes the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml down kafka
```
112 changes: 16 additions & 96 deletions tests/cases/standalone/common/types/string/scan_big_varchar.result
Original file line number Diff line number Diff line change
Expand Up @@ -126,102 +126,22 @@ SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
| 128 | 128 | 10000 | 1280000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;

Affected Rows: 128

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 256 | 256 | 10000 | 2560000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;

Affected Rows: 256

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 512 | 512 | 10000 | 5120000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;

Affected Rows: 512

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 1024 | 1024 | 10000 | 10240000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;

Affected Rows: 1024

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 2048 | 2048 | 10000 | 20480000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;

Affected Rows: 2048

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 4096 | 4096 | 10000 | 40960000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;

Affected Rows: 4096

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 8192 | 8192 | 10000 | 81920000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;

Affected Rows: 8192

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 16384 | 16384 | 10000 | 163840000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;

Affected Rows: 16384

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

+----------+-------------------+-----------------------------------+-----------------------------------+
| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) |
+----------+-------------------+-----------------------------------+-----------------------------------+
| 32768 | 32768 | 10000 | 327680000 |
+----------+-------------------+-----------------------------------+-----------------------------------+

-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
DROP TABLE test;

Affected Rows: 0
Expand Down
32 changes: 16 additions & 16 deletions tests/cases/standalone/common/types/string/scan_big_varchar.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,38 +51,38 @@ INSERT INTO bigtable SELECT a, to_unixtime(ts) * 51 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;


INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;

INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;
-- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable;

SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;
-- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable;


DROP TABLE test;
Expand Down
5 changes: 5 additions & 0 deletions tests/conf/datanode-test.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8

[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
{{ endif }}

[storage]
type = 'File'
Expand Down
10 changes: 10 additions & 0 deletions tests/conf/metasrv-test.toml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
num_topics = 64
selector_type = "round_robin"
topic_name_prefix = "distributed_test_greptimedb_wal_topic"
{{ endif }}
5 changes: 5 additions & 0 deletions tests/conf/standalone-test.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ enable_memory_catalog = false
require_lease_before_startup = true

[wal]
{{ if is_raft_engine }}
provider = "raft_engine"
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false
{{ else }}
provider = "kafka"
broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
{{ endif }}

[storage]
type = 'File'
Expand Down
1 change: 1 addition & 0 deletions tests/runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlness = { version = "0.5" }
tinytemplate = "1.2"
tokio.workspace = true
Loading
Loading