Skip to content

Latest commit

 

History

History
259 lines (198 loc) · 6.07 KB

demo_introduction_to_ksqldb.adoc

File metadata and controls

259 lines (198 loc) · 6.07 KB

Introduction to ksqlDB - demo script

Running the test rig

  1. Bring up the stack

    git clone https://github.com/confluentinc/demo-scene.git
    cd introduction-to-ksqldb
    docker-compose up -d
  2. Run two terminal windows, side by side, both running ksqlDB CLI

    docker exec -it ksqldb ksql http://localhost:8088

ksqlDB basics

[Window 1] Create a stream

CREATE STREAM MOVEMENTS (LOCATION VARCHAR)
    WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');

[Window 2] Query the stream

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS,
       ROWKEY AS PERSON,
       LOCATION
  FROM MOVEMENTS
  EMIT CHANGES;

[Window 1] Insert some data

INSERT INTO MOVEMENTS VALUES ('robin', 'York');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'Ilkley');

[Window 2] Cancel the query, show the topics

SHOW TOPICS;

Dump the topic contents:

PRINT movements FROM BEGINNING;

[Window 2] Cancel the PRINT, query the stream with a predicate

SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS,
       ROWKEY AS PERSON,
       LOCATION
  FROM MOVEMENTS
  WHERE LCASE(LOCATION)='leeds'
  EMIT CHANGES;

[Window 1] Insert some more data

INSERT INTO MOVEMENTS VALUES ('robin', 'Sheffield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'Wakefield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');

Create a new topic that could be used to trigger an event-driven app when the user is in a certain location

CREATE STREAM LEEDS_USERS WITH (KAFKA_TOPIC='leeds-users')
    AS SELECT * FROM MOVEMENTS WHERE LCASE(LOCATION)='leeds' EMIT CHANGES;

Show that a new topic has been created

SHOW TOPICS;

Dump the topic contents

PRINT 'leeds-users' FROM BEGINNING;

[Window 1] Insert some more data

INSERT INTO MOVEMENTS VALUES ('robin', 'Sheffield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');

Stream/Table duality

Show the difference between stream and table - but note that it’s the same Kafka topic underneath

CREATE TABLE MOVEMENTS_T (LOCATION VARCHAR)
    WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='movements');

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
    FROM MOVEMENTS_T EMIT CHANGES;

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
    FROM MOVEMENTS EMIT CHANGES;

Aggregates

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
SELECT ROWKEY AS PERSON,
       COUNT(*) AS LOCATION_CHANGES,
       COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
  FROM MOVEMENTS
GROUP BY ROWKEY
EMIT CHANGES;

SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
  FROM PERSON_STATS
  EMIT CHANGES;

[Window 1] Insert some data

INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'London');

Show a pull query in action

SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
  FROM PERSON_STATS
 WHERE ROWKEY='robin';

Run a pull query using the REST API

docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE ROWKEY='\''robin'\'';"}'|jq '.[].row'

Connecting to other systems

First, check that the connector plugin has been installed.

docker exec -it ksqldb curl -s localhost:8083/connector-plugins|jq '.[].class'

Should include io.confluent.connect.jdbc.JdbcSinkConnector in its output.

CREATE SINK CONNECTOR SINK_POSTGRES WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'PERSON_STATS',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_key',
    'pk.fields'           = 'PERSON'
  );

Show the data in Postgres

docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
SELECT * FROM "PERSON_STATS";

Add some more data into Kafka topic, show postgres updating in place.


Appendix

Setting the ROWTIME of inserted data

INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('2020-02-17T15:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'), 'robin', 'Leeds');
INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('2020-02-17T16:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'), 'robin', 'Retford');

Deploying code via REST API

docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"CREATE STREAM MOVEMENTS (LOCATION VARCHAR) WITH (VALUE_FORMAT='\''JSON'\'', PARTITIONS=1, KAFKA_TOPIC='\''movements'\'');"}'


docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{
            "ksql":"CREATE STREAM LONDON AS SELECT * FROM MOVEMENTS WHERE LCASE(LOCATION)='\''london'\'';",
            "streamsProperties": {
                "ksql.streams.auto.offset.reset": "earliest"
            }
        }'

TODO

  • TODO add in datagen

  • TODO INSERT INTO to merge streams