Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/kicking inactive #126

22 changes: 16 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ members = [
"src/lib/net/crates/codec",
"src/lib/plugins",
"src/lib/storage",
"src/lib/utils", "src/lib/utils/logging", "src/lib/utils/profiling", "src/lib/utils/general_purpose",
"src/lib/utils",
"src/lib/utils/logging",
"src/lib/utils/profiling",
"src/lib/utils/general_purpose",
"src/lib/world",
"src/lib/derive_macros",
"src/lib/adapters/nbt", "src/lib/adapters/mca",
"src/tests", "src/lib/adapters/anvil",
"src/lib/adapters/nbt",
"src/lib/adapters/mca",
"src/tests",
"src/lib/adapters/anvil",
"src/lib/text",
]

#================== Lints ==================#
Expand Down Expand Up @@ -74,6 +80,7 @@ ferrumc-core = { path = "src/lib/core" }
ferrumc-ecs = { path = "src/lib/ecs" }
ferrumc-events = { path = "src/lib/events" }
ferrumc-net = { path = "src/lib/net" }
ferrumc-text = { path = "src/lib/text" }
ferrumc-net-encryption = { path = "src/lib/net/crates/encryption" }
ferrumc-net-codec = { path = "src/lib/net/crates/codec" }
ferrumc-plugins = { path = "src/lib/plugins" }
Expand Down Expand Up @@ -117,25 +124,27 @@ rand = "0.9.0-alpha.2"
fnv = "1.0.7"

# Encoding/Serialization
serde = "1.0.210"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
serde_derive = "1.0.210"
base64 = "0.22.1"
bitcode = "0.6.3"
bitcode_derive = "0.6.3"


# Data types
hashbrown = "0.15.0"
tinyvec = "1.8.0"
dashmap = "6.1.0"
uuid = "1.1"
uuid = { version = "1.1", features = ["v4", "v3", "serde"] }

# Macros
lazy_static = "1.5.0"
quote = "1.0.37"
syn = "2.0.77"
proc-macro2 = "1.0.86"
proc-macro-crate = "3.2.0"
paste = "1.0.15"
maplit = "1.0.2"
macro_rules_attribute = "0.2.0"

Expand All @@ -159,6 +168,7 @@ colored = "2.1.0"
# Misc
deepsize = "0.2.0"


# I/O
tempfile = "3.12.0"
memmap2 = "0.9.5"
Expand All @@ -174,4 +184,4 @@ debug = false
debug-assertions = false
overflow-checks = false
panic = "abort"
codegen-units = 1
codegen-units = 1
9 changes: 4 additions & 5 deletions src/bin/src/packet_handlers/login_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ferrumc_net::packets::incoming::login_start::LoginStartEvent;
use ferrumc_net::packets::incoming::server_bound_known_packs::ServerBoundKnownPacksEvent;
use ferrumc_net::packets::outgoing::client_bound_known_packs::ClientBoundKnownPacksPacket;
use ferrumc_net::packets::outgoing::game_event::GameEventPacket;
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;
use ferrumc_net::packets::outgoing::login_play::LoginPlayPacket;
use ferrumc_net::packets::outgoing::login_success::LoginSuccessPacket;
use ferrumc_net::packets::outgoing::registry_data::get_registry_packets;
Expand All @@ -35,7 +35,7 @@ async fn handle_login_start(
// Add the player identity component to the ECS for the entity.
state.universe.add_component::<PlayerIdentity>(
login_start_event.conn_id,
PlayerIdentity::new(username.to_string(), uuid),
PlayerIdentity::new(username.to_string(), uuid, false),
)?;

//Send a Login Success Response to further the login sequence
Expand Down Expand Up @@ -124,12 +124,11 @@ async fn handle_ack_finish_configuration(
Ok(ack_finish_configuration_event)
}
async fn send_keep_alive(conn_id: usize, state: GlobalState, writer: &mut ComponentRefMut<'_, StreamWriter>) -> Result<(), NetError> {
let keep_alive_packet = KeepAlivePacket::default();
let keep_alive_packet = OutgoingKeepAlivePacket::default();
writer.send_packet(&keep_alive_packet, &NetEncodeOpts::WithLength).await?;

let id = keep_alive_packet.id;

state.universe.add_component::<KeepAlive>(conn_id, id)?;
state.universe.add_component::<OutgoingKeepAlivePacket>(conn_id, keep_alive_packet)?;


Ok(())
Expand Down
165 changes: 129 additions & 36 deletions src/bin/src/systems/keep_alive_system.rs
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove traces when random stop issue is fixed

Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::systems::definition::System;
use async_trait::async_trait;
use ferrumc_core::identity::player_identity::PlayerIdentity;
use ferrumc_net::connection::{ConnectionState, StreamWriter};
use ferrumc_net::packets::outgoing::keep_alive::{KeepAlive, KeepAlivePacket};
use ferrumc_net::connection::{ConnectionControl, ConnectionState, StreamWriter};
use ferrumc_net::packets::incoming::keep_alive::IncomingKeepAlivePacket;
use ferrumc_net::packets::outgoing::disconnect::Disconnect;
use ferrumc_net::packets::outgoing::keep_alive::OutgoingKeepAlivePacket;

use ferrumc_net::utils::broadcast::{BroadcastOptions, BroadcastToAll};
use ferrumc_net::GlobalState;
use ferrumc_net_codec::encode::NetEncodeOpts;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tracing::{error, info, trace, warn};
use tracing::{debug, error, info, trace, warn};

pub struct KeepAliveSystem {
shutdown: AtomicBool,
Expand All @@ -29,10 +33,8 @@ impl System for KeepAliveSystem {
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64;
loop {
if self.shutdown.load(Ordering::Relaxed) {
break;
}
while !self.shutdown.load(Ordering::Relaxed) {
trace!("starting to check keep alive");

let online_players = state.universe.query::<&PlayerIdentity>();

Expand All @@ -45,49 +47,140 @@ impl System for KeepAliveSystem {
info!("Online players: {}", online_players.count());
last_time = current_time;
}

let fifteen_seconds_ms = 15000; // 15 seconds in milliseconds

trace!("doing query");
let entities = state
.universe
.query::<(&mut StreamWriter, &ConnectionState, &KeepAlive)>()
.into_entities()
.into_iter()
.query::<(&mut StreamWriter, &ConnectionState)>()
.into_entities();

trace!("done query");

trace!("getting entities to keep alive");

let entities_to_keep_alive = entities
.iter()
.filter_map(|entity| {
let conn_state = state.universe.get::<ConnectionState>(entity).ok()?;
let keep_alive = state.universe.get_mut::<KeepAlive>(entity).ok()?;
let conn_state = state.universe.get::<ConnectionState>(*entity).ok()?;
let keep_alive = state
.universe
.get_mut::<IncomingKeepAlivePacket>(*entity)
.ok()?;

if matches!(*conn_state, ConnectionState::Play)
&& (current_time - keep_alive.id) >= fifteen_seconds_ms
{
Some(entity)
if matches!(*conn_state, ConnectionState::Play) {
let time_diff = current_time - keep_alive.id;
if time_diff >= 30000
// the client did not reciprocate the last keep alive 15 seconds ago, therefore it must be kicked see https://wiki.vg/Protocol#Clientbound_Keep_Alive_.28configuration.29
{
let mut ident =
state.universe.get_mut::<PlayerIdentity>(*entity).ok()?;
ident.failed_keep_alive = true;
None
} else if time_diff >= 15000 {
Some(*entity)
} else {
None
}
} else {
None
}
})
.collect::<Vec<_>>();
if !entities.is_empty() {
trace!("there are {:?} players to keep alive", entities.len());
}
trace!("got entities to keep alive");

// Kick players with failed keep alive

let packet = KeepAlivePacket::default();
trace!("getting entities to kick");

let broadcast_opts = BroadcastOptions::default()
.only(entities)
.with_sync_callback(move |entity, state| {
let Ok(mut keep_alive) = state.universe.get_mut::<KeepAlive>(entity) else {
warn!("Failed to get <KeepAlive> component for entity {}", entity);
return;
let entities_to_kick = entities
.iter()
.filter_map(|entity| {
let Ok(ident) = state.universe.get::<PlayerIdentity>(*entity) else {
warn!(
"Failed to get the <PlayerIdentity> Component for entity with id {:?}",
*entity
);
return None;
};
if ident.failed_keep_alive {
Some(*entity)
} else {
None
}
})
.collect::<Vec<_>>();
trace!("got entities to kick");

*keep_alive = KeepAlive::from(current_time);
});
let kick_packet = Disconnect::from_string("Timeout".to_string());
for entity in entities_to_kick {
debug!("Kicking player with entity id {:?} for a timeout", entity);
let Ok(mut writer) = state.universe.get_mut::<StreamWriter>(entity) else {
warn!(
"Failed to get the <StreamWriter> Component for entity with id {:?}",
entity
);
continue;
};
match writer
.send_packet(&kick_packet, &NetEncodeOpts::WithLength)
.await
{
Ok(_) => {
trace!("kick packet sent for entity {:?} for timeout", entity);
if let Ok(control) = state.universe.get_mut::<ConnectionControl>(entity) {
control.should_disconnect.store(true, Ordering::Relaxed);
debug!("Requested disconnect for entity : {:?}", entity);
} else {
debug!("failed to get <ConnectionControl> for entity {:?}", entity);
}
}
Err(err) => {
warn!(
"Failed to kick entity {:?} for timeout\n Error : {:?}",
entity, err
);
continue;
}
}
}

if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
};
// TODO, this should be configurable as some people may have bad network so the clients may end up disconnecting from the server moments before the keep alive is sent
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
if !entities_to_keep_alive.is_empty() {
trace!(
"there are {:?} players to keep alive",
entities_to_keep_alive.len()
);

let packet = OutgoingKeepAlivePacket { id: current_time };

let broadcast_opts = BroadcastOptions::default()
.only(entities_to_keep_alive)
.with_sync_callback(move |entity, state| {
let Ok(mut outgoing_keep_alive) =
state.universe.get_mut::<OutgoingKeepAlivePacket>(entity)
else {
warn!(
"Failed to get <OutgoingKeepAlive> component for entity {:?}",
entity
);
return;
};

*outgoing_keep_alive = OutgoingKeepAlivePacket { id: current_time };
trace!(
"set outgoing keep alive for entity {:?} to {}",
entity,
current_time
);
});

if let Err(e) = state.broadcast(&packet, broadcast_opts).await {
error!("Error sending keep alive packet: {}", e);
} else {
trace!("sent keep alive packets");
}
}
trace!("finished checking keep alives, waiting 15 secs...");
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
trace!("done sleeping");
}
}

Expand Down
1 change: 1 addition & 0 deletions src/lib/adapters/nbt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ferrumc-net-codec = { workspace = true }
tracing = { workspace = true}
tokio = { workspace = true }
ferrumc-general-purpose = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
Expand Down
4 changes: 2 additions & 2 deletions src/lib/adapters/nbt/src/de/borrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ impl NbtTapeElement<'_> {
writer.write_all(&[self.nbt_id()])?;
name.serialize(writer, &NBTSerializeOptions::None);
}
NBTSerializeOptions::Network => {
NBTSerializeOptions::Network | NBTSerializeOptions::Flatten => {
writer.write_all(&[self.nbt_id()])?;
}
}
Expand Down Expand Up @@ -754,4 +754,4 @@ impl NbtTapeElement<'_> {
}
}
}
}
}
26 changes: 25 additions & 1 deletion src/lib/adapters/nbt/src/ser/impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use ferrumc_general_purpose::simd::arrays;
use super::{NBTSerializable, NBTSerializeOptions};
use uuid::Uuid;

