forked from mitch-seymour/mastering-kafka-streams-and-ksqldb
-
Notifications
You must be signed in to change notification settings - Fork 3
/
all.sql
43 lines (40 loc) · 1010 Bytes
/
all.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
CREATE TYPE season_length AS STRUCT<season_id INT, episode_count INT> ;
CREATE TABLE titles (
id INT PRIMARY KEY,
title VARCHAR
) WITH (
KAFKA_TOPIC='titles',
VALUE_FORMAT='AVRO',
PARTITIONS=4
);
CREATE STREAM production_changes (
rowkey VARCHAR KEY,
uuid INT,
title_id INT,
change_type VARCHAR,
before season_length,
after season_length,
created_at VARCHAR
) WITH (
KAFKA_TOPIC='production_changes',
PARTITIONS='4',
VALUE_FORMAT='JSON',
TIMESTAMP='created_at',
TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'
);
CREATE STREAM season_length_changes
WITH (
KAFKA_TOPIC = 'season_length_changes',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 4,
REPLICAS = 1
) AS SELECT
ROWKEY,
title_id,
IFNULL(after->season_id, before->season_id) AS season_id,
before->episode_count AS old_episode_count,
after->episode_count AS new_episode_count,
created_at
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES ;