-
Bring up the stack
git clone https://github.com/confluentinc/demo-scene.git cd introduction-to-ksqldb docker-compose up -d
-
Run two terminal windows, side by side, both running ksqlDB CLI
docker exec -it ksqldb ksql http://localhost:8088
[Window 1] Create a stream
CREATE STREAM MOVEMENTS (LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');
[Window 2] Query the stream
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS,
ROWKEY AS PERSON,
LOCATION
FROM MOVEMENTS
EMIT CHANGES;
[Window 1] Insert some data
INSERT INTO MOVEMENTS VALUES ('robin', 'York');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'Ilkley');
[Window 2] Cancel the query, show the topics
SHOW TOPICS;
Dump the topic contents:
PRINT movements FROM BEGINNING;
[Window 2] Cancel the PRINT, query the stream with a predicate
SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS,
ROWKEY AS PERSON,
LOCATION
FROM MOVEMENTS
WHERE LCASE(LOCATION)='leeds'
EMIT CHANGES;
[Window 1] Insert some more data
INSERT INTO MOVEMENTS VALUES ('robin', 'Sheffield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'Wakefield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
Create a new topic that could be used to trigger an event-driven app when the user is in a certain location
CREATE STREAM LEEDS_USERS WITH (KAFKA_TOPIC='leeds-users')
AS SELECT * FROM MOVEMENTS WHERE LCASE(LOCATION)='leeds' EMIT CHANGES;
Show that a new topic has been created
SHOW TOPICS;
Dump the topic contents
PRINT 'leeds-users' FROM BEGINNING;
[Window 1] Insert some more data
INSERT INTO MOVEMENTS VALUES ('robin', 'Sheffield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
Show the difference between stream and table - but note that it’s the same Kafka topic underneath
CREATE TABLE MOVEMENTS_T (LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='movements');
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
FROM MOVEMENTS_T EMIT CHANGES;
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
FROM MOVEMENTS EMIT CHANGES;
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
SELECT ROWKEY AS PERSON,
COUNT(*) AS LOCATION_CHANGES,
COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
FROM MOVEMENTS
GROUP BY ROWKEY
EMIT CHANGES;
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
EMIT CHANGES;
[Window 1] Insert some data
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'London');
Show a pull query in action
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
WHERE ROWKEY='robin';
Run a pull query using the REST API
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE ROWKEY='\''robin'\'';"}'|jq '.[].row'
First, check that the connector plugin has been installed.
docker exec -it ksqldb curl -s localhost:8083/connector-plugins|jq '.[].class'
Should include io.confluent.connect.jdbc.JdbcSinkConnector
in its output.
CREATE SINK CONNECTOR SINK_POSTGRES WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'PERSON_STATS',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'auto.create' = 'true',
'insert.mode' = 'upsert',
'pk.mode' = 'record_key',
'pk.fields' = 'PERSON'
);
Show the data in Postgres
docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
SELECT * FROM "PERSON_STATS";
Add some more data into Kafka topic, show postgres updating in place.
INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('2020-02-17T15:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'), 'robin', 'Leeds');
INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('2020-02-17T16:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'), 'robin', 'Retford');
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"CREATE STREAM MOVEMENTS (LOCATION VARCHAR) WITH (VALUE_FORMAT='\''JSON'\'', PARTITIONS=1, KAFKA_TOPIC='\''movements'\'');"}'
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{
"ksql":"CREATE STREAM LONDON AS SELECT * FROM MOVEMENTS WHERE LCASE(LOCATION)='\''london'\'';",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}'