Skip to content

Commit

Permalink
Add RisingWave engine
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Apr 15, 2024
1 parent 88d19df commit a7a6761
Show file tree
Hide file tree
Showing 18 changed files with 196 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
- New engine based on the RisingWave streaming database ([repo](https://github.com/kamu-data/kamu-engine-risingwave)) that provides a mature streaming alternative to Flink
- See `examples/leaderboard` and updated `examples/covid` for usage

## [0.175.0] - 2024-04-15
### Added
- The `kamu ingest` command can now accept `--event-time` hint which is useful for snapshot-style data that doesn't have an event time column
Expand Down
32 changes: 16 additions & 16 deletions examples/covid/canada.case-details.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,39 @@ content:
# Transformation that will be applied to produce new data
transform:
kind: Sql
engine: spark
engine: datafusion
query: |
SELECT
"AB" as province,
select
'AB' as province,
id,
reported_date,
gender,
location
FROM `covid19.alberta.case-details.hm`
UNION ALL
SELECT
"BC" as province,
from "covid19.alberta.case-details.hm"
union all
select
'BC' as province,
id,
reported_date,
gender,
location
FROM `covid19.british-columbia.case-details.hm`
UNION ALL
SELECT
"ON" as province,
from "covid19.british-columbia.case-details.hm"
union all
select
'ON' as province,
id,
reported_date,
gender,
location
FROM `covid19.ontario.case-details.hm`
UNION ALL
SELECT
"QC" as province,
from "covid19.ontario.case-details.hm"
union all
select
'QC' as province,
id,
reported_date,
gender,
location
FROM `covid19.quebec.case-details.hm`
from "covid19.quebec.case-details.hm"
- kind: SetVocab
eventTimeColumn: reported_date
- kind: SetInfo
Expand Down
28 changes: 21 additions & 7 deletions examples/covid/canada.daily-cases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,28 @@ content:
- datasetRef: covid19.canada.case-details
transform:
kind: Sql
engine: flink
engine: risingwave
query: |
SELECT
TUMBLE_START(`reported_date`, INTERVAL '1' DAY) as `reported_date`,
`province`,
COUNT(*) as `total_daily`
FROM `covid19.canada.case-details`
GROUP BY TUMBLE(`reported_date`, INTERVAL '1' DAY), `province`
select
window_end - interval '1' second as reported_date,
province,
count(1) as total_daily
from tumble(
"covid19.canada.case-details",
reported_date,
interval '1' day
)
group by 1, 2
emit on window close
# # Alternatively we could use Flink
# engine: flink
# query: |
# SELECT
# TUMBLE_START(`reported_date`, INTERVAL '1' DAY) as `reported_date`,
# `province`,
# COUNT(*) as `total_daily`
# FROM `covid19.canada.case-details`
# GROUP BY TUMBLE(`reported_date`, INTERVAL '1' DAY), `province`
- kind: SetVocab
eventTimeColumn: reported_date
- kind: SetInfo
Expand Down
2 changes: 1 addition & 1 deletion examples/covid/ontario.case-details.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ content:
engine: datafusion
query: |
SELECT
"Row_ID" as id,
cast("Row_ID" as bigint) as id,
to_timestamp(coalesce("Case_Reported_Date", "Test_Reported_Date")) as case_reported_date,
case
when "Test_Reported_Date" != '' then to_timestamp("Test_Reported_Date")
Expand Down
6 changes: 2 additions & 4 deletions examples/covid/quebec.case-details.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ content:
- province STRING
- province_abbr STRING
- hr_uid INT
# Using Spark engine to convert a tricky date time string into a timestamp
# See: https://github.com/kamu-data/kamu-cli/issues/438
preprocess:
kind: Sql
engine: spark
engine: datafusion
query: |
select
objectid,
row_id,
to_timestamp(date_reported, "yyyy/MM/dd HH:mm:ssx") as date_reported,
to_timestamp(date_reported, '%Y/%m/%d %H:%M:%S%#z') as date_reported,
health_region,
age_group,
gender,
Expand Down
2 changes: 2 additions & 0 deletions examples/leaderboard/data/1.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"match_time": "2000-01-01", "match_id": 1, "player_id": "Alice", "score": 100}
{"match_time": "2000-01-01", "match_id": 1, "player_id": "Bob", "score": 80}
2 changes: 2 additions & 0 deletions examples/leaderboard/data/2.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"match_time": "2000-01-02", "match_id": 2, "player_id": "Alice", "score": 70}
{"match_time": "2000-01-02", "match_id": 2, "player_id": "Charlie", "score": 90}
2 changes: 2 additions & 0 deletions examples/leaderboard/data/3.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"match_time": "2000-01-03", "match_id": 3, "player_id": "Bob", "score": 60}
{"match_time": "2000-01-03", "match_id": 3, "player_id": "Charlie", "score": 110}
13 changes: 13 additions & 0 deletions examples/leaderboard/init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
# Initializes the example workspace and replays the sample match data,
# pulling the derivative `leaderboard` dataset after every ingest so each
# batch of scores is reflected in the Top-N view.
set -euo pipefail

