This repository has been archived by the owner on Nov 26, 2021. It is now read-only.

Queries and Time

Fabian Hueske edited this page Jun 8, 2020 · 2 revisions

This session discusses how to write SQL queries to process streaming data.

Specifically, you will learn about

  • Event-Time and Processing-Time handling in SQL
  • GROUP BY window aggregations
  • OVER window aggregations

Slides

Hands-On Exercises

These exercises will teach you how to perform temporal aggregation with SQL. The following exercises process data based on the time when events have occurred. The queries consume an append-only stream and produce an append-only stream.

GROUP BY window aggregation

Windowed Ride Count

For continuously determining the city's taxi traffic situation, count the number of arriving and departing rides per area in a window of 5 minutes.

We are only interested in start and end events that occur in New York City, and in areas with at least 5 arriving or departing rides per window.

Hints:
  • Use the provided toAreaId to convert coordinates to an area id.


The output should look similar to:
area         isStart      t                             cnt
49282        true         2013-01-01 00:05:00.0           6
45881        true         2013-01-01 00:05:00.0           8
51781        true         2013-01-01 00:05:00.0           8
49551        true         2013-01-01 00:05:00.0           7
48540        true         2013-01-01 00:10:00.0           6
51795        true         2013-01-01 00:10:00.0           6
47550        true         2013-01-01 00:10:00.0           6
54285        true         2013-01-01 00:10:00.0           8
51781        true         2013-01-01 00:10:00.0          17
45548        true         2013-01-01 00:10:00.0          14

The t column represents the end of every 5-minute window.

Solution:
SELECT
  toAreaId(lon, lat) AS area,
  isStart,
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS t,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
  toAreaId(lon, lat),
  isStart,
  TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;

The query filters out events that did not occur in New York City. It applies a tumbling window of 5 minutes on the rideTime time attribute and groups the result by the area id, the isStart flag, and the tumbling window. For every group, it returns the area id, the isStart flag, the end boundary of the window, and the aggregated count. The HAVING clause keeps only counts that are greater than or equal to 5.
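
To make the window assignment concrete, the following is a minimal plain-Python sketch of what the query computes on a finite batch of events. It is not Flink code: the names tumble_end and windowed_ride_count and the sample events are hypothetical, the input is assumed to be already restricted to New York City, and windows are assumed to be aligned to the epoch with an exclusive end boundary.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)

def tumble_end(ts, size=WINDOW):
    """Return the (exclusive) end of the tumbling window that contains ts."""
    # Windows are assumed to be aligned to the epoch: [k * size, (k + 1) * size).
    start = ts - (ts - EPOCH) % size
    return start + size

def windowed_ride_count(events, min_count=5):
    """events: iterable of (area, is_start, ride_time) tuples, already filtered to NYC."""
    # GROUP BY area, isStart, TUMBLE(rideTime, INTERVAL '5' MINUTE)
    counts = Counter((area, is_start, tumble_end(t)) for area, is_start, t in events)
    # HAVING COUNT(*) >= 5
    return {key: cnt for key, cnt in counts.items() if cnt >= min_count}

# Hypothetical sample: six start events in area 49282, three in area 45881.
base = datetime(2013, 1, 1, 0, 0, 0)
events = [(49282, True, base + timedelta(seconds=30 * i)) for i in range(6)]
events += [(45881, True, base + timedelta(seconds=45 * i)) for i in range(3)]

result = windowed_ride_count(events)
print(result)
```

Only the group for area 49282 passes the HAVING filter in this sample, because area 45881 has just three rides in the window. Unlike this batch sketch, the streaming query emits the result of a window once event time has passed the window's end.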


OVER window aggregation

Areas with Leaving People

In this exercise we want to return the areas from which more than 10 people left by taxi in the last 10 minutes. Return for each departure (start) event the area id, the timestamp, and the number of people that left the area in the last 10 minutes, if more than 10 people left.

We are only interested in rides that depart from New York City.

Hints:
  • First, filter for ride start events that happened in New York City.
  • Because the query should return a new row with an updated count whenever a new start event arrives, use an OVER window to compute the running count per departure area.


The output should look similar to:
areaId                  rideTime       peopleCnt
  45881     2013-01-01 00:00:56.0              12
  45881     2013-01-01 00:01:02.0              14
  53283     2013-01-01 00:02:00.0              11
8252892     2013-01-01 00:02:33.0              11
8252892     2013-01-01 00:02:41.0              12
8252892     2013-01-01 00:02:55.0              13
8252892     2013-01-01 00:03:00.0              18
8252892     2013-01-01 00:03:00.0              18
  41819     2013-01-01 00:03:00.0              12
  45631     2013-01-01 00:03:00.0              13
  45881     2013-01-01 00:03:07.0              18

Solution:
SELECT
  areaId,
  rideTime,
  peopleCnt
FROM
  (
    SELECT
      areaId,
      rideTime,
      SUM(psgCnt) OVER w AS peopleCnt
    FROM
      (SELECT toAreaId(lon, lat) AS areaId, rideTime, psgCnt FROM Rides WHERE isStart AND isInNYC(lon, lat))
    WINDOW w AS (
      PARTITION BY areaId
      ORDER BY rideTime RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW
    )
  )
WHERE peopleCnt > 10;

The subquery returns all start events with a converted area id.

The result of the subquery is ordered by time (rideTime) and partitioned by area id to compute a running sum of all passengers that left the area within the last 10 minutes.

The outer query keeps only the records with a leaving persons count of more than 10.
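
The behaviour of the OVER window can be sketched in plain Python as follows. This is not Flink code: the function name leaving_people and the sample events are hypothetical, and the sketch assumes that both bounds of the RANGE frame are inclusive, so that rows with the same rideTime as the current row are part of its frame. It evaluates the definition directly (quadratic cost) rather than maintaining state incrementally.

```python
from datetime import datetime, timedelta

def leaving_people(start_events, span=timedelta(minutes=10), threshold=10):
    """start_events: list of (area_id, ride_time, psg_cnt) tuples for start events."""
    out = []
    for area, t, _ in start_events:
        # PARTITION BY areaId, RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW:
        # sum the passengers of all rows of the same area with t - span <= rideTime <= t.
        total = sum(p for a, s, p in start_events if a == area and t - span <= s <= t)
        # Outer query: WHERE peopleCnt > 10
        if total > threshold:
            out.append((area, t, total))
    return out

# Hypothetical sample data, ordered by ride time.
day = datetime(2013, 1, 1)
events = [
    (45881, day + timedelta(minutes=0), 6),
    (41819, day + timedelta(minutes=1), 2),
    (45881, day + timedelta(minutes=3), 3),
    (45881, day + timedelta(minutes=3), 4),   # same timestamp as the previous row
    (45881, day + timedelta(minutes=9), 1),
    (45881, day + timedelta(minutes=12), 2),
]

rows = leaving_people(events)
for row in rows:
    print(row)
```

Each input row produces at most one output row, which is why the query result is append-only. The two rows of area 45881 with identical timestamps receive the same sum, matching the repeated value for a shared timestamp in the sample output above.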


Combining Temporal and Materializing Operations

The query in the following exercise continuously updates its result as new input arrives from the preceding streaming operations. Instead of an append-only stream, such queries produce updates and deletions.

Average Number of Persons Leaving an Area Per Hour

For this exercise, you should compute for each area in New York City the average number of persons that are leaving the area per hour. For simplicity, let's assume that every ride ends in a different area than the one in which it started.

Hints:
  • Each ride is represented by two events. Filter out all end events for accurate counts.
  • Use the provided toAreaId to convert coordinates to an area id.
  • Compute the result in two steps: First compute for every hour the number of leaving passengers per area; second compute for each area the average number of leaving passengers per hour using the first result.


The output should look similar to:
area             avgPsgLeaving
46568 1.7083333333333333333333~
47559 0.7916666666666666666666~
36313 0.0833333333333333333333~
49106 0.0416666666666666666666~
48792                       0.5
55325 0.0833333333333333333333~
37570                     0.375

Solution:
SELECT
  area,
  SUM(psgSum)/24.0 AS avgPsgLeaving
FROM
  (SELECT 
     toAreaId(lon, lat) AS area,
     TUMBLE_END(rideTime, INTERVAL '1' HOUR) AS t,
     SUM(psgCnt) AS psgSum
   FROM 
     Rides
   WHERE 
     isStart AND isInNYC(lon, lat)
   GROUP BY
     toAreaId(lon, lat),
     TUMBLE(rideTime, INTERVAL '1' HOUR))
GROUP BY
  area;

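The query defines a subquery which uses an hourly tumbling window to compute, for every hour, the number of passengers leaving each area. The outer query groups this streaming result by area, sums the hourly counts, and divides the sum by 24.0 to obtain for every area the average number of people leaving per hour. Because the outer aggregation is not windowed, its result for an area is updated whenever a new hourly count for that area arrives.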
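
The second step can be sketched in plain Python to show why the result is an updating one. This is not Flink code: the function name avg_leaving_per_hour and the hourly rows are hypothetical, and the rows stand in for the output of the windowed subquery.

```python
from collections import defaultdict
from datetime import datetime

def avg_leaving_per_hour(hourly_sums, hours=24.0):
    """hourly_sums: iterable of (area, window_end, psg_sum) rows from the subquery.

    Yields (area, avg) each time an area's result changes. A later value for
    the same area supersedes the earlier one, mimicking an updating result.
    """
    totals = defaultdict(int)
    for area, _window_end, psg_sum in hourly_sums:
        totals[area] += psg_sum            # SUM(psgSum) ... GROUP BY area
        yield area, totals[area] / hours   # SUM(psgSum) / 24.0

# Hypothetical output of the hourly subquery.
hourly = [
    (46568, datetime(2013, 1, 1, 1), 20),
    (48792, datetime(2013, 1, 1, 1), 12),
    (46568, datetime(2013, 1, 1, 2), 21),
]

updates = list(avg_leaving_per_hour(hourly))
final = dict(updates)  # the last update per area wins
print(final)
```

Area 46568 appears twice in updates: the second hourly row replaces the first average with a new one. A sink that consumes such a result therefore has to be able to handle updates rather than only appended rows.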