Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
gskrobisz committed Sep 25, 2024
1 parent e0b7858 commit e972f8c
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 15 deletions.
8 changes: 3 additions & 5 deletions designer/client/src/http/HttpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ export type SourceWithParametersTest = {
parameterExpressions: { [paramName: string]: Expression };
};

export type NodesDeploymentData = Record<NodeId, Record<string, string>>;

export type NodeUsageData = {
fragmentNodeId?: string;
nodeId: string;
Expand Down Expand Up @@ -314,11 +316,7 @@ class HttpService {
);
}

deploy(
processName: string,
comment?: string,
nodesDeploymentData?: Record<NodeId, Record<string, string>>,
): Promise<{ isSuccess: boolean }> {
deploy(processName: string, comment?: string, nodesDeploymentData?: NodesDeploymentData): Promise<{ isSuccess: boolean }> {
const runDeploymentRequest = {
...(nodesDeploymentData && { nodesDeploymentData: nodesDeploymentData }),
...(comment && { comment: comment }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class DeploymentApiHttpService(
RunDeploymentCommand(
id = deploymentId,
scenarioName = request.scenarioName,
nodesDeploymentData = NodesDeploymentData(request.nodesDeploymentData.map { case (n, p) =>
(n, Map("sqlExpression" -> p))
nodesDeploymentData = NodesDeploymentData(request.nodesDeploymentData.map { case (nodeId, paramValue) =>
(nodeId, Map("sqlExpression" -> paramValue))
}),
user = loggedUser
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
import io.circe.{Decoder, Encoder, Json, parser}
import io.dropwizard.metrics5.MetricRegistry
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.component.NodesDeploymentData
import pl.touk.nussknacker.engine.api.deployment.DeploymentUpdateStrategy.StateRestoringStrategy
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.testmode.TestProcess._
import pl.touk.nussknacker.restmodel.{CustomActionRequest, CustomActionResponse}
import pl.touk.nussknacker.ui.api.description.NodesApiEndpoints.Dtos.{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ object DeploymentApiEndpoints {
@derive(encoder, decoder, schema)
final case class RunDeploymentRequest(
scenarioName: ProcessName,
nodesDeploymentData: Map[NodeId, String], // nodeId -> single parameter value
nodesDeploymentData: Map[NodeId, String], // nodeId -> single parameter value (currently sqlExpression)
comment: Option[ApiCallComment]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import pl.touk.nussknacker.ui.processreport.ProcessCounter
import pl.touk.nussknacker.ui.security.api.{LoggedUser, RealLoggedUser}
import pl.touk.nussknacker.ui.util.{MultipartUtils, NuPathMatchers}
import slick.dbio.DBIOAction

import java.net.URI
import scala.concurrent.{ExecutionContext, Future}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.touk.nussknacker.ui.api

import io.restassured.RestAssured.`given`
import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse
import org.hamcrest.Matchers.{emptyOrNullString, equalTo, is, notNullValue}
import org.hamcrest.Matchers.{equalTo, notNullValue}
import org.scalatest.freespec.AnyFreeSpecLike
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.test.base.it.{NuItTest, WithSimplifiedConfigScenarioHelper}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import pl.touk.nussknacker.engine.api.process.{
}
import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext}
import pl.touk.nussknacker.engine.api.test.{TestRecord, TestRecordParser}
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.flink.api.exception.ExceptionHandler
import pl.touk.nussknacker.engine.flink.api.process.{
FlinkCustomNodeContext,
Expand Down Expand Up @@ -113,7 +112,7 @@ class FlinkKafkaSource[T](
consumerGroupId: String,
flinkNodeContext: FlinkCustomNodeContext
): SourceFunction[T] = {
// TODO: handle deployment parameters -> offset
// TODO: use deployment parameters -> offsetResetStrategy
val offsetResetStrategy =
flinkNodeContext.nodeDeploymentData.flatMap(_.get(OFFSET_RESET_STRATEGY_PARAM_NAME)).getOrElse()
topics.toList.foreach(KafkaUtils.setToLatestOffsetIfNeeded(kafkaConfig, _, consumerGroupId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.api.component.{ParameterConfig, UnboundedStreamComponent}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, RawParameterEditor}
import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.process.{SourceFactory, WithActivityParameters}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}
import pl.touk.nussknacker.engine.api.typed.typing.Unknown
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
import pl.touk.nussknacker.engine.flink.util.source.CollectionSource
Expand Down Expand Up @@ -49,6 +48,7 @@ object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamCompone
"Set offset to setup source to emit elements from specified start point in input collection. Empty field resets collection to the beginning."
)
),
// TODO: remove offsetResetStrategy
"offsetResetStrategy" -> ParameterConfig(
defaultValue = Some("EARLIEST"),
editor = fixedValuesEditor,
Expand Down

0 comments on commit e972f8c

Please sign in to comment.