Queries and Time
This session discusses how to write SQL queries to process streaming data.
Specifically, you will learn about
- Event-time and processing-time handling in SQL
- GROUP BY window aggregations
- OVER window aggregations
These exercises will teach you how to perform temporal aggregation with SQL. The following exercises process data based on the time when events have occurred. The queries consume an append-only stream and produce an append-only stream.
For continuously determining the city's taxi traffic situation, count the number of arriving and departing rides per area in a window of 5 minutes.
We are only interested in events that start or end in New York City and areas with at least 5 arriving or departing rides.
Hints:
- Use the provided toAreaId function to convert coordinates to an area id.
The output should look similar to:
area isStart t cnt
49282 true 2013-01-01 00:05:00.0 6
45881 true 2013-01-01 00:05:00.0 8
51781 true 2013-01-01 00:05:00.0 8
49551 true 2013-01-01 00:05:00.0 7
48540 true 2013-01-01 00:10:00.0 6
51795 true 2013-01-01 00:10:00.0 6
47550 true 2013-01-01 00:10:00.0 6
54285 true 2013-01-01 00:10:00.0 8
51781 true 2013-01-01 00:10:00.0 17
45548 true 2013-01-01 00:10:00.0 14
The t column represents the end of every 5-minute window.
Solution:
SELECT
  toAreaId(lon, lat) AS area,
  isStart,
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS t,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
  toAreaId(lon, lat),
  isStart,
  TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
The query filters out events that did not occur in New York City, regardless of whether they are start or end events. It uses a tumbling window of 5 minutes on the rideTime time attribute. The result is grouped by the area id, the isStart flag, and the tumbling window. For every group, we return the area id, the isStart flag, the end boundary of the window, and the aggregated count. We only return counts that are greater than or equal to 5.
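The grouping logic described above can be sketched outside of SQL. The following Python snippet is only an illustration, not how Flink evaluates the query: the names tumble_end and count_rides and the (area, is_start, ride_time) tuple layout are hypothetical, events are assumed to be already restricted to New York City and mapped to area ids, and windows are assumed to be aligned to the Unix epoch.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
WINDOW = timedelta(minutes=5)


def tumble_end(ts, size=WINDOW):
    """Return the exclusive end of the tumbling window that contains ts.

    Assumption: windows are aligned to the Unix epoch.
    """
    start = ts - (ts - EPOCH) % size
    return start + size


def count_rides(events, min_count=5):
    """Count events per (area, is_start, window end) and keep groups
    with at least min_count events, mirroring GROUP BY ... HAVING.

    events: iterable of hypothetical (area, is_start, ride_time) tuples.
    """
    counts = Counter(
        (area, is_start, tumble_end(ride_time))
        for area, is_start, ride_time in events
    )
    return {key: cnt for key, cnt in counts.items() if cnt >= min_count}


# Made-up sample: six departures from one area within the first window,
# four from another area, and one event exactly on the window boundary.
sample = (
    [(49282, True, datetime(2013, 1, 1, 0, m)) for m in (0, 1, 1, 2, 3, 4)]
    + [(45881, True, datetime(2013, 1, 1, 0, m)) for m in (0, 1, 2, 3)]
    + [(49282, True, datetime(2013, 1, 1, 0, 5))]
)
result = count_rides(sample)
```

An event whose timestamp falls exactly on a boundary, such as 00:05:00, belongs to the next window, so in this sketch only the six earlier departures from area 49282 form a group that passes the threshold.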
In this exercise we want to return the areas from which more than 10 people left by taxi in the last 10 minutes. Return for each departure (start) event the area id, the timestamp, and the number of people that left the area in the last 10 minutes, if more than 10 people left.
We are only interested in rides that depart from New York City.
Hints:
- First, filter for ride start events that happened in New York City.
- Because the query should return a new row with an updated count whenever a new start event arrives, use an OVER window to compute the running count per departure area.
The output should look similar to:
areaId rideTime peopleCnt
45881 2013-01-01 00:00:56.0 12
45881 2013-01-01 00:01:02.0 14
53283 2013-01-01 00:02:00.0 11
8252892 2013-01-01 00:02:33.0 11
8252892 2013-01-01 00:02:41.0 12
8252892 2013-01-01 00:02:55.0 13
8252892 2013-01-01 00:03:00.0 18
8252892 2013-01-01 00:03:00.0 18
41819 2013-01-01 00:03:00.0 12
45631 2013-01-01 00:03:00.0 13
45881 2013-01-01 00:03:07.0 18
Solution:
SELECT
  areaId,
  rideTime,
  peopleCnt
FROM (
  SELECT
    areaId,
    rideTime,
    SUM(psgCnt) OVER w AS peopleCnt
  FROM (
    SELECT toAreaId(lon, lat) AS areaId, rideTime, psgCnt
    FROM Rides
    WHERE isStart AND isInNYC(lon, lat)
  )
  WINDOW w AS (
    PARTITION BY areaId
    ORDER BY rideTime
    RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW
  )
)
WHERE peopleCnt > 10;
The innermost subquery returns all start events in New York City with a converted area id.
Its result is partitioned by area id and ordered by time (rideTime) to compute a running sum of all passengers that left the area within the last 10 minutes.
The outer query keeps only records with more than 10 departing passengers.
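To make the range semantics of the OVER window concrete, here is a small batch-style Python sketch. It is an illustration under assumptions, not Flink's streaming implementation: the function name running_passenger_sum and the (area_id, ride_time, psg_cnt) tuple layout are hypothetical, and the input is assumed to contain only start events in New York City. Both window bounds are treated as inclusive, so rows that share a timestamp see each other's passengers.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def running_passenger_sum(rides, span=timedelta(minutes=10), threshold=10):
    """For every start event, sum the passengers of all events in the same
    area whose time lies in [ride_time - span, ride_time], then keep rows
    whose sum exceeds threshold.

    rides: iterable of hypothetical (area_id, ride_time, psg_cnt) tuples.
    """
    by_area = defaultdict(list)  # corresponds to PARTITION BY areaId
    for area_id, ride_time, psg_cnt in rides:
        by_area[area_id].append((ride_time, psg_cnt))

    result = []
    for area_id, rows in by_area.items():
        rows.sort(key=lambda r: r[0])  # corresponds to ORDER BY rideTime
        for ride_time, _ in rows:
            total = sum(
                cnt for t, cnt in rows if ride_time - span <= t <= ride_time
            )
            if total > threshold:  # corresponds to WHERE peopleCnt > 10
                result.append((area_id, ride_time, total))
    return sorted(result, key=lambda r: (r[1], r[0]))


# Made-up sample data: two events in area 1 share the timestamp 00:03:00.
sample = [
    (1, datetime(2013, 1, 1, 0, 0), 6),
    (1, datetime(2013, 1, 1, 0, 3), 3),
    (1, datetime(2013, 1, 1, 0, 3), 4),
    (1, datetime(2013, 1, 1, 0, 12), 5),
    (2, datetime(2013, 1, 1, 0, 1), 2),
]
rows = running_passenger_sum(sample)
```

In this sample, both events at 00:03:00 report the same sum of 13, which matches the pattern of repeated rows with identical timestamps and counts in the expected output above. The event at 00:12:00 no longer sees the event at 00:00:00 and reports 12.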
The following exercise continuously updates its result as new input arrives. Instead of an append-only stream, the query produces updates and deletions.
For this exercise, you should compute for each area in New York City the average number of persons that are leaving the area per hour. For simplicity, let's assume that every ride ends in a different area than the one in which it starts.
Hints:
- Each ride is represented by two events. Filter out all end events for accurate counts.
- Use the provided toAreaId function to convert coordinates to an area id.
- Compute the result in two steps: First compute for every hour the number of leaving passengers per area; second compute for each area the average number of leaving passengers per hour using the first result.
The output should look similar to:
area avgPsgLeaving
46568 1.7083333333333333333333~
47559 0.7916666666666666666666~
36313 0.0833333333333333333333~
49106 0.0416666666666666666666~
48792 0.5
55325 0.0833333333333333333333~
37570 0.375
Solution:
SELECT
  area,
  SUM(psgSum) / 24.0 AS avgPsgLeaving
FROM (
  SELECT
    toAreaId(lon, lat) AS area,
    TUMBLE_END(rideTime, INTERVAL '1' HOUR) AS t,
    SUM(psgCnt) AS psgSum
  FROM Rides
  WHERE isStart AND isInNYC(lon, lat)
  GROUP BY
    toAreaId(lon, lat),
    TUMBLE(rideTime, INTERVAL '1' HOUR)
)
GROUP BY area;
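The query defines a subquery which uses an hourly tumbling window to compute, for every hour, the number of passengers leaving an area. The subquery produces a streaming result, which the outer query groups by area to compute the average number of people leaving each area per hour. Because the outer GROUP BY has no window, the average of an area is updated whenever a new hourly sum for that area arrives. Note that the outer query divides the per-area total by 24.0, so it treats the input as covering 24 hours, and hours without departures count as zero.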
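The two-step computation and its updating result can be sketched in Python as follows. This is a simplified illustration, not Flink's execution model: the names hourly_sums and avg_leaving_updates and the (area, ride_time, psg_cnt) tuple layout are hypothetical, the input is assumed to contain only start events in New York City, and hourly sums are assumed to reach the second step in window order.

```python
from collections import defaultdict
from datetime import datetime


def hourly_sums(rides):
    """Step 1: sum departing passengers per (area, hour window start).

    rides: iterable of hypothetical (area, ride_time, psg_cnt) tuples.
    """
    sums = defaultdict(int)
    for area, ride_time, psg_cnt in rides:
        hour = ride_time.replace(minute=0, second=0, microsecond=0)
        sums[(area, hour)] += psg_cnt
    return sums


def avg_leaving_updates(rides, hours=24.0):
    """Step 2: yield an updated (area, average) pair for every hourly sum.

    Each yielded pair replaces the previous value for that area, which is
    why the result is an updating table and not an append-only stream.
    Dividing by a fixed number of hours mirrors SUM(psgSum) / 24.0.
    """
    totals = defaultdict(int)
    ordered = sorted(hourly_sums(rides).items(), key=lambda kv: kv[0][1])
    for (area, _hour), psg_sum in ordered:
        totals[area] += psg_sum
        yield area, totals[area] / hours


# Made-up sample data covering two hours and two areas.
sample = [
    (46568, datetime(2013, 1, 1, 0, 10), 2),
    (46568, datetime(2013, 1, 1, 0, 50), 1),
    (46568, datetime(2013, 1, 1, 1, 20), 3),
    (48792, datetime(2013, 1, 1, 1, 30), 12),
]
updates = list(avg_leaving_updates(sample))
final = dict(updates)  # the last update per area wins
```

Area 46568 appears twice in updates because its average changes when the second hourly sum arrives. A consumer of the result only needs the latest value per area, which is what final holds.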
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.