Skip to content

Commit

Permalink
chore: update NetUtils, UdsClient, and UdsServer (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
caochengxiang authored Nov 20, 2024
2 parents 894fb14 + 689df58 commit b05d500
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,23 @@
*/
public class NetUtils {

public static final String OS_NAME = System.getProperty("os.name");

private static boolean isLinuxPlatform = false;

static {
if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) {
isLinuxPlatform = true;
}
}

public static EventLoopGroup getEventLoopGroup(boolean remote) {
if (remote) {
return new NioEventLoopGroup();
if(useEpoll()) {
return new EpollEventLoopGroup();
} else {
return new NioEventLoopGroup();
}
}
if (CommonUtils.isWindows()) {
return new NioEventLoopGroup();
Expand Down Expand Up @@ -50,4 +64,12 @@ public static Class getClientChannelClass(boolean mac, boolean remote) {
return mac ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class;
}

public static boolean useEpoll() {
return isLinuxPlatform() && Epoll.isAvailable();
}

public static boolean isLinuxPlatform() {
return isLinuxPlatform;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected void initChannel(Channel ch) throws Exception {
f.sync();
} catch (Throwable ex) {
UdsClientContext.ins().exceptionCaught(ex);
log.error("start error:{} restart", ex.getMessage());
log.error("start error restart", ex);
if (null != group) {
group.shutdownGracefully();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,14 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import run.mone.api.IServer;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
Expand Down Expand Up @@ -103,35 +99,45 @@ public void start(String path) {
this.path = path;
delPath();
boolean mac = CommonUtils.isMac();
EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(this.remote);
EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(this.remote);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.option(ChannelOption.SO_BACKLOG, 5000)
.option(ChannelOption.SO_RCVBUF, 65535)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(NetUtils.getServerChannelClass(mac, this.remote))
.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new UdsServerHandler(processorMap));
EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(this.remote);
EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(this.remote);

ServerBootstrap serverBootstrap =
new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NetUtils.getServerChannelClass(mac, this.remote))
.option(ChannelOption.SO_BACKLOG, 5000)
.option(ChannelOption.SO_RCVBUF, 65535)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new UdsServerHandler(processorMap));
}
};

serverBootstrap.childHandler(initializer);
ChannelFuture future =
serverBootstrap.bind("0.0.0.0", this.port).addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("bind:{}", this.remote ? this.host + ":" + this.port : path);
}
});
}).awaitUninterruptibly();

SocketAddress s = this.remote ? new InetSocketAddress(this.host, this.port) : new DomainSocketAddress(path);
ChannelFuture f = b.bind(s);
log.info("bind:{}", this.remote ? this.host + ":" + this.port : path);
Throwable cause = future.cause();
if (cause != null) {
log.error(cause.getMessage(), cause);
}

if (Optional.ofNullable(this.regConsumer).isPresent()) {
RpcServerInfo si = this.getServerInfo();
log.info("reg server :{}", si);
this.regConsumer.accept(si);
}

f.channel().closeFuture().sync();
} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
}
Expand Down

0 comments on commit b05d500

Please sign in to comment.