Better cleaning of allocated resources #7280

Draft · wants to merge 2 commits into staging
docs/Changelog.md (1 change: 1 addition & 0 deletions)

@@ -10,6 +10,7 @@
 
 ### 1.19.0 (Not released yet)
 
+* [#7280](https://github.com/TouK/nussknacker/pull/7280) Better cleaning of allocated resources
 * [#7145](https://github.com/TouK/nussknacker/pull/7145) Lift TypingResult information for dictionaries
 * [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation
 * [#7123](https://github.com/TouK/nussknacker/pull/7123) Fix deployments for scenarios with dict editors after model reload
FlinkMiniClusterTableOperations.scala

@@ -25,15 +25,17 @@ import scala.util.{Failure, Success, Try, Using}
 object FlinkMiniClusterTableOperations extends LazyLogging {
 
   def parseTestRecords(records: List[TestRecord], schema: Schema): List[Row] = {
-    implicit val env: StreamTableEnvironment = MiniClusterEnvBuilder.buildStreamTableEnv
-    val (inputTablePath, inputTableName) = createTempFileTable(schema)
-    val parsedRecords = Try {
-      writeRecordsToFile(inputTablePath, records)
-      val inputTable = env.from(s"`$inputTableName`")
-      env.toDataStream(inputTable).executeAndCollect().asScala.toList
-    }
-    cleanup(inputTablePath)
-    parsedRecords.get
+    Using.resource(MiniClusterEnvBuilder.createLocalStreamEnv) { streamEnv =>
+      implicit val tableEnv: StreamTableEnvironment = MiniClusterEnvBuilder.createTableStreamEnv(streamEnv)
+      val (inputTablePath, inputTableName) = createTempFileTable(schema)
+      try {
+        writeRecordsToFile(inputTablePath, records)
+        val inputTable = tableEnv.from(s"`$inputTableName`")
+        tableEnv.toDataStream(inputTable).executeAndCollect().asScala.toList
+      } finally {
+        cleanup(inputTablePath)
+      }
+    }
   }
 
   def generateLiveTestData(
@@ -61,7 +63,7 @@ object FlinkMiniClusterTableOperations extends LazyLogging {
     schema: Schema,
     buildSourceTable: TableEnvironment => Table
   ): TestData = {
-    implicit val env: TableEnvironment = MiniClusterEnvBuilder.buildTableEnv
+    implicit val env: TableEnvironment = MiniClusterEnvBuilder.createTableEnv
     val sourceTable = buildSourceTable(env)
     val (outputFilePath, outputTableName) = createTempFileTable(schema)
     val generatedRows = Try {
@@ -184,12 +186,13 @@ object FlinkMiniClusterTableOperations extends LazyLogging {
 
   private lazy val tableEnvConfig = EnvironmentSettings.newInstance().withConfiguration(streamEnvConfig).build()
 
-  def buildTableEnv: TableEnvironment = TableEnvironment.create(tableEnvConfig)
+  def createTableEnv: TableEnvironment = TableEnvironment.create(tableEnvConfig)
 
-  def buildStreamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(
-    StreamExecutionEnvironment.createLocalEnvironment(streamEnvConfig),
-    tableEnvConfig
-  )
+  def createLocalStreamEnv: StreamExecutionEnvironment =
+    StreamExecutionEnvironment.createLocalEnvironment(streamEnvConfig)
+
+  def createTableStreamEnv(streamEnv: StreamExecutionEnvironment): StreamTableEnvironment =
+    StreamTableEnvironment.create(streamEnv, tableEnvConfig)
 
 }
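As context for the `parseTestRecords` rewrite: `scala.util.Using.resource` guarantees that the resource's `close()` runs on both the success and the failure path, which is what removes the leak the old `Try`-plus-manual-`cleanup` version had when the body threw before cleanup. A minimal sketch, with `Env` as a hypothetical stand-in for the stream environment (not code from this PR):

import scala.util.Using

object UsingResourceSketch extends App {

  // Hypothetical stand-in for an AutoCloseable environment
  final class Env extends AutoCloseable {
    def work(): List[Int] = List(1, 2, 3)
    override def close(): Unit = println("env closed")
  }

  // close() is invoked before the result is returned, and also if work() throws
  val rows = Using.resource(new Env)(env => env.work())
  println(rows) // "env closed" is printed first, then List(1, 2, 3)
}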
FlinkStubbedRunner.scala

@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.runner
 
+import com.typesafe.scalalogging.LazyLogging
 import org.apache.flink.configuration.{
   ConfigUtils,
   Configuration,
@@ -21,21 +22,24 @@ import java.net.{MalformedURLException, URL}
 import scala.jdk.CollectionConverters._
 import scala.util.Using
 
-trait FlinkStubbedRunner {
+trait FlinkStubbedRunner { self: LazyLogging =>
 
   protected def modelData: ModelData
 
   protected def process: CanonicalProcess
 
   protected def configuration: Configuration
 
-  protected def createEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(
-    MetaDataExtractor
-      .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
-      .parallelism
-      .getOrElse(1),
-    configuration
-  )
+  protected def createEnv: StreamExecutionEnvironment = {
+    logger.debug(s"Creating LocalEnvironment for model with classpath: ${modelData.modelClassLoader}")
+    StreamExecutionEnvironment.createLocalEnvironment(
+      MetaDataExtractor
+        .extractTypeSpecificDataOrDefault[StreamMetaData](process.metaData, StreamMetaData())
+        .parallelism
+        .getOrElse(1),
+      configuration
+    )
+  }
 
   // we use own LocalFlinkMiniCluster, instead of LocalExecutionEnvironment, to be able to pass own classpath...
   protected def execute[T](
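One subtlety in the trait change: `{ self: LazyLogging => ... }` is a self-type, not inheritance. `FlinkStubbedRunner` gains access to `logger`, but every concrete runner must mix `LazyLogging` in itself, as `FlinkTestMain` and `FlinkVerificationMain` do below. A compact sketch with hypothetical names:

import com.typesafe.scalalogging.LazyLogging

trait Runner { self: LazyLogging =>
  // `logger` resolves because every implementor is guaranteed to be a LazyLogging
  def run(): Unit = logger.debug("running")
}

class OkRunner extends Runner with LazyLogging // compiles
// class BadRunner extends Runner             // does not compile: self-type not satisfied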
FlinkTestMain.scala

@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.runner
 
+import com.typesafe.scalalogging.LazyLogging
 import io.circe.Json
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
@@ -53,7 +54,8 @@ class FlinkTestMain(
     processVersion: ProcessVersion,
     deploymentData: DeploymentData,
     val configuration: Configuration
-) extends FlinkStubbedRunner {
+) extends FlinkStubbedRunner
+    with LazyLogging {
 
   def runTest: TestResults[Json] = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
@@ -62,9 +64,14 @@ class FlinkTestMain(
       val registrar = prepareRegistrar(collectingListener, scenarioTestData)
       val env = createEnv
 
-      registrar.register(env, process, processVersion, deploymentData, resultCollector)
-      execute(env, SavepointRestoreSettings.none())
-      collectingListener.results
+      try {
+        registrar.register(env, process, processVersion, deploymentData, resultCollector)
+        execute(env, SavepointRestoreSettings.none())
+        collectingListener.results
+      } finally {
+        logger.debug(s"Closing LocalEnvironment for model with classpath: ${modelData.modelClassLoader}")
+        env.close()
+      }
     } finally {
       collectingListener.clean()
     }
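The nested `try`/`finally` above pins the cleanup order: the inner `finally` closes the environment first, the outer one cleans the listener second, and both run even when `execute` throws. A self-contained sketch of that ordering, with hypothetical `Env`/`Listener` stand-ins:

import scala.util.Try

object CleanupOrderSketch extends App {

  final class Env      { def close(): Unit = println("1. env closed") }
  final class Listener { def clean(): Unit = println("2. listener cleaned") }

  def run(listener: Listener): Unit =
    try {
      val env = new Env
      try throw new RuntimeException("execute failed") // simulate a failing job
      finally env.close()                              // inner finally: runs first
    } finally {
      listener.clean()                                 // outer finally: runs second
    }

  println(Try(run(new Listener))) // prints both cleanup lines, then Failure(...)
}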
FlinkVerificationMain.scala

@@ -1,5 +1,6 @@
 package pl.touk.nussknacker.engine.process.runner
 
+import com.typesafe.scalalogging.LazyLogging
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
 import pl.touk.nussknacker.engine.ModelData
@@ -11,6 +12,8 @@ import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
 import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
 import pl.touk.nussknacker.engine.testmode.{ResultsCollectingListenerHolder, TestRunId, TestServiceInvocationCollector}
 
+import scala.util.Using
+
 object FlinkVerificationMain extends FlinkRunner {
 
   def run(
@@ -33,16 +36,26 @@
     deploymentData: DeploymentData,
     savepointPath: String,
     val configuration: Configuration
-) extends FlinkStubbedRunner {
+) extends FlinkStubbedRunner
+    with LazyLogging {
 
   def runTest(): Unit = {
     val collectingListener = ResultsCollectingListenerHolder.registerTestEngineListener
-    val resultCollector = new TestServiceInvocationCollector(collectingListener)
-    val registrar = prepareRegistrar()
-    val env = createEnv
+    try {
+      val resultCollector = new TestServiceInvocationCollector(collectingListener)
+      val registrar = prepareRegistrar()
+      val env = createEnv
 
-    registrar.register(env, process, processVersion, deploymentData, resultCollector)
-    execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
+      try {
+        registrar.register(env, process, processVersion, deploymentData, resultCollector)
+        execute(env, SavepointRestoreSettings.forPath(savepointPath, true))
+      } finally {
+        logger.debug(s"Closing LocalEnvironment for model with classpath: ${modelData.modelClassLoader}")
+        env.close()
+      }
+    } finally {
+      collectingListener.close()
+    }
   }
 
   protected def prepareRegistrar(): FlinkProcessRegistrar = {
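The same release-in-reverse-order shape could also be written with `scala.util.Using.Manager`, shown here only as a sketch with a hypothetical `Res` class; the PR itself stays with explicit `try`/`finally`, which keeps the distinct `close()`/`clean()` calls visible:

import scala.util.Using

object ManagerSketch extends App {

  final class Res(name: String) extends AutoCloseable {
    override def close(): Unit = println(s"closed $name")
  }

  // Resources are released in reverse registration order, on success and failure alike
  val result = Using.Manager { use =>
    val listener = use(new Res("collectingListener")) // closed last
    val env      = use(new Res("env"))                // closed first
    "verification result"
  }
  println(result) // "closed env", "closed collectingListener", then Success(verification result)
}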
ModelData.scala

@@ -275,6 +275,11 @@ trait ModelData extends BaseModelData with AutoCloseable {
 
   final def close(): Unit = {
     designerDictServices.close()
+    if (shouldCloseClassLoader) {
+      modelClassLoader.close()
+    }
   }
 
+  protected def shouldCloseClassLoader: Boolean = true
+
 }
LocalModelData.scala

@@ -118,4 +118,8 @@
   override val extractModelDefinitionFun =
     new ExtractDefinitionFunImpl(configCreator, category, components, componentDefinitionExtractionMode)
 
+  // For LocalModelData we can't close the classloader: in this case it is not a dedicated
+  // classloader but the shared AppClassLoader
+  override protected def shouldCloseClassLoader: Boolean = false
+
 }
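Taken together, the two changes above form a small template-method pattern: `close()` stays `final` in `ModelData`, and `LocalModelData` flips the protected flag because its classloader is the shared application classloader rather than a dedicated one. A condensed sketch with hypothetical names:

import java.net.URLClassLoader

trait Model extends AutoCloseable {
  protected def loader: URLClassLoader

  final def close(): Unit =
    if (shouldCloseClassLoader) loader.close() // skipped when the loader is shared

  protected def shouldCloseClassLoader: Boolean = true
}

// A model living on a shared classloader opts out of closing it:
final class SharedLoaderModel(protected val loader: URLClassLoader) extends Model {
  override protected def shouldCloseClassLoader: Boolean = false
}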
ModelClassLoader.scala

@@ -2,11 +2,11 @@ package pl.touk.nussknacker.engine.util.loader
 
 import com.typesafe.scalalogging.LazyLogging
 
-import java.io.File
+import java.io.{Closeable, File}
 import java.net.{URI, URL, URLClassLoader}
 import java.nio.file.Path
 
-case class ModelClassLoader private (classLoader: ClassLoader, urls: List[URL]) {
+case class ModelClassLoader private (classLoader: ClassLoader, urls: List[URL]) extends LazyLogging {
 
   override def toString: String = s"ModelClassLoader(${toString(classLoader)})"
 
@@ -16,6 +16,16 @@ case class ModelClassLoader private (classLoader: ClassLoader, urls: List[URL])
     case other => s"${other.toString}(${toString(other.getParent)})"
   }
 
+  def close(): Unit = {
+    classLoader match {
+      case closeable: Closeable =>
+        logger.debug(s"$toString: Closing Closeable classloader")
+        closeable.close()
+      case _ =>
+        logger.debug(s"$toString: Classloader ${classLoader.getClass.getName} is not Closeable, skipping close")
+    }
+  }
+
 }
 
 object ModelClassLoader extends LazyLogging {
Comment on lines +19 to +27
🛠️ Refactor suggestion

Add exception handling to close() method

While the implementation is good, it should handle potential exceptions during the close operation to prevent them from propagating up the call stack.

Consider this safer implementation:

 def close(): Unit = {
   classLoader match {
     case closeable: Closeable =>
       logger.debug(s"$toString: Closing Closeable classloader")
-      closeable.close()
+      try {
+        closeable.close()
+      } catch {
+        case e: Exception =>
+          logger.warn(s"$toString: Failed to close classloader", e)
+      }
     case _ =>
       logger.debug(s"$toString: Classloader ${classLoader.getClass.getName} is not Closeable, skipping close")
   }
 }
@@ -67,10 +77,12 @@ object ModelClassLoader extends LazyLogging {
     jarExtension: String = defaultJarExtension
   ): ModelClassLoader = {
     val postProcessedURLs = expandFiles(urls.map(convertToURL(_, workingDirectoryOpt)), jarExtension)
-    ModelClassLoader(
+    val cl = ModelClassLoader(
       new URLClassLoader(postProcessedURLs.toArray, this.getClass.getClassLoader),
       postProcessedURLs.toList
     )
+    logger.debug(s"$cl created")
+    cl
   }
 
 }
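For context on the `Closeable` match: `URLClassLoader` has implemented `java.io.Closeable` since JDK 7, while the JDK's built-in application classloader does not, so the new `close()` releases the former and only logs for the latter. A quick sketch:

import java.io.Closeable
import java.net.{URL, URLClassLoader}

object CloseableClassLoaderSketch extends App {

  def closeIfPossible(cl: ClassLoader): Unit = cl match {
    case closeable: Closeable => closeable.close() // e.g. URLClassLoader
    case other                => println(s"${other.getClass.getName} is not Closeable, skipping")
  }

  closeIfPossible(new URLClassLoader(Array.empty[URL], getClass.getClassLoader)) // closed
  closeIfPossible(getClass.getClassLoader) // typically the app classloader: skipped
}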