Skip to content

Commit

Permalink
Initial revision for comment.
Browse files Browse the repository at this point in the history
  • Loading branch information
JemDay committed Jan 18, 2023
1 parent f71303b commit 0d674c7
Show file tree
Hide file tree
Showing 30 changed files with 1,804 additions and 6 deletions.
31 changes: 31 additions & 0 deletions bindings/mqtt/core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloudevents-mqtt-core</artifactId>
<name>CloudEvents - MQTT Common</name>
<packaging>jar</packaging>

<properties>
<module-name>io.cloudevents.mqtt.core</module-name>
</properties>

<dependencies>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.cloudevents.mqtt.core;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import io.cloudevents.core.v1.CloudEventV1;

import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Enable the hydration of a CloudEvent in binary mode from an MQTT message.
* <p>
* This abstract class provides common behavior across different MQTT
* client implementations.
*/
public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {

/**
* CloudEvent attribute names must match this pattern.
*/
private static final Pattern CE_ATTR_NAME_REGEX = Pattern.compile("^[a-z\\d]+$");
private final String contentType;

/**
* Initialise the binary message reader.
* @param version The CloudEvent message version.
* @param contentType The assigned media content type.
* @param payload The raw data payload from the MQTT message.
*/
protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) {
super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
this.contentType = contentType;
}

// --- Overrides

@Override
protected boolean isContentTypeHeader(String key) {
return false; // The content type is not defined in a user-property
}

@Override
protected boolean isCloudEventsHeader(String key) {

// The binding specification does not require name prefixing,
// as such any user-property is a potential CE Context Attribute.
//
// If the name complies with CE convention then we'll assume
// it's a context attribute.
//
Matcher m = CE_ATTR_NAME_REGEX.matcher(key);
return m.matches();
}

@Override
protected String toCloudEventsKey(String key) {
return key; // No special prefixing occurs in the MQTT binding spec.
}


@Override
protected void forEachHeader(BiConsumer<String, Object> fn) {

// If there is a content-type then we need set it.
// Inspired by AMQP/Proton code :-)

if (contentType != null) {
fn.accept(CloudEventV1.DATACONTENTTYPE, contentType);
}

// Now process each MQTT User Property.
forEachUserProperty(fn);

}

@Override
protected String toCloudEventsValue(Object value) {
return value.toString();
}

/**
* Visit each MQTT user-property and invoke the supplied function.
* @param fn The function to invoke for each MQTT User property.
*/
protected abstract void forEachUserProperty(BiConsumer<String, Object> fn);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.cloudevents.mqtt.core;

import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;

/**
* General MQTT Utilities and Helpers
*/
public class MqttUtils {

private MqttUtils() {}

private static final String DEFAULT_FORMAT = "application/cloudevents+json";

/**
* Obtain the {@link EventFormat} to use when working with MQTT V3
* messages.
*
* @return An event format.
*/
public static EventFormat getDefaultEventFormat () {

return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT);

}

/**
* Get the default content type to assume for MQTT messages.
* @return A Content-Type
*/
public static final String getDefaultContentType() {
return DEFAULT_FORMAT;
}

}
80 changes: 80 additions & 0 deletions bindings/mqtt/hivemq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>cloudevents-mqtt-hivemq</artifactId>
<name>CloudEvents - MQTT HiveMQ Binding</name>
<packaging>jar</packaging>

<properties>
<module-name>io.cloudevents.mqtt.hivemq</module-name>
<hivemq.version>1.3.0</hivemq.version>
</properties>

<dependencies>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-mqtt-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>${hivemq.version}</version>
<scope>provided</scope>
</dependency>

<!-- Testing Dependencies -->

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- We need a JSON Format for V3 compliance checking -->

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.cloudevents.mqtt.hivemq;

import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.cloudevents.SpecVersion;
import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader;

import java.util.function.BiConsumer;

final class BinaryMessageReader extends BaseMqttBinaryMessageReader {

Mqtt5Publish message;

BinaryMessageReader(final SpecVersion version, final String contentType, Mqtt5Publish message) {
super(version, contentType, message.getPayloadAsBytes());

this.message = message;
}

@Override
protected void forEachUserProperty(BiConsumer<String, Object> fn) {

message.getUserProperties().asList().forEach(up -> {

final String key = up.getName().toString();
final String val = up.getValue().toString();

if (key != null && val != null) {
fn.accept(key, val);
}
});

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.cloudevents.mqtt.hivemq;

import com.hivemq.client.mqtt.datatypes.MqttUtf8String;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.mqtt.core.MqttUtils;

import java.util.List;
import java.util.Optional;

/**
* A factory to obtain:
* - {@link MessageReader} instances to read CloudEvents from MQTT messages.
* - {@link MessageWriter} instances to write CloudEvents into MQTT messages.
*
*/
public class MqttMessageFactory {

// Prevent Instantiation.
private MqttMessageFactory() {
}

/**
* Create a {@link MessageReader} for an MQTT V3 message.
* <p>
* As-Per MQTT Binding specification this only supports
* a structured JSON Format message.
*
* @param message An MQTT V3 message.
* @return MessageReader.
*/
public static MessageReader createReader(Mqtt3Publish message) {
return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), message.getPayloadAsBytes());
}

/**
* Create a {@link MessageReader} for an MQTT V5 message
*
* @param message An MQTT V5 message.
* @return A message reader.
*/
public static MessageReader createReader(Mqtt5Publish message) {

Optional<MqttUtf8String> cType = message.getContentType();

String contentType = cType.isPresent() ? cType.get().toString() : null;

return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
format -> new GenericStructuredMessageReader(format, message.getPayloadAsBytes()),
() -> getSpecVersion(message),
sv -> new BinaryMessageReader(sv, contentType, message)
);
}


/**
* Create a {@link MessageWriter} for an MQTT V5 Message.
*
* @param builder {@link Mqtt5PublishBuilder.Complete}
* @return A message writer.
*/
public static MessageWriter createWriter(Mqtt5PublishBuilder.Complete builder) {
return new V5MessageWriter(builder);
}

/**
* Create a {@link MessageWriter} for an MQTT V3 Message.
*
* Only supports structured messages.
*
* @param builder {@link Mqtt3PublishBuilder.Complete}
* @return A message writer.
*/
public static MessageWriter createWriter(Mqtt3PublishBuilder.Complete builder) {
return new V3MessageWriter(builder);
}


// -- Private functions

/**
* Find the value of the CloudEvent 'specversion' in the MQTT V5 User Properties.
* @param message An MQTT message.
* @return spec version attribute content.
*/
private static String getSpecVersion(Mqtt5Publish message) {

List<Mqtt5UserProperty> props = (List<Mqtt5UserProperty>) message.getUserProperties().asList();

Optional<Mqtt5UserProperty> up = props.stream().filter(p -> p.getName().toString().equals(CloudEventV1.SPECVERSION)).findFirst();

return (up.isPresent()) ? up.get().getValue().toString() : null;

}

}
Loading

0 comments on commit 0d674c7

Please sign in to comment.