Creating Tables and Writing Query Results to External Systems

This session discusses how to connect and integrate Flink SQL with external systems.

Specifically, you will learn

  • how to create tables to read and write data.
  • how to write query results to external systems.

Hands-On Exercises

Flink SQL and the SQL CLI client support the INSERT INTO clause to write the result of a SELECT query into a table. The table must have been previously registered as a sink table. The training environment does not provide pre-defined sink tables, so we have to define them ourselves.
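
The general pattern looks as follows (a minimal sketch; MySinkTable, MySourceTable, and the column names are placeholders, not tables of the training environment):

-- Write the result of a SELECT query into a previously registered sink table.
INSERT INTO MySinkTable
SELECT someColumn, anotherColumn
FROM MySourceTable;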

You can define a table with a CREATE TABLE DDL statement. Since Flink doesn't store data itself but uses external storage systems, a table definition requires additional information about the external system and possibly about the format in which the data should be stored.

In the following exercises we provide the DDL statements to define the tables and describe their properties.

Writing an Append-only Table to Kafka

Specify a query that computes the number of rides that started within each 10-minute interval and write the result of the query to a table that is backed by a Kafka topic.

As a first step, we have to define the table to which we will write the data. The table should be backed by a Kafka topic and the records should be encoded as JSON text (like our input topics).

We define the table with the following DDL statement.

CREATE TABLE TenMinPsgCnts (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'TenMinPsgCnts',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flinksql',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

As you can see, the table definition includes all the details that Flink needs to talk to Kafka, such as the hostname, port, and topic name. Being an append-only table, TenMinPsgCnts only accepts append-only results, i.e., streams that do not update previously emitted results.
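
For example, a non-windowed aggregation like the following updates its count with every arriving ride; trying to write such an updating result to the append-only Kafka table would be rejected by Flink (this snippet only illustrates the restriction and is not part of the exercise):

-- Produces an updating stream: the count changes with every new ride.
SELECT COUNT(*) AS cnt FROM Rides;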

Now you have to write the SQL query that writes the requested result into the TenMinPsgCnts table. Note that the schema of the query result must exactly match the schema of the result table, which consists of three columns: a start timestamp, an end timestamp, and a count:

Flink SQL> DESCRIBE TenMinPsgCnts;
root
 |-- cntStart: TIMESTAMP(3)
 |-- cntEnd: TIMESTAMP(3)
 |-- cnt: BIGINT

You can monitor the Kafka topic of the TenMinPsgCnts table by running the following command in the folder that contains the docker-compose.yml file:

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

The result is encoded as JSON and should look as follows.

{"cntStart":1356998400000,"cntEnd":1356999000000,"cnt":2864}
{"cntStart":1356999000000,"cntEnd":1356999600000,"cnt":6884}
{"cntStart":1356999600000,"cntEnd":1357000200000,"cnt":9391}
{"cntStart":1357000200000,"cntEnd":1357000800000,"cnt":10158}
{"cntStart":1357000800000,"cntEnd":1357001400000,"cnt":10702}
{"cntStart":1357001400000,"cntEnd":1357002000000,"cnt":10817}
{"cntStart":1357002000000,"cntEnd":1357002600000,"cnt":10958}
{"cntStart":1357002600000,"cntEnd":1357003200000,"cnt":10946}

Note: An INSERT INTO query cannot be stopped from the CLI client yet. Please use the Flink web UI (at http://localhost:8081) to inspect the query and cancel it.

Solution:
INSERT INTO TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart, 
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  COUNT(*) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
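
The GROUP BY TUMBLE clause is what makes this query append-only: each 10-minute window emits its final count exactly once when the window closes, so previously emitted rows are never updated.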

Maintaining a Continuously Updated Materialized View in MySQL

Specify a query that maintains a materialized view of the number of departing rides per area in the MySQL table AreaCnts.

First, we define the sink table AreaCnts, which is backed by a MySQL table of the same name, with the following DDL statement.

CREATE TABLE AreaCnts (
  areaId INT PRIMARY KEY NOT ENFORCED,
  cnt BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/flinksql',
  'table-name' = 'AreaCnts',
  'username' = 'flink',
  'password' = 'secret',
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.max-retries' = '3'
);
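
Note that this DDL statement only registers the table in Flink; the data itself lives in a MySQL table of the same name. The MySQL-side definition would look roughly as follows (a sketch for illustration; the training environment already provides this table):

CREATE TABLE AreaCnts (
  areaId INT PRIMARY KEY,
  cnt BIGINT
);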

AreaCnts is an upsert table backed by a MySQL table, which means that it accepts insert-only results as well as results that are updated based on a key.
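
Conceptually, the JDBC sink turns every changed row into an upsert on the primary key. With MySQL, this corresponds to a statement of the following form (a sketch with example values, not something you need to run yourself):

-- Insert the row, or update the count if the areaId already exists.
INSERT INTO AreaCnts (areaId, cnt) VALUES (8252892, 90)
ON DUPLICATE KEY UPDATE cnt = VALUES(cnt);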

Now you have to specify a query that computes the number of departing rides per area and write the result to AreaCnts.

Again, the schema of the query result and the schema of the sink table (AreaCnts) must exactly match. AreaCnts consists of an area identifier and a count:

Flink SQL> DESCRIBE AreaCnts;
root
 |-- areaId: INT
 |-- cnt: BIGINT

To check whether your query is producing the correct result, you can start the MySQL CLI client by running the following command in the folder that contains the docker-compose.yml file:

docker-compose exec mysql mysql -D flinksql -u flink --password=secret

Once you have started the MySQL CLI client, you can query the MySQL table with regular SQL, for example:

mysql> SELECT * FROM AreaCnts ORDER BY cnt DESC LIMIT 10;
+---------+-----+
| areaId  | cnt |
+---------+-----+
| 8252892 |  90 |
|   51781 |  44 |
|   45548 |  32 |
|   46298 |  28 |
|   50801 |  22 |
|   52542 |  20 |
|   51047 |  20 |
|   51532 |  19 |
|   49551 |  18 |
|   54285 |  18 |
+---------+-----+
10 rows in set (0.01 sec)

By repeatedly running the same query, you can observe how Flink maintains the query result as a materialized view in the MySQL table.
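
For example, the total count of started rides keeps growing as long as the query is running, so re-running the following statement makes the continuous updates easy to spot (the column alias is just illustrative):

mysql> SELECT SUM(cnt) AS startedRides FROM AreaCnts;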

Note: An INSERT INTO query cannot be stopped from the CLI client yet. Please use the Flink web UI (at http://localhost:8081) to inspect the query and cancel it.

Solution:
INSERT INTO AreaCnts
SELECT
  toAreaId(lon, lat) AS areaId,
  COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
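
In contrast to the first exercise, this is a non-windowed aggregation: the count of an area changes whenever a new ride starts there. The result is therefore an updating stream, which the JDBC sink can handle because it uses the primary key (areaId) to update the corresponding row in MySQL.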