From 8cda670c9d01d2671cd02afe5e47329f5e0bee4b Mon Sep 17 00:00:00 2001 From: mvlabat Date: Fri, 19 Nov 2021 00:18:57 +0200 Subject: [PATCH] Refactor mr_matchmaker to work with GameServer resources --- Cargo.lock | 122 ++++++++++++++++++++++++++++--- bins/matchmaker/Cargo.toml | 8 +- bins/matchmaker/src/main.rs | 134 ++++++++++++++++++++++------------ k8s/agones/mr_server_fleet.tf | 5 ++ module.tf | 4 +- 5 files changed, 211 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c0de7115..8c93fa37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1568,8 +1568,18 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.10.2", + "darling_macro 0.10.2", +] + +[[package]] +name = "darling" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f2c43f534ea4b0b049015d00269734195e6d3f0f6635cb692251aca6f9f8b3c" +dependencies = [ + "darling_core 0.12.4", + "darling_macro 0.12.4", ] [[package]] @@ -1582,7 +1592,21 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.9.3", + "syn", +] + +[[package]] +name = "darling_core" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e91455b86830a1c21799d94524df0845183fa55bafd9aa137b01c7d1065fa36" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", "syn", ] @@ -1592,7 +1616,18 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" dependencies = [ - "darling_core", + "darling_core 0.10.2", + "quote", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29b5acf0dea37a7f66f7b25d2c5e93fd46f8f6968b1a5d7a3e02e97768afc95a" +dependencies = [ + "darling_core 0.12.4", "quote", "syn", ] @@ -1720,6 +1755,12 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" +[[package]] +name = "dyn-clone" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee2626afccd7561a06cf1367e2950c4718ea04565e20fb5029b6c7d8ad09abcf" + [[package]] name = "egui" version = "0.14.2" @@ -2886,9 +2927,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.61.0" +version = "0.60.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbaea4edf25bd83d77eebacd353bacd35d659678d57238aae07a8d739b26f1cd" +checksum = "a0ae4dcb1a65182551922303a2d292b463513a6727db5ad980afbd32df7f3c16" dependencies = [ "base64", "bytes 1.1.0", @@ -2921,11 +2962,10 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.61.0" +version = "0.60.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa3021021480d7034e2cbe50bff9e8176e5ce33ddd6ba0c26ea9cfe310304c63" +checksum = "04ccd59635e9b21353da8d4a394bb5d3473b5965ed44496c8f857281b0625ffe" dependencies = [ - "chrono", "form_urlencoded", "http", "json-patch", @@ -2936,11 +2976,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "kube-derive" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4191660b8e26f6e6cb06f21b5372bdbc2c76b54f7c3d65e7a8c8708f9c36ed5" +dependencies = [ + "darling 0.12.4", + "proc-macro2", + "quote", + "serde_json", + "syn", +] + [[package]] name = "kube-runtime" -version = "0.61.0" +version = "0.60.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b10dde88f05e8dc156d72736639476b62ae1bedf99680dc32556ac4de274a9ea" +checksum = "eec378b03890f9f2bfa9448a51aa0f6a4299f6bb2ed0d180330e628c7a395918" dependencies = [ "dashmap", "derivative", @@ -3323,12 +3376,16 @@ dependencies = [ "futures", "k8s-openapi", "kube", + "kube-derive", "kube-runtime", "log", "mr_messages_lib", "mr_utils_lib", "rymder", + "schemars", "sentry", + "serde", + "serde_derive", "serde_json", "tokio 1.12.0", "tokio-tungstenite", @@ -3626,7 +3683,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05d1c6307dc424d0f65b9b06e94f88248e6305726b14729fd67a5e47b2dc481d" dependencies = [ - "darling", + "darling 0.10.2", "proc-macro-crate 0.1.5", "proc-macro2", "quote", @@ -4701,6 +4758,30 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "schemars" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271ac0c667b8229adf70f0f957697c96fafd7486ab7481e15dc5e45e3e6a4368" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ebda811090b257411540779860bc09bf321bc587f58d2c5864309d1566214e7" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -4873,6 +4954,17 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_derive_internals" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dbab34ca63057a1f15280bdf3c39f2b1eb1b54c17e98360e511637aef7418c6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_json" version = "1.0.68" @@ -5290,6 +5382,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "svg_fmt" version = "0.4.1" diff --git a/bins/matchmaker/Cargo.toml b/bins/matchmaker/Cargo.toml index d673a28a..08826262 100644 --- a/bins/matchmaker/Cargo.toml +++ b/bins/matchmaker/Cargo.toml @@ -13,12 +13,16 @@ mr_utils_lib = { path = "../../libs/utils_lib" } bincode = "1.3.3" env_logger = "0.8.1" futures = "0.3.17" -kube = "0.61.0" -kube-runtime = "0.61.0" +kube = "0.60.0" +kube-derive = "0.60.0" +kube-runtime = "0.60.0" log = "0.4.14" k8s-openapi = { version = "0.13.1", default-features = false, features = ["v1_20"] } rymder = "0.2.2" sentry = "0.21.0" +serde = "1.0.68" +serde_derive = "1.0.68" serde_json = "1.0.68" +schemars = "0.8.7" tokio = { version = "1.12.0", features = ["rt-multi-thread", "sync"] } tokio-tungstenite = "0.15.0" diff --git a/bins/matchmaker/src/main.rs b/bins/matchmaker/src/main.rs index 59d50b2e..f3344081 100644 --- a/bins/matchmaker/src/main.rs +++ b/bins/matchmaker/src/main.rs @@ -1,10 +1,13 @@ use futures::{future, pin_mut, SinkExt, StreamExt, TryStreamExt}; -use k8s_openapi::api::core::v1::Pod; use kube::{ api::{Api, ListParams, WatchEvent}, Client, }; +use kube_derive::CustomResource; use mr_messages_lib::{MatchmakerMessage, Server}; +use schemars::JsonSchema; +use serde::Deserializer; +use serde_derive::{Deserialize, Serialize}; use std::{ collections::HashMap, net::{IpAddr, SocketAddr}, @@ -23,6 +26,36 @@ pub struct Servers { servers: std::sync::Arc>>, } +#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] +#[kube(group = "agones.dev", version = "v1", kind = "GameServer", namespaced)] +#[kube(status = "GameServerStatus")] +pub struct GameServerSpec { + container: String, +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct GameServerStatus { + state: String, + #[serde(deserialize_with = "deserialize_null_default")] + ports: Vec, + address: String, + node_name: String, + players: GameServerPlayerStatus, +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] +pub struct GameServerPort { + name: String, + port: u16, +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] +pub struct GameServerPlayerStatus { + count: u64, + capacity: u64, +} + impl Servers { pub async fn init(&self, initial_list: Vec) { let mut servers = self.servers.lock().await; @@ -77,34 +110,34 @@ async fn main() { let servers = Servers::default(); future::select( - tokio::spawn(watch_pods(tx.clone(), servers.clone())), + tokio::spawn(watch_game_servers(tx.clone(), servers.clone())), tokio::spawn(listen_websocket(tx, servers)), ) .await; } -async fn watch_pods(tx: Sender, servers: Servers) { +async fn watch_game_servers(tx: Sender, servers: Servers) { log::info!("Starting k8s client..."); let client = Client::try_default().await.unwrap(); - let pods: Api = Api::namespaced(client, "default"); + let game_servers: Api = Api::namespaced(client, "default"); let lp = ListParams::default().labels("app=mr_server").timeout(0); - let mut stream = pods + let mut stream = game_servers .watch(&lp, "0") .await - .expect("Failed to start watching pods") + .expect("Failed to start watching game servers") .boxed(); - log::info!("Watching pod updates..."); + log::info!("Watching GameServer updates..."); - let initial_list = pods + let initial_list = game_servers .list(&lp) .await - .expect("Failed to get a list of running pods") + .expect("Failed to get a list of running game servers") .items .into_iter() - .filter_map(|pod| server_from_resource(&pod)) + .filter_map(|gs| server_from_resource(&gs)) .collect::>(); let list_len = initial_list.len(); servers.init(initial_list).await; @@ -119,7 +152,7 @@ async fn watch_pods(tx: Sender, servers: Servers) { let message = match status { WatchEvent::Added(resource) => { if let Some(server) = server_from_resource(&resource) { - log::info!("New server: {:?}", server); + log::info!("New server: {:?}", resource.status); servers.add(server.clone()).await; Some(MatchmakerMessage::ServerUpdated(server)) } else { @@ -226,55 +259,64 @@ async fn handle_connection( log::info!("{} disconnected", addr); } -fn server_from_resource(resource: &Pod) -> Option { +fn server_from_resource(resource: &GameServer) -> Option { resource - .spec + .status .as_ref() - .and_then(|spec| { - spec.containers - .iter() - .find(|container| container.name == "mr-server") - .map(|container| { - ( - resource - .metadata - .name - .clone() - .expect("Expected a name for a server"), - container.clone(), - ) - }) - }) - .and_then(|(name, container)| { - let status = resource.status.as_ref().expect("Expected pod status"); - if status.phase.as_deref() != Some("Running") { - log::warn!("Pod {} is not yet in the running state", name); + .and_then(|status: &GameServerStatus| { + let name = match &resource.metadata.name { + Some(name) => name.clone(), + None => { + log::error!("GameServer doesn't have a name, skipping"); + return None; + } + }; + + if status.state != "Ready" { + log::info!( + "GameServer {} is not in Ready state (current: {}), skipping", + name, + status.state + ); return None; } - let host_ip = match &status.host_ip { - Some(host_ip) => host_ip.parse::().expect("Failed to parse host ip"), - None => { - log::warn!("Host ip of {} pod is not yet allocated", name); + let ip_addr = match status.address.parse::() { + Ok(ip) => ip, + Err(err) => { + log::error!( + "Skipping GameServer {} (failed to parse ip address '{}': {:?})", + name, + status.address, + err + ); return None; } }; - let port = match container.ports.and_then(|ports| { - ports - .iter() - .find(|port| port.protocol.as_deref() == Some("UDP")) - .cloned() - }) { - Some(port) => port.host_port.expect("Expected a host_port"), + let GameServerPort { port, .. } = match status + .ports + .iter() + .find(|port| port.name == "MUDDLE_LISTEN_PORT-udp") + .cloned() + { + Some(port) => port, None => { - log::error!("Pod {} doesn't expose a UDP port", name); + log::error!("GameServer {} doesn't expose a UDP port, skipping", name); return None; } }; - Some(Server { name, - addr: SocketAddr::new(host_ip, port as u16), + addr: SocketAddr::new(ip_addr, port), }) }) } + +fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result +where + T: Default + serde::Deserialize<'de>, + D: Deserializer<'de>, +{ + let opt = as serde::Deserialize>::deserialize(deserializer)?; + Ok(opt.unwrap_or_default()) +} diff --git a/k8s/agones/mr_server_fleet.tf b/k8s/agones/mr_server_fleet.tf index e8907a3e..ccb847a8 100644 --- a/k8s/agones/mr_server_fleet.tf +++ b/k8s/agones/mr_server_fleet.tf @@ -29,6 +29,11 @@ resource "kubernetes_manifest" "mr_server_fleet" { type = "Recreate" } template = { + metadata = { + labels = { + app = "mr_server" + } + } spec = { ports = [ { diff --git a/module.tf b/module.tf index 4b161962..809182b3 100644 --- a/module.tf +++ b/module.tf @@ -119,12 +119,12 @@ module "matchmaker" { } module "web_client" { - source = "./k8s/web_client" + source = "./k8s/web_client" depends_on = [module.aws_load_balancer_controller] } module "service" { - source = "./k8s/service" + source = "./k8s/service" depends_on = [module.matchmaker, module.web_client] }