Skip to content

Commit

Permalink
NATS disconnection notification support for Dacha2
Browse files Browse the repository at this point in the history
As Dacha2 heavily relies on NATS for its state, this allows
the unexpected dissolution of a NATs cluster to trigger a Dacha2
instance to drop being a cache and simply pass through requests to
MR until the NATS connection is reestablished.

#1187
  • Loading branch information
rvowles committed Nov 15, 2024
1 parent b5a31b9 commit 00b6f69
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 45 deletions.
156 changes: 129 additions & 27 deletions backend/dacha2/src/main/kotlin/io/featurehub/dacha2/Dacha2CacheImpl.kt
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package io.featurehub.dacha2

import cd.connect.app.config.ConfigKey
import com.google.common.cache.*
import io.featurehub.dacha.model.*
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import com.google.common.cache.RemovalListener
import io.featurehub.dacha.model.CacheServiceAccount
import io.featurehub.dacha.model.CacheServiceAccountPermission
import io.featurehub.dacha.model.PublishAction
import io.featurehub.dacha.model.PublishEnvironment
import io.featurehub.dacha.model.PublishFeatureValue
import io.featurehub.dacha.model.PublishServiceAccount
import io.featurehub.dacha2.api.Dacha2ServiceClient
import io.featurehub.enricher.EnrichmentEnvironment
import io.featurehub.enricher.FeatureEnrichmentCache
import io.featurehub.utils.FallbackPropertyConfig
import jakarta.inject.Inject
import jakarta.ws.rs.NotFoundException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.lang.RuntimeException
import java.util.*


Expand All @@ -32,11 +40,109 @@ interface Dacha2Cache {
* This will throw an exception if it can't be found
*/
fun findEnvironment(eId: UUID): FeatureValues
fun enableCache(cacheEnable: Boolean)
}

