Merge pull request #6132 from TouK/1.15.1-statistics-and-adhoc-test-fix
1.15.1 statistics and adhoc test fix - cherry-pick from staging
ForrestFairy authored Jun 5, 2024
2 parents a200797 + 92779e7 commit f374bf1
Showing 11 changed files with 86 additions and 22 deletions.
1 change: 0 additions & 1 deletion designer/client/src/actions/nk/assignSettings.ts
@@ -10,7 +10,6 @@ export type MetricsType = {

export type UsageStatisticsReports = {
enabled: boolean;
url?: string;
};

export type SurveySettings = {
@@ -5,7 +5,7 @@ import cats.data.Validated
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport
import io.circe.{Decoder, Encoder}
import io.circe.generic.JsonCodec
import pl.touk.nussknacker.ui.config.{AnalyticsConfig, FeatureTogglesConfig}
import pl.touk.nussknacker.ui.config.{AnalyticsConfig, FeatureTogglesConfig, UsageStatisticsReportsConfig}
import pl.touk.nussknacker.engine.api.CirceUtil.codecs._

import java.net.URL
@@ -14,7 +14,8 @@ import scala.concurrent.ExecutionContext
class SettingsResources(
config: FeatureTogglesConfig,
authenticationMethod: String,
analyticsConfig: Option[AnalyticsConfig]
analyticsConfig: Option[AnalyticsConfig],
usageStatisticsReportsConfig: UsageStatisticsReportsConfig
)(implicit ec: ExecutionContext)
extends Directives
with FailFastCirceSupport
@@ -36,8 +37,7 @@ class SettingsResources(
intervalTimeSettings = config.intervalTimeSettings,
testDataSettings = config.testDataSettings,
redirectAfterArchive = config.redirectAfterArchive,
// TODO: It's disabled temporarily until we remove it on FE. We can remove it once it has been removed on FE.
usageStatisticsReports = UsageStatisticsReportsSettings(false, None)
usageStatisticsReports = UsageStatisticsReportsSettings(usageStatisticsReportsConfig.enabled)
)

val authenticationSettings = AuthenticationSettings(
@@ -145,4 +145,4 @@ object TopTabType extends Enumeration {
analytics: Option[AnalyticsSettings]
)

@JsonCodec final case class UsageStatisticsReportsSettings(enabled: Boolean, url: Option[String])
@JsonCodec final case class UsageStatisticsReportsSettings(enabled: Boolean)
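With the optional url field gone on both sides (the FE type above and the Scala case class here), the usage-statistics settings payload carries only the flag. A minimal circe round-trip sketch of the new shape, assuming macro annotations are enabled as in the project; the snippet is illustrative and not part of the change itself:

    import io.circe.generic.JsonCodec
    import io.circe.syntax._

    @JsonCodec final case class UsageStatisticsReportsSettings(enabled: Boolean)

    // Encodes to {"enabled":true}; the previously optional "url" field no longer
    // appears in the payload.
    object UsageStatisticsReportsSettingsExample extends App {
      println(UsageStatisticsReportsSettings(enabled = true).asJson.noSpaces)
    }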
@@ -1,7 +1,14 @@
package pl.touk.nussknacker.ui.process.processingtype

import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine._
import pl.touk.nussknacker.engine.{
ConfigWithUnresolvedVersion,
DeploymentManagerDependencies,
DeploymentManagerProvider,
ModelData,
ModelDependencies,
ProcessingTypeConfig
}
import pl.touk.nussknacker.engine.api.process.ProcessingType
import pl.touk.nussknacker.engine.deployment.EngineSetupName
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
@@ -442,7 +442,8 @@ class AkkaHttpBasedRouteProvider(
val settingsResources = new SettingsResources(
featureTogglesConfig,
authenticationResources.name,
analyticsConfig
analyticsConfig,
usageStatisticsReportsConfig
)
val apiResourcesWithoutAuthentication: List[Route] = List(
settingsResources.publicRoute(),
@@ -9,7 +9,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import pl.touk.nussknacker.test.PatientScalaFutures
import pl.touk.nussknacker.test.base.it.NuResourcesTest
import pl.touk.nussknacker.test.utils.domain.TestFactory.withoutPermissions
import pl.touk.nussknacker.ui.config.AnalyticsConfig
import pl.touk.nussknacker.ui.config.{AnalyticsConfig, UsageStatisticsReportsConfig}
import pl.touk.nussknacker.ui.security.basicauth.BasicAuthenticationConfiguration

class SettingsResourcesSpec
@@ -25,11 +25,14 @@ class SettingsResourcesSpec
private val authenticationConfig: BasicAuthenticationConfiguration =
BasicAuthenticationConfiguration.create(testConfig)
private val analyticsConfig: Option[AnalyticsConfig] = AnalyticsConfig(testConfig)
private val usageStatisticsReportsConfig: UsageStatisticsReportsConfig =
UsageStatisticsReportsConfig(true, None, None)

private val settingsRoute = new SettingsResources(
featureTogglesConfig,
authenticationConfig.name,
analyticsConfig
analyticsConfig,
usageStatisticsReportsConfig
)

// Values exist at test/resources/application.conf
@@ -44,6 +47,7 @@

data.intervalTimeSettings.processes shouldBe intervalTimeProcesses
data.intervalTimeSettings.healthCheck shouldBe intervalTimeHealthCheck
data.usageStatisticsReports.enabled shouldBe true
}
}

5 changes: 5 additions & 0 deletions docs/Changelog.md
@@ -1,5 +1,10 @@
# Changelog

1.15.1 (5 June 2024)
-------------------------
* [#6126](https://github.com/TouK/nussknacker/pull/6126) Fix statistics configuration.
* [#6127](https://github.com/TouK/nussknacker/pull/6127) Ad-hoc tests available in scenarios without `flink-dropwizard-metrics-deps` in classPath

1.15.0 (17 May 2024)
-------------------------
* [#5620](https://github.com/TouK/nussknacker/pull/5620) Nodes Api OpenApi-based documentation (e.g. `https://demo.nussknacker.io/api/docs`)
@@ -2,15 +2,33 @@ package pl.touk.nussknacker.engine.process.compiler

import org.apache.flink.api.common.functions.RuntimeContext
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, IncContextIdGenerator}
import pl.touk.nussknacker.engine.flink.api.FlinkEngineRuntimeContext
import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider
import pl.touk.nussknacker.engine.util.metrics.MetricsProviderForScenario

case class FlinkEngineRuntimeContextImpl(jobData: JobData, runtimeContext: RuntimeContext)
extends FlinkEngineRuntimeContext {
override val metricsProvider: MetricsProviderForScenario = new FlinkMetricsProviderForScenario(runtimeContext)
case class FlinkEngineRuntimeContextImpl(
jobData: JobData,
runtimeContext: RuntimeContext,
metricsProvider: MetricsProviderForScenario
) extends FlinkEngineRuntimeContext {

override def contextIdGenerator(nodeId: String): ContextIdGenerator =
new IncContextIdGenerator(jobData.metaData.name.value + "-" + nodeId + "-" + runtimeContext.getIndexOfThisSubtask)

}

object FlinkEngineRuntimeContextImpl {

// This creates FlinkEngineRuntimeContextImpl with correct metricsProviderForScenario based on ComponentUseCase
def apply(
jobData: JobData,
runtimeContext: RuntimeContext,
componentUseCase: ComponentUseCase
): FlinkEngineRuntimeContextImpl = {
val properMetricsProvider = createMetricsProvider(componentUseCase, runtimeContext)
new FlinkEngineRuntimeContextImpl(jobData, runtimeContext, properMetricsProvider)
}

}
@@ -39,7 +39,7 @@ class FlinkProcessCompilerData(
def open(runtimeContext: RuntimeContext, nodesToUse: List[_ <: NodeData]): Unit = {
val lifecycle = compilerData.lifecycle(nodesToUse)
lifecycle.foreach {
_.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext))
_.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext, componentUseCase))
}
}

@@ -74,7 +74,9 @@ class FlinkProcessCompilerData(
def restartStrategy: RestartStrategies.RestartStrategyConfiguration = exceptionHandler.restartStrategy

def prepareExceptionHandler(runtimeContext: RuntimeContext): FlinkExceptionHandler = {
exceptionHandler.open(FlinkEngineRuntimeContextImpl(jobData, runtimeContext))
exceptionHandler.open(
FlinkEngineRuntimeContextImpl(jobData, runtimeContext, componentUseCase)
)
exceptionHandler
}

@@ -0,0 +1,19 @@
package pl.touk.nussknacker.engine.process.compiler

import org.apache.flink.api.common.functions.RuntimeContext
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.util.metrics.{MetricsProviderForScenario, NoOpMetricsProviderForScenario}

object MetricsProviderForFlink {

def createMetricsProvider(
componentUseCase: ComponentUseCase,
runtimeContext: RuntimeContext
): MetricsProviderForScenario = {
componentUseCase match {
case ComponentUseCase.TestRuntime => NoOpMetricsProviderForScenario
case _ => new FlinkMetricsProviderForScenario(runtimeContext)
}
}

}
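This new helper appears to be what lets ad-hoc tests run in scenarios without flink-dropwizard-metrics-deps on the classpath (see the changelog entry above): under ComponentUseCase.TestRuntime it hands out the no-op provider instead of one backed by Flink's metric groups. A minimal sketch of that selection; the RuntimeContext parameter is assumed to come from an operator's open() and is not constructed here:

    import org.apache.flink.api.common.functions.RuntimeContext
    import pl.touk.nussknacker.engine.api.process.ComponentUseCase
    import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider
    import pl.touk.nussknacker.engine.util.metrics.{MetricsProviderForScenario, NoOpMetricsProviderForScenario}

    object MetricsProviderSelectionSketch {

      // Sketch only: delegates to createMetricsProvider and reports whether the
      // given use case ended up with the no-op metrics implementation.
      def usesNoOpMetrics(useCase: ComponentUseCase, runtimeContext: RuntimeContext): Boolean = {
        val provider: MetricsProviderForScenario = createMetricsProvider(useCase, runtimeContext)
        provider == NoOpMetricsProviderForScenario
      }

      // Expected: true for ComponentUseCase.TestRuntime, false for any other use case,
      // which gets a FlinkMetricsProviderForScenario instead.
    }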
@@ -140,12 +140,14 @@ ): FlinkCustomNodeContext = {
): FlinkCustomNodeContext = {
val exceptionHandlerPreparer = (runtimeContext: RuntimeContext) =>
compilerDataForProcessPart(None)(runtimeContext.getUserCodeClassLoader).prepareExceptionHandler(runtimeContext)
val jobData = compilerData.jobData
val jobData          = compilerData.jobData
val componentUseCase = compilerData.componentUseCase

FlinkCustomNodeContext(
jobData,
nodeComponentId.nodeId,
compilerData.processTimeout,
convertToEngineRuntimeContext = FlinkEngineRuntimeContextImpl(jobData, _),
convertToEngineRuntimeContext = FlinkEngineRuntimeContextImpl(jobData, _, componentUseCase),
lazyParameterHelper = new FlinkLazyParameterFunctionHelper(
nodeComponentId,
exceptionHandlerPreparer,
@@ -182,7 +184,7 @@

val start = source
.contextStream(env, nodeContext(nodeComponentInfoFrom(part), Left(ValidationContext.empty)))
.process(new SourceMetricsFunction(part.id), contextTypeInformation)
.process(new SourceMetricsFunction(part.id, compilerData.componentUseCase), contextTypeInformation)

val asyncAssigned = registerInterpretationPart(start, part, InterpretationName)

@@ -377,7 +379,12 @@ class FlinkProcessRegistrar(
} else {
val ti = InterpretationResultTypeInformation.create(typeInformationDetection, outputContexts)
stream.flatMap(
new SyncInterpretationFunction(compilerDataForProcessPart(Some(part)), node, validationContext, useIOMonad),
new SyncInterpretationFunction(
compilerDataForProcessPart(Some(part)),
node,
validationContext,
useIOMonad
),
ti
)
}
@@ -3,16 +3,18 @@ package pl.touk.nussknacker.engine.process.registrar
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.process.compiler.FlinkMetricsProviderForScenario
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.process.compiler.MetricsProviderForFlink.createMetricsProvider
import pl.touk.nussknacker.engine.util.metrics.common.OneSourceMetrics

private[registrar] class SourceMetricsFunction[T](sourceId: String) extends ProcessFunction[T, T] {
private[registrar] class SourceMetricsFunction[T](sourceId: String, componentUseCase: ComponentUseCase)
extends ProcessFunction[T, T] {

@transient private var metrics: OneSourceMetrics = _

override def open(parameters: Configuration): Unit = {
metrics = new OneSourceMetrics(sourceId)
val metricsProvider = new FlinkMetricsProviderForScenario(getRuntimeContext)
val metricsProvider = createMetricsProvider(componentUseCase, getRuntimeContext)
metrics.registerOwnMetrics(metricsProvider)
}
