Skip to content

Commit

Permalink
[NU-1296] Ad-hoc tests with ververica in staging (#6127) - cherry pick
Browse files Browse the repository at this point in the history
* [NU-1296] Ad-hoc tests with ververica (#5611)

* With a new default parameter "isTest" in a couple of places ad-hoc test works without adding "flink-dropwizard-metrics-deps" to classpath

* Removed 'isTest' param as we can use already present ComponentUseCase

* One place was hardcoded to give dummy TestMetrics and this caused the test to fail, now it also matches on the ComponentUseCase
Also some cleaning

* Ad-hoc test goes through as everything gets dummy metrics in FlinkEngineRuntimeContextImpl
Need to find a way to parse ComponentUseCase, so it still will be serializable on Flink

* MetricsSpec passes with this change

* Work in progress, works everywhere except for FlinkProcessRegistrar

* FlinkProcessRegistrar now also works - there is no serialization error, parts of compilerData have to be initialized before using it in function

* Getting MetricsProviderForScenario for FlinkEngineRuntimeContextImpl and SourceMetricsFunction is moved to MetricsProviderForFlink
Also renamed FlinkEngineRuntimeContextImpl method to 'apply' as it is used to create new instance of it

* Name changed as we create MetricProvider, not just get it from somewhere

---------

Co-authored-by: Szymon Bogusz <[email protected]>
  • Loading branch information
ForrestFairy and Szymon Bogusz committed Jun 5, 2024
1 parent f2494ba commit 92779e7
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package pl.touk.nussknacker.ui.process.processingtype

import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.{
ConfigWithUnresolvedVersion,
DeploymentManagerDependencies,
DeploymentManagerProvider,
ModelData,
ModelDependencies,
ProcessingTypeConfig
}
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.engine.deployment.EngineSetupName
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
Expand Down
3 changes: 2 additions & 1 deletion docs/Changelog.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Changelog

1.15.1 (4 June 2024)
1.15.1 (5 June 2024)
-------------------------
* [#6126](https://github.com/TouK/nussknacker/pull/6126) Fix statistics configuration.
* [#6127](https://github.com/TouK/nussknacker/pull/6127) Ad-hoc tests available in scenarios without `flink-dropwizard-metrics-deps` in classPath

1.15.0 (17 May 2024)
-------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,33 @@ package pl.touk.nussknacker.engine.process.compiler

import org.apache.flink.api.common.functions.RuntimeContext
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, IncContextIdGenerator}
import pl.touk.nussknacker.engine.flink.api.FlinkEngineRuntimeContext
import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider
import pl.touk.nussknacker.engine.util.metrics.MetricsProviderForScenario

case class FlinkEngineRuntimeContextImpl(jobData: JobData, runtimeContext: RuntimeContext)
extends FlinkEngineRuntimeContext {
override val metricsProvider: MetricsProviderForScenario = new FlinkMetricsProviderForScenario(runtimeContext)
case class FlinkEngineRuntimeContextImpl(
jobData: JobData,
runtimeContext: RuntimeContext,
metricsProvider: MetricsProviderForScenario
) extends FlinkEngineRuntimeContext {

override def contextIdGenerator(nodeId: String): ContextIdGenerator =
new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask)

}

object FlinkEngineRuntimeContextImpl {

// This creates FlinkEngineRuntimeContextImpl with correct metricsProviderForScenario based on ComponentUseCase
def apply(
jobData: JobData,
runtimeContext: RuntimeContext,
componentUseCase: ComponentUseCase
): FlinkEngineRuntimeContextImpl = {
val properMetricsProvider = createMetricsProvider(componentUseCase, runtimeContext)
new FlinkEngineRuntimeContextImpl(jobData, runtimeContext, properMetricsProvider)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class FlinkProcessCompilerData(
def open(runtimeContext: RuntimeContext, nodesToUse: List[_ <: NodeData]): Unit = {
val lifecycle = compilerData.lifecycle(nodesToUse)
lifecycle.foreach {
_.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext))
_.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext, componentUseCase))
}
}

Expand Down Expand Up @@ -74,7 +74,9 @@ class FlinkProcessCompilerData(
def restartStrategy: RestartStrategies.RestartStrategyConfiguration = exceptionHandler.restartStrategy

def prepareExceptionHandler(runtimeContext: RuntimeContext): FlinkExceptionHandler = {
exceptionHandler.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext))
exceptionHandler.open(
FlinkEngineRuntimeContextImpl(jobData, runtimeContext, componentUseCase)
)
exceptionHandler
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pl.touk.nussknacker.engine.process.compiler

import org.apache.flink.api.common.functions.RuntimeContext
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.util.metrics.{MetricsProviderForScenario, NoOpMetricsProviderForScenario}

object MetricsProviderForFlink {

def createMetricsProvider(
componentUseCase: ComponentUseCase,
runtimeContext: RuntimeContext
): MetricsProviderForScenario = {
componentUseCase match {
case ComponentUseCase.TestRuntime => NoOpMetricsProviderForScenario
case _ => new FlinkMetricsProviderForScenario(runtimeContext)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,14 @@ class FlinkProcessRegistrar(
): FlinkCustomNodeContext = {
val exceptionHandlerPreparer = (runtimeContext: RuntimeContext) =>
compilerDataForProcessPart(None)(runtimeContext.getUserCodeClassLoader).prepareExceptionHandler(runtimeContext)
val jobData = compilerData.jobData
val jobData = compilerData.jobData
val componentUseCase = compilerData.componentUseCase

FlinkCustomNodeContext(
jobData,
nodeComponentId.nodeId,
compilerData.processTimeout,
convertToEngineRuntimeContext = FlinkEngineRuntimeContextImpl(jobData, _),
convertToEngineRuntimeContext = FlinkEngineRuntimeContextImpl(jobData, _, componentUseCase),
lazyParameterHelper = new FlinkLazyParameterFunctionHelper(
nodeComponentId,
exceptionHandlerPreparer,
Expand Down Expand Up @@ -182,7 +184,7 @@ class FlinkProcessRegistrar(

val start = source
.contextStream(env, nodeContext(nodeComponentInfoFrom(part), Left(ValidationContext.empty)))
.process(new SourceMetricsFunction(part.id), contextTypeInformation)
.process(new SourceMetricsFunction(part.id, compilerData.componentUseCase), contextTypeInformation)

val asyncAssigned = registerInterpretationPart(start, part, InterpretationName)

Expand Down Expand Up @@ -377,7 +379,12 @@ class FlinkProcessRegistrar(
} else {
val ti = InterpretationResultTypeInformation.create(typeInformationDetection, outputContexts)
stream.flatMap(
new SyncInterpretationFunction(compilerDataForProcessPart(Some(part)), node, validationContext, useIOMonad),
new SyncInterpretationFunction(
compilerDataForProcessPart(Some(part)),
node,
validationContext,
useIOMonad
),
ti
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package pl.touk.nussknacker.engine.process.registrar
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.process.compiler.FlinkMetricsProviderForScenario
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider
import pl.touk.nussknacker.engine.util.metrics.common.OneSourceMetrics

private[registrar] class SourceMetricsFunction[T](sourceId: String) extends ProcessFunction[T, T] {
private[registrar] class SourceMetricsFunction[T](sourceId: String, componentUseCase: ComponentUseCase)
extends ProcessFunction[T, T] {

@transient private var metrics: OneSourceMetrics = _

override def open(parameters: Configuration): Unit = {
metrics = new OneSourceMetrics(sourceId)
val metricsProvider = new FlinkMetricsProviderForScenario(getRuntimeContext)
val metricsProvider = createMetricsProvider(componentUseCase, getRuntimeContext)
metrics.registerOwnMetrics(metricsProvider)
}

Expand Down

0 comments on commit 92779e7

Please sign in to comment.