diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakConstants.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakConstants.java index 4af241bc..e4e99485 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakConstants.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakConstants.java @@ -50,7 +50,18 @@ public class RakConstants { * Time after {@link RakSessionCodec} is refreshed due to no activity. */ public static final int SESSION_STALE_MS = 5000; - + /** + * A number of datagram packets each address can send within one RakNet tick (10ms) + */ + public static final int DEFAULT_PACKET_LIMIT = 120; + /** + * A number of "unconnected" datagram packets each address can send within one second. + */ + public static final int DEFAULT_OFFLINE_PACKET_LIMIT = 10; + /** + * A number of all datagrams that will be handled within one RakNet tick before server starts dropping any incoming data. + */ + public static final int DEFAULT_GLOBAL_PACKET_LIMIT = 100000; /* * Flags */ diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakServerChannel.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakServerChannel.java index 11f26aae..acd95955 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakServerChannel.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakServerChannel.java @@ -27,13 +27,16 @@ import org.cloudburstmc.netty.channel.raknet.config.RakServerChannelConfig; import org.cloudburstmc.netty.handler.codec.raknet.common.UnconnectedPongEncoder; import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerOfflineHandler; +import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerRateLimiter; import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerRouteHandler; import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerTailHandler; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; public class RakServerChannel extends ProxyChannel implements ServerChannel { @@ -44,8 +47,9 @@ public RakServerChannel(DatagramChannel channel) { super(channel); this.config = new DefaultRakServerConfig(this); // Default common handler of offline phase. Handles only raknet packets, forwards rest. - this.pipeline.addLast(UnconnectedPongEncoder.NAME, UnconnectedPongEncoder.INSTANCE); - this.pipeline.addLast(RakServerOfflineHandler.NAME, new RakServerOfflineHandler()); + this.pipeline().addLast(UnconnectedPongEncoder.NAME, UnconnectedPongEncoder.INSTANCE); + this.pipeline().addLast(RakServerRateLimiter.NAME, new RakServerRateLimiter(this)); + this.pipeline().addLast(RakServerOfflineHandler.NAME, new RakServerOfflineHandler(this)); this.pipeline().addLast(RakServerRouteHandler.NAME, new RakServerRouteHandler(this)); this.pipeline().addLast(RakServerTailHandler.NAME, RakServerTailHandler.INSTANCE); } @@ -92,6 +96,15 @@ public void onCloseTriggered(ChannelPromise promise) { combiner.finish(combinedPromise); } + public boolean tryBlockAddress(InetAddress address, long time, TimeUnit unit) { + RakServerRateLimiter rateLimiter = this.pipeline().get(RakServerRateLimiter.class); + if (rateLimiter != null) { + rateLimiter.blockAddress(address, time, unit); + return true; + } + return false; + } + @Override public RakServerChannelConfig config() { return this.config; diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/DefaultRakServerConfig.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/DefaultRakServerConfig.java index 1ec11336..11c2ed9f 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/DefaultRakServerConfig.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/DefaultRakServerConfig.java @@ -43,6 +43,9 @@ public class DefaultRakServerConfig extends DefaultChannelConfig implements RakS private volatile boolean handlePing; private volatile int maxMtu = RakConstants.MAXIMUM_MTU_SIZE; private volatile int minMtu = RakConstants.MINIMUM_MTU_SIZE; + private volatile int packetLimit = RakConstants.DEFAULT_PACKET_LIMIT; + private volatile int globalPacketLimit = RakConstants.DEFAULT_GLOBAL_PACKET_LIMIT; + private volatile int unconnectedPacketLimit = RakConstants.DEFAULT_OFFLINE_PACKET_LIMIT; public DefaultRakServerConfig(RakServerChannel channel) { super(channel); @@ -86,6 +89,15 @@ public T getOption(ChannelOption option) { if (option == RakChannelOption.RAK_HANDLE_PING) { return (T) Boolean.valueOf(this.getHandlePing()); } + if (option == RakChannelOption.RAK_PACKET_LIMIT) { + return (T) Integer.valueOf(this.getPacketLimit()); + } + if (option == RakChannelOption.RAK_GLOBAL_PACKET_LIMIT) { + return (T) Integer.valueOf(this.getGlobalPacketLimit()); + } + if (option == RakChannelOption.RAK_OFFLINE_PACKET_LIMIT) { + return (T) Integer.valueOf(this.getUnconnectedPacketLimit()); + } return this.channel.parent().config().getOption(option); } @@ -111,6 +123,12 @@ public boolean setOption(ChannelOption option, T value) { this.setMaxMtu((Integer) value); } else if (option == RakChannelOption.RAK_MIN_MTU) { this.setMinMtu((Integer) value); + } else if (option == RakChannelOption.RAK_PACKET_LIMIT) { + this.setPacketLimit((Integer) value); + } else if (option == RakChannelOption.RAK_OFFLINE_PACKET_LIMIT) { + this.setUnconnectedPacketLimit((Integer) value); + } else if (option == RakChannelOption.RAK_GLOBAL_PACKET_LIMIT) { + this.setGlobalPacketLimit((Integer) value); } else { return this.channel.parent().config().setOption(option, value); } @@ -226,4 +244,34 @@ public RakServerChannelConfig setMinMtu(int minMtu) { public int getMinMtu() { return this.minMtu; } + + @Override + public void setPacketLimit(int limit) { + this.packetLimit = limit; + } + + @Override + public int getPacketLimit() { + return this.packetLimit; + } + + @Override + public int getUnconnectedPacketLimit() { + return unconnectedPacketLimit; + } + + @Override + public void setUnconnectedPacketLimit(int unconnectedPacketLimit) { + this.unconnectedPacketLimit = unconnectedPacketLimit; + } + + @Override + public int getGlobalPacketLimit() { + return globalPacketLimit; + } + + @Override + public void setGlobalPacketLimit(int globalPacketLimit) { + this.globalPacketLimit = globalPacketLimit; + } } diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakChannelOption.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakChannelOption.java index 54c0c18f..059cf37c 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakChannelOption.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakChannelOption.java @@ -133,6 +133,28 @@ public class RakChannelOption extends ChannelOption { public static final ChannelOption RAK_FLUSH_INTERVAL = valueOf(RakChannelOption.class, "RAK_FLUSH_INTERVAL"); + /** + * A number of datagram packets each address can send within one RakNet tick (10ms). + * Default is 120 packets. + */ + public static final ChannelOption RAK_PACKET_LIMIT = + valueOf(RakChannelOption.class, "RAK_PACKET_LIMIT"); + + /** + * A number of "unconnected" datagram packets each address can send within one second. + * This includes packets such as UNCONNECTED_PING and OPEN_CONNECTION_REQUEST packets. + * Default is 10 packets. + */ + public static final ChannelOption RAK_OFFLINE_PACKET_LIMIT = + valueOf(RakChannelOption.class, "RAK_OFFLINE_PACKET_LIMIT"); + + /** + * A number of all datagrams that will be handled within one RakNet tick before server starts dropping any incoming data. + * Default is 100_000 (RAK_PACKET_LIMIT * 0.56 * 1500 different connections). + */ + public static final ChannelOption RAK_GLOBAL_PACKET_LIMIT = + valueOf(RakChannelOption.class, "RAK_GLOBAL_PACKET_LIMIT"); + @SuppressWarnings("deprecation") protected RakChannelOption() { super(null); diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakServerChannelConfig.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakServerChannelConfig.java index c6e0ef12..b2aa7958 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakServerChannelConfig.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakServerChannelConfig.java @@ -56,4 +56,16 @@ public interface RakServerChannelConfig extends ChannelConfig { int getMinMtu(); RakServerChannelConfig setMinMtu(int mtu); + + int getPacketLimit(); + + void setPacketLimit(int limit); + + int getGlobalPacketLimit(); + + void setGlobalPacketLimit(int limit); + + int getUnconnectedPacketLimit(); + + void setUnconnectedPacketLimit(int limit); } diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakSessionCodec.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakSessionCodec.java index 25717ac3..79f3ada7 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakSessionCodec.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakSessionCodec.java @@ -397,11 +397,15 @@ private void tryTick() { } private void onTick() { + long curTime = System.currentTimeMillis(); + if (this.state == RakState.UNCONNECTED) { + if (this.isTimedOut(curTime)) { + this.close(RakDisconnectReason.TIMED_OUT); + } return; } - long curTime = System.currentTimeMillis(); if (this.isTimedOut(curTime)) { this.disconnect(RakDisconnectReason.TIMED_OUT); return; @@ -760,6 +764,15 @@ private ChannelPromise disconnect0(RakDisconnectReason reason) { } public void close(RakDisconnectReason reason) { + if (this.state == RakState.DISCONNECTING) { + return; + } + this.state = RakState.DISCONNECTING; + + if (log.isDebugEnabled()) { + log.debug("Closing RakNet Session ({} => {}) due to {}", this.channel.localAddress(), this.getRemoteAddress(), reason); + } + this.channel.pipeline().fireUserEventTriggered(reason).close(); } diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOfflineHandler.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOfflineHandler.java index c4d1db3b..406af851 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOfflineHandler.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOfflineHandler.java @@ -44,7 +44,6 @@ public class RakServerOfflineHandler extends AdvancedChannelInboundHandler { public static final String NAME = "rak-offline-handler"; - private static final int MAX_PACKETS_PER_SECOND = 10; private static final InternalLogger log = InternalLoggerFactory.getInstance(RakServerOfflineHandler.class); @@ -59,6 +58,12 @@ public class RakServerOfflineHandler extends AdvancedChannelInboundHandler new AtomicInteger()); - if (counter.incrementAndGet() > MAX_PACKETS_PER_SECOND) { + if (counter.incrementAndGet() > this.channel.config().getUnconnectedPacketLimit()) { log.warn("[{}] Sent too many packets per second", packet.sender()); + this.channel.tryBlockAddress(packet.sender().getAddress(), 10, TimeUnit.SECONDS); return; } diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOnlineInitialHandler.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOnlineInitialHandler.java index bce6b00f..89970361 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOnlineInitialHandler.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerOnlineInitialHandler.java @@ -20,11 +20,12 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import org.cloudburstmc.netty.channel.raknet.RakChildChannel; import org.cloudburstmc.netty.channel.raknet.RakDisconnectReason; import org.cloudburstmc.netty.channel.raknet.RakPriority; import org.cloudburstmc.netty.channel.raknet.RakReliability; -import org.cloudburstmc.netty.channel.raknet.config.RakChannelConfig; import org.cloudburstmc.netty.channel.raknet.config.RakServerChannelConfig; import org.cloudburstmc.netty.channel.raknet.packet.EncapsulatedPacket; import org.cloudburstmc.netty.channel.raknet.packet.RakMessage; @@ -33,15 +34,17 @@ import java.net.Inet6Address; import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicInteger; import static org.cloudburstmc.netty.channel.raknet.RakConstants.*; @Sharable public class RakServerOnlineInitialHandler extends SimpleChannelInboundHandler { - public static final String NAME = "rak-server-online-initial-handler"; + private static final InternalLogger log = InternalLoggerFactory.getInstance(RakServerOnlineInitialHandler.class); private final RakChildChannel channel; + private final AtomicInteger retriesCounter = new AtomicInteger(5); public RakServerOnlineInitialHandler(RakChildChannel channel) { this.channel = channel; @@ -73,12 +76,15 @@ protected void channelRead0(ChannelHandlerContext ctx, EncapsulatedPacket messag private void onConnectionRequest(ChannelHandlerContext ctx, ByteBuf buffer) { buffer.skipBytes(1); - long guid = ((RakChannelConfig) this.channel.config()).getGuid(); + long guid = this.channel.config().getGuid(); long serverGuid = buffer.readLong(); long timestamp = buffer.readLong(); boolean security = buffer.readBoolean(); - if (serverGuid != guid || security) { + if (this.retriesCounter.decrementAndGet() < 0) { + this.sendConnectionRequestFailed(ctx, guid); + log.warn("[{}] Connection request failed due to too many retries", this.channel.remoteAddress()); + } else if (serverGuid != guid || security) { this.sendConnectionRequestFailed(ctx, guid); } else { this.sendConnectionRequestAccepted(ctx, timestamp); @@ -86,7 +92,7 @@ private void onConnectionRequest(ChannelHandlerContext ctx, ByteBuf buffer) { } private void sendConnectionRequestAccepted(ChannelHandlerContext ctx, long time) { - InetSocketAddress address = ((InetSocketAddress) this.channel.remoteAddress()); + InetSocketAddress address = this.channel.remoteAddress(); boolean ipv6 = address.getAddress() instanceof Inet6Address; ByteBuf outBuf = ctx.alloc().ioBuffer(ipv6 ? 628 : 166); @@ -99,7 +105,7 @@ private void sendConnectionRequestAccepted(ChannelHandlerContext ctx, long time) outBuf.writeLong(time); outBuf.writeLong(System.currentTimeMillis()); - ctx.writeAndFlush(new RakMessage(outBuf, RakReliability.RELIABLE, RakPriority.IMMEDIATE)); + ctx.writeAndFlush(new RakMessage(outBuf, RakReliability.UNRELIABLE, RakPriority.IMMEDIATE)); } private void sendConnectionRequestFailed(ChannelHandlerContext ctx, long guid) { diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerRateLimiter.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerRateLimiter.java new file mode 100644 index 00000000..7b90a17e --- /dev/null +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/server/RakServerRateLimiter.java @@ -0,0 +1,121 @@ +/* + * Copyright 2024 CloudburstMC + * + * CloudburstMC licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.cloudburstmc.netty.handler.codec.raknet.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; +import org.cloudburstmc.netty.channel.raknet.RakServerChannel; + +import java.net.InetAddress; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class RakServerRateLimiter extends SimpleChannelInboundHandler { + public static final String NAME = "rak-server-rate-limiter"; + private static final InternalLogger log = InternalLoggerFactory.getInstance(RakServerRateLimiter.class); + + private final RakServerChannel channel; + + private final ConcurrentHashMap rateLimitMap = new ConcurrentHashMap<>(); + private final Map blockedConnections = new ConcurrentHashMap<>(); + + private final AtomicLong globalCounter = new AtomicLong(0); + + private ScheduledFuture tickFuture; + private ScheduledFuture blockedTickFuture; + + public RakServerRateLimiter(RakServerChannel channel) { + this.channel = channel; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.tickFuture = ctx.channel().eventLoop().scheduleAtFixedRate(this::onRakTick, 10, 10, TimeUnit.MILLISECONDS); + this.blockedTickFuture = ctx.channel().eventLoop().scheduleAtFixedRate(this::onBlockedTick, 100, 100, TimeUnit.MILLISECONDS); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + this.tickFuture.cancel(false); + this.blockedTickFuture.cancel(true); + this.rateLimitMap.clear(); + } + + private void onRakTick() { + this.rateLimitMap.clear(); + this.globalCounter.set(0); + } + + private void onBlockedTick() { + long currTime = System.currentTimeMillis(); + + Iterator> iterator = this.blockedConnections.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue() != 0 && currTime > entry.getValue()) { + iterator.remove(); + log.info("Unblocked address {}", entry.getKey()); + } + } + } + + public void blockAddress(InetAddress address, long time, TimeUnit unit) { + long millis = unit.toMillis(time); + this.blockedConnections.put(address, System.currentTimeMillis() + millis); + } + + public void unblockAddress(InetAddress address) { + if (this.blockedConnections.remove(address) != null) { + log.info("Unblocked address {}", address); + } + } + + public boolean isAddressBlocked(InetAddress address) { + return this.blockedConnections.containsKey(address); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagram) throws Exception { + if (this.globalCounter.incrementAndGet() > this.channel.config().getGlobalPacketLimit()) { + if (log.isTraceEnabled()) { + log.trace("[{}] Dropped incoming packet because global packet limit was reached: {}", datagram.sender(), this.globalCounter.get()); + } + return; + } + + InetAddress address = datagram.sender().getAddress(); + if (this.blockedConnections.containsKey(address)) { + return; + } + + AtomicInteger counter = this.rateLimitMap.computeIfAbsent(address, a -> new AtomicInteger()); + if (counter.incrementAndGet() > this.channel.config().getPacketLimit()) { + this.blockAddress(address, 10, TimeUnit.SECONDS); + log.warn("[{}] Blocked because packet limit was reached"); + } else { + ctx.fireChannelRead(datagram.retain()); + } + } +}