diff --git a/src/game_cache.rs b/src/game_cache.rs index fcb0c53..a61c365 100644 --- a/src/game_cache.rs +++ b/src/game_cache.rs @@ -1,5 +1,7 @@ use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; +use std::sync::Arc; +use tokio::sync::Mutex; const CACHE_SIZE: usize = 256; @@ -96,10 +98,10 @@ pub struct FrameResult { } /// Represents the game data processor for handling inputs and outputs. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct GameDataProcessor { /// Map of connected clients. - clients: HashMap>, + clients: HashMap>>, /// Global cache for aggregated game data. aggregated_cache: GameCache, /// Current frame index. @@ -122,34 +124,41 @@ impl GameDataProcessor { #[allow(dead_code)] /// Adds a new client to the processor. - fn add_client(&mut self, client_id: u32, client: Box) { + pub fn add_client(&mut self, client_id: u32, client: Arc>) { self.clients.insert(client_id, client); } - #[allow(dead_code)] /// Processes incoming game data from a client. /// Stores the data in the client's pending inputs. - fn process_incoming(&mut self, client_id: u32, data: Vec) { - let client = self.clients.get_mut(&client_id).expect("Client not found"); + pub async fn process_incoming(&mut self, client_id: u32, data: Vec) { + let client = self.clients.get(&client_id).expect("Client not found"); + let mut client = client.lock().await; client.handle_incoming(data); } - #[allow(dead_code)] /// Processes a frame if inputs from all clients are available. /// Returns Some(FrameResult) if the frame was processed. - pub fn process_frame(&mut self) -> Option { + pub async fn process_frame(&mut self) -> Option { let mut frame_data = Vec::new(); - let all_clients_have_input = self - .clients - .values() - .all(|client| client.has_pending_input()); + let all_clients_have_input = { + let mut all_have_input = true; + for client in self.clients.values() { + let client = client.lock().await; + if !client.has_pending_input() { + all_have_input = false; + break; + } + } + all_have_input + }; if !all_clients_have_input { return None; } - for (&client_id, client) in &mut self.clients { + for (&client_id, client) in &self.clients { + let mut client = client.lock().await; let input = client.get_next_input().expect("Input should be available"); frame_data.extend(input.clone()); self.frame_inputs @@ -228,13 +237,13 @@ mod tests { } } - #[test] - fn test_game_data_processing_async() { + #[tokio::test] + async fn test_game_data_processing_async() { let mut processor = GameDataProcessor::new(); // Simulate two clients connecting. - processor.add_client(1, Box::new(MockClient::new(1))); // Client ID 1 - processor.add_client(2, Box::new(MockClient::new(2))); // Client ID 2 + processor.add_client(1, Arc::new(Mutex::new(MockClient::new(1)))); + processor.add_client(2, Arc::new(Mutex::new(MockClient::new(2)))); // Expected cache usage and positions per frame. let expected_results = [ @@ -265,10 +274,10 @@ mod tests { let mut frame_results = Vec::new(); for (client_id, input) in inputs { - processor.process_incoming(client_id, input); + processor.process_incoming(client_id, input).await; // Attempt to process frames as inputs become available. - while let Some(frame_result) = processor.process_frame() { + while let Some(frame_result) = processor.process_frame().await { frames_processed += 1; frame_results.push(frame_result); } @@ -282,6 +291,10 @@ mod tests { // Perform assertions on the frame results. for (i, frame_result) in frame_results.iter().enumerate() { let (exp_use_cache, exp_cache_pos) = expected_results[i]; + println!( + "send to bytes: {}: {:?}", + frame_result.use_cache, frame_result.data_to_send + ); assert_eq!( frame_result.use_cache, exp_use_cache, diff --git a/src/handlers/create_game.rs b/src/handlers/create_game.rs index 5cdce89..cf8ca48 100644 --- a/src/handlers/create_game.rs +++ b/src/handlers/create_game.rs @@ -55,6 +55,7 @@ pub async fn handle_create_game( max_players: 4, game_status: 0, // Waiting players, + processor: game_cache::GameDataProcessor::new(), }; // Add game to games list diff --git a/src/handlers/data.rs b/src/handlers/data.rs index cdced45..0723ac7 100644 --- a/src/handlers/data.rs +++ b/src/handlers/data.rs @@ -60,7 +60,7 @@ impl crate::game_cache::ClientTrait for ClientInfo { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct GameInfo { pub game_id: u32, pub game_name: String, @@ -70,4 +70,5 @@ pub struct GameInfo { pub max_players: u8, pub game_status: u8, // 0=Waiting, 1=Playing, 2=Netsync pub players: HashSet, // 이 필드 추가 + pub processor: crate::game_cache::GameDataProcessor, // GameDataProcessor 추가 } diff --git a/src/handlers/handlerf.rs b/src/handlers/handlerf.rs index beb49c7..a57ee4d 100644 --- a/src/handlers/handlerf.rs +++ b/src/handlers/handlerf.rs @@ -6,6 +6,8 @@ use std::sync::Arc; use std::time::Instant; use tokio::sync::{mpsc, Mutex}; +use crate::game_cache::ClientTrait; +use crate::game_cache::GameDataProcessor; use crate::packet_history::PacketHistory; use crate::*; @@ -287,52 +289,45 @@ pub async fn handle_game_data( ) -> Result<(), Box> { let mut buf = BytesMut::from(&message.data[..]); let _ = buf.get_u8(); // Empty String - let length_of_game_data = buf.get_u16_le(); - let game_data = buf.split_to(length_of_game_data as usize).to_vec(); + let data_length = buf.get_u16_le() as usize; + let game_data = buf.split_to(data_length).to_vec(); - let (client_info_clone, game_info_clone) = { - let mut clients_lock = clients.lock().await; - let client_info = match clients_lock.get_mut(src) { - Some(client_info) => client_info, - None => { - eprintln!("Client not found during ready to play signal: addr={}", src); - return Ok(()); - } - }; - let client_info_clone = client_info.clone(); + let game_id = { + let clients_lock = clients.lock().await; + let client_info = clients_lock.get(src).ok_or("Client not found")?; + client_info.game_id.ok_or("Game ID not found")? + }; - let mut games_lock = games.lock().await; - let game_id = match client_info.game_id { - Some(game_id) => game_id, - None => { - eprintln!( - "Game ID not found during ready to play signal: addr={}", - src - ); - return Ok(()); - } - }; - let game_info = match games_lock.get_mut(&game_id) { - Some(game_info) => game_info, - None => { - eprintln!( - "Game not found during ready to play signal: game_id={}", - game_id, - ); - return Ok(()); - } - }; - let game_info_clone = game_info.clone(); + let mut games_lock = games.lock().await; + let game_info = games_lock.get_mut(&game_id).ok_or("Game not found")?; - (client_info_clone, game_info_clone) + let user_id = { + let clients_lock = clients.lock().await; + let client_info = clients_lock.get(src).expect("Client not found"); + client_info.user_id }; - for player_addr in game_info_clone.players.iter() { - let mut data = BytesMut::new(); - data.put_u8(0); - data.put_u16_le(length_of_game_data); - data.put(&game_data[..]); - util::send_packet(&tx, &packet_history, player_addr, 0x12, data.to_vec()).await?; + game_info + .processor + .process_incoming(user_id as u32, game_data.clone()) + .await; + + if let Some(frame_result) = game_info.processor.process_frame().await { + let data_to_send = if frame_result.use_cache { + vec![frame_result.cache_pos] + } else { + frame_result.data_to_send.clone() + }; + + util::send_packet( + &tx, + &packet_history, + src, + if frame_result.use_cache { 0x13 } else { 0x12 }, + data_to_send, + ) + .await?; } + Ok(()) } @@ -340,6 +335,59 @@ pub async fn handle_game_cache( message: kaillera::protocol::ParsedMessage, src: &std::net::SocketAddr, tx: mpsc::Sender, + clients: Arc>>, + games: Arc>>, + packet_history: Arc, ) -> Result<(), Box> { + let mut buf = BytesMut::from(&message.data[..]); + let _ = buf.get_u8(); // Empty String + let cache_position = buf.get_u8(); + + let game_id = { + let clients_lock = clients.lock().await; + let client_info = clients_lock.get(src).ok_or("Client not found")?; + client_info.game_id.ok_or("Game ID not found")? + }; + + let mut games_lock = games.lock().await; + let game_info = games_lock.get_mut(&game_id).ok_or("Game not found")?; + + let client_id = { + let clients_lock = clients.lock().await; + let client = clients_lock.get(src).expect("Client not found"); + client.id() + }; + let cached_data = { + let mut clients_lock = clients.lock().await; + let client = clients_lock.get_mut(src).expect("Client not found"); + // client.id() + let receive_cache = client.get_receive_cache(); + receive_cache.get(cache_position).cloned() + }; + + if let Some(data) = cached_data { + game_info + .processor + .process_incoming(client_id, data.clone()) + .await; + + if let Some(frame_result) = game_info.processor.process_frame().await { + let data_to_send = if frame_result.use_cache { + vec![frame_result.cache_pos] + } else { + frame_result.data_to_send.clone() + }; + + util::send_packet( + &tx, + &packet_history, + src, + if frame_result.use_cache { 0x13 } else { 0x12 }, + data_to_send, + ) + .await?; + } + } + Ok(()) }