Querying streams with SQL

Note
This lab assumes that the From Edge to Streams Processing lab has been completed.

In this workshop you will use SQL Stream Builder to query and manipulate data streams using the SQL language. SQL Stream Builder is a powerful service that enables you to create Flink jobs without having to write Java/Scala code.

Labs summary

  • Lab 0 - Preparation: Start Kafka Producer (only necessary if you start in an empty environment)

  • Lab 1 - Create a Data Source for IoT Topic

  • Lab 2 - Create a Source Virtual Table for the IoT topic with JSON messages

  • Lab 3 - Run a simple query

  • Lab 4 - Computing and storing aggregation results

  • Lab 5 - Create a Source Virtual Table for WeatherCondition topic in CSV format

  • Lab 6 - Join two Kafka topics

  • Lab 7 - Look up Reference data stored in Kudu

  • Lab 8 - Materialized Views

  • Lab 9 - Perform an HTTP action

Introduction

In this lab, and the subsequent ones, we will use the iot topic created and populated in previous labs, which contains a data stream of computer performance data points. If you start in an empty environment, do Lab 0 first.

So let’s start with a straightforward goal: to query the contents of the iot topic using SQL to examine the data that is being streamed.

Albeit simple, this task will show the ease of use and power of SQL Stream Builder (SSB).

Lab 0 - Preparation: Start Kafka Producer

  1. Open two SSH connections to your LAB environment via a browser or terminal (the second session will be used in Lab 5)

    The details for the connection are available on the LAB environment overview page.

    flink ssl lite
  2. Use the first SSH connection to download a Java application that generates sample messages and publishes them to the Kafka iot topic

    sudo wget https://github.com/zBrainiac/kafka-producer/releases/download/0.0.1/kafka-producer-0.0.1.0.jar -P /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming
  3. Switch to the directory and start the Java application

    The IoT simulator allows the following input parameters:

    1. kafka-broker URI = edge2ai-1.dim.local:9092

    2. wait time between messages = 1000 ms

      Sample command looks like:

      cd /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming &&
      java -classpath streaming-flink-0.4.1.0.jar producer.KafkaIOTSensorSimulator edge2ai-1.dim.local:9092 1000

Lab 1 - Create a Data Source for IoT Topic

Before we can start querying data from Kafka topics we need to register the Kafka clusters as data sources in SSB.

  1. On the landing page or Cloudera Manager console, click on the Cloudera logo at the top-left corner to ensure you are at the home page and then click on the SQL Stream Builder service.

  2. Click on the SQLStreamBuilder Console link to open the SSB UI.

  3. On the logon screen, authenticate with user admin and password supersecret1

  4. Click on Data Providers. You will notice that SSB already has a Kafka cluster registered as a data source, named CDP Kafka. This source is created automatically for SSB when it is installed on a cluster that also has a Kafka service:

    ssb register kafka provider
  5. You can use this screen to add other external Kafka clusters as data sources to SSB.

Lab 2 - Create a Source Virtual Table for a topic with JSON messages

Now we can map the iot topic to a virtual table that we can reference in our query. Virtual Tables on SSB are a way to associate a Kafka topic with a schema so that we can use that as a table in our queries.

We will use a Source Virtual Table now to read from the topic. Later we will look into Sink Virtual Tables to write data to Kafka.
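
In plain Flink SQL terms, a source table is what you read in a FROM clause, and a sink table is the target of an INSERT INTO statement; in SSB you will wire up sinks through the UI instead of writing the INSERT yourself. The sketch below is purely conceptual and uses hypothetical table names:

    -- Conceptual sketch with hypothetical table names:
    -- read from a source table and write the result into a sink table
    INSERT INTO some_sink_table
    SELECT sensor_id, sensor_0
    FROM some_source_table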

  1. To create our first Source Virtual Table, click on Console (on the left bar) > Tables > Add table > Apache Kafka.

    ssb add source virtual table
  2. On the Kafka Source window, enter the following information:

    Virtual table name: iot_enriched_source
    Kafka Cluster:      CDP Kafka
    Topic Name:         iot
    Data Format:        JSON
    ssb kafka source
  3. Ensure the Schema tab is selected. Scroll to the bottom of the tab and click Detect Schema. SSB will take a sample of the data flowing through the topic and will infer the schema used to parse the content. Alternatively you could also specify the schema in this tab.

    ssb detect schema
  4. Click on the Event Time tab and define your time handling. You can specify Watermark Definitions when adding a Kafka table. Watermarks use an event time attribute, have a watermark strategy, and can be used for various time-based operations.

    The Event Time tab provides the following properties to configure the event time field and watermark for the Kafka stream:

    • Input Timestamp Column: name of the timestamp column in the Kafka table from which the event time column is mapped. If you want to use a column from the event message, you have to unselect the Use Kafka Timestamp box first.

    • Event Time Column: new name of the timestamp column where the watermarks are going to be mapped

    • Watermark Seconds: number of seconds used in the watermark strategy. The watermark is defined as the current event timestamp minus this value.

      Input Timestamp Column: sensor_ts
      Event Time Column:      event_ts
      Watermark Seconds:      3
      ssb define timehandling
  5. If we need to manipulate the source data to fix, cleanse or convert some values, we can define transformations for the data source to perform those changes. These transformations are defined in Javascript.

    The serialized record read from Kafka is provided to the Javascript code in the record.value variable. The last command of the transformation must return the serialized content of the modified record.

    The sensor_0 data in the iot topic has a pressure expressed in micro-pascal. Let’s say we need the value in pascal scale. Let’s write a transformation to perform that conversion for us at the source.

    Click on the Transformations tab and enter the following code in the Code field:

    // Kafka payload (record value JSON deserialized to JavaScript object)
    var payload = JSON.parse(record.value);
    payload['sensor_0'] = Math.round(payload.sensor_0 * 1000);
    JSON.stringify(payload);
    ssb source transformations
  6. Click on the Properties tab, enter the following value for the Consumer Group property and click Save changes.

    Consumer Group: ssb-iot-1
    ssb source properties
    Note
    Setting the Consumer Group property for a virtual table ensures that if you stop a query and restart it later, the second execution will continue to read the data from the point where the first query stopped, without skipping data. However, if multiple queries use the same virtual table, setting this property will effectively distribute the data across the queries so that each record is only read by a single query. If you want to share a virtual table with multiple distinct queries, ensure that the Consumer Group property is unset.
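
For reference, the wizard configuration above corresponds roughly to the following Flink DDL. This is only a sketch: the column list and types are assumptions based on the fields used in these labs (the real schema comes from Detect Schema), the table name is hypothetical, and it assumes sensor_ts is an epoch timestamp in milliseconds. The JavaScript transformation from step 5 has no DDL equivalent and remains a wizard-only feature.

    -- Sketch only: illustrative columns/types, hypothetical table name
    CREATE TABLE iot_enriched_source_ddl (
      sensor_ts BIGINT,
      sensor_id INT,
      sensor_0  INT,
      sensor_1  INT,
      sensor_6  INT,
      -- ...remaining sensor columns as detected by Detect Schema...
      event_ts AS TO_TIMESTAMP(FROM_UNIXTIME(sensor_ts / 1000)),  -- Event Time Column mapped from sensor_ts
      WATERMARK FOR event_ts AS event_ts - INTERVAL '3' SECOND    -- Watermark Seconds: 3
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'iot',
      'properties.bootstrap.servers' = 'edge2ai-1.dim.local:9092',
      'properties.group.id' = 'ssb-iot-1',                        -- Consumer Group from step 6
      'format' = 'json'
    )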

Lab 3 - Run a simple query

We now have all we need to run our first query in SSB. We simply want to query the raw contents of the topic to ensure that everything is working correctly before we proceed to more complex things.

If your environment is healthy and all the steps from previous labs were completed correctly you should be able to visualize the data with the steps below.

  1. On the SSB UI, click on Console (on the left bar) > Compose > SQL and type the following query:

    select *
    from iot_enriched_source
    ssb compose sql
  2. Set a SQL Job Name for your job or use the random name provided.

  3. Do not add a Sink Virtual Table.

  4. Click Execute

  5. Scroll to the bottom of the page and you will see the log messages generated by your query execution.

    ssb sql execution
  6. After a few seconds the SQL Console will start showing the results of the query coming from the iot topic.

    The data displayed on the screen is only a sample of the data returned by the query, not the full data. The column on the right shows the previously defined event time (event_ts).

    ssb sql results
  7. Well done! Let's increase the level of difficulty and replace the existing SQL with the "Hello World" of streaming: counting events by sensor_id, using standard SQL with GROUP BY and COUNT. (A windowed variant of this count is sketched after these steps.)

    select sensor_id,
      count(*) as sensorCount
    from iot_enriched_source
    group by sensor_id
  8. After editing the SQL, click on Restart

    Warning
    Make sure to stop your queries to release all resources once you finish this lab. You can double-check that all queries/jobs have been stopped by clicking on the SQL Jobs tab. If any jobs are still running, you can stop them from that page.
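
If you would rather count per fixed time bucket than keep a single, ever-growing count per sensor, the same query can be windowed. This is only a sketch (the 10-second window size is arbitrary); it uses the event_ts column and watermark defined in Lab 2 and is a good warm-up for the windowed aggregations in the next lab.

    -- Sketch: count events per sensor over fixed, non-overlapping 10-second windows
    select sensor_id,
      TUMBLE_END(event_ts, INTERVAL '10' SECOND) as windowEnd,
      count(*) as sensorCount
    from iot_enriched_source
    group by sensor_id,
      TUMBLE(event_ts, INTERVAL '10' SECOND)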

Lab 4 - Computing and storing aggregation results

We want to start computing window aggregates for our incoming data stream and make the aggregation results available for downstream applications. SQL Stream Builder's Sink Virtual Tables give us the ability to publish/store streaming data to several different services (Kafka, AWS S3, Google GCS, Elasticsearch and generic webhooks). In this lab we'll use a Kafka sink to publish the results of our aggregation to another Kafka topic.

  1. Let’s first create a topic (sensor6_stats) where to publish our aggregation results:

    1. Navigate to the SMM UI (Cloudera Manager > SMM service > Streams Messaging Manager Web UI).

    2. On the SMM UI, click the Topics tab (topics icon).

    3. Click the Add New button.

    4. Enter the following details for the topic and click Save when ready:

      1. Topic name: sensor6_stats

      2. Partitions: 10

      3. Availability: Low

      4. Cleanup Policy: delete

  2. Back in SSB to create the Sink Table, click on Console (on the left bar) > Tables > Add Table > Apache Kafka.

    ssb add source virtual table
  3. On the Apache Kafka window, enter the following information and click Save changes:

    Virtual table name: sensor6_stats_sink
    Kafka Cluster:      CDP Kafka
    Topic Name:         sensor6_stats
    Dynamic Schema:     YES
    ssb kafka sink
  4. On the SSB UI, click on Console (on the left bar) > Compose > SQL and type the query shown below.

    This query will compute aggregates over 30-second windows that slide forward every second. For a specific sensor value in the record (sensor_6) it computes the following aggregations for each window:

    • Number of events received

    • Sum of the sensor_6 value for all the events

    • Average of the sensor_6 value across all the events

    • Min and max values of the sensor_6 field

    • Number of events for which the sensor_6 value exceeds 70

    SELECT
      sensor_id as device_id,
      HOP_END(event_ts, INTERVAL '1' SECOND, INTERVAL '30' SECOND) as windowEnd,
      count(*) as sensorCount,
      sum(sensor_6) as sensorSum,
      avg(cast(sensor_6 as float)) as sensorAverage,
      min(sensor_6) as sensorMin,
      max(sensor_6) as sensorMax,
      sum(case when sensor_6 > 70 then 1 else 0 end) as sensorGreaterThan70
    FROM iot_enriched_source
    GROUP BY
      sensor_id,
      HOP(event_ts, INTERVAL '1' SECOND, INTERVAL '30' SECOND)
    ssb sql aggregation
  5. Enter Sensor6Stats for the SQL Job Name field.

  6. On the Sink Virtual Table field, click on the None drop-down and select the Virtual Sink Table that you created previously (sensor6_stats_sink)

    ssb select sink
  7. Click Execute.

  8. Scroll to the bottom of the page and you will see the log messages generated by your query execution.

    ssb sql execution
  9. After a few seconds the SQL Console will start showing the results of your aggregation query.

    Note that the data displayed on the screen is only a sample of the data returned by the query, not the full data.

    ssb sql aggr results
  10. Check the job execution details and logs by clicking on Console (on the left bar) > SQL Jobs tab. Explore the options on this screen:

    1. Click on the Sensor6Stats job.

    2. Click on the Details tab to see job details.

    3. Click on the Log tab to see log messages generated by the job execution.

    ssb job details
  11. Click on the Flink Dashboard link to open the job’s page on the dashboard. Navigate the dashboard pages to explore details and metrics of the job execution.

    ssb job dashboard
  12. Let’s query the sensor6_stats table to examine the data that is being written to it. First we need to define a Source Table associated with the sensor6_stats topic.

    1. Click on Console (on the left bar) > Tables > Add Table > Apache Kafka

    2. On the Kafka Source window, enter the following information and click Save changes:

      Virtual table name: sensor6_stats_source
      Kafka Cluster:      CDP Kafka
      Topic Name:         sensor6_stats
      Data Format:        JSON
    3. Click on Detect Schema and wait for the schema to be updated.

    4. Click Save changes.

  13. Click on Console (on the left bar) to refresh the screen and clear the SQL Compose field, which may still show the running aggregation job.

    Note that the job will continue to run in the background and you can continue to monitor it through the Job Logs page.

  14. Enter the following query in the SQL field and execute it:

    SELECT *
    FROM sensor6_stats_source
  15. After a few seconds you should see the contents of the sensor6_stats topic displayed on the screen:

    ssb stats results
  16. You will need to leave the Sensor6Stats job running to use it in the next lab. Make sure you stop all other jobs to release cluster resources.

    ssb jobs running
Warning
Make sure to stop your queries to release all resources once you finish this lab. You can double-check that all queries/jobs have been stopped by clicking on the SQL Jobs tab. If any jobs are still running, you can stop them from that page.
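
As a follow-up to the SELECT * query in step 14, you can also filter the aggregation results before displaying them. The sketch below is an assumption-based example: the column names follow the aliases produced by the Sensor6Stats query, and the threshold value is arbitrary.

    -- Sketch: only show windows whose average sensor_6 value crossed an arbitrary threshold
    SELECT device_id, windowEnd, sensorAverage, sensorMax
    FROM sensor6_stats_source
    WHERE sensorAverage > 50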

Lab 5 - Create a Source Virtual Table for WeatherCondition topic in CSV format

  1. Let’s got back to SSH and use the second session

  2. Switch to the directory and start the Java application, which publishes messages in CSV format to the topic kafka_LookupWeatherCondition

    cd /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming
    java -classpath streaming-flink-0.4.1.0.jar producer.KafkaLookupWeatherCondition edge2ai-1.dim.local:9092
  3. Define a new virtual table, weather_condition. To create a new Source Table, click on Console (on the left bar) > Tables > Add Table > Add Source > Flink DDL.

  4. On the Flink DDL window, enter the following information:

    CREATE TABLE weather_condition (
      stationid INT,
      eventDate STRING,
      tre200s0 DOUBLE,
      rre150z0 DOUBLE,
      sre000z0 DOUBLE,
      gre000z0 DOUBLE,
      ure200s0 DOUBLE,
      tde200s0 DOUBLE,
      dkl010z0 DOUBLE,
      fu3010z0 DOUBLE,
      fu3010z1 DOUBLE,
      prestas0 DOUBLE,
      pp0qffs0 DOUBLE,
      pp0qnhs0 DOUBLE,
      ppz850s0 DOUBLE,
      ppz700s0 DOUBLE,
      dv1towz0 DOUBLE,
      fu3towz0 DOUBLE,
      fu3towz1 DOUBLE,
      ta1tows0 DOUBLE,
      uretows0 DOUBLE,
      tdetows0 DOUBLE
    ) WITH (
     'connector' = 'kafka',
     'topic' = 'kafka_LookupWeatherCondition',
     'properties.bootstrap.servers' = 'edge2ai-1.dim.local:9092',
     'properties.group.id' = 'kafka_LookupWeatherCondition',
     'format' = 'csv',
     'csv.ignore-parse-errors' = 'true',
     'csv.allow-comments' = 'true'
    )

Lab 6 - Join two Kafka topics

  1. Go back to the Console

  2. Add a SQL join statement between IoT and WeatherCondition

First, let's check the data feeds:

select * from weather_condition

Then update the SQL with a join statement:

select iot.sensor_id,
 sensor_0 as pressure,
 sensor_1 as torsion,
 tre200s0 as AirTemperature2m
from iot_enriched_source as iot,  weather_condition as weather
where iot.sensor_id = weather.stationid

/* or as inner join */

select iot.sensor_id,
 sensor_0 as pressure,
 sensor_1 as torsion,
 tre200s0 as AirTemperature2m
from iot_enriched_source as iot
inner join weather_condition as weather
on iot.sensor_id = weather.stationid
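
Both statements above express the same inner join. As a small extension, you can combine the join with additional predicates; the sketch below (an illustrative example with an arbitrary threshold) only keeps readings taken at air temperatures below 0 °C. Keep in mind that a plain stream-stream join like this keeps state for both sides, so stop the job when you are done.

/* Sketch: the same inner join with an extra filter on the weather side */
select iot.sensor_id,
 sensor_0 as pressure,
 sensor_1 as torsion,
 tre200s0 as AirTemperature2m
from iot_enriched_source as iot
inner join weather_condition as weather
on iot.sensor_id = weather.stationid
where tre200s0 < 0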
Warning
Make sure to stop your queries to release all resources once you finish this lab. You can double-check that all queries/jobs have been stopped by clicking on the SQL Jobs tab. If any jobs are still running, you can stop them from that page.

Lab 7 - Look up Reference data stored in Kudu

In this lab we join the data stream with static data, i.e. reference data stored in Kudu.

  1. To create the Kudu tables we have to go back to the entry screen

  2. Use the Hue Web UI and login as admin / supersecret1. The first user to login to a Hue installation is automatically created and granted admin privileges in Hue.

  3. The Hue UI should open with the Impala Query Editor by default. If it doesn’t, you can always find it by clicking on Query button > Editor → Impala:

    impala editor
  4. Copy & paste the following DDL, which creates some tables and inserts some reference data into them:

    Warning
    See Create SQL Statement
    create table
  5. Select all copied lines and execute

  6. Go back to SSB to add Kudu as a new data source: Data Providers > Register Catalog

  7. In the Edit Catalog window, enter the following information:

    Name:           kudu_source
    Catalog Type:   Kudu
    Kudu Master:    edge2ai-1.dim.local:7051
    ssb add kudu source virtual table
  8. Click Validate. You should see the message "Validation was successful, 2 tables found"

  9. Click Add Tables

  10. The Kudu tables now show up as virtual tables

    ssb as kudu source virtual table
  11. Click on a Kudu table to see the schema (DDL) of the table

    ssb kudu schema
  12. Go back to the Console and add a standard SQL lookup

    select iot.sensor_id,
     sensor_0 as pressure,
     sensor_1 as torsion,
     city,
     lat,
     lon
    from iot_enriched_source as iot, `kudu_source`.`default_database`.`default.refdata_geolocation` as refdata
    where iot.sensor_id = refdata.sensor_id
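
The Kudu catalog tables can be used like any other table: you can simply run select * against the reference table to inspect it, or aggregate on the joined stream. The sketch below is an optional extension, assuming the catalog and table names registered in this lab, that counts incoming readings per city.

    -- Sketch: count incoming readings per city using the Kudu reference data
    select refdata.city,
      count(*) as readings
    from iot_enriched_source as iot,
      `kudu_source`.`default_database`.`default.refdata_geolocation` as refdata
    where iot.sensor_id = refdata.sensor_id
    group by refdata.city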

Lab 8 - Materialized Views

Materialized Views stay in synchronization with the mutating stream: they are updated by a primary key as data flows through the system and represent the latest view of the data for each key.

  1. With the SQL from the last lab still running, click on Materialized Views in the Console

    ssb mv base
  2. Enter the following configuration and click Apply Configuration

    Primary Key            sensor_id
    Retention (Seconds)    300
    Recreate on Job Start  Y
    Ignore NULLs           N
    API Key                <create API key>
  3. Click Add Query to provide a URI name and select the attributes in the Materialized View

  4. Click Add filters to apply computations and further enrichment of your data

    1. Define parameters for the filter criteria in the URI, like /api/v1/query/5195/sensorid/{sensor_id}

    2. Use the defined parameters in the filter statement like a normal SQL WHERE clause

      ssb mv addQuery
    3. The REST endpoint is exposed as: <URI>/api/v1/query/<job id>/sensorid/{sensor_id}?key=<API Key>

    4. Invoke it with, for example: /api/v1/query/5195/sensorid/12?key=AbzRvs…

Lab 9 - Perform an HTTP action

You can configure the webhook table to perform an HTTP action per message (the default) or to add code that controls the frequency (for instance, every N messages). An alternative, SQL-side way to limit the alert rate is sketched at the end of this lab.

Warning
Open a new browser window in incognito mode and create a new HTTP endpoint at https://webhook.site
  1. Select Console from the main menu.

  2. Select the Tables tab.

  3. Select Add table > Webhook. The Webhook Table window appears.

    Table name              webhook_table
    Http EndPoint           your webhook.site endpoint URL (https://…)
    HttpMethod              POST
    Disable SSL Validation  no
    Enable Request Template yes
  4. In the Code editor, you can specify a code block that controls how the webhook displays the data. For a webhook that is called for each message the following code is used:

    ssb webhooktable code
    // Boolean function that takes entire row from query as Json Object
    function onCondition(rowAsJson) {
        return true;   // return false here for no-op, or plug in custom logic
    }
    onCondition($p0)
  5. Add HTTP headers using the HTTP Headers tab, if needed.

    ssb webhooktable httpheader
    Http Header     Content-Type
    Value           application/json
  6. Press the plus sign to save

  7. On the Request Template tab, modify the template

    ssb webhooktable requesttemplate
    {
       "incident":{
          "type":"incident",
          "title":"${sensor_id} - pressure is too high!",
          "body":{
             "type":"incident_body",
             "details":"Sensor with id ${sensor_id} has ${sensor_0} pascal."
          }
       }
    }
  8. Click Save changes

  9. Click on Console > Compose > SQL and type the following query:

    select sensor_id, sensor_0
    from iot_enriched_source
    where sensor_0 >= 8000
    ssb webhook sql
  10. Click Execute

  11. Back on the https://webhook.site page you will see the submitted messages

    ssb webhook result
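
The query above triggers one HTTP request per matching message. If that floods the endpoint, one option is to move the rate control into the SQL itself and emit at most one alert per sensor per window. The sketch below is an assumption-based example (arbitrary 30-second window); note that the request template would then need to reference the new column names, for example ${maxPressure} instead of ${sensor_0}.

    -- Sketch: at most one alert per sensor per 30-second window
    select sensor_id,
      TUMBLE_END(event_ts, INTERVAL '30' SECOND) as windowEnd,
      max(sensor_0) as maxPressure
    from iot_enriched_source
    where sensor_0 >= 8000
    group by sensor_id,
      TUMBLE(event_ts, INTERVAL '30' SECOND)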

End of the Streaming SQL Labs (for today)