diff --git a/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/concurrent/ConcurrencyProviderImpl.kt b/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/concurrent/ConcurrencyProviderImpl.kt index f2d19a0c..ada98c97 100644 --- a/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/concurrent/ConcurrencyProviderImpl.kt +++ b/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/concurrent/ConcurrencyProviderImpl.kt @@ -1,9 +1,3 @@ -@file:Suppress("SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", - "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", - "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", - "SpringJavaInjectionPointsAutowiringInspection" -) - package com.roche.ambassador.configuration.concurrent import com.roche.ambassador.ConcurrencyProvider @@ -24,20 +18,28 @@ internal class ConcurrencyProviderImpl(indexerProperties: IndexerProperties) : C private val producerExecutor: ExecutorService private val consumerExecutor: ExecutorService private val supportingExecutor: ExecutorService + private val sourceClientThreadsCount: Int init { - val properties = indexerProperties.concurrency - val consumerThreadFactory = CustomizableThreadFactory(properties.consumerThreadPrefix) - val producerThreadFactory = CustomizableThreadFactory(properties.producerThreadPrefix) - val supportingThreadFactory = CustomizableThreadFactory(properties.supportingThreadPrefix) - val supportingExecutorThreads = max(1, ceil(properties.concurrencyLevel * 0.15).roundToInt()) - val producerExecutorThreads = max(1, ceil(properties.concurrencyLevel * 0.1).roundToInt()) - val consumerExecutorThreads = max(1, properties.concurrencyLevel - producerExecutorThreads - supportingExecutorThreads) - producerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(producerExecutorThreads, producerThreadFactory) - consumerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(consumerExecutorThreads, consumerThreadFactory) - supportingExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(supportingExecutorThreads, supportingThreadFactory) + val calculateThreadsCount: (Int, Double) -> Int = { concurrencyLevel, threadsFactor -> + max(1, ceil(concurrencyLevel * threadsFactor).roundToInt()) + } + with(indexerProperties.concurrency) { + val consumerThreadFactory = CustomizableThreadFactory(consumerThreadPrefix) + val producerThreadFactory = CustomizableThreadFactory(producerThreadPrefix) + val supportingThreadFactory = CustomizableThreadFactory(supportingThreadPrefix) + val supportingExecutorThreads = calculateThreadsCount(concurrencyLevel, 0.15) + val producerExecutorThreads = calculateThreadsCount(concurrencyLevel, 0.15) + sourceClientThreadsCount = calculateThreadsCount(concurrencyLevel, 0.35) + val consumerExecutorThreads = max(2, concurrencyLevel - sourceClientThreadsCount - producerExecutorThreads - supportingExecutorThreads) + producerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(producerExecutorThreads, producerThreadFactory) + consumerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(consumerExecutorThreads, consumerThreadFactory) + supportingExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(supportingExecutorThreads, supportingThreadFactory) + } } + override fun getSourceClientThreadsCount(): Int = sourceClientThreadsCount + override fun getSourceProjectProducerDispatcher(): CoroutineDispatcher = producerExecutor.asCoroutineDispatcher() override fun getIndexingConsumerDispatcher(): CoroutineDispatcher = consumerExecutor.asCoroutineDispatcher() diff --git a/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/source/ProjectSourceConfiguration.kt b/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/source/ProjectSourceConfiguration.kt index 3fd0840c..d6ded4e9 100644 --- a/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/source/ProjectSourceConfiguration.kt +++ b/ambassador-application/src/main/kotlin/com/roche/ambassador/configuration/source/ProjectSourceConfiguration.kt @@ -1,5 +1,6 @@ package com.roche.ambassador.configuration.source +import com.roche.ambassador.ConcurrencyProvider import com.roche.ambassador.GenerationSpec import com.roche.ambassador.configuration.source.ProjectSourcesProperties.System.FAKE import com.roche.ambassador.configuration.source.ProjectSourcesProperties.System.GITLAB @@ -21,10 +22,11 @@ class ProjectSourceConfiguration { fun sources( projectSourcesProperties: ProjectSourcesProperties, @Qualifier("projectSourceCacheManager") - cacheManager: CacheManager + cacheManager: CacheManager, + concurrencyProvider: ConcurrencyProvider ): ProjectSources { val source = when (projectSourcesProperties.system) { - GITLAB -> configureGitLab(projectSourcesProperties) + GITLAB -> configureGitLab(projectSourcesProperties, concurrencyProvider) FAKE -> configureFake() } val cache = cacheManager.getCache(source.name())!! @@ -39,7 +41,8 @@ class ProjectSourceConfiguration { @ExperimentalCoroutinesApi private fun configureGitLab( - projectSourcesProperties: ProjectSourcesProperties + projectSourcesProperties: ProjectSourcesProperties, + concurrencyProvider: ConcurrencyProvider ): GitLabSource { // @formatter:off @@ -49,6 +52,7 @@ class ProjectSourceConfiguration { .exponentialBackoff(2.0, Duration.ofMinutes(5)) .build() .httpClient() + .clientThreadsCount(concurrencyProvider.getSourceClientThreadsCount()) .logging().nothing().and() .authenticated().withPersonalAccessToken(projectSourcesProperties.token) .url(projectSourcesProperties.url) diff --git a/ambassador-commons/src/main/kotlin/com/roche/ambassador/ConcurrencyProvider.kt b/ambassador-commons/src/main/kotlin/com/roche/ambassador/ConcurrencyProvider.kt index 85dcd80a..694b0428 100644 --- a/ambassador-commons/src/main/kotlin/com/roche/ambassador/ConcurrencyProvider.kt +++ b/ambassador-commons/src/main/kotlin/com/roche/ambassador/ConcurrencyProvider.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineDispatcher interface ConcurrencyProvider { + fun getSourceClientThreadsCount(): Int fun getSourceProjectProducerDispatcher(): CoroutineDispatcher fun getIndexingConsumerDispatcher(): CoroutineDispatcher fun getSupportingDispatcher(): CoroutineDispatcher