Skip to content

Commit

Permalink
Add more docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rubengees committed Jan 30, 2024
1 parent dca9293 commit 45f0986
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 10 deletions.
4 changes: 4 additions & 0 deletions detekt.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
comments:
UndocumentedPublicClass:
active: true

exceptions:
TooGenericExceptionCaught:
active: false
Expand Down
9 changes: 9 additions & 0 deletions src/main/kotlin/de/smartsquare/starter/mqtt/MqttConnector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

/**
* Base class for connectors implementing common logic.
*/
abstract class MqttConnector : SmartLifecycle {

companion object {
Expand All @@ -34,6 +37,9 @@ abstract class MqttConnector : SmartLifecycle {
abstract override fun stop(callback: Runnable)
}

/**
* Class responsible for connecting a client (mqtt 3) and subscribing to collected topics.
*/
class Mqtt3Connector(
client: Mqtt3Client,
private val collector: MqttSubscriberCollector,
Expand Down Expand Up @@ -100,6 +106,9 @@ class Mqtt3Connector(
override fun isRunning() = client.state != MqttClientState.DISCONNECTED
}

/**
* Class responsible for connecting a client (mqtt 5) and subscribing to collected topics.
*/
class Mqtt5Connector(
client: Mqtt5Client,
private val collector: MqttSubscriberCollector,
Expand Down
12 changes: 12 additions & 0 deletions src/main/kotlin/de/smartsquare/starter/mqtt/MqttExceptions.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.smartsquare.starter.mqtt

import com.hivemq.client.mqtt.datatypes.MqttTopic

/**
* Exception thrown when invalid methods with [MqttSubscribe] are found.
*/
Expand All @@ -9,3 +11,13 @@ class MqttConfigurationException(message: String) : RuntimeException(message)
* Exception thrown when the connection to mqtt broker fails.
*/
class MqttBrokerConnectException(message: String, cause: Throwable? = null) : RuntimeException(message, cause)

/**
* Exception thrown when processing a single mqtt message fails.
*/
class MqttMessageException(
val topic: MqttTopic,
val payload: ByteArray,
message: String? = null,
cause: Throwable? = null,
) : RuntimeException(message, cause)
4 changes: 4 additions & 0 deletions src/main/kotlin/de/smartsquare/starter/mqtt/MqttExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package de.smartsquare.starter.mqtt
import org.springframework.context.event.ContextClosedEvent
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

/**
* Custom executor suitable for processing mqtt messages. It is configured to support a graceful shutdown - Remaining
* messages are processed for a configurable amount of time until stopped.
*/
class MqttExecutor : ThreadPoolTaskExecutor() {

init {
Expand Down
7 changes: 7 additions & 0 deletions src/main/kotlin/de/smartsquare/starter/mqtt/MqttHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import org.slf4j.LoggerFactory
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ConcurrentHashMap

/**
* Class for consuming and forwarding messages to the correct subscriber.
*/
class MqttHandler(
private val collector: MqttSubscriberCollector,
private val adapter: MqttMessageAdapter,
Expand All @@ -18,6 +21,10 @@ class MqttHandler(

private val subscriberCache = ConcurrentHashMap<MqttTopic, ResolvedMqttSubscriber>(collector.subscribers.size)

/**
* Handles a single message. The [topic] is used to determine the correct subscriber which is then invoked with
* parameters produced by the [MqttMessageAdapter].
*/
fun handle(topic: MqttTopic, payload: ByteArray) {
if (logger.isTraceEnabled) {
logger.trace("Received mqtt message on topic [$topic] with payload ${payload.toString(Charsets.UTF_8)}")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@ class MqttSubscriberCollector(private val config: MqttProperties) : BeanPostProc
return topicParamCount > 1 || payloadParamCount > 1
}

/**
* Data class representing a subscriber method annotated with [MqttSubscribe].
*/
data class ResolvedMqttSubscriber(val bean: Any, val method: Method, val topic: MqttTopicFilter, val qos: MqttQos)
}

0 comments on commit 45f0986

Please sign in to comment.