Pattern Matching with MATCH_RECOGNIZE
This session discusses how to write SQL queries that detect patterns in streaming data.
Specifically, you will learn about:
- The MATCH_RECOGNIZE clause defined in SQL:2016
- Supported semantics and limitations of Flink's implementation
- Examples of how to perform pattern matching on append tables
These exercises will teach you how to detect a pattern in a data stream and perform a computation on the matching set of rows.
In this exercise we want to compute the duration of every taxi ride, i.e., the time between its start and end event, in minutes. This means that we need to look for the pattern of a start event and end event based on the ride id.
Note: This is exactly the same exercise as in the join exercises, but this time we want to solve it with MATCH_RECOGNIZE.
Hints:
- We are looking for a pattern per rideId.
- The pattern consists of two types of events: a start event followed by an end event.
- Use the provided timeDiff function to return the difference of two timestamps in milliseconds.
The output should look similar to:
rideId durationMin
52693 13
46868 24
53226 12
53629 11
55651 7
43220 31
53082 12
54716 9
55125 9
57211 4
44795 28
53563 12
Solution:
SELECT rideId, timeDiff(startT, endT) / 60000 AS durationMin
FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY rideId
  ORDER BY rideTime
  MEASURES
    S.rideTime AS startT,
    E.rideTime AS endT
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E)
  DEFINE
    S AS S.isStart,
    E AS NOT E.isStart
);
This query matches start and end events of the same ride, i.e., that have the same ride id.
By partitioning the table on rideId, only events with the same ride id are processed together.
Events are then classified as either a start event S or an end event E, and the pattern to match is defined as (S E), i.e., exactly one S followed by exactly one E. Finally, for each match we emit the rideId and the timestamps of the start and end events, and compute the ride duration in the SELECT clause using the timeDiff function.
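To make the matching semantics described above concrete, here is a minimal plain-Python sketch that emulates what the query does. It is not Flink code and says nothing about how Flink evaluates the clause internally. The names match_rides and time_diff_ms are hypothetical, the event tuples are made-up sample data, and the sketch assumes that timeDiff returns the end time minus the start time in milliseconds and that dividing by 60000 yields whole minutes.

```python
from collections import defaultdict


def time_diff_ms(start_ms, end_ms):
    # Hypothetical stand-in for the exercise's timeDiff function (assumption:
    # it returns end minus start in milliseconds).
    return end_ms - start_ms


def match_rides(events):
    """Emulate PATTERN (S E) per rideId.

    events: iterable of (rideId, rideTime_ms, isStart) tuples.
    Returns a list of (rideId, durationMin) tuples.
    """
    # PARTITION BY rideId
    partitions = defaultdict(list)
    for ride_id, ride_time, is_start in events:
        partitions[ride_id].append((ride_time, is_start))

    results = []
    for ride_id, rows in partitions.items():
        rows.sort(key=lambda row: row[0])  # ORDER BY rideTime
        i = 0
        while i + 1 < len(rows):
            start_t, first_is_start = rows[i]
            end_t, second_is_start = rows[i + 1]
            # DEFINE S AS S.isStart, E AS NOT E.isStart
            if first_is_start and not second_is_start:
                duration_min = time_diff_ms(start_t, end_t) // 60000
                results.append((ride_id, duration_min))
                i += 2  # AFTER MATCH SKIP PAST LAST ROW
            else:
                i += 1  # no match starting at this row, try the next one
    return results


# Made-up sample data: rides 1 and 2 are complete, ride 3 has no end event yet.
sample_events = [
    (1, 0, True),
    (2, 60_000, True),
    (1, 780_000, False),
    (2, 1_590_000, False),
    (3, 1_600_000, True),
]
print(match_rides(sample_events))
```

Ride 3 produces no row because its partition never contains an end event, which mirrors how the pattern (S E) only emits complete matches.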
In this exercise, we want to detect how long taxis rest before they start a new ride.
Use the MATCH_RECOGNIZE clause to detect a pattern of:
- a start event,
- an arbitrary number of potential intermediate events for the same taxi but from different rides,
- an end event,
- and the next start event.
Compute the resting times in minutes.
Hints:
- Use an AFTER MATCH SKIP TO LAST variable strategy to include the last start event in the next pattern match.
- Use the built-in TIMESTAMPDIFF function to calculate the difference in minutes.
The output should look similar to:
taxiId ride_start ride_end next_ride_start minutes_of_rest
2013000002 2013-01-01 00:00:00.0 2013-01-01 00:06:00.0 2013-01-01 00:16:00.0 10
2013000004 2013-01-01 00:00:00.0 2013-01-01 00:08:00.0 2013-01-01 00:13:00.0 5
2013000032 2013-01-01 00:00:00.0 2013-01-01 00:05:00.0 2013-01-01 00:06:00.0 1
2013000128 2013-01-01 00:01:00.0 2013-01-01 00:04:00.0 2013-01-01 00:12:00.0 8
2013000256 2013-01-01 00:02:00.0 2013-01-01 00:10:00.0 2013-01-01 00:10:00.0 0
2013000512 2013-01-01 00:03:25.0 2013-01-01 00:04:51.0 2013-01-01 00:10:00.0 5
2013000512 2013-01-01 00:10:00.0 2013-01-01 00:13:31.0 2013-01-01 00:14:19.0 0
2013001028 2013-01-01 00:05:52.0 2013-01-01 00:13:20.0 2013-01-01 00:14:12.0 0
2013000258 2013-01-01 00:02:00.0 2013-01-01 00:08:00.0 2013-01-01 00:11:00.0 3
2013002070 2013-01-01 00:08:57.0 2013-01-01 00:12:26.0 2013-01-01 00:13:46.0 1
Solution:
SELECT * FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY taxiId
  ORDER BY rideTime
  MEASURES
    START_RIDE.rideTime AS ride_start,
    END_RIDE.rideTime AS ride_end,
    NEXT_RIDE.rideTime AS next_ride_start,
    TIMESTAMPDIFF(MINUTE, END_RIDE.rideTime, NEXT_RIDE.rideTime) AS minutes_of_rest
  AFTER MATCH SKIP TO LAST NEXT_RIDE
  PATTERN (START_RIDE M* END_RIDE NEXT_RIDE)
  DEFINE
    START_RIDE AS START_RIDE.isStart = true,
    M AS M.rideId <> START_RIDE.rideId,
    END_RIDE AS END_RIDE.isStart = false,
    NEXT_RIDE AS NEXT_RIDE.isStart = true
);
The query matches the pattern mentioned above. The variable START_RIDE detects the start event. M greedily matches any number of ride events that do not belong to the same ride as the start event. The END_RIDE variable detects the end event. The NEXT_RIDE variable matches the following start event, which is needed to compute the resting time. Once a match has been detected, we measure the difference in minutes and return it.
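The effect of the skip strategy is easiest to see on a small example. The following plain-Python sketch emulates the pattern for the events of a single taxi and lets you switch between the two skip strategies. It is a simplification for illustration, not Flink's implementation: the function name resting_times, its skip_to_last_next_ride flag, and the ride ids are hypothetical, and it assumes that TIMESTAMPDIFF(MINUTE, ...) counts whole minutes. The timestamps are taken from the rows for taxi 2013000512 in the sample output above.

```python
from datetime import datetime, timedelta


def resting_times(rows, skip_to_last_next_ride=True):
    """Emulate PATTERN (START_RIDE M* END_RIDE NEXT_RIDE) for one taxi.

    rows: list of (rideId, rideTime, isStart), already sorted by rideTime.
    Returns (ride_start, ride_end, next_ride_start, minutes_of_rest) tuples.
    """
    matches = []
    i = 0
    while i < len(rows):
        match_end = None
        if rows[i][2]:  # START_RIDE: isStart = true
            start_ride_id = rows[i][0]
            # Greedy M*: take as many following rows from other rides as possible.
            k = 0
            while i + 1 + k < len(rows) and rows[i + 1 + k][0] != start_ride_id:
                k += 1
            # Give rows back one at a time until END_RIDE and NEXT_RIDE fit.
            for m in range(k, -1, -1):
                e, n = i + 1 + m, i + 2 + m
                if n < len(rows) and not rows[e][2] and rows[n][2]:
                    rest = (rows[n][1] - rows[e][1]) // timedelta(minutes=1)
                    matches.append((rows[i][1], rows[e][1], rows[n][1], rest))
                    match_end = n
                    break
        if match_end is None:
            i += 1             # no match starting here
        elif skip_to_last_next_ride:
            i = match_end      # AFTER MATCH SKIP TO LAST NEXT_RIDE
        else:
            i = match_end + 1  # AFTER MATCH SKIP PAST LAST ROW


    return matches


def t(hms):
    # Helper for the sample data: a time of day on 2013-01-01.
    return datetime.strptime("2013-01-01 " + hms, "%Y-%m-%d %H:%M:%S")


# Ride ids 1, 2, 3 are made up; the timestamps come from the sample output.
taxi_rows = [
    (1, t("00:03:25"), True),
    (1, t("00:04:51"), False),
    (2, t("00:10:00"), True),
    (2, t("00:13:31"), False),
    (3, t("00:14:19"), True),
]

with_reuse = resting_times(taxi_rows, skip_to_last_next_ride=True)
without_reuse = resting_times(taxi_rows, skip_to_last_next_ride=False)
print(len(with_reuse), len(without_reuse))
```

With SKIP TO LAST NEXT_RIDE the start event at 00:10:00 closes the first match and opens the second, so both resting periods are found. With SKIP PAST LAST ROW that event is consumed by the first match and the second resting period is lost.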
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.