Skip to content

Commit

Permalink
add savepoint rewrite example
Browse files Browse the repository at this point in the history
  • Loading branch information
novakov-alexey committed Sep 13, 2023
1 parent 67919b2 commit 81e811d
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 22 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ Global / onChangedBuildSource := ReloadOnSourceChanges
ThisBuild / organization := "com.github.novakov-alexey"
ThisBuild / scalaVersion := "2.12.7"

lazy val flinkVersion = "1.15.2"
//lazy val flinkVersion = "1.14.6"
lazy val flinkVersion = "1.15.2"
// lazy val flinkVersion = "1.14.6"

lazy val root = (project in file(".")).settings(
name := "scala-212-savepoint",
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided,
//"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
"org.apache.flink" % "flink-state-processor-api" % flinkVersion % Provided,
"org.apache.flink" % "flink-clients" % flinkVersion % Provided,
"org.apache.flink" % "flink-avro" % flinkVersion,
Expand Down
10 changes: 9 additions & 1 deletion src/main/avro/WordCountState.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@
},
{
"name": "changed",
"type": "long"
"type": "long"
},
{
"name": "lastCount",
"type": [
"null",
"int"
],
"default": null
}
]
}
107 changes: 89 additions & 18 deletions src/main/scala/com/github/novakovalexey/WordCount.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,35 @@ package com.github.novakovalexey

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.runtime.state.{
FunctionInitializationContext,
FunctionSnapshotContext
}
import org.apache.flink.state.api.{
BootstrapTransformation,
SavepointReader,
SavepointWriter
}
import org.apache.flink.state.api.functions.{
KeyedStateBootstrapFunction,
StateBootstrapFunction
}
import org.apache.flink.api.scala.ExecutionEnvironment
// import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
//import org.apache.flink.streaming.api.scala._
//import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.OperatorTransformation
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.state.api.functions.KeyedStateReaderFunction.Context
import org.apache.flink.util.Collector
import WordCounter._
import org.apache.flink.api.java.functions.KeySelector

import scala.util.Random
import scala.collection.JavaConverters._
Expand All @@ -36,12 +52,18 @@ object FakeSource {

object WordCounter {
val stateDescriptor =
new MapStateDescriptor("wordCounter", classOf[Int], classOf[WordCountState])
new MapStateDescriptor(
"wordCounter",
TypeInformation.of(classOf[Int]),
// createTypeInformation[Int],
TypeInformation.of(classOf[WordCountState])
// createTypeInformation[WordCountState]
)
}

class WordCounter extends KeyedProcessFunction[Int, Event, Event] {
// MapState is used here to check serialization. ValueState would be more efficient
@transient var countState: MapState[Int, WordCountState] = _
// MapState is used here to check serialization. ValueState would be enough to count numbers
private var countState: MapState[Int, WordCountState] = _

override def open(parameters: Configuration): Unit =
countState = getRuntimeContext.getMapState(stateDescriptor)
Expand All @@ -60,7 +82,8 @@ class WordCounter extends KeyedProcessFunction[Int, Event, Event] {
WordCountState(
event.number,
event.count + count,
System.currentTimeMillis
System.currentTimeMillis,
Some(count)
)
)
collector.collect(event.copy(count = count))
Expand All @@ -69,13 +92,17 @@ class WordCounter extends KeyedProcessFunction[Int, Event, Event] {

object Main extends App {
val conf = new Configuration()
//conf.setString("state.savepoints.dir", "file:///tmp/savepoints")
// val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
val env = StreamExecutionEnvironment.getExecutionEnvironment
conf.setString("state.savepoints.dir", "file:///tmp/savepoints")
val env = StreamExecutionEnvironment.getExecutionEnvironment(conf)
// env.getConfig.disableGenericTypes()

val eventTi = TypeInformation.of(classOf[Event])

env
.addSource(FakeSource.iterator)
.returns(TypeInformation.of(classOf[Event]))
.keyBy(_.number, TypeInformation.of(classOf[Int]))
.returns(eventTi)
.keyBy((e: Event) => e.number, TypeInformation.of(classOf[Int]))
// .keyBy(_.number, createTypeInformation[Int])
.process(new WordCounter())
.uid("word-count")
.print()
Expand All @@ -93,25 +120,69 @@ class ReaderFunction extends KeyedStateReaderFunction[Int, WordCountState] {
key: Int,
ctx: Context,
out: Collector[WordCountState]
): Unit =
): Unit = {
val state = countState.get(key)
out.collect(
WordCountState(key, countState.get(key).count, System.currentTimeMillis)
WordCountState(
key,
state.count,
System.currentTimeMillis,
state.lastCount
)
)
}
}

object ReadState extends App {
val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(
val env = StreamExecutionEnvironment.getExecutionEnvironment
val oldSavepointPath = "/tmp/flink-savepoints/savepoint-7fb950-384cc7627885"
val savepoint = SavepointReader.read(
env,
"/tmp/flink-savepoints/savepoint-5103b1-065fc210b104",
oldSavepointPath,
new HashMapStateBackend()
)

val keyedState = savepoint.readKeyedState(
"word-count",
new ReaderFunction(),
TypeInformation.of(classOf[Int]),
// createTypeInformation[Int], // comes from flink-scala-api
TypeInformation.of(classOf[WordCountState])
// createTypeInformation[WordCountState] // comes from flink-scala-api
)
val res = keyedState.collect().asScala
val res = keyedState.executeAndCollect().asScala
println(res.mkString("\n"))

val transformation = OperatorTransformation
.bootstrapWith(keyedState)
.keyBy(
(value: WordCountState) => value.key,
TypeInformation.of(classOf[Int])
)
.transform(new KeyedStateBootstrapFunction[Int, WordCountState] {
private var countState: MapState[Int, WordCountState] = _

override def open(parameters: Configuration): Unit = {
val descriptor = new MapStateDescriptor(
"wordCounter",
TypeInformation.of(classOf[Int]),
TypeInformation.of(classOf[WordCountState])
) // this is the target state descriptor, which can be used to apply different serializers / type info
countState = getRuntimeContext.getMapState(descriptor)
}

override def processElement(
value: WordCountState,
ctx: KeyedStateBootstrapFunction[Int, WordCountState]#Context
): Unit =
countState.put(value.key, value)
})

SavepointWriter
.fromExistingSavepoint(oldSavepointPath)
.removeOperator("word-count")
.withOperator("word-count", transformation)
.write(oldSavepointPath.replaceAll("savepoint-", "new-savepoint-"))

env.execute()
}

0 comments on commit 81e811d

Please sign in to comment.