Skip to content

Commit

Permalink
Add a notification list to the api, and will log any notifications on…
Browse files Browse the repository at this point in the history
… the correct leven when returned from the server.
  • Loading branch information
Gerard Klijs authored and gklijs committed Jan 30, 2024
1 parent 3a9ffc3 commit 29afc6a
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.console.framework.api.notifications

import com.fasterxml.jackson.annotation.JsonProperty

data class Notification(
@JsonProperty("l")
val level: NotificationLevel,
@JsonProperty("m")
val message: String,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.console.framework.api.notifications

import com.fasterxml.jackson.annotation.JsonProperty

enum class NotificationLevel {
@JsonProperty("d")
Debug,

@JsonProperty("i")
Info,

@JsonProperty("w")
Warn
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2022-2024. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.console.framework.api.notifications

import com.fasterxml.jackson.annotation.JsonProperty

data class NotificationList(
@JsonProperty("l")
val messages: List<Notification>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package io.axoniq.console.framework.client

import io.axoniq.console.framework.api.ClientSettings
import io.axoniq.console.framework.api.Routes
import io.axoniq.console.framework.api.notifications.NotificationLevel
import io.axoniq.console.framework.api.notifications.NotificationList
import io.axoniq.console.framework.client.strategy.RSocketPayloadEncodingStrategy
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
Expand All @@ -28,6 +30,7 @@ import io.rsocket.metadata.WellKnownMimeType
import io.rsocket.transport.netty.client.TcpClientTransport
import org.axonframework.lifecycle.Lifecycle
import org.axonframework.lifecycle.Phase
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.tcp.TcpClient
Expand All @@ -52,19 +55,19 @@ import kotlin.math.pow
*/
@Suppress("MemberVisibilityCanBePrivate")
class AxoniqConsoleRSocketClient(
private val environmentId: String,
private val accessToken: String,
private val applicationName: String,
private val host: String,
private val port: Int,
private val secure: Boolean,
private val initialDelay: Long,
private val setupPayloadCreator: SetupPayloadCreator,
private val registrar: RSocketHandlerRegistrar,
private val encodingStrategy: RSocketPayloadEncodingStrategy,
private val clientSettingsService: ClientSettingsService,
private val executor: ScheduledExecutorService,
private val nodeName: String,
private val environmentId: String,
private val accessToken: String,
private val applicationName: String,
private val host: String,
private val port: Int,
private val secure: Boolean,
private val initialDelay: Long,
private val setupPayloadCreator: SetupPayloadCreator,
private val registrar: RSocketHandlerRegistrar,
private val encodingStrategy: RSocketPayloadEncodingStrategy,
private val clientSettingsService: ClientSettingsService,
private val executor: ScheduledExecutorService,
private val nodeName: String,
) : Lifecycle {
private val heartbeatOrchestrator = HeartbeatOrchestrator()
private var maintenanceTask: ScheduledFuture<*>? = null
Expand Down Expand Up @@ -92,10 +95,13 @@ class AxoniqConsoleRSocketClient(
* Sends a message to the AxonIQ Console. If there is no connection active, does nothing silently.
* The connection will automatically be setup. Losing a few reports is no problem.
*/
fun send(route: String, payload: Any): Mono<Void> {
fun send(route: String, payload: Any): Mono<Unit> {
return rsocket
?.requestResponse(encodingStrategy.encode(payload, createRoutingMetadata(route)))
?.then()
?.map {
val notifications = encodingStrategy.decode(it, NotificationList::class.java)
logger.log(notifications)
}
?: Mono.empty()
}

Expand All @@ -106,7 +112,7 @@ class AxoniqConsoleRSocketClient(
* 60 seconds.
*/
fun start() {
if(this.maintenanceTask != null) {
if (this.maintenanceTask != null) {
return
}
this.maintenanceTask = executor.scheduleWithFixedDelay(
Expand Down Expand Up @@ -143,27 +149,27 @@ class AxoniqConsoleRSocketClient(

private fun createRSocket(): RSocket {
val authentication = io.axoniq.console.framework.api.ConsoleClientAuthentication(
identification = io.axoniq.console.framework.api.ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName,
nodeName = nodeName
),
accessToken = accessToken
identification = io.axoniq.console.framework.api.ConsoleClientIdentifier(
environmentId = environmentId,
applicationName = applicationName,
nodeName = nodeName
),
accessToken = accessToken
)

val setupPayload = encodingStrategy.encode(
setupPayloadCreator.createReport(),
createSetupMetadata(authentication)
)
val rsocket = RSocketConnector.create()
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string)
.dataMimeType(encodingStrategy.getMimeType().string)
.setupPayload(setupPayload)
.acceptor { _, rsocket ->
Mono.just(registrar.createRespondingRSocketFor(rsocket))
}
.connect(tcpClientTransport())
.block()!!
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string)
.dataMimeType(encodingStrategy.getMimeType().string)
.setupPayload(setupPayload)
.acceptor { _, rsocket ->
Mono.just(registrar.createRespondingRSocketFor(rsocket))
}
.connect(tcpClientTransport())
.block()!!
return rsocket
}

Expand All @@ -181,15 +187,15 @@ class AxoniqConsoleRSocketClient(
}

private fun tcpClientTransport() =
TcpClientTransport.create(tcpClient())
TcpClientTransport.create(tcpClient())

private fun tcpClient(): TcpClient {
val client = TcpClient.create()
.host(host)
.port(port)
.doOnDisconnected {
disposeCurrentConnection()
}
.host(host)
.port(port)
.doOnDisconnected {
disposeCurrentConnection()
}
return if (secure) {
return client.secure()
} else client
Expand Down Expand Up @@ -280,4 +286,14 @@ class AxoniqConsoleRSocketClient(
}
}
}

private fun Logger.log(notificationList: NotificationList) {
notificationList.messages.forEach {
when (it.level) {
NotificationLevel.Debug -> this.debug(it.message)
NotificationLevel.Info -> this.info(it.message)
NotificationLevel.Warn -> this.warn(it.message)
}
}
}
}

0 comments on commit 29afc6a

Please sign in to comment.