# Advent of Flink - Day 16: Fun with `OVER` Aggregations (Event-Driven Alerting)

Today is going to be a more creative, fun one than the last few days.

Let’s say a customer comes to you with the following requirements:

We would like to block customers from placing orders in certain situations. It’s essentially an anti-fraud or anti-abuse system. For simplicity, let’s say: if a customer places orders worth more than $X in a certain time interval, or if a customer places more than Y orders per time interval, they should be blocked and a notification should be sent. It is important that this happens right away, within seconds of the order that exceeded the limit being placed. And if the condition is no longer met when another order is placed, we need another event to unblock the customer.

We will now implement this in Flink SQL. At the core of our implementation are `OVER` aggregations.

`OVER` aggregates compute an aggregated value for every input row over a range of ordered rows. In contrast to a `GROUP BY` aggregate, an `OVER` aggregate doesn’t reduce the number of result rows to a single row per group. Instead, an `OVER` aggregate produces an aggregated value for every input row.

This fits our requirements: we don’t want to count orders or sum up prices per fixed time window (whether tumbling, hopping, session, or cumulating), but rather act on each order immediately.
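For contrast, here is a rough sketch (not part of the solution) of what a `GROUP BY` window aggregate over the same table could look like, using Flink's `TUMBLE` windowing TVF: it emits one row per customer and 10-second window, rather than one row per order.

```sql
-- A sketch for contrast (not part of the solution): a GROUP BY window
-- aggregate with the TUMBLE windowing TVF emits one result row per
-- customer and 10-second window, not one row per incoming order.
SELECT
  customer_id,
  window_start,
  window_end,
  SUM(price) AS total_price,
  COUNT(*)   AS total_orders
FROM TABLE(
  TUMBLE(
    TABLE `examples`.`marketplace`.`orders`,
    DESCRIPTOR(`$rowtime`),
    INTERVAL '10' SECONDS))
GROUP BY customer_id, window_start, window_end;
```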

Let’s say our time interval is 10 seconds. Then, we could start with the following query:

```sql
SELECT
  order_id,
  customer_id,
  `$rowtime`,
  SUM(price) OVER w AS total_price_ten_secs,
  COUNT(*) OVER w AS total_orders_ten_secs
FROM `examples`.`marketplace`.`orders`
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
  RANGE BETWEEN INTERVAL '10' SECONDS PRECEDING AND CURRENT ROW
)
```

For every order, we select `order_id`, `customer_id`, and `$rowtime`, as well as the number of orders (`total_orders_ten_secs`) and the total order value (`total_price_ten_secs`) for the same customer (`PARTITION BY customer_id`) over the last ten seconds (`RANGE BETWEEN INTERVAL '10' SECONDS PRECEDING AND CURRENT ROW`). Run this query to understand the output.
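If you want to watch the rolling totals for a single customer, you can filter the source before the `OVER` aggregation; since `WHERE` is evaluated before window functions, the window then only sees that customer's orders. The id below is a hypothetical placeholder, substitute one that appears in your output:

```sql
SELECT
  order_id,
  `$rowtime`,
  SUM(price) OVER w AS total_price_ten_secs,
  COUNT(*)   OVER w AS total_orders_ten_secs
FROM `examples`.`marketplace`.`orders`
WHERE customer_id = 3064  -- made-up customer id; replace with one from your output
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
  RANGE BETWEEN INTERVAL '10' SECONDS PRECEDING AND CURRENT ROW
)
```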

OK, that’s a good start. We could now easily alert on every order that exceeds the limit, but we actually only want to send an event when the customer exceeds the limit for the first time, and another event when the customer drops below the limit again. For this, we’ll run another `OVER` aggregation on top of the existing one, so that we can compare `total_price_ten_secs` between two consecutive orders.

```sql
WITH orders_ten_secs AS
(
SELECT
  order_id,
  customer_id,
  `$rowtime`,
  SUM(price) OVER w AS total_price_ten_secs,
  COUNT(*) OVER w AS total_orders_ten_secs
FROM `examples`.`marketplace`.`orders`
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
  RANGE BETWEEN INTERVAL '10' SECONDS PRECEDING AND CURRENT ROW
)
)
SELECT
  *,
  LAG(total_price_ten_secs, 1) OVER w AS total_price_ten_secs_lag,
  LAG(total_orders_ten_secs, 1) OVER w AS total_orders_ten_secs_lag
FROM orders_ten_secs
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
)
```

That’s pretty cool. For each order, we now not only have the total order value and number of orders over the last ten seconds, but also the same values as of the previous order. This allows us to filter for exactly those orders that pushed the customer above or below the limit. Let’s say the limit for the total order value is $300 and the limit for the number of orders is 5.

```sql
WITH orders_ten_secs AS
(
SELECT
  order_id,
  customer_id,
  `$rowtime`,
  SUM(price) OVER w AS total_price_ten_secs,
  COUNT(*) OVER w AS total_orders_ten_secs
FROM `examples`.`marketplace`.`orders`
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
  RANGE BETWEEN INTERVAL '10' SECONDS PRECEDING AND CURRENT ROW
)
),
orders_ten_secs_with_lag AS
(
SELECT
  *,
  LAG(total_price_ten_secs, 1) OVER w AS total_price_ten_secs_lag,
  LAG(total_orders_ten_secs, 1) OVER w AS total_orders_ten_secs_lag
FROM orders_ten_secs
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
)
)
SELECT customer_id, 'BLOCK' AS action, `$rowtime` AS updated_at
FROM orders_ten_secs_with_lag
WHERE
(total_price_ten_secs > 300 AND total_price_ten_secs_lag <= 300) OR
(total_orders_ten_secs > 5 AND total_orders_ten_secs_lag <= 5)
UNION ALL
SELECT customer_id, 'UNBLOCK' AS action, `$rowtime` AS updated_at
FROM orders_ten_secs_with_lag
WHERE
(total_price_ten_secs <= 300 AND total_price_ten_secs_lag > 300) OR
(total_orders_ten_secs <= 5 AND total_orders_ten_secs_lag > 5)
```
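One edge case worth noting: for a customer's very first order, `LAG` returns `NULL`, so neither predicate is satisfied and that order can never trigger a `BLOCK`, even if it exceeds a limit on its own. If that matters for your use case, one possible fix is to default the lagged values to zero, for example by replacing the two final `SELECT`s above with:

```sql
-- Sketch: treat a missing previous value as 0 so that a customer's very
-- first order can already trigger a BLOCK if it exceeds a limit by itself.
SELECT customer_id, 'BLOCK' AS action, `$rowtime` AS updated_at
FROM orders_ten_secs_with_lag
WHERE
(total_price_ten_secs > 300 AND COALESCE(total_price_ten_secs_lag, 0) <= 300) OR
(total_orders_ten_secs > 5 AND COALESCE(total_orders_ten_secs_lag, 0) <= 5)
UNION ALL
SELECT customer_id, 'UNBLOCK' AS action, `$rowtime` AS updated_at
FROM orders_ten_secs_with_lag
WHERE
(total_price_ten_secs <= 300 AND COALESCE(total_price_ten_secs_lag, 0) > 300) OR
(total_orders_ten_secs <= 5 AND COALESCE(total_orders_ten_secs_lag, 0) > 5)
```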

OK, finally, let’s write this into another Kafka topic with a `CREATE TABLE AS`. I am specifying `customer_id` as the primary key, so that Flink interprets every message with a given `customer_id` as an update to any previous row with the same `customer_id`.

```sql
CREATE TABLE `customers_status` (
    PRIMARY KEY (customer_id) NOT ENFORCED
)
AS
WITH orders_ten_secs AS
(
SELECT
  order_id,
  customer_id,
  `$rowtime`,
  SUM(price) OVER w AS total_price_ten_secs,
  COUNT(*) OVER w AS total_orders_ten_secs
FROM `examples`.`marketplace`.`orders`
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
  RANGE BETWEEN INTERVAL '10' SECONDS PRECEDING AND CURRENT ROW
)
),
orders_ten_secs_with_lag AS
(
SELECT
  *,
  LAG(total_price_ten_secs, 1) OVER w AS total_price_ten_secs_lag,
  LAG(total_orders_ten_secs, 1) OVER w AS total_orders_ten_secs_lag
FROM orders_ten_secs
WINDOW w AS (
  PARTITION BY customer_id
  ORDER BY `$rowtime`
)
)
SELECT customer_id, 'BLOCK' AS action, `$rowtime` AS updated_at
FROM orders_ten_secs_with_lag
WHERE
(total_price_ten_secs > 300 AND total_price_ten_secs_lag <= 300) OR
(total_orders_ten_secs > 5 AND total_orders_ten_secs_lag <= 5)
UNION ALL
SELECT customer_id, 'UNBLOCK' AS action, `$rowtime` AS updated_at
FROM orders_ten_secs_with_lag
WHERE
(total_price_ten_secs <= 300 AND total_price_ten_secs_lag > 300) OR
(total_orders_ten_secs <= 5 AND total_orders_ten_secs_lag > 5)
```

If we now run `SELECT * FROM customers_status;`, we get a dynamically updating table that holds, for every customer that was ever blocked, their latest action. Given the thresholds we picked, it actually turns out that most customers are blocked the majority of the time with the sample data sources that come with Confluent Cloud.
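A downstream consumer that only cares about the currently blocked customers could, for example, query the result table like this (a minimal sketch against the table we just created):

```sql
-- Continuously returns the customers whose latest action is BLOCK.
SELECT customer_id, updated_at
FROM `customers_status`
WHERE action = 'BLOCK';
```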

That’s it for today. I hope you enjoyed it. I think it’s a great example of how simple event-driven logic can be implemented nicely in SQL. I expect I’ll have more examples for `OVER` windows on another day, just because they are so useful. As always (so far), the examples here are runnable out of the box on Confluent Cloud.