From 12776105ef21891a1dc37b0ef020c96bd7846983 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Wed, 13 Mar 2024 18:19:51 +0800 Subject: [PATCH] [FLINK-34516] Use new CheckpointingMode in flink-core in scala --- .../scala/StreamExecutionEnvironment.scala | 44 +++++++++++++++---- .../stream/sql/WindowAggregateITCase.scala | 4 +- .../stream/sql/WindowDeduplicateITCase.scala | 3 +- .../sql/WindowDistinctAggregateITCase.scala | 4 +- .../runtime/stream/sql/WindowJoinITCase.scala | 4 +- .../runtime/stream/sql/WindowRankITCase.scala | 3 +- .../sql/WindowTableFunctionITCase.scala | 3 +- .../utils/StreamingWithStateTestBase.scala | 4 +- 8 files changed, 43 insertions(+), 26 deletions(-) diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 9bf0c604a966a6..649441be8a95a2 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -30,10 +30,10 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.scala.ClosureCleaner import org.apache.flink.configuration.{Configuration, ReadableConfig} -import org.apache.flink.core.execution.{JobClient, JobListener} +import org.apache.flink.core.execution.{CheckpointingMode, JobClient, JobListener} import org.apache.flink.core.fs.Path import org.apache.flink.runtime.state.StateBackend -import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} +import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment => JavaEnv} import org.apache.flink.streaming.api.functions.source._ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext @@ -45,7 +45,6 @@ import _root_.scala.language.implicitConversions import com.esotericsoftware.kryo.Serializer import java.net.URI - import scala.collection.JavaConverters._ /** @@ -202,12 +201,39 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable { @PublicEvolving def enableCheckpointing( interval: Long, - mode: CheckpointingMode, + mode: org.apache.flink.streaming.api.CheckpointingMode, force: Boolean): StreamExecutionEnvironment = { javaEnv.enableCheckpointing(interval, mode, force) this } + /** + * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow + * will be periodically snapshotted. In case of a failure, the streaming dataflow will be + * restarted from the latest completed checkpoint. + * + * The job draws checkpoints periodically, in the given interval. The system uses the given + * [[org.apache.flink.streaming.api.CheckpointingMode]] for the checkpointing ("exactly once" vs + * "at least once"). The state will be stored in the configured state backend. + * + * NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For + * that reason, iterative jobs will not be started if used with enabled checkpointing. + * + * @param interval + * Time interval between state checkpoints in milliseconds. + * @param mode + * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees. + * @deprecated + * Use [[enableCheckpointing(Long, CheckpointingMode)]] instead. + */ + @deprecated + def enableCheckpointing( + interval: Long, + mode: org.apache.flink.streaming.api.CheckpointingMode): StreamExecutionEnvironment = { + javaEnv.enableCheckpointing(interval, mode) + this + } + /** * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow * will be periodically snapshotted. In case of a failure, the streaming dataflow will be @@ -218,8 +244,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable { * be stored in the configured state backend. * * NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For - * that reason, iterative jobs will not be started if used with enabled checkpointing. To override - * this mechanism, use the [[enableCheckpointing(long, CheckpointingMode, boolean)]] method. + * that reason, iterative jobs will not be started if used with enabled checkpointing. * * @param interval * Time interval between state checkpoints in milliseconds. @@ -241,8 +266,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable { * backend. * * NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For - * that reason, iterative jobs will not be started if used with enabled checkpointing. To override - * this mechanism, use the [[enableCheckpointing(long, CheckpointingMode, boolean)]] method. + * that reason, iterative jobs will not be started if used with enabled checkpointing. * * @param interval * Time interval between state checkpoints in milliseconds. @@ -266,8 +290,12 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable { this } + /** @deprecated Use [[getCheckpointingConsistencyMode()]] instead. */ + @deprecated def getCheckpointingMode = javaEnv.getCheckpointingMode() + def getCheckpointingConsistencyMode = javaEnv.getCheckpointingConsistencyMode() + /** * Sets the state backend that describes how to store operator. It defines the data structures * that hold state during execution (for example hash tables, RocksDB, or other data stores). diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 118b9c5a22ab5a..32005c3af5e4f7 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.config.OptimizerConfigOptions @@ -33,14 +33,12 @@ import org.apache.flink.table.planner.utils.AggregatePhaseStrategy import org.apache.flink.table.planner.utils.AggregatePhaseStrategy._ import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row - import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith import java.time.ZoneId import java.util - import scala.collection.JavaConversions._ @ExtendWith(Array(classOf[ParameterizedTestExtension])) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDeduplicateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDeduplicateITCase.scala index f4841747a7e34f..760a348dc6237b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDeduplicateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDeduplicateITCase.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction @@ -27,7 +27,6 @@ import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, St import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension import org.apache.flink.types.Row - import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala index 1fe3b8e6b64b7d..8303d1fb6bd082 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.table.api.bridge.scala.tableConversions import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.factories.TestValuesTableFactory @@ -27,13 +27,11 @@ import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, St import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row - import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith import java.util - import scala.collection.JavaConversions._ /** IT cases for window aggregates with distinct aggregates. */ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala index 3e4a33f0ab7a80..ae9cc84eabbd45 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala @@ -19,21 +19,19 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} import org.apache.flink.types.Row - import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith import java.time.ZoneId import java.util - import scala.collection.JavaConversions._ @ExtendWith(Array(classOf[ParameterizedTestExtension])) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowRankITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowRankITCase.scala index 7a2e4347589387..6db952c4d2fbbd 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowRankITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowRankITCase.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction @@ -27,7 +27,6 @@ import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, St import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension import org.apache.flink.types.Row - import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala index 6a55aaa117481b..2206e79cd49f88 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala @@ -18,13 +18,12 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies -import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension - import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} import org.junit.jupiter.api.extension.ExtendWith diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala index 3a335f846ca5dd..2478c8c01a649e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala @@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.configuration.{CheckpointingOptions, Configuration} import org.apache.flink.contrib.streaming.state.RocksDBStateBackend +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.runtime.state.memory.MemoryStateBackend -import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.functions.source.FromElementsFunction import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment @@ -36,13 +36,11 @@ import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.types.logical.RowType import org.apache.flink.testutils.junit.extensions.parameterized.Parameters - import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{AfterEach, BeforeEach} import java.nio.file.Files import java.util - import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer