diff --git a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockClient.java b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockClient.java index 1cf55bea3..bee7b38b0 100644 --- a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockClient.java +++ b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockClient.java @@ -29,7 +29,7 @@ public BedrockClient(InetSocketAddress bindAddress, EventLoopGroup eventLoopGrou @Override protected void onTick() { if (this.session != null) { - this.eventLoopGroup.execute(session::onTick); + this.session.tick(); } } diff --git a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServer.java b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServer.java index 87d96609e..939cd6491 100644 --- a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServer.java +++ b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServer.java @@ -69,7 +69,7 @@ public boolean isClosed() { @Override protected void onTick() { for (BedrockServerSession session : sessions) { - this.eventLoopGroup.execute(session::onTick); + session.tick(); } } @@ -97,7 +97,7 @@ public byte[] onQuery(InetSocketAddress address) { @Override public void onSessionCreation(RakNetServerSession connection) { BedrockWrapperSerializer serializer = BedrockWrapperSerializers.getSerializer(connection.getProtocolVersion()); - BedrockServerSession session = new BedrockServerSession(connection, serializer); + BedrockServerSession session = new BedrockServerSession(connection, BedrockServer.this.eventLoopGroup.next(), serializer); connection.setListener(new BedrockRakNetSessionListener.Server(session, connection, BedrockServer.this)); } diff --git a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServerSession.java b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServerSession.java index c4ab32dff..3ba4d6c0f 100644 --- a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServerSession.java +++ b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockServerSession.java @@ -4,13 +4,14 @@ import com.nukkitx.protocol.MinecraftServerSession; import com.nukkitx.protocol.bedrock.packet.DisconnectPacket; import com.nukkitx.protocol.bedrock.wrapper.BedrockWrapperSerializer; +import io.netty.channel.EventLoop; import javax.annotation.Nullable; public class BedrockServerSession extends BedrockSession implements MinecraftServerSession { - public BedrockServerSession(RakNetSession connection, BedrockWrapperSerializer serializer) { - super(connection, serializer); + public BedrockServerSession(RakNetSession connection, EventLoop eventLoop, BedrockWrapperSerializer serializer) { + super(connection, eventLoop, serializer); } @Override diff --git a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockSession.java b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockSession.java index e03083f97..1fa050cb8 100644 --- a/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockSession.java +++ b/bedrock/bedrock-common/src/main/java/com/nukkitx/protocol/bedrock/BedrockSession.java @@ -17,6 +17,8 @@ import com.nukkitx.protocol.bedrock.wrapper.BedrockWrapperSerializer; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.EventLoop; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import it.unimi.dsi.fastutil.objects.ObjectArrayList; @@ -29,7 +31,6 @@ import java.security.GeneralSecurityException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.zip.Deflater; @@ -40,9 +41,10 @@ public abstract class BedrockSession implements MinecraftSession private static final ThreadLocal HASH_LOCAL = ThreadLocal.withInitial(Natives.SHA_256); private final Set> disconnectHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Queue queuedPackets = new ConcurrentLinkedQueue<>(); + private final Queue queuedPackets = PlatformDependent.newMpscQueue(); private final AtomicLong sentEncryptedPacketCount = new AtomicLong(); private final BedrockWrapperSerializer wrapperSerializer; + private final EventLoop eventLoop; final SessionConnection connection; private BedrockPacketCodec packetCodec = BedrockCompat.COMPAT_CODEC; private BedrockPacketHandler packetHandler; @@ -54,8 +56,9 @@ public abstract class BedrockSession implements MinecraftSession private volatile boolean closed = false; private volatile boolean logging = true; - BedrockSession(SessionConnection connection, BedrockWrapperSerializer serializer) { + BedrockSession(SessionConnection connection, EventLoop eventLoop, BedrockWrapperSerializer serializer) { this.connection = connection; + this.eventLoop = eventLoop; this.wrapperSerializer = serializer; } @@ -146,7 +149,11 @@ public synchronized void sendWrapped(ByteBuf compressed, boolean encrypt) { } } - void onTick() { + public void tick() { + this.eventLoop.execute(this::onTick); + } + + private void onTick() { if (this.closed) { return; }