Skip to content

Commit

Permalink
Fix proxy name
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Oct 22, 2024
1 parent 43b785b commit 41524a8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@
*/
package io.streamnative.pulsar.handlers.mqtt.proxy;

import static com.google.common.base.Preconditions.checkArgument;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.LISTENER_DEL;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PLAINTEXT_PREFIX;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.PROTOCOL_PROXY_NAME;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils.getListenerPort;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.mqtt.proxy.channel.MQTTProxyChannelInitializer;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -68,16 +75,19 @@ public void initialize(ServiceConfiguration conf) throws Exception {

@Override
public String getProtocolDataToAdvertise() {
return null;
if (log.isDebugEnabled()) {
log.debug("Get configured listener: {}", proxyConfig.getMqttListeners());
}
return proxyConfig.getMqttListeners();
}

@Override
public void start(BrokerService brokerService) {
this.brokerService = brokerService;
try {
proxyService = new MQTTProxyService(brokerService, proxyConfig);
proxyService.start();
log.info("Start MQTT proxy service at port: {}", proxyConfig.getMqttProxyPort());
proxyService.start0();
log.info("Start MQTT proxy service ");
} catch (Exception ex) {
log.error("Failed to start MQTT proxy service.", ex);
}
Expand All @@ -86,8 +96,26 @@ public void start(BrokerService brokerService) {
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
try {
checkArgument(proxyConfig != null);
checkArgument(proxyConfig.getMqttListeners() != null);
checkArgument(brokerService != null);

this.sslContextRefresher = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("mop-ssl-context-refresher"));

String listeners = proxyConfig.getMqttListeners();
String[] parts = listeners.split(LISTENER_DEL);
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();

for (String listener: parts) {
if (listener.startsWith(PLAINTEXT_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTProxyChannelInitializer(
proxyService, proxyConfig, false, false, sslContextRefresher));
}
}
return builder.build();
} catch (Exception e) {
log.error("MQTTProtocolHandler newChannelInitializers failed with", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,47 @@ public void start() throws MQTTProxyException {
this.eventService.start();
}

public void start0() throws MQTTProxyException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(acceptorGroup, workerGroup);
serverBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
EventLoopUtil.enableTriggeredMode(serverBootstrap);

if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
ServerBootstrap tlsBootstrap = serverBootstrap.clone();
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, true, sslContextRefresher));
try {
listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel();
log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}

if (proxyConfig.isMqttProxyTlsPskEnabled()) {
// init psk config
pskConfiguration.setIdentityHint(proxyConfig.getTlsPskIdentityHint());
pskConfiguration.setIdentity(proxyConfig.getTlsPskIdentity());
pskConfiguration.setIdentityFile(proxyConfig.getTlsPskIdentityFile());
pskConfiguration.setProtocols(proxyConfig.getTlsProtocols());
pskConfiguration.setCiphers(proxyConfig.getTlsCiphers());
this.eventService.addListener(pskConfiguration.getEventListener());
// Add channel initializer
ServerBootstrap tlsPskBootstrap = serverBootstrap.clone();
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, true, sslContextRefresher));
try {
listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel();
log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress());
} catch (InterruptedException e) {
throw new MQTTProxyException(e);
}
}
this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, proxyConfig);
this.eventService.start();
}

@Override
public void close() {
if (listenChannel != null) {
Expand Down

0 comments on commit 41524a8

Please sign in to comment.