Skip to content

Commit

Permalink
switch to Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
novakov-alexey committed Sep 4, 2023
1 parent 876d0b8 commit 67919b2
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
12 changes: 6 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ Global / onChangedBuildSource := ReloadOnSourceChanges
ThisBuild / organization := "com.github.novakov-alexey"
ThisBuild / scalaVersion := "2.12.7"

// lazy val flinkVersion = "1.15.2"
lazy val flinkVersion = "1.14.6"
lazy val flinkVersion = "1.15.2"
//lazy val flinkVersion = "1.14.6"

lazy val root = (project in file(".")).settings(
name := "scala-212-savepoint",
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-streaming-java" % flinkVersion % Provided,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
"org.apache.flink" %% "flink-state-processor-api" % flinkVersion % Provided,
"org.apache.flink" %% "flink-clients" % flinkVersion % Provided,
"org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided,
//"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
"org.apache.flink" % "flink-state-processor-api" % flinkVersion % Provided,
"org.apache.flink" % "flink-clients" % flinkVersion % Provided,
"org.apache.flink" % "flink-avro" % flinkVersion,
"org.apache.avro" % "avro" % "1.11.2"
),
Expand Down
16 changes: 16 additions & 0 deletions src/main/avro/Event.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"type": "record",
"namespace": "com.github.novakovalexey",
"name": "Event",
"fields": [
{
"name": "number",
"type": "int"
},
{
"name": "count",
"type": "int",
"default": 1
}
]
}
14 changes: 6 additions & 8 deletions src/main/scala/com/github/novakovalexey/WordCount.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package com.github.novakovalexey
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
//import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction
import org.apache.flink.configuration.Configuration
Expand All @@ -18,11 +19,9 @@ import org.apache.flink.api.java.functions.KeySelector
import scala.util.Random
import scala.collection.JavaConverters._

case class Event(number: Int, count: Int = 1)

object FakeSource {
val iterator = new FromIteratorFunction[Event](
(new Iterator[Event] with Serializable {
new Iterator[Event] with Serializable {
val rand = new Random()

override def hasNext: Boolean = true
Expand All @@ -31,7 +30,7 @@ object FakeSource {
Thread.sleep(1000)
Event(rand.nextInt(20))
}
}).asJava
}.asJava
)
}

Expand Down Expand Up @@ -75,9 +74,8 @@ object Main extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.addSource(FakeSource.iterator)
.keyBy(new KeySelector[Event, Int] {
override def getKey(value: Event): Int = value.number
})
.returns(TypeInformation.of(classOf[Event]))
.keyBy(_.number, TypeInformation.of(classOf[Int]))
.process(new WordCounter())
.uid("word-count")
.print()
Expand Down

0 comments on commit 67919b2

Please sign in to comment.