Skip to content

Commit

Permalink
Send an approximate event store size in the setup payload.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerard Klijs committed Feb 13, 2024
1 parent ba7d659 commit a6e5b62
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,29 +111,30 @@ data class CommandBusInformation(
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val messageSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
)

data class QueryBusInformation(
val type: String,
val axonServer: Boolean,
val localSegmentType: String?,
val context: String?,
val handlerInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val messageSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val serializer: io.axoniq.console.framework.api.SerializerInformation?,
val handlerInterceptors: List<InterceptorInformation> = emptyList(),
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val messageSerializer: SerializerInformation?,
val serializer: SerializerInformation?,
)

data class EventStoreInformation(
val type: String,
val axonServer: Boolean,
val context: String?,
val dispatchInterceptors: List<io.axoniq.console.framework.api.InterceptorInformation> = emptyList(),
val eventSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val snapshotSerializer: io.axoniq.console.framework.api.SerializerInformation?,
val dispatchInterceptors: List<InterceptorInformation> = emptyList(),
val eventSerializer: SerializerInformation?,
val snapshotSerializer: SerializerInformation?,
val approximateSize: Long? = null,
)

data class ProcessorInformation(
Expand All @@ -147,7 +148,7 @@ data class ProcessorInformation(
val tokenStoreClaimTimeout: Long,
val errorHandler: String,
val invocationErrorHandler: String,
val interceptors: List<io.axoniq.console.framework.api.InterceptorInformation>,
val interceptors: List<InterceptorInformation>,
)

data class InterceptorInformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import org.axonframework.commandhandling.CommandBus
import org.axonframework.common.ReflectionUtils
import org.axonframework.config.Configuration
import org.axonframework.config.EventProcessingModule
import org.axonframework.eventhandling.MultiStreamableMessageSource
import org.axonframework.eventhandling.StreamingEventProcessor
import org.axonframework.eventhandling.*
import org.axonframework.eventhandling.tokenstore.TokenStore
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.messaging.StreamableMessageSource
Expand All @@ -34,94 +33,94 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.TemporalAmount

class SetupPayloadCreator(
private val configuration: Configuration,
private val configuration: Configuration,
) {
private val eventProcessingConfiguration = configuration.eventProcessingConfiguration() as EventProcessingModule

fun createReport(): SetupPayload {
val processors = eventProcessingConfiguration.eventProcessors()
.filter { it.value is StreamingEventProcessor }
.map { entry ->
entry.key
}
.filter { it.value is StreamingEventProcessor }
.map { entry ->
entry.key
}
return SetupPayload(
commandBus = commandBusInformation(),
queryBus = queryBusInformation(),
eventStore = eventBusInformation(),
processors = processors.map {
val processor =
eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get()
ProcessorInformation(
name = it,
supportsReset = processor.supportsReset(),
batchSize = processor.getBatchSize(),
messageSourceType = processor.getMessageSource(),
tokenClaimInterval = processor.getTokenClaimInterval(),
tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout(),
errorHandler = eventProcessingConfiguration.errorHandler(it)::class.java.name,
invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(it)::class.java.name,
interceptors = processor.getInterceptors("interceptors"),
tokenStoreType = processor.getStoreTokenStoreType(),
contexts = processor.contexts()
commandBus = commandBusInformation(),
queryBus = queryBusInformation(),
eventStore = eventBusInformation(),
processors = processors.map {
val processor =
eventProcessingConfiguration.eventProcessor(it, StreamingEventProcessor::class.java).get()
ProcessorInformation(
name = it,
supportsReset = processor.supportsReset(),
batchSize = processor.getBatchSize(),
messageSourceType = processor.getMessageSource(),
tokenClaimInterval = processor.getTokenClaimInterval(),
tokenStoreClaimTimeout = processor.getStoreTokenClaimTimeout(),
errorHandler = eventProcessingConfiguration.errorHandler(it)::class.java.name,
invocationErrorHandler = eventProcessingConfiguration.listenerInvocationErrorHandler(it)::class.java.name,
interceptors = processor.getInterceptors("interceptors"),
tokenStoreType = processor.getStoreTokenStoreType(),
contexts = processor.contexts()
)
},
versions = versionInformation(),
upcasters = upcasters(),
features = SupportedFeatures(
heartbeat = true
)
},
versions = versionInformation(),
upcasters = upcasters(),
features = SupportedFeatures(
heartbeat = true
)
)
}

private fun upcasters(): List<String> {
val upcasters =
configuration.upcasterChain().getPropertyValue<List<out Upcaster<*>>>("upcasters") ?: emptyList()
configuration.upcasterChain().getPropertyValue<List<out Upcaster<*>>>("upcasters") ?: emptyList()
return upcasters.map { it::class.java.name }
}

private val dependenciesToCheck = listOf(
"org.axonframework:axon-messaging",
"org.axonframework:axon-configuration",
"org.axonframework:axon-disruptor",
"org.axonframework:axon-eventsourcing",
"org.axonframework:axon-legacy",
"org.axonframework:axon-metrics",
"org.axonframework:axon-micrometer",
"org.axonframework:axon-modelling",
"org.axonframework:axon-server-connector",
"org.axonframework:axon-spring",
"org.axonframework:axon-spring-boot-autoconfigure",
"org.axonframework:axon-spring-boot-starter",
"org.axonframework:axon-tracing-opentelemetry",
"org.axonframework.extensions.amqp:axon-amqp",
"org.axonframework.extensions.jgroups:axon-jgroups",
"org.axonframework.extensions.kafka:axon-kafka",
"org.axonframework.extensions.mongo:axon-mongo",
"org.axonframework.extensions.reactor:axon-reactor",
"org.axonframework.extensions.springcloud:axon-springcloud",
"org.axonframework.extensions.tracing:axon-tracing",
"io.axoniq:axonserver-connector-java",
"io.axoniq.console:console-framework-client",
"org.axonframework:axon-messaging",
"org.axonframework:axon-configuration",
"org.axonframework:axon-disruptor",
"org.axonframework:axon-eventsourcing",
"org.axonframework:axon-legacy",
"org.axonframework:axon-metrics",
"org.axonframework:axon-micrometer",
"org.axonframework:axon-modelling",
"org.axonframework:axon-server-connector",
"org.axonframework:axon-spring",
"org.axonframework:axon-spring-boot-autoconfigure",
"org.axonframework:axon-spring-boot-starter",
"org.axonframework:axon-tracing-opentelemetry",
"org.axonframework.extensions.amqp:axon-amqp",
"org.axonframework.extensions.jgroups:axon-jgroups",
"org.axonframework.extensions.kafka:axon-kafka",
"org.axonframework.extensions.mongo:axon-mongo",
"org.axonframework.extensions.reactor:axon-reactor",
"org.axonframework.extensions.springcloud:axon-springcloud",
"org.axonframework.extensions.tracing:axon-tracing",
"io.axoniq:axonserver-connector-java",
"io.axoniq.console:console-framework-client",
)

private fun versionInformation(): Versions {
return Versions(
frameworkVersion = resolveVersion("org.axonframework:axon-messaging") ?: "Unknown",
moduleVersions = dependenciesToCheck.map {
io.axoniq.console.framework.api.ModuleVersion(
it,
resolveVersion(it)
)
}
frameworkVersion = resolveVersion("org.axonframework:axon-messaging") ?: "Unknown",
moduleVersions = dependenciesToCheck.map {
io.axoniq.console.framework.api.ModuleVersion(
it,
resolveVersion(it)
)
}
)
}

private fun resolveVersion(dep: String): String? {
val (groupId, artifactId) = dep.split(":")
return MavenArtifactVersionResolver(
groupId,
artifactId,
this::class.java.classLoader
groupId,
artifactId,
this::class.java.classLoader
).get()
}

Expand All @@ -143,44 +142,64 @@ class SetupPayloadCreator(
bus.getPropertyValue<Any>("serializer")?.getSerializerType("serializer")
} else null
return QueryBusInformation(
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = messageSerializer,
serializer = serializer,
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = messageSerializer,
serializer = serializer,
)
}

private fun eventBusInformation(): EventStoreInformation {
val bus = configuration.eventBus().unwrapPossiblyDecoratedClass(EventStore::class.java)
val axonServer =
bus::class.java.name == "org.axonframework.axonserver.connector.event.axon.AxonServerEventStore"
bus::class.java.name == "org.axonframework.axonserver.connector.event.axon.AxonServerEventStore"
val context = if (axonServer) {
bus.getPropertyValue<Any>("storageEngine")?.getPropertyValue<String>("context")
} else null
val dispatchInterceptors = bus.getInterceptors("dispatchInterceptors")
return EventStoreInformation(
type = bus::class.java.name,
axonServer = axonServer,
context = context,
dispatchInterceptors = dispatchInterceptors,
eventSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("eventSerializer"),
snapshotSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("snapshotSerializer"),
type = bus::class.java.name,
axonServer = axonServer,
context = context,
dispatchInterceptors = dispatchInterceptors,
eventSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("eventSerializer"),
snapshotSerializer = bus.getPropertyValue<Any>("storageEngine")?.getSerializerType("snapshotSerializer"),
approximateSize = getApproximateSize(bus)
)
}

private fun getApproximateSize(bus: EventBus): Long? =
if (bus is StreamableMessageSource<*>) {
runCatching {
getSizeFromToken(bus.createHeadToken())
}.getOrElse { null }
} else {
null
}

private fun getSizeFromToken(token: TrackingToken): Long? =
when (token) {
is GlobalSequenceTrackingToken -> token.globalIndex
is GapAwareTrackingToken -> token.index
is MultiSourceTrackingToken -> token.trackingTokens.values.sumOf {
getSizeFromToken(it) ?: 0
}
else -> null
}

private fun commandBusInformation(): CommandBusInformation {
val bus = configuration.commandBus().unwrapPossiblyDecoratedClass(CommandBus::class.java)
val axonServer = bus::class.java.name == "org.axonframework.axonserver.connector.command.AxonServerCommandBus"
val localSegmentType =
if (axonServer) bus.getPropertyTypeNested("localSegment", CommandBus::class.java) else null
if (axonServer) bus.getPropertyTypeNested("localSegment", CommandBus::class.java) else null
val context = if (axonServer) bus.getPropertyValue<String>("context") else null
val handlerInterceptors = if (axonServer) {
bus.getPropertyValue<Any>("localSegment")?.getInterceptors("handlerInterceptors", "invokerInterceptors")
?: emptyList()
?: emptyList()
} else {
bus.getInterceptors("handlerInterceptors", "invokerInterceptors")
}
Expand All @@ -189,50 +208,50 @@ class SetupPayloadCreator(
bus.getPropertyValue<Any>("serializer")?.getSerializerType("messageSerializer")
} else null
return CommandBusInformation(
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = serializer
type = bus::class.java.name,
axonServer = axonServer,
localSegmentType = localSegmentType,
context = context,
handlerInterceptors = handlerInterceptors,
dispatchInterceptors = dispatchInterceptors,
messageSerializer = serializer
)
}

private fun <T> Any.getPropertyValue(fieldName: String): T? {
val field = ReflectionUtils.fieldsOf(this::class.java).firstOrNull { it.name == fieldName } ?: return null
return ReflectionUtils.getMemberValue(
field,
this
field,
this
)
}

private fun Any.getPropertyType(fieldName: String): String {
return ReflectionUtils.getMemberValue<Any>(
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
).let { it::class.java.name }
}

private fun Any.getPropertyTypeNested(fieldName: String, clazz: Class<out Any>): String {
return ReflectionUtils.getMemberValue<Any>(
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
ReflectionUtils.fieldsOf(this::class.java).first { it.name == fieldName },
this
)
.let { it.unwrapPossiblyDecoratedClass(clazz) }
.let { it::class.java.name }
.let { it.unwrapPossiblyDecoratedClass(clazz) }
.let { it::class.java.name }
}

private fun StreamingEventProcessor.getBatchSize(): Int = getPropertyValue("batchSize") ?: -1
private fun StreamingEventProcessor.getMessageSource(): String =
getPropertyTypeNested("messageSource", EventStore::class.java)
getPropertyTypeNested("messageSource", EventStore::class.java)

private fun StreamingEventProcessor.getTokenClaimInterval(): Long = getPropertyValue("tokenClaimInterval") ?: -1
private fun StreamingEventProcessor.getStoreTokenStoreType(): String =
getPropertyTypeNested("tokenStore", TokenStore::class.java)
getPropertyTypeNested("tokenStore", TokenStore::class.java)

private fun StreamingEventProcessor.getStoreTokenClaimTimeout(): Long = getPropertyValue<Any>("tokenStore")
?.getPropertyValue<TemporalAmount>("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } ?: -1
?.getPropertyValue<TemporalAmount>("claimTimeout")?.let { it.get(ChronoUnit.SECONDS) * 1000 } ?: -1


private fun Any.getInterceptors(vararg fieldNames: String): List<InterceptorInformation> {
Expand All @@ -245,13 +264,13 @@ class SetupPayloadCreator(
return emptyList()
}
return interceptors
.filterNotNull()
.map {
if (it is AxoniqConsoleMeasuringHandlerInterceptor) {
InterceptorInformation(it.subject::class.java.name, true)
} else InterceptorInformation(it::class.java.name, false)
}
.filter { !it.type.startsWith("org.axonframework.eventhandling") }
.filterNotNull()
.map {
if (it is AxoniqConsoleMeasuringHandlerInterceptor) {
InterceptorInformation(it.subject::class.java.name, true)
} else InterceptorInformation(it::class.java.name, false)
}
.filter { !it.type.startsWith("org.axonframework.eventhandling") }
}

private fun Any.getSerializerType(fieldName: String): SerializerInformation? {
Expand Down

0 comments on commit a6e5b62

Please sign in to comment.