kamu init --exists-ok
kamu add . -r

# Iterate data files in numeric order (1.ndjson, 2.ndjson, ..., 10.ndjson).
# `sort -g` keeps numeric ordering; reading line-by-line instead of
# word-splitting a backtick `ls` avoids breaking on whitespace in names,
# and "$file" is quoted wherever it is used.
while IFS= read -r file; do
    echo "kamu ingest player-scores data/$file"
    kamu ingest player-scores "data/$file"

    echo "kamu pull leaderboard"
    kamu pull leaderboard
done < <(ls ./data/ | sort -g)
38 changes: 38 additions & 0 deletions examples/leaderboard/leaderboard.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Derivative dataset that maintains a Top-N leaderboard over player scores
# using the RisingWave engine.
kind: DatasetSnapshot
version: 1
content:
  name: leaderboard
  kind: Derivative
  metadata:
    - kind: SetTransform
      inputs:
        - datasetRef: player-scores
          alias: player_scores
      transform:
        kind: Sql
        engine: risingwave
        queries:
          - alias: leaderboard
            # Note we are using an explicit `create materialized view` statement below
            # because RW does not currently support Top-N queries directly on sinks.
            #
            # Note `partition by 1` is currently required by the RW engine
            # See: https://docs.risingwave.com/docs/current/window-functions/#syntax
            query: |
              create materialized view leaderboard as
              select
                *
              from (
                select
                  row_number() over (partition by 1 order by score desc) as place,
                  match_time,
                  match_id,
                  player_id,
                  score
                from player_scores
              )
              where place <= 2
          # Final query reads the materialized view to produce the output.
          - query: |
              select * from leaderboard
    # Use the match timestamp as the event time of each record.
    - kind: SetVocab
      eventTimeColumn: match_time
22 changes: 22 additions & 0 deletions examples/leaderboard/player-scores.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Root dataset that accepts pushed NDJSON records of per-match player scores.
kind: DatasetSnapshot
version: 1
content:
  name: player-scores
  kind: Root
  metadata:
    - kind: AddPushSource
      sourceName: default
      read:
        kind: NdJson
        # Explicit schema for the incoming NDJSON records.
        schema:
          - "match_time TIMESTAMP"
          - "match_id BIGINT"
          - "player_id STRING"
          - "score BIGINT"
      merge:
        # Ledger merge: records are append-only events identified by the
        # (match_id, player_id) composite key.
        kind: Ledger
        primaryKey:
          - match_id
          - player_id
    # Use the match timestamp as the event time of each record.
    - kind: SetVocab
      eventTimeColumn: match_time
10 changes: 10 additions & 0 deletions src/app/cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,16 @@ pub fn register_config_in_catalog(
.datafusion
.clone()
.unwrap(),
risingwave_image: config
.engine
.as_ref()
.unwrap()
.images
.as_ref()
.unwrap()
.risingwave
.clone()
.unwrap(),
});

