From f2494ba9e16f905723f40f19252d7511727be97c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Bigorajski?= <72501021+lukasz-bigorajski@users.noreply.github.com> Date: Tue, 4 Jun 2024 15:25:30 +0200 Subject: [PATCH 1/2] Fix statistics configuration (#6126) - cherry pick MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Co-authored-by: Ɓukasz Bigorajski --- designer/client/src/actions/nk/assignSettings.ts | 1 - .../pl/touk/nussknacker/ui/api/SettingsResources.scala | 10 +++++----- .../ui/server/AkkaHttpBasedRouteProvider.scala | 3 ++- .../nussknacker/ui/api/SettingsResourcesSpec.scala | 8 ++++++-- docs/Changelog.md | 4 ++++ 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/designer/client/src/actions/nk/assignSettings.ts b/designer/client/src/actions/nk/assignSettings.ts index ce754179ca7..89ce9c01e33 100644 --- a/designer/client/src/actions/nk/assignSettings.ts +++ b/designer/client/src/actions/nk/assignSettings.ts @@ -10,7 +10,6 @@ export type MetricsType = { export type UsageStatisticsReports = { enabled: boolean; - url?: string; }; export type SurveySettings = { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/SettingsResources.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/SettingsResources.scala index 5607ee55fb7..1015bbe0163 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/SettingsResources.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/SettingsResources.scala @@ -5,7 +5,7 @@ import cats.data.Validated import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport import io.circe.{Decoder, Encoder} import io.circe.generic.JsonCodec -import pl.touk.nussknacker.ui.config.{AnalyticsConfig, FeatureTogglesConfig} +import pl.touk.nussknacker.ui.config.{AnalyticsConfig, FeatureTogglesConfig, UsageStatisticsReportsConfig} import pl.touk.nussknacker.engine.api.CirceUtil.codecs._ import java.net.URL @@ -14,7 +14,8 @@ import scala.concurrent.ExecutionContext class SettingsResources( config: FeatureTogglesConfig, authenticationMethod: String, - analyticsConfig: Option[AnalyticsConfig] + analyticsConfig: Option[AnalyticsConfig], + usageStatisticsReportsConfig: UsageStatisticsReportsConfig )(implicit ec: ExecutionContext) extends Directives with FailFastCirceSupport @@ -36,8 +37,7 @@ class SettingsResources( intervalTimeSettings = config.intervalTimeSettings, testDataSettings = config.testDataSettings, redirectAfterArchive = config.redirectAfterArchive, - // TODO: It's disabled temporarily until we remove it on FE. We can remove it once it has been removed on FE. - usageStatisticsReports = UsageStatisticsReportsSettings(false, None) + usageStatisticsReports = UsageStatisticsReportsSettings(usageStatisticsReportsConfig.enabled) ) val authenticationSettings = AuthenticationSettings( @@ -145,4 +145,4 @@ object TopTabType extends Enumeration { analytics: Option[AnalyticsSettings] ) -@JsonCodec final case class UsageStatisticsReportsSettings(enabled: Boolean, url: Option[String]) +@JsonCodec final case class UsageStatisticsReportsSettings(enabled: Boolean) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala index 11e77be15c8..c38680bd906 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/server/AkkaHttpBasedRouteProvider.scala @@ -442,7 +442,8 @@ class AkkaHttpBasedRouteProvider( val settingsResources = new SettingsResources( featureTogglesConfig, authenticationResources.name, - analyticsConfig + analyticsConfig, + usageStatisticsReportsConfig ) val apiResourcesWithoutAuthentication: List[Route] = List( settingsResources.publicRoute(), diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/SettingsResourcesSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/SettingsResourcesSpec.scala index 512673c94dc..a3123291b84 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/SettingsResourcesSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/SettingsResourcesSpec.scala @@ -9,7 +9,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import pl.touk.nussknacker.test.PatientScalaFutures import pl.touk.nussknacker.test.base.it.NuResourcesTest import pl.touk.nussknacker.test.utils.domain.TestFactory.withoutPermissions -import pl.touk.nussknacker.ui.config.AnalyticsConfig +import pl.touk.nussknacker.ui.config.{AnalyticsConfig, UsageStatisticsReportsConfig} import pl.touk.nussknacker.ui.security.basicauth.BasicAuthenticationConfiguration class SettingsResourcesSpec @@ -25,11 +25,14 @@ class SettingsResourcesSpec private val authenticationConfig: BasicAuthenticationConfiguration = BasicAuthenticationConfiguration.create(testConfig) private val analyticsConfig: Option[AnalyticsConfig] = AnalyticsConfig(testConfig) + private val usageStatisticsReportsConfig: UsageStatisticsReportsConfig = + UsageStatisticsReportsConfig(true, None, None) private val settingsRoute = new SettingsResources( featureTogglesConfig, authenticationConfig.name, - analyticsConfig + analyticsConfig, + usageStatisticsReportsConfig ) // Values are exists at test/resources/application.conf @@ -44,6 +47,7 @@ class SettingsResourcesSpec data.intervalTimeSettings.processes shouldBe intervalTimeProcesses data.intervalTimeSettings.healthCheck shouldBe intervalTimeHealthCheck + data.usageStatisticsReports.enabled shouldBe true } } diff --git a/docs/Changelog.md b/docs/Changelog.md index aa864bca5db..befe54f56e3 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -1,5 +1,9 @@ # Changelog +1.15.1 (4 June 2024) +------------------------- +* [#6126](https://github.com/TouK/nussknacker/pull/6126) Fix statistics configuration. + 1.15.0 (17 May 2024) ------------------------- * [#5620](https://github.com/TouK/nussknacker/pull/5620) Nodes Api OpenApi-based documentation (e.g. `https://demo.nussknacker.io/api/docs`) From 92779e75279a7b410a6281abb5737c9e18b3ee51 Mon Sep 17 00:00:00 2001 From: ForrestFairy Date: Wed, 5 Jun 2024 08:53:45 +0200 Subject: [PATCH 2/2] [NU-1296] Ad-hoc tests with ververica in staging (#6127) - cherry pick * [NU-1296] Ad-hoc tests with ververica (#5611) * With a new default parameter "isTest" in a couple of places ad-hoc test works without adding "flink-dropwizard-metrics-deps" to classpath * Removed 'isTest' param as we can use already present ComponentUseCase * One place was hardcoded to give dummy TestMetrics and this caused the test to fail, now it also matches on the ComponentUseCase Also some cleaning * Ad-hoc test goes through as everything gets dummy metrics in FlinkEngineRuntimeContextImpl Need to find a way to parse ComponentUseCase, so it still will be serializable on Flink * MetricsSpec passes with this change * Work in progress, works everywhere except for FlinkProcessRegistrar * FlinkProcessRegistrar now also works - there is no serialization error, parts of compilerData have to be initialized before using it in function * Getting MetricsProviderForScenario for FlinkEngineRuntimeContextImpl and SourceMetricsFunction is moved to MetricsProviderForFlink Also renamed FlinkEngineRuntimeContextImpl method to 'apply' as it is used to create new instance of it * Name changed as we create MetricProvider, not just get it from somewhere --------- Co-authored-by: Szymon Bogusz --- .../ProcessingTypeDataReader.scala | 9 ++++++- docs/Changelog.md | 3 ++- .../FlinkEngineRuntimeContextImpl.scala | 24 ++++++++++++++++--- .../compiler/FlinkProcessCompilerData.scala | 6 +++-- .../compiler/MetricsProviderForFlink.scala | 19 +++++++++++++++ .../registrar/FlinkProcessRegistrar.scala | 15 ++++++++---- .../registrar/SourceMetricsFunction.scala | 8 ++++--- 7 files changed, 70 insertions(+), 14 deletions(-) create mode 100644 engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/MetricsProviderForFlink.scala diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/ProcessingTypeDataReader.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/ProcessingTypeDataReader.scala index 53839aa59e2..c6ab9a5752c 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/ProcessingTypeDataReader.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/ProcessingTypeDataReader.scala @@ -1,7 +1,14 @@ package pl.touk.nussknacker.ui.process.processingtype import com.typesafe.scalalogging.LazyLogging -import pl.touk.nussknacker.engine._ +import pl.touk.nussknacker.engine.{ + ConfigWithUnresolvedVersion, + DeploymentManagerDependencies, + DeploymentManagerProvider, + ModelData, + ModelDependencies, + ProcessingTypeConfig +} import pl.touk.nussknacker.engine.api.process.ProcessingType import pl.touk.nussknacker.engine.deployment.EngineSetupName import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap diff --git a/docs/Changelog.md b/docs/Changelog.md index befe54f56e3..f0516761daf 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -1,8 +1,9 @@ # Changelog -1.15.1 (4 June 2024) +1.15.1 (5 June 2024) ------------------------- * [#6126](https://github.com/TouK/nussknacker/pull/6126) Fix statistics configuration. +* [#6127](https://github.com/TouK/nussknacker/pull/6127) Ad-hoc tests available in scenarios without `flink-dropwizard-metrics-deps` in classPath 1.15.0 (17 May 2024) ------------------------- diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala index d4ebaa1fb0f..53fc8dc2ea8 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkEngineRuntimeContextImpl.scala @@ -2,15 +2,33 @@ package pl.touk.nussknacker.engine.process.compiler import org.apache.flink.api.common.functions.RuntimeContext import pl.touk.nussknacker.engine.api.JobData +import pl.touk.nussknacker.engine.api.process.ComponentUseCase import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, IncContextIdGenerator} import pl.touk.nussknacker.engine.flink.api.FlinkEngineRuntimeContext +import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider import pl.touk.nussknacker.engine.util.metrics.MetricsProviderForScenario -case class FlinkEngineRuntimeContextImpl(jobData: JobData, runtimeContext: RuntimeContext) - extends FlinkEngineRuntimeContext { - override val metricsProvider: MetricsProviderForScenario = new FlinkMetricsProviderForScenario(runtimeContext) +case class FlinkEngineRuntimeContextImpl( + jobData: JobData, + runtimeContext: RuntimeContext, + metricsProvider: MetricsProviderForScenario +) extends FlinkEngineRuntimeContext { override def contextIdGenerator(nodeId: String): ContextIdGenerator = new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask) } + +object FlinkEngineRuntimeContextImpl { + +// This creates FlinkEngineRuntimeContextImpl with correct metricsProviderForScenario based on ComponentUseCase + def apply( + jobData: JobData, + runtimeContext: RuntimeContext, + componentUseCase: ComponentUseCase + ): FlinkEngineRuntimeContextImpl = { + val properMetricsProvider = createMetricsProvider(componentUseCase, runtimeContext) + new FlinkEngineRuntimeContextImpl(jobData, runtimeContext, properMetricsProvider) + } + +} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerData.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerData.scala index d9bf19554ce..5a9ab89750d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerData.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/FlinkProcessCompilerData.scala @@ -39,7 +39,7 @@ class FlinkProcessCompilerData( def open(runtimeContext: RuntimeContext, nodesToUse: List[_ <: NodeData]): Unit = { val lifecycle = compilerData.lifecycle(nodesToUse) lifecycle.foreach { - _.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext)) + _.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext, componentUseCase)) } } @@ -74,7 +74,9 @@ class FlinkProcessCompilerData( def restartStrategy: RestartStrategies.RestartStrategyConfiguration = exceptionHandler.restartStrategy def prepareExceptionHandler(runtimeContext: RuntimeContext): FlinkExceptionHandler = { - exceptionHandler.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext)) + exceptionHandler.open( + FlinkEngineRuntimeContextImpl(jobData, runtimeContext, componentUseCase) + ) exceptionHandler } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/MetricsProviderForFlink.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/MetricsProviderForFlink.scala new file mode 100644 index 00000000000..60457f82db0 --- /dev/null +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/compiler/MetricsProviderForFlink.scala @@ -0,0 +1,19 @@ +package pl.touk.nussknacker.engine.process.compiler + +import org.apache.flink.api.common.functions.RuntimeContext +import pl.touk.nussknacker.engine.api.process.ComponentUseCase +import pl.touk.nussknacker.engine.util.metrics.{MetricsProviderForScenario, NoOpMetricsProviderForScenario} + +object MetricsProviderForFlink { + + def createMetricsProvider( + componentUseCase: ComponentUseCase, + runtimeContext: RuntimeContext + ): MetricsProviderForScenario = { + componentUseCase match { + case ComponentUseCase.TestRuntime => NoOpMetricsProviderForScenario + case _ => new FlinkMetricsProviderForScenario(runtimeContext) + } + } + +} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala index 62b9f50f8d7..0a907feeaa8 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala @@ -140,12 +140,14 @@ class FlinkProcessRegistrar( ): FlinkCustomNodeContext = { val exceptionHandlerPreparer = (runtimeContext: RuntimeContext) => compilerDataForProcessPart(None)(runtimeContext.getUserCodeClassLoader).prepareExceptionHandler(runtimeContext) - val jobData = compilerData.jobData + val jobData = compilerData.jobData + val componentUseCase = compilerData.componentUseCase + FlinkCustomNodeContext( jobData, nodeComponentId.nodeId, compilerData.processTimeout, - convertToEngineRuntimeContext = FlinkEngineRuntimeContextImpl(jobData, _), + convertToEngineRuntimeContext = FlinkEngineRuntimeContextImpl(jobData, _, componentUseCase), lazyParameterHelper = new FlinkLazyParameterFunctionHelper( nodeComponentId, exceptionHandlerPreparer, @@ -182,7 +184,7 @@ class FlinkProcessRegistrar( val start = source .contextStream(env, nodeContext(nodeComponentInfoFrom(part), Left(ValidationContext.empty))) - .process(new SourceMetricsFunction(part.id), contextTypeInformation) + .process(new SourceMetricsFunction(part.id, compilerData.componentUseCase), contextTypeInformation) val asyncAssigned = registerInterpretationPart(start, part, InterpretationName) @@ -377,7 +379,12 @@ class FlinkProcessRegistrar( } else { val ti = InterpretationResultTypeInformation.create(typeInformationDetection, outputContexts) stream.flatMap( - new SyncInterpretationFunction(compilerDataForProcessPart(Some(part)), node, validationContext, useIOMonad), + new SyncInterpretationFunction( + compilerDataForProcessPart(Some(part)), + node, + validationContext, + useIOMonad + ), ti ) } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala index 386c400bda2..5174ba94be6 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/SourceMetricsFunction.scala @@ -3,16 +3,18 @@ package pl.touk.nussknacker.engine.process.registrar import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector -import pl.touk.nussknacker.engine.process.compiler.FlinkMetricsProviderForScenario +import pl.touk.nussknacker.engine.api.process.ComponentUseCase +import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider import pl.touk.nussknacker.engine.util.metrics.common.OneSourceMetrics -private[registrar] class SourceMetricsFunction[T](sourceId: String) extends ProcessFunction[T, T] { +private[registrar] class SourceMetricsFunction[T](sourceId: String, componentUseCase: ComponentUseCase) + extends ProcessFunction[T, T] { @transient private var metrics: OneSourceMetrics = _ override def open(parameters: Configuration): Unit = { metrics = new OneSourceMetrics(sourceId) - val metricsProvider = new FlinkMetricsProviderForScenario(getRuntimeContext) + val metricsProvider = createMetricsProvider(componentUseCase, getRuntimeContext) metrics.registerOwnMetrics(metricsProvider) }