Querying streams with SQL

In this workshop you will use Cloudera SQL Stream Builder to query and manipulate data streams using SQL. SQL Stream Builder is a powerful service that enables you to create Flink jobs without having to write Java/Scala code.

You will use data from the iot_enriched topic, created and populated in the previous labs, which contains a stream of temperature sensor data points.

Preparation

This workshop builds upon the content developed in the Edge and NiFi workshops.

To clean your environment and make it ready for the beginning of this lab, please SSH to your cluster host and run the following command:

Note
The command below will undo everything done in the cluster in previous workshops.
/tmp/resources/reset-to-lab.sh ssb 1

Labs summary

  • Lab 1 - Create a Data Provider

  • Lab 2 - Create a Table for a topic with JSON messages

  • Lab 3 - Integrate SQL Stream Builder with Schema Registry

  • Lab 4 - Computing and storing aggregation results

  • Lab 5 - Materialized Views

Lab 1 - Create a Data Provider

Let’s start with a straightforward goal: to query the contents of the iot_enriched topic using SQL to examine the data that is being streamed. Albeit simple, this task will show the ease of use and power of SQL Stream Builder (SSB).

Before you can start querying data from Kafka topics you need to register the Kafka clusters as data sources in SSB.

  1. On the Cloudera Manager console, click on the Cloudera logo at the top-left corner to ensure you are at the home page and then click on the SQL Stream Builder service.

  2. Click on the SQLStreamBuilder Console link to open the SSB UI.

  3. On the logon screen, authenticate with user admin and password Supersecret1.

  4. You will notice that SSB already has a Kafka cluster registered as a data provider, named CDP Kafka. This provider is created automatically for SSB when it is installed on a cluster that also has a Kafka service:

    register kafka provider
  5. You can use this screen to add other external Kafka clusters as data providers to SSB. In this lab you’ll add a second data provider using a different host name, just to show how simple it is.

  6. Click on Register Kafka Provider and in the Add Kafka Provider window, enter the details for your new data source and click Save changes.

    Name:                edge2ai-kafka
    Brokers:             <CLUSTER_HOSTNAME>:9092
    Connection protocol: PLAINTEXT
    add kafka provider

Lab 2 - Create a Table for a topic with JSON messages

Now you can map the iot_enriched topic to a table in SQL Stream Builder. Tables in SSB are a way to associate a Kafka topic with a schema so that you can use it in your SQL queries.
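
For reference, the table you are about to create through the UI corresponds roughly to a Flink SQL CREATE TABLE statement that uses the Kafka connector. The sketch below is for illustration only: the column names and types are assumptions based on the data used in this workshop, and the exact DDL generated by SSB contains additional properties. In this lab you create the table through the UI instead of running this statement.

    -- Illustrative sketch only: roughly how a Kafka topic maps to a table in Flink SQL.
    -- Column names/types are assumptions; in this lab the table is created via the UI.
    CREATE TABLE iot_enriched (
      sensor_id  INT,
      sensor_ts  BIGINT,
      is_healthy INT,
      sensor_0   INT,
      sensor_1   INT,
      sensor_6   INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'iot_enriched',
      'properties.bootstrap.servers' = '<CLUSTER_HOSTNAME>:9092',
      'format' = 'json'
    );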

  1. To create your first Table, click on Console (on the left bar), enter a name for your job (e.g. "my_first_job") and click on the Create Job button.

  2. On the Virtual Tables pane on the left, click Add Table > Apache Kafka.

    add table
  3. On the Kafka Table window, enter the following information:

    Table Name:    iot_enriched
    Kafka Cluster: edge2ai-kafka
    Data Format:   JSON
    Topic Name:    iot_enriched
    kafka source
  4. Ensure the Schema Definition tab is selected. Click Detect Schema at the bottom of the window. SSB will take a sample of the data flowing through the topic and infer the schema used to parse the content. Alternatively, you can specify the schema manually in this tab.

    detect schema
  5. Click OK to acknowledge the "Schema Detection Complete" message.

  6. Whenever you need to manipulate the source data to fix, cleanse or convert some values, you can define transformations for the table. Transformations are defined in JavaScript code.

    The serialized record read from Kafka is provided to the JavaScript code in the record variable. The last command of the transformation code must return the serialized content of the modified record.

    The data in the iot_enriched topic has a timestamp expressed in microseconds. You will need to convert this field to milliseconds. Let’s write a transformation to perform that conversion for us.

    Click on the Data Transformations tab and enter the following code in the code area:

    // parse the JSON record
    var parsedVal = JSON.parse(record.value);
    // Convert sensor_ts from micro to milliseconds
    parsedVal['sensor_ts'] = Math.round(parsedVal['sensor_ts']/1000);
    // serialize output as JSON
    JSON.stringify(parsedVal);
    source transformations
  7. Now that you have converted the sensor_ts field to milliseconds, you can tell SSB to use it as a source for the event time, which is the time that will be used for defining aggregation windows for your queries.

    To do this, click on the Event Time tab and configure the following properties:

    Use Kafka Timestamps:   Uncheck it
    Input Timestamp Column: sensor_ts
    Event Time Column:      event_time
    Watermark Seconds:      3
    event time column

    This will add the event_time column to the table. This column has a TIMESTAMP ROWTIME data type and is derived from the value of the sensor_ts column. (A sketch of how these settings map to Flink SQL DDL appears at the end of this lab.)

  8. Click on the Properties tab, enter the following value for the Consumer Group property and click Save changes.

    Consumer Group: ssb-iot-1
    source properties
    Note
    Setting the Consumer Group property for a virtual table allows SSB to also store offsets in Kafka, in addition to storing offsets in the job state, which is the default.
  9. Click Create and Review to complete the table creation. On the Review window, click Keep.

  10. Let’s query the newly created table to ensure things are working correctly. Enter the following query in the SQL editor area (top-right in the Console screen):

    SELECT
      event_time,
      sensor_id,
      sensor_ts,
      is_healthy,
      sensor_0,
      sensor_1
    FROM
      iot_enriched
  11. Click on Execute. After a few seconds you should see the data from the topic displayed on the Results panel:

    Note
    The first query execution usually takes a bit longer, since SSB has to start the Job Manager that will handle the job execution.
    first query
  12. Click Stop to stop the job and release all the cluster resources used by the query. You can double-check that all queries/jobs have been stopped by clicking on the SQL Jobs tab. If any jobs are still running, you can stop them from that page.
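
For reference, the Event Time and Consumer Group settings you configured in steps 7 and 8 map roughly onto standard Flink SQL constructs: a computed column plus a WATERMARK clause, and the properties.group.id connector option. The sketch below is only an approximation; the column types, the availability of TO_TIMESTAMP_LTZ and the exact DDL depend on the Flink version bundled with SSB.

    -- Approximate Flink SQL equivalent of the table settings from steps 7 and 8 (sketch only).
    CREATE TABLE iot_enriched (
      sensor_id  INT,
      sensor_ts  BIGINT,                                            -- epoch milliseconds after the transformation
      sensor_6   INT,
      event_time AS TO_TIMESTAMP_LTZ(sensor_ts, 3),                 -- "Input Timestamp Column" -> "Event Time Column"
      WATERMARK FOR event_time AS event_time - INTERVAL '3' SECOND  -- "Watermark Seconds: 3"
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'iot_enriched',
      'properties.bootstrap.servers' = '<CLUSTER_HOSTNAME>:9092',
      'properties.group.id' = 'ssb-iot-1',                          -- "Consumer Group" property
      'format' = 'json'
    );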

Lab 3 - Integrate SQL Stream Builder with Schema Registry

SQL Stream Builder’s integration with Schema Registry automatically exposes the schemas stored in the registry as tables in SSB. The schema names in Schema Registry must match the corresponding topic names in Kafka.

In this lab you will register Schema Registry as a catalog in SSB so that you can automatically read the contents of the iot_enriched_avro topic, which is stored in AVRO format.

  1. Go to the following URL, which contains the schema definition for the data in the iot_enriched_avro topic. Select and copy the contents of the page.

  2. In the Schema Registry Web UI, click the + sign to register a new schema.

  3. Click on a blank area in the Schema Text field and paste the contents you copied.

  4. Complete the schema creation by filling in the following properties, then save the schema.

    Name:          iot_enriched_avro
    Description:   Schema for the data in the iot_enriched_avro topic
    Type:          Avro schema provider
    Schema Group:  Kafka
    Compatibility: Backward
    Evolve:        checked
    schema registy iot enriched
  5. Back on the SQL Stream Builder page, click on Data Providers (on the left bar) > Catalogs > (+) Register Catalog.

    add catalog sr
  6. In the Catalog dialog box, enter the following details:

    Name:                sr
    Catalog Type:        Schema Registry
    Kafka Cluster:       edge2ai-kafka
    Schema Registry URL: http://<CLUSTER_HOSTNAME>:7788/api/v1
    Enable TLS:          No
  7. Click on the Add Filter button and enter the following configuration for the filter:

    Database Filter: .*
    Table Filter:    iot.*
  8. Click on the plus sign beside the filter details to register the filter:

    add filter
  9. Click on Validate. If the configuration is correct you should see the message "Provider is valid". Hover your mouse over the message and you’ll see the number of tables (schemas) that matched your filter.

    add sr catalog
  10. Click Create to complete the catalog registration.

  11. On the Console screen you should now see the list of tables that were imported from Schema Registry.

    sr tables
  12. Query the imported table to ensure it is working correctly.

    Clear the contents of the SQL editor and type the following query:

    SELECT *
    FROM `sr`.`default_database`.`iot_enriched_avro`
    Tip
    If you type only SELECT * FROM and then press CTRL+SPACE, the editor will present a list of completion options that you can select from. This works for different parts of the SQL query and is a handy feature to help you recall the names of tables, functions, columns, etc. that you don’t remember.
    code completion
  13. Click on Execute. After a few seconds you should see the data from the topic displayed on the Results panel.

  14. Click Stop to stop the job and release all the cluster resources used by the query. You can double-check that all queries/jobs have been stopped by clicking on the SQL Jobs tab. If any jobs are still running, you can stop them from that page.
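
If you want to explore the sr catalog directly from the SQL editor, standard Flink SQL catalog statements can be used. This is a sketch under the assumption that these statements are enabled in your SSB version:

    -- List the registered catalogs, switch to the Schema Registry catalog and list its tables
    SHOW CATALOGS;
    USE CATALOG sr;
    SHOW TABLES;

    -- Tables can also be referenced with fully qualified names without switching catalogs
    SELECT * FROM `sr`.`default_database`.`iot_enriched_avro`;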

Lab 4 - Computing and storing aggregation results

Now that you have already run a few basic queries and confirmed that your tables are working correctly, you want to start computing aggregates for your incoming data stream and make the results available for downstream applications.

SQL Stream Builder tables give you the ability to publish/store streaming data to several different services (Kafka, AWS S3, Google GCS, Kudu, HBase, etc.).

In this lab you’ll use another Kafka table to publish the results of your aggregation to another Kafka topic.

  1. Let’s first create the topic (sensor6_stats) to which your aggregation results will be published:

    1. Navigate to the SMM UI (Cloudera Manager > SMM service > Streams Messaging Manager Web UI).

    2. On the SMM UI, click the Topics tab (topics icon).

    3. Click the Add New button.

    4. Enter the following details for the topic and click Save when ready:

      1. Topic name: sensor6_stats

      2. Partitions: 10

      3. Availability: Low

      4. Cleanup Policy: delete

  2. On the SSB UI, click New Job at the top of the Console screen.

  3. On the Create New Job dialog box, enter Sensor6Stats for the Job Name and click Create Job.

  4. In the SQL editor type the query shown below.

    This query will compute aggregates over 30-second windows that slide forward every second. For a specific sensor value in the record (sensor_6) it computes the following aggregations for each window:

    • Number of events received

    • Sum of the sensor_6 value for all the events

    • Average of the sensor_6 value across all the events

    • Min and max values of the sensor_6 field

    • Number of events for which the sensor_6 value exceeds 70

    INSERT INTO sensor6stats
    SELECT
      sensor_id as device_id,
      HOP_END(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND) as windowEnd,
      count(*) as sensorCount,
      sum(sensor_6) as sensorSum,
      avg(cast(sensor_6 as float)) as sensorAverage,
      min(sensor_6) as sensorMin,
      max(sensor_6) as sensorMax,
      sum(case when sensor_6 > 70 then 1 else 0 end) as sensorGreaterThan60
    FROM iot_enriched
    GROUP BY
      sensor_id,
      HOP(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND)
  5. Before you can execute this query, though, the sensor6stats table must be created in SSB, mapping it to the sensor6_stats Kafka topic.

    Since we want the topic format to be JSON, click on Templates > local-kafka > JSON.

    template kafka json

    This will replace the query with a CREATE TABLE DDL that can be used to create a table that matches the structure of your query!

    template table ddl
    Note
    The template will completely overwrite the query in the editor. Please ensure you have a copy of the query saved somewhere before you select a template.
  6. Some of the table properties are already filled in for you. But there are a few adjustments you must make before you execute the statement:

    1. connector: Replace kafka: Local Kafka with kafka: edge2ai-kafka, which is the name of the Data Provider that you created at the beginning of the lab.

    2. topic: Replace the …​ value with the name of the topic that you created: sensor6_stats.

    3. properties.group.id: Add this new property and set it to sensor6stats-group-id.

      • This property specifies the consumer group ID to be used by consumers (readers) of this table.

    4. properties.auto.offset.reset: Add this new property and set it to latest.

      • This property specifies how the consumer should reset the partition offsets when the consumer group doesn’t yet exist.

    5. If you want to remove the comments, that’s also fine, though it is not necessary. (A sketch of roughly what the edited DDL looks like appears at the end of this lab.)

      Tip
      Table properties prefixed with properties. are passed directly to the Kafka client.
      template table edited
  7. Click Execute and the table will be created.

  8. Type the original query into the editor again and press Execute to run it.

  9. At the bottom of the screen you will see the log messages generated by your query execution.

    sql execution
  10. After a few seconds the Logs tab will automatically switch to the Results tab to show the results of your aggregation query.

    Note that the data displayed on the screen is only a sample of the data returned by the query, not the full data.

    sql aggr results
    Tip
    If you need more screen space to examine the query results, you can hide the tables pane by clicking on the editor option shown below:
    hide tables
  11. Check the job execution details and logs by clicking on SQL Jobs (on the left bar). Explore the options on this screen:

    1. Click on the Sensor6Stats job.

    2. Click on the SQL, Properties and Log tabs.

    3. Click on the Edit in Console View button.

    job details
  12. On the main SQL Jobs page, click on the Flink Dashboard link to open the Flink Dashboard for the Sensor6Stats job. Navigate the dashboard pages to explore details and metrics of the job execution.

    job dashboard
  13. Let’s query the sensor6_stats topic to examine the data that is being written to it. You already created the sensor6stats table and mapped it to that topic in a previous step.

    Now you only need to query the same table.

    Back in the Console screen, click on New Job and give it a name (e.g. adhoc_queries).

    Note
    The Sensor6Stats job will continue to run in the background. You can monitor and manage it through the SQL Jobs page.
  14. Enter the following query in the SQL field and execute it:

    SELECT *
    FROM sensor6stats
  15. After a few seconds you should see the contents of the sensor6_stats topic displayed on the screen:

    stats results
  16. You will need to leave the Sensor6Stats job running to use it in the next lab. Make sure you stop all other jobs to release cluster resources.

    jobs running
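
For reference, after the edits from step 6 the generated statement should look roughly like the sketch below. This is only an approximation: the template produced by SSB includes additional (mostly commented-out) properties, and the column types are derived from your query, so the ones shown here are assumptions.

    -- Approximate shape of the edited sink table DDL from the local-kafka JSON template (sketch only).
    CREATE TABLE sensor6stats (
      device_id           INT,
      windowEnd           TIMESTAMP(3),
      sensorCount         BIGINT,
      sensorSum           INT,
      sensorAverage       FLOAT,
      sensorMin           INT,
      sensorMax           INT,
      sensorGreaterThan60 INT
    ) WITH (
      'connector' = 'kafka: edge2ai-kafka',             -- the Data Provider registered in Lab 1
      'topic' = 'sensor6_stats',                        -- the topic created in SMM
      'properties.group.id' = 'sensor6stats-group-id',
      'properties.auto.offset.reset' = 'latest',
      'format' = 'json'
    );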

Lab 5 - Materialized Views

SQL Stream Builder can also take keyed snapshots of the data stream and make them available through a REST interface in the form of Materialized Views. In this lab you’ll create and query Materialized Views (MV).

You will define MVs on top of the query you created in the previous lab. Make sure that query is running before executing the steps below.

  1. On the SQL Jobs screen, verify that the Sensor6Stats job is running. Select the job and click on the Edit in Console View button.

    edit job
  2. To add Materialized Views to a query, the job must be stopped. On the job page, click the Stop button to stop the job.

  3. Click on the Materialized View button and enter the following properties:

    Enable MV:             Yes
    Recreate on Job Start: No
    Ignore NULLs:          Yes
    Primary Key:           device_id
    Retention:             300
    mv config1
  4. To create an MV you need an API key. The API key is given to clients so that they can access the MVs. If you have multiple MVs and want them to be accessed by different clients, you can create multiple API keys to control access to the different MVs.

    If you have already created an API Key in SSB you can select it from the drop-down list. Otherwise, create one on the spot by clicking on the "plus" button shown above. Use ssb-lab as the Key Name.

    Once the API key is created, select it for your MV.

  5. Click Add New Query to create a new MV. You will create a view that shows all the devices for which sensor_6 has had at least one reading above 60 in the last 300 seconds (the MV retention you configured). A conceptually equivalent SQL filter is sketched at the end of this section.

    For this, enter the following parameters in the MV Query Configuration page:

    URL Pattern:   above60
    Description:   All devices with a sensor6 reading greater than 60
    Query Builder: <click "Select All" to add all columns>
    Filters:       sensorGreaterThan60  greater  0
    mv config2
    mv config2b
  6. Click Apply and Save Job.

  7. Close the Materialized Views tab and click on Execute to start the job again.

  8. Click on the Materialized Views button again and, on the Materialized Views tab, copy the new MV URL that’s shown on the screen and open it in a new browser tab (or simply click on the URL link). You will see the content of the MV’s current snapshot.

    If you refresh the page a few times you will notice that the MV snapshot is updated as new data points are coming through the stream.

    SSB keeps the last state of the data for each value of the defined primary key.

    mv contents
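
Conceptually, the above60 Materialized View corresponds to filtering the sensor6stats stream and keeping only the latest row per device_id (the primary key). For intuition only, the filter is roughly equivalent to the SQL below; the MV itself is maintained by SSB and served through the REST endpoint rather than by running this query:

    -- Conceptual equivalent of the above60 MV filter (for intuition only):
    -- all columns, restricted to devices with at least one reading above the threshold.
    -- SSB additionally keeps only the latest row per device_id and serves it over REST.
    SELECT *
    FROM sensor6stats
    WHERE sensorGreaterThan60 > 0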

Materialized View with parameters

The MV you created above takes no parameters; it always returns the full content of the MV when you call the REST endpoint. It is possible to specify parameters for an MV so that you can filter the contents at query time.

In this section you will create a new MV that allows filtering by specifying a range for the sensorAverage column.

  1. First, stop the job again so that you can add another MV.

  2. Click on the Materialized Views button and then on Add New Query to create a new MV.

    Enter the following property values and click Apply and Save Job.

    URL Pattern:   above60withRange/{lowerTemp}/{upperTemp}
    Query Builder: <click "Select All" to add all columns>
    Filters:       sensorGreaterThan60  greater           0
                   AND
                   sensorAverage        greater or equal  {lowerTemp}
                   AND
                   sensorAverage        less or equal     {upperTemp}
    mv config3
  3. You will notice that the new URL for this MV has placeholders for the {lowerTemp} and {upperTemp} parameters:

    mv url parameters
  4. Close the Materialized View tab and execute the job again.

  5. Click on the Materialized Views tab, and click on the link for the MV that you just created.

    Since there are parameters in this MV’s URL, instead of redirecting you directly to another browser tab, SSB asks for the parameter values first. Enter the lower and upper bounds for the temperature range you want to query, and press Go:

    mv parameters
  6. Verify that the values of the sensorAverage field in the MV are all within the range you specified.

  7. Try changing the value range to verify that the filter is working as expected.

  8. Once you have finished the lab, click on the SQL Jobs tab and stop all your jobs to release cluster resources.

Conclusion

You have now taken data from one topic, calculated aggregated results and written these to another topic. To validate that this was successful, you queried the results with an independent SELECT query. Finally, you created Materialized Views for one of your jobs and queried those views through their REST endpoints.