let ipfs_conf = config.protocol.as_ref().unwrap().ipfs.as_ref().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/app/cli/src/commands/pull_images_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl Command for PullImagesCommand {
self.engine_config.spark_image.as_str(),
self.engine_config.flink_image.as_str(),
self.engine_config.datafusion_image.as_str(),
self.engine_config.risingwave_image.as_str(),
self.jupyter_config.image.as_ref().unwrap().as_str(),
self.jupyter_config.livy_image.as_ref().unwrap().as_str(),
];
Expand Down
4 changes: 4 additions & 0 deletions src/app/cli/src/services/config/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ pub struct EngineImagesConfig {
pub flink: Option<String>,
/// UNSTABLE: Datafusion engine image
pub datafusion: Option<String>,
/// UNSTABLE: RisingWave engine image
pub risingwave: Option<String>,
}

impl EngineImagesConfig {
Expand All @@ -148,6 +150,7 @@ impl EngineImagesConfig {
spark: None,
flink: None,
datafusion: None,
risingwave: None,
}
}

Expand All @@ -162,6 +165,7 @@ impl Default for EngineImagesConfig {
spark: Some(docker_images::SPARK.to_owned()),
flink: Some(docker_images::FLINK.to_owned()),
datafusion: Some(docker_images::DATAFUSION.to_owned()),
risingwave: Some(docker_images::RISINGWAVE.to_owned()),
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/infra/core/src/engine/engine_provisioner_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct EngineProvisionerLocal {
spark_engine: Arc<dyn Engine>,
flink_engine: Arc<dyn Engine>,
datafusion_engine: Arc<dyn Engine>,
risingwave_engine: Arc<dyn Engine>,
container_runtime: Arc<ContainerRuntime>,
inner: Arc<Inner>,
}
Expand Down Expand Up @@ -81,6 +82,13 @@ impl EngineProvisionerLocal {
run_info_dir.clone(),
dataset_repo.clone(),
)),
risingwave_engine: Arc::new(ODFEngine::new(
container_runtime.clone(),
engine_config.clone(),
&config.risingwave_image,
run_info_dir.clone(),
dataset_repo.clone(),
)),
container_runtime,
inner: Arc::new(Inner {
state: Mutex::new(State {
Expand Down Expand Up @@ -201,6 +209,10 @@ impl EngineProvisioner for EngineProvisionerLocal {
self.datafusion_engine.clone(),
&self.config.datafusion_image,
)),
"risingwave" => Ok((
self.risingwave_engine.clone(),
&self.config.risingwave_image,
)),
_ => Err(format!("Unsupported engine {engine_id}").int_err()),
}?;

Expand Down Expand Up @@ -231,6 +243,7 @@ pub struct EngineProvisionerLocalConfig {
pub spark_image: String,
pub flink_image: String,
pub datafusion_image: String,
pub risingwave_image: String,
}

// This is for tests only
Expand All @@ -243,6 +256,7 @@ impl Default for EngineProvisionerLocalConfig {
spark_image: docker_images::SPARK.to_owned(),
flink_image: docker_images::FLINK.to_owned(),
datafusion_image: docker_images::DATAFUSION.to_owned(),
risingwave_image: docker_images::RISINGWAVE.to_owned(),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/infra/core/src/utils/docker_images.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
// by the Apache License, Version 2.0.

pub const SPARK: &str = "ghcr.io/kamu-data/engine-spark:0.23.0-spark_3.5.0";
pub const FLINK: &str = "ghcr.io/kamu-data/engine-flink:0.18.1-flink_1.16.0-scala_2.12-java8";
pub const FLINK: &str = "ghcr.io/kamu-data/engine-flink:0.18.2-flink_1.16.0-scala_2.12-java8";
pub const DATAFUSION: &str = "ghcr.io/kamu-data/engine-datafusion:0.7.2";
pub const RISINGWAVE: &str = "ghcr.io/kamu-data/engine-risingwave:0.2.0-risingwave_1.7.0-alpha";

pub const LIVY: &str = SPARK;
pub const JUPYTER: &str = "ghcr.io/kamu-data/jupyter:0.6.1";
Expand Down
Loading

0 comments on commit a7a6761

Please sign in to comment.