From 1c767daa395deabef6d193e939b922b2978d8fcf Mon Sep 17 00:00:00 2001 From: lukas0008 Date: Sat, 17 Aug 2024 18:46:19 +0200 Subject: [PATCH] Sum ting wong --- pumpkin-protocol/src/bytebuf/mod.rs | 5 +- .../src/client/config/c_plugin_message.rs | 2 - .../src/client/login/c_login_success.rs | 2 +- .../src/client/play/c_chunk_data.rs | 6 +- .../src/client/play/c_player_chat_message.rs | 2 +- pumpkin-protocol/src/lib.rs | 7 +- pumpkin-protocol/src/packet_decoder.rs | 2 +- pumpkin-protocol/src/packet_encoder.rs | 2 +- .../src/server/play/c_interact.rs | 2 - .../src/server/play/s_player_action.rs | 1 + .../src/server/status/s_ping_request.rs | 2 +- pumpkin-world/src/chunk.rs | 4 +- pumpkin-world/src/radial_chunk_iterator.rs | 2 - pumpkin-world/src/world.rs | 17 +- pumpkin/src/client/client_packet.rs | 73 ++++--- pumpkin/src/client/connection.rs | 119 +++++++++++ pumpkin/src/client/mod.rs | 179 +++++++--------- pumpkin/src/client/player_packet.rs | 50 ++--- pumpkin/src/client/try_send_packet.rs | 29 +++ pumpkin/src/commands/gamemode.rs | 52 ++--- pumpkin/src/commands/mod.rs | 18 +- pumpkin/src/commands/pumpkin.rs | 4 +- pumpkin/src/commands/stop.rs | 2 +- pumpkin/src/config/auth_config.rs | 8 +- pumpkin/src/config/mod.rs | 26 ++- pumpkin/src/config/resource_pack.rs | 2 +- pumpkin/src/main.rs | 197 +++++------------- pumpkin/src/rcon/mod.rs | 2 +- pumpkin/src/server.rs | 168 ++++++++------- 29 files changed, 533 insertions(+), 452 deletions(-) create mode 100644 pumpkin/src/client/connection.rs create mode 100644 pumpkin/src/client/try_send_packet.rs diff --git a/pumpkin-protocol/src/bytebuf/mod.rs b/pumpkin-protocol/src/bytebuf/mod.rs index caca3480a..076581959 100644 --- a/pumpkin-protocol/src/bytebuf/mod.rs +++ b/pumpkin-protocol/src/bytebuf/mod.rs @@ -168,10 +168,7 @@ impl ByteBuffer { /// some, then it also calls the `write` closure. pub fn put_option(&mut self, val: &Option, write: impl FnOnce(&mut Self, &T)) { self.put_bool(val.is_some()); - match val { - Some(v) => write(self, v), - None => {} - } + if let Some(v) = val { write(self, v) } } pub fn get_list(&mut self, val: impl Fn(&mut Self) -> T) -> Vec { diff --git a/pumpkin-protocol/src/client/config/c_plugin_message.rs b/pumpkin-protocol/src/client/config/c_plugin_message.rs index bdb7692b4..c320ca291 100644 --- a/pumpkin-protocol/src/client/config/c_plugin_message.rs +++ b/pumpkin-protocol/src/client/config/c_plugin_message.rs @@ -1,8 +1,6 @@ use pumpkin_macros::packet; use serde::Serialize; -use crate::{bytebuf::ByteBuffer, ClientPacket}; - #[derive(Serialize)] #[packet(0x01)] pub struct CPluginMessage<'a> { diff --git a/pumpkin-protocol/src/client/login/c_login_success.rs b/pumpkin-protocol/src/client/login/c_login_success.rs index 2493f1ced..4177d1c5a 100644 --- a/pumpkin-protocol/src/client/login/c_login_success.rs +++ b/pumpkin-protocol/src/client/login/c_login_success.rs @@ -29,7 +29,7 @@ impl<'a> CLoginSuccess<'a> { impl<'a> ClientPacket for CLoginSuccess<'a> { fn write(&self, bytebuf: &mut ByteBuffer) { bytebuf.put_uuid(self.uuid); - bytebuf.put_string(&self.username); + bytebuf.put_string(self.username); bytebuf.put_list::(self.properties, |p, v| { p.put_string(&v.name); p.put_string(&v.value); diff --git a/pumpkin-protocol/src/client/play/c_chunk_data.rs b/pumpkin-protocol/src/client/play/c_chunk_data.rs index eec44e47c..484be4fd6 100644 --- a/pumpkin-protocol/src/client/play/c_chunk_data.rs +++ b/pumpkin-protocol/src/client/play/c_chunk_data.rs @@ -32,7 +32,7 @@ impl<'a> ClientPacket for CChunkData<'a> { data_buf.put_i16(block_count); //// Block states - let palette = chunk.into_iter().dedup().collect_vec(); + let palette = chunk.iter().dedup().collect_vec(); // TODO: make dynamic block_size work // TODO: make direct block_size work enum PaletteType { @@ -63,7 +63,7 @@ impl<'a> ClientPacket for CChunkData<'a> { palette.iter().enumerate().for_each(|(i, id)| { palette_map.insert(*id, i); // Palette - data_buf.put_var_int(&VarInt(**id as i32)); + data_buf.put_var_int(&VarInt(**id)); }); for block_clump in chunk.chunks(64 / block_size as usize) { let mut out_long: i64 = 0; @@ -109,7 +109,7 @@ impl<'a> ClientPacket for CChunkData<'a> { // Size buf.put_var_int(&VarInt(data_buf.buf().len() as i32)); // Data - buf.put_slice(&data_buf.buf()); + buf.put_slice(data_buf.buf()); // TODO: block entities buf.put_var_int(&VarInt(0)); diff --git a/pumpkin-protocol/src/client/play/c_player_chat_message.rs b/pumpkin-protocol/src/client/play/c_player_chat_message.rs index 47726a1f3..780dc383d 100644 --- a/pumpkin-protocol/src/client/play/c_player_chat_message.rs +++ b/pumpkin-protocol/src/client/play/c_player_chat_message.rs @@ -3,7 +3,7 @@ use pumpkin_macros::packet; use pumpkin_text::TextComponent; use serde::Serialize; -use crate::{uuid::UUID, BitSet, VarInt}; +use crate::{uuid::UUID, VarInt}; #[derive(Serialize)] #[packet(0x39)] diff --git a/pumpkin-protocol/src/lib.rs b/pumpkin-protocol/src/lib.rs index 412aa8eb2..a1d7def68 100644 --- a/pumpkin-protocol/src/lib.rs +++ b/pumpkin-protocol/src/lib.rs @@ -1,6 +1,6 @@ use bytebuf::{packet_id::Packet, ByteBuffer, DeserializerError}; use bytes::Buf; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use std::io::{self, Write}; use thiserror::Error; @@ -8,9 +8,9 @@ pub mod bytebuf; pub mod client; pub mod packet_decoder; pub mod packet_encoder; +pub mod position; pub mod server; pub mod uuid; -pub mod position; pub const CURRENT_MC_PROTOCOL: u32 = 767; @@ -152,6 +152,8 @@ pub enum PacketError { OutOfBounds, #[error("malformed packet length VarInt")] MailformedLength, + #[error("client disconnected")] + ClientDisconnected, } #[derive(Debug, PartialEq)] @@ -179,6 +181,7 @@ impl From for ConnectionState { } } +#[derive(Debug)] pub struct RawPacket { pub id: VarInt, pub bytebuf: ByteBuffer, diff --git a/pumpkin-protocol/src/packet_decoder.rs b/pumpkin-protocol/src/packet_decoder.rs index 3d3e3449a..01bd0d524 100644 --- a/pumpkin-protocol/src/packet_decoder.rs +++ b/pumpkin-protocol/src/packet_decoder.rs @@ -13,7 +13,7 @@ use crate::{ type Cipher = cfb8::Decryptor; // Decoder: Client -> Server -#[derive(Default)] +#[derive(Default, Debug)] pub struct PacketDecoder { buf: BytesMut, decompress_buf: BytesMut, diff --git a/pumpkin-protocol/src/packet_encoder.rs b/pumpkin-protocol/src/packet_encoder.rs index 1c545804b..279bbf34a 100644 --- a/pumpkin-protocol/src/packet_encoder.rs +++ b/pumpkin-protocol/src/packet_encoder.rs @@ -13,7 +13,7 @@ use crate::{bytebuf::ByteBuffer, ClientPacket, PacketError, VarInt, MAX_PACKET_S type Cipher = cfb8::Encryptor; // Encoder: Server -> Client -#[derive(Default)] +#[derive(Default, Debug)] pub struct PacketEncoder { buf: BytesMut, compress_buf: Vec, diff --git a/pumpkin-protocol/src/server/play/c_interact.rs b/pumpkin-protocol/src/server/play/c_interact.rs index ec54a626f..a31c5af2b 100644 --- a/pumpkin-protocol/src/server/play/c_interact.rs +++ b/pumpkin-protocol/src/server/play/c_interact.rs @@ -1,6 +1,4 @@ use pumpkin_macros::packet; -use serde::Deserialize; - use crate::{ServerPacket, VarInt}; #[packet(0x16)] diff --git a/pumpkin-protocol/src/server/play/s_player_action.rs b/pumpkin-protocol/src/server/play/s_player_action.rs index 6b4b40616..103582d43 100644 --- a/pumpkin-protocol/src/server/play/s_player_action.rs +++ b/pumpkin-protocol/src/server/play/s_player_action.rs @@ -4,6 +4,7 @@ use crate::{position::WorldPosition, VarInt}; #[derive(serde::Deserialize)] #[packet(0x24)] +#[allow(dead_code)] // TODO remove pub struct SPlayerAction { status: VarInt, location: WorldPosition, diff --git a/pumpkin-protocol/src/server/status/s_ping_request.rs b/pumpkin-protocol/src/server/status/s_ping_request.rs index c73f8536a..745c66c98 100644 --- a/pumpkin-protocol/src/server/status/s_ping_request.rs +++ b/pumpkin-protocol/src/server/status/s_ping_request.rs @@ -1,6 +1,6 @@ use pumpkin_macros::packet; -#[derive(serde::Deserialize)] +#[derive(serde::Deserialize, Debug)] #[packet(0x01)] pub struct SPingRequest { pub payload: i64, diff --git a/pumpkin-world/src/chunk.rs b/pumpkin-world/src/chunk.rs index f41034d58..8ecfc7e23 100644 --- a/pumpkin-world/src/chunk.rs +++ b/pumpkin-world/src/chunk.rs @@ -60,14 +60,14 @@ pub struct ChunkHeightmaps { #[derive(serde::Deserialize, Debug)] struct ChunkSection { #[serde(rename = "Y")] - y: i32, + _y: i32, block_states: Option, } #[derive(serde::Deserialize, Debug)] struct ChunkNbt { #[serde(rename = "DataVersion")] - data_version: usize, + _data_version: usize, sections: Vec, #[serde(rename = "Heightmaps")] heightmaps: ChunkHeightmaps, diff --git a/pumpkin-world/src/radial_chunk_iterator.rs b/pumpkin-world/src/radial_chunk_iterator.rs index 650a55253..234773dc5 100644 --- a/pumpkin-world/src/radial_chunk_iterator.rs +++ b/pumpkin-world/src/radial_chunk_iterator.rs @@ -1,5 +1,3 @@ -use std::collections::VecDeque; - pub struct RadialIterator { radius: i32, direction: usize, diff --git a/pumpkin-world/src/world.rs b/pumpkin-world/src/world.rs index 2f2c0d407..8f586bb7a 100644 --- a/pumpkin-world/src/world.rs +++ b/pumpkin-world/src/world.rs @@ -1,20 +1,13 @@ use std::{ io::{Read, Seek}, path::PathBuf, - sync::{atomic::AtomicUsize, Arc}, }; use flate2::bufread::ZlibDecoder; use itertools::Itertools; use rayon::prelude::*; use thiserror::Error; -use tokio::{ - fs::File, - io::{AsyncReadExt, AsyncSeekExt}, - runtime::Handle, - sync::{mpsc, oneshot, Mutex}, - task::spawn_blocking, -}; +use tokio::sync::mpsc; use crate::chunk::ChunkData; @@ -89,7 +82,11 @@ impl Level { region_file_path.push(format!("r.{}.{}.mca", region.0, region.1)); // return different error when file is not found (because that means that the chunks have just not been generated yet) - let mut region_file = match std::fs::File::options().read(true).write(false).open(®ion_file_path) { + let mut region_file = match std::fs::File::options() + .read(true) + .write(false) + .open(®ion_file_path) + { Ok(f) => f, Err(err) => match err.kind() { std::io::ErrorKind::NotFound => { @@ -173,7 +170,7 @@ impl Level { 4 => Compression::LZ4, _ => { let _ = - channel.send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid))); + channel.blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid))); return; } }; diff --git a/pumpkin/src/client/client_packet.rs b/pumpkin/src/client/client_packet.rs index edc5988ab..6f8668cb3 100644 --- a/pumpkin/src/client/client_packet.rs +++ b/pumpkin/src/client/client_packet.rs @@ -33,32 +33,37 @@ use super::{ /// Processes incoming Packets from the Client to the Server /// Implements the `Client` Packets impl Client { - pub fn handle_handshake(&mut self, _server: &mut Server, handshake: SHandShake) { + pub async fn handle_handshake(&mut self, _server: &mut Server, handshake: SHandShake) { self.protocol_version = handshake.protocol_version.0; self.connection_state = handshake.next_state; if self.connection_state == ConnectionState::Login { if self.protocol_version < CURRENT_MC_PROTOCOL as i32 { let protocol = self.protocol_version; - self.kick(&format!("Client outdated ({protocol}), Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")); - return; + self.kick(&format!("Client outdated ({protocol}), Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")).await; } else if self.protocol_version > CURRENT_MC_PROTOCOL as i32 { - self.kick(&format!("Server outdated, Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")); + self.kick(&format!("Server outdated, Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")).await; return; } } } - pub fn handle_status_request(&mut self, server: &mut Server, _status_request: SStatusRequest) { - self.send_packet(&CStatusResponse::new(&server.status_response_json)); + pub async fn handle_status_request( + &mut self, + server: &mut Server, + _status_request: SStatusRequest, + ) { + self.send_packet(&CStatusResponse::new(&server.status_response_json)) + .await; } - pub fn handle_ping_request(&mut self, _server: &mut Server, ping_request: SPingRequest) { + pub async fn handle_ping_request(&mut self, _server: &mut Server, ping_request: SPingRequest) { dbg!("ping"); - self.send_packet(&CPingResponse::new(ping_request.payload)); + self.send_packet(&CPingResponse::new(ping_request.payload)) + .await; self.close(); } - pub fn handle_login_start(&mut self, server: &mut Server, login_start: SLoginStart) { + pub async fn handle_login_start(&mut self, server: &mut Server, login_start: SLoginStart) { // TODO: do basic name validation dbg!("login start"); // default game profile, when no online mode @@ -79,7 +84,7 @@ impl Client { &verify_token, server.base_config.online_mode, // TODO ); - self.send_packet(&packet); + self.send_packet(&packet).await; } pub async fn handle_encryption_response( @@ -92,8 +97,10 @@ impl Client { .decrypt(Pkcs1v15Encrypt, &encryption_response.shared_secret) .map_err(|_| EncryptionError::FailedDecrypt) .unwrap(); - self.enable_encryption(&shared_secret) - .unwrap_or_else(|e| self.kick(&e.to_string())); + match self.enable_encryption(&shared_secret) { + Ok(_) => {} + Err(e) => self.kick(&e.to_string()).await, + } if server.base_config.online_mode { let hash = Sha1::new() @@ -120,7 +127,7 @@ impl Client { .allow_banned_players { if !p.is_empty() { - self.kick("Your account can't join"); + self.kick("Your account can't join").await; } } else { for allowed in server @@ -131,14 +138,14 @@ impl Client { .clone() { if !p.contains(&allowed) { - self.kick("Your account can't join"); + self.kick("Your account can't join").await; } } } } self.gameprofile = Some(p); } - Err(e) => self.kick(&e.to_string()), + Err(e) => self.kick(&e.to_string()).await, } } for ele in self.gameprofile.as_ref().unwrap().properties.clone() { @@ -153,15 +160,16 @@ impl Client { .packet_compression .compression_threshold; let level = server.advanced_config.packet_compression.compression_level; - self.send_packet(&CSetCompression::new(threshold.into())); + self.send_packet(&CSetCompression::new(threshold.into())) + .await; self.set_compression(Some((threshold, level))); } if let Some(profile) = self.gameprofile.as_ref().cloned() { let packet = CLoginSuccess::new(profile.id, &profile.name, &profile.properties, false); - self.send_packet(&packet); + self.send_packet(&packet).await; } else { - self.kick("game profile is none"); + self.kick("game profile is none").await; } } @@ -172,13 +180,13 @@ impl Client { ) { } - pub fn handle_login_acknowledged( + pub async fn handle_login_acknowledged( &mut self, server: &mut Server, _login_acknowledged: SLoginAcknowledged, ) { self.connection_state = ConnectionState::Config; - server.send_brand(self); + server.send_brand(self).await; let resource_config = &server.advanced_config.resource_pack; if resource_config.enabled { @@ -193,7 +201,8 @@ impl Client { resource_config.resource_pack_sha1.clone(), resource_config.force, prompt_message, - )); + )) + .await; } // known data packs @@ -201,7 +210,8 @@ impl Client { namespace: "minecraft", id: "core", version: "1.21", - }])); + }])) + .await; dbg!("login achnowlaged"); } pub fn handle_client_information_config( @@ -222,29 +232,38 @@ impl Client { }); } - pub fn handle_plugin_message(&mut self, _server: &mut Server, plugin_message: SPluginMessage) { + pub async fn handle_plugin_message( + &mut self, + _server: &mut Server, + plugin_message: SPluginMessage, + ) { if plugin_message.channel.starts_with("minecraft:brand") || plugin_message.channel.starts_with("MC|Brand") { dbg!("got a client brand"); match String::from_utf8(plugin_message.data) { Ok(brand) => self.brand = Some(brand), - Err(e) => self.kick(&e.to_string()), + Err(e) => self.kick(&e.to_string()).await, } } } - pub fn handle_known_packs(&mut self, server: &mut Server, _config_acknowledged: SKnownPacks) { + pub async fn handle_known_packs( + &mut self, + server: &mut Server, + _config_acknowledged: SKnownPacks, + ) { for registry in &server.cached_registry { self.send_packet(&CRegistryData::new( ®istry.registry_id, ®istry.registry_entries, - )); + )) + .await; } // We are done with configuring dbg!("finish config"); - self.send_packet(&CFinishConfig::new()); + self.send_packet(&CFinishConfig::new()).await; } pub async fn handle_config_acknowledged( diff --git a/pumpkin/src/client/connection.rs b/pumpkin/src/client/connection.rs new file mode 100644 index 000000000..23d2d2417 --- /dev/null +++ b/pumpkin/src/client/connection.rs @@ -0,0 +1,119 @@ +use pumpkin_protocol::{ + packet_decoder::PacketDecoder, packet_encoder::PacketEncoder, ClientPacket, PacketError, + RawPacket, VarIntType, +}; +use tokio::{ + io::{AsyncReadExt, BufReader}, + net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, + }, + sync::mpsc, +}; + +use super::try_send_packet::TrySendPacketInternal; + +#[derive(Debug)] +pub struct Connection { + pub writer: OwnedWriteHalf, + pub reader: mpsc::Receiver>, + pub encoder: PacketEncoder, + pub decoder: PacketDecoder, +} + +impl Connection { + pub fn new(stream: TcpStream, encoder: PacketEncoder, decoder: PacketDecoder) -> Self { + let (reader, writer) = stream.into_split(); + let (sender, receiver) = mpsc::channel(256); + tokio::spawn(async move { + let mut reader = BufReader::new(reader); + loop { + let mut buffer: Vec = Vec::with_capacity(4); + let size = match try_parse_var_int(&mut reader, &mut buffer).await { + Ok(v) => v as usize, + Err(e) => { + log::debug!("Client disconnected: (Error parsing var int: {})", e); + break; + } + }; + dbg!(&buffer); + dbg!(size); + let mut data_buffer = vec![0; size]; + match reader.read_exact(data_buffer.as_mut_slice()).await { + Ok(bytes_read) => { + if bytes_read != size { + panic!(); + } + } + Err(e) => { + log::debug!("Client disconnected: ({e})"); + break; + } + }; + let _ = sender.send([buffer, data_buffer].concat()).await; + } + }); + Self { + writer, + reader: receiver, + encoder, + decoder, + } + } + pub fn try_read(&mut self) -> Result, PacketError> { + let bytes = match self.reader.try_recv() { + Ok(v) => v, + Err(mpsc::error::TryRecvError::Empty) => return Ok(None), + _ => return Err(PacketError::ClientDisconnected), + }; + self.decoder.reserve(bytes.len()); + self.decoder.queue_slice(&bytes); + let packet = self.decoder.decode(); + match &packet { + Ok(Some(packet)) => log::debug!("reading packet: {}", packet.id.0), + Ok(None) => { + log::error!("Packet size is larger than said size"); + } + _ => {} + } + packet + } + pub async fn send(&mut self, packet: &P) -> Result<(), PacketError> { + log::debug!("sending packet: {}", P::PACKET_ID); + self.writer + .try_send_packet_internal(packet, &mut self.encoder) + .await + } +} + +const SEGMENT_BITS: u8 = 0x7F; +const CONTINUE_BIT: u8 = 0x80; +async fn try_parse_var_int( + reader: &mut T, + buf: &mut Vec, +) -> Result { + let mut value: i32 = 0; + let mut position: i32 = 0; + + loop { + let read = reader.read_u8().await?; + buf.push(read); + + value |= ((read & SEGMENT_BITS) as i32) << position; + + if read & CONTINUE_BIT == 0 { + break; + } + + position += 7; + + if position >= 32 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "The VarInt is too big", + )); + } + } + + Ok(value) +} diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index 5bebd9c2c..5a98f32fc 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -1,9 +1,4 @@ -use std::{ - collections::VecDeque, - io::{self, Write}, - net::SocketAddr, - rc::Rc, -}; +use std::{collections::VecDeque, io, net::SocketAddr, sync::Arc}; use crate::{ entity::player::{ChatMode, GameMode, Hand, Player}, @@ -11,7 +6,8 @@ use crate::{ }; use authentication::GameProfile; -use mio::{event::Event, net::TcpStream, Token}; +use connection::Connection; +use mio::Token; use num_traits::ToPrimitive; use pumpkin_protocol::{ bytebuf::packet_id::Packet, @@ -36,13 +32,14 @@ use pumpkin_protocol::{ ClientPacket, ConnectionState, PacketError, RawPacket, ServerPacket, }; use pumpkin_text::TextComponent; - -use std::io::Read; use thiserror::Error; +use tokio::{io::AsyncWriteExt, net::TcpStream}; pub mod authentication; mod client_packet; +pub mod connection; pub mod player_packet; +pub mod try_send_packet; pub struct PlayerConfig { pub locale: String, // 16 @@ -67,16 +64,15 @@ pub struct Client { pub connection_state: ConnectionState, pub encrytion: bool, pub closed: bool, - pub token: Rc, - pub connection: TcpStream, + pub token: Arc, + pub connection: Connection, pub address: SocketAddr, - enc: PacketEncoder, - dec: PacketDecoder, pub client_packets_queue: VecDeque, + pub read_buf: Vec, } impl Client { - pub fn new(token: Rc, connection: TcpStream, address: SocketAddr) -> Self { + pub fn new(token: Arc, connection: TcpStream, address: SocketAddr) -> Self { Self { protocol_version: 0, gameprofile: None, @@ -86,12 +82,15 @@ impl Client { address, player: None, connection_state: ConnectionState::HandShake, - connection, - enc: PacketEncoder::default(), - dec: PacketDecoder::default(), + connection: Connection::new( + connection, + PacketEncoder::default(), + PacketDecoder::default(), + ), encrytion: true, closed: false, client_packets_queue: VecDeque::new(), + read_buf: vec![0; 4096], } } @@ -109,15 +108,17 @@ impl Client { let crypt_key: [u8; 16] = shared_secret .try_into() .map_err(|_| EncryptionError::SharedWrongLength)?; - self.dec.enable_encryption(&crypt_key); - self.enc.enable_encryption(&crypt_key); + self.connection.decoder.enable_encryption(&crypt_key); + self.connection.encoder.enable_encryption(&crypt_key); Ok(()) } // Compression threshold, Compression level pub fn set_compression(&mut self, compression: Option<(u32, u32)>) { - self.dec.set_compression(compression.map(|v| v.0)); - self.enc.set_compression(compression); + self.connection + .decoder + .set_compression(compression.map(|v| v.0)); + self.connection.encoder.set_compression(compression); } pub fn is_player(&self) -> bool { @@ -125,25 +126,26 @@ impl Client { } /// Send a Clientbound Packet to the Client - pub fn send_packet(&mut self, packet: &P) { - self.enc - .append_packet(packet) - .unwrap_or_else(|e| self.kick(&e.to_string())); - self.connection - .write_all(&self.enc.take()) + pub async fn send_packet(&mut self, packet: &P) { + match self + .connection + .send(packet) + .await .map_err(|_| PacketError::ConnectionWrite) - .unwrap_or_else(|e| self.kick(&e.to_string())); + { + Ok(_) => {} + Err(e) => self.kick(&e.to_string()).await, + } } - pub fn try_send_packet(&mut self, packet: &P) -> Result<(), PacketError> { - self.enc.append_packet(packet)?; - self.connection - .write_all(&self.enc.take()) - .map_err(|_| PacketError::ConnectionWrite)?; - Ok(()) + pub async fn try_send_packet( + &mut self, + packet: &P, + ) -> Result<(), PacketError> { + self.connection.send(packet).await } - pub fn teleport(&mut self, x: f64, y: f64, z: f64, yaw: f32, pitch: f32) { + pub async fn teleport(&mut self, x: f64, y: f64, z: f64, yaw: f32, pitch: f32) { assert!(self.is_player()); // TODO let id = 0; @@ -158,13 +160,15 @@ impl Client { entity.yaw = yaw; entity.pitch = pitch; player.awaiting_teleport = Some(id.into()); - self.send_packet(&CSyncPlayerPostion::new(x, y, z, yaw, pitch, 0, id.into())); + self.send_packet(&CSyncPlayerPostion::new(x, y, z, yaw, pitch, 0, id.into())) + .await; } - pub fn set_gamemode(&mut self, gamemode: GameMode) { + pub async fn set_gamemode(&mut self, gamemode: GameMode) { let player = self.player.as_mut().unwrap(); player.gamemode = gamemode; - self.send_packet(&CGameEvent::new(3, gamemode.to_f32().unwrap())); + self.send_packet(&CGameEvent::new(3, gamemode.to_f32().unwrap())) + .await; } pub async fn process_packets(&mut self, server: &mut Server) { @@ -184,6 +188,7 @@ impl Client { pumpkin_protocol::ConnectionState::HandShake => match packet.id.0 { SHandShake::PACKET_ID => { self.handle_handshake(server, SHandShake::read(bytebuf).unwrap()) + .await } _ => log::error!( "Failed to handle packet id {} while in Handshake state", @@ -193,9 +198,11 @@ impl Client { pumpkin_protocol::ConnectionState::Status => match packet.id.0 { SStatusRequest::PACKET_ID => { self.handle_status_request(server, SStatusRequest::read(bytebuf).unwrap()) + .await } SPingRequest::PACKET_ID => { self.handle_ping_request(server, SPingRequest::read(bytebuf).unwrap()) + .await } _ => log::error!( "Failed to handle packet id {} while in Status state", @@ -205,6 +212,7 @@ impl Client { pumpkin_protocol::ConnectionState::Login => match packet.id.0 { SLoginStart::PACKET_ID => { self.handle_login_start(server, SLoginStart::read(bytebuf).unwrap()) + .await } SEncryptionResponse::PACKET_ID => { self.handle_encryption_response( @@ -215,8 +223,13 @@ impl Client { } SLoginPluginResponse::PACKET_ID => self .handle_plugin_response(server, SLoginPluginResponse::read(bytebuf).unwrap()), - SLoginAcknowledged::PACKET_ID => self - .handle_login_acknowledged(server, SLoginAcknowledged::read(bytebuf).unwrap()), + SLoginAcknowledged::PACKET_ID => { + self.handle_login_acknowledged( + server, + SLoginAcknowledged::read(bytebuf).unwrap(), + ) + .await + } _ => log::error!( "Failed to handle packet id {} while in Login state", packet.id.0 @@ -229,6 +242,7 @@ impl Client { ), SPluginMessage::PACKET_ID => { self.handle_plugin_message(server, SPluginMessage::read(bytebuf).unwrap()) + .await } SAcknowledgeFinishConfig::PACKET_ID => { self.handle_config_acknowledged( @@ -239,6 +253,7 @@ impl Client { } SKnownPacks::PACKET_ID => { self.handle_known_packs(server, SKnownPacks::read(bytebuf).unwrap()) + .await } _ => log::error!( "Failed to handle packet id {} while in Config state", @@ -247,47 +262,61 @@ impl Client { }, pumpkin_protocol::ConnectionState::Play => { if self.player.is_some() { - self.handle_play_packet(server, packet); + self.handle_play_packet(server, packet).await; } else { // should be impossible - self.kick("no player in play state?") + self.kick("no player in play state?").await } } _ => log::error!("Invalid Connection state {:?}", self.connection_state), } } - pub fn handle_play_packet(&mut self, server: &mut Server, packet: &mut RawPacket) { + pub async fn handle_play_packet(&mut self, server: &mut Server, packet: &mut RawPacket) { let bytebuf = &mut packet.bytebuf; match packet.id.0 { SConfirmTeleport::PACKET_ID => { self.handle_confirm_teleport(server, SConfirmTeleport::read(bytebuf).unwrap()) + .await } SChatCommand::PACKET_ID => { self.handle_chat_command(server, SChatCommand::read(bytebuf).unwrap()) + .await } SPlayerPosition::PACKET_ID => { self.handle_position(server, SPlayerPosition::read(bytebuf).unwrap()) + .await + } + SPlayerPositionRotation::PACKET_ID => { + self.handle_position_rotation( + server, + SPlayerPositionRotation::read(bytebuf).unwrap(), + ) + .await } - SPlayerPositionRotation::PACKET_ID => self - .handle_position_rotation(server, SPlayerPositionRotation::read(bytebuf).unwrap()), SPlayerRotation::PACKET_ID => { self.handle_rotation(server, SPlayerRotation::read(bytebuf).unwrap()) + .await } SPlayerCommand::PACKET_ID => { self.handle_player_command(server, SPlayerCommand::read(bytebuf).unwrap()) } SSwingArm::PACKET_ID => { self.handle_swing_arm(server, SSwingArm::read(bytebuf).unwrap()) + .await } SChatMessage::PACKET_ID => { self.handle_chat_message(server, SChatMessage::read(bytebuf).unwrap()) + .await } SClientInformationPlay::PACKET_ID => self.handle_client_information_play( server, SClientInformationPlay::read(bytebuf).unwrap(), ), - SInteract::PACKET_ID => self.handle_interact(server, SInteract::read(bytebuf).unwrap()), + SInteract::PACKET_ID => { + self.handle_interact(server, SInteract::read(bytebuf).unwrap()) + .await + } SPlayerAction::PACKET_ID => { self.handle_player_action(server, SPlayerAction::read(bytebuf).unwrap()) } @@ -295,71 +324,29 @@ impl Client { } } - // Reads the connection until our buffer of len 4096 is full, then decode - /// Close connection when an error occurs - pub async fn poll(&mut self, server: &mut Server, event: &Event) { - if event.is_readable() { - let mut received_data = vec![0; 4096]; - let mut bytes_read = 0; - // We can (maybe) read from the connection. - loop { - match self.connection.read(&mut received_data[bytes_read..]) { - Ok(0) => { - // Reading 0 bytes means the other side has closed the - // connection or is done writing, then so are we. - self.close(); - break; - } - Ok(n) => { - bytes_read += n; - received_data.extend(&vec![0; n]); - } - // Would block "errors" are the OS's way of saying that the - // connection is not actually ready to perform this I/O operation. - Err(ref err) if would_block(err) => break, - Err(ref err) if interrupted(err) => continue, - // Other errors we'll consider fatal. - Err(_) => self.close(), - } - } - - if bytes_read != 0 { - self.dec.reserve(4096); - self.dec.queue_slice(&received_data[..bytes_read]); - match self.dec.decode() { - Ok(packet) => { - if let Some(packet) = packet { - self.add_packet(packet); - self.process_packets(server).await; - } - } - Err(err) => self.kick(&err.to_string()), - } - self.dec.clear(); - } - } - } - - pub fn send_system_message(&mut self, text: TextComponent) { - self.send_packet(&CSystemChatMessge::new(text, false)); + pub async fn send_system_message(&mut self, text: TextComponent) { + self.send_packet(&CSystemChatMessge::new(text, false)).await; } /// Kicks the Client with a reason depending on the connection state - pub fn kick(&mut self, reason: &str) { + pub async fn kick(&mut self, reason: &str) { dbg!(reason); match self.connection_state { ConnectionState::Login => { self.try_send_packet(&CLoginDisconnect::new( &serde_json::to_string_pretty(&reason).unwrap(), )) + .await .unwrap_or_else(|_| self.close()); } ConnectionState::Config => { self.try_send_packet(&CConfigDisconnect::new(reason)) + .await .unwrap_or_else(|_| self.close()); } ConnectionState::Play => { self.try_send_packet(&CPlayDisconnect::new(TextComponent::from(reason))) + .await .unwrap_or_else(|_| self.close()); } _ => { @@ -383,10 +370,6 @@ pub enum EncryptionError { SharedWrongLength, } -fn would_block(err: &io::Error) -> bool { - err.kind() == io::ErrorKind::WouldBlock -} - pub fn interrupted(err: &io::Error) -> bool { err.kind() == io::ErrorKind::Interrupted } diff --git a/pumpkin/src/client/player_packet.rs b/pumpkin/src/client/player_packet.rs index d5d601203..d8ad27b7e 100644 --- a/pumpkin/src/client/player_packet.rs +++ b/pumpkin/src/client/player_packet.rs @@ -30,7 +30,7 @@ fn modulus(a: f32, b: f32) -> f32 { /// Handles all Play Packets send by a real Player impl Client { - pub fn handle_confirm_teleport( + pub async fn handle_confirm_teleport( &mut self, _server: &mut Server, confirm_teleport: SConfirmTeleport, @@ -43,7 +43,7 @@ impl Client { } player.awaiting_teleport = None; } else { - self.kick("Send Teleport confirm, but we did not teleport") + self.kick("Send Teleport confirm, but we did not teleport").await } } @@ -55,9 +55,9 @@ impl Client { pos.clamp(-2.0E7, 2.0E7) } - pub fn handle_position(&mut self, server: &mut Server, position: SPlayerPosition) { + pub async fn handle_position(&mut self, server: &mut Server, position: SPlayerPosition) { if position.x.is_nan() || position.feet_y.is_nan() || position.z.is_nan() { - self.kick("Invalid movement"); + self.kick("Invalid movement").await; return; } let player = self.player.as_mut().unwrap(); @@ -86,10 +86,10 @@ impl Client { (z * 4096.0 - lastz * 4096.0) as i16, on_ground, ), - ); + ).await; } - pub fn handle_position_rotation( + pub async fn handle_position_rotation( &mut self, server: &mut Server, position_rotation: SPlayerPositionRotation, @@ -98,11 +98,11 @@ impl Client { || position_rotation.feet_y.is_nan() || position_rotation.z.is_nan() { - self.kick("Invalid movement"); + self.kick("Invalid movement").await; return; } if !position_rotation.yaw.is_finite() || !position_rotation.pitch.is_finite() { - self.kick("Invalid rotation"); + self.kick("Invalid rotation").await; return; } let player = self.player.as_mut().unwrap(); @@ -138,13 +138,13 @@ impl Client { pitch as u8, on_ground, ), - ); - server.broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)); + ).await; + server.broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)).await; } - pub fn handle_rotation(&mut self, server: &mut Server, rotation: SPlayerRotation) { + pub async fn handle_rotation(&mut self, server: &mut Server, rotation: SPlayerRotation) { if !rotation.yaw.is_finite() || !rotation.pitch.is_finite() { - self.kick("Invalid rotation"); + self.kick("Invalid rotation").await; return; } let player = self.player.as_mut().unwrap(); @@ -161,12 +161,12 @@ impl Client { server.broadcast_packet( self, &CUpdateEntityRot::new(entity_id.into(), yaw as u8, pitch as u8, on_ground), - ); - server.broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)); + ).await; + server.broadcast_packet(self, &CHeadRot::new(entity_id.into(), yaw as u8)).await; } - pub fn handle_chat_command(&mut self, _server: &mut Server, command: SChatCommand) { - handle_command(&mut CommandSender::Player(self), &command.command); + pub async fn handle_chat_command(&mut self, _server: &mut Server, command: SChatCommand) { + handle_command(&mut CommandSender::Player(self), &command.command).await; } pub fn handle_player_command(&mut self, _server: &mut Server, command: SPlayerCommand) { @@ -188,25 +188,25 @@ impl Client { } } - pub fn handle_swing_arm(&mut self, server: &mut Server, swing_arm: SSwingArm) { + pub async fn handle_swing_arm(&mut self, server: &mut Server, swing_arm: SSwingArm) { let animation = match Hand::from_i32(swing_arm.hand.0).unwrap() { Hand::Main => Animation::SwingMainArm, Hand::Off => Animation::SwingOffhand, }; let player = self.player.as_mut().unwrap(); let id = player.entity_id(); - server.broadcast_packet_expect(self, &CEntityAnimation::new(id.into(), animation as u8)) + server.broadcast_packet_expect(self, &CEntityAnimation::new(id.into(), animation as u8)).await; } - pub fn handle_chat_message(&mut self, server: &mut Server, chat_message: SChatMessage) { - let message = chat_message.message; + pub async fn handle_chat_message(&mut self, _server: &mut Server, chat_message: SChatMessage) { + let _message = chat_message.message; self.send_packet(&COpenScreen::new( VarInt(0), VarInt(WindowType::CraftingTable as i32), TextComponent::from("Test Crafter"), - )); + )).await; // TODO: filter message & validation - let gameprofile = self.gameprofile.as_ref().unwrap(); + let _gameprofile = self.gameprofile.as_ref().unwrap(); dbg!("got message"); // yeah a "raw system message", the ugly way to do that, but it works // server.broadcast_packet( @@ -263,7 +263,7 @@ impl Client { }); } - pub fn handle_interact(&mut self, server: &mut Server, interact: SInteract) { + pub async fn handle_interact(&mut self, server: &mut Server, interact: SInteract) { // TODO: do validation and stuff let config = &server.advanced_config.pvp; if config.enabled { @@ -277,10 +277,10 @@ impl Client { drop(client); if config.hurt_animation { // TODO - server.broadcast_packet(self, &CHurtAnimation::new(interact.entity_id, 10.0)) + server.broadcast_packet(self, &CHurtAnimation::new(interact.entity_id, 10.0)).await } } } } - pub fn handle_player_action(&mut self, _server: &mut Server, player_action: SPlayerAction) {} + pub fn handle_player_action(&mut self, _server: &mut Server, _player_action: SPlayerAction) {} } diff --git a/pumpkin/src/client/try_send_packet.rs b/pumpkin/src/client/try_send_packet.rs new file mode 100644 index 000000000..9478b638e --- /dev/null +++ b/pumpkin/src/client/try_send_packet.rs @@ -0,0 +1,29 @@ +use std::future::Future; + +use pumpkin_protocol::{packet_encoder::PacketEncoder, ClientPacket, PacketError}; +use tokio::io::AsyncWriteExt; + +pub trait TrySendPacketInternal { + fn try_send_packet_internal( + &mut self, + packet: &P, + encoder: &mut PacketEncoder, + ) -> impl Future>; +} + +impl TrySendPacketInternal for T +where + T: AsyncWriteExt + Unpin, +{ + async fn try_send_packet_internal( + &mut self, + packet: &P, + encoder: &mut PacketEncoder, + ) -> Result<(), PacketError> { + encoder.append_packet(packet)?; + self.write_all(&encoder.take()) + .await + .map_err(|_| PacketError::ConnectionWrite)?; + Ok(()) + } +} diff --git a/pumpkin/src/commands/gamemode.rs b/pumpkin/src/commands/gamemode.rs index af0953913..ceeabf81a 100644 --- a/pumpkin/src/commands/gamemode.rs +++ b/pumpkin/src/commands/gamemode.rs @@ -12,44 +12,46 @@ impl<'a> Command<'a> for GamemodeCommand { const DESCRIPTION: &'a str = "Changes the gamemode for a Player"; - fn on_execute(sender: &mut super::CommandSender<'a>, command: String) { + async fn on_execute(sender: &mut super::CommandSender<'a>, command: String) { let player = sender.as_mut_player().unwrap(); let args: Vec<&str> = command.split_whitespace().collect(); if args.len() != 2 { - player.send_system_message( - TextComponent::from("Usage: /gamemode ") - .color_named(pumpkin_text::color::NamedColor::Red), - ); + player + .send_system_message( + TextComponent::from("Usage: /gamemode ") + .color_named(pumpkin_text::color::NamedColor::Red), + ) + .await; return; } let mode_str = args[1].to_lowercase(); match mode_str.parse() { Ok(mode) => { - player.set_gamemode(mode); - player.send_system_message(format!("Set own game mode to {:?}", mode).into()); + player.set_gamemode(mode).await; + player + .send_system_message(format!("Set own game mode to {:?}", mode).into()) + .await; } Err(_) => { // try to parse from number - match mode_str.parse::() { - Ok(i) => match GameMode::from_u8(i) { - Some(mode) => { - player.set_gamemode(mode); - player.send_system_message( - format!("Set own game mode to {:?}", mode).into(), - ); - return; - } - None => {} - }, - Err(_) => {} - } - - player.send_system_message( - TextComponent::from("Invalid gamemode") - .color_named(pumpkin_text::color::NamedColor::Red), - ); + if let Ok(i) = mode_str.parse::() { if let Some(mode) = GameMode::from_u8(i) { + player.set_gamemode(mode).await; + player + .send_system_message( + format!("Set own game mode to {:?}", mode).into(), + ) + .await; + return; + } } + + player + .send_system_message( + TextComponent::from("Invalid gamemode") + .color_named(pumpkin_text::color::NamedColor::Red), + ) + .await; } } } diff --git a/pumpkin/src/commands/mod.rs b/pumpkin/src/commands/mod.rs index dc687430f..02168b95a 100644 --- a/pumpkin/src/commands/mod.rs +++ b/pumpkin/src/commands/mod.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use gamemode::GamemodeCommand; use pumpkin::PumpkinCommand; use pumpkin_text::TextComponent; @@ -15,7 +17,7 @@ pub trait Command<'a> { const NAME: &'a str; const DESCRIPTION: &'a str; - fn on_execute(sender: &mut CommandSender<'a>, command: String); + fn on_execute(sender: &mut CommandSender<'a>, command: String) -> impl Future; /// Specifies wether the Command Sender has to be a Player /// TODO: implement @@ -31,11 +33,11 @@ pub enum CommandSender<'a> { } impl<'a> CommandSender<'a> { - pub fn send_message(&mut self, text: TextComponent) { + pub async fn send_message(&mut self, text: TextComponent) { match self { // TODO: add color and stuff to console CommandSender::Console => log::info!("{:?}", text.content), - CommandSender::Player(c) => c.send_system_message(text), + CommandSender::Player(c) => c.send_system_message(text).await, CommandSender::Rcon(s) => s.push(format!("{:?}", text.content)), } } @@ -63,21 +65,21 @@ impl<'a> CommandSender<'a> { } } } -pub fn handle_command(sender: &mut CommandSender, command: &str) { +pub async fn handle_command<'a>(sender: &'a mut CommandSender<'a>, command: &str) { let command = command.to_lowercase(); // an ugly mess i know if command.starts_with(PumpkinCommand::NAME) { - PumpkinCommand::on_execute(sender, command); + PumpkinCommand::on_execute(sender, command).await; return; } if command.starts_with(GamemodeCommand::NAME) { - GamemodeCommand::on_execute(sender, command); + GamemodeCommand::on_execute(sender, command).await; return; } if command.starts_with(StopCommand::NAME) { - StopCommand::on_execute(sender, command); + StopCommand::on_execute(sender, command).await; return; } // TODO: red color - sender.send_message("Command not Found".into()); + sender.send_message("Command not Found".into()).await; } diff --git a/pumpkin/src/commands/pumpkin.rs b/pumpkin/src/commands/pumpkin.rs index 8a648dd50..d9d18dc0e 100644 --- a/pumpkin/src/commands/pumpkin.rs +++ b/pumpkin/src/commands/pumpkin.rs @@ -12,9 +12,9 @@ impl<'a> Command<'a> for PumpkinCommand { const DESCRIPTION: &'a str = "Displays information about Pumpkin"; - fn on_execute(sender: &mut super::CommandSender<'a>, _command: String) { + async fn on_execute(sender: &mut super::CommandSender<'a>, _command: String) { let version = env!("CARGO_PKG_VERSION"); let description = env!("CARGO_PKG_DESCRIPTION"); - sender.send_message(TextComponent::from(format!("Pumpkin {version}, {description} (Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL})")).color_named(NamedColor::Green)) + sender.send_message(TextComponent::from(format!("Pumpkin {version}, {description} (Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL})")).color_named(NamedColor::Green)).await } } diff --git a/pumpkin/src/commands/stop.rs b/pumpkin/src/commands/stop.rs index 09d621228..641854c2d 100644 --- a/pumpkin/src/commands/stop.rs +++ b/pumpkin/src/commands/stop.rs @@ -8,7 +8,7 @@ impl<'a> Command<'a> for StopCommand { const NAME: &'static str = "stop"; const DESCRIPTION: &'static str = "Stops the server"; - fn on_execute(sender: &mut super::CommandSender<'a>, command: String) { + async fn on_execute(_sender: &mut super::CommandSender<'a>, _command: String) { std::process::exit(0); } fn player_required() -> bool { diff --git a/pumpkin/src/config/auth_config.rs b/pumpkin/src/config/auth_config.rs index 09c4188ff..81d1ecc7a 100644 --- a/pumpkin/src/config/auth_config.rs +++ b/pumpkin/src/config/auth_config.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::client::authentication::ProfileAction; -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct AuthenticationConfig { /// Whether to use Mojang authentication. pub enabled: bool, @@ -17,7 +17,7 @@ pub struct AuthenticationConfig { pub textures: TextureConfig, } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct PlayerProfileConfig { /// Allow players flagged by Mojang (banned, forced name change). pub allow_banned_players: bool, @@ -37,7 +37,7 @@ impl Default for PlayerProfileConfig { } } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct TextureConfig { /// Whether to use player textures. pub enabled: bool, @@ -60,7 +60,7 @@ impl Default for TextureConfig { } } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct TextureTypes { /// Use player skins. pub skin: bool, diff --git a/pumpkin/src/config/mod.rs b/pumpkin/src/config/mod.rs index f4dc19219..9f3875f20 100644 --- a/pumpkin/src/config/mod.rs +++ b/pumpkin/src/config/mod.rs @@ -12,7 +12,7 @@ pub mod resource_pack; /// Current Config version of the Base Config const CURRENT_BASE_VERSION: &str = "1.0.0"; -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] /// The idea is that Pumpkin should very customizable, You can Enable or Disable Features depning on your needs. /// This also allows you get some Performance or Resource boosts. /// Important: The Configuration should match Vanilla by default @@ -44,7 +44,7 @@ impl Default for RCONConfig { } } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct CommandsConfig { /// Are commands from the Console accepted ? pub use_console: bool, @@ -57,7 +57,7 @@ impl Default for CommandsConfig { } } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct PVPConfig { /// Is PVP enabled ? pub enabled: bool, @@ -77,7 +77,7 @@ impl Default for PVPConfig { } } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] // Packet compression pub struct CompressionConfig { /// Is compression enabled ? @@ -114,7 +114,7 @@ impl Default for AdvancedConfiguration { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct BasicConfiguration { /// A version identifier for the configuration format. pub config_version: String, @@ -168,9 +168,11 @@ impl Default for BasicConfiguration { } impl AdvancedConfiguration { - pub fn load>(path: P) -> AdvancedConfiguration { + pub async fn load>(path: P) -> AdvancedConfiguration { if path.as_ref().exists() { - let toml = std::fs::read_to_string(path).expect("Couldn't read configuration"); + let toml = tokio::fs::read_to_string(path) + .await + .expect("Couldn't read configuration"); let config: AdvancedConfiguration = toml::from_str(toml.as_str()).expect("Couldn't parse features.toml, Proberbly old config, Replacing with a new one or just delete it"); config.validate(); @@ -178,7 +180,9 @@ impl AdvancedConfiguration { } else { let config = AdvancedConfiguration::default(); let toml = toml::to_string(&config).expect("Couldn't create toml!"); - std::fs::write(path, toml).expect("Couldn't save configuration"); + tokio::fs::write(path, toml) + .await + .expect("Couldn't save configuration"); config.validate(); config } @@ -189,16 +193,16 @@ impl AdvancedConfiguration { } impl BasicConfiguration { - pub fn load>(path: P) -> BasicConfiguration { + pub async fn load>(path: P) -> BasicConfiguration { if path.as_ref().exists() { - let toml = std::fs::read_to_string(path).expect("Couldn't read configuration"); + let toml = tokio::fs::read_to_string(path).await.expect("Couldn't read configuration"); let config: BasicConfiguration = toml::from_str(toml.as_str()).expect("Couldn't parse configuration.toml, Proberbly old config, Replacing with a new one or just delete it"); config.validate(); config } else { let config = BasicConfiguration::default(); let toml = toml::to_string(&config).expect("Couldn't create toml!"); - std::fs::write(path, toml).expect("Couldn't save configuration"); + tokio::fs::write(path, toml).await.expect("Couldn't save configuration"); config.validate(); config } diff --git a/pumpkin/src/config/resource_pack.rs b/pumpkin/src/config/resource_pack.rs index f3479240e..e18853dfb 100644 --- a/pumpkin/src/config/resource_pack.rs +++ b/pumpkin/src/config/resource_pack.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct ResourcePackConfig { pub enabled: bool, /// The path to the resource pack. diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 79a19884f..b0b3e9db4 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -1,17 +1,13 @@ -use mio::net::TcpListener; -use mio::{Events, Interest, Poll, Token}; -use std::io::{self}; - use client::Client; -use commands::handle_command; use config::AdvancedConfiguration; -use std::{collections::HashMap, rc::Rc, thread}; - -use client::interrupted; use config::BasicConfiguration; +use mio::Token; use server::Server; - +use std::time::Duration; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use tokio::{net::TcpListener, sync::mpsc, time::Instant}; +pub const TPS: usize = 20; // Setup some tokens to allow us to identify which event is for which socket. pub mod client; @@ -20,18 +16,23 @@ pub mod config; pub mod entity; pub mod rcon; pub mod server; +// pub mod status; pub mod util; #[cfg(feature = "dhat-heap")] #[global_allocator] static ALLOC: dhat::Alloc = dhat::Alloc; -#[cfg(not(target_os = "wasi"))] -fn main() -> io::Result<()> { +fn main() -> std::io::Result<()> { + let _begin_time = Instant::now(); + + simple_logger::SimpleLogger::new().init().unwrap(); + #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); #[cfg(feature = "dhat-heap")] println!("Using a memory profiler"); + let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -39,148 +40,60 @@ fn main() -> io::Result<()> { // ensure rayon is built outside of tokio scope rayon::ThreadPoolBuilder::new().build_global().unwrap(); rt.block_on(async { - const SERVER: Token = Token(0); - use std::{cell::RefCell, time::Instant}; - - use rcon::RCONServer; - - let time = Instant::now(); - let basic_config = BasicConfiguration::load("configuration.toml"); + let basic_config = BasicConfiguration::load("configuration.toml").await; + let advanced_config = AdvancedConfiguration::load("features.toml").await; - let advanced_configuration = AdvancedConfiguration::load("features.toml"); - - simple_logger::SimpleLogger::new().init().unwrap(); - - // Create a poll instance. - let mut poll = Poll::new()?; - // Create storage for events. - let mut events = Events::with_capacity(128); - - // Setup the TCP server socket. - - let addr = format!( + let addr: SocketAddr = format!( "{}:{}", basic_config.server_address, basic_config.server_port ) .parse() - .unwrap(); - - let mut listener = TcpListener::bind(addr)?; - - // Register the server with poll we can receive events for it. - poll.registry() - .register(&mut listener, SERVER, Interest::READABLE)?; - - // Unique token for each incoming connection. - let mut unique_token = Token(SERVER.0 + 1); - - let use_console = advanced_configuration.commands.use_console; - let rcon = advanced_configuration.rcon.clone(); - - let mut connections: HashMap>> = HashMap::new(); - - let mut server = Server::new((basic_config, advanced_configuration)); - log::info!("Started Server took {}ms", time.elapsed().as_millis()); - log::info!("You now can connect to the server"); + .expect("Could not parse server address"); + + let listener = TcpListener::bind(addr).await?; + + let (client_sender, mut client_receiver) = mpsc::channel(8); + // let _first_connection_responder = + // FirstConnectionResponder::new(basic_config.clone(), advanced_config.clone()); + let _listener_handle = tokio::spawn(async move { + loop { + let (client, address) = match listener.accept().await { + Ok(client) => client, + Err(_) => continue, + }; + log::info!("Accepted connection from: {}", address); + + // match first_connection_responder.handle(client, address).await { + // FirstConnectionResult::Play(client) => { + // // send the client to the main loop through a mpsc channel + // match client_sender.send(client).await { + // Ok(_) => {} + // Err(_) => {} + // } + // } + // FirstConnectionResult::ConnectionFinished => {} + // } + let client = Client::new(Arc::new(mio::Token(0)), client, address); + let _ = client_sender.send(client).await; + } + }); - if use_console { - thread::spawn(move || { - let stdin = std::io::stdin(); - loop { - let mut out = String::new(); - stdin - .read_line(&mut out) - .expect("Failed to read console line"); - handle_command(&mut commands::CommandSender::Console, &out); - } - }); - } - if rcon.enabled { - tokio::spawn(async move { - RCONServer::new(&rcon).await.unwrap(); - }); - } + let mut clients = HashMap::::new(); + let mut server = Server::new((basic_config, advanced_config)); + let mut interval = tokio::time::interval(Duration::from_millis(1000 / TPS as u64)); loop { - if let Err(err) = poll.poll(&mut events, None) { - if interrupted(&err) { - continue; - } - return Err(err); + // receive new clients + while let Ok(new_client) = client_receiver.try_recv() { + clients.insert(*new_client.token.as_ref(), new_client); } - for event in events.iter() { - match event.token() { - SERVER => loop { - // Received an event for the TCP server socket, which - // indicates we can accept an connection. - let (mut connection, address) = match listener.accept() { - Ok((connection, address)) => (connection, address), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // If we get a `WouldBlock` error we know our - // listener has no more incoming connections queued, - // so we can return to polling and wait for some - // more. - break; - } - Err(e) => { - // If it was any other kind of error, something went - // wrong and we terminate with an error. - return Err(e); - } - }; - if let Err(e) = connection.set_nodelay(true) { - log::warn!("failed to set TCP_NODELAY {e}"); - } - - log::info!("Accepted connection from: {}", address); - - let token = next(&mut unique_token); - poll.registry().register( - &mut connection, - token, - Interest::READABLE.add(Interest::WRITABLE), - )?; - let rc_token = Rc::new(token); - let client = Rc::new(RefCell::new(Client::new( - Rc::clone(&rc_token), - connection, - addr, - ))); - server.add_client(rc_token, Rc::clone(&client)); - connections.insert(token, client); - }, - - token => { - // Maybe received an event for a TCP connection. - let done = if let Some(client) = connections.get_mut(&token) { - let mut client = client.borrow_mut(); - client.poll(&mut server, event).await; - client.closed - } else { - // Sporadic events happen, we can safely ignore them. - false - }; - if done { - if let Some(client) = connections.remove(&token) { - server.remove_client(&token); - let mut client = client.borrow_mut(); - poll.registry().deregister(&mut client.connection)?; - } - } - } + for (_id, client) in clients.iter_mut() { + while let Ok(Some(mut packet)) = client.connection.try_read() { + // dbg!(&packet); + client.handle_packet(&mut server, &mut packet).await; } } + interval.tick().await; } }) } - -fn next(current: &mut Token) -> Token { - let next = current.0; - current.0 += 1; - Token(next) -} - -#[cfg(target_os = "wasi")] -fn main() { - panic!("can't bind to an address with wasi") -} diff --git a/pumpkin/src/rcon/mod.rs b/pumpkin/src/rcon/mod.rs index c3606ca2e..3cdef5168 100644 --- a/pumpkin/src/rcon/mod.rs +++ b/pumpkin/src/rcon/mod.rs @@ -186,7 +186,7 @@ impl RCONClient { handle_command( &mut crate::commands::CommandSender::Rcon(&mut output), packet.get_body(), - ); + ).await; for line in output { self.send(&mut Packet::new(packet.get_id(), PacketType::Output, line)) .await diff --git a/pumpkin/src/server.rs b/pumpkin/src/server.rs index fabd93570..da539f3ca 100644 --- a/pumpkin/src/server.rs +++ b/pumpkin/src/server.rs @@ -1,9 +1,10 @@ use std::{ - cell::{RefCell, RefMut}, collections::HashMap, io::Cursor, - rc::Rc, - sync::atomic::{AtomicI32, Ordering}, + sync::{ + atomic::{AtomicI32, Ordering}, + Arc, Mutex, MutexGuard, + }, time::Duration, }; @@ -59,7 +60,7 @@ pub struct Server { /// Cache the registry so we don't have to parse it every time a player joins pub cached_registry: Vec, - pub current_clients: HashMap, Rc>>, + pub current_clients: HashMap, Arc>>, // TODO: replace with HashMap entity_id: AtomicI32, // TODO: place this into every world @@ -117,25 +118,27 @@ impl Server { } // Returns Tokens to remove - pub async fn poll(&mut self, client: &mut Client, _poll: &Poll, event: &Event) { + pub async fn poll(&mut self, _client: &mut Client, _poll: &Poll, _event: &Event) { // TODO: Poll players in every world - client.poll(self, event).await + // client.poll(self, event).await } - pub fn add_client(&mut self, token: Rc, client: Rc>) { + pub fn add_client(&mut self, token: Arc, client: Arc>) { self.current_clients.insert(token, client); } - pub fn remove_client(&mut self, token: &Token) { + pub async fn remove_client(&mut self, token: &Token) { let client = self.current_clients.remove(token).unwrap(); - let client = client.borrow(); + let client = client.lock().unwrap(); // despawn the player // todo: put this into the entitiy struct if client.is_player() { let id = client.player.as_ref().unwrap().entity_id(); let uuid = client.gameprofile.as_ref().unwrap().id; - self.broadcast_packet_expect(&client, &CRemovePlayerInfo::new(1.into(), &[UUID(uuid)])); + self.broadcast_packet_expect(&client, &CRemovePlayerInfo::new(1.into(), &[UUID(uuid)])) + .await; self.broadcast_packet_expect(&client, &CRemoveEntities::new(&[id.into()])) + .await } } @@ -153,30 +156,34 @@ impl Server { client.player = Some(player); // login packet for our new player - client.send_packet(&CLogin::new( - entity_id, - self.base_config.hardcore, - &["minecraft:overworld"], - self.base_config.max_players.into(), - self.base_config.view_distance.into(), // TODO: view distance - self.base_config.simulation_distance.into(), // TODO: sim view dinstance - false, - false, - false, - 0.into(), - "minecraft:overworld", - 0, // seed - gamemode.to_u8().unwrap(), - self.base_config.default_gamemode.to_i8().unwrap(), - false, - false, - None, - 0.into(), - false, - )); + client + .send_packet(&CLogin::new( + entity_id, + self.base_config.hardcore, + &["minecraft:overworld"], + self.base_config.max_players.into(), + self.base_config.view_distance.into(), // TODO: view distance + self.base_config.simulation_distance.into(), // TODO: sim view dinstance + false, + false, + false, + 0.into(), + "minecraft:overworld", + 0, // seed + gamemode.to_u8().unwrap(), + self.base_config.default_gamemode.to_i8().unwrap(), + false, + false, + None, + 0.into(), + false, + )) + .await; dbg!("sending abilities"); // player abilities - client.send_packet(&CPlayerAbilities::new(0x02, 00.2, 0.1)); + client + .send_packet(&CPlayerAbilities::new(0x02, 00.2, 0.1)) + .await; // teleport let x = 10.0; @@ -184,7 +191,7 @@ impl Server { let z = 10.0; let yaw = 10.0; let pitch = 10.0; - client.teleport(x, y, z, 10.0, 10.0); + client.teleport(x, y, z, 10.0, 10.0).await; let gameprofile = client.gameprofile.as_ref().unwrap(); // first send info update to our new player, So he can see his Skin // also send his info to everyone else @@ -203,12 +210,13 @@ impl Server { ], }], ), - ); + ) + .await; // here we send all the infos of already joined players let mut entries = Vec::new(); for (_, client) in self.current_clients.iter().filter(|c| c.0 != &client.token) { - let client = client.borrow(); + let client = client.lock().unwrap(); if client.is_player() { let gameprofile = client.gameprofile.as_ref().unwrap(); entries.push(pumpkin_protocol::client::play::Player { @@ -223,10 +231,12 @@ impl Server { }) } } - client.send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)); + client + .send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)) + .await; // Start waiting for level chunks - client.send_packet(&CGameEvent::new(13, 0.0)); + client.send_packet(&CGameEvent::new(13, 0.0)).await; let gameprofile = client.gameprofile.as_ref().unwrap(); @@ -249,29 +259,32 @@ impl Server { 0.0, 0.0, ), - ); + ) + .await; // spawn players for our client let token = client.token.clone(); for (_, existing_client) in self.current_clients.iter().filter(|c| c.0 != &token) { - let existing_client = existing_client.borrow(); + let existing_client = existing_client.lock().unwrap(); if let Some(player) = &existing_client.player { let entity = &player.entity; let gameprofile = existing_client.gameprofile.as_ref().unwrap(); - client.send_packet(&CSpawnEntity::new( - player.entity_id().into(), - UUID(gameprofile.id), - EntityType::Player.to_i32().unwrap().into(), - entity.x, - entity.y, - entity.z, - entity.yaw, - entity.pitch, - entity.pitch, - 0.into(), - 0.0, - 0.0, - 0.0, - )) + client + .send_packet(&CSpawnEntity::new( + player.entity_id().into(), + UUID(gameprofile.id), + EntityType::Player.to_i32().unwrap().into(), + entity.x, + entity.y, + entity.z, + entity.yaw, + entity.pitch, + entity.pitch, + 0.into(), + 0.0, + 0.0, + 0.0, + )) + .await } } // entity meta data @@ -283,16 +296,17 @@ impl Server { Metadata::new(17, VarInt(0), config.skin_parts), ), ) + .await } Server::spawn_test_chunk(client).await; } /// TODO: This definitly should be in world - pub fn get_by_entityid(&self, from: &Client, id: EntityId) -> Option> { + pub fn get_by_entityid(&self, from: &Client, id: EntityId) -> Option> { for (_, client) in self.current_clients.iter().filter(|c| c.0 != &from.token) { // Check if client is a player - let client = client.borrow_mut(); + let client = client.lock().unwrap(); if client.is_player() && client.player.as_ref().unwrap().entity_id() == id { return Some(client); } @@ -301,30 +315,30 @@ impl Server { } /// Sends a Packet to all Players - pub fn broadcast_packet

(&self, from: &mut Client, packet: &P) + pub async fn broadcast_packet

(&self, from: &mut Client, packet: &P) where P: ClientPacket, { // we can't borrow twice at same time - from.send_packet(packet); + from.send_packet(packet).await; for (_, client) in self.current_clients.iter().filter(|c| c.0 != &from.token) { // Check if client is a player - let mut client = client.borrow_mut(); + let mut client = client.lock().unwrap(); if client.is_player() { - client.send_packet(packet); + client.send_packet(packet).await; } } } - pub fn broadcast_packet_expect

(&self, from: &Client, packet: &P) + pub async fn broadcast_packet_expect

(&self, from: &Client, packet: &P) where P: ClientPacket, { for (_, client) in self.current_clients.iter().filter(|c| c.0 != &from.token) { // Check if client is a player - let mut client = client.borrow_mut(); + let mut client = client.lock().unwrap(); if client.is_player() { - client.send_packet(packet); + client.send_packet(packet).await; } } } @@ -343,11 +357,13 @@ impl Server { .await; }); - client.send_packet(&CCenterChunk { - chunk_x: 0.into(), - chunk_z: 0.into(), - }); - + client + .send_packet(&CCenterChunk { + chunk_x: 0.into(), + chunk_z: 0.into(), + }) + .await; + while let Some((chunk_pos, chunk_data)) = chunk_receiver.recv().await { // dbg!(chunk_pos); let chunk_data = match chunk_data { @@ -366,7 +382,7 @@ impl Server { len / (1024 * 1024) ); } - client.send_packet(&CChunkData(&chunk_data)); + client.send_packet(&CChunkData(&chunk_data)).await; } let t = std::time::Instant::now().duration_since(inst); dbg!("DONE", t); @@ -385,12 +401,14 @@ impl Server { buf } - pub fn send_brand(&self, client: &mut Client) { + pub async fn send_brand(&self, client: &mut Client) { // send server brand - client.send_packet(&CPluginMessage::new( - "minecraft:brand", - &self.cached_server_brand, - )); + client + .send_packet(&CPluginMessage::new( + "minecraft:brand", + &self.cached_server_brand, + )) + .await; } pub fn build_response(config: &BasicConfiguration) -> StatusResponse { @@ -439,7 +457,7 @@ impl Server { } } -#[derive(PartialEq, Serialize, Deserialize)] +#[derive(PartialEq, Serialize, Deserialize, Clone)] pub enum Difficulty { Peaceful, Easy,