macro_rules! impl_ser_primitives {
($($($ty:ty) | * > $id:expr),*) => {
Expand Down Expand Up @@ -41,6 +42,19 @@ impl_ser_primitives!(
f64 > TAG_DOUBLE
);

impl<T> NBTSerializable for Box<T>
where
T: NBTSerializable,
{
fn serialize(&self, buf: &mut Vec<u8>, options: &NBTSerializeOptions<'_>) {
T::serialize(self, buf, options);
}

fn id() -> u8 {
T::id()
}
}

impl NBTSerializable for bool {
fn serialize(&self, buf: &mut Vec<u8>, options: &NBTSerializeOptions<'_>) {
write_header::<Self>(buf, options);
Expand Down Expand Up @@ -75,6 +89,16 @@ impl NBTSerializable for &str {
}
}

impl NBTSerializable for Uuid {
fn serialize(&self, buf: &mut Vec<u8>, options: &NBTSerializeOptions<'_>) {
NBTSerializable::serialize(&self.as_hyphenated().to_string().as_str(), buf, options);
}

fn id() -> u8 {
TAG_STRING
}
}

impl<T: NBTSerializable + std::fmt::Debug> NBTSerializable for Vec<T> {
fn serialize(&self, buf: &mut Vec<u8>, options: &NBTSerializeOptions<'_>) {
self.as_slice().serialize(buf, options);
Expand Down Expand Up @@ -269,7 +293,7 @@ fn write_header<T: NBTSerializable>(buf: &mut Vec<u8>, opts: &NBTSerializeOption
T::id().serialize(buf, &NBTSerializeOptions::None);
tag_name.serialize(buf, &NBTSerializeOptions::None);
}
NBTSerializeOptions::Network => {
NBTSerializeOptions::Network | NBTSerializeOptions::Flatten => {
T::id().serialize(buf, &NBTSerializeOptions::None);
}
}
Expand Down
Loading