Creating Tables and Writing Query Results to External Systems
This session discusses how to connect and integrate Flink SQL with external systems.
Specifically, you will learn:
- how to create tables to read and write data.
- how to write query results to external systems.
Flink SQL and the SQL CLI client support the INSERT INTO clause to write the result of a SELECT query into a table. The table must have been previously registered as a sink table. The training environment does not provide predefined sink tables, so we have to define them ourselves.
You can define a table with a CREATE TABLE DDL statement. Since Flink doesn't store data itself but uses external storage systems, a table definition requires additional information about the external system and, depending on the system, about the format in which the data should be stored.
In the following exercises we provide the DDL statements to define the tables and describe their properties.
Specify a query that computes the number of rides that started in each 10-minute interval and write the result of the query to a table that is backed by a Kafka topic.
As a first step, we have to define the table to which we will write the data. The table should be backed by a Kafka topic, and its records should be encoded as JSON text, like the records of our input topics.
We define the table with the following DDL statement.
CREATE TABLE TenMinPsgCnts (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'TenMinPsgCnts',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flinksql',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
As you can see, the table definition includes all the details that Flink needs to talk to Kafka, such as the hostname, port, and topic name. Being an append-only table, TenMinPsgCnts only accepts append-only results, i.e., streams that do not update previously emitted results.
Now you have to write the SQL query that writes the requested result into the TenMinPsgCnts table. Note that the schema of the query result must exactly match the schema of the sink table, which consists of three columns: a start timestamp, an end timestamp, and a count:
Flink SQL> DESCRIBE TenMinPsgCnts;
root
|-- cntStart: TIMESTAMP(3)
|-- cntEnd: TIMESTAMP(3)
|-- cnt: BIGINT
You can monitor the Kafka topic of the TenMinPsgCnts table by running the following command in the folder that contains the docker-compose.yml file:
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
The result is encoded as JSON and should look as follows.
{"cntStart":1356998400000,"cntEnd":1356999000000,"cnt":2864}
{"cntStart":1356999000000,"cntEnd":1356999600000,"cnt":6884}
{"cntStart":1356999600000,"cntEnd":1357000200000,"cnt":9391}
{"cntStart":1357000200000,"cntEnd":1357000800000,"cnt":10158}
{"cntStart":1357000800000,"cntEnd":1357001400000,"cnt":10702}
{"cntStart":1357001400000,"cntEnd":1357002000000,"cnt":10817}
{"cntStart":1357002000000,"cntEnd":1357002600000,"cnt":10958}
{"cntStart":1357002600000,"cntEnd":1357003200000,"cnt":10946}
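If you want to sanity-check these records outside of Kafka, the following Python sketch decodes a few of the lines shown above and verifies that each window spans exactly 10 minutes and starts where the previous one ended. It assumes that the epoch-millisecond values should be read as UTC; the helper names (decode, windows_are_contiguous) are hypothetical and not part of the training environment.

```python
import json
from datetime import datetime, timedelta, timezone

# A few records copied from the TenMinPsgCnts topic output shown above.
SAMPLE = [
    '{"cntStart":1356998400000,"cntEnd":1356999000000,"cnt":2864}',
    '{"cntStart":1356999000000,"cntEnd":1356999600000,"cnt":6884}',
    '{"cntStart":1356999600000,"cntEnd":1357000200000,"cnt":9391}',
]

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
TEN_MINUTES_MS = 10 * 60 * 1000


def to_utc(epoch_ms: int) -> datetime:
    """Interpret epoch milliseconds as a UTC datetime (assumption: values are UTC)."""
    return EPOCH + timedelta(milliseconds=epoch_ms)


def decode(lines):
    """Parse JSON lines into (start, end, cnt) tuples with UTC datetimes."""
    rows = []
    for line in lines:
        rec = json.loads(line)
        rows.append((to_utc(rec["cntStart"]), to_utc(rec["cntEnd"]), rec["cnt"]))
    return rows


def windows_are_contiguous(lines) -> bool:
    """True if every window spans 10 minutes and starts where the previous one ended."""
    prev_end = None
    for line in lines:
        rec = json.loads(line)
        if rec["cntEnd"] - rec["cntStart"] != TEN_MINUTES_MS:
            return False
        if prev_end is not None and rec["cntStart"] != prev_end:
            return False
        prev_end = rec["cntEnd"]
    return True


if __name__ == "__main__":
    for start, end, cnt in decode(SAMPLE):
        print(f"{start:%Y-%m-%d %H:%M} - {end:%H:%M} UTC: {cnt}")
    print("contiguous:", windows_are_contiguous(SAMPLE))
```

The same check can be applied to any number of lines you copy from the console consumer.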
Note: An INSERT INTO query cannot be stopped from the CLI client yet. Please use the Flink web UI (at http://localhost:8081) to inspect the query and cancel it.
Solution:
INSERT INTO TenMinPsgCnts
SELECT
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  COUNT(*) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
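To illustrate what the TUMBLE grouping computes, here is a minimal batch sketch in Python, assuming epoch-aligned 10-minute windows and event times given as epoch milliseconds. Each timestamp is mapped to the start of its window, and events are counted per window. Assuming rideTime is an event-time attribute, Flink emits the result of each window once, after the watermark passes the window end, which is why this query produces the append-only stream that TenMinPsgCnts requires; the sketch ignores watermarks and simply processes a finite list. The function names are hypothetical.

```python
from collections import Counter

# INTERVAL '10' MINUTE expressed in milliseconds.
WINDOW_MS = 10 * 60 * 1000


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def tumble_count(event_times_ms, size_ms: int = WINDOW_MS):
    """Count events per tumbling window.

    Returns (cntStart, cntEnd, cnt) tuples sorted by window start, mirroring
    TUMBLE_START, TUMBLE_END and COUNT(*) in the solution query.
    """
    counts = Counter(window_start(ts, size_ms) for ts in event_times_ms)
    return [(start, start + size_ms, cnt) for start, cnt in sorted(counts.items())]


if __name__ == "__main__":
    base = 1356998400000  # 2013-01-01 00:00:00 UTC in epoch milliseconds
    events = [base, base + 1_000, base + 599_999, base + 600_000]
    for row in tumble_count(events):
        print(row)
```

An event exactly at the window end (base + 600_000) belongs to the next window, because tumbling windows include their start and exclude their end.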
Specify a query that maintains a materialized view consisting of the number of departing rides per area in the MySQL table AreaCnts.
First, we define the sink table AreaCnts, which is backed by a MySQL table of the same name, with the following DDL statement.
CREATE TABLE AreaCnts (
  areaId INT PRIMARY KEY NOT ENFORCED,
  cnt BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/flinksql',
  'table-name' = 'AreaCnts',
  'username' = 'flink',
  'password' = 'secret',
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.max-retries' = '3'
);
AreaCnts is an upsert table stored in a MySQL table. This means that it accepts insert-only results as well as results that update previously emitted rows based on a key, here the primary key areaId.
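The difference between an append-only sink and an upsert sink can be sketched in a few lines of Python. The hypothetical update stream below is the kind of output a continuously updated count per area produces: the same areaId appears several times with increasing counts. An append-only sink would keep every row, including the stale counts, while an upsert sink keyed on areaId keeps only the latest count per key. This is a simplified model for illustration, not how the JDBC connector is implemented.

```python
def write_append_only(changelog):
    """An append-only sink keeps every row it receives, in arrival order."""
    return list(changelog)


def write_upsert(changelog):
    """An upsert sink keyed on areaId keeps only the latest cnt per key."""
    table = {}
    for area_id, cnt in changelog:
        table[area_id] = cnt  # insert if the key is new, otherwise overwrite
    return table


if __name__ == "__main__":
    # Hypothetical (areaId, cnt) updates from a continuous COUNT(*) per area.
    updates = [(51781, 1), (45548, 1), (51781, 2), (51781, 3)]
    print(write_append_only(updates))  # stale counts for 51781 are still present
    print(write_upsert(updates))       # exactly one row per areaId
```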
Now you have to specify a query that computes the number of departing rides per area and writes the result to AreaCnts.
Again, the schema of the query result and the schema of the sink table (AreaCnts) must exactly match. AreaCnts consists of an area identifier and a count:
Flink SQL> DESCRIBE AreaCnts;
root
|-- areaId: INT
|-- cnt: BIGINT
To check whether your query produces the correct result, you can start the MySQL CLI client by running the following command in the folder that contains the docker-compose.yml file:
docker-compose exec mysql mysql -D flinksql -u flink --password=secret
Once you have started the MySQL CLI client, you can query the MySQL table with regular SQL, for example:
mysql> SELECT * FROM AreaCnts ORDER BY cnt DESC LIMIT 10;
+---------+-----+
| areaId  | cnt |
+---------+-----+
| 8252892 |  90 |
|   51781 |  44 |
|   45548 |  32 |
|   46298 |  28 |
|   50801 |  22 |
|   52542 |  20 |
|   51047 |  20 |
|   51532 |  19 |
|   49551 |  18 |
|   54285 |  18 |
+---------+-----+
10 rows in set (0.01 sec)
By repeatedly running the same query, you can observe how Flink maintains the query result as a materialized view in the MySQL table.
Note: An INSERT INTO query cannot be stopped from the CLI client yet. Please use the Flink web UI (at http://localhost:8081) to inspect the query and cancel it.
Solution:
INSERT INTO AreaCnts
SELECT
  toAreaId(lon, lat) AS areaId,
  COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
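The solution query can be approximated by the Python sketch below, which filters departure events (WHERE isStart), keeps a running count per area, and emits an updated (areaId, cnt) row for every departing ride. It assumes the area identifiers have already been computed; in the training environment that is done by the toAreaId function, whose logic is not reproduced here. The sketch also ignores the retraction messages Flink uses internally for updating aggregates. Applying the emitted rows by key yields the table that the upsert sink maintains in MySQL. The function names are hypothetical.

```python
from typing import Dict, Iterable, Iterator, Tuple


def departing_counts(events: Iterable[Tuple[int, bool]]) -> Iterator[Tuple[int, int]]:
    """Emit an updated (areaId, cnt) row for every departing ride.

    Each event is (area_id, is_start); area_id stands in for the value that
    toAreaId(lon, lat) would return for the ride's coordinates.
    """
    counts: Dict[int, int] = {}
    for area_id, is_start in events:
        if not is_start:  # WHERE isStart
            continue
        counts[area_id] = counts.get(area_id, 0) + 1  # GROUP BY areaId with COUNT(*)
        yield area_id, counts[area_id]


def materialize(updates: Iterable[Tuple[int, int]]) -> Dict[int, int]:
    """Apply the update stream to a table keyed by areaId (upsert semantics)."""
    table: Dict[int, int] = {}
    for area_id, cnt in updates:
        table[area_id] = cnt
    return table


if __name__ == "__main__":
    # Hypothetical ride events: (areaId, isStart).
    rides = [(51781, True), (45548, True), (51781, False), (51781, True)]
    updates = list(departing_counts(rides))
    print(updates)
    print(materialize(updates))
```

Every new departure in an area overwrites that area's previous count, which is the behavior you can observe by re-running the MySQL query above.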
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.