class Dacha2CacheImpl @Inject constructor(private val mrDacha2Api: Dacha2ServiceClient,
private val featureValueFactory: FeatureValuesFactory) : Dacha2Cache,
FeatureEnrichmentCache {
abstract class Dacha2BaseCache : Dacha2Cache, FeatureEnrichmentCache {}

class Dacha2DelegatingCache @Inject constructor(private val mrDacha2Api: Dacha2ServiceClient,
private val featureValueFactory: FeatureValuesFactory) : Dacha2BaseCache() {
private var cache: Dacha2BaseCache
private var cacheEnabled: Boolean = false
private val log: Logger = LoggerFactory.getLogger(Dacha2DelegatingCache::class.java)

init {
cache = Dacha2PassthroughImpl(mrDacha2Api, featureValueFactory)
log.info("started dacha2 in uncached passthrough mode (waiting on confirmed connection to streaming layer)")
}

override fun updateServiceAccount(serviceAccount: PublishServiceAccount) {
cache.updateServiceAccount(serviceAccount)
}

override fun updateEnvironment(env: PublishEnvironment) {
cache.updateEnvironment(env)
}

override fun updateFeature(feature: PublishFeatureValue) {
cache.updateFeature(feature)
}

override fun getFeatureCollection(eId: UUID, apiKey: String): FeatureCollection? {
return cache.getFeatureCollection(eId, apiKey)
}

override fun findEnvironment(eId: UUID): FeatureValues {
return cache.findEnvironment(eId)
}

override fun enableCache(cacheEnable: Boolean) {
if (cacheEnabled && !cacheEnable) {
log.info("lost connectivity, swapping dacha2 in uncached passthrough mode")
cache = Dacha2PassthroughImpl(mrDacha2Api, featureValueFactory)
} else if (!cacheEnabled && cacheEnable) {
log.info("connectivity to streaming established, swapping dacha2 to cached mode (empty cache)")
cache = Dacha2CacheImpl(mrDacha2Api, featureValueFactory)
}

cacheEnabled = cacheEnable
}

override fun getEnrichableEnvironment(eId: UUID): EnrichmentEnvironment {
return cache.getEnrichableEnvironment(eId)
}
}

class Dacha2PassthroughImpl(private val mrDacha2Api: Dacha2ServiceClient, private val featureValueFactory: FeatureValuesFactory) : Dacha2BaseCache() {
private val apiKey = FallbackPropertyConfig.getConfig("dacha2.cache.api-key")

override fun updateServiceAccount(serviceAccount: PublishServiceAccount) {
}

override fun updateEnvironment(env: PublishEnvironment) {
}

override fun updateFeature(feature: PublishFeatureValue) {
}

override fun getEnrichableEnvironment(eId: UUID): EnrichmentEnvironment {
val envFeatures = findEnvironment(eId)
return EnrichmentEnvironment(envFeatures.getFeatures(), envFeatures.environment)
}

private fun getEnvironment(eId: UUID): EnvironmentFeatures {
val env = mrDacha2Api.getEnvironment(eId, apiKey).env
return featureValueFactory.create(env)
}

private fun getServiceAccount(key: String): CacheServiceAccount {
return mrDacha2Api.getServiceAccount(key, apiKey).serviceAccount
}

override fun getFeatureCollection(eId: UUID, apiKey: String): FeatureCollection? {
val sa = getServiceAccount(apiKey)

var collection: FeatureCollection? = null

sa.permissions.find { it.environmentId == eId }?.let { perms ->
if (perms.permissions.isNotEmpty()) {
collection = FeatureCollection(getEnvironment(eId), perms, getServiceAccount(apiKey).id)
}
}

return collection
}

override fun findEnvironment(eId: UUID): FeatureValues {
return getEnvironment(eId)
}

override fun enableCache(cacheEnable: Boolean) {
}
}

class Dacha2CacheImpl(private val mrDacha2Api: Dacha2ServiceClient,
private val featureValueFactory: FeatureValuesFactory) : Dacha2BaseCache() {
private val log: Logger = LoggerFactory.getLogger(Dacha2CacheImpl::class.java)
private val serviceAccountApiKeyCache: LoadingCache<String, CacheServiceAccount>
private val serviceAccountCache: Cache<UUID, CacheServiceAccount>
Expand All @@ -49,42 +155,35 @@ class Dacha2CacheImpl @Inject constructor(private val mrDacha2Api: Dacha2Service
// environment id, environment-features
private val permsCache: Cache<String, CacheServiceAccountPermission>

@ConfigKey("dacha2.cache.service-account.miss-size")
var maximumServiceAccountMisses: Long? = 10000
private var maximumServiceAccountMisses = FallbackPropertyConfig.getConfig("dacha2.cache.service-account.miss-size", "10000").toLong()

@ConfigKey("dacha2.cache.service-account.perms-size")
var maximumServiceAccountPermissionsSize: Long? = 10000
private var maximumServiceAccountPermissionsSize = FallbackPropertyConfig.getConfig("dacha2.cache.service-account.perms-size", "10000").toLong()

@ConfigKey("dacha2.cache.service-account.size")
var maximumServiceAccounts: Long? = 10000
private var maximumServiceAccounts = FallbackPropertyConfig.getConfig("dacha2.cache.service-account.size", "10000").toLong()

@ConfigKey("dacha2.cache.environment.size")
var maximumEnvironments: Long? = 10000
private var maximumEnvironments = FallbackPropertyConfig.getConfig("dacha2.cache.environment.size", "10000").toLong()

@ConfigKey("dacha2.cache.environment.miss-size")
var maximumEnvironmentMisses: Long? = 10000
private var maximumEnvironmentMisses = FallbackPropertyConfig.getConfig("dacha2.cache.environment.miss-size", "10000").toLong()

@ConfigKey("dacha2.cache.all-updates")
var cacheStreamedUpdates: Boolean? = true
private var cacheStreamedUpdates: Boolean? = FallbackPropertyConfig.getConfig("dacha2.cache.all-updates") != "false"

@ConfigKey("dacha2.cache.api-key")
var apiKey: String? = null
var apiKey: String? = FallbackPropertyConfig.getConfig("dacha2.cache.api-key")

init {
environmentMissCache = CacheBuilder.newBuilder()
.maximumSize(maximumEnvironmentMisses!!)
.maximumSize(maximumEnvironmentMisses)
.build()

serviceAccountMissCache = CacheBuilder.newBuilder()
.maximumSize(maximumServiceAccountMisses!!)
.maximumSize(maximumServiceAccountMisses)
.build()

permsCache = CacheBuilder.newBuilder()
.maximumSize(maximumServiceAccountPermissionsSize!!)
.maximumSize(maximumServiceAccountPermissionsSize)
.build()

environmentCache = CacheBuilder.newBuilder()
.maximumSize(maximumEnvironments!!)
.maximumSize(maximumEnvironments)
.build(object : CacheLoader<UUID, EnvironmentFeatures>() {
override fun load(id: UUID): EnvironmentFeatures {
try {
Expand All @@ -102,10 +201,10 @@ class Dacha2CacheImpl @Inject constructor(private val mrDacha2Api: Dacha2Service
})


serviceAccountCache = CacheBuilder.newBuilder().maximumSize(maximumServiceAccounts!! / 2).build()
serviceAccountCache = CacheBuilder.newBuilder().maximumSize(maximumServiceAccounts / 2).build()

serviceAccountApiKeyCache = CacheBuilder.newBuilder()
.maximumSize(maximumServiceAccounts!!)
.maximumSize(maximumServiceAccounts)
.removalListener(RemovalListener<String, CacheServiceAccount> { notification ->
val value = notification.value!!

Expand Down Expand Up @@ -200,6 +299,9 @@ class Dacha2CacheImpl @Inject constructor(private val mrDacha2Api: Dacha2Service
return environmentCache.get(eId)
}

override fun enableCache(cacheEnable: Boolean) {
}

fun isEnvironmentPresent(eId: UUID): Boolean {
environmentMissCache.getIfPresent(eId)?.let {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import io.featurehub.dacha.model.PublishEnvironment
import io.featurehub.dacha.model.PublishFeatureValues
import io.featurehub.dacha.model.PublishServiceAccount
import io.featurehub.events.CloudEventReceiverRegistry
import io.featurehub.events.EventingConnection
import io.featurehub.lifecycle.LifecyclePriority
import io.featurehub.lifecycle.LifecycleStarted
import io.featurehub.utils.ExecutorSupplier
import io.featurehub.utils.FallbackPropertyConfig
import jakarta.inject.Inject
import org.glassfish.hk2.api.IterableProvider
import org.slf4j.Logger
Expand All @@ -23,17 +25,23 @@ class Dacha2CloudEventListenerImpl @Inject constructor(
private val dacha2Caches: IterableProvider<Dacha2CacheListener>,
private val dacha2Cache: Dacha2Cache,
register: CloudEventReceiverRegistry,
executorSupplier: ExecutorSupplier
executorSupplier: ExecutorSupplier,
eventingConnection: EventingConnection
) : LifecycleStarted {
private val log: Logger = LoggerFactory.getLogger(Dacha2CloudEventListenerImpl::class.java)
var dacha2CacheList = mutableListOf<Dacha2CacheListener>()
@ConfigKey("dacha2.thread-processors")
var nThreads: Int? = 20
var nThreads = FallbackPropertyConfig.getConfig("dacha2.thread-processors", "20").toInt()
val executorService: ExecutorService

init {
executorService = executorSupplier.executorService(nThreads!!)

eventingConnection.registerForConnectionEvents { event ->
dacha2Cache.enableCache(event == EventingConnection.ConnectionStatus.CONNECTED)
}

dacha2Cache.enableCache(eventingConnection.status() == EventingConnection.ConnectionStatus.CONNECTED)

register.listen(PublishEnvironment::class.java) { env, ce ->
log.trace("received environment {}", env)
dacha2Cache.updateEnvironment(env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.featurehub.dacha2.resource.DachaEnvironmentResource
import io.featurehub.enricher.EnrichmentProcessingFeature
import io.featurehub.enricher.FeatureEnrichmentCache
import io.featurehub.lifecycle.LifecycleListeners
import io.featurehub.utils.FallbackPropertyConfig
import jakarta.inject.Singleton
import jakarta.ws.rs.core.Feature
import jakarta.ws.rs.core.FeatureContext
Expand All @@ -22,8 +23,15 @@ class Dacha2Feature : Feature {

context.register(object: AbstractBinder() {
override fun configure() {
bind(Dacha2CacheImpl::class.java).to(Dacha2Cache::class.java).to(
FeatureEnrichmentCache::class.java).`in`(Singleton::class.java)
if (FallbackPropertyConfig.getConfig("dacha2.cache-dump-on-disconnect") == "false") {
// if this is turned off, we assume straight cache usage and that the environment is tolerant of NATs pod
// movement
bind(Dacha2CacheImpl::class.java).to(Dacha2Cache::class.java).to(
FeatureEnrichmentCache::class.java).`in`(Singleton::class.java)
} else {
bind(Dacha2DelegatingCache::class.java).to(Dacha2Cache::class.java).to(
FeatureEnrichmentCache::class.java).`in`(Singleton::class.java)
}
if (FastlyPublisher.fastlyEnabled()) {
bind(FastlyPublisher::class.java).to(Dacha2CacheListener::class.java).`in`(Singleton::class.java)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.featurehub.dacha.model.PublishFeatureValues
import io.featurehub.dacha.model.PublishServiceAccount
import io.featurehub.events.CloudEventReceiverRegistry
import io.featurehub.events.CloudEventReceiverRegistryMock
import io.featurehub.events.EventingConnection
import io.featurehub.jersey.config.CacheJsonMapper
import io.featurehub.mr.model.FeatureValueType
import io.featurehub.utils.ExecutorSupplier
Expand All @@ -29,6 +30,7 @@ class Dacha2CloudEventListenerImplSpec extends Specification {
Dacha2Cache d2Cache
CloudEventReceiverRegistry register
IterableProvider<Dacha2CacheListener> cacheProvider
EventingConnection eventingConnection

UUID serviceAccountId
String apiKeyClientSide
Expand All @@ -45,12 +47,13 @@ class Dacha2CloudEventListenerImplSpec extends Specification {
cache = Mock()
cacheProvider = Mock(IterableProvider)
cacheProvider.iterator() >> [cache].iterator()
eventingConnection = Mock()
register = new CloudEventReceiverRegistryMock()
def execSupplierMock = Mock(ExecutorSupplier)
def execServiceMock = Mock(ExecutorService)
execSupplierMock.executorService(_) >> execServiceMock
execServiceMock.submit(_) >> { Runnable task -> task.run() }
listener = new Dacha2CloudEventListenerImpl(cacheProvider, d2Cache, register, execSupplierMock)
listener = new Dacha2CloudEventListenerImpl(cacheProvider, d2Cache, register, execSupplierMock, eventingConnection)
listener.started()
serviceAccountId = UUID.randomUUID()
apiKeyClientSide = "1234*1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.featurehub.events

interface EventingConnection {
enum class ConnectionStatus { DISCONNECTED, CONNECTED }

fun registerForConnectionEvents(handler: (event: ConnectionStatus) -> Unit)

fun status(): ConnectionStatus
}

/**
* The default implementation (used by PubSub) ignores disconnection errors.
*/
class DefaultEventingConnection : EventingConnection {
override fun registerForConnectionEvents(handler: (event: EventingConnection.ConnectionStatus) -> Unit) {
handler(EventingConnection.ConnectionStatus.CONNECTED)
}

override fun status(): EventingConnection.ConnectionStatus {
return EventingConnection.ConnectionStatus.CONNECTED
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.featurehub.events.pubsub

import io.featurehub.events.DefaultEventingConnection
import io.featurehub.events.EventingConnection
import io.featurehub.events.EventingFeatureSource
import io.featurehub.health.HealthSource
import io.featurehub.lifecycle.LifecycleListeners
Expand All @@ -22,6 +24,7 @@ class PubsubEventFeature : Feature {
context.register(object: AbstractBinder() {
override fun configure() {
bind(PubSubFactoryService::class.java).to(PubSubFactory::class.java).to(HealthSource::class.java).`in`(Singleton::class.java)
bind(DefaultEventingConnection::class.java).to(EventingConnection::class.java).`in`(Singleton::class.java)
}
})

Expand Down
Loading

0 comments on commit 00b6f69

Please sign in to comment.