kafka2avro

This example shows how to use Apache Beam and SCIO to read objects from a Kafka topic and write them as Avro files to Google Cloud Storage.

This example contains two Dataflow pipelines:

  • Object2Kafka: generates a set of objects and writes them to Kafka. This is a batch mode pipeline.
  • Kafka2Avro: reads objects from Kafka, converts them to Avro, and writes the output to Google Cloud Storage. This is a streaming mode pipeline.

Configuration

Before compiling and generating your package, you need to change some options in src/main/resources/application.conf:

  • broker: String with the address of the Kafka brokers.
  • dest-bucket: The name of the bucket where the Avro files will be written.
  • dest-path: The directory structure where the Avro files will be written (e.g. blank to write to the top-level directory of the bucket, or a path such as a/b/c).
  • kafka-topic: The name of the topic where the objects are written to, or read from.
  • num-demo-objects: Number of objects that will be generated by the Object2Kafka pipeline. These objects can be read with the Kafka2Avro pipeline to test that everything is working as expected.

The configuration file follows the HOCON format.

Here is a sample configuration file with all the options set:

broker = "1.2.3.4:9092"
dest-bucket = "my-bucket-in-gcs"
dest-path = "persisted/fromkafka/avro/"
kafka-topic = "my_kafka_topic"
num-demo-objects = 500  # comments are allowed in the config file
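
Before compiling, it can help to confirm that none of the five options above is missing. The following is a minimal sketch, not part of the project: check_conf is a hypothetical helper, and it assumes the options appear as flat top-level keys written as key = value or key: value, as in the sample. Nested HOCON syntax is not handled.

```shell
# Pre-flight check: report any documented option missing from a flat
# HOCON file such as application.conf.
# Assumption: each key starts a line as `key = value` or `key: value`.
# check_conf is a hypothetical helper, not part of the project.
check_conf() (
  conf_file="$1"
  missing=0
  for key in broker dest-bucket dest-path kafka-topic num-demo-objects; do
    # Accept optional surrounding quotes, which HOCON allows around keys.
    if ! grep -Eq "^[[:space:]]*\"?${key}\"?[[:space:]]*[=:]" "$conf_file"; then
      echo "missing option: ${key}" >&2
      missing=1
    fi
  done
  # Exit status 0 only when every option was found.
  [ "$missing" -eq 0 ]
)
```

For example, check_conf src/main/resources/application.conf returns a non-zero status and names each missing option on standard error.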

Prerequisites

Build tool

This example is written in Scala and uses SBT as its build tool.

You need to have SBT >= 1.0 installed. You can download SBT from https://www.scala-sbt.org/

The Scala version is 2.12.8. If you have JDK 1.8 or later installed, SBT should download the Scala compiler automatically.

Compile

Run sbt in the top-level source folder.

Inside sbt, download all the dependencies:

sbt:kafka2avro> update

and then compile:

sbt:kafka2avro> compile

Deploy and run

If you have managed to compile the code, you can generate a JAR package to be deployed on Dataflow, with:

sbt:kafka2avro> pack

This will generate a set of JAR files in target/pack/lib.

Running the Object2Kafka pipeline

This is a batch pipeline, provided only as an example to populate Kafka and test the streaming pipeline.

Once you have generated the JAR files using the pack command inside SBT, you can launch the job in Dataflow to populate Kafka with some demo objects. Using Java 1.8, run the following command. Note that you have to set the project ID and a location in a GCS bucket to store the JARs imported by Dataflow:

CLASSPATH="target/pack/lib/*" java com.google.cloud.pso.kafka2avro.Object2Kafka --exec.mainClass=com.google.cloud.pso.kafka2avro.Object2Kafka --project=YOUR_PROJECT_ID --stagingLocation="gs://YOUR_BUCKET/YOUR_STAGING_LOCATION" --runner=DataflowRunner

Running the Kafka2Avro pipeline

This is a streaming pipeline. It will keep running unless you cancel it. The default windowing policy is to group messages every 2 minutes, in a fixed window. To change the policy, please see the function windowIn in Kafka2Avro.scala.
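
To make the default policy concrete: with a fixed window, each message falls into exactly one non-overlapping interval, chosen by its timestamp. The sketch below shows only that arithmetic, not the pipeline code. It assumes windows are aligned to the Unix epoch with no offset (Beam's default for fixed windows), which may differ from what windowIn configures. WINDOW_SECONDS and window_start are hypothetical names.

```shell
# Map a message timestamp (seconds since the epoch) to the start of the
# 2-minute fixed window that contains it.
# Assumption: windows are aligned to the epoch with no offset.
WINDOW_SECONDS=120

window_start() {
  # Round the timestamp down to the nearest multiple of the window size.
  echo $(( $1 - $1 % WINDOW_SECONDS ))
}
```

With these definitions, window_start 1000 and window_start 1079 both print 960, so the two messages are grouped together, while window_start 1080 prints 1080 and opens the next window.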

Once you have generated the JAR files using the pack command inside SBT, you can launch the job in Dataflow to read the objects from Kafka and write them as Avro files to Google Cloud Storage. Using Java 1.8, run the following command. Note that you have to set the project ID and a location in a GCS bucket to store the JARs imported by Dataflow:

CLASSPATH="target/pack/lib/*" java com.google.cloud.pso.kafka2avro.Kafka2Avro --exec.mainClass=com.google.cloud.pso.kafka2avro.Kafka2Avro --project=YOUR_PROJECT_ID --stagingLocation="gs://YOUR_BUCKET/YOUR_STAGING_LOCATION" --runner=DataflowRunner

Please remember that the machine running the JAR may need to have connectivity to the Kafka cluster in order to retrieve some metadata, prior to launching the pipeline in Dataflow.
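
One way to test that connectivity before launching is to probe the broker address from the same machine. This is a rough sketch under stated assumptions: the broker option holds a single host:port pair (not a comma-separated list), and the installed nc supports the -z and -w options, which varies between netcat implementations. broker_host, broker_port, and check_broker are hypothetical helpers.

```shell
# Split a "host:port" broker address and probe it with netcat.
# Assumptions: a single hostname or IPv4 broker, and an `nc` that
# supports -z (scan without sending data) and -w (timeout in seconds).
broker_host() { printf '%s\n' "${1%:*}"; }   # text before the last colon
broker_port() { printf '%s\n' "${1##*:}"; }  # text after the last colon

check_broker() {
  if nc -z -w 5 "$(broker_host "$1")" "$(broker_port "$1")"; then
    echo "broker $1 is reachable"
  else
    echo "cannot reach broker $1" >&2
    return 1
  fi
}
```

For example, check_broker "1.2.3.4:9092" probes host 1.2.3.4 on port 9092 and returns a non-zero status if the connection fails.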

Remember that this is a streaming pipeline: it keeps running until you cancel or stop it.

Wrong filenames for some dependencies

In some cases, dependencies may be downloaded with filenames that contain symbols that need to be escaped. Importing these JARs into the Dataflow job will fail.

If your Dataflow job fails before it is launched because it cannot copy some dependencies, rename the offending files so that they don't contain symbols. For instance:

mv target/pack/lib/netty-codec-http2-\[4.1.25.Final,4.1.25.Final\].jar target/pack/lib/netty-codec-http2.jar
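
When several JARs are affected, the rename can be applied in a loop. The sketch below assumes the offending part of each name is a bracketed version range like the one in the example above. Other symbols would need a different pattern. sanitize_jar_names is a hypothetical helper, and it skips a file rather than overwrite an existing one.

```shell
# Strip a bracketed version range such as "-[4.1.25.Final,4.1.25.Final]"
# from every JAR name in the given directory.
# Assumption: the brackets are the only problematic symbols.
sanitize_jar_names() (
  lib_dir="$1"
  for jar in "$lib_dir"/*.jar; do
    [ -e "$jar" ] || continue            # the glob matched no files
    name=$(basename "$jar")
    clean=$(printf '%s\n' "$name" | sed 's/-\[[^]]*]//')
    [ "$name" = "$clean" ] && continue   # name has no bracketed range
    if [ -e "$lib_dir/$clean" ]; then
      echo "skipping $name: $clean already exists" >&2
      continue
    fi
    mv -- "$jar" "$lib_dir/$clean"
    echo "renamed $name -> $clean"
  done
)
```

Running sanitize_jar_names target/pack/lib after the pack command renames the file from the example to netty-codec-http2.jar and leaves other JARs untouched.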

Continuous Integration

This example includes a configuration file for Cloud Build, so you can use it to run the unit tests on every commit to your repository. To use this configuration file:

  • Add your sources to a Git repository (in Bitbucket, GitHub or Google Cloud Source Repositories).
  • Configure a trigger in Google Cloud Build linked to your Git repository.
  • Set the path for the configuration file to cloudbuild.yaml.

The included configuration file performs the following steps:

  • Download a cache for Ivy2 from a Google Cloud Storage bucket named YOURPROJECT_cache, where YOURPROJECT is your GCP project id.
  • Compile and test the Scala code.
  • Generate a package.
  • Upload the new Ivy2 cache to the same bucket as in the first step.
  • Upload the generated package and all its dependencies to a bucket named YOURPROJECT_pkgs, where YOURPROJECT is your GCP project id.

These default steps read from and write to two different buckets in Google Cloud Storage. Either create these buckets in your GCP project, or change the configuration.
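
If you choose to create the buckets, the names follow directly from the project ID. The sketch below only prints the gsutil mb commands so that they can be reviewed before anything is created. bucket_commands is a hypothetical helper, and default bucket location and storage class are assumed to be acceptable.

```shell
# Print the commands that would create the two buckets expected by the
# default Cloud Build configuration: YOURPROJECT_cache and YOURPROJECT_pkgs.
# Nothing is created here; the output is meant to be reviewed first.
bucket_commands() (
  project_id="$1"
  for suffix in cache pkgs; do
    echo "gsutil mb -p ${project_id} gs://${project_id}_${suffix}"
  done
)
```

For a project with ID my-project, bucket_commands my-project prints one command for gs://my-project_cache and one for gs://my-project_pkgs. Piping the output to sh runs them.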

Please note that you need to build and include the scala-sbt Cloud Builder in order to use this configuration file.

  • Make sure you have the Google Cloud SDK configured with your credentials and project.
  • Download the sources from GoogleCloudPlatform/cloud-builders-community/tree/master/scala-sbt.
  • In the scala-sbt source directory, run gcloud builds submit . --config=cloudbuild.yaml to add the builder to your GCP project. You only need to do this once.