Skip to content

Commit

Permalink
perf: configure GitLab client threads by leveraging concurrency property
Browse files Browse the repository at this point in the history
Additionally optimize threads allocation per usage.
  • Loading branch information
filipowm committed Mar 21, 2022
1 parent 937f8fd commit 7f6c7af
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
@file:Suppress("SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection",
"SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection",
"SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection", "SpringJavaInjectionPointsAutowiringInspection",
"SpringJavaInjectionPointsAutowiringInspection"
)

package com.roche.ambassador.configuration.concurrent

import com.roche.ambassador.ConcurrencyProvider
Expand All @@ -24,20 +18,28 @@ internal class ConcurrencyProviderImpl(indexerProperties: IndexerProperties) : C
private val producerExecutor: ExecutorService
private val consumerExecutor: ExecutorService
private val supportingExecutor: ExecutorService
private val sourceClientThreadsCount: Int

init {
val properties = indexerProperties.concurrency
val consumerThreadFactory = CustomizableThreadFactory(properties.consumerThreadPrefix)
val producerThreadFactory = CustomizableThreadFactory(properties.producerThreadPrefix)
val supportingThreadFactory = CustomizableThreadFactory(properties.supportingThreadPrefix)
val supportingExecutorThreads = max(1, ceil(properties.concurrencyLevel * 0.15).roundToInt())
val producerExecutorThreads = max(1, ceil(properties.concurrencyLevel * 0.1).roundToInt())
val consumerExecutorThreads = max(1, properties.concurrencyLevel - producerExecutorThreads - supportingExecutorThreads)
producerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(producerExecutorThreads, producerThreadFactory)
consumerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(consumerExecutorThreads, consumerThreadFactory)
supportingExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(supportingExecutorThreads, supportingThreadFactory)
val calculateThreadsCount: (Int, Double) -> Int = { concurrencyLevel, threadsFactor ->
max(1, ceil(concurrencyLevel * threadsFactor).roundToInt())
}
with(indexerProperties.concurrency) {
val consumerThreadFactory = CustomizableThreadFactory(consumerThreadPrefix)
val producerThreadFactory = CustomizableThreadFactory(producerThreadPrefix)
val supportingThreadFactory = CustomizableThreadFactory(supportingThreadPrefix)
val supportingExecutorThreads = calculateThreadsCount(concurrencyLevel, 0.15)
val producerExecutorThreads = calculateThreadsCount(concurrencyLevel, 0.15)
sourceClientThreadsCount = calculateThreadsCount(concurrencyLevel, 0.35)
val consumerExecutorThreads = max(2, concurrencyLevel - sourceClientThreadsCount - producerExecutorThreads - supportingExecutorThreads)
producerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(producerExecutorThreads, producerThreadFactory)
consumerExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(consumerExecutorThreads, consumerThreadFactory)
supportingExecutor = MdcThreadPoolExecutor.newWithInheritedMdcFixedThreadPool(supportingExecutorThreads, supportingThreadFactory)
}
}

override fun getSourceClientThreadsCount(): Int = sourceClientThreadsCount

override fun getSourceProjectProducerDispatcher(): CoroutineDispatcher = producerExecutor.asCoroutineDispatcher()

override fun getIndexingConsumerDispatcher(): CoroutineDispatcher = consumerExecutor.asCoroutineDispatcher()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.roche.ambassador.configuration.source

import com.roche.ambassador.ConcurrencyProvider
import com.roche.ambassador.GenerationSpec
import com.roche.ambassador.configuration.source.ProjectSourcesProperties.System.FAKE
import com.roche.ambassador.configuration.source.ProjectSourcesProperties.System.GITLAB
Expand All @@ -21,10 +22,11 @@ class ProjectSourceConfiguration {
fun sources(
projectSourcesProperties: ProjectSourcesProperties,
@Qualifier("projectSourceCacheManager")
cacheManager: CacheManager
cacheManager: CacheManager,
concurrencyProvider: ConcurrencyProvider
): ProjectSources {
val source = when (projectSourcesProperties.system) {
GITLAB -> configureGitLab(projectSourcesProperties)
GITLAB -> configureGitLab(projectSourcesProperties, concurrencyProvider)
FAKE -> configureFake()
}
val cache = cacheManager.getCache(source.name())!!
Expand All @@ -39,7 +41,8 @@ class ProjectSourceConfiguration {

@ExperimentalCoroutinesApi
private fun configureGitLab(
projectSourcesProperties: ProjectSourcesProperties
projectSourcesProperties: ProjectSourcesProperties,
concurrencyProvider: ConcurrencyProvider
): GitLabSource {

// @formatter:off
Expand All @@ -49,6 +52,7 @@ class ProjectSourceConfiguration {
.exponentialBackoff(2.0, Duration.ofMinutes(5))
.build()
.httpClient()
.clientThreadsCount(concurrencyProvider.getSourceClientThreadsCount())
.logging().nothing().and()
.authenticated().withPersonalAccessToken(projectSourcesProperties.token)
.url(projectSourcesProperties.url)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineDispatcher

interface ConcurrencyProvider {

fun getSourceClientThreadsCount(): Int
fun getSourceProjectProducerDispatcher(): CoroutineDispatcher
fun getIndexingConsumerDispatcher(): CoroutineDispatcher
fun getSupportingDispatcher(): CoroutineDispatcher
Expand Down

0 comments on commit 7f6c7af

Please sign in to comment.