Skip to content

Commit

Permalink
Use the same EventLoop to tick a session.
Browse files Browse the repository at this point in the history
  • Loading branch information
SupremeMortal committed Jul 4, 2020
1 parent 9ab29a1 commit 0510384
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public BedrockClient(InetSocketAddress bindAddress, EventLoopGroup eventLoopGrou
@Override
protected void onTick() {
if (this.session != null) {
this.eventLoopGroup.execute(session::onTick);
this.session.tick();
}
}

Expand Down Expand Up @@ -70,7 +70,7 @@ public CompletableFuture<BedrockClientSession> directConnect(InetSocketAddress a

RakNetClientSession connection = this.rakNetClient.create(address);
BedrockWrapperSerializer serializer = BedrockWrapperSerializers.getSerializer(connection.getProtocolVersion());
this.session = new BedrockClientSession(connection, serializer);
this.session = new BedrockClientSession(connection, this.eventLoopGroup.next(), serializer);
BedrockRakNetSessionListener.Client listener = new BedrockRakNetSessionListener.Client(this.session,
connection, this, future);
connection.setListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import com.nukkitx.network.raknet.RakNetSession;
import com.nukkitx.protocol.bedrock.wrapper.BedrockWrapperSerializer;
import io.netty.channel.EventLoop;

public class BedrockClientSession extends BedrockSession {

BedrockClientSession(RakNetSession connection, BedrockWrapperSerializer serializer) {
super(connection, serializer);
BedrockClientSession(RakNetSession connection, EventLoop eventLoop, BedrockWrapperSerializer serializer) {
super(connection, eventLoop, serializer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public boolean isClosed() {
@Override
protected void onTick() {
for (BedrockServerSession session : sessions) {
this.eventLoopGroup.execute(session::onTick);
session.tick();
}
}

Expand Down Expand Up @@ -97,7 +97,7 @@ public byte[] onQuery(InetSocketAddress address) {
@Override
public void onSessionCreation(RakNetServerSession connection) {
BedrockWrapperSerializer serializer = BedrockWrapperSerializers.getSerializer(connection.getProtocolVersion());
BedrockServerSession session = new BedrockServerSession(connection, serializer);
BedrockServerSession session = new BedrockServerSession(connection, BedrockServer.this.eventLoopGroup.next(), serializer);
connection.setListener(new BedrockRakNetSessionListener.Server(session, connection, BedrockServer.this));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import com.nukkitx.protocol.MinecraftServerSession;
import com.nukkitx.protocol.bedrock.packet.DisconnectPacket;
import com.nukkitx.protocol.bedrock.wrapper.BedrockWrapperSerializer;
import io.netty.channel.EventLoop;

import javax.annotation.Nullable;

public class BedrockServerSession extends BedrockSession implements MinecraftServerSession<BedrockPacket> {

public BedrockServerSession(RakNetSession connection, BedrockWrapperSerializer serializer) {
super(connection, serializer);
public BedrockServerSession(RakNetSession connection, EventLoop eventLoop, BedrockWrapperSerializer serializer) {
super(connection, eventLoop, serializer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.nukkitx.protocol.bedrock.wrapper.BedrockWrapperSerializer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoop;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
Expand All @@ -29,7 +31,6 @@
import java.security.GeneralSecurityException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.zip.Deflater;
Expand All @@ -40,9 +41,10 @@ public abstract class BedrockSession implements MinecraftSession<BedrockPacket>
private static final ThreadLocal<Sha256> HASH_LOCAL = ThreadLocal.withInitial(Natives.SHA_256);

private final Set<Consumer<DisconnectReason>> disconnectHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Queue<BedrockPacket> queuedPackets = new ConcurrentLinkedQueue<>();
private final Queue<BedrockPacket> queuedPackets = PlatformDependent.newMpscQueue();
private final AtomicLong sentEncryptedPacketCount = new AtomicLong();
private final BedrockWrapperSerializer wrapperSerializer;
private final EventLoop eventLoop;
final SessionConnection<ByteBuf> connection;
private BedrockPacketCodec packetCodec = BedrockCompat.COMPAT_CODEC;
private BedrockPacketHandler packetHandler;
Expand All @@ -54,8 +56,9 @@ public abstract class BedrockSession implements MinecraftSession<BedrockPacket>
private volatile boolean closed = false;
private volatile boolean logging = true;

BedrockSession(SessionConnection<ByteBuf> connection, BedrockWrapperSerializer serializer) {
BedrockSession(SessionConnection<ByteBuf> connection, EventLoop eventLoop, BedrockWrapperSerializer serializer) {
this.connection = connection;
this.eventLoop = eventLoop;
this.wrapperSerializer = serializer;
}

Expand Down Expand Up @@ -146,7 +149,11 @@ public synchronized void sendWrapped(ByteBuf compressed, boolean encrypt) {
}
}

void onTick() {
public void tick() {
this.eventLoop.execute(this::onTick);
}

private void onTick() {
if (this.closed) {
return;
}
Expand Down

0 comments on commit 0510384

Please sign in to comment.