Skip to content

Commit

Permalink
add proxy listener port method
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Oct 22, 2024
1 parent 3a4d72b commit dec200f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public final class ConfigurationUtils {
public static final String LISTENER_PATTERN =
"^((mqtt)(\\+ssl)?(\\+psk)?|(ws)(\\+ssl)?)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

public static final String PROXY_LISTENER_PATTERN =
"^(mqtt-proxy)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

/**
* Creates PulsarConfiguration and loads it with populated attribute values loaded from provided property file.
*
Expand Down Expand Up @@ -160,6 +163,13 @@ public static int getListenerPort(String listener) {
return Integer.parseInt(listener.substring(lastIndex + 1));
}

public static int getProxyListenerPort(String listener) {
checkArgument(listener.matches(PROXY_LISTENER_PATTERN), "proxy listener not match pattern");

int lastIndex = listener.lastIndexOf(':');
return Integer.parseInt(listener.substring(lastIndex + 1));
}

private ConfigurationUtils() {}

/**
Expand Down Expand Up @@ -246,4 +256,5 @@ private static String listToString(Object fieldValue) {
return Joiner.on(LISTENER_DEL).join(list);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.LISTENER_DEL;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PROTOCOL_PROXY_NAME;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PROXY_PREFIX;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.getListenerPort;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.getProxyListenerPort;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
Expand Down Expand Up @@ -111,7 +111,8 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
for (String listener: parts) {
if (listener.startsWith(PROXY_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new InetSocketAddress(brokerService.pulsar().getBindAddress(),
getProxyListenerPort(listener)),
new MQTTProxyChannelInitializer(
proxyService, proxyConfig, false, false, sslContextRefresher));
}
Expand Down

0 comments on commit dec200f

Please sign in to comment.