Skip to content

Commit

Permalink
Introduce the hearbeat mechanism
Browse files Browse the repository at this point in the history
Fixes #32. Extends the routes to include a bi-directional heartbeat route. Sending and receiving heartbeats should happen every 10 seconds. If heartbeats are missed for 30 seconds, the connection is set up from scratch again.

The server knows that we support heartbeats through the SetupPayload's feature flags. The API default to false for backwards compatibility.
  • Loading branch information
CodeDrivenMitch committed Oct 24, 2023
1 parent f68ca1c commit 44bec1f
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package io.axoniq.console.framework.api

object Routes {

object Management {
const val HEARTBEAT = "heartbeat"
}

object EventProcessor {
const val REPORT = "processor-info-report"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,17 @@ data class ConsoleClientIdentifier(
)

data class SetupPayload(
val commandBus: CommandBusInformation,
val queryBus: QueryBusInformation,
val eventStore: EventStoreInformation,
val processors: List<ProcessorInformation>,
val versions: Versions,
val upcasters: List<String>,
val features: SupportedFeatures? = SupportedFeatures(),
val commandBus: CommandBusInformation,
val queryBus: QueryBusInformation,
val eventStore: EventStoreInformation,
val processors: List<ProcessorInformation>,
val versions: Versions,
val upcasters: List<String>,
)

data class SupportedFeatures(
val heartbeat: Boolean? = false,
)

data class Versions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public void configureModule(@NotNull Configurer configurer) {
c.getComponent(RSocketHandlerRegistrar.class),
c.getComponent(RSocketPayloadEncodingStrategy.class),
reportingTaskExecutor,
ManagementFactory.getRuntimeMXBean().getName()
ManagementFactory.getRuntimeMXBean().getName(),
30
)
)
.registerComponent(ServerProcessorReporter.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package io.axoniq.console.framework.client

import io.axoniq.console.framework.api.Routes
import io.axoniq.console.framework.client.strategy.RSocketPayloadEncodingStrategy
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.rsocket.Payload
import io.rsocket.RSocket
import io.rsocket.core.RSocketConnector
import io.rsocket.metadata.WellKnownMimeType
Expand All @@ -28,10 +30,22 @@ import org.axonframework.lifecycle.Phase
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.tcp.TcpClient
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import kotlin.math.pow

/**
* The beating heart of the Console client. This class is responsible for connecting to the Console, and keeping
* the connection alive. It will also ensure that the connection is re-established in case of a disconnect.
*
* The server will send heartbeats to the client every 10 seconds. Upon not receiving heartbeats for 32 seconds
* (missing three of them) we can be sure the connection is dead, and the connection is killed.
* The client will send heartbeats every 10 seconds as well. It's up to the server to decide how lenient it is in terms
* of missing heartbeats.
*/
@Suppress("MemberVisibilityCanBePrivate")
class AxoniqConsoleRSocketClient(
private val environmentId: String,
Expand All @@ -46,46 +60,86 @@ class AxoniqConsoleRSocketClient(
private val encodingStrategy: RSocketPayloadEncodingStrategy,
private val executor: ScheduledExecutorService,
private val nodeName: String,
private val heartbeatTimeout: Long = 30
) : Lifecycle {
private var scheduledReconnector: ScheduledFuture<*>? = null
private var maintenanceTask: ScheduledFuture<*>? = null
private val logger = LoggerFactory.getLogger(this::class.java)

private lateinit var rsocket: RSocket
private var connected = false
private var rsocket: RSocket? = null
private var lastSentHeartbeat = Instant.EPOCH
private var lastReceivedHeartbeat = Instant.now()
private var lastConnectionTry = Instant.EPOCH
private var connectionRetryCount = 0

init {
registrar.registerHandlerWithoutPayload(Routes.Management.HEARTBEAT) {
logger.debug("Received heartbeat from AxonIQ Console. Last one was: {}", lastReceivedHeartbeat)
lastReceivedHeartbeat = Instant.now()
lastReceivedHeartbeat
}
}

override fun registerLifecycleHandlers(registry: Lifecycle.LifecycleRegistry) {
registry.onStart(Phase.EXTERNAL_CONNECTIONS, this::start)
registry.onShutdown(Phase.EXTERNAL_CONNECTIONS, this::dispose)
}

/**
* Sends a message to the AxonIQ Console. If there is no connection active, does nothing silently.
* The connection will automatically be setup. Losing a few reports is no problem.
*/
fun send(route: String, payload: Any): Mono<Void> {
if (!connected) {
return Mono.empty()
}
return rsocket
.requestResponse(encodingStrategy.encode(payload, createRoutingMetadata(route)))
.doOnError {
if (it.message?.contains("Access Denied") == true) {
logger.info("Was unable to send call to AxonIQ Console since authentication was incorrect!")
}
}
.then()
?.requestResponse(encodingStrategy.encode(payload, createRoutingMetadata(route)))
?.then()
?: Mono.empty()
}

/**
* Starts the connection, and starts the maintenance task.
* The task will ensure that if heartbeats are missed the connection is killed, as well as re-setup in case
* the connection was lost. The task will do so with an exponential backoff with factor 2, up to a maximum of
* 60 seconds.
*/
fun start() {
this.scheduledReconnector = executor.scheduleWithFixedDelay({
if (!connected) {
logger.info("Reconnecting to AxonIQ Console...")
connect()
this.maintenanceTask = executor.scheduleWithFixedDelay(
this::ensureConnectedAndAlive,
initialDelay,
1000, TimeUnit.MILLISECONDS
)
}

private fun ensureConnectedAndAlive() {
if (isConnected()) {
if (!isAlive()) {
logger.info("Haven't received a heartbeat for {} seconds from AxonIQ Console. Reconnecting...", ChronoUnit.SECONDS.between(lastReceivedHeartbeat, Instant.now()))
rsocket?.dispose()
rsocket = null
} else if(lastSentHeartbeat < Instant.now().minusSeconds(10)) {
logger.debug("Sending heartbeat to AxonIQ Console")
lastSentHeartbeat = Instant.now()
sendHeartbeat().subscribe()
}
}, initialDelay, 10000, TimeUnit.MILLISECONDS)
}
if (!isConnected()) {
val secondsToWaitForReconnect = BACKOFF_FACTOR.pow(connectionRetryCount.toDouble()).coerceAtMost(60.0)
if (ChronoUnit.SECONDS.between(lastConnectionTry, Instant.now()) < secondsToWaitForReconnect) {
return
}
logger.info("Reconnecting to AxonIQ Console...")
connectSafely()
}
}

fun connect() {
private fun connectSafely() {
try {
rsocket = createRSocket()
connected = true

// Send a heartbeat to ensure it's set up correctly. Rsocket initializes lazily, so sending a call will do it.
sendHeartbeat().block()
logger.info("Connection to AxonIQ Console set up successfully!")
} catch (e: Exception) {
disposeRsocket()
logger.info("Failed to connect to AxonIQ Console", e)
}
}
Expand All @@ -100,8 +154,10 @@ class AxoniqConsoleRSocketClient(
accessToken = accessToken
)

val setupPayload =
encodingStrategy.encode(setupPayloadCreator.createReport(), createSetupMetadata(authentication))
val setupPayload = encodingStrategy.encode(
setupPayloadCreator.createReport(),
createSetupMetadata(authentication)
)
val rsocket = RSocketConnector.create()
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string)
.dataMimeType(encodingStrategy.getMimeType().string)
Expand All @@ -114,6 +170,20 @@ class AxoniqConsoleRSocketClient(
return rsocket
}

private fun sendHeartbeat(): Mono<Payload> {
return rsocket
?.requestResponse(encodingStrategy.encode("", createRoutingMetadata(Routes.Management.HEARTBEAT)))
?.doOnError {
if (it.message?.contains("Access Denied") == true) {
logger.info("Was unable to send call to AxonIQ Console since authentication was incorrect!")
}
}
?.doOnSuccess {
logger.debug("Heartbeat successfully sent to AxonIQ Console")
}
?: Mono.empty()
}

private fun createRoutingMetadata(route: String): CompositeByteBuf {
val metadata: CompositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer()
metadata.addRouteMetadata(route)
Expand All @@ -135,20 +205,28 @@ class AxoniqConsoleRSocketClient(
.host(host)
.port(port)
.doOnDisconnected {
connected = false
disposeRsocket()
}
return if (secure) {
return client.secure()
} else client
}

fun isConnected() = connected
fun isConnected() = rsocket != null
fun isAlive() = isConnected() && lastReceivedHeartbeat > Instant.now().minusSeconds(heartbeatTimeout)

fun disposeRsocket() {
rsocket?.dispose()
rsocket = null
}

fun dispose() {
if (connected) {
rsocket.dispose()
}
scheduledReconnector?.cancel(true)
scheduledReconnector = null
disposeRsocket()
maintenanceTask?.cancel(true)
maintenanceTask = null
}

companion object {
private const val BACKOFF_FACTOR = 2.0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class SetupPayloadCreator(
processors = processors.map {
val processor =
eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get()
io.axoniq.console.framework.api.ProcessorInformation(
ProcessorInformation(
name = it,
supportsReset = processor.supportsReset(),
batchSize = processor.getBatchSize(),
Expand Down

0 comments on commit 44bec1f

Please sign in to comment.