Skip to content

Commit

Permalink
feat: enhance game data processing with async support and update Game…
Browse files Browse the repository at this point in the history
…DataProcessor
  • Loading branch information
hsnks100 committed Dec 8, 2024
1 parent c293376 commit 614cadd
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 61 deletions.
51 changes: 32 additions & 19 deletions src/game_cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::Mutex;

const CACHE_SIZE: usize = 256;

Expand Down Expand Up @@ -96,10 +98,10 @@ pub struct FrameResult {
}

/// Represents the game data processor for handling inputs and outputs.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct GameDataProcessor {
/// Map of connected clients.
clients: HashMap<u32, Box<dyn ClientTrait>>,
clients: HashMap<u32, Arc<Mutex<dyn ClientTrait>>>,
/// Global cache for aggregated game data.
aggregated_cache: GameCache,
/// Current frame index.
Expand All @@ -122,34 +124,41 @@ impl GameDataProcessor {

#[allow(dead_code)]
/// Adds a new client to the processor.
fn add_client(&mut self, client_id: u32, client: Box<dyn ClientTrait>) {
pub fn add_client(&mut self, client_id: u32, client: Arc<Mutex<dyn ClientTrait>>) {
self.clients.insert(client_id, client);
}

#[allow(dead_code)]
/// Processes incoming game data from a client.
/// Stores the data in the client's pending inputs.
fn process_incoming(&mut self, client_id: u32, data: Vec<u8>) {
let client = self.clients.get_mut(&client_id).expect("Client not found");
pub async fn process_incoming(&mut self, client_id: u32, data: Vec<u8>) {
let client = self.clients.get(&client_id).expect("Client not found");
let mut client = client.lock().await;
client.handle_incoming(data);
}

#[allow(dead_code)]
/// Processes a frame if inputs from all clients are available.
/// Returns Some(FrameResult) if the frame was processed.
pub fn process_frame(&mut self) -> Option<FrameResult> {
pub async fn process_frame(&mut self) -> Option<FrameResult> {
let mut frame_data = Vec::new();

let all_clients_have_input = self
.clients
.values()
.all(|client| client.has_pending_input());
let all_clients_have_input = {
let mut all_have_input = true;
for client in self.clients.values() {
let client = client.lock().await;
if !client.has_pending_input() {
all_have_input = false;
break;
}
}
all_have_input
};

if !all_clients_have_input {
return None;
}

for (&client_id, client) in &mut self.clients {
for (&client_id, client) in &self.clients {
let mut client = client.lock().await;
let input = client.get_next_input().expect("Input should be available");
frame_data.extend(input.clone());
self.frame_inputs
Expand Down Expand Up @@ -228,13 +237,13 @@ mod tests {
}
}

#[test]
fn test_game_data_processing_async() {
#[tokio::test]
async fn test_game_data_processing_async() {
let mut processor = GameDataProcessor::new();

// Simulate two clients connecting.
processor.add_client(1, Box::new(MockClient::new(1))); // Client ID 1
processor.add_client(2, Box::new(MockClient::new(2))); // Client ID 2
processor.add_client(1, Arc::new(Mutex::new(MockClient::new(1))));
processor.add_client(2, Arc::new(Mutex::new(MockClient::new(2))));

// Expected cache usage and positions per frame.
let expected_results = [
Expand Down Expand Up @@ -265,10 +274,10 @@ mod tests {
let mut frame_results = Vec::new();

for (client_id, input) in inputs {
processor.process_incoming(client_id, input);
processor.process_incoming(client_id, input).await;

// Attempt to process frames as inputs become available.
while let Some(frame_result) = processor.process_frame() {
while let Some(frame_result) = processor.process_frame().await {
frames_processed += 1;
frame_results.push(frame_result);
}
Expand All @@ -282,6 +291,10 @@ mod tests {
// Perform assertions on the frame results.
for (i, frame_result) in frame_results.iter().enumerate() {
let (exp_use_cache, exp_cache_pos) = expected_results[i];
println!(
"send to bytes: {}: {:?}",
frame_result.use_cache, frame_result.data_to_send
);
assert_eq!(
frame_result.use_cache,
exp_use_cache,
Expand Down
1 change: 1 addition & 0 deletions src/handlers/create_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub async fn handle_create_game(
max_players: 4,
game_status: 0, // Waiting
players,
processor: game_cache::GameDataProcessor::new(),
};

// Add game to games list
Expand Down
3 changes: 2 additions & 1 deletion src/handlers/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl crate::game_cache::ClientTrait for ClientInfo {
}
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct GameInfo {
pub game_id: u32,
pub game_name: String,
Expand All @@ -70,4 +70,5 @@ pub struct GameInfo {
pub max_players: u8,
pub game_status: u8, // 0=Waiting, 1=Playing, 2=Netsync
pub players: HashSet<std::net::SocketAddr>, // 이 필드 추가
pub processor: crate::game_cache::GameDataProcessor, // GameDataProcessor 추가
}
130 changes: 89 additions & 41 deletions src/handlers/handlerf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{mpsc, Mutex};

use crate::game_cache::ClientTrait;
use crate::game_cache::GameDataProcessor;
use crate::packet_history::PacketHistory;
use crate::*;

Expand Down Expand Up @@ -287,59 +289,105 @@ pub async fn handle_game_data(
) -> Result<(), Box<dyn Error>> {
let mut buf = BytesMut::from(&message.data[..]);
let _ = buf.get_u8(); // Empty String
let length_of_game_data = buf.get_u16_le();
let game_data = buf.split_to(length_of_game_data as usize).to_vec();
let data_length = buf.get_u16_le() as usize;
let game_data = buf.split_to(data_length).to_vec();

let (client_info_clone, game_info_clone) = {
let mut clients_lock = clients.lock().await;
let client_info = match clients_lock.get_mut(src) {
Some(client_info) => client_info,
None => {
eprintln!("Client not found during ready to play signal: addr={}", src);
return Ok(());
}
};
let client_info_clone = client_info.clone();
let game_id = {
let clients_lock = clients.lock().await;
let client_info = clients_lock.get(src).ok_or("Client not found")?;
client_info.game_id.ok_or("Game ID not found")?
};

let mut games_lock = games.lock().await;
let game_id = match client_info.game_id {
Some(game_id) => game_id,
None => {
eprintln!(
"Game ID not found during ready to play signal: addr={}",
src
);
return Ok(());
}
};
let game_info = match games_lock.get_mut(&game_id) {
Some(game_info) => game_info,
None => {
eprintln!(
"Game not found during ready to play signal: game_id={}",
game_id,
);
return Ok(());
}
};
let game_info_clone = game_info.clone();
let mut games_lock = games.lock().await;
let game_info = games_lock.get_mut(&game_id).ok_or("Game not found")?;

(client_info_clone, game_info_clone)
let user_id = {
let clients_lock = clients.lock().await;
let client_info = clients_lock.get(src).expect("Client not found");
client_info.user_id
};
for player_addr in game_info_clone.players.iter() {
let mut data = BytesMut::new();
data.put_u8(0);
data.put_u16_le(length_of_game_data);
data.put(&game_data[..]);
util::send_packet(&tx, &packet_history, player_addr, 0x12, data.to_vec()).await?;
game_info
.processor
.process_incoming(user_id as u32, game_data.clone())
.await;

if let Some(frame_result) = game_info.processor.process_frame().await {
let data_to_send = if frame_result.use_cache {
vec![frame_result.cache_pos]
} else {
frame_result.data_to_send.clone()
};

util::send_packet(
&tx,
&packet_history,
src,
if frame_result.use_cache { 0x13 } else { 0x12 },
data_to_send,
)
.await?;
}

Ok(())
}

pub async fn handle_game_cache(
message: kaillera::protocol::ParsedMessage,
src: &std::net::SocketAddr,
tx: mpsc::Sender<Message>,
clients: Arc<Mutex<HashMap<std::net::SocketAddr, ClientInfo>>>,
games: Arc<Mutex<HashMap<u32, GameInfo>>>,
packet_history: Arc<PacketHistory>,
) -> Result<(), Box<dyn Error>> {
let mut buf = BytesMut::from(&message.data[..]);
let _ = buf.get_u8(); // Empty String
let cache_position = buf.get_u8();

let game_id = {
let clients_lock = clients.lock().await;
let client_info = clients_lock.get(src).ok_or("Client not found")?;
client_info.game_id.ok_or("Game ID not found")?
};

let mut games_lock = games.lock().await;
let game_info = games_lock.get_mut(&game_id).ok_or("Game not found")?;

let client_id = {
let clients_lock = clients.lock().await;
let client = clients_lock.get(src).expect("Client not found");
client.id()
};
let cached_data = {
let mut clients_lock = clients.lock().await;
let client = clients_lock.get_mut(src).expect("Client not found");
// client.id()
let receive_cache = client.get_receive_cache();
receive_cache.get(cache_position).cloned()
};

if let Some(data) = cached_data {
game_info
.processor
.process_incoming(client_id, data.clone())
.await;

if let Some(frame_result) = game_info.processor.process_frame().await {
let data_to_send = if frame_result.use_cache {
vec![frame_result.cache_pos]
} else {
frame_result.data_to_send.clone()
};

util::send_packet(
&tx,
&packet_history,
src,
if frame_result.use_cache { 0x13 } else { 0x12 },
data_to_send,
)
.await?;
}
}

Ok(())
}

0 comments on commit 614cadd

Please sign in to comment.