From d910998cc3aefa03281e77f1d4199d69f9776b2e Mon Sep 17 00:00:00 2001 From: Alemiz Date: Tue, 5 Dec 2023 13:32:01 +0100 Subject: [PATCH] Do not send messages to user channel if not active --- .../netty/channel/raknet/RakChildChannel.java | 1 + .../common/RakUnhandledMessagesQueue.java | 82 +++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakUnhandledMessagesQueue.java diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakChildChannel.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakChildChannel.java index ee72c7f9..cd06de25 100644 --- a/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakChildChannel.java +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakChildChannel.java @@ -62,6 +62,7 @@ public RakChildChannel(InetSocketAddress remoteAddress, RakServerChannel parent, this.rakPipeline.addLast(ConnectedPongHandler.NAME, new ConnectedPongHandler(sessionCodec)); this.rakPipeline.addLast(DisconnectNotificationHandler.NAME, DisconnectNotificationHandler.INSTANCE); this.rakPipeline.addLast(RakServerOnlineInitialHandler.NAME, new RakServerOnlineInitialHandler(this)); + this.rakPipeline.addLast(RakUnhandledMessagesQueue.NAME, new RakUnhandledMessagesQueue(this)); this.rakPipeline.fireChannelRegistered(); this.rakPipeline.fireChannelActive(); } diff --git a/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakUnhandledMessagesQueue.java b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakUnhandledMessagesQueue.java new file mode 100644 index 00000000..9e0a776c --- /dev/null +++ b/transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakUnhandledMessagesQueue.java @@ -0,0 +1,82 @@ +/* + * Copyright 2023 CloudburstMC + * + * CloudburstMC licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.cloudburstmc.netty.handler.codec.raknet.common; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.internal.PlatformDependent; +import org.cloudburstmc.netty.channel.raknet.RakChannel; +import org.cloudburstmc.netty.channel.raknet.packet.EncapsulatedPacket; + +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +public class RakUnhandledMessagesQueue extends SimpleChannelInboundHandler { + public static final String NAME = "rak-unhandled-messages-queue"; + + private final RakChannel channel; + private final Queue messages = PlatformDependent.newMpscQueue(); + private ScheduledFuture future; + + public RakUnhandledMessagesQueue(RakChannel channel) { + this.channel = channel; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.future = ctx.channel().eventLoop().scheduleAtFixedRate(() -> this.trySendMessages(ctx), + 0, 50, TimeUnit.MILLISECONDS); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + if (this.future != null) { + this.future.cancel(false); + } + + EncapsulatedPacket message; + while ((message = this.messages.poll()) != null) { + ReferenceCountUtil.release(message); + } + } + + private void trySendMessages(ChannelHandlerContext ctx) { + if (!this.channel.isActive()) { + return; + } + + EncapsulatedPacket message; + while ((message = this.messages.poll()) != null) { + ctx.fireChannelRead(message); + } + + ctx.pipeline().remove(this); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, EncapsulatedPacket msg) throws Exception { + if (!this.channel.isActive()) { + this.messages.offer(msg.retain()); + return; + } + + this.trySendMessages(ctx); + ctx.fireChannelRead(msg.retain()); + } +}